forked from swarrot/swarrot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAckProcessor.php
76 lines (66 loc) · 2.16 KB
/
AckProcessor.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
<?php
namespace Swarrot\Processor\Ack;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Swarrot\Broker\Message;
use Swarrot\Broker\MessageProvider\MessageProviderInterface;
use Swarrot\Processor\ConfigurableInterface;
use Swarrot\Processor\ProcessorInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
class AckProcessor implements ConfigurableInterface
{
private $processor;
private $messageProvider;
private $logger;
public function __construct(ProcessorInterface $processor, MessageProviderInterface $messageProvider, LoggerInterface $logger = null)
{
$this->processor = $processor;
$this->messageProvider = $messageProvider;
$this->logger = $logger ?: new NullLogger();
}
/**
* {@inheritdoc}
*/
public function process(Message $message, array $options): bool
{
try {
$return = $this->processor->process($message, $options);
$this->messageProvider->ack($message);
$this->logger->info(
"[Ack] Message has been correctly ack'ed",
[
'message_id' => $message->getId(),
'swarrot_processor' => 'ack',
]
);
return $return;
} catch (\Throwable $e) {
$requeue = isset($options['requeue_on_error']) ? (bool) $options['requeue_on_error'] : false;
$this->messageProvider->nack($message, $requeue);
$this->logger->error(
sprintf(
'[Ack] An exception occurred, the message has been %s.',
$requeue ? 'requeued' : "nack'ed"
),
[
'message_id' => $message->getId(),
'swarrot_processor' => 'ack',
'exception' => $e,
]
);
throw $e;
}
}
/**
* {@inheritdoc}
*/
public function setDefaultOptions(OptionsResolver $resolver): void
{
$resolver
->setDefaults([
'requeue_on_error' => false,
])
->setAllowedTypes('requeue_on_error', 'bool')
;
}
}