Compare commits

...

1 Commits

Author SHA1 Message Date
Julius Härtl d359843211
Use UploadPartCopy for building the file from chunks directly on the object storage
Signed-off-by: Julius Härtl <jus@bitgrid.net>
2021-05-07 10:42:50 +02:00
5 changed files with 113 additions and 2 deletions

View File

@ -38,6 +38,7 @@ use OC\Files\View;
use OCA\DAV\Connector\Sabre\Exception\FileLocked;
use OCA\DAV\Connector\Sabre\Exception\Forbidden;
use OCA\DAV\Connector\Sabre\Exception\InvalidPath;
use OCA\DAV\Upload\FutureFile;
use OCP\Files\FileInfo;
use OCP\Files\ForbiddenException;
use OCP\Files\InvalidPathException;
@ -375,6 +376,13 @@ class Directory extends \OCA\DAV\Connector\Sabre\Node implements \Sabre\DAV\ICol
* @throws \Sabre\DAV\Exception\Forbidden
*/
public function moveInto($targetName, $fullSourcePath, INode $sourceNode) {
if ($sourceNode instanceof FutureFile) {
$sourceView = new View('');
// will use copyFromStorage then
$sourceView->copy($sourceView->getAbsolutePath($sourceNode->getPath()), $this->fileView->getAbsolutePath($this->getPath() . '/' . $targetName));
return true;
}
if (!$sourceNode instanceof Node) {
// it's a file of another kind, like FutureFile
if ($sourceNode instanceof IFile) {

View File

@ -26,6 +26,7 @@
namespace OCA\DAV\Upload;
use OC\Files\View;
use OCA\DAV\Connector\Sabre\Directory;
use OCA\DAV\Connector\Sabre\Exception\Forbidden;
use Sabre\DAV\Exception\BadRequest;
@ -93,6 +94,10 @@ class ChunkingPlugin extends ServerPlugin {
// do a move manually, skipping Sabre's default "delete" for existing nodes
try {
$this->server->tree->move($path, $destination);
$sourceNode = $this->server->tree->getNodeForPath($path);
if ($sourceNode instanceof FutureFile) {
$sourceNode->delete();
}
} catch (Forbidden $e) {
$sourceNode = $this->server->tree->getNodeForPath($path);
if ($sourceNode instanceof FutureFile) {

View File

@ -68,6 +68,10 @@ class FutureFile implements \Sabre\DAV\IFile {
return AssemblyStream::wrap($nodes);
}
public function getPath() {
return $this->root->getFileInfo()->getInternalPath() . '/.file';
}
/**
* @inheritdoc
*/

View File

@ -40,7 +40,9 @@ use OCP\Files\Cache\ICacheEntry;
use OCP\Files\FileInfo;
use OCP\Files\NotFoundException;
use OCP\Files\ObjectStore\IObjectStore;
use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
use OCP\Files\Storage\IStorage;
use function GuzzleHttp\Promise\all;
class ObjectStoreStorage extends \OC\Files\Storage\Common {
use CopyDirectory;
@ -86,7 +88,6 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
public function mkdir($path) {
$path = $this->normalizePath($path);
if ($this->file_exists($path)) {
return false;
}
@ -536,6 +537,40 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
if ($sourceStorage->instanceOfStorage(ObjectStoreStorage::class)) {
/** @var ObjectStoreStorage $sourceStorage */
if ($sourceStorage->getObjectStore()->getStorageId() === $this->getObjectStore()->getStorageId()) {
if (strpos($sourceInternalPath, 'uploads/') === 0 && substr($sourceInternalPath, -strlen('/.file')) === '/.file') {
if ($this->objectStore instanceof IObjectStoreMultiPartUpload) {
$this->touch($targetInternalPath);
$urn = $this->getURN($this->getCache()->getId($targetInternalPath));
$uploadId = $this->objectStore->initiateMultipartUpload($urn);
$folderContents = $this->getCache()->getFolderContents(dirname($sourceInternalPath));
$partNumber = 1;
$results = [];
$async = false; // FIXME: In case of switching to async part copy, make sure to batch the requests
foreach ($folderContents as $chunkFile) {
// FIXME: Part number must be positive integer between 1 and 10,000 so we need to be careful for cases where the number of chunks exceeds 10000
$results[] = $this->objectStore->uploadMultipartPartCopy($this->getURN($chunkFile->getId()), $urn, $partNumber++, $uploadId, $async);
}
if ($async) {
$results = \GuzzleHttp\Promise\Utils::all($results)->wait();
}
$uploadParts = array_map(function($result, $partNumber) {
return [
'ETag' => $result->get('CopyPartResult')['ETag'],
'PartNumber' => $partNumber+1,
];
}, $results, array_keys($results));
$result = $this->objectStore->completeMultipartUpload($urn, $uploadId, $uploadParts);
$stat = $this->stat($targetInternalPath);
$mimetypeDetector = \OC::$server->getMimeTypeDetector();
$mimetype = $mimetypeDetector->detectPath($targetInternalPath);
$stat['size'] = (int)$result->get('ContentLength');
$stat['mimetype'] = $mimetype;
$stat['etag'] = $this->getETag($targetInternalPath);
$this->getCache()->put($targetInternalPath, $stat);
return true;
}
}
$sourceEntry = $sourceStorage->getCache()->get($sourceInternalPath);
$this->copyInner($sourceEntry, $targetInternalPath);
return true;

View File

@ -24,9 +24,11 @@
namespace OC\Files\ObjectStore;
use Icewind\Streams\CallbackWrapper;
use OCP\Files\ObjectStore\IObjectStore;
use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
class S3 implements IObjectStore {
class S3 implements IObjectStore, IObjectStoreMultiPartUpload {
use S3ConnectionTrait;
use S3ObjectTrait;
@ -41,4 +43,61 @@ class S3 implements IObjectStore {
public function getStorageId() {
return $this->id;
}
public function initiateMultipartUpload(string $urn): string {
$upload = $this->getConnection()->createMultipartUpload([
'Bucket' => $this->bucket,
'Key' => $urn,
]);
$uploadId = $upload->get('UploadId');
\OC::$server->getMemCacheFactory()->createDistributed('s3')->set('uploadId-' . $urn, $uploadId);
return $uploadId;
}
public function uploadMultipartPart(string $urn, string $uploadId, $stream, $size) {
$cache = \OC::$server->getMemCacheFactory()->createDistributed('s3');
$part = $cache->get('partNumber-' . $urn) ?? 0;
$part++;
\OC::$server->getMemCacheFactory()->createDistributed('s3')->set('partNumber-' . $urn, $part);
$count = 0;
$countStream = CallbackWrapper::wrap($stream, function ($read) use (&$count) {
$count += $read;
});
$this->getConnection()->uploadPart([
'Body' => $countStream,
'Bucket' => $this->bucket,
'Key' => $urn,
'ContentLength' => $size,
'PartNumber' => $part,
'UploadId' => $uploadId,
]);
}
public function completeMultipartUpload(string $urn, string $uploadId, array $result) {
$this->getConnection()->completeMultipartUpload([
'Bucket' => $this->bucket,
'Key' => $urn,
'UploadId' => $uploadId,
'MultipartUpload' => [ 'Parts' => $result ],
]);
return $this->getConnection()->headObject([
'Bucket' => $this->bucket,
'Key' => $urn,
]);
}
public function uploadMultipartPartCopy(string $sourceUrn, string $targetUrn, int $partNumber, string $uploadId, $async = false) {
$args = [
'Bucket' => $this->bucket,
'CopySource' => $this->bucket . '/' . $sourceUrn,
'Key' => $targetUrn,
'PartNumber' => $partNumber,
'UploadId' => $uploadId,
];
if ($async) {
return $this->getConnection()->uploadPartCopyAsync($args);
}
return $this->getConnection()->uploadPartCopy($args);
}
}