From 29a25854aa38d4a620fd61f96cbc743c6cb74e14 Mon Sep 17 00:00:00 2001 From: Bob van de Vijver Date: Mon, 10 May 2021 17:50:30 +0200 Subject: [PATCH] [Messenger] Add simple transport based rate limiter to Messenger --- .../Bundle/FrameworkBundle/CHANGELOG.md | 1 + .../DependencyInjection/Configuration.php | 4 ++ .../FrameworkExtension.php | 16 ++++++++ .../Resources/config/console.php | 1 + .../Resources/config/messenger.php | 5 +++ .../Resources/config/schema/symfony-1.0.xsd | 1 + .../Fixtures/php/messenger_transports.php | 1 + .../Fixtures/xml/messenger_transports.xml | 2 +- .../Fixtures/yml/messenger_transports.yml | 1 + .../FrameworkExtensionTest.php | 6 +++ src/Symfony/Component/Messenger/CHANGELOG.md | 1 + .../Command/ConsumeMessagesCommand.php | 10 ++++- .../Event/WorkerRateLimitedEvent.php | 37 ++++++++++++++++++ .../Component/Messenger/Tests/WorkerTest.php | 38 +++++++++++++++++++ src/Symfony/Component/Messenger/Worker.php | 29 +++++++++++++- src/Symfony/Component/Messenger/composer.json | 1 + 16 files changed, 150 insertions(+), 4 deletions(-) create mode 100755 src/Symfony/Component/Messenger/Event/WorkerRateLimitedEvent.php diff --git a/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md b/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md index 7c503eb35d52b..d738ac9cae80b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md +++ b/src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md @@ -15,6 +15,7 @@ CHANGELOG * Tag all workflows services with `workflow`, those with type=workflow are tagged with `workflow.workflow`, and those with type=state_machine with `workflow.state_machine` + * Add `rate_limiter` configuration option to `messenger.transport` to allow rate limited transports using the RateLimiter component 6.1 --- diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index 4e6dec875faf1..d67ef55cacb16 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -1492,6 +1492,10 @@ function ($a) { ->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end() ->end() ->end() + ->scalarNode('rate_limiter') + ->defaultNull() + ->info('Rate limiter name to use when processing messages') + ->end() ->end() ->end() ->end() diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index c5855b04cd992..8e522b5ec3d6c 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -2102,6 +2102,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $senderAliases = []; $transportRetryReferences = []; + $transportRateLimiterReferences = []; foreach ($config['transports'] as $name => $transport) { $serializerId = $transport['serializer'] ?? 'messenger.default_serializer'; $transportDefinition = (new Definition(TransportInterface::class)) @@ -2130,6 +2131,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $transportRetryReferences[$name] = new Reference($retryServiceId); } + + if ($transport['rate_limiter']) { + if (!interface_exists(LimiterInterface::class)) { + throw new LogicException('Rate limiter cannot be used within Messenger as the RateLimiter component is not installed. Try running "composer require symfony/rate-limiter".'); + } + + $transportRateLimiterReferences[$name] = new Reference('limiter.'.$transport['rate_limiter']); + } } $senderReferences = []; @@ -2184,6 +2193,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $container->getDefinition('messenger.retry_strategy_locator') ->replaceArgument(0, $transportRetryReferences); + if (!$transportRateLimiterReferences) { + $container->removeDefinition('messenger.rate_limiter_locator'); + } else { + $container->getDefinition('messenger.rate_limiter_locator') + ->replaceArgument(0, $transportRateLimiterReferences); + } + if (\count($failureTransports) > 0) { $container->getDefinition('console.command.messenger_failed_messages_retry') ->replaceArgument(0, $config['failure_transport']); diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php index 9ca75d099e5f9..6407f3e244400 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php @@ -160,6 +160,7 @@ [], // Receiver names service('messenger.listener.reset_services')->nullOnInvalid(), [], // Bus names + service('messenger.rate_limiter_locator')->nullOnInvalid(), ]) ->tag('console.command') ->tag('monolog.logger', ['channel' => 'messenger']) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 813d503000de4..0a67d48474d03 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -159,6 +159,11 @@ abstract_arg('max delay ms'), ]) + // rate limiter + ->set('messenger.rate_limiter_locator', ServiceLocator::class) + ->args([[]]) + ->tag('container.service_locator') + // worker event listener ->set('messenger.retry.send_failed_message_for_retry_listener', SendFailedMessageForRetryListener::class) ->args([ diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd index cd53d88db00ac..f1fe50d89ad47 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd @@ -554,6 +554,7 @@ + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php index 8236fced45021..19f22f2c78c99 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php @@ -20,6 +20,7 @@ 'multiplier' => 3, 'max_delay' => 100, ], + 'rate_limiter' => 'customised_worker' ], 'failed' => 'in-memory:///', 'redis' => 'redis://127.0.0.1:6379/messages', diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml index d78c802810ae6..28e27e380bfe0 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml @@ -10,7 +10,7 @@ - + Queue diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml index b16f9b6a8f09d..24471939c5435 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml @@ -18,6 +18,7 @@ framework: delay: 7 multiplier: 3 max_delay: 100 + rate_limiter: customised_worker failed: 'in-memory:///' redis: 'redis://127.0.0.1:6379/messages' beanstalkd: 'beanstalkd://127.0.0.1:11300' diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 320483fb0dafc..472dd15355f49 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -949,6 +949,12 @@ public function testMessengerTransports() return array_shift($values); }, $failureTransports); $this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences); + + $rateLimitedTransports = $container->getDefinition('messenger.rate_limiter_locator')->getArgument(0); + $expectedRateLimitersByRateLimitedTransports = [ + 'customised' => new Reference('limiter.customised_worker'), + ]; + $this->assertEquals($expectedRateLimitersByRateLimitedTransports, $rateLimitedTransports); } public function testMessengerRouting() diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 3824fcdcf75ed..8d6c34c8ddb11 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -6,6 +6,7 @@ CHANGELOG * Add new `messenger:stats` command that return a list of transports with their "to be processed" message count * Add `TransportNamesStamp` to change the transport while dispatching a message + * Add support for rate limited transports by using the RateLimiter component. 6.1 --- diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index a97fcc1874c97..c5f440957d676 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -47,8 +47,9 @@ class ConsumeMessagesCommand extends Command private array $receiverNames; private ?ResetServicesListener $resetServicesListener; private array $busIds; + private ?ContainerInterface $rateLimiterLocator; - public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = []) + public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null) { $this->routableBus = $routableBus; $this->receiverLocator = $receiverLocator; @@ -57,6 +58,7 @@ public function __construct(RoutableMessageBus $routableBus, ContainerInterface $this->receiverNames = $receiverNames; $this->resetServicesListener = $resetServicesListener; $this->busIds = $busIds; + $this->rateLimiterLocator = $rateLimiterLocator; parent::__construct(); } @@ -156,6 +158,7 @@ protected function interact(InputInterface $input, OutputInterface $output) protected function execute(InputInterface $input, OutputInterface $output): int { $receivers = []; + $rateLimiters = []; foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) { if (!$this->receiverLocator->has($receiverName)) { $message = sprintf('The receiver "%s" does not exist.', $receiverName); @@ -167,6 +170,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int } $receivers[$receiverName] = $this->receiverLocator->get($receiverName); + if ($this->rateLimiterLocator?->has($receiverName)) { + $rateLimiters[$receiverName] = $this->rateLimiterLocator->get($receiverName); + } } if (null !== $this->resetServicesListener && !$input->getOption('no-reset')) { @@ -213,7 +219,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; - $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger); + $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters); $options = [ 'sleep' => $input->getOption('sleep') * 1000000, ]; diff --git a/src/Symfony/Component/Messenger/Event/WorkerRateLimitedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerRateLimitedEvent.php new file mode 100755 index 0000000000000..528127dadfd6e --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerRateLimitedEvent.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +use Symfony\Component\RateLimiter\LimiterInterface; + +/** + * Dispatched after the worker has been blocked due to a configured rate limiter. + * Can be used to reset the rate limiter. + * + * @author Bob van de Vijver + */ +final class WorkerRateLimitedEvent +{ + public function __construct(private LimiterInterface $limiter, private string $transportName) + { + } + + public function getLimiter(): LimiterInterface + { + return $this->limiter; + } + + public function getTransportName(): string + { + return $this->transportName; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 747c6ed855d79..85419c78e45fa 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -20,6 +20,7 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Event\WorkerRateLimitedEvent; use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; @@ -42,6 +43,8 @@ use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; +use Symfony\Component\RateLimiter\RateLimiterFactory; +use Symfony\Component\RateLimiter\Storage\InMemoryStorage; use Symfony\Contracts\Service\ResetInterface; /** @@ -425,6 +428,41 @@ public function testWorkerMessageReceivedEventMutability() $this->assertCount(1, $envelope->all(\get_class($stamp))); } + public function testWorkerRateLimitMessages() + { + $envelope = [ + new Envelope(new DummyMessage('message1')), + new Envelope(new DummyMessage('message2')), + ]; + $receiver = new DummyReceiver([$envelope]); + + $bus = $this->createMock(MessageBusInterface::class); + $bus->method('dispatch')->willReturnArgument(0); + + $eventDispatcher = new EventDispatcher(); + $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2)); + + $rateLimitCount = 0; + $listener = function (WorkerRateLimitedEvent $event) use (&$rateLimitCount) { + ++$rateLimitCount; + $event->getLimiter()->reset(); // Reset limiter to continue test + }; + $eventDispatcher->addListener(WorkerRateLimitedEvent::class, $listener); + + $rateLimitFactory = new RateLimiterFactory([ + 'id' => 'bus', + 'policy' => 'fixed_window', + 'limit' => 1, + 'interval' => '1 minute', + ], new InMemoryStorage()); + + $worker = new Worker(['bus' => $receiver], $bus, $eventDispatcher, null, ['bus' => $rateLimitFactory]); + $worker->run(); + + $this->assertCount(2, $receiver->getAcknowledgedEnvelopes()); + $this->assertEquals(1, $rateLimitCount); + } + public function testWorkerShouldLogOnStop() { $bus = $this->createMock(MessageBusInterface::class); diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 334e878fb1645..fee30a460f831 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Event\WorkerRateLimitedEvent; use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; @@ -29,6 +30,7 @@ use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\RateLimiter\LimiterInterface; /** * @author Samuel Roze @@ -46,11 +48,12 @@ class Worker private WorkerMetadata $metadata; private array $acks = []; private \SplObjectStorage $unacks; + private ?array $rateLimiters; /** * @param ReceiverInterface[] $receivers Where the key is the transport name */ - public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) + public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null, array $rateLimiters = null) { $this->receivers = $receivers; $this->bus = $bus; @@ -60,6 +63,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis 'transportNames' => array_keys($receivers), ]); $this->unacks = new \SplObjectStorage(); + $this->rateLimiters = $rateLimiters; } /** @@ -102,6 +106,7 @@ public function run(array $options = []): void foreach ($envelopes as $envelope) { $envelopeHandled = true; + $this->rateLimit($transportName); $this->handleMessage($envelope, $transportName); $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); @@ -217,6 +222,28 @@ private function ack(): bool return (bool) $acks; } + private function rateLimit(string $transportName): void + { + if (!$this->rateLimiters) { + return; + } + + if (!\array_key_exists($transportName, $this->rateLimiters)) { + return; + } + + /** @var LimiterInterface $rateLimiter */ + $rateLimiter = $this->rateLimiters[$transportName]->create(); + if ($rateLimiter->consume()->isAccepted()) { + return; + } + + $this->logger?->info('Transport {transport} is being rate limited, waiting for token to become available...', ['transport' => $transportName]); + + $this->eventDispatcher?->dispatch(new WorkerRateLimitedEvent($rateLimiter, $transportName)); + $rateLimiter->reserve()->wait(); + } + private function flush(bool $force): bool { $unacks = $this->unacks; diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index b9fdce89b97e7..63610781b4e55 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -27,6 +27,7 @@ "symfony/http-kernel": "^5.4|^6.0", "symfony/process": "^5.4|^6.0", "symfony/property-access": "^5.4|^6.0", + "symfony/rate-limiter": "^5.4|^6.0", "symfony/routing": "^5.4|^6.0", "symfony/serializer": "^5.4|^6.0", "symfony/service-contracts": "^1.1|^2|^3",