@@ -189,6 +189,98 @@ public function testItCanDisableTheSetup()
189
189
$ connection = Connection::fromDsn ('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false ' , array (), true , $ factory );
190
190
$ connection ->publish ('body ' );
191
191
}
192
+
193
+ public function testItRetriesTheMessage ()
194
+ {
195
+ $ amqpConnection = $ this ->getMockBuilder (\AMQPConnection::class)->disableOriginalConstructor ()->getMock ();
196
+ $ amqpChannel = $ this ->getMockBuilder (\AMQPChannel::class)->disableOriginalConstructor ()->getMock ();
197
+ $ retryQueue = $ this ->getMockBuilder (\AMQPQueue::class)->disableOriginalConstructor ()->getMock ();
198
+
199
+ $ factory = $ this ->getMockBuilder (AmqpFactory::class)->getMock ();
200
+ $ factory ->method ('createConnection ' )->willReturn ($ amqpConnection );
201
+ $ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
202
+ $ factory ->method ('createQueue ' )->willReturn ($ retryQueue );
203
+ $ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
204
+ $ retryExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock (),
205
+ $ amqpExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock ()
206
+ ));
207
+
208
+ $ amqpExchange ->expects ($ this ->once ())->method ('setName ' )->with ('messages ' );
209
+ $ amqpExchange ->method ('getName ' )->willReturn ('messages ' );
210
+
211
+ $ retryExchange ->expects ($ this ->once ())->method ('setName ' )->with ('retry ' );
212
+ $ retryExchange ->expects ($ this ->once ())->method ('declareExchange ' );
213
+ $ retryExchange ->method ('getName ' )->willReturn ('retry ' );
214
+
215
+ $ retryQueue ->expects ($ this ->once ())->method ('setName ' )->with ('retry_queue_1 ' );
216
+ $ retryQueue ->expects ($ this ->once ())->method ('setArguments ' )->with (array (
217
+ 'x-message-ttl ' => 30000 ,
218
+ 'x-dead-letter-exchange ' => 'messages ' ,
219
+ ));
220
+
221
+ $ retryQueue ->expects ($ this ->once ())->method ('declareQueue ' );
222
+ $ retryQueue ->expects ($ this ->once ())->method ('bind ' )->with ('retry ' , 'attempt_1 ' );
223
+
224
+ $ envelope = $ this ->getMockBuilder (\AMQPEnvelope::class)->getMock ();
225
+ $ envelope ->method ('getHeader ' )->with ('symfony-messenger-attempts ' )->willReturn (false );
226
+ $ envelope ->method ('getHeaders ' )->willReturn (array ('x-some-headers ' => 'foo ' ));
227
+ $ envelope ->method ('getBody ' )->willReturn ('{} ' );
228
+
229
+ $ retryExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'attempt_1 ' , AMQP_NOPARAM , array ('headers ' => array ('x-some-headers ' => 'foo ' , 'symfony-messenger-attempts ' => 1 )));
230
+
231
+ $ connection = Connection::fromDsn ('amqp://localhost/%2f/messages ' , array ('retry ' => array ('attempts ' => 3 )), false , $ factory );
232
+ $ connection ->publishForRetry ($ envelope );
233
+ }
234
+
235
+ public function testItRetriesTheMessageWithADifferentRoutingKeyAndTTLs ()
236
+ {
237
+ $ amqpConnection = $ this ->getMockBuilder (\AMQPConnection::class)->disableOriginalConstructor ()->getMock ();
238
+ $ amqpChannel = $ this ->getMockBuilder (\AMQPChannel::class)->disableOriginalConstructor ()->getMock ();
239
+ $ retryQueue = $ this ->getMockBuilder (\AMQPQueue::class)->disableOriginalConstructor ()->getMock ();
240
+
241
+ $ factory = $ this ->getMockBuilder (AmqpFactory::class)->getMock ();
242
+ $ factory ->method ('createConnection ' )->willReturn ($ amqpConnection );
243
+ $ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
244
+ $ factory ->method ('createQueue ' )->willReturn ($ retryQueue );
245
+ $ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
246
+ $ retryExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock (),
247
+ $ amqpExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock ()
248
+ ));
249
+
250
+ $ amqpExchange ->expects ($ this ->once ())->method ('setName ' )->with ('messages ' );
251
+ $ amqpExchange ->method ('getName ' )->willReturn ('messages ' );
252
+
253
+ $ retryExchange ->expects ($ this ->once ())->method ('setName ' )->with ('retry ' );
254
+ $ retryExchange ->expects ($ this ->once ())->method ('declareExchange ' );
255
+ $ retryExchange ->method ('getName ' )->willReturn ('retry ' );
256
+
257
+ $ connectionOptions = array (
258
+ 'retry ' => array (
259
+ 'attempts ' => 3 ,
260
+ 'dead_routing_key ' => 'my_dead_routing_key ' ,
261
+ 'ttls ' => array (30000 , 60000 , 120000 ),
262
+ ),
263
+ );
264
+
265
+ $ connection = Connection::fromDsn ('amqp://localhost/%2f/messages ' , $ connectionOptions , false , $ factory );
266
+
267
+ $ messageRetriedTwice = $ this ->getMockBuilder (\AMQPEnvelope::class)->getMock ();
268
+ $ messageRetriedTwice ->method ('getHeader ' )->with ('symfony-messenger-attempts ' )->willReturn ('2 ' );
269
+ $ messageRetriedTwice ->method ('getHeaders ' )->willReturn (array ('symfony-messenger-attempts ' => '2 ' ));
270
+ $ messageRetriedTwice ->method ('getBody ' )->willReturn ('{} ' );
271
+
272
+ $ retryQueue ->expects ($ this ->once ())->method ('setName ' )->with ('retry_queue_3 ' );
273
+ $ retryQueue ->expects ($ this ->once ())->method ('setArguments ' )->with (array (
274
+ 'x-message-ttl ' => 120000 ,
275
+ 'x-dead-letter-exchange ' => 'messages ' ,
276
+ ));
277
+
278
+ $ retryQueue ->expects ($ this ->once ())->method ('declareQueue ' );
279
+ $ retryQueue ->expects ($ this ->once ())->method ('bind ' )->with ('retry ' , 'attempt_3 ' );
280
+
281
+ $ retryExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'attempt_3 ' , AMQP_NOPARAM , array ('headers ' => array ('symfony-messenger-attempts ' => 3 )));
282
+ $ connection ->publishForRetry ($ messageRetriedTwice );
283
+ }
192
284
}
193
285
194
286
class TestAmqpFactory extends AmqpFactory
0 commit comments