diff --git a/.travis.yml b/.travis.yml index e093e87..e9c667e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,5 @@ language: php + php: - '7.4' - '7.3' @@ -6,19 +7,17 @@ php: - '7.1' - '7.0' - '5.6' - - hhvm -matrix: - exclude: - - php: hhvm - env: ENABLE_REDIS_EXT=1 - allow_failures: - - php: '7.4' - - php: '7.3' - - php: '7.2' - - php: hhvm + env: - ENABLE_REDIS_EXT=0 - ENABLE_REDIS_EXT=1 -before_script: + +matrix: + allow_failures: + - php: '7.4' + - php: '5.6' + +install: + - echo 'error_reporting = E_ALL & ~E_NOTICE & ~E_STRICT & ~E_DEPRECATED' >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini - sh -c "if [ $ENABLE_REDIS_EXT -eq 1 ]; then echo \"extension=redis.so\" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini; fi" - composer install diff --git a/README.md b/README.md index d60b6f8..e06c13a 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,6 @@ jobs on one or more queues, and processing them later. [![Downloads](https://img.shields.io/packagist/dt/resque/php-resque.svg?style=flat-square)](https://packagist.org/packages/resque/php-resque) [![Build Status](https://img.shields.io/travis/resque/php-resque.svg?style=flat-square&logo=travis)](http://travis-ci.org/resque/php-resque) -[![Tested With HHVM](https://img.shields.io/hhvm/resque/php-resque.svg?style=flat-square)](http://travis-ci.org/resque/php-resque) [![Code Quality](https://img.shields.io/scrutinizer/g/resque/php-resque.svg?style=flat-square&logo=scrutinizer)](https://scrutinizer-ci.com/g/resque/php-resque/) [![Code Coverage](https://img.shields.io/scrutinizer/coverage/g/resque/php-resque.svg?style=flat-square&logo=scrutinizer)](https://scrutinizer-ci.com/g/resque/php-resque/) [![Dependency Status](https://img.shields.io/librariesio/github/resque/php-resque.svg?style=flat-square)](https://libraries.io/github/resque/php-resque) @@ -20,7 +19,7 @@ jobs on one or more queues, and processing them later. [![Latest Release](https://img.shields.io/github/release/resque/php-resque.svg?style=flat-square&logo=github&logoColor=white)](https://github.com/resque/php-resque) [![Latest Release Date](https://img.shields.io/github/release-date/resque/php-resque.svg?style=flat-square&logo=github&logoColor=white)](https://github.com/resque/php-resque) [![Commits Since Latest Release](https://img.shields.io/github/commits-since/resque/php-resque/latest.svg?style=flat-square&logo=github&logoColor=white)](https://github.com/resque/php-resque) -[![Maintenance Status](https://img.shields.io/maintenance/yes/2019.svg?style=flat-square&logo=github&logoColor=white)](https://github.com/resque/php-resque) +[![Maintenance Status](https://img.shields.io/maintenance/yes/2020.svg?style=flat-square&logo=github&logoColor=white)](https://github.com/resque/php-resque) [![Contributors](https://img.shields.io/github/contributors/resque/php-resque.svg?style=flat-square&logo=github&logoColor=white)](https://github.com/resque/php-resque) [![Chat on Slack](https://img.shields.io/badge/chat-Slack-blue.svg?style=flat-square&logo=slack&logoColor=white)](https://join.slack.com/t/php-resque/shared_invite/enQtNTIwODk0OTc1Njg3LWYyODczMTZjMzI2N2JkYWUzM2FlNDk5ZjY2ZGM4Njc4YjFiMzU2ZWFjOGQxMDIyNmE5MTBlNWEzODBiMmVmOTI) @@ -56,6 +55,15 @@ It also supports the following additional features: - Has built in support for `setUp` and `tearDown` methods, called pre and post jobs +Additionally it includes php-resque-scheduler, a PHP port of [resque-scheduler](http://github.com/resque/resque), +which adds support for scheduling items in the future to Resque. It has been +designed to be an almost direct-copy of the Ruby plugin + +At the moment, php-resque-scheduler only supports delayed jobs, which is the +ability to push a job to the queue and have it run at a certain timestamp, or +in a number of seconds. Support for recurring jobs (similar to CRON) is planned +for a future release. + This port was originally made by [Chris Boulton](https://github.com/chrisboulton), with maintenance by the community. See for more on that history. @@ -218,6 +226,43 @@ echo Resque_Job_PID::get($token); Function returns `0` if the `perform` hasn't started yet, or if it has already ended. +## Delayed Jobs + +To quote the documentation for the Ruby resque-scheduler: + +> Delayed jobs are one-off jobs that you want to be put into a queue at some +point in the future. The classic example is sending an email: + + require 'Resque/Resque.php'; + require 'ResqueScheduler/ResqueScheduler.php'; + + $in = 3600; + $args = array('id' => $user->id); + ResqueScheduler::enqueueIn($in, 'email', 'SendFollowUpEmail', $args); + +The above will store the job for 1 hour in the delayed queue, and then pull the +job off and submit it to the `email` queue in Resque for processing as soon as +a worker is available. + +Instead of passing a relative time in seconds, you can also supply a timestamp +as either a DateTime object or integer containing a UNIX timestamp to the +`enqueueAt` method: + + require 'Resque/Resque.php'; + require 'ResqueScheduler/ResqueScheduler.php'; + + $time = 1332067214; + ResqueScheduler::enqueueAt($time, 'email', 'SendFollowUpEmail', $args); + + $datetime = new DateTime('2012-03-18 13:21:49'); + ResqueScheduler::enqueueAt($datetime, 'email', 'SendFollowUpEmail', $args); + +NOTE: resque-scheduler does not guarantee a job will fire at the time supplied. +At the time supplied, resque-scheduler will take the job out of the delayed +queue and push it to the appropriate queue in Resque. Your next available Resque +worker will pick the job up. To keep processing as quick as possible, keep your +queues as empty as possible. + ## Workers Workers work in the exact same way as the Ruby workers. For complete @@ -315,7 +360,7 @@ $ PREFIX=my-app-name bin/resque ### Setting Redis backend ### -When you have the Redis database on a different host than the one the workers +When you have the Redis database on a different host than the one the workers are running, you must set the `REDIS_BACKEND` environment variable: ```sh @@ -355,6 +400,29 @@ functionality to PHP before 5.5, so if you'd like process titles updated, install the PECL module as well. php-resque will automatically detect and use it. +### Resque Scheduler + +resque-scheduler requires a special worker that runs in the background. This +worker is responsible for pulling items off the schedule/delayed queue and adding +them to the queue for resque. This means that for delayed or scheduled jobs to be +executed, that worker needs to be running. + +A basic "up-and-running" `bin/resque-scheduler` file that sets up a +running worker environment is included (`vendor/bin/resque-scheduler` when +installed via composer). It accepts many of the same environment variables as +the main workers for php-resque: + +* `REDIS_BACKEND` - Redis server to connect to +* `LOGGING` - Enable logging to STDOUT +* `VERBOSE` - Enable verbose logging +* `VVERBOSE` - Enable very verbose logging +* `INTERVAL` - Sleep for this long before checking scheduled/delayed queues +* `APP_INCLUDE` - Include this file when starting (to launch your app) +* `PIDFILE` - Write the PID of the worker out to this file + +It's easy to start the resque-scheduler worker using `bin/resque-scheduler`: + $ php bin/resque-scheduler + ## Event/Hook System php-resque has a basic event system that can be used by your application to @@ -462,6 +530,17 @@ passed (in this order) include: - Queue - string containing the name of the queue the job was added to - ID - string containing the new token of the enqueued job +### afterSchedule + +Called after a job has been added to the schedule. Arguments passed are the +timestamp, queue of the job, the class name of the job, and the job's arguments. + +### beforeDelayedEnqueue + +Called immediately after a job has been pulled off the delayed queue and right +before the job is added to the queue in resque. Arguments passed are the queue +of the job, the class name of the job, and the job's arguments. + ## Step-By-Step For a more in-depth look at what php-resque does under the hood (without needing @@ -486,6 +565,7 @@ to directly examine the code), have a look at `HOWITWORKS.md`. - @andrewjshults - @atorres757 - @benjisg +- @biinari - @cballou - @chaitanyakuber - @charly22 @@ -511,6 +591,7 @@ to directly examine the code), have a look at `HOWITWORKS.md`. - @patrickbajao - @pedroarnal - @ptrofimov +- @rayward - @richardkmiller - @Rockstar04 - @ruudk diff --git a/bin/resque-scheduler b/bin/resque-scheduler new file mode 100644 index 0000000..e973ebb --- /dev/null +++ b/bin/resque-scheduler @@ -0,0 +1,82 @@ +#!/usr/bin/env php +logLevel = $logLevel; + +$PIDFILE = getenv('PIDFILE'); +if ($PIDFILE) { + file_put_contents($PIDFILE, getmypid()) or + die('Could not write PID information to ' . $PIDFILE); +} + +fwrite(STDOUT, "*** Starting scheduler worker\n"); +$worker->work($interval); diff --git a/composer.json b/composer.json index 58d5efa..133310e 100644 --- a/composer.json +++ b/composer.json @@ -28,29 +28,31 @@ } ], "require": { - "php": ">=5.3.0", - "ext-pcntl": "*", + "php": ">=5.6.0", "colinmollenhour/credis": "~1.7", "psr/log": "~1.0" }, "suggest": { + "ext-pcntl": "REQUIRED for forking processes on platforms that support it (so anything but Windows).", "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.", "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available." }, "require-dev": { - "phpunit/phpunit": "3.7.*" + "phpunit/phpunit": "^5.7" }, "bin": [ - "bin/resque" + "bin/resque", + "bin/resque-scheduler" ], "autoload": { "psr-0": { - "Resque": "lib" + "Resque": "lib", + "ResqueScheduler": "lib" } - }, - "extra": { - "branch-alias": { - "dev-master": "1.0-dev" - } + }, + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" } + } } diff --git a/extras/resque-scheduler.monit b/extras/resque-scheduler.monit new file mode 100644 index 0000000..ab98e3a --- /dev/null +++ b/extras/resque-scheduler.monit @@ -0,0 +1,13 @@ +# Replace these with your own: +# [PATH/TO/RESQUE] +# [PATH/TO/RESQUE-SCHEDULER] +# [UID] +# [GID] +# [APP_INCLUDE] + +check process resque-scheduler_worker + with pidfile /var/run/resque/scheduler-worker.pid + start program = "/bin/sh -c 'APP_INCLUDE=[APP_INCLUDE] RESQUE_PHP=[PATH/TO/RESQUE] PIDFILE=/var/run/resque/scheduler-worker.pid nohup php -f [PATH/TO/RESQUE-SCHEDULER]/resque-scheduler.php > /var/log/resque/scheduler-worker.log &'" as uid [UID] and gid [GID] + stop program = "/bin/sh -c 'kill -s QUIT `cat /var/run/resque/scheduler-worker.pid` && rm -f /var/run/resque/scheduler-worker.pid; exit 0;'" + if totalmem is greater than 300 MB for 10 cycles then restart # eating up memory? + group resque-scheduler_workers \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 31ad439..3578d99 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -131,18 +131,22 @@ public function updateStatus($status, $result = null) return; } - $statusInstance = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); + $statusInstance = new Resque_Job_Status($this->payload['id'], $this->getPrefix()); $statusInstance->update($status, $result); } /** * Return the status of the current job. * - * @return int The status of the job as one of the Resque_Job_Status constants. + * @return int|null The status of the job as one of the Resque_Job_Status constants or null if job is not being tracked. */ public function getStatus() { - $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); + if(empty($this->payload['id'])) { + return null; + } + + $status = new Resque_Job_Status($this->payload['id'], $this->getPrefix()); return $status->get(); } @@ -171,9 +175,9 @@ public function getInstance() return $this->instance; } - $this->instance = $this->getJobFactory()->create($this->payload['class'], $this->getArguments(), $this->queue); - $this->instance->job = $this; - return $this->instance; + $this->instance = $this->getJobFactory()->create($this->payload['class'], $this->getArguments(), $this->queue); + $this->instance->job = $this; + return $this->instance; } /** @@ -248,13 +252,15 @@ public function fail($exception) */ public function recreate() { - $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); $monitor = false; - if($status->isTracking()) { - $monitor = true; + if (!empty($this->payload['id'])) { + $status = new Resque_Job_Status($this->payload['id'], $this->getPrefix()); + if($status->isTracking()) { + $monitor = true; + } } - return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor, $this->payload['prefix']); + return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor, null, $this->getPrefix()); } /** @@ -288,14 +294,26 @@ public function setJobFactory(Resque_Job_FactoryInterface $jobFactory) return $this; } - /** - * @return Resque_Job_FactoryInterface - */ - public function getJobFactory() - { - if ($this->jobFactory === null) { - $this->jobFactory = new Resque_Job_Factory(); - } - return $this->jobFactory; - } + /** + * @return Resque_Job_FactoryInterface + */ + public function getJobFactory() + { + if ($this->jobFactory === null) { + $this->jobFactory = new Resque_Job_Factory(); + } + return $this->jobFactory; + } + + /** + * @return string + */ + private function getPrefix() + { + if (isset($this->payload['prefix'])) { + return $this->payload['prefix']; + } + + return ''; + } } diff --git a/lib/ResqueScheduler.php b/lib/ResqueScheduler.php new file mode 100644 index 0000000..986f89a --- /dev/null +++ b/lib/ResqueScheduler.php @@ -0,0 +1,271 @@ + +* @copyright (c) 2012 Chris Boulton +* @license http://www.opensource.org/licenses/mit-license.php +*/ +class ResqueScheduler +{ + const VERSION = "0.1"; + + /** + * Enqueue a job in a given number of seconds from now. + * + * Identical to Resque::enqueue, however the first argument is the number + * of seconds before the job should be executed. + * + * @param int $in Number of seconds from now when the job should be executed. + * @param string $queue The name of the queue to place the job in. + * @param string $class The name of the class that contains the code to execute the job. + * @param array $args Any optional arguments that should be passed when the job is executed. + */ + public static function enqueueIn($in, $queue, $class, array $args = array()) + { + self::enqueueAt(time() + $in, $queue, $class, $args); + } + + /** + * Enqueue a job for execution at a given timestamp. + * + * Identical to Resque::enqueue, however the first argument is a timestamp + * (either UNIX timestamp in integer format or an instance of the DateTime + * class in PHP). + * + * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp. + * @param string $queue The name of the queue to place the job in. + * @param string $class The name of the class that contains the code to execute the job. + * @param array $args Any optional arguments that should be passed when the job is executed. + */ + public static function enqueueAt($at, $queue, $class, $args = array()) + { + self::validateJob($class, $queue); + + $job = self::jobToHash($queue, $class, $args); + self::delayedPush($at, $job); + + Resque_Event::trigger('afterSchedule', array( + 'at' => $at, + 'queue' => $queue, + 'class' => $class, + 'args' => $args, + )); + } + + /** + * Directly append an item to the delayed queue schedule. + * + * @param DateTime|int $timestamp Timestamp job is scheduled to be run at. + * @param array $item Hash of item to be pushed to schedule. + */ + public static function delayedPush($timestamp, $item) + { + $timestamp = self::getTimestamp($timestamp); + $redis = Resque::redis(); + $redis->rpush('delayed:' . $timestamp, json_encode($item)); + + $redis->zadd('delayed_queue_schedule', $timestamp, $timestamp); + } + + /** + * Get the total number of jobs in the delayed schedule. + * + * @return int Number of scheduled jobs. + */ + public static function getDelayedQueueScheduleSize() + { + return (int)Resque::redis()->zcard('delayed_queue_schedule'); + } + + /** + * Get the number of jobs for a given timestamp in the delayed schedule. + * + * @param DateTime|int $timestamp Timestamp + * @return int Number of scheduled jobs. + */ + public static function getDelayedTimestampSize($timestamp) + { + $timestamp = self::toTimestamp($timestamp); + return Resque::redis()->llen('delayed:' . $timestamp, $timestamp); + } + + /** + * Remove a delayed job from the queue + * + * note: you must specify exactly the same + * queue, class and arguments that you used when you added + * to the delayed queue + * + * also, this is an expensive operation because all delayed keys have tobe + * searched + * + * @param $queue + * @param $class + * @param $args + * @return int number of jobs that were removed + */ + public static function removeDelayed($queue, $class, $args) + { + $destroyed=0; + $item=json_encode(self::jobToHash($queue, $class, $args)); + $redis=Resque::redis(); + + foreach($redis->keys('delayed:*') as $key) + { + $key=$redis->removePrefix($key); + $destroyed+=$redis->lrem($key,0,$item); + } + + return $destroyed; + } + + /** + * removed a delayed job queued for a specific timestamp + * + * note: you must specify exactly the same + * queue, class and arguments that you used when you added + * to the delayed queue + * + * @param $timestamp + * @param $queue + * @param $class + * @param $args + * @return mixed + */ + public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args) + { + $key = 'delayed:' . self::getTimestamp($timestamp); + $item = json_encode(self::jobToHash($queue, $class, $args)); + $redis = Resque::redis(); + $count = $redis->lrem($key, 0, $item); + self::cleanupTimestamp($key, $timestamp); + + return $count; + } + + /** + * Generate hash of all job properties to be saved in the scheduled queue. + * + * @param string $queue Name of the queue the job will be placed on. + * @param string $class Name of the job class. + * @param array $args Array of job arguments. + */ + + private static function jobToHash($queue, $class, $args) + { + return array( + 'class' => $class, + 'args' => array($args), + 'queue' => $queue, + ); + } + + /** + * If there are no jobs for a given key/timestamp, delete references to it. + * + * Used internally to remove empty delayed: items in Redis when there are + * no more jobs left to run at that timestamp. + * + * @param string $key Key to count number of items at. + * @param int $timestamp Matching timestamp for $key. + */ + private static function cleanupTimestamp($key, $timestamp) + { + $timestamp = self::getTimestamp($timestamp); + $redis = Resque::redis(); + + if ($redis->llen($key) == 0) { + $redis->del($key); + $redis->zrem('delayed_queue_schedule', $timestamp); + } + } + + /** + * Convert a timestamp in some format in to a unix timestamp as an integer. + * + * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. + * @return int Timestamp + * @throws ResqueScheduler_InvalidTimestampException + */ + private static function getTimestamp($timestamp) + { + if ($timestamp instanceof DateTime) { + $timestamp = $timestamp->getTimestamp(); + } + + if ((int)$timestamp != $timestamp) { + throw new ResqueScheduler_InvalidTimestampException( + 'The supplied timestamp value could not be converted to an integer.' + ); + } + + return (int)$timestamp; + } + + /** + * Find the first timestamp in the delayed schedule before/including the timestamp. + * + * Will find and return the first timestamp upto and including the given + * timestamp. This is the heart of the ResqueScheduler that will make sure + * that any jobs scheduled for the past when the worker wasn't running are + * also queued up. + * + * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. + * Defaults to now. + * @return int|false UNIX timestamp, or false if nothing to run. + */ + public static function nextDelayedTimestamp($at = null) + { + if ($at === null) { + $at = time(); + } + else { + $at = self::getTimestamp($at); + } + + $items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1))); + if (!empty($items)) { + return $items[0]; + } + + return false; + } + + /** + * Pop a job off the delayed queue for a given timestamp. + * + * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. + * @return array Matching job at timestamp. + */ + public static function nextItemForTimestamp($timestamp) + { + $timestamp = self::getTimestamp($timestamp); + $key = 'delayed:' . $timestamp; + + $item = json_decode(Resque::redis()->lpop($key), true); + + self::cleanupTimestamp($key, $timestamp); + return $item; + } + + /** + * Ensure that supplied job class/queue is valid. + * + * @param string $class Name of job class. + * @param string $queue Name of queue. + * @throws Resque_Exception + */ + private static function validateJob($class, $queue) + { + if (empty($class)) { + throw new Resque_Exception('Jobs must be given a class.'); + } + else if (empty($queue)) { + throw new Resque_Exception('Jobs must be put in a queue.'); + } + + return true; + } +} diff --git a/lib/ResqueScheduler/InvalidTimestampException.php b/lib/ResqueScheduler/InvalidTimestampException.php new file mode 100644 index 0000000..afe2475 --- /dev/null +++ b/lib/ResqueScheduler/InvalidTimestampException.php @@ -0,0 +1,13 @@ + +* @copyright (c) 2012 Chris Boulton +* @license http://www.opensource.org/licenses/mit-license.php +*/ +class ResqueScheduler_InvalidTimestampException extends Resque_Exception +{ + +} \ No newline at end of file diff --git a/lib/ResqueScheduler/Worker.php b/lib/ResqueScheduler/Worker.php new file mode 100644 index 0000000..9f98ce9 --- /dev/null +++ b/lib/ResqueScheduler/Worker.php @@ -0,0 +1,191 @@ + + * @copyright (c) 2012 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class ResqueScheduler_Worker +{ + const LOG_NONE = 0; + const LOG_NORMAL = 1; + const LOG_VERBOSE = 2; + + /** + * @var int Current log level of this worker. + */ + public $logLevel = 0; + + /** + * @var int Interval to sleep for between checking schedules. + */ + protected $interval = 5; + + /** + * @var boolean True if on the next iteration, the worker should shutdown. + */ + private $shutdown = false; + + /** + * @var boolean True if this worker is paused. + */ + private $paused = false; + + /** + * The primary loop for a worker. + * + * Every $interval (seconds), the scheduled queue will be checked for jobs + * that should be pushed to Resque. + * + * @param int $interval How often to check schedules. + */ + public function work($interval = null) + { + if ($interval !== null) { + $this->interval = $interval; + } + + $this->updateProcLine('Starting'); + $this->registerSigHandlers(); + + while (true) { + if($this->shutdown) { + break; + } + if(!$this->paused) { + $this->handleDelayedItems(); + } + $this->sleep(); + } + } + + /** + * Handle delayed items for the next scheduled timestamp. + * + * Searches for any items that are due to be scheduled in Resque + * and adds them to the appropriate job queue in Resque. + * + * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule. + */ + public function handleDelayedItems($timestamp = null) + { + while (($oldestJobTimestamp = ResqueScheduler::nextDelayedTimestamp($timestamp)) !== false) { + $this->updateProcLine('Processing Delayed Items'); + $this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp); + } + } + + /** + * Schedule all of the delayed jobs for a given timestamp. + * + * Searches for all items for a given timestamp, pulls them off the list of + * delayed jobs and pushes them across to Resque. + * + * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule. + */ + public function enqueueDelayedItemsForTimestamp($timestamp) + { + $item = null; + while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) { + $this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] .' [delayed]'); + + Resque_Event::trigger('beforeDelayedEnqueue', array( + 'queue' => $item['queue'], + 'class' => $item['class'], + 'args' => $item['args'], + )); + + $payload = array_merge(array($item['queue'], $item['class']), $item['args']); + call_user_func_array('Resque::enqueue', $payload); + } + } + + /** + * Sleep for the defined interval. + */ + protected function sleep() + { + sleep($this->interval); + } + + /** + * Update the status of the current worker process. + * + * On supported systems (with the PECL proctitle module installed), update + * the name of the currently running process to indicate the current state + * of a worker. + * + * @param string $status The updated process title. + */ + private function updateProcLine($status) + { + if(function_exists('setproctitle')) { + setproctitle('resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status); + } + } + + /** + * Output a given log message to STDOUT. + * + * @param string $message Message to output. + */ + public function log($message) + { + if($this->logLevel == self::LOG_NORMAL) { + fwrite(STDOUT, "*** " . $message . "\n"); + } + else if($this->logLevel == self::LOG_VERBOSE) { + fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n"); + } + } + + /** + * Register signal handlers that a worker should respond to. + * + * TERM: Shutdown after the current timestamp was processed. + * INT: Shutdown after the current timestamp was processed. + * QUIT: Shutdown after the current timestamp was processed. + */ + private function registerSigHandlers() + { + if(!function_exists('pcntl_signal')) { + return; + } + + pcntl_signal(SIGTERM, array($this, 'shutdown')); + pcntl_signal(SIGINT, array($this, 'shutdown')); + pcntl_signal(SIGQUIT, array($this, 'shutdown')); + pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); + pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); + + $this->log('Registered signals'); + } + + public function shutdown() + { + $this->log('Shutting down'); + $this->shutdown = true; + } + + /** + * Signal handler callback for USR2, pauses processing. + */ + public function pauseProcessing() + { + $this->log('USR2 received; pausing processing'); + $this->paused = true; + } + + /** + * Signal handler callback for CONT, resume processing. + */ + public function unPauseProcessing() + { + $this->log('CONT received; resuming processing'); + $this->paused = false; + } +} diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index fb55d13..7ff6d8d 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -416,6 +416,30 @@ public function testDoNotUseFactoryToGetInstance() $instance = $job->getInstance(); $this->assertInstanceOf('Resque_JobInterface', $instance); } + + public function testJobStatusIsNullIfIdMissingFromPayload() + { + $payload = array( + 'class' => 'Some_Job_Class', + 'args' => null + ); + $job = new Resque_Job('jobs', $payload); + $this->assertEquals(null, $job->getStatus()); + } + + public function testJobCanBeRecreatedFromLegacyPayload() + { + $payload = array( + 'class' => 'Some_Job_Class', + 'args' => null + ); + $job = new Resque_Job('jobs', $payload); + $job->recreate(); + $newJob = Resque_Job::reserve('jobs'); + $this->assertEquals('jobs', $newJob->queue); + $this->assertEquals('Some_Job_Class', $newJob->payload['class']); + $this->assertNotNull($newJob->payload['id']); + } } class Some_Job_Class implements Resque_JobInterface