From 74ae7b8929a7fd3f539fd15efb9533424114a480 Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Tue, 17 Feb 2015 16:49:14 +0100 Subject: [PATCH] Add async command system to handle asynchronous operations --- lib/private/backgroundjob/queuedjob.php | 2 +- lib/private/command/asyncbus.php | 70 +++++++++++ lib/private/command/callablejob.php | 22 ++++ lib/private/command/closurejob.php | 24 ++++ lib/private/command/commandjob.php | 26 +++++ lib/private/server.php | 12 ++ lib/public/command/ibus.php | 18 +++ lib/public/command/icommand.php | 16 +++ lib/public/iservercontainer.php | 5 + tests/lib/backgroundjob/dummyjoblist.php | 7 +- tests/lib/command/asyncbus.php | 143 +++++++++++++++++++++++ 11 files changed, 343 insertions(+), 2 deletions(-) create mode 100644 lib/private/command/asyncbus.php create mode 100644 lib/private/command/callablejob.php create mode 100644 lib/private/command/closurejob.php create mode 100644 lib/private/command/commandjob.php create mode 100644 lib/public/command/ibus.php create mode 100644 lib/public/command/icommand.php create mode 100644 tests/lib/command/asyncbus.php diff --git a/lib/private/backgroundjob/queuedjob.php b/lib/private/backgroundjob/queuedjob.php index 884b22a40f..93dc5a2f06 100644 --- a/lib/private/backgroundjob/queuedjob.php +++ b/lib/private/backgroundjob/queuedjob.php @@ -35,7 +35,7 @@ abstract class QueuedJob extends Job { * @param \OC\Log $logger */ public function execute($jobList, $logger = null) { - $jobList->remove($this); + $jobList->remove($this, $this->argument); parent::execute($jobList, $logger); } } diff --git a/lib/private/command/asyncbus.php b/lib/private/command/asyncbus.php new file mode 100644 index 0000000000..fc9c85acc3 --- /dev/null +++ b/lib/private/command/asyncbus.php @@ -0,0 +1,70 @@ + + * This file is licensed under the Affero General Public License version 3 or + * later. + * See the COPYING-README file. + */ + +namespace OC\Command; + +use OCP\Command\IBus; +use OCP\Command\ICommand; +use SuperClosure\Serializer; + +/** + * Asynchronous command bus that uses the background job system as backend + */ +class AsyncBus implements IBus { + /** + * @var \OCP\BackgroundJob\IJobList + */ + private $jobList; + + /** + * @param \OCP\BackgroundJob\IJobList $jobList + */ + function __construct($jobList) { + $this->jobList = $jobList; + } + + /** + * Schedule a command to be fired + * + * @param \OCP\Command\ICommand | callable $command + */ + public function push($command) { + $this->jobList->add($this->getJobClass($command), $this->serializeCommand($command)); + } + + /** + * @param \OCP\Command\ICommand | callable $command + * @return string + */ + private function getJobClass($command) { + if ($command instanceof \Closure) { + return 'OC\Command\ClosureJob'; + } else if (is_callable($command)) { + return 'OC\Command\CallableJob'; + } else if ($command instanceof ICommand) { + return 'OC\Command\CommandJob'; + } else { + throw new \InvalidArgumentException('Invalid command'); + } + } + + /** + * @param \OCP\Command\ICommand | callable $command + * @return string + */ + private function serializeCommand($command) { + if ($command instanceof \Closure) { + $serializer = new Serializer(); + return $serializer->serialize($command); + } else if (is_callable($command) or $command instanceof ICommand) { + return serialize($command); + } else { + throw new \InvalidArgumentException('Invalid command'); + } + } +} diff --git a/lib/private/command/callablejob.php b/lib/private/command/callablejob.php new file mode 100644 index 0000000000..6b755d615e --- /dev/null +++ b/lib/private/command/callablejob.php @@ -0,0 +1,22 @@ + + * This file is licensed under the Affero General Public License version 3 or + * later. + * See the COPYING-README file. + */ + +namespace OC\Command; + +use OC\BackgroundJob\QueuedJob; + +class CallableJob extends QueuedJob { + protected function run($serializedCallable) { + $callable = unserialize($serializedCallable); + if (is_callable($callable)) { + $callable(); + } else { + throw new \InvalidArgumentException('Invalid serialized callable'); + } + } +} diff --git a/lib/private/command/closurejob.php b/lib/private/command/closurejob.php new file mode 100644 index 0000000000..abba120b74 --- /dev/null +++ b/lib/private/command/closurejob.php @@ -0,0 +1,24 @@ + + * This file is licensed under the Affero General Public License version 3 or + * later. + * See the COPYING-README file. + */ + +namespace OC\Command; + +use OC\BackgroundJob\QueuedJob; +use SuperClosure\Serializer; + +class ClosureJob extends QueuedJob { + protected function run($serializedCallable) { + $serializer = new Serializer(); + $callable = $serializer->unserialize($serializedCallable); + if (is_callable($callable)) { + $callable(); + } else { + throw new \InvalidArgumentException('Invalid serialized callable'); + } + } +} diff --git a/lib/private/command/commandjob.php b/lib/private/command/commandjob.php new file mode 100644 index 0000000000..b2c7d30ee5 --- /dev/null +++ b/lib/private/command/commandjob.php @@ -0,0 +1,26 @@ + + * This file is licensed under the Affero General Public License version 3 or + * later. + * See the COPYING-README file. + */ + +namespace OC\Command; + +use OC\BackgroundJob\QueuedJob; +use OCP\Command\ICommand; + +/** + * Wrap a command in the background job interface + */ +class CommandJob extends QueuedJob { + protected function run($serializedCommand) { + $command = unserialize($serializedCommand); + if ($command instanceof ICommand) { + $command->handle(); + } else { + throw new \InvalidArgumentException('Invalid serialized command'); + } + } +} diff --git a/lib/private/server.php b/lib/private/server.php index f6fa5387e4..ca3a17451a 100644 --- a/lib/private/server.php +++ b/lib/private/server.php @@ -38,6 +38,7 @@ use OC\AppFramework\Http\Request; use OC\AppFramework\Db\Db; use OC\AppFramework\Utility\SimpleContainer; use OC\Cache\UserCache; +use OC\Command\AsyncBus; use OC\Diagnostics\NullQueryLogger; use OC\Diagnostics\EventLogger; use OC\Diagnostics\QueryLogger; @@ -291,6 +292,10 @@ class Server extends SimpleContainer implements IServerContainer { $this->registerService('IniWrapper', function ($c) { return new IniGetWrapper(); }); + $this->registerService('AsyncCommandBus', function (Server $c) { + $jobList = $c->getJobList(); + return new AsyncBus($jobList); + }); $this->registerService('TrustedDomainHelper', function ($c) { return new TrustedDomainHelper($this->getConfig()); }); @@ -777,6 +782,13 @@ class Server extends SimpleContainer implements IServerContainer { return $this->query('IniWrapper'); } + /** + * @return \OCP\Command\IBus + */ + function getAsyncCommandBus(){ + return $this->query('AsyncCommandBus'); + } + /** * Get the trusted domain helper * diff --git a/lib/public/command/ibus.php b/lib/public/command/ibus.php new file mode 100644 index 0000000000..707f8fd072 --- /dev/null +++ b/lib/public/command/ibus.php @@ -0,0 +1,18 @@ + + * This file is licensed under the Affero General Public License version 3 or + * later. + * See the COPYING-README file. + */ + +namespace OCP\Command; + +interface IBus { + /** + * Schedule a command to be fired + * + * @param \OCP\Command\ICommand | callable $command + */ + public function push($command); +} diff --git a/lib/public/command/icommand.php b/lib/public/command/icommand.php new file mode 100644 index 0000000000..6de61258a4 --- /dev/null +++ b/lib/public/command/icommand.php @@ -0,0 +1,16 @@ + + * This file is licensed under the Affero General Public License version 3 or + * later. + * See the COPYING-README file. + */ + +namespace OCP\Command; + +interface ICommand { + /** + * Run the command + */ + public function handle(); +} diff --git a/lib/public/iservercontainer.php b/lib/public/iservercontainer.php index f2806529a4..3b73426d9f 100644 --- a/lib/public/iservercontainer.php +++ b/lib/public/iservercontainer.php @@ -317,4 +317,9 @@ interface IServerContainer { * @return \bantu\IniGetWrapper\IniGetWrapper */ function getIniWrapper(); + + /** + * @return \OCP\Command\IBus + */ + function getAsyncCommandBus(); } diff --git a/tests/lib/backgroundjob/dummyjoblist.php b/tests/lib/backgroundjob/dummyjoblist.php index 7801269b27..6cc690fd55 100644 --- a/tests/lib/backgroundjob/dummyjoblist.php +++ b/tests/lib/backgroundjob/dummyjoblist.php @@ -21,13 +21,18 @@ class DummyJobList extends \OC\BackgroundJob\JobList { private $last = 0; - public function __construct(){} + public function __construct() { + } /** * @param \OC\BackgroundJob\Job|string $job * @param mixed $argument */ public function add($job, $argument = null) { + if (is_string($job)) { + /** @var \OC\BackgroundJob\Job $job */ + $job = new $job; + } $job->setArgument($argument); if (!$this->has($job, null)) { $this->jobs[] = $job; diff --git a/tests/lib/command/asyncbus.php b/tests/lib/command/asyncbus.php new file mode 100644 index 0000000000..030c416953 --- /dev/null +++ b/tests/lib/command/asyncbus.php @@ -0,0 +1,143 @@ + + * This file is licensed under the Affero General Public License version 3 or + * later. + * See the COPYING-README file. + */ + +namespace Test\Command; + +use OCP\Command\ICommand; +use Test\BackgroundJob\DummyJobList; +use Test\TestCase; + +class SimpleCommand implements ICommand { + public function handle() { + AsyncBus::$lastCommand = 'SimpleCommand'; + } +} + +class StateFullCommand implements ICommand { + private $state; + + function __construct($state) { + $this->state = $state; + } + + public function handle() { + AsyncBus::$lastCommand = $this->state; + } +} + +function basicFunction() { + AsyncBus::$lastCommand = 'function'; +} + +class AsyncBus extends TestCase { + /** + * Basic way to check output from a command + * + * @var string + */ + public static $lastCommand; + + /** + * @var \OCP\BackgroundJob\IJobList + */ + private $jobList; + + /** + * @var \OCP\Command\IBus + */ + private $bus; + + public static function DummyCommand() { + self::$lastCommand = 'static'; + } + + public function setUp() { + $this->jobList = new DummyJobList(); + $this->bus = new \OC\Command\AsyncBus($this->jobList); + self::$lastCommand = ''; + } + + public function testSimpleCommand() { + $command = new SimpleCommand(); + $this->bus->push($command); + $this->runJobs(); + $this->assertEquals('SimpleCommand', self::$lastCommand); + } + + public function testStateFullCommand() { + $command = new StateFullCommand('foo'); + $this->bus->push($command); + $this->runJobs(); + $this->assertEquals('foo', self::$lastCommand); + } + + public function testStaticCallable() { + $this->bus->push(['\Test\Command\AsyncBus', 'DummyCommand']); + $this->runJobs(); + $this->assertEquals('static', self::$lastCommand); + } + + public function testMemberCallable() { + $command = new StateFullCommand('bar'); + $this->bus->push([$command, 'handle']); + $this->runJobs(); + $this->assertEquals('bar', self::$lastCommand); + } + + public function testFunctionCallable() { + $this->bus->push('\Test\Command\BasicFunction'); + $this->runJobs(); + $this->assertEquals('function', self::$lastCommand); + } + + public function testClosure() { + $this->bus->push(function () { + AsyncBus::$lastCommand = 'closure'; + }); + $this->runJobs(); + $this->assertEquals('closure', self::$lastCommand); + } + + public function testClosureSelf() { + $this->bus->push(function () { + self::$lastCommand = 'closure-self'; + }); + $this->runJobs(); + $this->assertEquals('closure-self', self::$lastCommand); + } + + private function privateMethod() { + self::$lastCommand = 'closure-this'; + } + + public function testClosureThis() { + $this->bus->push(function () { + $this->privateMethod(); + }); + $this->runJobs(); + $this->assertEquals('closure-this', self::$lastCommand); + } + + public function testClosureBind() { + $state = 'bar'; + $this->bus->push(function () use ($state) { + self::$lastCommand = 'closure-' . $state; + }); + $this->runJobs(); + $this->assertEquals('closure-bar', self::$lastCommand); + } + + + private function runJobs() { + $jobs = $this->jobList->getAll(); + foreach ($jobs as $job) { + $job->execute($this->jobList); + } + } +}