diff --git a/src/drivers/db/InfoAction.php b/src/drivers/db/InfoAction.php index 63bc61247b..2c3fa0f527 100644 --- a/src/drivers/db/InfoAction.php +++ b/src/drivers/db/InfoAction.php @@ -53,7 +53,7 @@ protected function getWaiting() ->from($this->queue->tableName) ->andWhere(['channel' => $this->queue->channel]) ->andWhere(['reserved_at' => null]) - ->andWhere(['delay' => 0]); + ->andWhere(['<=', 'execute_at', time()]); } /** @@ -65,7 +65,7 @@ protected function getDelayed() ->from($this->queue->tableName) ->andWhere(['channel' => $this->queue->channel]) ->andWhere(['reserved_at' => null]) - ->andWhere(['>', 'delay', 0]); + ->andWhere(['>', 'execute_at', time()]); } /** diff --git a/src/drivers/db/Queue.php b/src/drivers/db/Queue.php index f4b3979bf7..aec008fd9a 100644 --- a/src/drivers/db/Queue.php +++ b/src/drivers/db/Queue.php @@ -158,7 +158,7 @@ protected function pushMessage($message, $ttr, $delay, $priority) 'job' => $message, 'pushed_at' => time(), 'ttr' => $ttr, - 'delay' => $delay, + 'execute_at' => time() + $delay, 'priority' => $priority ?: 1024, ])->execute(); $tableSchema = $this->db->getTableSchema($this->tableName); @@ -185,7 +185,7 @@ protected function reserve() $payload = (new Query()) ->from($this->tableName) ->andWhere(['channel' => $this->channel, 'reserved_at' => null]) - ->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()]) + ->andWhere('[[execute_at]] <= :time', [':time' => time()]) ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC]) ->limit(1) ->one($this->db); diff --git a/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php b/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php new file mode 100644 index 0000000000..1d3c431b3d --- /dev/null +++ b/src/drivers/db/migrations/M181024151600DelayToExecuteAt.php @@ -0,0 +1,80 @@ + + */ +class M181024151600DelayToExecuteAt extends Migration +{ + public $tableName = '{{%queue}}'; + + public function up() + { + $this->dropIndex('channel', $this->tableName, 'channel'); + if ($this->db->driverName !== 'sqlite') { + $this->addColumn($this->tableName, 'execute_at', $this->integer()->notNull()->after('ttr')); + $this->update($this->tableName, [ + 'execute_at' => new Expression('[[pushed_at]] + [[delay]]') + ]); + $this->dropColumn($this->tableName, 'delay'); + } else { + $this->dropTable($this->tableName); + $this->createTable($this->tableName, [ + 'id' => $this->primaryKey(), + 'channel' => $this->string()->notNull(), + 'job' => $this->binary()->notNull(), + 'pushed_at' => $this->integer()->notNull(), + 'ttr' => $this->integer()->notNull(), + 'execute_at' => $this->integer()->notNull(), + 'priority' => $this->integer()->unsigned()->notNull()->defaultValue(1024), + 'reserved_at' => $this->integer(), + 'attempt' => $this->integer(), + 'done_at' => $this->integer(), + ]); + $this->createIndex('reserved_at', $this->tableName, 'reserved_at'); + $this->createIndex('priority', $this->tableName, 'priority'); + } + $this->createIndex('idx_queue__channel__execute_at', $this->tableName, ['channel', 'execute_at']); + } + + public function down() + { + if ($this->db->driverName !== 'sqlite') { + $this->dropIndex('idx_queue__channel__execute_at', $this->tableName); + $this->addColumn($this->tableName, 'delay', $this->integer()->notNull()->after('ttr')); + $this->update($this->tableName, [ + 'delay' => new Expression('[[execute_at]] - [[pushed_at]]') + ]); + $this->dropColumn($this->tableName, 'execute_at'); + } else { + $this->dropTable($this->tableName); + $this->createTable($this->tableName, [ + 'id' => $this->primaryKey(), + 'channel' => $this->string()->notNull(), + 'job' => $this->binary()->notNull(), + 'pushed_at' => $this->integer()->notNull(), + 'ttr' => $this->integer()->notNull(), + 'delay' => $this->integer()->notNull(), + 'priority' => $this->integer()->unsigned()->notNull()->defaultValue(1024), + 'reserved_at' => $this->integer(), + 'attempt' => $this->integer(), + 'done_at' => $this->integer(), + ]); + $this->createIndex('reserved_at', $this->tableName, 'reserved_at'); + $this->createIndex('priority', $this->tableName, 'priority'); + } + $this->createIndex('channel', $this->tableName, 'channel'); + } +}