diff --git a/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md b/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md
index 7c503eb35d52b..d738ac9cae80b 100644
--- a/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md
+++ b/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md
@@ -15,6 +15,7 @@ CHANGELOG
* Tag all workflows services with `workflow`, those with type=workflow are
tagged with `workflow.workflow`, and those with type=state_machine with
`workflow.state_machine`
+ * Add `rate_limiter` configuration option to `messenger.transport` to allow rate limited transports using the RateLimiter component
6.1
---
diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
index 4e6dec875faf1..d67ef55cacb16 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
@@ -1492,6 +1492,10 @@ function ($a) {
->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end()
->end()
->end()
+ ->scalarNode('rate_limiter')
+ ->defaultNull()
+ ->info('Rate limiter name to use when processing messages')
+ ->end()
->end()
->end()
->end()
diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index c5855b04cd992..8e522b5ec3d6c 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -2102,6 +2102,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$senderAliases = [];
$transportRetryReferences = [];
+ $transportRateLimiterReferences = [];
foreach ($config['transports'] as $name => $transport) {
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2130,6 +2131,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$transportRetryReferences[$name] = new Reference($retryServiceId);
}
+
+ if ($transport['rate_limiter']) {
+ if (!interface_exists(LimiterInterface::class)) {
+ throw new LogicException('Rate limiter cannot be used within Messenger as the RateLimiter component is not installed. Try running "composer require symfony/rate-limiter".');
+ }
+
+ $transportRateLimiterReferences[$name] = new Reference('limiter.'.$transport['rate_limiter']);
+ }
}
$senderReferences = [];
@@ -2184,6 +2193,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);
+ if (!$transportRateLimiterReferences) {
+ $container->removeDefinition('messenger.rate_limiter_locator');
+ } else {
+ $container->getDefinition('messenger.rate_limiter_locator')
+ ->replaceArgument(0, $transportRateLimiterReferences);
+ }
+
if (\count($failureTransports) > 0) {
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport']);
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php
index 9ca75d099e5f9..6407f3e244400 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php
@@ -160,6 +160,7 @@
[], // Receiver names
service('messenger.listener.reset_services')->nullOnInvalid(),
[], // Bus names
+ service('messenger.rate_limiter_locator')->nullOnInvalid(),
])
->tag('console.command')
->tag('monolog.logger', ['channel' => 'messenger'])
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
index 813d503000de4..0a67d48474d03 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
@@ -159,6 +159,11 @@
abstract_arg('max delay ms'),
])
+ // rate limiter
+ ->set('messenger.rate_limiter_locator', ServiceLocator::class)
+ ->args([[]])
+ ->tag('container.service_locator')
+
// worker event listener
->set('messenger.retry.send_failed_message_for_retry_listener', SendFailedMessageForRetryListener::class)
->args([
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd
index cd53d88db00ac..f1fe50d89ad47 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd
@@ -554,6 +554,7 @@
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
index 8236fced45021..19f22f2c78c99 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
@@ -20,6 +20,7 @@
'multiplier' => 3,
'max_delay' => 100,
],
+ 'rate_limiter' => 'customised_worker'
],
'failed' => 'in-memory:///',
'redis' => 'redis://127.0.0.1:6379/messages',
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
index d78c802810ae6..28e27e380bfe0 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
@@ -10,7 +10,7 @@
-
+
Queue
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
index b16f9b6a8f09d..24471939c5435 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
@@ -18,6 +18,7 @@ framework:
delay: 7
multiplier: 3
max_delay: 100
+ rate_limiter: customised_worker
failed: 'in-memory:///'
redis: 'redis://127.0.0.1:6379/messages'
beanstalkd: 'beanstalkd://127.0.0.1:11300'
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index 320483fb0dafc..472dd15355f49 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -949,6 +949,12 @@ public function testMessengerTransports()
return array_shift($values);
}, $failureTransports);
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
+
+ $rateLimitedTransports = $container->getDefinition('messenger.rate_limiter_locator')->getArgument(0);
+ $expectedRateLimitersByRateLimitedTransports = [
+ 'customised' => new Reference('limiter.customised_worker'),
+ ];
+ $this->assertEquals($expectedRateLimitersByRateLimitedTransports, $rateLimitedTransports);
}
public function testMessengerRouting()
diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 3824fcdcf75ed..8d6c34c8ddb11 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -6,6 +6,7 @@ CHANGELOG
* Add new `messenger:stats` command that return a list of transports with their "to be processed" message count
* Add `TransportNamesStamp` to change the transport while dispatching a message
+ * Add support for rate limited transports by using the RateLimiter component.
6.1
---
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index a97fcc1874c97..c5f440957d676 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -47,8 +47,9 @@ class ConsumeMessagesCommand extends Command
private array $receiverNames;
private ?ResetServicesListener $resetServicesListener;
private array $busIds;
+ private ?ContainerInterface $rateLimiterLocator;
- public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [])
+ public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null)
{
$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
@@ -57,6 +58,7 @@ public function __construct(RoutableMessageBus $routableBus, ContainerInterface
$this->receiverNames = $receiverNames;
$this->resetServicesListener = $resetServicesListener;
$this->busIds = $busIds;
+ $this->rateLimiterLocator = $rateLimiterLocator;
parent::__construct();
}
@@ -156,6 +158,7 @@ protected function interact(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
$receivers = [];
+ $rateLimiters = [];
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
@@ -167,6 +170,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
+ if ($this->rateLimiterLocator?->has($receiverName)) {
+ $rateLimiters[$receiverName] = $this->rateLimiterLocator->get($receiverName);
+ }
}
if (null !== $this->resetServicesListener && !$input->getOption('no-reset')) {
@@ -213,7 +219,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
- $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
+ $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
];
diff --git a/src/Symfony/Component/Messenger/Event/WorkerRateLimitedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerRateLimitedEvent.php
new file mode 100755
index 0000000000000..528127dadfd6e
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Event/WorkerRateLimitedEvent.php
@@ -0,0 +1,37 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Event;
+
+use Symfony\Component\RateLimiter\LimiterInterface;
+
+/**
+ * Dispatched after the worker has been blocked due to a configured rate limiter.
+ * Can be used to reset the rate limiter.
+ *
+ * @author Bob van de Vijver
+ */
+final class WorkerRateLimitedEvent
+{
+ public function __construct(private LimiterInterface $limiter, private string $transportName)
+ {
+ }
+
+ public function getLimiter(): LimiterInterface
+ {
+ return $this->limiter;
+ }
+
+ public function getTransportName(): string
+ {
+ return $this->transportName;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
index 747c6ed855d79..85419c78e45fa 100644
--- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php
+++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
@@ -20,6 +20,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
+use Symfony\Component\Messenger\Event\WorkerRateLimitedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
@@ -42,6 +43,8 @@
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
+use Symfony\Component\RateLimiter\RateLimiterFactory;
+use Symfony\Component\RateLimiter\Storage\InMemoryStorage;
use Symfony\Contracts\Service\ResetInterface;
/**
@@ -425,6 +428,41 @@ public function testWorkerMessageReceivedEventMutability()
$this->assertCount(1, $envelope->all(\get_class($stamp)));
}
+ public function testWorkerRateLimitMessages()
+ {
+ $envelope = [
+ new Envelope(new DummyMessage('message1')),
+ new Envelope(new DummyMessage('message2')),
+ ];
+ $receiver = new DummyReceiver([$envelope]);
+
+ $bus = $this->createMock(MessageBusInterface::class);
+ $bus->method('dispatch')->willReturnArgument(0);
+
+ $eventDispatcher = new EventDispatcher();
+ $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2));
+
+ $rateLimitCount = 0;
+ $listener = function (WorkerRateLimitedEvent $event) use (&$rateLimitCount) {
+ ++$rateLimitCount;
+ $event->getLimiter()->reset(); // Reset limiter to continue test
+ };
+ $eventDispatcher->addListener(WorkerRateLimitedEvent::class, $listener);
+
+ $rateLimitFactory = new RateLimiterFactory([
+ 'id' => 'bus',
+ 'policy' => 'fixed_window',
+ 'limit' => 1,
+ 'interval' => '1 minute',
+ ], new InMemoryStorage());
+
+ $worker = new Worker(['bus' => $receiver], $bus, $eventDispatcher, null, ['bus' => $rateLimitFactory]);
+ $worker->run();
+
+ $this->assertCount(2, $receiver->getAcknowledgedEnvelopes());
+ $this->assertEquals(1, $rateLimitCount);
+ }
+
public function testWorkerShouldLogOnStop()
{
$bus = $this->createMock(MessageBusInterface::class);
diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php
index 334e878fb1645..fee30a460f831 100644
--- a/src/Symfony/Component/Messenger/Worker.php
+++ b/src/Symfony/Component/Messenger/Worker.php
@@ -16,6 +16,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
+use Symfony\Component\Messenger\Event\WorkerRateLimitedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
@@ -29,6 +30,7 @@
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+use Symfony\Component\RateLimiter\LimiterInterface;
/**
* @author Samuel Roze
@@ -46,11 +48,12 @@ class Worker
private WorkerMetadata $metadata;
private array $acks = [];
private \SplObjectStorage $unacks;
+ private ?array $rateLimiters;
/**
* @param ReceiverInterface[] $receivers Where the key is the transport name
*/
- public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
+ public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null, array $rateLimiters = null)
{
$this->receivers = $receivers;
$this->bus = $bus;
@@ -60,6 +63,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
'transportNames' => array_keys($receivers),
]);
$this->unacks = new \SplObjectStorage();
+ $this->rateLimiters = $rateLimiters;
}
/**
@@ -102,6 +106,7 @@ public function run(array $options = []): void
foreach ($envelopes as $envelope) {
$envelopeHandled = true;
+ $this->rateLimit($transportName);
$this->handleMessage($envelope, $transportName);
$this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));
@@ -217,6 +222,28 @@ private function ack(): bool
return (bool) $acks;
}
+ private function rateLimit(string $transportName): void
+ {
+ if (!$this->rateLimiters) {
+ return;
+ }
+
+ if (!\array_key_exists($transportName, $this->rateLimiters)) {
+ return;
+ }
+
+ /** @var LimiterInterface $rateLimiter */
+ $rateLimiter = $this->rateLimiters[$transportName]->create();
+ if ($rateLimiter->consume()->isAccepted()) {
+ return;
+ }
+
+ $this->logger?->info('Transport {transport} is being rate limited, waiting for token to become available...', ['transport' => $transportName]);
+
+ $this->eventDispatcher?->dispatch(new WorkerRateLimitedEvent($rateLimiter, $transportName));
+ $rateLimiter->reserve()->wait();
+ }
+
private function flush(bool $force): bool
{
$unacks = $this->unacks;
diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json
index b9fdce89b97e7..63610781b4e55 100644
--- a/src/Symfony/Component/Messenger/composer.json
+++ b/src/Symfony/Component/Messenger/composer.json
@@ -27,6 +27,7 @@
"symfony/http-kernel": "^5.4|^6.0",
"symfony/process": "^5.4|^6.0",
"symfony/property-access": "^5.4|^6.0",
+ "symfony/rate-limiter": "^5.4|^6.0",
"symfony/routing": "^5.4|^6.0",
"symfony/serializer": "^5.4|^6.0",
"symfony/service-contracts": "^1.1|^2|^3",