From 6064a73d8ba48336d5e563da46e477dcba735023 Mon Sep 17 00:00:00 2001 From: Michal Sitek Date: Fri, 22 Sep 2023 02:59:51 +0200 Subject: [PATCH 1/7] [Messenger] Add option option `transactional` to publish on AMQP queues using transactions Resolves #51656 --- .../Messenger/Bridge/Amqp/CHANGELOG.md | 5 +++ .../Amqp/Tests/Transport/ConnectionTest.php | 41 +++++++++++++++++++ .../Bridge/Amqp/Transport/Connection.php | 19 +++++++++ 3 files changed, 65 insertions(+) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 4c1ade4ea05d4..ee8e1a7a22d0b 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +6.3 +--- + + * Add option to use transactions for publishing + 6.0 --- diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index a08c102a4b9da..aca9e0d5a2464 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -146,6 +146,12 @@ public function testExceptionIfInvalidExchangeOptionIsPassed() Connection::fromDsn('amqp://host', ['exchange' => ['foo' => 'bar']]); } + public function testExceptionIfConfirmTimeoutAndTransactionalIsPassed() + { + $this->expectExceptionMessage('Confirm timeout cannot be used on transactional channel.'); + Connection::fromDsn('amqp://host?confirm_timeout=0.5&transactional=1'); + } + public function testSetsParametersOnTheQueueAndExchange() { $factory = new TestAmqpFactory( @@ -753,6 +759,41 @@ public function testItCanPublishAndWaitForConfirmation() $connection->publish('body'); } + public function testItPublishMessagesWithoutTransactions() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $this->createMock(\AMQPExchange::class) + ); + + $amqpChannel->expects($this->never())->method('startTransaction'); + $amqpChannel->expects($this->never())->method('commitTransaction'); + + $connection = Connection::fromDsn('amqp://localhost', [], $factory); + $connection->publish('body'); + } + + public function testItPublishMessagesUsingTransactions() + { + $factory = new TestAmqpFactory( + $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpExchange->method('getChannel')->willReturn($amqpChannel); + + $amqpChannel->expects($this->once())->method('startTransaction'); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); + $amqpChannel->expects($this->once())->method('commitTransaction'); + + $connection = Connection::fromDsn('amqp://localhost?transactional=1', [], $factory); + $connection->publish('body'); + } + public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn() { $this->assertEquals( diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 1ed87b1a3b510..9e46052b202ce 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -54,6 +54,7 @@ class Connection 'read_timeout', 'write_timeout', 'confirm_timeout', + 'transactional', 'connect_timeout', 'rpc_timeout', 'cacert', @@ -128,6 +129,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. + * * transactional: Enable or not using transactions for publishing (Default: false) * * queues[name]: An array of queues, keyed by the name * * binding_keys: The binding keys (if any) to bind to this queue * * binding_arguments: Arguments to be used while binding the queue. @@ -251,6 +253,14 @@ private static function validateOptions(array $options): void && 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) { throw new LogicException(sprintf('Invalid exchange option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidExchangeOptions))); } + + $transactional = isset($options['transactional']) && filter_var($options['transactional'], \FILTER_VALIDATE_BOOL); + $confirmTimeout = ('' !== ($options['confirm_timeout'] ?? '')); + + if ($transactional && $confirmTimeout) { + throw new LogicException(sprintf('Confirm timeout cannot be used on transactional channel.')); + } + } private static function normalizeQueueArguments(array $arguments): array @@ -334,9 +344,14 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers); $attributes['delivery_mode'] ??= 2; $attributes['timestamp'] ??= time(); + $transactional = isset($this->connectionOptions['transactional']) && filter_var($this->connectionOptions['transactional'], \FILTER_VALIDATE_BOOL); $this->lastActivityTime = time(); + if ($transactional) { + $exchange->getChannel()->startTransaction(); + } + $exchange->publish( $body, $routingKey, @@ -344,6 +359,10 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string $attributes ); + if ($transactional) { + $exchange->getChannel()->commitTransaction(); + } + if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) { $this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']); } From 14de41dea676e2a33817dfb29dd324cface24b1d Mon Sep 17 00:00:00 2001 From: michalsitek <48724329+michalsitek@users.noreply.github.com> Date: Fri, 22 Sep 2023 11:23:42 +0200 Subject: [PATCH 2/7] Fixed changed version in changelog Co-authored-by: Joseph Bielawski --- src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index ee8e1a7a22d0b..dd3020662fa2e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -1,7 +1,7 @@ CHANGELOG ========= -6.3 +6.4 --- * Add option to use transactions for publishing From f290f4693fb3b1dd91730c1e6319adfe11169c0c Mon Sep 17 00:00:00 2001 From: Michal Sitek Date: Fri, 22 Sep 2023 11:26:47 +0200 Subject: [PATCH 3/7] Removed redundant sprintf --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 9e46052b202ce..27904f21b848d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -258,7 +258,7 @@ private static function validateOptions(array $options): void $confirmTimeout = ('' !== ($options['confirm_timeout'] ?? '')); if ($transactional && $confirmTimeout) { - throw new LogicException(sprintf('Confirm timeout cannot be used on transactional channel.')); + throw new LogicException('Confirm timeout cannot be used on transactional channel.'); } } From e58dfd05a7ae99325954ce7fd621f5bb0eccb9f2 Mon Sep 17 00:00:00 2001 From: Michal Sitek Date: Fri, 22 Sep 2023 11:28:45 +0200 Subject: [PATCH 4/7] Removed redundant newline --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 27904f21b848d..3c74f0d658dbb 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -260,7 +260,6 @@ private static function validateOptions(array $options): void if ($transactional && $confirmTimeout) { throw new LogicException('Confirm timeout cannot be used on transactional channel.'); } - } private static function normalizeQueueArguments(array $arguments): array From 6c81d714c64ad216a3da07a54414625c701e64f7 Mon Sep 17 00:00:00 2001 From: Michal Sitek Date: Fri, 22 Sep 2023 13:38:21 +0200 Subject: [PATCH 5/7] Made sure transaction is started before and committed after publishing --- .../Amqp/Tests/Transport/ConnectionTest.php | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index aca9e0d5a2464..045fbe809aefa 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -786,9 +786,31 @@ public function testItPublishMessagesUsingTransactions() $amqpExchange->method('getChannel')->willReturn($amqpChannel); - $amqpChannel->expects($this->once())->method('startTransaction'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); - $amqpChannel->expects($this->once())->method('commitTransaction'); + $startedTransaction = false; + $amqpChannel->expects($this->once())->method('startTransaction')->willReturnCallback( + function () use (&$startedTransaction) { + $startedTransaction = true; + } + ); + + + $published = false; + $amqpExchange->expects($this->once())->method('publish')->with( + 'body', + null, + \AMQP_NOPARAM, + ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()] + )->willReturnCallback(function () use (&$startedTransaction, &$published) { + $this->assertTrue($startedTransaction); + $published = true; + }); + + + $amqpChannel->expects($this->once())->method('commitTransaction')->willReturnCallback( + function () use (&$published) { + $this->assertTrue($published); + } + ); $connection = Connection::fromDsn('amqp://localhost?transactional=1', [], $factory); $connection->publish('body'); From ecb8749e3b30a835232e750194ebd222e197240e Mon Sep 17 00:00:00 2001 From: Michal Sitek Date: Fri, 22 Sep 2023 13:40:19 +0200 Subject: [PATCH 6/7] Added missing return value for mock in tests --- .../Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 045fbe809aefa..310ce2be5b36d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -765,9 +765,11 @@ public function testItPublishMessagesWithoutTransactions() $this->createMock(\AMQPConnection::class), $amqpChannel = $this->createMock(\AMQPChannel::class), $this->createMock(\AMQPQueue::class), - $this->createMock(\AMQPExchange::class) + $amqpExchange = $this->createMock(\AMQPExchange::class) ); + $amqpExchange->method('getChannel')->willReturn($amqpChannel); + $amqpChannel->expects($this->never())->method('startTransaction'); $amqpChannel->expects($this->never())->method('commitTransaction'); From 41a9f2c080b9ed8f4d4b1ef7a1c8e735315c281e Mon Sep 17 00:00:00 2001 From: Michal Sitek Date: Fri, 22 Sep 2023 13:41:36 +0200 Subject: [PATCH 7/7] Removed double newlines --- .../Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 310ce2be5b36d..7cf6bef0f701e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -795,7 +795,6 @@ function () use (&$startedTransaction) { } ); - $published = false; $amqpExchange->expects($this->once())->method('publish')->with( 'body', @@ -807,7 +806,6 @@ function () use (&$startedTransaction) { $published = true; }); - $amqpChannel->expects($this->once())->method('commitTransaction')->willReturnCallback( function () use (&$published) { $this->assertTrue($published);