diff --git a/src/Adapter/SynchronousAdapter.php b/src/Adapter/SynchronousAdapter.php index e271c365..b4b3be82 100644 --- a/src/Adapter/SynchronousAdapter.php +++ b/src/Adapter/SynchronousAdapter.php @@ -7,6 +7,7 @@ use InvalidArgumentException; use Yiisoft\Yii\Queue\Enum\JobStatus; use Yiisoft\Yii\Queue\Message\MessageInterface; +use Yiisoft\Yii\Queue\Message\ParametrizedMessageInterface; use Yiisoft\Yii\Queue\QueueFactory; use Yiisoft\Yii\Queue\QueueInterface; use Yiisoft\Yii\Queue\Worker\WorkerInterface; @@ -66,7 +67,9 @@ public function push(MessageInterface $message): void $key = count($this->messages) + $this->current; $this->messages[] = $message; - $message->setId((string) $key); + if ($message instanceof ParametrizedMessageInterface) { + $message->setId((string) $key); + } } public function subscribe(callable $handlerCallback): void diff --git a/src/Exception/JobFailureException.php b/src/Exception/JobFailureException.php index ea88ab98..db585156 100644 --- a/src/Exception/JobFailureException.php +++ b/src/Exception/JobFailureException.php @@ -7,13 +7,14 @@ use RuntimeException; use Throwable; use Yiisoft\Yii\Queue\Message\MessageInterface; +use Yiisoft\Yii\Queue\Message\ParametrizedMessageInterface; class JobFailureException extends RuntimeException { public function __construct(private MessageInterface $queueMessage, Throwable $previous) { $error = $previous->getMessage(); - $messageId = $queueMessage->getId() ?? 'null'; + $messageId = $queueMessage instanceof ParametrizedMessageInterface ? ($queueMessage->getId() ?? 'null') : 'null'; $messageText = "Processing of message #$messageId is stopped because of an exception:\n$error."; parent::__construct($messageText, 0, $previous); diff --git a/src/Message/Message.php b/src/Message/Message.php index 5e1fd2a7..c07327fd 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -4,8 +4,10 @@ namespace Yiisoft\Yii\Queue\Message; -final class Message implements MessageInterface +final class Message implements ParametrizedMessageInterface { + use ParametrizedMessageTrait; + /** * @param mixed $data Message data, encodable by a queue adapter * @param array $metadata Message metadata, encodable by a queue adapter @@ -15,8 +17,9 @@ public function __construct( private string $handlerName, private mixed $data, private array $metadata = [], - private ?string $id = null + ?string $id = null ) { + $this->setId($id); } public function getHandlerName(): string @@ -29,16 +32,6 @@ public function getData(): mixed return $this->data; } - public function setId(?string $id): void - { - $this->id = $id; - } - - public function getId(): ?string - { - return $this->id; - } - public function getMetadata(): array { return $this->metadata; diff --git a/src/Message/MessageInterface.php b/src/Message/MessageInterface.php index 017febf6..f29e3d0c 100644 --- a/src/Message/MessageInterface.php +++ b/src/Message/MessageInterface.php @@ -6,15 +6,6 @@ interface MessageInterface { - public function setId(?string $id): void; - - /** - * Returns unique message ID. - * - * @return string|null - */ - public function getId(): ?string; - /** * Returns handler name. * diff --git a/src/Message/ParametrizedMessageInterface.php b/src/Message/ParametrizedMessageInterface.php new file mode 100644 index 00000000..6f21b29e --- /dev/null +++ b/src/Message/ParametrizedMessageInterface.php @@ -0,0 +1,17 @@ +id = $id; + } + + /** + * Returns unique message ID. + * + * @return string|null + */ + public function getId(): ?string + { + return $this->id; + } +} diff --git a/src/Middleware/FailureHandling/FailureHandlingRequest.php b/src/Middleware/FailureHandling/FailureHandlingRequest.php index 7621f977..0623edfb 100644 --- a/src/Middleware/FailureHandling/FailureHandlingRequest.php +++ b/src/Middleware/FailureHandling/FailureHandlingRequest.php @@ -14,9 +14,6 @@ public function __construct(private MessageInterface $message, private Throwable { } - /** - * @return MessageInterface - */ public function getMessage(): MessageInterface { return $this->message; diff --git a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php index 23d454b5..d56b953f 100644 --- a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php +++ b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php @@ -7,6 +7,7 @@ use InvalidArgumentException; use Yiisoft\Yii\Queue\Message\Message; use Yiisoft\Yii\Queue\Message\MessageInterface; +use Yiisoft\Yii\Queue\Message\ParametrizedMessageInterface; use Yiisoft\Yii\Queue\Middleware\FailureHandling\FailureHandlingRequest; use Yiisoft\Yii\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Yii\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; @@ -62,11 +63,12 @@ public function processFailure( ): FailureHandlingRequest { $message = $request->getMessage(); if ($this->suites($message)) { + $id = $message instanceof ParametrizedMessageInterface ? $message->getId() : null; $messageNew = new Message( handlerName: $message->getHandlerName(), data: $message->getData(), metadata: $this->formNewMeta($message), - id: $message->getId(), + id: $id, ); ($this->queue ?? $request->getQueue())->push( $messageNew, diff --git a/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php b/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php index 24a69b9e..c1f93251 100644 --- a/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php +++ b/src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php @@ -7,6 +7,7 @@ use InvalidArgumentException; use Yiisoft\Yii\Queue\Message\Message; use Yiisoft\Yii\Queue\Message\MessageInterface; +use Yiisoft\Yii\Queue\Message\ParametrizedMessageInterface; use Yiisoft\Yii\Queue\Middleware\FailureHandling\FailureHandlingRequest; use Yiisoft\Yii\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Yii\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; @@ -40,11 +41,12 @@ public function processFailure( ): FailureHandlingRequest { $message = $request->getMessage(); if ($this->suites($message)) { + $id = $message instanceof ParametrizedMessageInterface ? $message->getId() : null; $message = new Message( handlerName: $message->getHandlerName(), data: $message->getData(), metadata: $this->createMeta($message), - id: $message->getId(), + id: $id, ); $message = $this->queue?->push($message) ?? $request->getQueue()->push($message); diff --git a/src/Queue.php b/src/Queue.php index 9c16bdab..cbfa3670 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -10,6 +10,7 @@ use Yiisoft\Yii\Queue\Enum\JobStatus; use Yiisoft\Yii\Queue\Exception\AdapterConfiguration\AdapterNotConfiguredException; use Yiisoft\Yii\Queue\Message\MessageInterface; +use Yiisoft\Yii\Queue\Message\ParametrizedMessageInterface; use Yiisoft\Yii\Queue\Middleware\Push\AdapterPushHandler; use Yiisoft\Yii\Queue\Middleware\Push\MessageHandlerPushInterface; use Yiisoft\Yii\Queue\Middleware\Push\MiddlewarePushInterface; @@ -59,7 +60,10 @@ public function push( $this->logger->info( 'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.', - ['handlerName' => $message->getHandlerName(), 'id' => $message->getId() ?? 'null'] + [ + 'handlerName' => $message->getHandlerName(), + 'id' => $message instanceof ParametrizedMessageInterface ? ($message->getId() ?? 'null') : 'null', + ] ); return $message; diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 18103a87..667dbe00 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -16,6 +16,7 @@ use Yiisoft\Injector\Injector; use Yiisoft\Yii\Queue\Exception\JobFailureException; use Yiisoft\Yii\Queue\Message\MessageInterface; +use Yiisoft\Yii\Queue\Message\ParametrizedMessageInterface; use Yiisoft\Yii\Queue\Middleware\Consume\ConsumeFinalHandler; use Yiisoft\Yii\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; use Yiisoft\Yii\Queue\Middleware\Consume\ConsumeRequest; @@ -41,11 +42,16 @@ public function __construct( } /** - * @throws Throwable + * @throws JobFailureException + * @throws RuntimeException */ public function process(MessageInterface $message, QueueInterface $queue): MessageInterface { - $this->logger->info('Processing message #{message}.', ['message' => $message->getId()]); + if ($message instanceof ParametrizedMessageInterface) { + $this->logger->info('Processing message #{message}.', ['message' => $message->getId()]); + } else { + $this->logger->debug('Processing message with data: {data}.', ['data' => $message->getData()]); + } $name = $message->getHandlerName(); $handler = $this->getHandler($name);