First attempt to make multipartPart upload working

Signed-off-by: Julius Härtl <jus@bitgrid.net>
This commit is contained in:
Julius Härtl 2021-05-06 18:26:42 +02:00
parent bf86050c77
commit 6773071038
No known key found for this signature in database
GPG Key ID: 4C614C6ED2CDE6DF
6 changed files with 153 additions and 2 deletions

View File

@ -38,6 +38,7 @@ use OC\Files\View;
use OCA\DAV\Connector\Sabre\Exception\FileLocked;
use OCA\DAV\Connector\Sabre\Exception\Forbidden;
use OCA\DAV\Connector\Sabre\Exception\InvalidPath;
use OCA\DAV\Upload\FutureFile;
use OCP\Files\FileInfo;
use OCP\Files\ForbiddenException;
use OCP\Files\InvalidPathException;
@ -375,6 +376,13 @@ class Directory extends \OCA\DAV\Connector\Sabre\Node implements \Sabre\DAV\ICol
* @throws \Sabre\DAV\Exception\Forbidden
*/
public function moveInto($targetName, $fullSourcePath, INode $sourceNode) {
if ($sourceNode instanceof FutureFile) {
$sourceView = new View('');
// will use copyFromStorage then
$sourceView->copy($sourceView->getAbsolutePath($sourceNode->getPath()), $this->fileView->getAbsolutePath($targetName));
return true;
}
if (!$sourceNode instanceof Node) {
// it's a file of another kind, like FutureFile
if ($sourceNode instanceof IFile) {

View File

@ -26,6 +26,7 @@
namespace OCA\DAV\Upload;
use OC\Files\View;
use OCA\DAV\Connector\Sabre\Directory;
use OCA\DAV\Connector\Sabre\Exception\Forbidden;
use Sabre\DAV\Exception\BadRequest;

View File

@ -68,6 +68,10 @@ class FutureFile implements \Sabre\DAV\IFile {
return AssemblyStream::wrap($nodes);
}
public function getPath() {
return $this->root->getFileInfo()->getInternalPath() . '/.file';
}
/**
* @inheritdoc
*/

View File

@ -40,6 +40,7 @@ use OCP\Files\Cache\ICacheEntry;
use OCP\Files\FileInfo;
use OCP\Files\NotFoundException;
use OCP\Files\ObjectStore\IObjectStore;
use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
use OCP\Files\Storage\IStorage;
class ObjectStoreStorage extends \OC\Files\Storage\Common {
@ -86,7 +87,6 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
public function mkdir($path) {
$path = $this->normalizePath($path);
if ($this->file_exists($path)) {
return false;
}
@ -123,6 +123,18 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
$data['storage_mtime'] = $mTime;
$data['etag'] = $this->getETag($path);
$this->getCache()->put($path, $data);
// CreateMultipartUpload
if (strpos($path, 'uploads/') === 0 && $this->objectStore instanceof IObjectStoreMultiPartUpload) {
$multipartPath = $path . '/.multipart';
$multipartFileId = $this->getCache()->put($multipartPath, [
'mimetype' => 'application/octet-stream',
'size' => 0,
'mtime' => $mTime,
'storage_mtime' => $mTime,
'etag' => $this->getETag($multipartPath)
]);
$this->objectStore->initiateMultipartUpload($this->getURN($multipartFileId));
}
return true;
}
}
@ -470,6 +482,18 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
$exists = $this->getCache()->inCache($path);
$uploadPath = $exists ? $path : $path . '.part';
// UploadPart
// FIXME: This should be moved to the chunking plugin
if ($this->objectStore instanceof IObjectStoreMultiPartUpload && strpos($path, 'uploads/') === 0) {
$fileId = $this->getCache()->put($path, $stat);
$urn = $this->getURN($this->getCache()->getId(dirname($path) . '/.multipart'));
$partId = (int)(basename($path));
$size = (int)$_SERVER['CONTENT_LENGTH'];
// FIXME: Find a proper way to store the upload id in case we are using the multipart upload
$uploadId = \OC::$server->getMemCacheFactory()->createDistributed('s3')->get('uploadId-' . $urn);
$this->objectStore->uploadMultipartPart($urn, $uploadId, $partId, $stream, $size);
return $size;
}
if ($exists) {
$fileId = $stat['fileid'];
} else {
@ -536,6 +560,40 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
if ($sourceStorage->instanceOfStorage(ObjectStoreStorage::class)) {
/** @var ObjectStoreStorage $sourceStorage */
if ($sourceStorage->getObjectStore()->getStorageId() === $this->getObjectStore()->getStorageId()) {
// CompleteMultipartUpload
// FIXME: This should be moved to the chunking plugin
if (strpos($sourceInternalPath, 'uploads/') === 0 && basename($sourceInternalPath) === '.file') {
if ($this->objectStore instanceof IObjectStoreMultiPartUpload) {
$multipartPath = dirname($sourceInternalPath) . '/.multipart';
$chunkPath = dirname($multipartPath);
$urn = $this->getURN($this->getCache()->getId($multipartPath));
// FIXME: Find a proper way to store the upload id in case we are using the multipart upload
// FIXME: Find a proper way to store the upload part responses
$uploadId = \OC::$server->getMemCacheFactory()->createDistributed('s3')->get('uploadId-' . $urn);
$uploads = \OC::$server->getMemCacheFactory()->createDistributed('s3')->get('uploads-' . $urn);
ksort($uploads);
try {
$result = $this->objectStore->completeMultipartUpload($urn, $uploadId, array_values($uploads));
} catch (\Throwable $e) {
throw $e;
} finally {
\OC::$server->getMemCacheFactory()->createDistributed('s3')->remove('uploads-' . $urn);
\OC::$server->getMemCacheFactory()->createDistributed('s3')->remove('uploadId-' . $urn);
}
$this->rename($multipartPath, $targetInternalPath);
$stat = $this->stat($targetInternalPath);
$mimetypeDetector = \OC::$server->getMimeTypeDetector();
$mimetype = $mimetypeDetector->detectPath($targetInternalPath);
$stat['size'] = (int)$result->get('ContentLength');
$stat['mimetype'] = $mimetype;
$stat['etag'] = $this->getETag($targetInternalPath);
$stat['permissions'] = \OCP\Constants::PERMISSION_ALL - \OCP\Constants::PERMISSION_CREATE;
$this->getCache()->put($targetInternalPath, $stat);
$this->unlink($chunkPath);
return true;
}
}
$sourceEntry = $sourceStorage->getCache()->get($sourceInternalPath);
$this->copyInner($sourceEntry, $targetInternalPath);
return true;

View File

@ -24,9 +24,11 @@
namespace OC\Files\ObjectStore;
use Icewind\Streams\CallbackWrapper;
use OCP\Files\ObjectStore\IObjectStore;
use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
class S3 implements IObjectStore {
class S3 implements IObjectStore, IObjectStoreMultiPartUpload {
use S3ConnectionTrait;
use S3ObjectTrait;
@ -41,4 +43,44 @@ class S3 implements IObjectStore {
public function getStorageId() {
return $this->id;
}
public function initiateMultipartUpload(string $urn): string {
$upload = $this->getConnection()->createMultipartUpload([
'Bucket' => $this->bucket,
'Key' => $urn,
]);
$uploadId = $upload->get('UploadId');
\OC::$server->getMemCacheFactory()->createDistributed('s3')->set('uploadId-' . $urn, $uploadId);
return $uploadId;
}
public function uploadMultipartPart(string $urn, string $uploadId, int $partId, $stream, $size) {
$result = $this->getConnection()->uploadPart([
'Body' => $stream,
'Bucket' => $this->bucket,
'Key' => $urn,
'ContentLength' => $size,
'PartNumber' => $partId,
'UploadId' => $uploadId,
]);
$uploads = \OC::$server->getMemCacheFactory()->createDistributed('s3')->get('uploads-' . $urn);
$uploads[$partId] = [
'ETag' => trim($result->get('ETag'), '"'),
'PartNumber' => $partId,
];
\OC::$server->getMemCacheFactory()->createDistributed('s3')->set('uploads-' . $urn, $uploads);
}
public function completeMultipartUpload(string $urn, string $uploadId, array $result) {
$this->getConnection()->completeMultipartUpload([
'Bucket' => $this->bucket,
'Key' => $urn,
'UploadId' => $uploadId,
'MultipartUpload' => [ 'Parts' => $result ],
]);
return $this->getConnection()->headObject([
'Bucket' => $this->bucket,
'Key' => $urn,
]);
}
}

View File

@ -0,0 +1,38 @@
<?php
/*
* @copyright Copyright (c) 2021 Julius Härtl <jus@bitgrid.net>
*
* @author Julius Härtl <jus@bitgrid.net>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
declare(strict_types=1);
namespace OCP\Files\ObjectStore;
interface IObjectStoreMultiPartUpload {
public function initiateMultipartUpload(string $urn): string;
public function uploadMultipartPart(string $urn, string $uploadId, int $partId, $stream, $size);
public function completeMultipartUpload(string $urn, string $uploadId, array $result);
}