Merge pull request #20033 from nextcloud/s3-seekable-stream
Enable fseek for files in S3 storage
This commit is contained in:
commit
f5919d5b83
|
@ -1025,6 +1025,7 @@ return array(
|
||||||
'OC\\Files\\Storage\\Wrapper\\Wrapper' => $baseDir . '/lib/private/Files/Storage/Wrapper/Wrapper.php',
|
'OC\\Files\\Storage\\Wrapper\\Wrapper' => $baseDir . '/lib/private/Files/Storage/Wrapper/Wrapper.php',
|
||||||
'OC\\Files\\Stream\\Encryption' => $baseDir . '/lib/private/Files/Stream/Encryption.php',
|
'OC\\Files\\Stream\\Encryption' => $baseDir . '/lib/private/Files/Stream/Encryption.php',
|
||||||
'OC\\Files\\Stream\\Quota' => $baseDir . '/lib/private/Files/Stream/Quota.php',
|
'OC\\Files\\Stream\\Quota' => $baseDir . '/lib/private/Files/Stream/Quota.php',
|
||||||
|
'OC\\Files\\Stream\\SeekableHttpStream' => $baseDir . '/lib/private/Files/Stream/SeekableHttpStream.php',
|
||||||
'OC\\Files\\Type\\Detection' => $baseDir . '/lib/private/Files/Type/Detection.php',
|
'OC\\Files\\Type\\Detection' => $baseDir . '/lib/private/Files/Type/Detection.php',
|
||||||
'OC\\Files\\Type\\Loader' => $baseDir . '/lib/private/Files/Type/Loader.php',
|
'OC\\Files\\Type\\Loader' => $baseDir . '/lib/private/Files/Type/Loader.php',
|
||||||
'OC\\Files\\Type\\TemplateManager' => $baseDir . '/lib/private/Files/Type/TemplateManager.php',
|
'OC\\Files\\Type\\TemplateManager' => $baseDir . '/lib/private/Files/Type/TemplateManager.php',
|
||||||
|
|
|
@ -1054,6 +1054,7 @@ class ComposerStaticInit53792487c5a8370acc0b06b1a864ff4c
|
||||||
'OC\\Files\\Storage\\Wrapper\\Wrapper' => __DIR__ . '/../../..' . '/lib/private/Files/Storage/Wrapper/Wrapper.php',
|
'OC\\Files\\Storage\\Wrapper\\Wrapper' => __DIR__ . '/../../..' . '/lib/private/Files/Storage/Wrapper/Wrapper.php',
|
||||||
'OC\\Files\\Stream\\Encryption' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Encryption.php',
|
'OC\\Files\\Stream\\Encryption' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Encryption.php',
|
||||||
'OC\\Files\\Stream\\Quota' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Quota.php',
|
'OC\\Files\\Stream\\Quota' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Quota.php',
|
||||||
|
'OC\\Files\\Stream\\SeekableHttpStream' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/SeekableHttpStream.php',
|
||||||
'OC\\Files\\Type\\Detection' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Detection.php',
|
'OC\\Files\\Type\\Detection' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Detection.php',
|
||||||
'OC\\Files\\Type\\Loader' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Loader.php',
|
'OC\\Files\\Type\\Loader' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Loader.php',
|
||||||
'OC\\Files\\Type\\TemplateManager' => __DIR__ . '/../../..' . '/lib/private/Files/Type/TemplateManager.php',
|
'OC\\Files\\Type\\TemplateManager' => __DIR__ . '/../../..' . '/lib/private/Files/Type/TemplateManager.php',
|
||||||
|
|
|
@ -30,6 +30,7 @@ use Aws\S3\MultipartUploader;
|
||||||
use Aws\S3\ObjectUploader;
|
use Aws\S3\ObjectUploader;
|
||||||
use Aws\S3\S3Client;
|
use Aws\S3\S3Client;
|
||||||
use Icewind\Streams\CallbackWrapper;
|
use Icewind\Streams\CallbackWrapper;
|
||||||
|
use OC\Files\Stream\SeekableHttpStream;
|
||||||
|
|
||||||
const S3_UPLOAD_PART_SIZE = 524288000; // 500MB
|
const S3_UPLOAD_PART_SIZE = 524288000; // 500MB
|
||||||
|
|
||||||
|
@ -49,10 +50,11 @@ trait S3ObjectTrait {
|
||||||
* @since 7.0.0
|
* @since 7.0.0
|
||||||
*/
|
*/
|
||||||
function readObject($urn) {
|
function readObject($urn) {
|
||||||
$client = $this->getConnection();
|
return SeekableHttpStream::open(function ($range) use ($urn) {
|
||||||
$command = $client->getCommand('GetObject', [
|
$command = $this->getConnection()->getCommand('GetObject', [
|
||||||
'Bucket' => $this->bucket,
|
'Bucket' => $this->bucket,
|
||||||
'Key' => $urn
|
'Key' => $urn,
|
||||||
|
'Range' => 'bytes=' . $range,
|
||||||
]);
|
]);
|
||||||
$request = \Aws\serialize($command);
|
$request = \Aws\serialize($command);
|
||||||
$headers = [];
|
$headers = [];
|
||||||
|
@ -64,12 +66,13 @@ trait S3ObjectTrait {
|
||||||
$opts = [
|
$opts = [
|
||||||
'http' => [
|
'http' => [
|
||||||
'protocol_version' => 1.1,
|
'protocol_version' => 1.1,
|
||||||
'header' => $headers
|
'header' => $headers,
|
||||||
]
|
],
|
||||||
];
|
];
|
||||||
|
|
||||||
$context = stream_context_create($opts);
|
$context = stream_context_create($opts);
|
||||||
return fopen($request->getUri(), 'r', false, $context);
|
return fopen($request->getUri(), 'r', false, $context);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -87,7 +90,7 @@ trait S3ObjectTrait {
|
||||||
$uploader = new MultipartUploader($this->getConnection(), $countStream, [
|
$uploader = new MultipartUploader($this->getConnection(), $countStream, [
|
||||||
'bucket' => $this->bucket,
|
'bucket' => $this->bucket,
|
||||||
'key' => $urn,
|
'key' => $urn,
|
||||||
'part_size' => S3_UPLOAD_PART_SIZE
|
'part_size' => S3_UPLOAD_PART_SIZE,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -114,7 +117,7 @@ trait S3ObjectTrait {
|
||||||
function deleteObject($urn) {
|
function deleteObject($urn) {
|
||||||
$this->getConnection()->deleteObject([
|
$this->getConnection()->deleteObject([
|
||||||
'Bucket' => $this->bucket,
|
'Bucket' => $this->bucket,
|
||||||
'Key' => $urn
|
'Key' => $urn,
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,182 @@
|
||||||
|
<?php
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @copyright Copyright (c) 2020, Lukas Stabe (lukas@stabe.de)
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as
|
||||||
|
* published by the Free Software Foundation, either version 3 of the
|
||||||
|
* License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace OC\Files\Stream;
|
||||||
|
|
||||||
|
use Icewind\Streams\File;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A stream wrapper that uses http range requests to provide a seekable stream for http reading
|
||||||
|
*/
|
||||||
|
class SeekableHttpStream implements File {
|
||||||
|
private const PROTOCOL = 'httpseek';
|
||||||
|
|
||||||
|
private static $registered = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers the stream wrapper using the `httpseek://` url scheme
|
||||||
|
* $return void
|
||||||
|
*/
|
||||||
|
private static function registerIfNeeded() {
|
||||||
|
if (!self::$registered) {
|
||||||
|
stream_wrapper_register(
|
||||||
|
self::PROTOCOL,
|
||||||
|
self::class
|
||||||
|
);
|
||||||
|
self::$registered = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open a readonly-seekable http stream
|
||||||
|
*
|
||||||
|
* The provided callback will be called with byte range and should return an http stream for the requested range
|
||||||
|
*
|
||||||
|
* @param callable $callback
|
||||||
|
* @return false|resource
|
||||||
|
*/
|
||||||
|
public static function open(callable $callback) {
|
||||||
|
$context = stream_context_create([
|
||||||
|
SeekableHttpStream::PROTOCOL => [
|
||||||
|
'callback' => $callback
|
||||||
|
],
|
||||||
|
]);
|
||||||
|
|
||||||
|
SeekableHttpStream::registerIfNeeded();
|
||||||
|
return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @var resource */
|
||||||
|
public $context;
|
||||||
|
|
||||||
|
/** @var callable */
|
||||||
|
private $openCallback;
|
||||||
|
|
||||||
|
/** @var resource */
|
||||||
|
private $current;
|
||||||
|
/** @var int */
|
||||||
|
private $offset = 0;
|
||||||
|
|
||||||
|
private function reconnect(int $start) {
|
||||||
|
$range = $start . '-';
|
||||||
|
if ($this->current != null) {
|
||||||
|
fclose($this->current);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->current = ($this->openCallback)($range);
|
||||||
|
|
||||||
|
if ($this->current === false) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
$responseHead = stream_get_meta_data($this->current)['wrapper_data'];
|
||||||
|
$rangeHeaders = array_values(array_filter($responseHead, function ($v) {
|
||||||
|
return preg_match('#^content-range:#i', $v) === 1;
|
||||||
|
}));
|
||||||
|
if (!$rangeHeaders) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
$contentRange = $rangeHeaders[0];
|
||||||
|
|
||||||
|
$content = trim(explode(':', $contentRange)[1]);
|
||||||
|
$range = trim(explode(' ', $content)[1]);
|
||||||
|
$begin = intval(explode('-', $range)[0]);
|
||||||
|
|
||||||
|
if ($begin !== $start) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->offset = $begin;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_open($path, $mode, $options, &$opened_path) {
|
||||||
|
$options = stream_context_get_options($this->context)[self::PROTOCOL];
|
||||||
|
$this->openCallback = $options['callback'];
|
||||||
|
|
||||||
|
return $this->reconnect(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_read($count) {
|
||||||
|
if (!$this->current) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
$ret = fread($this->current, $count);
|
||||||
|
$this->offset += strlen($ret);
|
||||||
|
return $ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_seek($offset, $whence = SEEK_SET) {
|
||||||
|
switch ($whence) {
|
||||||
|
case SEEK_SET:
|
||||||
|
if ($offset === $this->offset) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return $this->reconnect($offset);
|
||||||
|
case SEEK_CUR:
|
||||||
|
if ($offset === 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return $this->reconnect($this->offset + $offset);
|
||||||
|
case SEEK_END:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_tell() {
|
||||||
|
return $this->offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_stat() {
|
||||||
|
return fstat($this->current);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_eof() {
|
||||||
|
return feof($this->current);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_close() {
|
||||||
|
fclose($this->current);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_write($data) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_set_option($option, $arg1, $arg2) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_truncate($size) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_lock($operation) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stream_flush() {
|
||||||
|
return; //noop because readonly stream
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,7 +31,7 @@ abstract class ObjectStoreTest extends TestCase {
|
||||||
*/
|
*/
|
||||||
abstract protected function getInstance();
|
abstract protected function getInstance();
|
||||||
|
|
||||||
private function stringToStream($data) {
|
protected function stringToStream($data) {
|
||||||
$stream = fopen('php://temp', 'w+');
|
$stream = fopen('php://temp', 'w+');
|
||||||
fwrite($stream, $data);
|
fwrite($stream, $data);
|
||||||
rewind($stream);
|
rewind($stream);
|
||||||
|
|
|
@ -27,7 +27,7 @@ use OC\Files\ObjectStore\S3;
|
||||||
class MultiPartUploadS3 extends S3 {
|
class MultiPartUploadS3 extends S3 {
|
||||||
function writeObject($urn, $stream) {
|
function writeObject($urn, $stream) {
|
||||||
$this->getConnection()->upload($this->bucket, $urn, $stream, 'private', [
|
$this->getConnection()->upload($this->bucket, $urn, $stream, 'private', [
|
||||||
'mup_threshold' => 1
|
'mup_threshold' => 1,
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,8 +36,8 @@ class NonSeekableStream extends Wrapper {
|
||||||
public static function wrap($source) {
|
public static function wrap($source) {
|
||||||
$context = stream_context_create([
|
$context = stream_context_create([
|
||||||
'nonseek' => [
|
'nonseek' => [
|
||||||
'source' => $source
|
'source' => $source,
|
||||||
]
|
],
|
||||||
]);
|
]);
|
||||||
return Wrapper::wrapSource($source, $context, 'nonseek', self::class);
|
return Wrapper::wrapSource($source, $context, 'nonseek', self::class);
|
||||||
}
|
}
|
||||||
|
@ -83,4 +83,20 @@ class S3Test extends ObjectStoreTest {
|
||||||
|
|
||||||
$this->assertEquals(file_get_contents(__FILE__), stream_get_contents($result));
|
$this->assertEquals(file_get_contents(__FILE__), stream_get_contents($result));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testSeek() {
|
||||||
|
$data = file_get_contents(__FILE__);
|
||||||
|
|
||||||
|
$instance = $this->getInstance();
|
||||||
|
$instance->writeObject('seek', $this->stringToStream($data));
|
||||||
|
|
||||||
|
$read = $instance->readObject('seek');
|
||||||
|
$this->assertEquals(substr($data, 0, 100), fread($read, 100));
|
||||||
|
|
||||||
|
fseek($read, 10);
|
||||||
|
$this->assertEquals(substr($data, 10, 100), fread($read, 100));
|
||||||
|
|
||||||
|
fseek($read, 100, SEEK_CUR);
|
||||||
|
$this->assertEquals(substr($data, 210, 100), fread($read, 100));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue