Skip to content

Commit dfdb38c

Browse files
committed
Add a retry mechanism for AMQP messages
Add some tests and remove the intrication between the receiver and the connection Configure each TTL individually and the dead routing key The `ttls` array is 0-indexed
1 parent 9b841d9 commit dfdb38c

File tree

4 files changed

+247
-3
lines changed

4 files changed

+247
-3
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

+55-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
6868
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
6969
$connection->method('get')->willReturn($envelope);
7070

71-
$connection->expects($this->once())->method('nack')->with($envelope);
71+
$connection->expects($this->once())->method('nack')->with($envelope, AMQP_REQUEUE);
7272

7373
$receiver = new AmqpReceiver($serializer, $connection);
7474
$receiver->receive(function () {
@@ -100,6 +100,60 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn
100100
throw new WillNeverWorkException('Well...');
101101
});
102102
}
103+
104+
public function testItPublishesTheMessageForRetryIfSuchConfiguration()
105+
{
106+
$serializer = new Serializer(
107+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
108+
);
109+
110+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
111+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
112+
$envelope->method('getHeaders')->willReturn(array(
113+
'type' => DummyMessage::class,
114+
));
115+
116+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
117+
$connection->method('get')->willReturn($envelope);
118+
$connection->method('getConnectionCredentials')->willReturn(array('retry' => array('attempts' => 3)));
119+
$connection->method('publishForRetry')->with($envelope)->willReturn(true);
120+
121+
$connection->expects($this->once())->method('ack')->with($envelope);
122+
123+
$receiver = new AmqpReceiver($serializer, $connection);
124+
$receiver->receive(function ($message) use ($receiver) {
125+
$this->assertEquals(new DummyMessage('Hi'), $message);
126+
$receiver->stop();
127+
});
128+
}
129+
130+
/**
131+
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException
132+
*/
133+
public function testItThrowsTheExceptionIfTheRetryPublishDidNotWork()
134+
{
135+
$serializer = new Serializer(
136+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
137+
);
138+
139+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
140+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
141+
$envelope->method('getHeaders')->willReturn(array(
142+
'type' => DummyMessage::class,
143+
));
144+
145+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
146+
$connection->method('get')->willReturn($envelope);
147+
$connection->method('getConnectionCredentials')->willReturn(array('retry' => array('attempts' => 3)));
148+
$connection->method('publishForRetry')->with($envelope)->willReturn(false);
149+
150+
$connection->expects($this->never())->method('ack')->with($envelope);
151+
152+
$receiver = new AmqpReceiver($serializer, $connection);
153+
$receiver->receive(function () {
154+
throw new InterruptException('Well...');
155+
});
156+
}
103157
}
104158

105159
class InterruptException extends \Exception

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

