From 93de63777e4a519484ad8f50f7a8ee809697c20a Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Fri, 26 Oct 2018 19:15:23 +0200 Subject: [PATCH] extend storage api to allow directly writing a stream to storage this removes the need for temporary storages with some external storage backends. The new method is added to a separate interface to maintain compatibility with storage backends implementing the storage interface directly (without inheriting common) Currently the interface is implemented for objectstorage based storages and local storage and used by webdav uploads Signed-off-by: Robin Appelman --- apps/dav/lib/Connector/Sabre/File.php | 27 +++--- .../Files/ObjectStore/ObjectStoreStorage.php | 94 ++++++++++++------- lib/private/Files/Storage/Common.php | 19 +++- lib/private/Files/Storage/Local.php | 4 + lib/private/Files/Stream/CountReadStream.php | 65 +++++++++++++ .../Files/Storage/IWriteStreamStorage.php | 40 ++++++++ .../lib/Files/Stream/CountReadStreamTest.php | 49 ++++++++++ 7 files changed, 251 insertions(+), 47 deletions(-) create mode 100644 lib/private/Files/Stream/CountReadStream.php create mode 100644 lib/public/Files/Storage/IWriteStreamStorage.php create mode 100644 tests/lib/Files/Stream/CountReadStreamTest.php diff --git a/apps/dav/lib/Connector/Sabre/File.php b/apps/dav/lib/Connector/Sabre/File.php index 9e927ff85e..a8e6d8b907 100644 --- a/apps/dav/lib/Connector/Sabre/File.php +++ b/apps/dav/lib/Connector/Sabre/File.php @@ -164,14 +164,19 @@ class File extends Node implements IFile { $this->changeLock(ILockingProvider::LOCK_EXCLUSIVE); } - $target = $partStorage->fopen($internalPartPath, 'wb'); - if ($target === false) { - \OC::$server->getLogger()->error('\OC\Files\Filesystem::fopen() failed', ['app' => 'webdav']); - // because we have no clue about the cause we can only throw back a 500/Internal Server Error - throw new Exception('Could not write file contents'); + if ($partStorage->instanceOfStorage(Storage\IWriteStreamStorage::class)) { + $count = $partStorage->writeStream($internalPartPath, $data); + $result = $count > 0; + } else { + $target = $partStorage->fopen($internalPartPath, 'wb'); + if ($target === false) { + \OC::$server->getLogger()->error('\OC\Files\Filesystem::fopen() failed', ['app' => 'webdav']); + // because we have no clue about the cause we can only throw back a 500/Internal Server Error + throw new Exception('Could not write file contents'); + } + list($count, $result) = \OC_Helper::streamCopy($data, $target); + fclose($target); } - list($count, $result) = \OC_Helper::streamCopy($data, $target); - fclose($target); if ($result === false) { $expected = -1; @@ -185,7 +190,7 @@ class File extends Node implements IFile { // double check if the file was fully received // compare expected and actual size if (isset($_SERVER['CONTENT_LENGTH']) && $_SERVER['REQUEST_METHOD'] === 'PUT') { - $expected = (int) $_SERVER['CONTENT_LENGTH']; + $expected = (int)$_SERVER['CONTENT_LENGTH']; if ($count !== $expected) { throw new BadRequest('expected filesize ' . $expected . ' got ' . $count); } @@ -219,7 +224,7 @@ class File extends Node implements IFile { $renameOkay = $storage->moveFromStorage($partStorage, $internalPartPath, $internalPath); $fileExists = $storage->file_exists($internalPath); if ($renameOkay === false || $fileExists === false) { - \OC::$server->getLogger()->error('renaming part file to final file failed ($run: ' . ( $run ? 'true' : 'false' ) . ', $renameOkay: ' . ( $renameOkay ? 'true' : 'false' ) . ', $fileExists: ' . ( $fileExists ? 'true' : 'false' ) . ')', ['app' => 'webdav']); + \OC::$server->getLogger()->error('renaming part file to final file failed ($run: ' . ($run ? 'true' : 'false') . ', $renameOkay: ' . ($renameOkay ? 'true' : 'false') . ', $fileExists: ' . ($fileExists ? 'true' : 'false') . ')', ['app' => 'webdav']); throw new Exception('Could not rename part file to final file'); } } catch (ForbiddenException $ex) { @@ -246,7 +251,7 @@ class File extends Node implements IFile { $this->header('X-OC-MTime: accepted'); } } - + if ($view) { $this->emitPostHooks($exists); } @@ -443,7 +448,7 @@ class File extends Node implements IFile { //detect aborted upload if (isset ($_SERVER['REQUEST_METHOD']) && $_SERVER['REQUEST_METHOD'] === 'PUT') { if (isset($_SERVER['CONTENT_LENGTH'])) { - $expected = (int) $_SERVER['CONTENT_LENGTH']; + $expected = (int)$_SERVER['CONTENT_LENGTH']; if ($bytesWritten !== $expected) { $chunk_handler->remove($info['index']); throw new BadRequest( diff --git a/lib/private/Files/ObjectStore/ObjectStoreStorage.php b/lib/private/Files/ObjectStore/ObjectStoreStorage.php index 3ce919a4cb..71acd27783 100644 --- a/lib/private/Files/ObjectStore/ObjectStoreStorage.php +++ b/lib/private/Files/ObjectStore/ObjectStoreStorage.php @@ -28,6 +28,7 @@ namespace OC\Files\ObjectStore; use Icewind\Streams\CallbackWrapper; use Icewind\Streams\IteratorDirectory; use OC\Files\Cache\CacheEntry; +use OC\Files\Stream\CountReadStream; use OCP\Files\ObjectStore\IObjectStore; class ObjectStoreStorage extends \OC\Files\Storage\Common { @@ -382,41 +383,8 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common { } public function writeBack($tmpFile, $path) { - $stat = $this->stat($path); - if (empty($stat)) { - // create new file - $stat = array( - 'permissions' => \OCP\Constants::PERMISSION_ALL - \OCP\Constants::PERMISSION_CREATE, - ); - } - // update stat with new data - $mTime = time(); - $stat['size'] = filesize($tmpFile); - $stat['mtime'] = $mTime; - $stat['storage_mtime'] = $mTime; - - // run path based detection first, to use file extension because $tmpFile is only a random string - $mimetypeDetector = \OC::$server->getMimeTypeDetector(); - $mimetype = $mimetypeDetector->detectPath($path); - if ($mimetype === 'application/octet-stream') { - $mimetype = $mimetypeDetector->detect($tmpFile); - } - - $stat['mimetype'] = $mimetype; - $stat['etag'] = $this->getETag($path); - - $fileId = $this->getCache()->put($path, $stat); - try { - //upload to object storage - $this->objectStore->writeObject($this->getURN($fileId), fopen($tmpFile, 'r')); - } catch (\Exception $ex) { - $this->getCache()->remove($path); - $this->logger->logException($ex, [ - 'app' => 'objectstore', - 'message' => 'Could not create object ' . $this->getURN($fileId) . ' for ' . $path, - ]); - throw $ex; // make this bubble up - } + $size = filesize($tmpFile); + $this->writeStream($path, fopen($tmpFile, 'r'), $size); } /** @@ -433,4 +401,60 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common { public function needsPartFile() { return false; } + + public function file_put_contents($path, $data) { + $stream = fopen('php://temp', 'r+'); + fwrite($stream, $data); + rewind($stream); + return $this->writeStream($path, $stream, strlen($data)) > 0; + } + + public function writeStream(string $path, $stream, int $size = null): int { + $stat = $this->stat($path); + if (empty($stat)) { + // create new file + $stat = [ + 'permissions' => \OCP\Constants::PERMISSION_ALL - \OCP\Constants::PERMISSION_CREATE, + ]; + } + // update stat with new data + $mTime = time(); + $stat['size'] = (int)$size; + $stat['mtime'] = $mTime; + $stat['storage_mtime'] = $mTime; + + $mimetypeDetector = \OC::$server->getMimeTypeDetector(); + $mimetype = $mimetypeDetector->detectPath($path); + + $stat['mimetype'] = $mimetype; + $stat['etag'] = $this->getETag($path); + + $fileId = $this->getCache()->put($path, $stat); + try { + //upload to object storage + if ($size === null) { + $countStream = CountReadStream::wrap($stream, function ($writtenSize) use ($fileId, &$size) { + $this->getCache()->update($fileId, [ + 'size' => $writtenSize + ]); + $size = $writtenSize; + }); + $this->objectStore->writeObject($this->getURN($fileId), $countStream); + if (is_resource($countStream)) { + fclose($countStream); + } + } else { + $this->objectStore->writeObject($this->getURN($fileId), $stream); + } + } catch (\Exception $ex) { + $this->getCache()->remove($path); + $this->logger->logException($ex, [ + 'app' => 'objectstore', + 'message' => 'Could not create object ' . $this->getURN($fileId) . ' for ' . $path, + ]); + throw $ex; // make this bubble up + } + + return $size; + } } diff --git a/lib/private/Files/Storage/Common.php b/lib/private/Files/Storage/Common.php index b6c82f3a1d..6324050b47 100644 --- a/lib/private/Files/Storage/Common.php +++ b/lib/private/Files/Storage/Common.php @@ -54,6 +54,7 @@ use OCP\Files\InvalidPathException; use OCP\Files\ReservedWordException; use OCP\Files\Storage\ILockingStorage; use OCP\Files\Storage\IStorage; +use OCP\Files\Storage\IWriteStreamStorage; use OCP\ILogger; use OCP\Lock\ILockingProvider; use OCP\Lock\LockedException; @@ -69,7 +70,7 @@ use OCP\Lock\LockedException; * Some \OC\Files\Storage\Common methods call functions which are first defined * in classes which extend it, e.g. $this->stat() . */ -abstract class Common implements Storage, ILockingStorage { +abstract class Common implements Storage, ILockingStorage, IWriteStreamStorage { use LocalTempFileTrait; @@ -809,4 +810,20 @@ abstract class Common implements Storage, ILockingStorage { public function needsPartFile() { return true; } + + /** + * fallback implementation + * + * @param string $path + * @param resource $stream + * @param int $size + * @return int + */ + public function writeStream(string $path, $stream, int $size = null): int { + $target = $this->fopen($path, 'w'); + list($count, $result) = \OC_Helper::streamCopy($stream, $target); + fclose($stream); + fclose($target); + return $count; + } } diff --git a/lib/private/Files/Storage/Local.php b/lib/private/Files/Storage/Local.php index 46b53dcf95..5f7232e64b 100644 --- a/lib/private/Files/Storage/Local.php +++ b/lib/private/Files/Storage/Local.php @@ -462,4 +462,8 @@ class Local extends \OC\Files\Storage\Common { return parent::moveFromStorage($sourceStorage, $sourceInternalPath, $targetInternalPath); } } + + public function writeStream(string $path, $stream, int $size = null): int { + return (int)file_put_contents($this->getSourcePath($path), $stream); + } } diff --git a/lib/private/Files/Stream/CountReadStream.php b/lib/private/Files/Stream/CountReadStream.php new file mode 100644 index 0000000000..93cadf8f21 --- /dev/null +++ b/lib/private/Files/Stream/CountReadStream.php @@ -0,0 +1,65 @@ + + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OC\Files\Stream; + +use Icewind\Streams\Wrapper; + +class CountReadStream extends Wrapper { + /** @var int */ + private $count; + + /** @var callback */ + private $callback; + + public static function wrap($source, $callback) { + $context = stream_context_create(array( + 'count' => array( + 'source' => $source, + 'callback' => $callback, + ) + )); + return Wrapper::wrapSource($source, $context, 'count', self::class); + } + + public function dir_opendir($path, $options) { + return false; + } + + public function stream_open($path, $mode, $options, &$opened_path) { + $context = $this->loadContext('count'); + + $this->callback = $context['callback']; + return true; + } + + public function stream_read($count) { + $result = parent::stream_read($count); + $this->count += strlen($result); + return $result; + } + + public function stream_close() { + $result = parent::stream_close(); + call_user_func($this->callback, $this->count); + return $result; + } +} diff --git a/lib/public/Files/Storage/IWriteStreamStorage.php b/lib/public/Files/Storage/IWriteStreamStorage.php new file mode 100644 index 0000000000..39a28dd037 --- /dev/null +++ b/lib/public/Files/Storage/IWriteStreamStorage.php @@ -0,0 +1,40 @@ + + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OCP\Files\Storage; + +/** + * Interface that adds the ability to write a stream directly to file + * + * @since 15.0.0 + */ +interface IWriteStreamStorage extends IStorage { + /** + * Write the data from a stream to a file + * + * @param string $path + * @param resource $stream + * @param int|null $size the size of the stream if known in advance + * @return int the number of bytes written + * @since 15.0.0 + */ + public function writeStream(string $path, $stream, int $size = null): int; +} diff --git a/tests/lib/Files/Stream/CountReadStreamTest.php b/tests/lib/Files/Stream/CountReadStreamTest.php new file mode 100644 index 0000000000..99291d1644 --- /dev/null +++ b/tests/lib/Files/Stream/CountReadStreamTest.php @@ -0,0 +1,49 @@ + + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace Test\Files\Stream; + +use OC\Files\Stream\CountReadStream; +use Test\TestCase; + +class CountReadStreamTest extends TestCase { + private function getStream($data) { + $handle = fopen('php://temp', 'w+'); + fwrite($handle, $data); + rewind($handle); + return $handle; + } + + public function testBasicCount() { + $source = $this->getStream('foo'); + $stream = CountReadStream::wrap($source, function ($size) { + $this->assertEquals(3, $size); + }); + stream_get_contents($stream); + } + + public function testLarger() { + $stream = CountReadStream::wrap(fopen(__DIR__ . '/../../../data/testimage.mp4', 'r'), function ($size) { + $this->assertEquals(383631, $size); + }); + stream_get_contents($stream); + } +}