Skip to content

Commit b542b7f

Browse files
author
scyzoryck
committed
Messenger - Add option to confirm message delivery in Amqp connection
1 parent a06e564 commit b542b7f

File tree

3 files changed

+64
-0
lines changed

3 files changed

+64
-0
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ CHANGELOG
66

77
* Introduced the AMQP bridge.
88
* Deprecated use of invalid options
9+
* Add option to confirm message delivery

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

+45
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,51 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
623623
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
624624
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
625625
}
626+
627+
public function testItPublishMessagesWithoutWaitingForConfirmation()
628+
{
629+
$factory = new TestAmqpFactory(
630+
$amqpConnection = $this->createMock(\AMQPConnection::class),
631+
$amqpChannel = $this->createMock(\AMQPChannel::class),
632+
$amqpQueue = $this->createMock(\AMQPQueue::class),
633+
$amqpExchange = $this->createMock(\AMQPExchange::class)
634+
);
635+
636+
$amqpChannel->expects($this->never())->method('waitForConfirm')->with(0.5);
637+
638+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
639+
$connection->publish('body');
640+
}
641+
642+
public function testSetChannelToConfirmMessage()
643+
{
644+
$factory = new TestAmqpFactory(
645+
$amqpConnection = $this->createMock(\AMQPConnection::class),
646+
$amqpChannel = $this->createMock(\AMQPChannel::class),
647+
$amqpQueue = $this->createMock(\AMQPQueue::class),
648+
$amqpExchange = $this->createMock(\AMQPExchange::class)
649+
);
650+
651+
$amqpChannel->expects($this->once())->method('confirmSelect');
652+
$amqpChannel->expects($this->once())->method('setConfirmCallback');
653+
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
654+
$connection->setup();
655+
}
656+
657+
public function testItCanPublishAndWaitForConfirmation()
658+
{
659+
$factory = new TestAmqpFactory(
660+
$amqpConnection = $this->createMock(\AMQPConnection::class),
661+
$amqpChannel = $this->createMock(\AMQPChannel::class),
662+
$amqpQueue = $this->createMock(\AMQPQueue::class),
663+
$amqpExchange = $this->createMock(\AMQPExchange::class)
664+
);
665+
666+
$amqpChannel->expects($this->once())->method('waitForConfirm')->with(0.5);
667+
668+
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
669+
$connection->publish('body');
670+
}
626671
}
627672

628673
class TestAmqpFactory extends AmqpFactory

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

+18
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Connection
5050
'heartbeat',
5151
'read_timeout',
5252
'write_timeout',
53+
'confirm_timeout',
5354
'connect_timeout',
5455
'cacert',
5556
'cert',
@@ -128,6 +129,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
128129
* * read_timeout: Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
129130
* * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
130131
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
132+
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
131133
* * queues[name]: An array of queues, keyed by the name
132134
* * binding_keys: The binding keys (if any) to bind to this queue
133135
* * binding_arguments: Arguments to be used while binding the queue.
@@ -324,6 +326,10 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
324326
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
325327
$attributes
326328
);
329+
330+
if (isset($this->connectionOptions['confirm_timeout'])) {
331+
$this->channel()->waitForConfirm($this->connectionOptions['confirm_timeout']);
332+
}
327333
}
328334

329335
private function setupDelay(int $delay, ?string $routingKey)
@@ -477,6 +483,18 @@ public function channel(): \AMQPChannel
477483
if (isset($this->connectionOptions['prefetch_count'])) {
478484
$this->amqpChannel->setPrefetchCount($this->connectionOptions['prefetch_count']);
479485
}
486+
487+
if (isset($this->connectionOptions['confirm_timeout'])) {
488+
$this->amqpChannel->confirmSelect();
489+
$this->amqpChannel->setConfirmCallback(
490+
static function (): bool {
491+
return false;
492+
},
493+
static function (): bool {
494+
return false;
495+
}
496+
);
497+
}
480498
}
481499

482500
return $this->amqpChannel;

0 commit comments

Comments
 (0)