diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 4c1ade4ea05d4..dd3020662fa2e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +6.4 +--- + + * Add option to use transactions for publishing + 6.0 --- diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index a08c102a4b9da..7cf6bef0f701e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -146,6 +146,12 @@ public function testExceptionIfInvalidExchangeOptionIsPassed() Connection::fromDsn('amqp://host', ['exchange' => ['foo' => 'bar']]); } + public function testExceptionIfConfirmTimeoutAndTransactionalIsPassed() + { + $this->expectExceptionMessage('Confirm timeout cannot be used on transactional channel.'); + Connection::fromDsn('amqp://host?confirm_timeout=0.5&transactional=1'); + } + public function testSetsParametersOnTheQueueAndExchange() { $factory = new TestAmqpFactory( @@ -753,6 +759,63 @@ public function testItCanPublishAndWaitForConfirmation() $connection->publish('body'); } + public function testItPublishMessagesWithoutTransactions() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->method('getChannel')->willReturn($amqpChannel); + + $amqpChannel->expects($this->never())->method('startTransaction'); + $amqpChannel->expects($this->never())->method('commitTransaction'); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + + public function testItPublishMessagesUsingTransactions() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->method('getChannel')->willReturn($amqpChannel); + + $startedTransaction = false; + $amqpChannel->expects($this->once())->method('startTransaction')->willReturnCallback( + function () use (&$startedTransaction) { + $startedTransaction = true; + } + ); + + $published = false; + $amqpExchange->expects($this->once())->method('publish')->with( + 'body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()] + )->willReturnCallback(function () use (&$startedTransaction, &$published) { + $this->assertTrue($startedTransaction); + $published = true; + }); + + $amqpChannel->expects($this->once())->method('commitTransaction')->willReturnCallback( + function () use (&$published) { + $this->assertTrue($published); + } + ); + + $connection = Connection::fromDsn('amqp://localhost?transactional=1', [], $factory); + $connection->publish('body'); + } + public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() { $this->assertEquals( diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 1ed87b1a3b510..3c74f0d658dbb 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -54,6 +54,7 @@ class Connection 'read_timeout', 'write_timeout', 'confirm_timeout', + 'transactional', 'connect_timeout', 'rpc_timeout', 'cacert', @@ -128,6 +129,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. + * * transactional: Enable or not using transactions for publishing (Default: false) * * queues[name]: An array of queues, keyed by the name * * binding_keys: The binding keys (if any) to bind to this queue * * binding_arguments: Arguments to be used while binding the queue. @@ -251,6 +253,13 @@ private static function validateOptions(array $options): void && 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) { throw new LogicException(sprintf('Invalid exchange option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidExchangeOptions))); } + + $transactional = isset($options['transactional']) && filter_var($options['transactional'], \FILTER_VALIDATE_BOOL); + $confirmTimeout = ('' !== ($options['confirm_timeout'] ?? '')); + + if ($transactional && $confirmTimeout) { + throw new LogicException('Confirm timeout cannot be used on transactional channel.'); + } } private static function normalizeQueueArguments(array $arguments): array @@ -334,9 +343,14 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers); $attributes['delivery_mode'] ??= 2; $attributes['timestamp'] ??= time(); + $transactional = isset($this->connectionOptions['transactional']) && filter_var($this->connectionOptions['transactional'], \FILTER_VALIDATE_BOOL); $this->lastActivityTime = time(); + if ($transactional) { + $exchange->getChannel()->startTransaction(); + } + $exchange->publish( $body, $routingKey, @@ -344,6 +358,10 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string $attributes ); + if ($transactional) { + $exchange->getChannel()->commitTransaction(); + } + if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) { $this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']); }