Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new event when job is created #62

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Doctrine/BaseDoctrineJobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Doctrine\Common\Persistence\ObjectManager;
use Doctrine\Common\Persistence\ObjectRepository;
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Manager\ArchivableJobManager;
use Dtc\QueueBundle\Manager\JobTimingManager;
use Dtc\QueueBundle\Manager\RunManager;
Expand All @@ -28,17 +29,19 @@ abstract class BaseDoctrineJobManager extends ArchivableJobManager
* @param JobTimingManager $jobTimingManager
* @param ObjectManager $objectManager
* @param $jobClass
* @param EventDispatcher $eventDispatcher
* @param $jobArchiveClass
*/
public function __construct(
RunManager $runManager,
JobTimingManager $jobTimingManager,
ObjectManager $objectManager,
$jobClass,
EventDispatcher $eventDispatcher,
$jobArchiveClass
) {
$this->objectManager = $objectManager;
parent::__construct($runManager, $jobTimingManager, $jobClass, $jobArchiveClass);
parent::__construct($runManager, $jobTimingManager, $jobClass, $eventDispatcher, $jobArchiveClass);
}

protected function getFetchCount($totalCount)
Expand Down
1 change: 1 addition & 0 deletions EventDispatcher/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

class Event
{
const POST_CREATE_JOB = 'queue.post_create_job';
const PRE_JOB = 'queue.pre_job';
const POST_JOB = 'queue.post_job';

Expand Down
9 changes: 7 additions & 2 deletions Manager/AbstractJobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Dtc\QueueBundle\Manager;

use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Exception\UnsupportedException;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Model\Job;
Expand All @@ -11,12 +12,16 @@ abstract class AbstractJobManager implements JobManagerInterface
protected $jobTiminigManager;
protected $jobClass;
protected $runManager;

public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)

/** @var EventDispatcher $eventDispatcher */
protected $eventDispatcher;

public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, EventDispatcher $eventDispatcher)
{
$this->runManager = $runManager;
$this->jobTiminigManager = $jobTimingManager;
$this->jobClass = $jobClass;
$this->eventDispatcher = $eventDispatcher;
}

/**
Expand Down
6 changes: 5 additions & 1 deletion Manager/ArchivableJobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Dtc\QueueBundle\Manager;

use Dtc\QueueBundle\EventDispatcher\EventDispatcher;

abstract class ArchivableJobManager extends StallableJobManager
{
/**
Expand All @@ -15,16 +17,18 @@ abstract class ArchivableJobManager extends StallableJobManager
* @param RunManager $runManager
* @param JobTimingManager $jobTimingManager
* @param $jobClass
* @param EventDispatcher $eventDispatcher
* @param $jobArchiveClass
*/
public function __construct(
RunManager $runManager,
JobTimingManager $jobTimingManager,
$jobClass,
EventDispatcher $eventDispatcher,
$jobArchiveClass
) {
$this->jobArchiveClass = $jobArchiveClass;
parent::__construct($runManager, $jobTimingManager, $jobClass);
parent::__construct($runManager, $jobTimingManager, $jobClass, $eventDispatcher);
}

