options = Collection::fromConfig( $options, array('concurrency' => 10), array('client', 'bucket', 'iterator', 'source_converter') ); $this->init(); }

    /**
     * Get the names of the transfer events: BEFORE_TRANSFER and AFTER_TRANSFER
     *
     * @return array
     */
    public static function getAllEvents()
    {
        $eventNames = array(self::BEFORE_TRANSFER, self::AFTER_TRANSFER);

        return $eventNames;
    }

    /**
     * Begin transferring files
     *
     * Wraps the configured 'iterator' option in a ChunkedIterator sized by the
     * 'concurrency' option and hands each chunk to transferFiles().
     */
    public function transfer()
    {
        $chunks = new ChunkedIterator($this->options['iterator'], $this->options['concurrency']);

        foreach ($chunks as $batch) {
            $this->transferFiles($batch);
        }
    }

    /**
     * Create a command or special transfer action for the given file
     *
     * transferFiles() skips the file when the returned value is falsy.
     *
     * @param \SplFileInfo $file File used to build the transfer
     *
     * @return CommandInterface|callable
     */
    abstract protected function createTransferAction(\SplFileInfo $file);

    /**
     * Hook to initialize subclasses; does nothing by default
     * @codeCoverageIgnore
     */
    protected function init()
    {
    }

    /**
     * Process and transfer a group of files
     *
     * A BEFORE_TRANSFER event is dispatched for every file that yields an action.
     * Command actions are collected and passed to transferCommands() once the
     * whole group has been processed; callable actions are invoked right away and
     * followed by an AFTER_TRANSFER event.
     *
     * @param array $files Files to transfer
     */
    protected function transferFiles(array $files)
    {
        // Fields shared by every event dispatched for this group
        $baseEvent = array('sync' => $this, 'client' => $this->options['client']);
        $pending = array();

        foreach ($files as $file) {
            $action = $this->createTransferAction($file);
            if (!$action) {
                // Nothing to do for this file
                continue;
            }

            // Per-file keys come first so they take precedence in the union
            $event = array('command' => $action, 'file' => $file) + $baseEvent;
            $this->dispatch(self::BEFORE_TRANSFER, $event);

            if ($action instanceof CommandInterface) {
                // Deferred: commands are handed to transferCommands() together
                $pending[] = $action;
                continue;
            }

            if (is_callable($action)) {
                $action();
                $this->dispatch(self::AFTER_TRANSFER, $event);
            }
        }

        $this->transferCommands($pending);
    }

    /**
     * Transfer an array of commands in parallel
     *
     * Does nothing when the array is empty. Otherwise the commands are passed to
     * the client's execute() method in a single call, after which one
     * AFTER_TRANSFER event is dispatched per command.
     *
     * @param array $commands Commands to transfer
     */
    protected function transferCommands(array $commands)
    {
        if (!$commands) {
            return;
        }

        $this->options['client']->execute($commands);

        // Notify listeners that each command finished
        $baseEvent = array('sync' => $this, 'client' => $this->options['client']);
        foreach ($commands as $finished) {
            $this->dispatch(self::AFTER_TRANSFER, $baseEvent + array('command' => $finished));
        }
    }
}