+92
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,98 @@ public function testItCanDisableTheSetup()
189189
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', array(), true, $factory);
190190
$connection->publish('body');
191191
}
192+
193+
public function testItRetriesTheMessage()
194+
{
195+
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
196+
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
197+
$retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
198+
199+
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
200+
$factory->method('createConnection')->willReturn($amqpConnection);
201+
$factory->method('createChannel')->willReturn($amqpChannel);
202+
$factory->method('createQueue')->willReturn($retryQueue);
203+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
204+
$retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
205+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
206+
));
207+
208+
$amqpExchange->expects($this->once())->method('setName')->with('messages');
209+
$amqpExchange->method('getName')->willReturn('messages');
210+
211+
$retryExchange->expects($this->once())->method('setName')->with('retry');
212+
$retryExchange->expects($this->once())->method('declareExchange');
213+
$retryExchange->method('getName')->willReturn('retry');
214+
215+
$retryQueue->expects($this->once())->method('setName')->with('retry_queue_1');
216+
$retryQueue->expects($this->once())->method('setArguments')->with(array(
217+
'x-message-ttl' => 30000,
218+
'x-dead-letter-exchange' => 'messages',
219+
));
220+
221+
$retryQueue->expects($this->once())->method('declareQueue');
222+
$retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_1');
223+
224+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
225+
$envelope->method('getHeader')->with('symfony-messenger-attempts')->willReturn(false);
226+
$envelope->method('getHeaders')->willReturn(array('x-some-headers' => 'foo'));
227+
$envelope->method('getBody')->willReturn('{}');
228+
229+
$retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_1', AMQP_NOPARAM, array('headers' => array('x-some-headers' => 'foo', 'symfony-messenger-attempts' => 1)));
230+
231+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', array('retry' => array('attempts' => 3)), false, $factory);
232+
$connection->publishForRetry($envelope);
233+
}
234+
235+
public function testItRetriesTheMessageWithADifferentRoutingKeyAndTTLs()
236+
{
237+
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
238+
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
239+
$retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
240+
241+
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
242+
$factory->method('createConnection')->willReturn($amqpConnection);
243+
$factory->method('createChannel')->willReturn($amqpChannel);
244+
$factory->method('createQueue')->willReturn($retryQueue);
245+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
246+
$retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
247+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
248+
));
249+
250+
$amqpExchange->expects($this->once())->method('setName')->with('messages');
251+
$amqpExchange->method('getName')->willReturn('messages');
252+
253+
$retryExchange->expects($this->once())->method('setName')->with('retry');
254+
$retryExchange->expects($this->once())->method('declareExchange');
255+
$retryExchange->method('getName')->willReturn('retry');
256+
257+
$connectionOptions = array(
258+
'retry' => array(
259+
'attempts' => 3,
260+
'dead_routing_key' => 'my_dead_routing_key',
261+
'ttls' => array(30000, 60000, 120000),
262+
),
263+
);
264+
265+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, false, $factory);
266+
267+
$messageRetriedTwice = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
268+
$messageRetriedTwice->method('getHeader')->with('symfony-messenger-attempts')->willReturn('2');
269+
$messageRetriedTwice->method('getHeaders')->willReturn(array('symfony-messenger-attempts' => '2'));
270+
$messageRetriedTwice->method('getBody')->willReturn('{}');
271+
272+
$retryQueue->expects($this->once())->method('setName')->with('retry_queue_3');
273+
$retryQueue->expects($this->once())->method('setArguments')->with(array(
274+
'x-message-ttl' => 120000,
275+
'x-dead-letter-exchange' => 'messages',
276+
));
277+
278+
$retryQueue->expects($this->once())->method('declareQueue');
279+
$retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_3');
280+
281+
$retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_3', AMQP_NOPARAM, array('headers' => array('symfony-messenger-attempts' => 3)));
282+
$connection->publishForRetry($messageRetriedTwice);
283+
}
192284
}
193285

194286
class TestAmqpFactory extends AmqpFactory

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

+10-2
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,14 @@ public function receive(callable $handler): void
6262

6363
throw $e;
6464
} catch (\Throwable $e) {
65-
$this->connection->nack($message, AMQP_REQUEUE);
65+
if (!$this->connection->publishForRetry($message)) {
66+
$this->connection->nack($message, AMQP_REQUEUE);
6667

67-
throw $e;
68+
throw $e;
69+
}
70+
71+
// Acknowledge current message as another one as been requeued.
72+
$this->connection->ack($message);
6873
} finally {
6974
if (\function_exists('pcntl_signal_dispatch')) {
7075
pcntl_signal_dispatch();
@@ -73,6 +78,9 @@ public function receive(callable $handler): void
7378
}
7479
}
7580

81+
/**
82+
* {@inheritdoc}
83+
*/
7684
public function stop(): void
7785
{
7886
$this->shouldStop = true;

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

+90
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ class Connection
3939
*/
4040
private $amqpQueue;
4141

42+
/**
43+
* @var \AMQPExchange|null
44+
*/
45+
private $amqpRetryExchange;
46+
4247
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null)
4348
{
4449
$this->connectionCredentials = $connectionCredentials;
@@ -101,6 +106,91 @@ public function publish(string $body, array $headers = array()): void
101106
$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers));
102107
}
103108

