diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 81c0100991936..849f106537948 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -4,6 +4,7 @@ CHANGELOG 5.2.0 ----- + * Introduced support for multiple bindings on the same queue. * Add option to confirm message delivery * DSN now support AMQPS out-of-the-box. diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 36fde1250587c..2563efd83af2f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -398,6 +398,38 @@ public function testBindingArguments() $connection->publish('body'); } + public function testMultipleBindings() + { + $amqpConnection = $this->createMock(\AMQPConnection::class); + $amqpChannel = $this->createMock(\AMQPChannel::class); + $amqpExchange = $this->createMock(\AMQPExchange::class); + $amqpQueue = $this->createMock(\AMQPQueue::class); + + $factory = $this->createMock(AmqpFactory::class); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createExchange')->willReturn($amqpExchange); + $factory->method('createQueue')->willReturn($amqpQueue); + + $amqpExchange->expects($this->once())->method('declareExchange'); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $amqpQueue->expects($this->once())->method('declareQueue'); + $amqpQueue->expects($this->exactly(2))->method('bind')->withConsecutive( + [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'change']], + [self::DEFAULT_EXCHANGE_NAME, 'binding_key0', ['x-match' => 'all', 'header-property' => 'remove']] + ); + + $dsn = 'amqp://localhost?exchange[type]=headers'. + '&queues[queue0][bindings][one][arguments][x-match]=all'. + '&queues[queue0][bindings][one][arguments][header-property]=change'. + '&queues[queue0][bindings][two][arguments][x-match]=all'. + '&queues[queue0][bindings][two][arguments][header-property]=remove'. + '&queues[queue0][bindings][two][key]=binding_key0'; + + $connection = Connection::fromDsn($dsn, [], $factory); + $connection->publish('body'); + } + public function testItCanDisableTheSetup() { $factory = new TestAmqpFactory( diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 64a550167b17c..86911d42db211 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -60,9 +60,18 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ + 'bindings', + 'flags', + 'arguments', + ]; + + private const ORIGINAL_BINDING_KEYS = [ 'binding_keys', 'binding_arguments', - 'flags', + ]; + + private const AVAILABLE_BINDINGS_OPTIONS = [ + 'key', 'arguments', ]; @@ -131,8 +140,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. * * queues[name]: An array of queues, keyed by the name - * * binding_keys: The binding keys (if any) to bind to this queue - * * binding_arguments: Arguments to be used while binding the queue. + * * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings') + * * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings') + * * bindings[name]: An array of bindings for this queue, keyed by the name + * * key: The binding key (if any) to bind to this queue + * * arguments: An array of arguments to be used while binding the queue. * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: @@ -237,8 +249,18 @@ private static function validateOptions(array $options): void continue; } - if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::ORIGINAL_BINDING_KEYS))) { trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions)); + } elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + trigger_deprecation('symfony/messenger', '5.2', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions)); + } + + if (\is_array($queue['bindings'] ?? false)) { + foreach ($queue['bindings'] as $title => $individualBinding) { + if (0 < \count($invalidBindingsOptions = array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { + throw new \InvalidArgumentException(sprintf('Invalid bindings option(s) "%s" passed to the AMQP Messenger transport in "%s". Each "bindings" option only accepts "key" and "arguments"', implode('", "', $invalidBindingsOptions), $title)); + } + } } } } @@ -461,6 +483,12 @@ private function setupExchangeAndQueues(): void foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); + foreach ($queueConfig['bindings'] ?? [] as $binding) { + $this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []); + } + if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) { + continue; + } foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []); }