diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index b052d0fe14104..9fcd9f488ebed 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +7.4 +--- + + * Add option to use transactions for publishing + 7.3 --- diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index e2d94d6bc3b63..cbd2eb4fdbab4 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -146,6 +146,12 @@ public function testExceptionIfInvalidExchangeOptionIsPassed() Connection::fromDsn('amqp://host', ['exchange' => ['foo' => 'bar']]); } + public function testExceptionIfConfirmTimeoutAndTransactionalIsPassed() + { + $this->expectExceptionMessage('Confirm timeout cannot be used on transactional channel.'); + Connection::fromDsn('amqp://host?confirm_timeout=0.5&transactional=1'); + } + public function testSetsParametersOnTheQueueAndExchange() { $factory = new TestAmqpFactory( @@ -761,6 +767,63 @@ public function testItCanPublishAndWaitForConfirmation() $connection->publish('body'); } + public function testItPublishMessagesWithoutTransactions() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->method('getChannel')->willReturn($amqpChannel); + + $amqpChannel->expects($this->never())->method('startTransaction'); + $amqpChannel->expects($this->never())->method('commitTransaction'); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + + public function testItPublishMessagesUsingTransactions() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->method('getChannel')->willReturn($amqpChannel); + + $startedTransaction = false; + $amqpChannel->expects($this->once())->method('startTransaction')->willReturnCallback( + function () use (&$startedTransaction) { + $startedTransaction = true; + } + ); + + $published = false; + $amqpExchange->expects($this->once())->method('publish')->with( + 'body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()] + )->willReturnCallback(function () use (&$startedTransaction, &$published) { + $this->assertTrue($startedTransaction); + $published = true; + }); + + $amqpChannel->expects($this->once())->method('commitTransaction')->willReturnCallback( + function () use (&$published) { + $this->assertTrue($published); + } + ); + + $connection = Connection::fromDsn('amqp://localhost?transactional=1', [], $factory); + $connection->publish('body'); + } + public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() { $this->assertEquals( diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 2599586f8f3d8..0c7a48283f08a 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -56,6 +56,7 @@ class Connection 'read_timeout', 'write_timeout', 'confirm_timeout', + 'transactional', 'connect_timeout', 'rpc_timeout', 'cacert', @@ -129,6 +130,7 @@ public function __construct( * * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. + * * transactional: Enable or not using transactions for publishing (Default: false) * * queues[name]: An array of queues, keyed by the name * * binding_keys: The binding keys (if any) to bind to this queue * * binding_arguments: Arguments to be used while binding the queue. @@ -253,6 +255,13 @@ private static function validateOptions(array $options): void && 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) { throw new LogicException(\sprintf('Invalid exchange option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidExchangeOptions))); } + + $transactional = isset($options['transactional']) && filter_var($options['transactional'], \FILTER_VALIDATE_BOOL); + $confirmTimeout = ('' !== ($options['confirm_timeout'] ?? '')); + + if ($transactional && $confirmTimeout) { + throw new LogicException('Confirm timeout cannot be used on transactional channel.'); + } } private static function normalizeQueueArguments(array $arguments): array @@ -338,9 +347,14 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, ?strin $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers); $attributes['delivery_mode'] ??= 2; $attributes['timestamp'] ??= time(); + $transactional = isset($this->connectionOptions['transactional']) && filter_var($this->connectionOptions['transactional'], \FILTER_VALIDATE_BOOL); $this->lastActivityTime = time(); + if ($transactional) { + $exchange->getChannel()->startTransaction(); + } + $exchange->publish( $body, $routingKey, @@ -348,6 +362,10 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, ?strin $attributes ); + if ($transactional) { + $exchange->getChannel()->commitTransaction(); + } + if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) { $this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']); }