Skip to content

[Messenger] support for multiple bindings on the same queue #37722

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CHANGELOG
5.2.0
-----

* Introduced support for multiple bindings on the same queue.
* Add option to confirm message delivery
* DSN now support AMQPS out-of-the-box.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,38 @@ public function testBindingArguments()
$connection->publish('body');
}

public function testMultipleBindings()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$amqpExchange = $this->createMock(\AMQPExchange::class);
$amqpQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createExchange')->willReturn($amqpExchange);
$factory->method('createQueue')->willReturn($amqpQueue);

$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->exactly(2))->method('bind')->withConsecutive(
[self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all', 'header-property' => 'change']],
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0', ['x-match' => 'all', 'header-property' => 'remove']]
);

$dsn = 'amqp://localhost?exchange[type]=headers'.
'&queues[queue0][bindings][one][arguments][x-match]=all'.
'&queues[queue0][bindings][one][arguments][header-property]=change'.
'&queues[queue0][bindings][two][arguments][x-match]=all'.
'&queues[queue0][bindings][two][arguments][header-property]=remove'.
'&queues[queue0][bindings][two][key]=binding_key0';

$connection = Connection::fromDsn($dsn, [], $factory);
$connection->publish('body');
}

public function testItCanDisableTheSetup()
{
$factory = new TestAmqpFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,18 @@ class Connection
];

private const AVAILABLE_QUEUE_OPTIONS = [
'bindings',
'flags',
'arguments',
];

private const ORIGINAL_BINDING_KEYS = [
'binding_keys',
'binding_arguments',
'flags',
];

private const AVAILABLE_BINDINGS_OPTIONS = [
'key',
'arguments',
];

Expand Down Expand Up @@ -131,8 +140,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * binding_arguments: Arguments to be used while binding the queue.
* * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings')
* * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings')
* * bindings[name]: An array of bindings for this queue, keyed by the name
* * key: The binding key (if any) to bind to this queue
* * arguments: An array of arguments to be used while binding the queue.
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
Expand Down Expand Up @@ -237,8 +249,18 @@ private static function validateOptions(array $options): void
continue;
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::ORIGINAL_BINDING_KEYS))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
} elseif (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
trigger_deprecation('symfony/messenger', '5.2', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "bindings" option should be used rather than "binding_keys" and "binding_arguments".', implode('", "', $invalidQueueOptions));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is quite right. In 5.1, we ADDED validation of the options in general. In 5.2, you're deprecating bindings. So basically, if you pass bindings, we should say that bindings is deprecated in 5.2. If we pass any other invalid option, it would use the existing deprecation message.

Also, could we add validation that bindings only contains key and arguments keys?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed this and added validation on each individual binding. Thanks!

}

if (\is_array($queue['bindings'] ?? false)) {
foreach ($queue['bindings'] as $title => $individualBinding) {
if (0 < \count($invalidBindingsOptions = array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) {
throw new \InvalidArgumentException(sprintf('Invalid bindings option(s) "%s" passed to the AMQP Messenger transport in "%s". Each "bindings" option only accepts "key" and "arguments"', implode('", "', $invalidBindingsOptions), $title));
}
}
}
}
}
Expand Down Expand Up @@ -461,6 +483,12 @@ private function setupExchangeAndQueues(): void

foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['bindings'] ?? [] as $binding) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the example in the PR description, you show using an array key under bindings - like manual-messages, but that is not used here. What is the purpose of this YAML key? You also show an example without one. Which is the correct usage?

Copy link
Author

@inferont inferont Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure there is or should be a correct usage. The original intent was that these bindings could have a developer/human friendly name. I have found that using these names for the bindings is useful when we have more than a dozen bindings. In my projects, all of these bindings would ideally have a name. I do not think I would want to force another developer to do this generally though. I'm okay with any suggestions here though.

$this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []);
}
if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't needed? Because if $queueConfig['binding_keys']) is empty, then the next foreach wouldn't do anything anyways.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The '?? [null]' in the next line will ensure that bind is called at least once with a null $bindingKey if 'binding_keys' is empty. This would have the effect of binding all messages from the exchange to the queue.

continue;
}
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
}
Expand Down