From dd55e54dc8bab39dc20461a4b6d8796822f74237 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Fri, 31 Jul 2020 15:52:18 -0500 Subject: [PATCH 1/6] 37233 Support multiple bindings on the same queue --- .../Messenger/Bridge/Amqp/Transport/Connection.php | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 97d2f7672f3b8..df47429f69acf 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -131,6 +131,9 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * queues[name]: An array of queues, keyed by the name * * binding_keys: The binding keys (if any) to bind to this queue * * binding_arguments: Arguments to be used while binding the queue. + * * bindings[name]: An array of bindings for this queue, keyed by the name + * * key: The binding key (if any) to bind to this queue + * * arguments: An array of arguments to be used while binding the queue. * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: @@ -443,6 +446,12 @@ private function setupExchangeAndQueues(): void foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); + foreach ($queueConfig['bindings'] ?? [] as $binding) { + $this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []); + } + if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) { + continue; + } foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []); } From be9e7812b4f7b18ac8df530f34d49538f42ba861 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Fri, 31 Jul 2020 16:20:14 -0500 Subject: [PATCH 2/6] 37233 Adding info on feature to changelog --- src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 20dbe19420958..07f82be4b80c9 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +5.2.0 +----- + + * Introduced support for multiple bindings on the same queue. + 5.1.0 ----- From c2b77ce9f844a705d3399241208117fa92a3fe25 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Mon, 3 Aug 2020 16:14:42 -0500 Subject: [PATCH 3/6] 37233 Introduce unit tests for multiple bindings, deprecate old binding arguments --- .../Amqp/Tests/Transport/ConnectionTest.php | 31 +++++++++++++++++++ .../Bridge/Amqp/Transport/Connection.php | 7 ++--- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 81b8e45d858f9..d19bce9ec75b5 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -339,6 +339,37 @@ public function testBindingArguments() $connection->publish('body'); } + public function testMultipleBindings() + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + $amqpExchange = $this->createMock(\AMQPExchange::class); + $amqpQueue = $this->createMock(\AMQPQueue::class); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createExchange')->willReturn($amqpExchange); + $factory->method('createQueue')->willReturn($amqpQueue); + + $amqpExchange->expects($this->once())->method('declareExchange'); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $amqpQueue->expects($this->once())->method('declareQueue'); + $amqpQueue->expects($this->exactly(2))->method('bind')->withConsecutive( + [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'change']], + [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'remove']] + ); + + $dsn = 'amqp://localhost?exchange[type]=headers'. + '&queues[queue0][bindings][one][arguments][x-match]=all'. + '&queues[queue0][bindings][one][arguments][header-property]=change'. + '&queues[queue0][bindings][two][arguments][x-match]=all'. + '&queues[queue0][bindings][two][arguments][header-property]=remove'; + + $connection = Connection::fromDsn($dsn, [], $factory); + $connection->publish('body'); + } + public function testItCanDisableTheSetup() { $factory = new TestAmqpFactory( diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index df47429f69acf..0cbc54f95bb7e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -59,8 +59,7 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ - 'binding_keys', - 'binding_arguments', + 'bindings', 'flags', 'arguments', ]; @@ -129,8 +128,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * queues[name]: An array of queues, keyed by the name - * * binding_keys: The binding keys (if any) to bind to this queue - * * binding_arguments: Arguments to be used while binding the queue. + * * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings') + * * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings') * * bindings[name]: An array of bindings for this queue, keyed by the name * * key: The binding key (if any) to bind to this queue * * arguments: An array of arguments to be used while binding the queue. From 32de160aae2f8d2a572db945e96306c38e743353 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Thu, 6 Aug 2020 13:27:00 -0500 Subject: [PATCH 4/6] 37233 Update deprecation notice, additional key test --- .../Bridge/Amqp/Tests/Transport/ConnectionTest.php | 5 +++-- .../Messenger/Bridge/Amqp/Transport/Connection.php | 10 ++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index d19bce9ec75b5..a7ac14d1934e1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -357,14 +357,15 @@ public function testMultipleBindings() $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'change']], - [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'remove']] + [self::DEFAULT_EXCHANGE_NAME, 'binding_key0', ['x-match' => 'all', 'header-property' => 'remove']] ); $dsn = 'amqp://localhost?exchange[type]=headers'. '&queues[queue0][bindings][one][arguments][x-match]=all'. '&queues[queue0][bindings][one][arguments][header-property]=change'. '&queues[queue0][bindings][two][arguments][x-match]=all'. - '&queues[queue0][bindings][two][arguments][header-property]=remove'; + '&queues[queue0][bindings][two][arguments][header-property]=remove'. + '&queues[queue0][bindings][two][key]=binding_key0'; $connection = Connection::fromDsn($dsn, [], $factory); $connection->publish('body'); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 0cbc54f95bb7e..cc083dc61c5ae 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -59,6 +59,14 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ + 'binding_keys', + 'binding_arguments', + 'bindings', + 'flags', + 'arguments', + ]; + + private const AVAILABLE_QUEUE_OPTIONS_52 = [ 'bindings', 'flags', 'arguments', @@ -233,6 +241,8 @@ private static function validateOptions(array $options): void if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions)); + } elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS_52))) { + trigger_deprecation('symfony/messenger', '5.2', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); } } } From 76426b483ed37805252cb26ea0341aa9144e14e7 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Mon, 5 Oct 2020 15:21:58 -0500 Subject: [PATCH 5/6] 37233 - Better validation of queue options --- .../Bridge/Amqp/Transport/Connection.php | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index db0213ef03085..c56127280a858 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -60,16 +60,18 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ - 'binding_keys', - 'binding_arguments', 'bindings', 'flags', 'arguments', ]; - private const AVAILABLE_QUEUE_OPTIONS_52 = [ - 'bindings', - 'flags', + private const ORIGINAL_BINDING_KEYS = [ + 'binding_keys', + 'binding_arguments', + ]; + + private const AVAILABLE_BINDINGS_OPTIONS = [ + 'key', 'arguments', ]; @@ -247,11 +249,15 @@ private static function validateOptions(array $options): void continue; } - if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::ORIGINAL_BINDING_KEYS))) { trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions)); - } elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS_52))) { + } elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { trigger_deprecation('symfony/messenger', '5.2', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); } + + if (\is_array($queue['bindings'] ?? false) && 0 < \count($invalidBindingsOptions = array_diff(array_keys($queue['bindings']), self::AVAILABLE_BINDINGS_OPTIONS))) { + throw new \InvalidArgumentException('Invalid bindings option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option only accepts "key" and "arguments"', implode('", "', $invalidBindingsOptions)); + } } } From 52a834b8f460d36a7a39654bc8d1b103bbf32495 Mon Sep 17 00:00:00 2001 From: Ryan Jefferson Date: Mon, 5 Oct 2020 16:30:52 -0500 Subject: [PATCH 6/6] 37233 - Better checking each binding --- .../Messenger/Bridge/Amqp/Transport/Connection.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index c56127280a858..86911d42db211 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -255,8 +255,12 @@ private static function validateOptions(array $options): void trigger_deprecation('symfony/messenger', '5.2', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); } - if (\is_array($queue['bindings'] ?? false) && 0 < \count($invalidBindingsOptions = array_diff(array_keys($queue['bindings']), self::AVAILABLE_BINDINGS_OPTIONS))) { - throw new \InvalidArgumentException('Invalid bindings option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option only accepts "key" and "arguments"', implode('", "', $invalidBindingsOptions)); + if (\is_array($queue['bindings'] ?? false)) { + foreach ($queue['bindings'] as $title => $individualBinding) { + if (0 < \count($invalidBindingsOptions = array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { + throw new \InvalidArgumentException(sprintf('Invalid bindings option(s) "%s" passed to the AMQP Messenger transport in "%s". Each "bindings" option only accepts "key" and "arguments"', implode('", "', $invalidBindingsOptions), $title)); + } + } } } }