Skip to content

Commit

Permalink
updat deps, fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Dec 19, 2018
1 parent 8ec625e commit e8b319f
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 73 deletions.
9 changes: 5 additions & 4 deletions DI/QuartzExtension.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Quartz\Bridge\DI;

use Enqueue\Client\ProducerInterface;
use Quartz\Bridge\Enqueue\EnqueueRemoteTransportProcessor;
use Quartz\Bridge\Enqueue\EnqueueResponseJob;
use Quartz\Bridge\Scheduler\EnqueueJobRunShell;
Expand Down Expand Up @@ -56,7 +57,7 @@ public function load(array $configs, ContainerBuilder $container)
;

$container->register($this->format('enqueue_job_run_shell'), EnqueueJobRunShell::class)
->setArguments([new Reference('enqueue.producer')])
->setArguments([new Reference(ProducerInterface::class)])
;

$container->register($this->format('job_run_shell_factory'), StdJobRunShellFactory::class)
Expand All @@ -69,7 +70,7 @@ public function load(array $configs, ContainerBuilder $container)

// TODO: add config option where can enable/disable this job
$container->register($this->format('job.enqueue_response'), EnqueueResponseJob::class)
->setArguments([new Reference('enqueue.producer')])
->setArguments([new Reference(ProducerInterface::class)])
->addTag($this->format('job'), ['alias' => 'enqueue_response'])
->addTag($this->format('job'), ['alias' => EnqueueResponseJob::class])
;
Expand All @@ -93,7 +94,7 @@ public function load(array $configs, ContainerBuilder $container)
new Reference($this->format('store')),
new Reference($this->format('std_job_run_shell'))
])
->addTag('enqueue.client.processor')
->addTag('enqueue.command_subscriber')
;

$container->register($this->format('rpc_protocol'), RpcProtocol::class)
Expand All @@ -106,7 +107,7 @@ public function load(array $configs, ContainerBuilder $container)
new Reference($this->format('scheduler')),
new Reference($this->format('rpc_protocol'))
])
->addTag('enqueue.client.processor')
->addTag('enqueue.command_subscriber')
;
}

Expand Down
3 changes: 2 additions & 1 deletion DI/RemoteSchedulerExtension.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Quartz\Bridge\DI;

