Skip to content

Commit

Permalink
[1.x] Queue Overhaul (#456)
Browse files Browse the repository at this point in the history
* Slight refactor for readability

* Refactor to queue model

* Use the query builder internally

* WIP

* Add phpstan typings

* Continued work

* Starting out on count()

* Adding query builder count() method

* Todo for tomorrow

* Set the meta when creating the job

* Improving the logic

* Wrapping up until tmw

* Continued work

* Use the date query builder to get the proper queue items

* Adjust cron logic to properly scale up to meet demand of the queue

* Adding tests for cron queue

* Linting fixes

* Wrapping up fixes for queue

* Raise memory limit, skip queue tests on 8.0

* Prevent queue service provider on 8.1

* Remove bogus version

* Fixing tests

* Remove bogus return

* Add a log of queue job events

* Filling in the queue log

* Adding a database collection and passing along the found_rows to the collection

* Allow the value to be passed

* Queue UI (#458)

* Continued work on queue UI=

* Wrapping up the core of the queue UI

* Working table for queue UI

* Adding counts to the queue

* Switch to use wp_count_posts for performance

* Style the row table

* Adding single view for queue job

* Adding queue UI with fixes for retrying

* Preserve queue job items and cleanup after some time (#472)

* Continued work on queue UI=

* Wrapping up the core of the queue UI

* Working table for queue UI

* Adding counts to the queue

* Switch to use wp_count_posts for performance

* Style the row table

* Adding single view for queue job

* Adding queue UI with fixes for retrying

* Serialize enums when asserting against them

* Keep queue job around after it is complete for logging

* Fixing tests and adjusting comment

* Fixing linting

* Stubbing the cleanup command

* Cleanup the processed/failed jobs after some time

* Fixing phpcs issue

* Remove checks for <8.1 now that were on PHP 8.1+ always

* [1.x] Allow queue jobs to be run in-admin, allow delays for closure jobs (#481)

* Add support for displaying completed queue job status

* Allow a closure job to be delayed

* Allow queue jobs to be run in-admin, allow closure jobs to be delayed

* Add a back link on the queue job page

* Cleanup running jobs with the cleanup command

* Remove the superfluous take()
  • Loading branch information
srtfisher authored Dec 1, 2023
1 parent d1e1091 commit 2e60851
Show file tree
Hide file tree
Showing 41 changed files with 2,194 additions and 495 deletions.
39 changes: 36 additions & 3 deletions config/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
| Define the queue provider used in the application.
|
*/
'default' => environment( 'QUEUE_CONNECTION', 'wordpress' ),
'default' => environment( 'QUEUE_CONNECTION', 'wordpress' ),

/*
|--------------------------------------------------------------------------
Expand All @@ -25,7 +25,40 @@
| The amount of items handled in one run of the queue.
|
*/
'batch_size' => environment( 'QUEUE_BATCH_SIZE', 100 ),
'batch_size' => environment( 'QUEUE_BATCH_SIZE', 5 ),

/*
|--------------------------------------------------------------------------
| Maximum number of concurrent batches
|--------------------------------------------------------------------------
|
| The maximum number of batches that can be run concurrently. For example,
| if 1000 queue jobs are dispatched and this is set to 5 with a batch size
| of 100, then 5 batches of 100 will be run concurrently and take two runs
| of the queue to complete.
|
*/
'max_concurrent_batches' => environment( 'QUEUE_MAX_CONCURRENT_BATCHES', 1 ),

/*
|--------------------------------------------------------------------------
| Delete failed or processed queue items after a set time
|--------------------------------------------------------------------------
|
| Delete failed or processed queue items after a set time in seconds.
|
*/
'delete_after' => environment( 'QUEUE_DELETE_AFTER', 60 * 60 * 24 * 7 ),

/*
|--------------------------------------------------------------------------
| Enable the Queue Admin Interface
|--------------------------------------------------------------------------
|
| Enable the queue admin interface to display queue jobs.
|
*/
'enable_admin' => environment( 'QUEUE_ENABLE_ADMIN', true ),

/*
|--------------------------------------------------------------------------
Expand All @@ -35,7 +68,7 @@
| Control the configuration for the queue providers.
|
*/
'wordpress' => [
'wordpress' => [
// Delay between queue runs in seconds.
'delay' => environment( 'QUEUE_DELAY', 0 ),
],
Expand Down
2 changes: 1 addition & 1 deletion src/mantle/application/class-app-service-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function __construct( Application $app ) {
*/
protected function boot_scheduler() {
$this->app->singleton(
Schedule::class,
'scheduler',
fn ( $app ) => tap(
new Schedule( $app ),
fn ( Schedule $schedule ) => $this->schedule( $schedule ),
Expand Down
31 changes: 17 additions & 14 deletions src/mantle/application/class-application.php
Original file line number Diff line number Diff line change
Expand Up @@ -401,20 +401,23 @@ protected function register_base_bindings() {
*/
protected function register_core_aliases() {
$core_aliases = [
'app' => [ static::class, \Mantle\Contracts\Application::class ],
'config' => [ \Mantle\Config\Repository::class, \Mantle\Contracts\Config\Repository::class ],
'events' => [ \Mantle\Events\Dispatcher::class, \Mantle\Contracts\Events\Dispatcher::class ],
'files' => [ \Mantle\Filesystem\Filesystem::class ],
'filesystem' => [ \Mantle\Filesystem\Filesystem_Manager::class, \Mantle\Contracts\Filesystem\Filesystem_Manager::class ],
'log' => [ \Mantle\Log\Log_Manager::class, \Psr\Log\LoggerInterface::class ],
'queue' => [ \Mantle\Queue\Queue_Manager::class, \Mantle\Contracts\Queue\Queue_Manager::class ],
'redirect' => [ \Mantle\Http\Routing\Redirector::class ],
'request' => [ \Mantle\Http\Request::class, \Symfony\Component\HttpFoundation\Request::class ],
'router' => [ \Mantle\Http\Routing\Router::class, \Mantle\Contracts\Http\Routing\Router::class ],
'router.entity' => [ \Mantle\Http\Routing\Entity_Router::class, \Mantle\Contracts\Http\Routing\Entity_Router::class ],
'url' => [ \Mantle\Http\Routing\Url_Generator::class, \Mantle\Contracts\Http\Routing\Url_Generator::class ],
'view.loader' => [ \Mantle\Http\View\View_Finder::class, \Mantle\Contracts\Http\View\View_Finder::class ],
'view' => [ \Mantle\Http\View\Factory::class, \Mantle\Contracts\Http\View\Factory::class ],
'app' => [ static::class, \Mantle\Contracts\Application::class ],
'config' => [ \Mantle\Config\Repository::class, \Mantle\Contracts\Config\Repository::class ],
'events' => [ \Mantle\Events\Dispatcher::class, \Mantle\Contracts\Events\Dispatcher::class ],
'files' => [ \Mantle\Filesystem\Filesystem::class ],
'filesystem' => [ \Mantle\Filesystem\Filesystem_Manager::class, \Mantle\Contracts\Filesystem\Filesystem_Manager::class ],
'log' => [ \Mantle\Log\Log_Manager::class, \Psr\Log\LoggerInterface::class ],
'queue' => [ \Mantle\Queue\Queue_Manager::class, \Mantle\Contracts\Queue\Queue_Manager::class ],
'queue.worker' => [ \Mantle\Queue\Worker::class ],
'queue.dispatcher' => [ \Mantle\Queue\Dispatcher::class, \Mantle\Contracts\Queue\Dispatcher::class ],
'redirect' => [ \Mantle\Http\Routing\Redirector::class ],
'request' => [ \Mantle\Http\Request::class, \Symfony\Component\HttpFoundation\Request::class ],
'router' => [ \Mantle\Http\Routing\Router::class, \Mantle\Contracts\Http\Routing\Router::class ],
'router.entity' => [ \Mantle\Http\Routing\Entity_Router::class, \Mantle\Contracts\Http\Routing\Entity_Router::class ],
'scheduler' => [ \Mantle\Scheduling\Schedule::class ],
'url' => [ \Mantle\Http\Routing\Url_Generator::class, \Mantle\Contracts\Http\Routing\Url_Generator::class ],
'view.loader' => [ \Mantle\Http\View\View_Finder::class, \Mantle\Contracts\Http\View\View_Finder::class ],
'view' => [ \Mantle\Http\View\Factory::class, \Mantle\Contracts\Http\View\Factory::class ],
];

foreach ( $core_aliases as $key => $aliases ) {
Expand Down
12 changes: 10 additions & 2 deletions src/mantle/contracts/queue/interface-provider.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ interface Provider {
* @param mixed $job Job instance.
* @return bool
*/
public function push( $job );
public function push( $job ): bool;

/**
* Get the next set of jobs in the queue.
Expand All @@ -37,5 +37,13 @@ public function pop( string $queue = null, int $count = 1 ): Collection;
* @param string $queue Queue to compare against.
* @return bool
*/
public function in_queue( $job, string $queue = null ): bool;
public function in_queue( mixed $job, string $queue = null ): bool;

/**
* Retrieve the number of pending jobs in the queue.
*
* @param string $queue Queue name, optional.
* @return int
*/
public function pending_count( string $queue = null ): int;
}
2 changes: 2 additions & 0 deletions src/mantle/database/model/class-model.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,14 @@ public function refresh() {
}

$instance = static::find( $this->get( 'id' ) );

if ( ! $instance ) {
return null;
}

$this->exists = true;
$this->set_raw_attributes( $instance->get_raw_attributes() );

return $this;
}

Expand Down
10 changes: 5 additions & 5 deletions src/mantle/database/query/class-builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -859,8 +859,8 @@ public function chunk_by_id( int $count, callable $callback, string $attribute =
/**
* Execute a callback over each item while chunking.
*
* @param callable(\Mantle\Support\Collection<int, TModel>): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param callable(TModel): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @return boolean
*/
public function each( callable $callback, int $count = 100 ) {
Expand All @@ -878,9 +878,9 @@ public function each( callable $callback, int $count = 100 ) {
/**
* Execute a callback over each item while chunking by ID.
*
* @param callable(\Mantle\Support\Collection<int, TModel>): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param string $attribute Attribute to chunk by.
* @param callable(TModel): mixed $callback Callback to run on each chunk.
* @param int $count Number of items to chunk by.
* @param string $attribute Attribute to chunk by.
* @return boolean
*/
public function each_by_id( callable $callback, int $count = 100, string $attribute = 'id' ) {
Expand Down
3 changes: 2 additions & 1 deletion src/mantle/database/query/class-post-query-builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereId( int $id )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereName( string $name )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereSlug( string $slug )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereStatus( string $status )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereStatus( string[]|string $status )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereTitle( string $title )
* @method \Mantle\Database\Query\Post_Query_Builder<TModel> whereType( string $type )
*/
Expand All @@ -49,6 +49,7 @@ class Post_Query_Builder extends Builder {
'post_author' => 'author',
'post_name' => 'name',
'slug' => 'name',
'status' => 'post_status',
];

/**
Expand Down
9 changes: 7 additions & 2 deletions src/mantle/queue/autoload.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
/**
* Dispatch a job to the queue.
*
* @template TJob of \Mantle\Contracts\Queue\Job|\Closure
*
* @param \Mantle\Contracts\Queue\Job|\Closure $job Job instance.
* @return Pending_Dispatch|Pending_Closure_Dispatch
* @return Pending_Closure_Dispatch|Pending_Dispatch
*
* @phpstan-param TJob|\Mantle\Contracts\Queue\Job|\Closure|\Closure $job Job instance.
* @phpstan-return (TJob is \Closure ? Pending_Closure_Dispatch : Pending_Dispatch)<TJob>
*/
function dispatch( $job ): Pending_Dispatch {
function dispatch( $job ): Pending_Dispatch|Pending_Closure_Dispatch {
return $job instanceof \Closure
? new Pending_Closure_Dispatch( Closure_Job::create( $job ) )
: new Pending_Dispatch( $job );
Expand Down
30 changes: 22 additions & 8 deletions src/mantle/queue/class-closure-job.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,39 @@
/**
* Closure_Job class file
*
* phpcs:disable Squiz.Commenting.VariableComment.Missing
*
* @package Mantle
*/

namespace Mantle\Queue;

use Closure;
use DateTimeInterface;
use Laravel\SerializableClosure\SerializableClosure;
use Mantle\Contracts\Queue\Can_Queue;
use ReflectionFunction;
use Throwable;

/**
* Abstract Queue Job
* Closure Job
*
* To be extended by provider-specific queue job classes.
* Storage of the closure-based queue job.
*/
class Closure_Job implements Can_Queue {
/**
* Serializable closure instance.
* The delay before the job will be run.
*
* @var SerializableClosure
* @var int|DateTimeInterface
*/
public SerializableClosure $closure;
public int|DateTimeInterface $delay;

/**
* The callbacks that should be run on failure.
*
* @var array
*/
public $failure_callbacks = [];
public array $failure_callbacks = [];

/**
* Create a new job instance.
Expand All @@ -48,8 +51,7 @@ public static function create( Closure $closure ): Closure_Job {
*
* @param SerializableClosure $closure Serialized closure to wrap.
*/
public function __construct( SerializableClosure $closure ) {
$this->closure = $closure;
public function __construct( public SerializableClosure $closure ) {
}

/**
Expand All @@ -61,6 +63,18 @@ public function handle() {
$callback();
}

/**
* Set the delay before the job will be run.
*
* @param DateTimeInterface|int $delay Delay in seconds or DateTime instance.
* @return static
*/
public function delay( DateTimeInterface|int $delay ) {
$this->delay = $delay;

return $this;
}

/**
* Add a callback to be executed if the job fails.
*
Expand Down
40 changes: 22 additions & 18 deletions src/mantle/queue/class-dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,54 +11,58 @@
use Mantle\Contracts\Container;
use Mantle\Contracts\Queue\Can_Queue;
use Mantle\Contracts\Queue\Queue_Manager;
use Mantle\Queue\Events\Job_Queued;

/**
* Queue Dispatcher
*
* Executes jobs from the queue.
*/
class Dispatcher {
/**
* Container instance.
*
* @var Container
*/
protected $container;

/**
* Constructor.
*
* @param Container $container Container instance.
*/
public function __construct( Container $container ) {
$this->container = $container;
}
public function __construct( protected Container $container ) {}

/**
* Dispatch the job to the queue.
*
* @param mixed $job Job instance.
* @return mixed
* @return void
*/
public function dispatch( $job ) {
public function dispatch( mixed $job ): void {
if ( ! $this->should_command_be_queued( $job ) ) {
return $this->dispatch_now( $job );
$this->dispatch_now( $job );

return;
}

$manager = $this->container->make( Queue_Manager::class );
/**
* Provider instance.
*
* @var \Mantle\Contracts\Queue\Provider
*/
$provider = $this->container->make( Queue_Manager::class )->get_provider();

// Send the job to the queue.
$manager->get_provider()->push( $job );
$provider->push( $job );

// Dispatch the job queued event.
$this->container['events']->dispatch(
new Job_Queued( $provider, $job ),
);
}

/**
* Dispatch a job in the current process.
*
* @param mixed $job Job instance.
* @return mixed
* @return void
*/
public function dispatch_now( $job ) {
return $this->container->call( [ $job, 'handle' ] );
public function dispatch_now( mixed $job ): void {
$this->container->call( [ $job, 'handle' ] );
}

/**
Expand Down
Loading

0 comments on commit 2e60851

Please sign in to comment.