make seekable s3 stream generic

Signed-off-by: Robin Appelman <robin@icewind.nl>
This commit is contained in:
Robin Appelman 2020-03-19 14:32:25 +01:00
parent 14401efb0f
commit 7b07e7251c
No known key found for this signature in database
GPG Key ID: 42B69D8A64526EFB
2 changed files with 106 additions and 57 deletions

View File

@ -30,6 +30,7 @@ use Aws\S3\MultipartUploader;
use Aws\S3\ObjectUploader; use Aws\S3\ObjectUploader;
use Aws\S3\S3Client; use Aws\S3\S3Client;
use Icewind\Streams\CallbackWrapper; use Icewind\Streams\CallbackWrapper;
use OC\Files\Stream\SeekableHttpStream;
const S3_UPLOAD_PART_SIZE = 524288000; // 500MB const S3_UPLOAD_PART_SIZE = 524288000; // 500MB
@ -49,16 +50,29 @@ trait S3ObjectTrait {
* @since 7.0.0 * @since 7.0.0
*/ */
function readObject($urn) { function readObject($urn) {
$context = stream_context_create([ return SeekableHttpStream::open(function ($range) use ($urn) {
's3seek' => [ $command = $this->getConnection()->getCommand('GetObject', [
'client' => $this->getConnection(), 'Bucket' => $this->bucket,
'bucket' => $this->bucket, 'Key' => $urn,
'urn' => $urn, 'Range' => 'bytes=' . $range,
], ]);
]); $request = \Aws\serialize($command);
$headers = [];
foreach ($request->getHeaders() as $key => $values) {
foreach ($values as $value) {
$headers[] = "$key: $value";
}
}
$opts = [
'http' => [
'protocol_version' => 1.1,
'header' => $headers,
],
];
S3SeekableReadStream::registerIfNeeded(); $context = stream_context_create($opts);
return fopen('s3seek://', 'r', false, $context); return fopen($request->getUri(), 'r', false, $context);
});
} }
/** /**
@ -76,7 +90,7 @@ trait S3ObjectTrait {
$uploader = new MultipartUploader($this->getConnection(), $countStream, [ $uploader = new MultipartUploader($this->getConnection(), $countStream, [
'bucket' => $this->bucket, 'bucket' => $this->bucket,
'key' => $urn, 'key' => $urn,
'part_size' => S3_UPLOAD_PART_SIZE 'part_size' => S3_UPLOAD_PART_SIZE,
]); ]);
try { try {
@ -103,7 +117,7 @@ trait S3ObjectTrait {
function deleteObject($urn) { function deleteObject($urn) {
$this->getConnection()->deleteObject([ $this->getConnection()->deleteObject([
'Bucket' => $this->bucket, 'Bucket' => $this->bucket,
'Key' => $urn 'Key' => $urn,
]); ]);
} }

View File

@ -20,34 +20,60 @@
* *
*/ */
namespace OC\Files\ObjectStore; namespace OC\Files\Stream;
use Icewind\Streams\File;
/** /**
* A stream wrapper that uses http range requests to provide a seekable * A stream wrapper that uses http range requests to provide a seekable stream for http reading
* stream of a file in S3 storage.
*/ */
class S3SeekableReadStream { class SeekableHttpStream implements File {
private const PROTOCOL = 'httpseek';
private static $registered = false; private static $registered = false;
/** /**
* Registers the stream wrapper using the `s3seek://` url scheme * Registers the stream wrapper using the `httpseek://` url scheme
* $return void * $return void
*/ */
public static function registerIfNeeded() { private static function registerIfNeeded() {
if (!self::$registered) { if (!self::$registered) {
stream_wrapper_register( stream_wrapper_register(
's3seek', self::PROTOCOL,
'OC\Files\ObjectStore\S3SeekableReadStream' self::class
); );
self::$registered = true; self::$registered = true;
} }
} }
private $client; /**
private $bucket; * Open a readonly-seekable http stream
private $urn; *
* The provided callback will be called with byte range and should return an http stream for the requested range
*
* @param callable $callback
* @return false|resource
*/
public static function open(callable $callback) {
$context = stream_context_create([
SeekableHttpStream::PROTOCOL => [
'callback' => $callback
],
]);
SeekableHttpStream::registerIfNeeded();
return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context);
}
/** @var resource */
public $context;
/** @var callable */
private $openCallback;
/** @var resource */
private $current; private $current;
/** @var int */
private $offset = 0; private $offset = 0;
private function reconnect($range) { private function reconnect($range) {
@ -55,29 +81,11 @@ class S3SeekableReadStream {
fclose($this->current); fclose($this->current);
} }
$command = $this->client->getCommand('GetObject', [ $this->current = ($this->openCallback)($range);
'Bucket' => $this->bucket,
'Key' => $this->urn, if ($this->current === false) {
'Range' => 'bytes=' . $range, return false;
]);
$request = \Aws\serialize($command);
$headers = [];
foreach ($request->getHeaders() as $key => $values) {
foreach ($values as $value) {
$headers[] = "$key: $value";
}
} }
$opts = [
'http' => [
'protocol_version' => 1.1,
'header' => $headers,
],
];
$context = stream_context_create($opts);
$this->current = fopen($request->getUri(), 'r', false, $context);
if ($this->current === false) {return false;}
$responseHead = stream_get_meta_data($this->current)['wrapper_data']; $responseHead = stream_get_meta_data($this->current)['wrapper_data'];
$contentRange = array_values(array_filter($responseHead, function ($v) { $contentRange = array_values(array_filter($responseHead, function ($v) {
@ -93,30 +101,35 @@ class S3SeekableReadStream {
} }
function stream_open($path, $mode, $options, &$opened_path) { function stream_open($path, $mode, $options, &$opened_path) {
$o = stream_context_get_options($this->context)['s3seek']; $options = stream_context_get_options($this->context)[self::PROTOCOL];
$this->bucket = $o['bucket']; $this->openCallback = $options['callback'];
$this->urn = $o['urn'];
$this->client = $o['client'];
return $this->reconnect('0-'); return $this->reconnect('0-');
} }
function stream_read($count) { function stream_read($count) {
if (!$this->current) {
return false;
}
$ret = fread($this->current, $count); $ret = fread($this->current, $count);
$this->offset += strlen($ret); $this->offset += strlen($ret);
return $ret; return $ret;
} }
function stream_seek($offset, $whence) { function stream_seek($offset, $whence = SEEK_SET) {
switch ($whence) { switch ($whence) {
case SEEK_SET: case SEEK_SET:
if ($offset === $this->offset) {return true;} if ($offset === $this->offset) {
return $this->reconnect($offset . '-'); return true;
case SEEK_CUR: }
if ($offset === 0) {return true;} return $this->reconnect($offset . '-');
return $this->reconnect(($this->offset + $offset) . '-'); case SEEK_CUR:
case SEEK_END: if ($offset === 0) {
return false; return true;
}
return $this->reconnect(($this->offset + $offset) . '-');
case SEEK_END:
return false;
} }
return false; return false;
} }
@ -136,4 +149,26 @@ class S3SeekableReadStream {
function stream_close() { function stream_close() {
fclose($this->current); fclose($this->current);
} }
public function stream_write($data) {
return false;
}
public function stream_set_option($option, $arg1, $arg2) {
return false;
}
public function stream_truncate($size) {
return false;
}
public function stream_lock($operation) {
return false;
}
public function stream_flush() {
return; //noop because readonly stream
}
} }