Skip to content

Commit

Permalink
Implementation of RedisQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
Ville Mattila committed Jul 20, 2013
1 parent 5c0642e commit 06dd62b
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 9 deletions.
51 changes: 51 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,57 @@ the name of the tube to the constructor.

Non-deleted but fetched jobs are returned to the queue when the script ends.

### RedisQueue ([Redis](http://redis.io/) server)

$redis = new \Predis\Client();
$queueListKey = 'queue_key';
$queue = new RedisQueue('queue_id', $redis, $queueListKey);

RedisQueue uses the [Predis PHP Library](https://github.com/nrk/predis) to access
the configured Redis servers. The actual queue is implemented by a Redis list.

* `pushJob()` adds the job payload to the list using `[LPUSH](http://redis.io/commands/lpush)`
* `fetchJob()` fetches the job using `[BRPOPLPUSH](http://redis.io/commands/brpoplpush)` or `[RPOPLPUSH](http://redis.io/commands/rpoplpush)`
* `finalizeJob()` deletes the job using `[LREM](http://redis.io/commands/lrem)` from the processing queue
* `releaseJob()` moves the job back to the list queue using `[RPOPLPUSH](http://redis.io/commands/lrem)`

Non-deleted but fetched jobs are returned to the queue when the script ends.

RedisQueue uses a concept of processing queue to ensure the queue reliability also in case of the client
failures. Processing queue lives in a special key only between `fetchJob()` and `finalizeJob()` (or `releaseJob()`) calls.
The processing queue key name is automatically constructed in `fetchJob()` call and follows by default the pattern
`<queue_name>:<host_name>:<pid>(random unique string)`. [Read more about reliable queue pattern.](http://redis.io/commands/rpoplpush)

**Queue Configuration**

You can further customize the queue configuration by passing fourth argument to the queue constructor.

$queue = new RedisQueue('queue_id', $redis, $queueListKey, $configuration);

`$configuration` should be an associative array. The default configuration (and possible variables) are following.

$configuration = array(
'processing_queue_key_prefix' => '%q:%h:%p',
'allow_infinite_blocking' => false,
'skip_shutdown_release' => false,
);

`processing_queue_key_prefix`: The prefix pattern for the processing queue key.
There are a few placeholders that are replaced with actual values:
`%q` main queue name, `%h` hostname and `%p` PHP process ID.

`allow_infinite_blocking`: By default, if you do not pass any timeout
(or `NULL` or `0` or `false`) for `fetchJob()`, RedisQueue will do
non-blocking `[RPOPLPUSH](http://redis.io/commands/rpoplpush)`
call instead of blocking `[BRPOPLPUSH](http://redis.io/commands/brpoplpush)`. If the queue
contains no jobs, the function is returned immediately. If you set `allow_infinite_blocking` to `true` and
pass no timeout to `fetchJob()`, the queue forces to use `[BRPOPLPUSH](http://redis.io/commands/brpoplpush)` even
with no timeout (=infinite blocking). Use with care.

`ship_shutdown_release`: By default, the queue registers a call that releases
possibly unreleased and unfinished but fetched jobs back to the queue. Set to `true` to
disable the functionality.

### IronMQQueue

$ironMQ = new \IronMQ(array(
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
},
"require-dev": {
"pda/pheanstalk": "2.1.0",
"iron-io/iron_mq": "1.4.5"
"iron-io/iron_mq": "1.4.5",
"predis/predis": ">=0.8"
},
"autoload": {
"psr-0": {"Eventio\\BBQ": "src/"}
Expand Down
34 changes: 34 additions & 0 deletions src/Eventio/BBQ/Job/RedisQueueJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace Eventio\BBQ\Job;

use Eventio\BBQ\Job\Job;

/**
* @author Ville Mattila <[email protected]>
*/
class RedisQueueJob extends Job
{

public function __construct($payload, $rawData, $processingKey = null)
{
parent::__construct($payload);
$this->rawData = $rawData;
$this->processingKey = $processingKey;
}

protected $rawData;

public function getRawData()
{
return $this->rawData;
}

protected $processingKey;

public function getProcessingKey()
{
return $this->processingKey;
}

}
12 changes: 8 additions & 4 deletions src/Eventio/BBQ/Queue/AbstractQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function mayHaveJob()
*/
protected function lockJob(JobInterface $job)
{
$this->lockedJobs[] = $job;
$this->lockedJobs[spl_object_hash($job)] = $job;
}

/**
Expand All @@ -57,9 +57,7 @@ protected function lockJob(JobInterface $job)
*/
protected function deleteLockedJob(JobInterface $job)
{
$this->lockedJobs = array_filter($this->lockedJobs, function($item) use ($job) {
return ($job !== $item);
});
unset($this->lockedJobs[spl_object_hash($job)]);
}

abstract protected function init();
Expand All @@ -79,4 +77,10 @@ public function hasLockedJobs()
return (count($this->lockedJobs) > 0);
}

public function releaseLockedJobs()
{
foreach ($this->lockedJobs as $lockedJob) {
$this->releaseJob($lockedJob);
}
}
}
8 changes: 4 additions & 4 deletions src/Eventio/BBQ/Queue/DirectoryQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ protected function init()
}
}

