From d0a2fa050694232554242c6344439915f3f09d12 Mon Sep 17 00:00:00 2001 From: Joas Schilling Date: Wed, 18 May 2016 14:27:48 +0200 Subject: [PATCH] Lock jobs while executing them, to allow multiple executors to run in parallel --- cron.php | 1 + db_structure.xml | 17 ++++ lib/private/BackgroundJob/JobList.php | 84 +++++++++------ lib/private/Server.php | 6 +- lib/public/BackgroundJob/IJobList.php | 11 ++ tests/lib/BackgroundJob/JobListTest.php | 130 +++++++++++------------- version.php | 2 +- 7 files changed, 149 insertions(+), 102 deletions(-) diff --git a/cron.php b/cron.php index c4bc9e9667..c8ce5e2b50 100644 --- a/cron.php +++ b/cron.php @@ -138,6 +138,7 @@ try { $executedJobs = []; while ($job = $jobList->getNext()) { if (isset($executedJobs[$job->getId()])) { + $jobList->unlockJob($job); break; } diff --git a/db_structure.xml b/db_structure.xml index 6e57b003fc..24b5a4b748 100644 --- a/db_structure.xml +++ b/db_structure.xml @@ -968,12 +968,29 @@ + last_run integer false + + + last_checked + integer + + false + + + + + reserved_at + integer + + false + + job_class_index diff --git a/lib/private/BackgroundJob/JobList.php b/lib/private/BackgroundJob/JobList.php index 2429b83044..c84969776c 100644 --- a/lib/private/BackgroundJob/JobList.php +++ b/lib/private/BackgroundJob/JobList.php @@ -25,27 +25,34 @@ namespace OC\BackgroundJob; use OCP\AppFramework\QueryException; +use OCP\AppFramework\Utility\ITimeFactory; use OCP\BackgroundJob\IJob; use OCP\BackgroundJob\IJobList; use OCP\AutoloadNotAllowedException; use OCP\DB\QueryBuilder\IQueryBuilder; +use OCP\IConfig; +use OCP\IDBConnection; class JobList implements IJobList { - /** @var \OCP\IDBConnection */ + + /** @var IDBConnection */ protected $connection; - /** - * @var \OCP\IConfig $config - */ + /**@var IConfig */ protected $config; + /**@var ITimeFactory */ + protected $timeFactory; + /** - * @param \OCP\IDBConnection $connection - * @param \OCP\IConfig $config + * @param IDBConnection $connection + * @param IConfig $config + * @param ITimeFactory $timeFactory */ - public function __construct($connection, $config) { + public function __construct(IDBConnection $connection, IConfig $config, ITimeFactory $timeFactory) { $this->connection = $connection; $this->config = $config; + $this->timeFactory = $timeFactory; } /** @@ -71,6 +78,7 @@ class JobList implements IJobList { 'class' => $query->createNamedParameter($class), 'argument' => $query->createNamedParameter($argument), 'last_run' => $query->createNamedParameter(0, IQueryBuilder::PARAM_INT), + 'last_checked' => $query->createNamedParameter($this->timeFactory->getTime(), IQueryBuilder::PARAM_INT), ]); $query->execute(); } @@ -167,45 +175,40 @@ class JobList implements IJobList { * @return IJob|null */ public function getNext() { - $lastId = $this->getLastJob(); - $query = $this->connection->getQueryBuilder(); $query->select('*') ->from('jobs') - ->where($query->expr()->lt('id', $query->createNamedParameter($lastId, IQueryBuilder::PARAM_INT))) - ->orderBy('id', 'DESC') + ->where($query->expr()->lte('reserved_at', $query->createNamedParameter($this->timeFactory->getTime() - 12 * 3600, IQueryBuilder::PARAM_INT))) + ->orderBy('last_checked', 'ASC') ->setMaxResults(1); + + $update = $this->connection->getQueryBuilder(); + $update->update('jobs') + ->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime())) + ->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime())) + ->where($update->expr()->eq('id', $update->createParameter('jobid'))); + + $this->connection->lockTable('jobs'); $result = $query->execute(); $row = $result->fetch(); $result->closeCursor(); if ($row) { - $jobId = $row['id']; + $update->setParameter('jobid', $row['id']); + $update->execute(); + $this->connection->unlockTable(); + $job = $this->buildJob($row); - } else { - //begin at the start of the queue - $query = $this->connection->getQueryBuilder(); - $query->select('*') - ->from('jobs') - ->orderBy('id', 'DESC') - ->setMaxResults(1); - $result = $query->execute(); - $row = $result->fetch(); - $result->closeCursor(); - if ($row) { - $jobId = $row['id']; - $job = $this->buildJob($row); - } else { - return null; //empty job list + if ($job === null) { + // Background job from disabled app, try again. + return $this->getNext(); } - } - if (is_null($job)) { - $this->removeById($jobId); - return $this->getNext(); - } else { return $job; + } else { + $this->connection->unlockTable(); + return null; } } @@ -267,13 +270,30 @@ class JobList implements IJobList { * @param IJob $job */ public function setLastJob($job) { + $this->unlockJob($job); $this->config->setAppValue('backgroundjob', 'lastjob', $job->getId()); } + /** + * Remove the reservation for a job + * + * @param IJob $job + */ + public function unlockJob($job) { + $query = $this->connection->getQueryBuilder(); + $query->update('jobs') + ->set('reserved_at', $query->expr()->literal(0, IQueryBuilder::PARAM_INT)) + ->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT))); + $query->execute(); + } + /** * get the id of the last ran job * * @return int + * @deprecated 9.1.0 - The functionality behind the value is deprecated, it + * only tells you which job finished last, but since we now allow multiple + * executors to run in parallel, it's not used to calculate the next job. */ public function getLastJob() { return (int) $this->config->getAppValue('backgroundjob', 'lastjob', 0); diff --git a/lib/private/Server.php b/lib/private/Server.php index a4294ee2c8..0b7b8f9e40 100644 --- a/lib/private/Server.php +++ b/lib/private/Server.php @@ -362,7 +362,11 @@ class Server extends ServerContainer implements IServerContainer { }); $this->registerService('JobList', function (Server $c) { $config = $c->getConfig(); - return new \OC\BackgroundJob\JobList($c->getDatabaseConnection(), $config); + return new \OC\BackgroundJob\JobList( + $c->getDatabaseConnection(), + $config, + new TimeFactory() + ); }); $this->registerService('Router', function (Server $c) { $cacheFactory = $c->getMemCacheFactory(); diff --git a/lib/public/BackgroundJob/IJobList.php b/lib/public/BackgroundJob/IJobList.php index 5a76ce1ba2..9e401e6841 100644 --- a/lib/public/BackgroundJob/IJobList.php +++ b/lib/public/BackgroundJob/IJobList.php @@ -92,11 +92,22 @@ interface IJobList { */ public function setLastJob($job); + /** + * Remove the reservation for a job + * + * @param IJob $job + * @since 9.1.0 + */ + public function unlockJob($job); + /** * get the id of the last ran job * * @return int * @since 7.0.0 + * @deprecated 9.1.0 - The functionality behind the value is deprecated, it + * only tells you which job finished last, but since we now allow multiple + * executors to run in parallel, it's not used to calculate the next job. */ public function getLastJob(); diff --git a/tests/lib/BackgroundJob/JobListTest.php b/tests/lib/BackgroundJob/JobListTest.php index 6eed804bc3..b8dcb735a2 100644 --- a/tests/lib/BackgroundJob/JobListTest.php +++ b/tests/lib/BackgroundJob/JobListTest.php @@ -9,7 +9,7 @@ namespace Test\BackgroundJob; use OCP\BackgroundJob\IJob; -use OCP\IDBConnection; +use OCP\DB\QueryBuilder\IQueryBuilder; use Test\TestCase; /** @@ -22,20 +22,31 @@ class JobListTest extends TestCase { /** @var \OC\BackgroundJob\JobList */ protected $instance; + /** @var \OCP\IDBConnection */ + protected $connection; + /** @var \OCP\IConfig|\PHPUnit_Framework_MockObject_MockObject */ protected $config; + /** @var \OCP\AppFramework\Utility\ITimeFactory|\PHPUnit_Framework_MockObject_MockObject */ + protected $timeFactory; + protected function setUp() { parent::setUp(); - $connection = \OC::$server->getDatabaseConnection(); - $this->clearJobsList($connection); - $this->config = $this->getMock('\OCP\IConfig'); - $this->instance = new \OC\BackgroundJob\JobList($connection, $this->config); + $this->connection = \OC::$server->getDatabaseConnection(); + $this->clearJobsList(); + $this->config = $this->getMock('OCP\IConfig'); + $this->timeFactory = $this->getMock('OCP\AppFramework\Utility\ITimeFactory'); + $this->instance = new \OC\BackgroundJob\JobList( + $this->connection, + $this->config, + $this->timeFactory + ); } - protected function clearJobsList(IDBConnection $connection) { - $query = $connection->getQueryBuilder(); + protected function clearJobsList() { + $query = $this->connection->getQueryBuilder(); $query->delete('jobs'); $query->execute(); } @@ -131,8 +142,6 @@ class JobListTest extends TestCase { $this->instance->add($job, $argument); $this->assertFalse($this->instance->has($job, 10)); - - $this->instance->remove($job, $argument); } public function testGetLastJob() { @@ -144,50 +153,65 @@ class JobListTest extends TestCase { $this->assertEquals(15, $this->instance->getLastJob()); } + protected function createTempJob($class, $argument, $reservedTime = 0, $lastChecked = 0) { + if ($lastChecked === 0) { + $lastChecked = time(); + } + + $query = $this->connection->getQueryBuilder(); + $query->insert('jobs') + ->values([ + 'class' => $query->createNamedParameter($class), + 'argument' => $query->createNamedParameter($argument), + 'last_run' => $query->createNamedParameter(0, IQueryBuilder::PARAM_INT), + 'last_checked' => $query->createNamedParameter($lastChecked, IQueryBuilder::PARAM_INT), + 'reserved_at' => $query->createNamedParameter($reservedTime, IQueryBuilder::PARAM_INT), + ]); + $query->execute(); + } + public function testGetNext() { $job = new TestJob(); - $this->instance->add($job, 1); - $this->instance->add($job, 2); + $this->createTempJob(get_class($job), 1, 0, 12345); + $this->createTempJob(get_class($job), 2, 0, 12346); $jobs = $this->getAllSorted(); + $savedJob1 = $jobs[0]; - $savedJob1 = $jobs[count($jobs) - 2]; - $savedJob2 = $jobs[count($jobs) - 1]; - - $this->config->expects($this->once()) - ->method('getAppValue') - ->with('backgroundjob', 'lastjob', 0) - ->will($this->returnValue($savedJob2->getId())); - + $this->timeFactory->expects($this->atLeastOnce()) + ->method('getTime') + ->willReturn(123456789); $nextJob = $this->instance->getNext(); $this->assertEquals($savedJob1, $nextJob); - - $this->instance->remove($job, 1); - $this->instance->remove($job, 2); } - public function testGetNextWrapAround() { + public function testGetNextSkipReserved() { $job = new TestJob(); - $this->instance->add($job, 1); - $this->instance->add($job, 2); - - $jobs = $this->getAllSorted(); - - $savedJob1 = $jobs[count($jobs) - 2]; - $savedJob2 = $jobs[count($jobs) - 1]; - - $this->config->expects($this->once()) - ->method('getAppValue') - ->with('backgroundjob', 'lastjob', 0) - ->will($this->returnValue($savedJob1->getId())); + $this->createTempJob(get_class($job), 1, 123456789, 12345); + $this->createTempJob(get_class($job), 2, 0, 12346); + $this->timeFactory->expects($this->atLeastOnce()) + ->method('getTime') + ->willReturn(123456789); $nextJob = $this->instance->getNext(); - $this->assertEquals($savedJob2, $nextJob); + $this->assertEquals(get_class($job), get_class($nextJob)); + $this->assertEquals(2, $nextJob->getArgument()); + } - $this->instance->remove($job, 1); - $this->instance->remove($job, 2); + public function testGetNextSkipNonExisting() { + $job = new TestJob(); + $this->createTempJob('\OC\Non\Existing\Class', 1, 0, 12345); + $this->createTempJob(get_class($job), 2, 0, 12346); + + $this->timeFactory->expects($this->atLeastOnce()) + ->method('getTime') + ->willReturn(123456789); + $nextJob = $this->instance->getNext(); + + $this->assertEquals(get_class($job), get_class($nextJob)); + $this->assertEquals(2, $nextJob->getArgument()); } /** @@ -203,8 +227,6 @@ class JobListTest extends TestCase { $addedJob = $jobs[count($jobs) - 1]; $this->assertEquals($addedJob, $this->instance->getById($addedJob->getId())); - - $this->instance->remove($job, $argument); } public function testSetLastRun() { @@ -223,33 +245,5 @@ class JobListTest extends TestCase { $this->assertGreaterThanOrEqual($timeStart, $addedJob->getLastRun()); $this->assertLessThanOrEqual($timeEnd, $addedJob->getLastRun()); - - $this->instance->remove($job); - } - - public function testGetNextNonExisting() { - $job = new TestJob(); - $this->instance->add($job, 1); - $this->instance->add('\OC\Non\Existing\Class'); - $this->instance->add($job, 2); - - $jobs = $this->getAllSorted(); - - $savedJob1 = $jobs[count($jobs) - 2]; - $savedJob2 = $jobs[count($jobs) - 1]; - - $this->config->expects($this->any()) - ->method('getAppValue') - ->with('backgroundjob', 'lastjob', 0) - ->will($this->returnValue($savedJob1->getId())); - - $this->instance->getNext(); - - $nextJob = $this->instance->getNext(); - - $this->assertEquals($savedJob2, $nextJob); - - $this->instance->remove($job, 1); - $this->instance->remove($job, 2); } } diff --git a/version.php b/version.php index d9e1ca1df1..a4f1c4dbce 100644 --- a/version.php +++ b/version.php @@ -26,7 +26,7 @@ // We only can count up. The 4. digit is only for the internal patchlevel to trigger DB upgrades // between betas, final and RCs. This is _not_ the public version number. Reset minor/patchlevel // when updating major/minor version number. -$OC_Version = array(9, 1, 0, 2); +$OC_Version = array(9, 1, 0, 3); // The human readable string $OC_VersionString = '9.1.0 pre alpha';