Skip to content

Commit c721410

Browse files
committed
[Messenger] Add AMQP exchange to exchange bindings
Add possibility to configure exchange to exchange bindings in AMQP transport
1 parent f4361fb commit c721410

File tree

3 files changed

+42
-1
lines changed

3 files changed

+42
-1
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ public function testItSetupsTheTTLConnection()
356356
$connection->publish('body');
357357
}
358358

359-
public function testBindingArguments()
359+
public function testQueueBindingArguments()
360360
{
361361
$amqpConnection = $this->createMock(\AMQPConnection::class);
362362
$amqpChannel = $this->createMock(\AMQPChannel::class);
@@ -383,6 +383,36 @@ public function testBindingArguments()
383383
$connection->publish('body');
384384
}
385385

386+
public function testExchangeBindingArguments()
387+
{
388+
$factory = new TestAmqpFactory(
389+
$this->createMock(\AMQPConnection::class),
390+
$this->createMock(\AMQPChannel::class),
391+
$this->createMock(\AMQPQueue::class),
392+
$amqpExchange = $this->createMock(\AMQPExchange::class)
393+
);
394+
395+
$amqpExchange->expects($this->once())->method('declareExchange');
396+
$amqpExchange->expects($this->exactly(4))->method('bind')->withConsecutive(
397+
['exchange0', 'binding_key0', ['x-match' => 'all']],
398+
['exchange0', 'binding_key1', ['x-match' => 'all']],
399+
['exchange1', 'binding_key2', ['x-match' => 'any']],
400+
['exchange1', 'binding_key3', ['x-match' => 'any']],
401+
);
402+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
403+
404+
$dsn = 'amqp://localhost?exchange[type]=headers'.
405+
'&exchange[bindings][exchange0][binding_arguments][x-match]=all'.
406+
'&exchange[bindings][exchange0][binding_keys][0]=binding_key0'.
407+
'&exchange[bindings][exchange0][binding_keys][1]=binding_key1'.
408+
'&exchange[bindings][exchange1][binding_arguments][x-match]=any'.
409+
'&exchange[bindings][exchange1][binding_keys][0]=binding_key2'.
410+
'&exchange[bindings][exchange1][binding_keys][1]=binding_key3';
411+
412+
$connection = Connection::fromDsn($dsn, [], $factory);
413+
$connection->publish('body');
414+
}
415+
386416
public function testItCanDisableTheSetup()
387417
{
388418
$factory = new TestAmqpFactory(

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class Connection
7373
'default_publish_routing_key',
7474
'flags',
7575
'arguments',
76+
'bindings',
7677
];
7778

7879
private array $connectionOptions;
@@ -134,6 +135,9 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
134135
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
135136
* * flags: Exchange flags (Default: AMQP_DURABLE)
136137
* * arguments: Extra arguments
138+
* * bindings[name]: An array of the source exchanges to bind this exchange to, keyed by the name. Binding direction: source exchange -> this exchange
139+
* * binding_keys: The binding/routing keys (if any) to be used for the binding
140+
* * binding_arguments: Additional binding arguments
137141
* * delay:
138142
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
139143
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
@@ -448,6 +452,12 @@ private function setupExchangeAndQueues(): void
448452
{
449453
$this->exchange()->declareExchange();
450454

455+
foreach ($this->exchangeOptions['bindings'] ?? [] as $exchangeName => $exchangeConfig) {
456+
foreach ($exchangeConfig['binding_keys'] ?? [] as $bindingKey) {
457+
$this->exchange()->bind($exchangeName, $bindingKey, $exchangeConfig['binding_arguments'] ?? []);
458+
}
459+
}
460+
451461
foreach ($this->queuesOptions as $queueName => $queueConfig) {
452462
$this->queue($queueName)->declareQueue();
453463
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* Add `SerializedMessageStamp` to avoid serializing a message when a retry occurs
88
* Automatically resolve handled message type when method different from `__invoke` is used as handler
99
* Allow `#[AsMessageHandler]` attribute on methods
10+
* Add possibility to configure exchange to exchange bindings in AMQP transport
1011

1112
6.0
1213
---

0 commit comments

Comments
 (0)