/**
Expand Down
8 changes: 7 additions & 1 deletion Manager/RetryableJobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Dtc\QueueBundle\Manager;

use Dtc\QueueBundle\EventDispatcher\Event;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Model\RetryableJob;
use Dtc\QueueBundle\Model\Job;
Expand Down Expand Up @@ -48,7 +49,12 @@ public function save(Job $job)
$this->recordTiming($job);
$job->setUpdatedAt(new \DateTime());

return $this->retryableSave($job);
$result = $this->retryableSave($job);

$event = new Event($job);
$this->eventDispatcher->dispatch(Event::POST_CREATE_JOB, $event);

return $result;
}

/**
Expand Down
4 changes: 3 additions & 1 deletion Model/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Dtc\QueueBundle\Model;

use Dtc\QueueBundle\Manager\JobManagerInterface;

class Job extends BaseJob
{
const STATUS_EXPIRED = 'expired';
Expand All @@ -28,7 +30,7 @@ public function __call($method, $args)
}

$job = $this->jobManager->save($this);

return $job;
}

Expand Down
5 changes: 3 additions & 2 deletions RabbitMQ/JobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Dtc\QueueBundle\RabbitMQ;

use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Manager\SaveableTrait;
use Dtc\QueueBundle\Manager\VerifyTrait;
use Dtc\QueueBundle\Model\BaseJob;
Expand Down Expand Up @@ -39,11 +40,11 @@ class JobManager extends PriorityJobManager
protected $hostname;
protected $pid;

public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, EventDispatcher $eventDispatcher)
{
$this->hostname = gethostname() ?: '';
$this->pid = getmypid();
parent::__construct($runManager, $jobTimingManager, $jobClass);
parent::__construct($runManager, $jobTimingManager, $jobClass, $eventDispatcher);
}

/**
Expand Down
5 changes: 3 additions & 2 deletions Redis/BaseJobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Dtc\QueueBundle\Redis;

use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Manager\JobIdTrait;
use Dtc\QueueBundle\Manager\JobTimingManager;
use Dtc\QueueBundle\Manager\PriorityJobManager;
Expand Down Expand Up @@ -31,13 +32,13 @@ abstract class BaseJobManager extends PriorityJobManager
/**
* @param string $cacheKeyPrefix
*/
public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, EventDispatcher $eventDispatcher, $cacheKeyPrefix)
{
$this->cacheKeyPrefix = $cacheKeyPrefix;
$this->hostname = gethostname() ?: '';
$this->pid = getmypid();

parent::__construct($runManager, $jobTimingManager, $jobClass);
parent::__construct($runManager, $jobTimingManager, $jobClass, $eventDispatcher);
}

public function setRedis(RedisInterface $redis)
Expand Down
5 changes: 5 additions & 0 deletions Resources/config/queue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ services:
- "@dtc_queue.manager.job_timing"
- '@dtc_queue.document_manager'
- '%dtc_queue.class.job%'
- '@dtc_queue.event_dispatcher'
- '%dtc_queue.class.job_archive%'
calls:
- ["setMaxPriority", ['%dtc_queue.priority.max%']]
Expand All @@ -94,6 +95,7 @@ services:
- "@dtc_queue.manager.run"
- "@dtc_queue.manager.job_timing"
- '%dtc_queue.class.job%'
- '@dtc_queue.event_dispatcher'
calls:
- ["setDefaultMaxRetries", ['%dtc_queue.retry.max.retries%']]
- ["setDefaultMaxFailures", ['%dtc_queue.retry.max.failures%']]
Expand All @@ -107,6 +109,7 @@ services:
- "@dtc_queue.manager.run"
- "@dtc_queue.manager.job_timing"
- "%dtc_queue.class.job%"
- '@dtc_queue.event_dispatcher'
calls:
- ["setMaxPriority", ['%dtc_queue.priority.max%']]
- ["setPriorityDirection", ['%dtc_queue.priority.direction%']]
Expand All @@ -123,6 +126,7 @@ services:
- "@dtc_queue.manager.run"
- "@dtc_queue.manager.job_timing"
- "%dtc_queue.class.job%"
- '@dtc_queue.event_dispatcher'
- "%dtc_queue.redis.prefix%"
calls:
- ["setMaxPriority", ['%dtc_queue.priority.max%']]
Expand Down Expand Up @@ -199,6 +203,7 @@ services:
- "@dtc_queue.manager.job_timing"
- '@dtc_queue.entity_manager'
- '%dtc_queue.class.job%'
- '@dtc_queue.event_dispatcher'
- '%dtc_queue.class.job_archive%'
calls:
- ["setMaxPriority", ['%dtc_queue.priority.max%']]
Expand Down
4 changes: 3 additions & 1 deletion Tests/Beanstalkd/JobManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Dtc\QueueBundle\Tests\Beanstalkd;

