Compare commits
1 Commits
master
...
enh/s3-mul
Author | SHA1 | Date |
---|---|---|
Julius Härtl | d359843211 |
|
@ -38,6 +38,7 @@ use OC\Files\View;
|
|||
use OCA\DAV\Connector\Sabre\Exception\FileLocked;
|
||||
use OCA\DAV\Connector\Sabre\Exception\Forbidden;
|
||||
use OCA\DAV\Connector\Sabre\Exception\InvalidPath;
|
||||
use OCA\DAV\Upload\FutureFile;
|
||||
use OCP\Files\FileInfo;
|
||||
use OCP\Files\ForbiddenException;
|
||||
use OCP\Files\InvalidPathException;
|
||||
|
@ -375,6 +376,13 @@ class Directory extends \OCA\DAV\Connector\Sabre\Node implements \Sabre\DAV\ICol
|
|||
* @throws \Sabre\DAV\Exception\Forbidden
|
||||
*/
|
||||
public function moveInto($targetName, $fullSourcePath, INode $sourceNode) {
|
||||
if ($sourceNode instanceof FutureFile) {
|
||||
$sourceView = new View('');
|
||||
// will use copyFromStorage then
|
||||
$sourceView->copy($sourceView->getAbsolutePath($sourceNode->getPath()), $this->fileView->getAbsolutePath($this->getPath() . '/' . $targetName));
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!$sourceNode instanceof Node) {
|
||||
// it's a file of another kind, like FutureFile
|
||||
if ($sourceNode instanceof IFile) {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
|
||||
namespace OCA\DAV\Upload;
|
||||
|
||||
use OC\Files\View;
|
||||
use OCA\DAV\Connector\Sabre\Directory;
|
||||
use OCA\DAV\Connector\Sabre\Exception\Forbidden;
|
||||
use Sabre\DAV\Exception\BadRequest;
|
||||
|
@ -93,6 +94,10 @@ class ChunkingPlugin extends ServerPlugin {
|
|||
// do a move manually, skipping Sabre's default "delete" for existing nodes
|
||||
try {
|
||||
$this->server->tree->move($path, $destination);
|
||||
$sourceNode = $this->server->tree->getNodeForPath($path);
|
||||
if ($sourceNode instanceof FutureFile) {
|
||||
$sourceNode->delete();
|
||||
}
|
||||
} catch (Forbidden $e) {
|
||||
$sourceNode = $this->server->tree->getNodeForPath($path);
|
||||
if ($sourceNode instanceof FutureFile) {
|
||||
|
|
|
@ -68,6 +68,10 @@ class FutureFile implements \Sabre\DAV\IFile {
|
|||
return AssemblyStream::wrap($nodes);
|
||||
}
|
||||
|
||||
public function getPath() {
|
||||
return $this->root->getFileInfo()->getInternalPath() . '/.file';
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
|
|
|
@ -40,7 +40,9 @@ use OCP\Files\Cache\ICacheEntry;
|
|||
use OCP\Files\FileInfo;
|
||||
use OCP\Files\NotFoundException;
|
||||
use OCP\Files\ObjectStore\IObjectStore;
|
||||
use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
|
||||
use OCP\Files\Storage\IStorage;
|
||||
use function GuzzleHttp\Promise\all;
|
||||
|
||||
class ObjectStoreStorage extends \OC\Files\Storage\Common {
|
||||
use CopyDirectory;
|
||||
|
@ -86,7 +88,6 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
|
|||
|
||||
public function mkdir($path) {
|
||||
$path = $this->normalizePath($path);
|
||||
|
||||
if ($this->file_exists($path)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -536,6 +537,40 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
|
|||
if ($sourceStorage->instanceOfStorage(ObjectStoreStorage::class)) {
|
||||
/** @var ObjectStoreStorage $sourceStorage */
|
||||
if ($sourceStorage->getObjectStore()->getStorageId() === $this->getObjectStore()->getStorageId()) {
|
||||
if (strpos($sourceInternalPath, 'uploads/') === 0 && substr($sourceInternalPath, -strlen('/.file')) === '/.file') {
|
||||
if ($this->objectStore instanceof IObjectStoreMultiPartUpload) {
|
||||
$this->touch($targetInternalPath);
|
||||
$urn = $this->getURN($this->getCache()->getId($targetInternalPath));
|
||||
$uploadId = $this->objectStore->initiateMultipartUpload($urn);
|
||||
|
||||
$folderContents = $this->getCache()->getFolderContents(dirname($sourceInternalPath));
|
||||
$partNumber = 1;
|
||||
$results = [];
|
||||
$async = false; // FIXME: In case of switching to async part copy, make sure to batch the requests
|
||||
foreach ($folderContents as $chunkFile) {
|
||||
// FIXME: Part number must be positive integer between 1 and 10,000 so we need to be careful for cases where the number of chunks exceeds 10000
|
||||
$results[] = $this->objectStore->uploadMultipartPartCopy($this->getURN($chunkFile->getId()), $urn, $partNumber++, $uploadId, $async);
|
||||
}
|
||||
if ($async) {
|
||||
$results = \GuzzleHttp\Promise\Utils::all($results)->wait();
|
||||
}
|
||||
$uploadParts = array_map(function($result, $partNumber) {
|
||||
return [
|
||||
'ETag' => $result->get('CopyPartResult')['ETag'],
|
||||
'PartNumber' => $partNumber+1,
|
||||
];
|
||||
}, $results, array_keys($results));
|
||||
$result = $this->objectStore->completeMultipartUpload($urn, $uploadId, $uploadParts);
|
||||
$stat = $this->stat($targetInternalPath);
|
||||
$mimetypeDetector = \OC::$server->getMimeTypeDetector();
|
||||
$mimetype = $mimetypeDetector->detectPath($targetInternalPath);
|
||||
$stat['size'] = (int)$result->get('ContentLength');
|
||||
$stat['mimetype'] = $mimetype;
|
||||
$stat['etag'] = $this->getETag($targetInternalPath);
|
||||
$this->getCache()->put($targetInternalPath, $stat);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
$sourceEntry = $sourceStorage->getCache()->get($sourceInternalPath);
|
||||
$this->copyInner($sourceEntry, $targetInternalPath);
|
||||
return true;
|
||||
|
|
|
@ -24,9 +24,11 @@
|
|||
|
||||
namespace OC\Files\ObjectStore;
|
||||
|
||||
use Icewind\Streams\CallbackWrapper;
|
||||
use OCP\Files\ObjectStore\IObjectStore;
|
||||
use OCP\Files\ObjectStore\IObjectStoreMultiPartUpload;
|
||||
|
||||
class S3 implements IObjectStore {
|
||||
class S3 implements IObjectStore, IObjectStoreMultiPartUpload {
|
||||
use S3ConnectionTrait;
|
||||
use S3ObjectTrait;
|
||||
|
||||
|
@ -41,4 +43,61 @@ class S3 implements IObjectStore {
|
|||
public function getStorageId() {
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
public function initiateMultipartUpload(string $urn): string {
|
||||
$upload = $this->getConnection()->createMultipartUpload([
|
||||
'Bucket' => $this->bucket,
|
||||
'Key' => $urn,
|
||||
]);
|
||||
$uploadId = $upload->get('UploadId');
|
||||
\OC::$server->getMemCacheFactory()->createDistributed('s3')->set('uploadId-' . $urn, $uploadId);
|
||||
return $uploadId;
|
||||
}
|
||||
|
||||
public function uploadMultipartPart(string $urn, string $uploadId, $stream, $size) {
|
||||
$cache = \OC::$server->getMemCacheFactory()->createDistributed('s3');
|
||||
$part = $cache->get('partNumber-' . $urn) ?? 0;
|
||||
$part++;
|
||||
\OC::$server->getMemCacheFactory()->createDistributed('s3')->set('partNumber-' . $urn, $part);
|
||||
$count = 0;
|
||||
$countStream = CallbackWrapper::wrap($stream, function ($read) use (&$count) {
|
||||
$count += $read;
|
||||
});
|
||||
$this->getConnection()->uploadPart([
|
||||
'Body' => $countStream,
|
||||
'Bucket' => $this->bucket,
|
||||
'Key' => $urn,
|
||||
'ContentLength' => $size,
|
||||
'PartNumber' => $part,
|
||||
'UploadId' => $uploadId,
|
||||
]);
|
||||
|
||||
}
|
||||
|
||||
public function completeMultipartUpload(string $urn, string $uploadId, array $result) {
|
||||
$this->getConnection()->completeMultipartUpload([
|
||||
'Bucket' => $this->bucket,
|
||||
'Key' => $urn,
|
||||
'UploadId' => $uploadId,
|
||||
'MultipartUpload' => [ 'Parts' => $result ],
|
||||
]);
|
||||
return $this->getConnection()->headObject([
|
||||
'Bucket' => $this->bucket,
|
||||
'Key' => $urn,
|
||||
]);
|
||||
}
|
||||
|
||||
public function uploadMultipartPartCopy(string $sourceUrn, string $targetUrn, int $partNumber, string $uploadId, $async = false) {
|
||||
$args = [
|
||||
'Bucket' => $this->bucket,
|
||||
'CopySource' => $this->bucket . '/' . $sourceUrn,
|
||||
'Key' => $targetUrn,
|
||||
'PartNumber' => $partNumber,
|
||||
'UploadId' => $uploadId,
|
||||
];
|
||||
if ($async) {
|
||||
return $this->getConnection()->uploadPartCopyAsync($args);
|
||||
}
|
||||
return $this->getConnection()->uploadPartCopy($args);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue