From 06dd62b9c6b8a068b7b7cd78af5362fa016b02b7 Mon Sep 17 00:00:00 2001 From: Ville Mattila Date: Sat, 20 Jul 2013 17:07:25 +0300 Subject: [PATCH] Implementation of RedisQueue --- Readme.md | 51 +++++++ composer.json | 3 +- src/Eventio/BBQ/Job/RedisQueueJob.php | 34 +++++ src/Eventio/BBQ/Queue/AbstractQueue.php | 12 +- src/Eventio/BBQ/Queue/DirectoryQueue.php | 8 +- src/Eventio/BBQ/Queue/RedisQueue.php | 103 ++++++++++++++ .../BBQ/Tests/Functional/RedisTest.php | 53 +++++++ .../BBQ/Tests/Queue/RedisQueueTest.php | 130 ++++++++++++++++++ 8 files changed, 385 insertions(+), 9 deletions(-) create mode 100644 src/Eventio/BBQ/Job/RedisQueueJob.php create mode 100644 src/Eventio/BBQ/Queue/RedisQueue.php create mode 100644 tests/Eventio/BBQ/Tests/Functional/RedisTest.php create mode 100644 tests/Eventio/BBQ/Tests/Queue/RedisQueueTest.php diff --git a/Readme.md b/Readme.md index 6cc8d2f..5be6e67 100644 --- a/Readme.md +++ b/Readme.md @@ -92,6 +92,57 @@ the name of the tube to the constructor. Non-deleted but fetched jobs are returned to the queue when the script ends. +### RedisQueue ([Redis](http://redis.io/) server) + + $redis = new \Predis\Client(); + $queueListKey = 'queue_key'; + $queue = new RedisQueue('queue_id', $redis, $queueListKey); + +RedisQueue uses the [Predis PHP Library](https://github.com/nrk/predis) to access +the configured Redis servers. The actual queue is implemented by a Redis list. + +* `pushJob()` adds the job payload to the list using `[LPUSH](http://redis.io/commands/lpush)` +* `fetchJob()` fetches the job using `[BRPOPLPUSH](http://redis.io/commands/brpoplpush)` or `[RPOPLPUSH](http://redis.io/commands/rpoplpush)` +* `finalizeJob()` deletes the job using `[LREM](http://redis.io/commands/lrem)` from the processing queue +* `releaseJob()` moves the job back to the list queue using `[RPOPLPUSH](http://redis.io/commands/lrem)` + +Non-deleted but fetched jobs are returned to the queue when the script ends. + +RedisQueue uses a concept of processing queue to ensure the queue reliability also in case of the client +failures. Processing queue lives in a special key only between `fetchJob()` and `finalizeJob()` (or `releaseJob()`) calls. +The processing queue key name is automatically constructed in `fetchJob()` call and follows by default the pattern +`::(random unique string)`. [Read more about reliable queue pattern.](http://redis.io/commands/rpoplpush) + +**Queue Configuration** + +You can further customize the queue configuration by passing fourth argument to the queue constructor. + + $queue = new RedisQueue('queue_id', $redis, $queueListKey, $configuration); + +`$configuration` should be an associative array. The default configuration (and possible variables) are following. + + $configuration = array( + 'processing_queue_key_prefix' => '%q:%h:%p', + 'allow_infinite_blocking' => false, + 'skip_shutdown_release' => false, + ); + +`processing_queue_key_prefix`: The prefix pattern for the processing queue key. +There are a few placeholders that are replaced with actual values: +`%q` main queue name, `%h` hostname and `%p` PHP process ID. + +`allow_infinite_blocking`: By default, if you do not pass any timeout +(or `NULL` or `0` or `false`) for `fetchJob()`, RedisQueue will do +non-blocking `[RPOPLPUSH](http://redis.io/commands/rpoplpush)` +call instead of blocking `[BRPOPLPUSH](http://redis.io/commands/brpoplpush)`. If the queue +contains no jobs, the function is returned immediately. If you set `allow_infinite_blocking` to `true` and +pass no timeout to `fetchJob()`, the queue forces to use `[BRPOPLPUSH](http://redis.io/commands/brpoplpush)` even +with no timeout (=infinite blocking). Use with care. + +`ship_shutdown_release`: By default, the queue registers a call that releases +possibly unreleased and unfinished but fetched jobs back to the queue. Set to `true` to +disable the functionality. + ### IronMQQueue $ironMQ = new \IronMQ(array( diff --git a/composer.json b/composer.json index e7902a1..c1c03a2 100644 --- a/composer.json +++ b/composer.json @@ -18,7 +18,8 @@ }, "require-dev": { "pda/pheanstalk": "2.1.0", - "iron-io/iron_mq": "1.4.5" + "iron-io/iron_mq": "1.4.5", + "predis/predis": ">=0.8" }, "autoload": { "psr-0": {"Eventio\\BBQ": "src/"} diff --git a/src/Eventio/BBQ/Job/RedisQueueJob.php b/src/Eventio/BBQ/Job/RedisQueueJob.php new file mode 100644 index 0000000..6a4ddf3 --- /dev/null +++ b/src/Eventio/BBQ/Job/RedisQueueJob.php @@ -0,0 +1,34 @@ + + */ +class RedisQueueJob extends Job +{ + + public function __construct($payload, $rawData, $processingKey = null) + { + parent::__construct($payload); + $this->rawData = $rawData; + $this->processingKey = $processingKey; + } + + protected $rawData; + + public function getRawData() + { + return $this->rawData; + } + + protected $processingKey; + + public function getProcessingKey() + { + return $this->processingKey; + } + +} \ No newline at end of file diff --git a/src/Eventio/BBQ/Queue/AbstractQueue.php b/src/Eventio/BBQ/Queue/AbstractQueue.php index 1300fb5..fda69da 100644 --- a/src/Eventio/BBQ/Queue/AbstractQueue.php +++ b/src/Eventio/BBQ/Queue/AbstractQueue.php @@ -48,7 +48,7 @@ public function mayHaveJob() */ protected function lockJob(JobInterface $job) { - $this->lockedJobs[] = $job; + $this->lockedJobs[spl_object_hash($job)] = $job; } /** @@ -57,9 +57,7 @@ protected function lockJob(JobInterface $job) */ protected function deleteLockedJob(JobInterface $job) { - $this->lockedJobs = array_filter($this->lockedJobs, function($item) use ($job) { - return ($job !== $item); - }); + unset($this->lockedJobs[spl_object_hash($job)]); } abstract protected function init(); @@ -79,4 +77,10 @@ public function hasLockedJobs() return (count($this->lockedJobs) > 0); } + public function releaseLockedJobs() + { + foreach ($this->lockedJobs as $lockedJob) { + $this->releaseJob($lockedJob); + } + } } \ No newline at end of file diff --git a/src/Eventio/BBQ/Queue/DirectoryQueue.php b/src/Eventio/BBQ/Queue/DirectoryQueue.php index 7760e5a..138e1f4 100644 --- a/src/Eventio/BBQ/Queue/DirectoryQueue.php +++ b/src/Eventio/BBQ/Queue/DirectoryQueue.php @@ -98,11 +98,11 @@ protected function init() } } + /** + * @deprecated, use releaseLockedJobs() instead + */ public function releaseUnfinishedJobs() { - foreach ($this->lockedJobs as $lockedJob) { - $this->releaseJob($lockedJob); - } + $this->releaseLockedJobs(); } - } \ No newline at end of file diff --git a/src/Eventio/BBQ/Queue/RedisQueue.php b/src/Eventio/BBQ/Queue/RedisQueue.php new file mode 100644 index 0000000..68eff76 --- /dev/null +++ b/src/Eventio/BBQ/Queue/RedisQueue.php @@ -0,0 +1,103 @@ + + */ +class RedisQueue extends AbstractQueue +{ + + /** + * @var \Predis\ClientInterface + */ + protected $predis; + + /** + * @var string + */ + protected $queueKey; + + /** + * @var string + */ + protected $processingQueueKeyPrefix; + + public function __construct($id, \Predis\ClientInterface $predis, $queueKey, array $config = array()) + { + $this->predis = $predis; + $this->queueKey = $queueKey; + + parent::__construct($id, $config); + } + + protected function init() + { + $this->config = array_merge(array( + 'processing_queue_key_prefix' => '%q:%h:%p', + 'allow_infinite_blocking' => false, + 'skip_shutdown_release' => false + ), $this->config); + + $this->processingQueueKeyPrefix = $this->convertProcessingKey($this->config['processing_queue_key_prefix']); + + if ($this->config['skip_shutdown_release'] == false) { + register_shutdown_function(array($this, 'releaseLockedJobs')); + } + } + + private function convertProcessingKey($template) + { + $replaces = array( + '%q' => $this->queueKey, + '%h' => gethostname(), + '%p' => getmypid(), + ); + return str_replace(array_keys($replaces), array_values($replaces), $template); + } + + public function fetchJob($timeout = null) + { + $processingKey = uniqid($this->processingQueueKeyPrefix); + + if (false === $this->config['allow_infinite_blocking'] && !$timeout) { + $redisJob = $this->predis->rpoplpush($this->queueKey, $processingKey); + } else { + $redisJob = $this->predis->brpoplpush($this->queueKey, $processingKey, $timeout); + } + if (!$redisJob) { + return null; + } + + $job = new RedisQueueJob(unserialize($redisJob), $redisJob, $processingKey); + $job->setQueue($this); + + $this->lockJob($job); + return $job; + } + + public function finalizeJob(JobInterface $job) + { + // + $this->predis->lrem($job->getProcessingKey(), 0, $job->getRawData()); + $this->deleteLockedJob($job); + } + + public function pushJob(JobPayloadInterface $jobPayload) + { + // Adds a new job to the main queue + $this->predis->lpush($this->queueKey, serialize($jobPayload)); + } + + public function releaseJob(JobInterface $job) + { + // Returning the job back to the main queue from the processing queue + $this->predis->rpoplpush($job->getProcessingKey(), $this->queueKey); + $this->deleteLockedJob($job); + } + +} \ No newline at end of file diff --git a/tests/Eventio/BBQ/Tests/Functional/RedisTest.php b/tests/Eventio/BBQ/Tests/Functional/RedisTest.php new file mode 100644 index 0000000..2b87efd --- /dev/null +++ b/tests/Eventio/BBQ/Tests/Functional/RedisTest.php @@ -0,0 +1,53 @@ + + */ +class RedisTest extends \PHPUnit_Framework_TestCase +{ + + protected function setUp() + { + if (!class_exists('\\Predis\\Client')) { + $this->markTestSkipped( + 'Predis library is not installed' + ); + } + } + + public function createBBQRedisQueue() + { + $bbq = new BBQ(); + + $predis = new \Predis\Client(); + $bbq->registerQueue(new RedisQueue('queue1', $predis, 'eventio:bbq:queue1')); + + return $bbq; + } + + public function testJob() + { + + $bbq = $this->createBBQRedisQueue(); + $bbq->pushJob('queue1', new StringPayload('Test 1')); + + unset($bbq); + + $bbq = $this->createBBQRedisQueue(); + $job = $bbq->fetchJob('queue1'); + + $this->assertInstanceOf('\\Eventio\\BBQ\\Job\\RedisQueueJob', $job); + + $payload = $job->getPayload(); + $this->assertEquals('Test 1', (string) $payload); + + $bbq->finalizeJob($job); + } + +} \ No newline at end of file diff --git a/tests/Eventio/BBQ/Tests/Queue/RedisQueueTest.php b/tests/Eventio/BBQ/Tests/Queue/RedisQueueTest.php new file mode 100644 index 0000000..0c33f69 --- /dev/null +++ b/tests/Eventio/BBQ/Tests/Queue/RedisQueueTest.php @@ -0,0 +1,130 @@ + + */ +class RedisQueueTest extends \PHPUnit_Framework_TestCase +{ + + protected function setUp() + { + if (!class_exists('\\Predis\\Client')) { + $this->markTestSkipped( + 'Predis library is not installed' + ); + } + } + + /** + * @return \Eventio\BBQ\Queue\RedisQueue + */ + private function createRealQueue() + { + $predis = new \Predis\Client(); + return $this->createQueueOnPredis($predis); + } + + /** + * @return \Eventio\BBQ\Queue\RedisQueue + */ + private function createQueueOnPredis($predis, $queueName = 'queue1') + { + $predis->del($queueName); + return new RedisQueue('redis_queue1', $predis, $queueName); + } + + public function testJob() + { + $queue = $this->createRealQueue(); + + $job = $queue->fetchJob(); + $this->assertNull($job); + + $queue->pushJob(new StringPayload('RedisQueueTest::testJob')); + $job = $queue->fetchJob(); + $this->assertInstanceOf('\\Eventio\\BBQ\\Job\\RedisQueueJob', $job); + $this->assertTrue($queue->hasLockedJobs()); + + $queue->finalizeJob($job); + $this->assertFalse($queue->hasLockedJobs()); + + $job = $queue->fetchJob(); + $this->assertNull($job); + } + + public function testFetchJobTimeout() + { + $queue = $this->createRealQueue(); + + $job = $queue->fetchJob(); + $this->assertNull($job); + + $timeNow = time(); + $job = $queue->fetchJob(5); + $timeAfter = time(); + + $this->assertNull($job); + $this->assertLessThanOrEqual(1, abs($timeAfter - $timeNow - 5)); + } + + public function testFetchJobCall() { + + $predisStub = $this->getMockBuilder('\\Predis\\Client') + ->setMethods(array('rpoplpush')) + ->getMock(); + + $queue = $this->createQueueOnPredis($predisStub); + $queueName = 'queue1'; + + $predisStub->expects($this->once()) + ->method('rpoplpush') + ->with($this->equalTo($queueName), $this->anything()); + + $job = $queue->fetchJob(); + $this->assertNull($job); + } + + public function testFinalizeJobCall() { + + $predisStub = $this->getMockBuilder('\\Predis\\Client') + ->setMethods(array('lrem')) + //->enableProxyingToOriginalMethods() // @see https://github.com/sebastianbergmann/phpunit-mock-objects/issues/132 + ->getMock(); + + $queue = $this->createQueueOnPredis($predisStub); + $queueName = 'queue1'; + + $queue->pushJob(new StringPayload('RedisQueueTest::testFinalizeJobCall')); + $job = $queue->fetchJob(); + $this->assertTrue($queue->hasLockedJobs()); + + $predisStub->expects($this->once()) + ->method('lrem') + ->with($this->equalTo($job->getProcessingKey()), $this->equalTo(0), $this->anything()); + + $queue->finalizeJob($job); + $this->assertFalse($queue->hasLockedJobs()); + } + + public function testFetchJobCallWithTimeout() { + + $predisStub = $this->getMockBuilder('\\Predis\\Client') + ->setMethods(array('brpoplpush')) + ->getMock(); + + $queue = $this->createQueueOnPredis($predisStub); + $queueName = 'queue1'; + $predisStub->expects($this->once()) + ->method('brpoplpush') + ->with($this->equalTo($queueName), $this->anything()); + + $job = $queue->fetchJob(1); + $this->assertNull($job); + } + +} \ No newline at end of file