/**
* @deprecated, use releaseLockedJobs() instead
*/
public function releaseUnfinishedJobs()
{
foreach ($this->lockedJobs as $lockedJob) {
$this->releaseJob($lockedJob);
}
$this->releaseLockedJobs();
}

}
103 changes: 103 additions & 0 deletions src/Eventio/BBQ/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

namespace Eventio\BBQ\Queue;

use Eventio\BBQ\Job\JobInterface;
use Eventio\BBQ\Job\Payload\JobPayloadInterface;
use Eventio\BBQ\Job\RedisQueueJob;

/**
* @author Ville Mattila <[email protected]>
*/
class RedisQueue extends AbstractQueue
{

/**
* @var \Predis\ClientInterface
*/
protected $predis;

/**
* @var string
*/
protected $queueKey;

/**
* @var string
*/
protected $processingQueueKeyPrefix;

public function __construct($id, \Predis\ClientInterface $predis, $queueKey, array $config = array())
{
$this->predis = $predis;
$this->queueKey = $queueKey;

parent::__construct($id, $config);
}

protected function init()
{
$this->config = array_merge(array(
'processing_queue_key_prefix' => '%q:%h:%p',
'allow_infinite_blocking' => false,
'skip_shutdown_release' => false
), $this->config);

$this->processingQueueKeyPrefix = $this->convertProcessingKey($this->config['processing_queue_key_prefix']);

if ($this->config['skip_shutdown_release'] == false) {
register_shutdown_function(array($this, 'releaseLockedJobs'));
}
}

private function convertProcessingKey($template)
{
$replaces = array(
'%q' => $this->queueKey,
'%h' => gethostname(),
'%p' => getmypid(),
);
return str_replace(array_keys($replaces), array_values($replaces), $template);
}

public function fetchJob($timeout = null)
{
$processingKey = uniqid($this->processingQueueKeyPrefix);

if (false === $this->config['allow_infinite_blocking'] && !$timeout) {
$redisJob = $this->predis->rpoplpush($this->queueKey, $processingKey);
} else {
$redisJob = $this->predis->brpoplpush($this->queueKey, $processingKey, $timeout);
}
if (!$redisJob) {
return null;
}

$job = new RedisQueueJob(unserialize($redisJob), $redisJob, $processingKey);
$job->setQueue($this);

$this->lockJob($job);
return $job;
}

public function finalizeJob(JobInterface $job)
{
//
$this->predis->lrem($job->getProcessingKey(), 0, $job->getRawData());
$this->deleteLockedJob($job);
}

public function pushJob(JobPayloadInterface $jobPayload)
{
// Adds a new job to the main queue
$this->predis->lpush($this->queueKey, serialize($jobPayload));
}

public function releaseJob(JobInterface $job)
{
// Returning the job back to the main queue from the processing queue
$this->predis->rpoplpush($job->getProcessingKey(), $this->queueKey);
$this->deleteLockedJob($job);
}

}
53 changes: 53 additions & 0 deletions tests/Eventio/BBQ/Tests/Functional/RedisTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

namespace Eventio\BBQ\Tests\Functional;

use Eventio\BBQ;
use Eventio\BBQ\Job\Payload\StringPayload;
use Eventio\BBQ\Queue\RedisQueue;

/**
* @author Ville Mattila <[email protected]>
*/
class RedisTest extends \PHPUnit_Framework_TestCase
{

protected function setUp()
{
if (!class_exists('\\Predis\\Client')) {
$this->markTestSkipped(
'Predis library is not installed'
);
}
}

public function createBBQRedisQueue()
{
$bbq = new BBQ();

$predis = new \Predis\Client();
$bbq->registerQueue(new RedisQueue('queue1', $predis, 'eventio:bbq:queue1'));

return $bbq;
}

public function testJob()
{

$bbq = $this->createBBQRedisQueue();
$bbq->pushJob('queue1', new StringPayload('Test 1'));

unset($bbq);

$bbq = $this->createBBQRedisQueue();
$job = $bbq->fetchJob('queue1');

$this->assertInstanceOf('\\Eventio\\BBQ\\Job\\RedisQueueJob', $job);

$payload = $job->getPayload();
$this->assertEquals('Test 1', (string) $payload);

$bbq->finalizeJob($job);
}

}
Loading

0 comments on commit 06dd62b

Please sign in to comment.