use Dtc\QueueBundle\Beanstalkd\JobManager;
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Manager\JobTimingManager;
use Dtc\QueueBundle\Manager\RunManager;
use Dtc\QueueBundle\Tests\FibonacciWorker;
Expand Down Expand Up @@ -32,7 +33,8 @@ public static function setUpBeforeClass()
self::$beanstalkd = new Pheanstalk($host);
self::$jobTimingManager = new JobTimingManager($jobTimingClass, false);
self::$runManager = new RunManager($runClass);
self::$jobManager = new JobManager(self::$runManager, self::$jobTimingManager, $className);
self::$eventDispatcher = new EventDispatcher();
self::$jobManager = new JobManager(self::$runManager, self::$jobTimingManager, $className, self::$eventDispatcher);
self::$jobManager->setBeanstalkd(self::$beanstalkd);
self::$worker = new FibonacciWorker();

Expand Down
4 changes: 3 additions & 1 deletion Tests/Command/CommandTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Dtc\QueueBundle\Tests\Command;

use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Model\Job;
use Dtc\QueueBundle\Model\JobTiming;
use Dtc\QueueBundle\Manager\JobTimingManager;
Expand Down Expand Up @@ -86,7 +87,8 @@ protected function runStubCommand($className, $params, $call, $expectedResult =

$jobTimingManager = new JobTimingManager(JobTiming::class, false);
$runManager = new StubRunManager($jobTimingManager, \Dtc\QueueBundle\Model\Run::class);
$jobManager = new StubJobManager($runManager, $jobTimingManager, Job::class);
$eventDispatcher = new EventDispatcher();
$jobManager = new StubJobManager($runManager, $jobTimingManager, Job::class, $eventDispatcher);
$container = new Container();
$container->set('dtc_queue.manager.job', $jobManager);
$container->set('dtc_queue.manager.run', $runManager);
Expand Down
4 changes: 3 additions & 1 deletion Tests/Command/CountCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Dtc\QueueBundle\Tests\Command;

use Dtc\QueueBundle\Command\CountCommand;
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Model\Job;
use Dtc\QueueBundle\Model\JobTiming;
use Dtc\QueueBundle\Manager\JobTimingManager;
Expand All @@ -22,7 +23,8 @@ public function testCountCommand()
$container = new Container();
$jobTimingManager = new JobTimingManager(JobTiming::class, false);
$runManager = new RunManager($jobTimingManager, Run::class);
$container->set('dtc_queue.manager.job', new StubJobManager($runManager, $jobTimingManager, Job::class));
$eventDispatcher = new EventDispatcher();
$container->set('dtc_queue.manager.job', new StubJobManager($runManager, $jobTimingManager, Job::class, $eventDispatcher));
$this->runCommandException(CountCommand::class, $container, []);
}

