Skip to content

Commit fcecc48

Browse files
author
Peter van der Wal
committed
[Messenger] Prioritize receivers via transport configuration
1 parent e63495e commit fcecc48

File tree

11 files changed

+46
-4
lines changed

11 files changed

+46
-4
lines changed

src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Make the `config/` directory optional in `MicroKernelTrait`, add support for service arguments in the
1010
invokable Kernel class, and register `FrameworkBundle` by default when the `bundles.php` file is missing
1111
* Add `exit` option for `secrets:decrypt-to-local` command
12+
* Add `priority` option to the `messenger.transports` configurations
1213

1314
7.1
1415
---

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,6 +1612,7 @@ function ($a) {
16121612
->defaultNull()
16131613
->info('Rate limiter name to use when processing messages')
16141614
->end()
1615+
->integerNode('priority')->defaultValue(0)->info('Priority of this transport when the consumer is excecuted with the --all flag')->end()
16151616
->end()
16161617
->end()
16171618
->end()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2209,6 +2209,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
22092209
->addTag('messenger.receiver', [
22102210
'alias' => $name,
22112211
'is_failure_transport' => \in_array($name, $failureTransports, true),
2212+
'priority' => $transport['priority'],
22122213
])
22132214
;
22142215
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@
606606
<xsd:attribute name="dsn" type="xsd:string" />
607607
<xsd:attribute name="failure-transport" type="xsd:string" />
608608
<xsd:attribute name="rate-limiter" type="xsd:string" />
609+
<xsd:attribute name="priority" type="xsd:integer" />
609610
</xsd:complexType>
610611

611612
<xsd:complexType name="messenger_retry_strategy">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
],
2626
'rate_limiter' => 'customised_worker'
2727
],
28+
'prioritized' => [
29+
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=priority',
30+
'priority' => 10,
31+
],
2832
'failed' => 'in-memory:///',
2933
'redis' => 'redis://127.0.0.1:6379/messages',
3034
'beanstalkd' => 'beanstalkd://127.0.0.1:11300',

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
</framework:options>
2121
<framework:retry-strategy max-retries="10" delay="7" multiplier="3" max-delay="100"/>
2222
</framework:transport>
23+
<framework:transport name="prioritized" dsn="amqp://localhost/%2f/messages?exchange_name=priority" priority="10" />
2324
<framework:transport name="failed" dsn="in-memory:///" />
2425
<framework:transport name="redis" dsn="redis://127.0.0.1:6379/messages" />
2526
<framework:transport name="beanstalkd" dsn="beanstalkd://127.0.0.1:11300" />

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ framework:
2323
multiplier: 3
2424
max_delay: 100
2525
rate_limiter: customised_worker
26+
prioritized:
27+
dsn: amqp://localhost/%2f/messages?exchange_name=priority
28+
priority: 10
2629
failed: 'in-memory:///'
2730
redis: 'redis://127.0.0.1:6379/messages'
2831
beanstalkd: 'beanstalkd://127.0.0.1:11300'

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,7 @@ public function testMessengerMultipleFailureTransports()
873873
$this->assertEquals([
874874
'alias' => 'failure_transport_1',
875875
'is_failure_transport' => true,
876+
'priority' => 0,
876877
], $failureTransport1Tags);
877878

878879
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
@@ -881,6 +882,7 @@ public function testMessengerMultipleFailureTransports()
881882
$this->assertEquals([
882883
'alias' => 'failure_transport_3',
883884
'is_failure_transport' => true,
885+
'priority' => 0,
884886
], $failureTransport3Tags);
885887

886888
// transport 2 exists but does not appear in the mapping
@@ -913,6 +915,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport
913915
$this->assertEquals([
914916
'alias' => 'failure_transport_1',
915917
'is_failure_transport' => true,
918+
'priority' => 0,
916919
], $failureTransport1Tags);
917920

918921
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
@@ -921,6 +924,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport
921924
$this->assertEquals([
922925
'alias' => 'failure_transport_3',
923926
'is_failure_transport' => true,
927+
'priority' => 0,
924928
], $failureTransport3Tags);
925929

