-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathSnsQsConsumer.php
143 lines (114 loc) · 3.88 KB
/
SnsQsConsumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
<?php
declare(strict_types=1);
namespace Enqueue\SnsQs;
use Enqueue\Sqs\SqsConsumer;
use Enqueue\Sqs\SqsMessage;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Message;
use Interop\Queue\Queue;
/**
 * Consumer that wraps an SqsConsumer and exposes the messages it receives as
 * SnsQsMessage instances.
 *
 * All queue operations (receive, acknowledge, reject, visibility timeout,
 * batch size) are delegated to the wrapped SqsConsumer. Received messages whose
 * body is an SNS "Notification" JSON envelope are unwrapped; any other message
 * is copied over as-is.
 */
class SnsQsConsumer implements Consumer
{
    /**
     * Used to create the SnsQsMessage instances returned to callers.
     *
     * @var SnsQsContext
     */
    private $context;

    /**
     * The wrapped consumer that performs the actual queue operations.
     *
     * @var SqsConsumer
     */
    private $consumer;

    /**
     * The queue reported by getQueue().
     *
     * @var SnsQsQueue
     */
    private $queue;

    public function __construct(SnsQsContext $context, SqsConsumer $consumer, SnsQsQueue $queue)
    {
        $this->context = $context;
        $this->consumer = $consumer;
        $this->queue = $queue;
    }

    /**
     * Returns the visibility timeout configured on the wrapped consumer.
     */
    public function getVisibilityTimeout(): ?int
    {
        return $this->consumer->getVisibilityTimeout();
    }

    /**
     * The duration (in seconds) that the received messages are hidden from subsequent retrieve
     * requests after being retrieved by a ReceiveMessage request.
     */
    public function setVisibilityTimeout(?int $visibilityTimeout = null): void
    {
        $this->consumer->setVisibilityTimeout($visibilityTimeout);
    }

    /**
     * Returns the maximum number of messages configured on the wrapped consumer.
     */
    public function getMaxNumberOfMessages(): int
    {
        return $this->consumer->getMaxNumberOfMessages();
    }

    /**
     * The maximum number of messages to return. Amazon SQS never returns more messages than this value
     * (however, fewer messages might be returned). Valid values are 1 to 10. Default is 1.
     */
    public function setMaxNumberOfMessages(int $maxNumberOfMessages): void
    {
        $this->consumer->setMaxNumberOfMessages($maxNumberOfMessages);
    }

    /**
     * Returns the SnsQsQueue this consumer was constructed with.
     */
    public function getQueue(): Queue
    {
        return $this->queue;
    }

    /**
     * Receives a message through the wrapped consumer, passing $timeout along
     * unchanged, and converts it to an SnsQsMessage.
     *
     * @return Message|null null when the wrapped consumer returned no message
     */
    public function receive(int $timeout = 0): ?Message
    {
        $sqsMessage = $this->consumer->receive($timeout);

        return $sqsMessage ? $this->convertMessage($sqsMessage) : null;
    }

    /**
     * Same as receive(), but uses the wrapped consumer's receiveNoWait().
     *
     * @return Message|null null when the wrapped consumer returned no message
     */
    public function receiveNoWait(): ?Message
    {
        $sqsMessage = $this->consumer->receiveNoWait();

        return $sqsMessage ? $this->convertMessage($sqsMessage) : null;
    }

    /**
     * Acknowledges the SQS message carried by the given SnsQsMessage.
     *
     * The message type is checked with InvalidMessageException::assertMessageInstanceOf()
     * before delegating to the wrapped consumer.
     *
     * @param SnsQsMessage $message
     */
    public function acknowledge(Message $message): void
    {
        InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class);

        $this->consumer->acknowledge($message->getSqsMessage());
    }

    /**
     * Rejects the SQS message carried by the given SnsQsMessage, forwarding the
     * $requeue flag to the wrapped consumer.
     *
     * The message type is checked with InvalidMessageException::assertMessageInstanceOf()
     * before delegating to the wrapped consumer.
     *
     * @param SnsQsMessage $message
     */
    public function reject(Message $message, bool $requeue = false): void
    {
        InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class);

        $this->consumer->reject($message->getSqsMessage(), $requeue);
    }

    /**
     * Builds an SnsQsMessage from a received SqsMessage.
     *
     * When the SQS body decodes to a JSON object that has a "TopicArn" and
     * "Type" === "Notification", the envelope is unwrapped: its "Message"
     * becomes the body and the "Headers" message attribute (if usable) supplies
     * headers and properties. Otherwise body, headers and properties are copied
     * from the SQS message unchanged.
     */
    private function convertMessage(SqsMessage $sqsMessage): SnsQsMessage
    {
        $message = $this->context->createMessage();
        $message->setRedelivered($sqsMessage->isRedelivered());
        $message->setSqsMessage($sqsMessage);

        $body = $sqsMessage->getBody();

        // Cheap pre-check: only attempt to decode bodies that look like a JSON object.
        if (isset($body[0]) && '{' === $body[0]) {
            $data = json_decode($body, true);

            if (\is_array($data)
                && isset($data['TopicArn'], $data['Type'])
                && 'Notification' === $data['Type']
            ) {
                // SNS message conversion
                if (isset($data['Message'])) {
                    $message->setBody((string) $data['Message']);
                }

                $this->applyHeadersAttribute($message, $data);

                return $message;
            }
        }

        $message->setBody($body);
        $message->setHeaders($sqsMessage->getHeaders());
        $message->setProperties($sqsMessage->getProperties());

        return $message;
    }

    /**
     * Copies headers and properties from the notification's "Headers" message
     * attribute onto $message.
     *
     * The attribute's "Value" is expected to be a JSON list whose element 0 is
     * the headers array and element 1 the properties array. Anything that does
     * not match that shape (attribute or "Value" missing, invalid JSON, element
     * not an array) is ignored and the corresponding part of $message is left
     * untouched, so a notification carrying a foreign or malformed "Headers"
     * attribute no longer aborts the consumer with a TypeError.
     *
     * @param array $notification decoded SNS notification envelope
     */
    private function applyHeadersAttribute(SnsQsMessage $message, array $notification): void
    {
        $encoded = $notification['MessageAttributes']['Headers']['Value'] ?? null;
        if (!\is_string($encoded)) {
            return;
        }

        $decoded = json_decode($encoded, true);
        if (!\is_array($decoded)) {
            return;
        }

        if (isset($decoded[0]) && \is_array($decoded[0])) {
            $message->setHeaders($decoded[0]);
        }

        if (isset($decoded[1]) && \is_array($decoded[1])) {
            $message->setProperties($decoded[1]);
        }
    }
}