Skip to content

Commit a1cb2ab

Browse files
committed
[Messenger][AMQP] Use delivery_mode=2 by default
1 parent 5690b97 commit a1cb2ab

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public function testItSetupsTheConnectionWithDefaults()
227227
);
228228

229229
$amqpExchange->expects($this->once())->method('declareExchange');
230-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
230+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
231231
$amqpQueue->expects($this->once())->method('declareQueue');
232232
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
233233

@@ -250,7 +250,7 @@ public function testItSetupsTheConnection()
250250
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
251251

252252
$amqpExchange->expects($this->once())->method('declareExchange');
253-
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
253+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
254254
$amqpQueue0->expects($this->once())->method('declareQueue');
255255
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
256256
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
@@ -287,7 +287,7 @@ public function testBindingArguments()
287287
$factory->method('createQueue')->willReturn($amqpQueue);
288288

289289
$amqpExchange->expects($this->once())->method('declareExchange');
290-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
290+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
291291
$amqpQueue->expects($this->once())->method('declareQueue');
292292
$amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive(
293293
[self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']]
@@ -400,7 +400,7 @@ public function testItDelaysTheMessage()
400400
$delayQueue->expects($this->once())->method('declareQueue');
401401
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
402402

403-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
403+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
404404

405405
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
406406
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
@@ -442,7 +442,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
442442
$delayQueue->expects($this->once())->method('declareQueue');
443443
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
444444

445-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
445+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
446446
$connection->publish('{}', [], 120000);
447447
}
448448

@@ -474,12 +474,27 @@ public function testAmqpStampHeadersAreUsed()
474474
$amqpExchange = $this->createMock(\AMQPExchange::class)
475475
);
476476

477-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
477+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);
478478

479479
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
480480
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
481481
}
482482

483+
public function testAmqpStampDelireryModeIsUsed()
484+
{
485+
$factory = new TestAmqpFactory(
486+
$this->createMock(\AMQPConnection::class),
487+
$this->createMock(\AMQPChannel::class),
488+
$this->createMock(\AMQPQueue::class),
489+
$amqpExchange = $this->createMock(\AMQPExchange::class)
490+
);
491+
492+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);
493+
494+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
495+
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
496+
}
497+
483498
public function testItCanPublishWithTheDefaultRoutingKey()
484499
{
485500
$factory = new TestAmqpFactory(
@@ -546,7 +561,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
546561
$delayQueue->expects($this->once())->method('declareQueue');
547562
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
548563

549-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
564+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
550565
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
551566
}
552567

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
230230
{
231231
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
232232
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
233+
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
233234

234235
$exchange->publish(
235236
$body,

0 commit comments

Comments
 (0)