926930
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
@@ -947,15 +951,26 @@ public function testMessengerTransports()
947951
$container = $this->createContainerFromFile('messenger_transports');
948952
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
949953
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
950-
$this->assertEquals([
951-
['alias' => 'default', 'is_failure_transport' => false], ], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
954+
$this->assertEquals([[
955+
'alias' => 'default',
956+
'is_failure_transport' => false,
957+
'priority' => 0,
958+
]], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
952959
$transportArguments = $container->getDefinition('messenger.transport.default')->getArguments();
953960
$this->assertEquals(new Reference('messenger.default_serializer'), $transportArguments[2]);
954961

955962
$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
956963
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
957964
$transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments();
958965

966+
$this->assertTrue($container->hasDefinition('messenger.transport.prioritized'));
967+
$this->assertTrue($container->getDefinition('messenger.transport.prioritized')->hasTag('messenger.receiver'));
968+
$this->assertEquals([[
969+
'alias' => 'prioritized',
970+
'is_failure_transport' => false,
971+
'priority' => 10,
972+
]], $container->getDefinition('messenger.transport.prioritized')->getTag('messenger.receiver'));
973+
959974
$this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory);
960975
$this->assertCount(3, $transportArguments);
961976
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]);
@@ -1000,6 +1015,7 @@ public function testMessengerTransports()
10001015
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
10011016
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
10021017
$expectedTransportsByFailureTransports = [
1018+
'prioritized' => new Reference('messenger.transport.failed'),
10031019
'beanstalkd' => new Reference('messenger.transport.failed'),
10041020
'customised' => new Reference('messenger.transport.failed'),
10051021
'default' => new Reference('messenger.transport.failed'),

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
88
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
99
* Add `--format` option to the `messenger:stats` command
10+
* Allow prioritizing receivers so that `messenger:consume --all` consumes receivers in a predefined order
1011

1112
7.1
1213
---

src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,8 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
242242
{
243243
$receiverMapping = [];
244244
$failureTransportsMap = [];
245+
$receiverPriority = [];
246+
245247
if ($container->hasDefinition('console.command.messenger_failed_messages_retry')) {
246248
$commandDefinition = $container->getDefinition('console.command.messenger_failed_messages_retry');
247249
$globalReceiverName = $commandDefinition->getArgument(0);
@@ -263,15 +265,23 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
263265
$receiverMapping[$id] = new Reference($id);
264266

265267
foreach ($tags as $tag) {
268+
$receiverPriority[$id] = max($tag['priority'] ?? 0, $receiverPriority[$id] ?? PHP_INT_MIN);
269+
266270
if (isset($tag['alias'])) {
267271
$receiverMapping[$tag['alias']] = $receiverMapping[$id];
272+
$receiverPriority[$tag['alias']] = max($tag['priority'] ?? 0, $receiverPriority[$tag['alias']] ?? PHP_INT_MIN);
273+
268274
if ($tag['is_failure_transport'] ?? false) {
269275
$failureTransportsMap[$tag['alias']] = $receiverMapping[$id];
270276
}
271277
}
272278
}
273279
}
274280

281+
$prioritySorter = fn (string $a, string $b): int => $receiverPriority[$b] <=> $receiverPriority[$a];
282+
uksort($receiverMapping, $prioritySorter);
283+
uksort($failureTransportsMap, $prioritySorter);
284+
275285
$receiverNames = [];
276286
foreach ($receiverMapping as $name => $reference) {
277287
$receiverNames[(string) $reference] = $name;

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use Symfony\Component\DependencyInjection\ServiceLocator;
2525
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
2626
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
27+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver;
2728
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
2829
use Symfony\Component\Messenger\Command\DebugCommand;
2930
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
@@ -448,11 +449,12 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
448449
]);
449450

450451
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
452+
$container->register(DoctrineReceiver::class, DoctrineReceiver::class)->addTag('messenger.receiver', ['alias' => 'doctrine', 'priority' => 1]);
451453
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy']);
452454

453455
(new MessengerPass())->process($container);
454456

455-
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
457+
$this->assertSame(['doctrine', 'amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
456458
}
457459

458460
public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()
@@ -464,11 +466,12 @@ public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()
464466
]);
465467

466468
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
469+
$container->register(DoctrineReceiver::class, DoctrineReceiver::class)->addTag('messenger.receiver', ['alias' => 'doctrine', 'priority' => 1]);
467470
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy']);
468471

469472
(new MessengerPass())->process($container);
470473

471-
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_setup_transports')->getArgument(1));
474+
$this->assertSame(['doctrine', 'amqp', 'dummy'], $container->getDefinition('console.command.messenger_setup_transports')->getArgument(1));
472475
}
473476

474477
public function testItRegistersHandlersOnDifferentBuses()

0 commit comments

Comments
 (0)