From b44f477444d13c6b140c2f7089c884bb86f590ba Mon Sep 17 00:00:00 2001 From: Mikk Tendermann Date: Wed, 24 Oct 2018 15:48:40 +0300 Subject: [PATCH 1/5] try to make it faster --- src/drivers/db/Queue.php | 4 +- .../M181024151600LaterToExecuteAt.php | 42 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 src/drivers/db/migrations/M181024151600LaterToExecuteAt.php diff --git a/src/drivers/db/Queue.php b/src/drivers/db/Queue.php index f4b3979bf7..aec008fd9a 100644 --- a/src/drivers/db/Queue.php +++ b/src/drivers/db/Queue.php @@ -158,7 +158,7 @@ protected function pushMessage($message, $ttr, $delay, $priority) 'job' => $message, 'pushed_at' => time(), 'ttr' => $ttr, - 'delay' => $delay, + 'execute_at' => time() + $delay, 'priority' => $priority ?: 1024, ])->execute(); $tableSchema = $this->db->getTableSchema($this->tableName); @@ -185,7 +185,7 @@ protected function reserve() $payload = (new Query()) ->from($this->tableName) ->andWhere(['channel' => $this->channel, 'reserved_at' => null]) - ->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()]) + ->andWhere('[[execute_at]] <= :time', [':time' => time()]) ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC]) ->limit(1) ->one($this->db); diff --git a/src/drivers/db/migrations/M181024151600LaterToExecuteAt.php b/src/drivers/db/migrations/M181024151600LaterToExecuteAt.php new file mode 100644 index 0000000000..c6616ecae1 --- /dev/null +++ b/src/drivers/db/migrations/M181024151600LaterToExecuteAt.php @@ -0,0 +1,42 @@ + + */ +class M181024151600DelayToExecuteAt extends Migration +{ + public $tableName = '{{%queue}}'; + + public function up() + { + $this->addColumn($this->tableName, 'execute_at', $this->integer()->notNull()->after('ttr')); + $this->update($this->tableName, [ + 'execute_at' => new Expression('[[pushed_at]] + [[delay]]') + ]); + $this->dropColumn($this->tableName, 'delay'); + $this->createIndex('idx_queue__channel__execute_at', $this->tableName, ['channel', 'execute_at']); + } + + public function down() + { + $this->dropIndex('idx_queue__channel__execute_at', $this->tableName); + $this->addColumn($this->tableName, 'delay', $this->integer()->notNull()->after('ttr')); + $this->update($this->tableName, [ + 'delay' => new Expression('[[execute_at]] - [[pushed_at]]') + ]); + $this->dropColumn($this->tableName, 'execute_at'); + } +} From e3d50179373c927861effaa75e6f6744cb2fc30f Mon Sep 17 00:00:00 2001 From: Mikk Tendermann Date: Wed, 24 Oct 2018 15:54:53 +0300 Subject: [PATCH 2/5] fix migrations --- ...1600LaterToExecuteAt.php => M181024151600DelayToExecuteAt.php} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/drivers/db/migrations/{M181024151600LaterToExecuteAt.php => M181024151600DelayToExecuteAt.php} (100%) diff --git a/src/drivers/db/migrations/M181024151600LaterToExecuteAt.php b/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php similarity index 100% rename from src/drivers/db/migrations/M181024151600LaterToExecuteAt.php rename to src/drivers/db/migrations/M181024151600DelayToExecuteAt.php From 79175dd3f2eb9c0f34b21e3ac2239a25b80dfded Mon Sep 17 00:00:00 2001 From: Mikk Tendermann Date: Wed, 24 Oct 2018 15:58:46 +0300 Subject: [PATCH 3/5] cannot have null values --- src/drivers/db/migrations/M181024151600DelayToExecuteAt.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php b/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php index c6616ecae1..4c1d382ee7 100644 --- a/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php +++ b/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php @@ -22,10 +22,11 @@ class M181024151600DelayToExecuteAt extends Migration public function up() { - $this->addColumn($this->tableName, 'execute_at', $this->integer()->notNull()->after('ttr')); + $this->addColumn($this->tableName, 'execute_at', $this->integer()->after('ttr')); $this->update($this->tableName, [ 'execute_at' => new Expression('[[pushed_at]] + [[delay]]') ]); + $this->alterColumn($this->tableName, 'execute_at', $this->integer()->notNull()->after('ttr')); $this->dropColumn($this->tableName, 'delay'); $this->createIndex('idx_queue__channel__execute_at', $this->tableName, ['channel', 'execute_at']); } @@ -33,10 +34,11 @@ public function up() public function down() { $this->dropIndex('idx_queue__channel__execute_at', $this->tableName); - $this->addColumn($this->tableName, 'delay', $this->integer()->notNull()->after('ttr')); + $this->addColumn($this->tableName, 'delay', $this->integer()->after('ttr')); $this->update($this->tableName, [ 'delay' => new Expression('[[execute_at]] - [[pushed_at]]') ]); + $this->alterColumn($this->tableName, 'delay', $this->integer()->notNull()->after('ttr')); $this->dropColumn($this->tableName, 'execute_at'); } } From 2b2cbda22874e2a3cb5959c08a9ebd3e11b6c601 Mon Sep 17 00:00:00 2001 From: Mikk Tendermann Date: Wed, 24 Oct 2018 16:08:47 +0300 Subject: [PATCH 4/5] this should not matter --- .../M181024151600DelayToExecuteAt.php | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php b/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php index 4c1d382ee7..1d3c431b3d 100644 --- a/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php +++ b/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php @@ -22,23 +22,59 @@ class M181024151600DelayToExecuteAt extends Migration public function up() { - $this->addColumn($this->tableName, 'execute_at', $this->integer()->after('ttr')); - $this->update($this->tableName, [ - 'execute_at' => new Expression('[[pushed_at]] + [[delay]]') - ]); - $this->alterColumn($this->tableName, 'execute_at', $this->integer()->notNull()->after('ttr')); - $this->dropColumn($this->tableName, 'delay'); + $this->dropIndex('channel', $this->tableName, 'channel'); + if ($this->db->driverName !== 'sqlite') { + $this->addColumn($this->tableName, 'execute_at', $this->integer()->notNull()->after('ttr')); + $this->update($this->tableName, [ + 'execute_at' => new Expression('[[pushed_at]] + [[delay]]') + ]); + $this->dropColumn($this->tableName, 'delay'); + } else { + $this->dropTable($this->tableName); + $this->createTable($this->tableName, [ + 'id' => $this->primaryKey(), + 'channel' => $this->string()->notNull(), + 'job' => $this->binary()->notNull(), + 'pushed_at' => $this->integer()->notNull(), + 'ttr' => $this->integer()->notNull(), + 'execute_at' => $this->integer()->notNull(), + 'priority' => $this->integer()->unsigned()->notNull()->defaultValue(1024), + 'reserved_at' => $this->integer(), + 'attempt' => $this->integer(), + 'done_at' => $this->integer(), + ]); + $this->createIndex('reserved_at', $this->tableName, 'reserved_at'); + $this->createIndex('priority', $this->tableName, 'priority'); + } $this->createIndex('idx_queue__channel__execute_at', $this->tableName, ['channel', 'execute_at']); } public function down() { - $this->dropIndex('idx_queue__channel__execute_at', $this->tableName); - $this->addColumn($this->tableName, 'delay', $this->integer()->after('ttr')); - $this->update($this->tableName, [ - 'delay' => new Expression('[[execute_at]] - [[pushed_at]]') - ]); - $this->alterColumn($this->tableName, 'delay', $this->integer()->notNull()->after('ttr')); - $this->dropColumn($this->tableName, 'execute_at'); + if ($this->db->driverName !== 'sqlite') { + $this->dropIndex('idx_queue__channel__execute_at', $this->tableName); + $this->addColumn($this->tableName, 'delay', $this->integer()->notNull()->after('ttr')); + $this->update($this->tableName, [ + 'delay' => new Expression('[[execute_at]] - [[pushed_at]]') + ]); + $this->dropColumn($this->tableName, 'execute_at'); + } else { + $this->dropTable($this->tableName); + $this->createTable($this->tableName, [ + 'id' => $this->primaryKey(), + 'channel' => $this->string()->notNull(), + 'job' => $this->binary()->notNull(), + 'pushed_at' => $this->integer()->notNull(), + 'ttr' => $this->integer()->notNull(), + 'delay' => $this->integer()->notNull(), + 'priority' => $this->integer()->unsigned()->notNull()->defaultValue(1024), + 'reserved_at' => $this->integer(), + 'attempt' => $this->integer(), + 'done_at' => $this->integer(), + ]); + $this->createIndex('reserved_at', $this->tableName, 'reserved_at'); + $this->createIndex('priority', $this->tableName, 'priority'); + } + $this->createIndex('channel', $this->tableName, 'channel'); } } From a8cf4e53db610262767571052ccf74ab3a4d02fe Mon Sep 17 00:00:00 2001 From: Mikk Tendermann Date: Wed, 24 Oct 2018 18:32:06 +0300 Subject: [PATCH 5/5] fix InfoAction error --- src/drivers/db/InfoAction.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/drivers/db/InfoAction.php b/src/drivers/db/InfoAction.php index 63bc61247b..2c3fa0f527 100644 --- a/src/drivers/db/InfoAction.php +++ b/src/drivers/db/InfoAction.php @@ -53,7 +53,7 @@ protected function getWaiting() ->from($this->queue->tableName) ->andWhere(['channel' => $this->queue->channel]) ->andWhere(['reserved_at' => null]) - ->andWhere(['delay' => 0]); + ->andWhere(['<=', 'execute_at', time()]); } /** @@ -65,7 +65,7 @@ protected function getDelayed() ->from($this->queue->tableName) ->andWhere(['channel' => $this->queue->channel]) ->andWhere(['reserved_at' => null]) - ->andWhere(['>', 'delay', 0]); + ->andWhere(['>', 'execute_at', time()]); } /**