Merge pull request #14300 from owncloud/commandbus
Add async command system to handle asynchronous operations
This commit is contained in:
commit
0c1e6fad6c
2
3rdparty
2
3rdparty
|
@ -1 +1 @@
|
|||
Subproject commit 588b1308f4abf58acb3bb8519f6952d9890cca89
|
||||
Subproject commit fa00c2f1b348441cd777370098d266aa78a90833
|
|
@ -50,6 +50,8 @@ if (!OC_Config::getValue('maintenance', false)) {
|
|||
OCP\User::logout();
|
||||
}
|
||||
|
||||
\OC::$server->getCommandBus()->requireSync('\OC\Command\FileAccess');
|
||||
|
||||
// Register settings scripts
|
||||
OCP\App::registerAdmin('files_encryption', 'settings-admin');
|
||||
OCP\App::registerPersonal('files_encryption', 'settings-personal');
|
||||
|
|
|
@ -979,7 +979,7 @@
|
|||
<type>text</type>
|
||||
<default></default>
|
||||
<notnull>true</notnull>
|
||||
<length>256</length>
|
||||
<length>4000</length>
|
||||
</field>
|
||||
|
||||
<field>
|
||||
|
|
|
@ -57,6 +57,9 @@ class JobList implements IJobList {
|
|||
$class = $job;
|
||||
}
|
||||
$argument = json_encode($argument);
|
||||
if (strlen($argument) > 4000) {
|
||||
throw new \InvalidArgumentException('Background job arguments can\'t exceed 4000 characters (json encoded)');
|
||||
}
|
||||
$query = $this->conn->prepare('INSERT INTO `*PREFIX*jobs`(`class`, `argument`, `last_run`) VALUES(?, ?, 0)');
|
||||
$query->execute(array($class, $argument));
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ abstract class QueuedJob extends Job {
|
|||
* @param \OC\Log $logger
|
||||
*/
|
||||
public function execute($jobList, $logger = null) {
|
||||
$jobList->remove($this);
|
||||
$jobList->remove($this, $this->argument);
|
||||
parent::execute($jobList, $logger);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace OC\Command;
|
||||
|
||||
use OCP\Command\IBus;
|
||||
use OCP\Command\ICommand;
|
||||
use SuperClosure\Serializer;
|
||||
|
||||
/**
|
||||
* Asynchronous command bus that uses the background job system as backend
|
||||
*/
|
||||
class AsyncBus implements IBus {
|
||||
/**
|
||||
* @var \OCP\BackgroundJob\IJobList
|
||||
*/
|
||||
private $jobList;
|
||||
|
||||
/**
|
||||
* List of traits for command which require sync execution
|
||||
*
|
||||
* @var string[]
|
||||
*/
|
||||
private $syncTraits = [];
|
||||
|
||||
/**
|
||||
* @param \OCP\BackgroundJob\IJobList $jobList
|
||||
*/
|
||||
function __construct($jobList) {
|
||||
$this->jobList = $jobList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule a command to be fired
|
||||
*
|
||||
* @param \OCP\Command\ICommand | callable $command
|
||||
*/
|
||||
public function push($command) {
|
||||
if ($this->canRunAsync($command)) {
|
||||
$this->jobList->add($this->getJobClass($command), $this->serializeCommand($command));
|
||||
} else {
|
||||
$this->runCommand($command);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Require all commands using a trait to be run synchronous
|
||||
*
|
||||
* @param string $trait
|
||||
*/
|
||||
public function requireSync($trait) {
|
||||
$this->syncTraits[] = trim($trait, '\\');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \OCP\Command\ICommand | callable $command
|
||||
*/
|
||||
private function runCommand($command) {
|
||||
if ($command instanceof ICommand) {
|
||||
$command->handle();
|
||||
} else {
|
||||
$command();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \OCP\Command\ICommand | callable $command
|
||||
* @return string
|
||||
*/
|
||||
private function getJobClass($command) {
|
||||
if ($command instanceof \Closure) {
|
||||
return 'OC\Command\ClosureJob';
|
||||
} else if (is_callable($command)) {
|
||||
return 'OC\Command\CallableJob';
|
||||
} else if ($command instanceof ICommand) {
|
||||
return 'OC\Command\CommandJob';
|
||||
} else {
|
||||
throw new \InvalidArgumentException('Invalid command');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \OCP\Command\ICommand | callable $command
|
||||
* @return string
|
||||
*/
|
||||
private function serializeCommand($command) {
|
||||
if ($command instanceof \Closure) {
|
||||
$serializer = new Serializer();
|
||||
return $serializer->serialize($command);
|
||||
} else if (is_callable($command) or $command instanceof ICommand) {
|
||||
return serialize($command);
|
||||
} else {
|
||||
throw new \InvalidArgumentException('Invalid command');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \OCP\Command\ICommand | callable $command
|
||||
* @return bool
|
||||
*/
|
||||
private function canRunAsync($command) {
|
||||
$traits = $this->getTraits($command);
|
||||
foreach ($traits as $trait) {
|
||||
if (array_search($trait, $this->syncTraits) !== false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \OCP\Command\ICommand | callable $command
|
||||
* @return string[]
|
||||
*/
|
||||
private function getTraits($command) {
|
||||
if ($command instanceof ICommand) {
|
||||
return class_uses($command);
|
||||
} else {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace OC\Command;
|
||||
|
||||
use OC\BackgroundJob\QueuedJob;
|
||||
|
||||
class CallableJob extends QueuedJob {
|
||||
protected function run($serializedCallable) {
|
||||
$callable = unserialize($serializedCallable);
|
||||
if (is_callable($callable)) {
|
||||
$callable();
|
||||
} else {
|
||||
throw new \InvalidArgumentException('Invalid serialized callable');
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace OC\Command;
|
||||
|
||||
use OC\BackgroundJob\QueuedJob;
|
||||
use SuperClosure\Serializer;
|
||||
|
||||
class ClosureJob extends QueuedJob {
|
||||
protected function run($serializedCallable) {
|
||||
$serializer = new Serializer();
|
||||
$callable = $serializer->unserialize($serializedCallable);
|
||||
if (is_callable($callable)) {
|
||||
$callable();
|
||||
} else {
|
||||
throw new \InvalidArgumentException('Invalid serialized callable');
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace OC\Command;
|
||||
|
||||
use OC\BackgroundJob\QueuedJob;
|
||||
use OCP\Command\ICommand;
|
||||
|
||||
/**
|
||||
* Wrap a command in the background job interface
|
||||
*/
|
||||
class CommandJob extends QueuedJob {
|
||||
protected function run($serializedCommand) {
|
||||
$command = unserialize($serializedCommand);
|
||||
if ($command instanceof ICommand) {
|
||||
$command->handle();
|
||||
} else {
|
||||
throw new \InvalidArgumentException('Invalid serialized command');
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace OC\Command;
|
||||
|
||||
use OCP\IUser;
|
||||
|
||||
trait FileAccess {
|
||||
protected function getUserFolder(IUser $user) {
|
||||
\OC_Util::setupFS($user->getUID());
|
||||
return \OC::$server->getUserFolder($user->getUID());
|
||||
}
|
||||
}
|
|
@ -38,6 +38,7 @@ use OC\AppFramework\Http\Request;
|
|||
use OC\AppFramework\Db\Db;
|
||||
use OC\AppFramework\Utility\SimpleContainer;
|
||||
use OC\Cache\UserCache;
|
||||
use OC\Command\AsyncBus;
|
||||
use OC\Diagnostics\NullQueryLogger;
|
||||
use OC\Diagnostics\EventLogger;
|
||||
use OC\Diagnostics\QueryLogger;
|
||||
|
@ -291,6 +292,10 @@ class Server extends SimpleContainer implements IServerContainer {
|
|||
$this->registerService('IniWrapper', function ($c) {
|
||||
return new IniGetWrapper();
|
||||
});
|
||||
$this->registerService('AsyncCommandBus', function (Server $c) {
|
||||
$jobList = $c->getJobList();
|
||||
return new AsyncBus($jobList);
|
||||
});
|
||||
$this->registerService('TrustedDomainHelper', function ($c) {
|
||||
return new TrustedDomainHelper($this->getConfig());
|
||||
});
|
||||
|
@ -777,6 +782,13 @@ class Server extends SimpleContainer implements IServerContainer {
|
|||
return $this->query('IniWrapper');
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \OCP\Command\IBus
|
||||
*/
|
||||
function getCommandBus(){
|
||||
return $this->query('AsyncCommandBus');
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the trusted domain helper
|
||||
*
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace OCP\Command;
|
||||
|
||||
interface IBus {
|
||||
/**
|
||||
* Schedule a command to be fired
|
||||
*
|
||||
* @param \OCP\Command\ICommand | callable $command
|
||||
*/
|
||||
public function push($command);
|
||||
|
||||
/**
|
||||
* Require all commands using a trait to be run synchronous
|
||||
*
|
||||
* @param string $trait
|
||||
*/
|
||||
public function requireSync($trait);
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
<?php
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace OCP\Command;
|
||||
|
||||
interface ICommand {
|
||||
/**
|
||||
* Run the command
|
||||
*/
|
||||
public function handle();
|
||||
}
|
|
@ -318,4 +318,9 @@ interface IServerContainer {
|
|||
* @return \bantu\IniGetWrapper\IniGetWrapper
|
||||
*/
|
||||
function getIniWrapper();
|
||||
|
||||
/**
|
||||
* @return \OCP\Command\IBus
|
||||
*/
|
||||
function getCommandBus();
|
||||
}
|
||||
|
|
|
@ -21,13 +21,18 @@ class DummyJobList extends \OC\BackgroundJob\JobList {
|
|||
|
||||
private $last = 0;
|
||||
|
||||
public function __construct(){}
|
||||
public function __construct() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \OC\BackgroundJob\Job|string $job
|
||||
* @param mixed $argument
|
||||
*/
|
||||
public function add($job, $argument = null) {
|
||||
if (is_string($job)) {
|
||||
/** @var \OC\BackgroundJob\Job $job */
|
||||
$job = new $job;
|
||||
}
|
||||
$job->setArgument($argument);
|
||||
if (!$this->has($job, null)) {
|
||||
$this->jobs[] = $job;
|
||||
|
|
|
@ -0,0 +1,179 @@
|
|||
<?php
|
||||
|
||||
/**
|
||||
* Copyright (c) 2015 Robin Appelman <icewind@owncloud.com>
|
||||
* This file is licensed under the Affero General Public License version 3 or
|
||||
* later.
|
||||
* See the COPYING-README file.
|
||||
*/
|
||||
|
||||
namespace Test\Command;
|
||||
|
||||
use OC\Command\FileAccess;
|
||||
use OCP\Command\IBus;
|
||||
use OCP\Command\ICommand;
|
||||
use Test\BackgroundJob\DummyJobList;
|
||||
use Test\TestCase;
|
||||
|
||||
class SimpleCommand implements ICommand {
|
||||
public function handle() {
|
||||
AsyncBus::$lastCommand = 'SimpleCommand';
|
||||
}
|
||||
}
|
||||
|
||||
class StateFullCommand implements ICommand {
|
||||
private $state;
|
||||
|
||||
function __construct($state) {
|
||||
$this->state = $state;
|
||||
}
|
||||
|
||||
public function handle() {
|
||||
AsyncBus::$lastCommand = $this->state;
|
||||
}
|
||||
}
|
||||
|
||||
class FilesystemCommand implements ICommand {
|
||||
use FileAccess;
|
||||
|
||||
public function handle() {
|
||||
AsyncBus::$lastCommand = 'FileAccess';
|
||||
}
|
||||
}
|
||||
|
||||
function basicFunction() {
|
||||
AsyncBus::$lastCommand = 'function';
|
||||
}
|
||||
|
||||
// clean class to prevent phpunit putting closure in $this
|
||||
class ThisClosureTest {
|
||||
private function privateMethod() {
|
||||
AsyncBus::$lastCommand = 'closure-this';
|
||||
}
|
||||
|
||||
public function test(IBus $bus) {
|
||||
$bus->push(function () {
|
||||
$this->privateMethod();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class AsyncBus extends TestCase {
|
||||
/**
|
||||
* Basic way to check output from a command
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
public static $lastCommand;
|
||||
|
||||
/**
|
||||
* @var \OCP\BackgroundJob\IJobList
|
||||
*/
|
||||
private $jobList;
|
||||
|
||||
/**
|
||||
* @var \OCP\Command\IBus
|
||||
*/
|
||||
private $bus;
|
||||
|
||||
public static function DummyCommand() {
|
||||
self::$lastCommand = 'static';
|
||||
}
|
||||
|
||||
public function setUp() {
|
||||
$this->jobList = new DummyJobList();
|
||||
$this->bus = new \OC\Command\AsyncBus($this->jobList);
|
||||
self::$lastCommand = '';
|
||||
}
|
||||
|
||||
public function testSimpleCommand() {
|
||||
$command = new SimpleCommand();
|
||||
$this->bus->push($command);
|
||||
$this->runJobs();
|
||||
$this->assertEquals('SimpleCommand', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testStateFullCommand() {
|
||||
$command = new StateFullCommand('foo');
|
||||
$this->bus->push($command);
|
||||
$this->runJobs();
|
||||
$this->assertEquals('foo', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testStaticCallable() {
|
||||
$this->bus->push(['\Test\Command\AsyncBus', 'DummyCommand']);
|
||||
$this->runJobs();
|
||||
$this->assertEquals('static', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testMemberCallable() {
|
||||
$command = new StateFullCommand('bar');
|
||||
$this->bus->push([$command, 'handle']);
|
||||
$this->runJobs();
|
||||
$this->assertEquals('bar', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testFunctionCallable() {
|
||||
$this->bus->push('\Test\Command\BasicFunction');
|
||||
$this->runJobs();
|
||||
$this->assertEquals('function', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testClosure() {
|
||||
$this->bus->push(function () {
|
||||
AsyncBus::$lastCommand = 'closure';
|
||||
});
|
||||
$this->runJobs();
|
||||
$this->assertEquals('closure', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testClosureSelf() {
|
||||
$this->bus->push(function () {
|
||||
self::$lastCommand = 'closure-self';
|
||||
});
|
||||
$this->runJobs();
|
||||
$this->assertEquals('closure-self', self::$lastCommand);
|
||||
}
|
||||
|
||||
|
||||
public function testClosureThis() {
|
||||
// clean class to prevent phpunit putting closure in $this
|
||||
$test = new ThisClosureTest();
|
||||
$test->test($this->bus);
|
||||
$this->runJobs();
|
||||
$this->assertEquals('closure-this', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testClosureBind() {
|
||||
$state = 'bar';
|
||||
$this->bus->push(function () use ($state) {
|
||||
self::$lastCommand = 'closure-' . $state;
|
||||
});
|
||||
$this->runJobs();
|
||||
$this->assertEquals('closure-bar', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testFileFileAccessCommand() {
|
||||
$this->bus->push(new FilesystemCommand());
|
||||
$this->assertEquals('', self::$lastCommand);
|
||||
$this->runJobs();
|
||||
$this->assertEquals('FileAccess', self::$lastCommand);
|
||||
}
|
||||
|
||||
public function testFileFileAccessCommandSync() {
|
||||
$this->bus->requireSync('\OC\Command\FileAccess');
|
||||
$this->bus->push(new FilesystemCommand());
|
||||
$this->assertEquals('FileAccess', self::$lastCommand);
|
||||
self::$lastCommand = '';
|
||||
$this->runJobs();
|
||||
$this->assertEquals('', self::$lastCommand);
|
||||
}
|
||||
|
||||
|
||||
private function runJobs() {
|
||||
$jobs = $this->jobList->getAll();
|
||||
foreach ($jobs as $job) {
|
||||
$job->execute($this->jobList);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,7 +23,7 @@
|
|||
// We only can count up. The 4. digit is only for the internal patchlevel to trigger DB upgrades
|
||||
// between betas, final and RCs. This is _not_ the public version number. Reset minor/patchlevel
|
||||
// when updating major/minor version number.
|
||||
$OC_Version=array(8, 0, 0, 9);
|
||||
$OC_Version=array(8, 0, 0, 10);
|
||||
|
||||
// The human readable string
|
||||
$OC_VersionString='8.0';
|
||||
|
|
Loading…
Reference in New Issue