diff --git a/DI/QuartzExtension.php b/DI/QuartzExtension.php index 2248263..7ad7742 100644 --- a/DI/QuartzExtension.php +++ b/DI/QuartzExtension.php @@ -1,6 +1,7 @@ register($this->format('enqueue_job_run_shell'), EnqueueJobRunShell::class) - ->setArguments([new Reference('enqueue.producer')]) + ->setArguments([new Reference(ProducerInterface::class)]) ; $container->register($this->format('job_run_shell_factory'), StdJobRunShellFactory::class) @@ -69,7 +70,7 @@ public function load(array $configs, ContainerBuilder $container) // TODO: add config option where can enable/disable this job $container->register($this->format('job.enqueue_response'), EnqueueResponseJob::class) - ->setArguments([new Reference('enqueue.producer')]) + ->setArguments([new Reference(ProducerInterface::class)]) ->addTag($this->format('job'), ['alias' => 'enqueue_response']) ->addTag($this->format('job'), ['alias' => EnqueueResponseJob::class]) ; @@ -93,7 +94,7 @@ public function load(array $configs, ContainerBuilder $container) new Reference($this->format('store')), new Reference($this->format('std_job_run_shell')) ]) - ->addTag('enqueue.client.processor') + ->addTag('enqueue.command_subscriber') ; $container->register($this->format('rpc_protocol'), RpcProtocol::class) @@ -106,7 +107,7 @@ public function load(array $configs, ContainerBuilder $container) new Reference($this->format('scheduler')), new Reference($this->format('rpc_protocol')) ]) - ->addTag('enqueue.client.processor') + ->addTag('enqueue.command_subscriber') ; } diff --git a/DI/RemoteSchedulerExtension.php b/DI/RemoteSchedulerExtension.php index ef90373..5b48a30 100644 --- a/DI/RemoteSchedulerExtension.php +++ b/DI/RemoteSchedulerExtension.php @@ -1,6 +1,7 @@ register($this->format('remote.transport'), EnqueueRemoteTransport::class) - ->setArguments([new Reference('enqueue.client.producer_v2')]) + ->setArguments([new Reference(ProducerInterface::class)]) ; $container->register($this->format('remote.rpc_protocol'), RpcProtocol::class) diff --git a/Enqueue/EnqueueRemoteTransportProcessor.php b/Enqueue/EnqueueRemoteTransportProcessor.php index 07ef6a6..af3b092 100644 --- a/Enqueue/EnqueueRemoteTransportProcessor.php +++ b/Enqueue/EnqueueRemoteTransportProcessor.php @@ -5,13 +5,13 @@ use Enqueue\Consumption\QueueSubscriberInterface; use Enqueue\Consumption\Result; use Enqueue\Util\JSON; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Context; +use Interop\Queue\Message; +use Interop\Queue\Processor; use Quartz\Bridge\Scheduler\RpcProtocol; use Quartz\Core\Scheduler; -class EnqueueRemoteTransportProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface +class EnqueueRemoteTransportProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface { /** * @var Scheduler @@ -33,10 +33,7 @@ public function __construct(Scheduler $scheduler, RpcProtocol $rpcProtocol) $this->rpcProtocol = $rpcProtocol; } - /** - * {@inheritdoc} - */ - public function process(PsrMessage $message, PsrContext $context) + public function process(Message $message, Context $context): Result { try { $request = $this->rpcProtocol->decodeRequest(JSON::decode($message->getBody())); @@ -50,23 +47,17 @@ public function process(PsrMessage $message, PsrContext $context) return Result::reply($context->createMessage(JSON::encode($result))); } - /** - * {@inheritdoc} - */ - public static function getSubscribedCommand() + public static function getSubscribedCommand(): array { return [ - 'processorName' => EnqueueRemoteTransport::COMMAND, - 'queueName' => EnqueueRemoteTransport::COMMAND, - 'queueNameHardcoded' => true, + 'command' => EnqueueRemoteTransport::COMMAND, + 'queue' => EnqueueRemoteTransport::COMMAND, + 'prefix_queue' => false, 'exclusive' => true, ]; } - /** - * {@inheritdoc} - */ - public static function getSubscribedQueues() + public static function getSubscribedQueues(): array { return [EnqueueRemoteTransport::COMMAND]; } diff --git a/Scheduler/JobRunShellProcessor.php b/Scheduler/JobRunShellProcessor.php index c7b34fb..5c6ed68 100644 --- a/Scheduler/JobRunShellProcessor.php +++ b/Scheduler/JobRunShellProcessor.php @@ -5,13 +5,13 @@ use Enqueue\Consumption\QueueSubscriberInterface; use Enqueue\Consumption\Result; use Enqueue\Util\JSON; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Context; +use Interop\Queue\Message; +use Interop\Queue\Processor; use Quartz\Scheduler\JobStore; use Quartz\Scheduler\StdJobRunShell; -class JobRunShellProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface +class JobRunShellProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface { /** * @var JobStore @@ -33,10 +33,7 @@ public function __construct(JobStore $store, StdJobRunShell $runShell) $this->runShell = $runShell; } - /** - * {@inheritdoc} - */ - public function process(PsrMessage $message, PsrContext $context) + public function process(Message $message, Context $context): Result { $data = JSON::decode($message->getBody()); @@ -50,26 +47,20 @@ public function process(PsrMessage $message, PsrContext $context) $this->runShell->execute($trigger); - return Result::ACK; + return Result::ack(); } - /** - * {@inheritdoc} - */ - public static function getSubscribedCommand() + public static function getSubscribedCommand(): array { return [ - 'processorName' => EnqueueJobRunShell::COMMAND, - 'queueName' => EnqueueJobRunShell::COMMAND, - 'queueNameHardcoded' => true, + 'command' => EnqueueJobRunShell::COMMAND, + 'queue' => EnqueueJobRunShell::COMMAND, + 'prefix_queue' => false, 'exclusive' => true, ]; } - /** - * {@inheritdoc} - */ - public static function getSubscribedQueues() + public static function getSubscribedQueues(): array { return [EnqueueJobRunShell::COMMAND]; } diff --git a/Tests/DI/QuartzConfigurationTest.php b/Tests/DI/QuartzConfigurationTest.php index 9bf887e..568d29b 100644 --- a/Tests/DI/QuartzConfigurationTest.php +++ b/Tests/DI/QuartzConfigurationTest.php @@ -26,7 +26,7 @@ public function testShouldReturnDefaultConfig() 'uriOptions' => [], 'driverOptions' => [], 'sessionId' => 'quartz', - 'dbName' => 'quartz', + 'dbName' => null, 'managementLockCol' => 'managementLock', 'calendarCol' => 'calendar', 'triggerCol' => 'trigger', @@ -42,7 +42,7 @@ public function testShouldReturnDefaultConfig() public function testCouldSetConfigurationOptions() { - $configuration = new QuartzConfiguration([]); + $configuration = new QuartzConfiguration(); $processor = new Processor(); $config = $processor->processConfiguration($configuration, [[ @@ -58,7 +58,7 @@ public function testCouldSetConfigurationOptions() 'uriOptions' => [], 'driverOptions' => [], 'sessionId' => 'quartz', - 'dbName' => 'quartz', + 'dbName' => null, 'managementLockCol' => 'managementLock', 'calendarCol' => 'calendar', 'triggerCol' => 'trigger', diff --git a/Tests/Enqueue/EnqueueRemoteTransportProcessorTest.php b/Tests/Enqueue/EnqueueRemoteTransportProcessorTest.php index 9ca3dda..bd07ea1 100644 --- a/Tests/Enqueue/EnqueueRemoteTransportProcessorTest.php +++ b/Tests/Enqueue/EnqueueRemoteTransportProcessorTest.php @@ -5,8 +5,8 @@ use Enqueue\Consumption\QueueSubscriberInterface; use Enqueue\Consumption\Result; use Enqueue\Null\NullMessage; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Context; +use Interop\Queue\Processor; use PHPUnit\Framework\TestCase; use Quartz\Bridge\Enqueue\EnqueueRemoteTransportProcessor; use Quartz\Bridge\Scheduler\RpcProtocol; @@ -16,11 +16,11 @@ class EnqueueRemoteTransportProcessorTest extends TestCase { - public function testShouldImpleentPsrProcessorInterface() + public function testShouldImpleentProcessorInterface() { $processor = new EnqueueRemoteTransportProcessor($this->createSchedulerMock(), $this->createRpcProtocolMock()); - $this->assertInstanceOf(PsrProcessor::class, $processor); + $this->assertInstanceOf(Processor::class, $processor); } public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectectedSubscribedCommand() @@ -30,9 +30,9 @@ public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectecte $this->assertInstanceOf(CommandSubscriberInterface::class, $processor); $expectedConfig = [ - 'processorName' => 'quartz_rpc', - 'queueName' => 'quartz_rpc', - 'queueNameHardcoded' => true, + 'command' => 'quartz_rpc', + 'queue' => 'quartz_rpc', + 'prefix_queue' => false, 'exclusive' => true, ]; @@ -78,7 +78,7 @@ public function testShouldInvokeSchedulerMethodAndReturnResponse() ->willReturn('scheduler-result') ; - $context = $this->createPsrContextMock(); + $context = $this->createContextMock(); $context ->expects($this->once()) ->method('createMessage') @@ -125,7 +125,7 @@ public function testOnExceptionShouldEncodeExceptionAndReturn() ->willThrowException($ex) ; - $context = $this->createPsrContextMock(); + $context = $this->createContextMock(); $context ->expects($this->once()) ->method('createMessage') @@ -141,11 +141,11 @@ public function testOnExceptionShouldEncodeExceptionAndReturn() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext + * @return \PHPUnit_Framework_MockObject_MockObject|Context */ - private function createPsrContextMock() + private function createContextMock() { - return $this->createMock(PsrContext::class); + return $this->createMock(Context::class); } /** diff --git a/Tests/Scheduler/JobRunShellProcessorTest.php b/Tests/Scheduler/JobRunShellProcessorTest.php index 73c24e6..7b6b118 100644 --- a/Tests/Scheduler/JobRunShellProcessorTest.php +++ b/Tests/Scheduler/JobRunShellProcessorTest.php @@ -6,8 +6,8 @@ use Enqueue\Consumption\Result; use Enqueue\Null\NullMessage; use Enqueue\Util\JSON; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Context; +use Interop\Queue\Processor; use PHPUnit\Framework\TestCase; use Quartz\Bridge\Scheduler\JobRunShellProcessor; use Quartz\Scheduler\StdJobRunShell; @@ -16,11 +16,11 @@ class JobRunShellProcessorTest extends TestCase { - public function testShouldImplementPsrProcessorInterface() + public function testShouldImplementProcessorInterface() { $processor = new JobRunShellProcessor($this->createJobStore(), $this->createJobRunShell()); - $this->assertInstanceOf(PsrProcessor::class, $processor); + $this->assertInstanceOf(Processor::class, $processor); } public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectectedSubscribedCommand() @@ -30,9 +30,9 @@ public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectecte $this->assertInstanceOf(CommandSubscriberInterface::class, $processor); $expectedConfig = [ - 'processorName' => 'quartz_job_run_shell', - 'queueName' => 'quartz_job_run_shell', - 'queueNameHardcoded' => true, + 'command' => 'quartz_job_run_shell', + 'queue' => 'quartz_job_run_shell', + 'prefix_queue' => false, 'exclusive' => true, ]; @@ -64,7 +64,7 @@ public function testShouldRejectMessageIfJobInstanceIdIsNotSet() $processor = new JobRunShellProcessor($store, $shell); - $result = $processor->process(new NullMessage(), $this->createMock(PsrContext::class)); + $result = $processor->process(new NullMessage(), $this->createMock(Context::class)); $this->assertInstanceOf(Result::class, $result); $this->assertSame('fire instance id is empty', $result->getReason()); @@ -91,7 +91,7 @@ public function testShouldRejectMessageIfJobInstanceWasNotFound() 'fireInstanceId' => '1234', ])); - $result = $processor->process($message, $this->createMock(PsrContext::class)); + $result = $processor->process($message, $this->createMock(Context::class)); $this->assertInstanceOf(Result::class, $result); $this->assertSame('There is not trigger with fire instance id: "1234"', $result->getReason()); @@ -122,9 +122,10 @@ public function testShouldPassTriggerToJobRunShell() 'fireInstanceId' => '1234', ])); - $result = $processor->process($message, $this->createMock(PsrContext::class)); + $result = $processor->process($message, $this->createMock(Context::class)); - $this->assertSame(Result::ACK, $result); + $this->assertInstanceOf(Result::class, $result); + $this->assertSame(Result::ACK, $result->getStatus()); } /** diff --git a/composer.json b/composer.json index 4f6730a..cad3e55 100644 --- a/composer.json +++ b/composer.json @@ -6,10 +6,10 @@ "keywords": ["job", "time", "task", "time scheduler", "quartz", "chrono"], "license": "MIT", "require": { - "php": ">=5.6", + "php": "^7.1", "symfony/framework-bundle": "^3|^4", - "enqueue/enqueue": "^0.8", - "queue-interop/queue-interop": "^0.6", + "enqueue/enqueue": "^0.9", + "queue-interop/queue-interop": "^0.7|^0.8", "php-quartz/quartz": "^0.1.1" }, "require-dev": {