use Enqueue\Client\ProducerInterface;
use Quartz\Bridge\Enqueue\EnqueueRemoteTransport;
use Quartz\Bridge\Scheduler\RemoteScheduler;
use Quartz\Bridge\Scheduler\RpcProtocol;
Expand Down Expand Up @@ -37,7 +38,7 @@ public function getAlias()
public function load(array $configs, ContainerBuilder $container)
{
$container->register($this->format('remote.transport'), EnqueueRemoteTransport::class)
->setArguments([new Reference('enqueue.client.producer_v2')])
->setArguments([new Reference(ProducerInterface::class)])
;

$container->register($this->format('remote.rpc_protocol'), RpcProtocol::class)
Expand Down
29 changes: 10 additions & 19 deletions Enqueue/EnqueueRemoteTransportProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
use Enqueue\Consumption\QueueSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Util\JSON;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Quartz\Bridge\Scheduler\RpcProtocol;
use Quartz\Core\Scheduler;

class EnqueueRemoteTransportProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
class EnqueueRemoteTransportProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface
{
/**
* @var Scheduler
Expand All @@ -33,10 +33,7 @@ public function __construct(Scheduler $scheduler, RpcProtocol $rpcProtocol)
$this->rpcProtocol = $rpcProtocol;
}

/**
* {@inheritdoc}
*/
public function process(PsrMessage $message, PsrContext $context)
public function process(Message $message, Context $context): Result
{
try {
$request = $this->rpcProtocol->decodeRequest(JSON::decode($message->getBody()));
Expand All @@ -50,23 +47,17 @@ public function process(PsrMessage $message, PsrContext $context)
return Result::reply($context->createMessage(JSON::encode($result)));
}

/**
* {@inheritdoc}
*/
public static function getSubscribedCommand()
public static function getSubscribedCommand(): array
{
return [
'processorName' => EnqueueRemoteTransport::COMMAND,
'queueName' => EnqueueRemoteTransport::COMMAND,
'queueNameHardcoded' => true,
'command' => EnqueueRemoteTransport::COMMAND,
'queue' => EnqueueRemoteTransport::COMMAND,
'prefix_queue' => false,
'exclusive' => true,
];
}

/**
* {@inheritdoc}
*/
public static function getSubscribedQueues()
public static function getSubscribedQueues(): array
{
return [EnqueueRemoteTransport::COMMAND];
}
Expand Down
31 changes: 11 additions & 20 deletions Scheduler/JobRunShellProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
use Enqueue\Consumption\QueueSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Util\JSON;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Quartz\Scheduler\JobStore;
use Quartz\Scheduler\StdJobRunShell;

class JobRunShellProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
class JobRunShellProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface
{
/**
* @var JobStore
Expand All @@ -33,10 +33,7 @@ public function __construct(JobStore $store, StdJobRunShell $runShell)
$this->runShell = $runShell;
}

/**
* {@inheritdoc}
*/
public function process(PsrMessage $message, PsrContext $context)
public function process(Message $message, Context $context): Result
{
$data = JSON::decode($message->getBody());

Expand All @@ -50,26 +47,20 @@ public function process(PsrMessage $message, PsrContext $context)

$this->runShell->execute($trigger);

return Result::ACK;
return Result::ack();
}

/**
* {@inheritdoc}
*/
public static function getSubscribedCommand()
public static function getSubscribedCommand(): array
{
return [
'processorName' => EnqueueJobRunShell::COMMAND,
'queueName' => EnqueueJobRunShell::COMMAND,
'queueNameHardcoded' => true,
'command' => EnqueueJobRunShell::COMMAND,
'queue' => EnqueueJobRunShell::COMMAND,
'prefix_queue' => false,
'exclusive' => true,
];
}

/**
* {@inheritdoc}
*/
public static function getSubscribedQueues()
public static function getSubscribedQueues(): array
{
return [EnqueueJobRunShell::COMMAND];
}
Expand Down
6 changes: 3 additions & 3 deletions Tests/DI/QuartzConfigurationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function testShouldReturnDefaultConfig()
'uriOptions' => [],
'driverOptions' => [],
'sessionId' => 'quartz',
'dbName' => 'quartz',
'dbName' => null,
'managementLockCol' => 'managementLock',
'calendarCol' => 'calendar',
'triggerCol' => 'trigger',
Expand All @@ -42,7 +42,7 @@ public function testShouldReturnDefaultConfig()

public function testCouldSetConfigurationOptions()
{
$configuration = new QuartzConfiguration([]);
$configuration = new QuartzConfiguration();

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
Expand All @@ -58,7 +58,7 @@ public function testCouldSetConfigurationOptions()
'uriOptions' => [],
'driverOptions' => [],
'sessionId' => 'quartz',
'dbName' => 'quartz',
'dbName' => null,
'managementLockCol' => 'managementLock',
'calendarCol' => 'calendar',
'triggerCol' => 'trigger',
Expand Down
24 changes: 12 additions & 12 deletions Tests/Enqueue/EnqueueRemoteTransportProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
use Enqueue\Consumption\QueueSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Null\NullMessage;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrProcessor;
use Interop\Queue\Context;
use Interop\Queue\Processor;
use PHPUnit\Framework\TestCase;
use Quartz\Bridge\Enqueue\EnqueueRemoteTransportProcessor;
use Quartz\Bridge\Scheduler\RpcProtocol;
Expand All @@ -16,11 +16,11 @@

class EnqueueRemoteTransportProcessorTest extends TestCase
{
public function testShouldImpleentPsrProcessorInterface()
public function testShouldImpleentProcessorInterface()
{
$processor = new EnqueueRemoteTransportProcessor($this->createSchedulerMock(), $this->createRpcProtocolMock());

$this->assertInstanceOf(PsrProcessor::class, $processor);
$this->assertInstanceOf(Processor::class, $processor);
}

public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectectedSubscribedCommand()
Expand All @@ -30,9 +30,9 @@ public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectecte
$this->assertInstanceOf(CommandSubscriberInterface::class, $processor);

$expectedConfig = [
'processorName' => 'quartz_rpc',
'queueName' => 'quartz_rpc',
'queueNameHardcoded' => true,
'command' => 'quartz_rpc',
'queue' => 'quartz_rpc',
'prefix_queue' => false,
'exclusive' => true,
];

Expand Down Expand Up @@ -78,7 +78,7 @@ public function testShouldInvokeSchedulerMethodAndReturnResponse()
->willReturn('scheduler-result')
;

$context = $this->createPsrContextMock();
$context = $this->createContextMock();
$context
->expects($this->once())
->method('createMessage')
Expand Down Expand Up @@ -125,7 +125,7 @@ public function testOnExceptionShouldEncodeExceptionAndReturn()
->willThrowException($ex)
;

$context = $this->createPsrContextMock();
$context = $this->createContextMock();
$context
->expects($this->once())
->method('createMessage')
Expand All @@ -141,11 +141,11 @@ public function testOnExceptionShouldEncodeExceptionAndReturn()
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|PsrContext
* @return \PHPUnit_Framework_MockObject_MockObject|Context
*/
private function createPsrContextMock()
private function createContextMock()
{
return $this->createMock(PsrContext::class);
return $this->createMock(Context::class);
}

/**
Expand Down
23 changes: 12 additions & 11 deletions Tests/Scheduler/JobRunShellProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
use Enqueue\Consumption\Result;
use Enqueue\Null\NullMessage;
use Enqueue\Util\JSON;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrProcessor;
use Interop\Queue\Context;
use Interop\Queue\Processor;
use PHPUnit\Framework\TestCase;
use Quartz\Bridge\Scheduler\JobRunShellProcessor;
use Quartz\Scheduler\StdJobRunShell;
Expand All @@ -16,11 +16,11 @@

class JobRunShellProcessorTest extends TestCase
{
public function testShouldImplementPsrProcessorInterface()
public function testShouldImplementProcessorInterface()
{
$processor = new JobRunShellProcessor($this->createJobStore(), $this->createJobRunShell());

$this->assertInstanceOf(PsrProcessor::class, $processor);
$this->assertInstanceOf(Processor::class, $processor);
}

public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectectedSubscribedCommand()
Expand All @@ -30,9 +30,9 @@ public function testShouldImplementCommandSubscriberInterfaceAndReturnExpectecte
$this->assertInstanceOf(CommandSubscriberInterface::class, $processor);

$expectedConfig = [
'processorName' => 'quartz_job_run_shell',
'queueName' => 'quartz_job_run_shell',
'queueNameHardcoded' => true,
'command' => 'quartz_job_run_shell',
'queue' => 'quartz_job_run_shell',
'prefix_queue' => false,
'exclusive' => true,
];

Expand Down Expand Up @@ -64,7 +64,7 @@ public function testShouldRejectMessageIfJobInstanceIdIsNotSet()

$processor = new JobRunShellProcessor($store, $shell);

$result = $processor->process(new NullMessage(), $this->createMock(PsrContext::class));
$result = $processor->process(new NullMessage(), $this->createMock(Context::class));

$this->assertInstanceOf(Result::class, $result);
$this->assertSame('fire instance id is empty', $result->getReason());
Expand All @@ -91,7 +91,7 @@ public function testShouldRejectMessageIfJobInstanceWasNotFound()
'fireInstanceId' => '1234',
]));

$result = $processor->process($message, $this->createMock(PsrContext::class));
$result = $processor->process($message, $this->createMock(Context::class));

$this->assertInstanceOf(Result::class, $result);
$this->assertSame('There is not trigger with fire instance id: "1234"', $result->getReason());
Expand Down Expand Up @@ -122,9 +122,10 @@ public function testShouldPassTriggerToJobRunShell()
'fireInstanceId' => '1234',
]));

$result = $processor->process($message, $this->createMock(PsrContext::class));
$result = $processor->process($message, $this->createMock(Context::class));

$this->assertSame(Result::ACK, $result);
$this->assertInstanceOf(Result::class, $result);
$this->assertSame(Result::ACK, $result->getStatus());
}

/**
Expand Down
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
"keywords": ["job", "time", "task", "time scheduler", "quartz", "chrono"],
"license": "MIT",
"require": {
"php": ">=5.6",
"php": "^7.1",
"symfony/framework-bundle": "^3|^4",
"enqueue/enqueue": "^0.8",
"queue-interop/queue-interop": "^0.6",
"enqueue/enqueue": "^0.9",
"queue-interop/queue-interop": "^0.7|^0.8",
"php-quartz/quartz": "^0.1.1"
},
"require-dev": {
Expand Down

0 comments on commit e8b319f

Please sign in to comment.