109+
/**
110+
* @throws \AMQPException
111+
*/
112+
public function publishForRetry(\AMQPEnvelope $message): bool
113+
{
114+
if (!isset($this->connectionCredentials['retry'])) {
115+
return false;
116+
}
117+
118+
$retryConfiguration = $this->connectionCredentials['retry'];
119+
$attemptNumber = ($message->getHeader('symfony-messenger-attempts') ?: 0) + 1;
120+
121+
if ($this->shouldSetup()) {
122+
$this->setupRetry($retryConfiguration, $attemptNumber);
123+
}
124+
125+
$maximumAttempts = $retryConfiguration['attempts'] ?? 3;
126+
$routingKey = str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern'] ?? 'attempt_%attempt%');
127+
128+
if ($attemptNumber > $maximumAttempts) {
129+
if (!isset($retryConfiguration['dead_queue'])) {
130+
return false;
131+
}
132+
133+
$routingKey = $retryConfiguration['dead_routing_key'] ?? 'dead';
134+
}
135+
136+
$this->retryExchange($retryConfiguration)->publish(
137+
$message->getBody(),
138+
$routingKey,
139+
AMQP_NOPARAM,
140+
array('headers' => array_merge($message->getHeaders(), array('symfony-messenger-attempts' => $attemptNumber)))
141+
);
142+
143+
return true;
144+
}
145+
146+
private function setupRetry(array $retryConfiguration, int $attemptNumber)
147+
{
148+
if (!$this->channel()->isConnected()) {
149+
$this->clear();
150+
}
151+
152+
$exchange = $this->retryExchange($retryConfiguration);
153+
$exchange->declareExchange();
154+
155+
$queue = $this->retryQueue($retryConfiguration, $attemptNumber);
156+
$queue->declareQueue();
157+
$queue->bind($exchange->getName(), str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern'] ?? 'attempt_%attempt%'));
158+
159+
if (isset($retryConfiguration['dead_queue'])) {
160+
$queue = $this->amqpFactory->createQueue($this->channel());
161+
$queue->setName($retryConfiguration['dead_queue']);
162+
$queue->declareQueue();
163+
$queue->bind($exchange->getName(), $retryConfiguration['dead_routing_key'] ?? 'dead');
164+
}
165+
}
166+
167+
private function retryExchange(array $retryConfiguration)
168+
{
169+
if (null === $this->amqpRetryExchange) {
170+
$this->amqpRetryExchange = $this->amqpFactory->createExchange($this->channel());
171+
$this->amqpRetryExchange->setName($retryConfiguration['name'] ?? 'retry');
172+
$this->amqpRetryExchange->setType(AMQP_EX_TYPE_DIRECT);
173+
}
174+
175+
return $this->amqpRetryExchange;
176+
}
177+
178+
private function retryQueue(array $retryConfiguration, int $attemptNumber)
179+
{
180+
$queue = $this->amqpFactory->createQueue($this->channel());
181+
$queue->setName(str_replace('%attempt%', $attemptNumber, $retryConfiguration['queue_name_pattern'] ?? 'retry_queue_%attempt%'));
182+
$queue->setArguments(array(
183+
'x-message-ttl' => $retryConfiguration['ttls'][$attemptNumber - 1] ?? 30000, // 30 seconds by default
184+
'x-dead-letter-exchange' => $this->exchange()->getName(),
185+
));
186+
187+
if (isset($this->queueConfiguration['routing_key'])) {
188+
$queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
189+
}
190+
191+
return $queue;
192+
}
193+
104194
/**
105195
* Waits and gets a message from the configured queue.
106196
*

0 commit comments

Comments
 (0)