Expand Down
3 changes: 2 additions & 1 deletion Tests/Command/CreateJobCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public function testCreateJobCommand()
{
$jobTimingManager = new JobTimingManager(JobTimingManager::class, false);
$runManager = new RunManager($jobTimingManager, Run::class);
$jobManager = new StubJobManager($runManager, $jobTimingManager, Job::class);
$eventDispatcher = new EventDispatcher();
$jobManager = new StubJobManager($runManager, $jobTimingManager, Job::class, $eventDispatcher);
$container = new Container();
$container->set('dtc_queue.manager.job', $jobManager);
$this->runCommandException(CreateJobCommand::class, $container, ['worker_name' => 'fibonacci', 'method' => 'fibonacci', 'args' => [1]]);
Expand Down
6 changes: 4 additions & 2 deletions Tests/Doctrine/DoctrineJobManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Doctrine\ORM\EntityManager;
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
use Dtc\QueueBundle\Doctrine\DtcQueueListener;
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Manager\StallableJobManager;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Model\Job;
Expand Down Expand Up @@ -52,8 +53,9 @@ public static function setUpBeforeClass()
{
self::$jobTimingManager = new self::$jobTimingManagerClass(self::$objectManager, self::$jobTimingClass, true);
self::$runManager = new self::$runManagerClass(self::$objectManager, self::$runClass, self::$runArchiveClass);
self::$eventDispatcher = new EventDispatcher();
/** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
$jobManager = new self::$jobManagerClass(self::$runManager, self::$jobTimingManager, self::$objectManager, self::$objectName, self::$archiveObjectName);
$jobManager = new self::$jobManagerClass(self::$runManager, self::$jobTimingManager, self::$objectManager, self::$objectName, self::$eventDispatcher, self::$archiveObjectName);
self::$jobManager = $jobManager;
$jobManager->setMaxPriority(255);

Expand Down Expand Up @@ -341,7 +343,7 @@ public function testResetExceptionJobs()
self::assertEquals(BaseJob::STATUS_EXCEPTION, $result->getStatus());
if ($objectManager instanceof EntityManager) {
JobManagerTest::createObjectManager();
$jobManager = new self::$jobManagerClass(self::$runManager, self::$jobTimingManager, self::$objectManager, self::$objectName, self::$archiveObjectName);
$jobManager = new self::$jobManagerClass(self::$runManager, self::$jobTimingManager, self::$objectManager, self::$objectName, self::$eventDispatcher, self::$archiveObjectName);
$jobManager->getObjectManager()->clear();
$objectManager = $jobManager->getObjectManager();
}
Expand Down
12 changes: 12 additions & 0 deletions Tests/EventDispatcher/EventDispatcherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,28 @@ public function testEventDispatcher()

self::assertFalse($eventDispatcher->hasListeners(Event::POST_JOB));
self::assertFalse($eventDispatcher->hasListeners(Event::PRE_JOB));
self::assertFalse($eventDispatcher->hasListeners(Event::POST_CREATE_JOB));

$eventDispatcher->addSubscriber($eventSubscriber);
self::assertTrue($eventDispatcher->hasListeners(Event::POST_JOB));
self::assertTrue($eventDispatcher->hasListeners(Event::PRE_JOB));
self::assertTrue($eventDispatcher->hasListeners(Event::POST_CREATE_JOB));

$job = new Job();
$event = new Event($job);

self::assertEmpty($eventSubscriber->getPostJobCalled());
self::assertEmpty($eventSubscriber->getPreJobCalled());
self::assertEmpty($eventSubscriber->getPostCreateJobCalled());

$eventDispatcher->dispatch(Event::POST_CREATE_JOB, $event);

self::assertEmpty($eventSubscriber->getPostJobCalled());
self::assertEmpty($eventSubscriber->getPreJobCalled());
$postCreateJobCalled = $eventSubscriber->getPostCreateJobCalled();
self::assertNotEmpty($postCreateJobCalled);
$dispatchedEvent = $postCreateJobCalled[0];
self::assertEquals($event, $dispatchedEvent);

$eventDispatcher->dispatch(Event::PRE_JOB, $event);

Expand Down
12 changes: 12 additions & 0 deletions Tests/EventDispatcher/StubEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

class StubEventSubscriber implements EventSubscriberInterface
{
protected $postCreateJobCalled;
protected $preJobCalled;
protected $postJobCalled;

Expand All @@ -20,6 +21,11 @@ public function postJob(Event $event)
$this->postJobCalled[] = $event;
}

public function postCreateJob(Event $event)
{
$this->postCreateJobCalled[] = $event;
}

public function getPreJobCalled()
{
return $this->preJobCalled;
Expand All @@ -30,9 +36,15 @@ public function getPostJobCalled()
return $this->postJobCalled;
}

public function getPostCreateJobCalled()
{
return $this->postCreateJobCalled;
}

public static function getSubscribedEvents()
{
return [
Event::POST_CREATE_JOB => 'postCreateJob',
Event::PRE_JOB => 'preJob',
Event::POST_JOB => 'postJob',
];
Expand Down
4 changes: 4 additions & 0 deletions Tests/Manager/BaseJobManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Dtc\QueueBundle\Tests\Manager;

use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
use Dtc\QueueBundle\Manager\JobManagerInterface;
use Dtc\QueueBundle\Manager\PriorityJobManager;
use Dtc\QueueBundle\Manager\RunManager;
Expand Down Expand Up @@ -30,6 +31,9 @@ abstract class BaseJobManagerTest extends TestCase
/** @var JobTimingManager */
public static $jobTimingManager;

/** @var EventDispatcher */
public static $eventDispatcher;

public static function setUpBeforeClass()
{
self::$jobClass = self::$jobManager->getJobClass();
Expand Down
Loading