Skip to content

Commit 987baef

Browse files
committed
[Messenger] Move container resetting after receiver acknowledging (in command)
1 parent 8bea384 commit 987baef

File tree

13 files changed

+141
-65
lines changed

13 files changed

+141
-65
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

-4
Original file line numberDiff line numberDiff line change
@@ -1333,10 +1333,6 @@ function ($a) {
13331333
->fixXmlConfig('option')
13341334
->children()
13351335
->scalarNode('dsn')->end()
1336-
->booleanNode('reset_on_message')
1337-
->defaultFalse()
1338-
->info('Reset container services after each message. Turn it on when the transport is async and run in a worker.')
1339-
->end()
13401336
->scalarNode('serializer')->defaultNull()->info('Service id of a custom serializer to use.')->end()
13411337
->arrayNode('options')
13421338
->normalizeKeys(false)

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

-13
Original file line numberDiff line numberDiff line change
@@ -1977,7 +1977,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19771977

19781978
$senderAliases = [];
19791979
$transportRetryReferences = [];
1980-
$transportNamesForResetServices = [];
19811980
foreach ($config['transports'] as $name => $transport) {
19821981
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
19831982
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2006,18 +2005,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20062005

20072006
$transportRetryReferences[$name] = new Reference($retryServiceId);
20082007
}
2009-
if ($transport['reset_on_message']) {
2010-
$transportNamesForResetServices[] = $name;
2011-
}
2012-
}
2013-
2014-
if ($transportNamesForResetServices) {
2015-
$container
2016-
->getDefinition('messenger.listener.reset_services')
2017-
->replaceArgument(1, $transportNamesForResetServices)
2018-
;
2019-
} else {
2020-
$container->removeDefinition('messenger.listener.reset_services');
20212008
}
20222009

20232010
$senderReferences = [];

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php

+1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
service('event_dispatcher'),
143143
service('logger')->nullOnInvalid(),
144144
[], // Receiver names
145+
service('services_resetter')
145146
])
146147
->tag('console.command')
147148
->tag('monolog.logger', ['channel' => 'messenger'])

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

-8
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21-
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2221
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2322
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
2423
use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener;
@@ -198,13 +197,6 @@
198197
->set('messenger.listener.stop_worker_on_stop_exception_listener', StopWorkerOnCustomStopExceptionListener::class)
199198
->tag('kernel.event_subscriber')
200199

201-
->set('messenger.listener.reset_services', ResetServicesListener::class)
202-
->args([
203-
service('services_resetter'),
204-
abstract_arg('receivers names'),
205-
])
206-
->tag('kernel.event_subscriber')
207-
208200
->set('messenger.routable_message_bus', RoutableMessageBus::class)
209201
->args([
210202
abstract_arg('message bus locator'),

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

-1
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,6 @@
505505
<xsd:attribute name="serializer" type="xsd:string" />
506506
<xsd:attribute name="dsn" type="xsd:string" />
507507
<xsd:attribute name="failure-transport" type="xsd:string" />
508-
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
509508
</xsd:complexType>
510509

511510
<xsd:complexType name="messenger_retry_strategy">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
'default' => 'amqp://localhost/%2f/messages',
1212
'customised' => [
1313
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
14-
'reset_on_message' => true,
1514
'options' => ['queue' => ['name' => 'Queue']],
1615
'serializer' => 'messenger.transport.native_php_serializer',
1716
'retry_strategy' => [

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<framework:messenger failure-transport="failed">
1111
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
1212
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
13-
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" reset-on-message="true">
13+
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
1414
<framework:options>
1515
<framework:queue>
1616
<framework:name>Queue</framework:name>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ framework:
88
default: 'amqp://localhost/%2f/messages'
99
customised:
1010
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
11-
reset_on_message: true
1211
options:
1312
queue:
1413
name: Queue

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

-4
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,6 @@ public function testMessenger()
722722
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
723723
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
724724
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
725-
$this->assertFalse($container->hasDefinition('messenger.listener.reset_services'));
726725
}
727726

728727
public function testMessengerMultipleFailureTransports()
@@ -867,9 +866,6 @@ public function testMessengerTransports()
867866
return array_shift($values);
868867
}, $failureTransports);
869868
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
870-
871-
$this->assertTrue($container->hasDefinition('messenger.listener.reset_services'));
872-
$this->assertSame(['customised'], $container->getDefinition('messenger.listener.reset_services')->getArgument(1));
873869
}
874870

875871
public function testMessengerRouting()

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

+26-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
use Symfony\Component\Console\Question\ChoiceQuestion;
2424
use Symfony\Component\Console\Style\SymfonyStyle;
2525
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
27+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2628
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
2729
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
2830
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
@@ -43,14 +45,22 @@ class ConsumeMessagesCommand extends Command
4345
private $logger;
4446
private $receiverNames;
4547
private $eventDispatcher;
46-
47-
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
48-
{
48+
private $servicesResetter;
49+
50+
public function __construct(
51+
RoutableMessageBus $routableBus,
52+
ContainerInterface $receiverLocator,
53+
EventDispatcherInterface $eventDispatcher,
54+
LoggerInterface $logger = null,
55+
array $receiverNames = [],
56+
$servicesResetter = null
57+
) {
4958
$this->routableBus = $routableBus;
5059
$this->receiverLocator = $receiverLocator;
5160
$this->logger = $logger;
5261
$this->receiverNames = $receiverNames;
5362
$this->eventDispatcher = $eventDispatcher;
63+
$this->servicesResetter = $servicesResetter;
5464

5565
parent::__construct();
5666
}
@@ -72,6 +82,7 @@ protected function configure(): void
7282
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7383
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
7484
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
85+
new InputOption('reset-services-on-message', null, InputOption::VALUE_NEGATABLE, 'Reset (or do not --no-reset-services-on-message) container services after each message', false),
7586
])
7687
->setDescription(self::$defaultDescription)
7788
->setHelp(<<<'EOF'
@@ -159,6 +170,18 @@ protected function execute(InputInterface $input, OutputInterface $output)
159170
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
160171
}
161172

173+
if (!$input->hasParameterOption(['--reset-services-on-message', '--no-reset-services-on-message'], true)) {
174+
trigger_deprecation('symfony/messenger', '5.4', 'Not setting either "--reset-services-on-message" nor "--no-reset-services-on-message" option explicitly is deprecated, its default value will change to true in 6.0.');
175+
}
176+
177+
if ($input->getOption('reset-services-on-message')) {
178+
if (null === $this->servicesResetter) {
179+
throw new RuntimeException(\sprintf('Please set a $servicesResetter with %s instance to use resetting services after each message.', ServicesResetter::class));
180+
}
181+
182+
$this->eventDispatcher->addSubscriber(new ResetServicesListener($this->servicesResetter));
183+
}
184+
162185
$stopsWhen = [];
163186
if ($limit = $input->getOption('limit')) {
164187
$stopsWhen[] = "processed {$limit} messages";

src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php

+5-14
Original file line numberDiff line numberDiff line change
@@ -13,38 +13,29 @@
1313

1414
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1515
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16-
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
17-
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18-
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
16+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
1917

2018
/**
2119
* @author Grégoire Pineau <lyrixx@lyrixx.info>
2220
*/
2321
class ResetServicesListener implements EventSubscriberInterface
2422
{
2523
private $servicesResetter;
26-
private $receiversName;
2724

28-
public function __construct(ServicesResetter $servicesResetter, array $receiversName)
25+
public function __construct(ServicesResetter $servicesResetter)
2926
{
3027
$this->servicesResetter = $servicesResetter;
31-
$this->receiversName = $receiversName;
3228
}
3329

34-
public function resetServices(AbstractWorkerMessageEvent $event)
30+
public function resetServices(): void
3531
{
36-
if (!\in_array($event->getReceiverName(), $this->receiversName, true)) {
37-
return;
38-
}
39-
4032
$this->servicesResetter->reset();
4133
}
4234

43-
public static function getSubscribedEvents()
35+
public static function getSubscribedEvents(): array
4436
{
4537
return [
46-
WorkerMessageHandledEvent::class => ['resetServices'],
47-
WorkerMessageFailedEvent::class => ['resetServices'],
38+
WorkerRunningEvent::class => ['resetServices'],
4839
];
4940
}
5041
}

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

+104
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\DependencyInjection\ServiceLocator;
1919
use Symfony\Component\EventDispatcher\EventDispatcher;
2020
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
21+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
2122
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
2223
use Symfony\Component\Messenger\Envelope;
2324
use Symfony\Component\Messenger\MessageBusInterface;
@@ -61,6 +62,7 @@ public function testBasicRun()
6162
$tester->execute([
6263
'receivers' => ['dummy-receiver'],
6364
'--limit' => 1,
65+
'--no-reset-services-on-message' => null,
6466
]);
6567

6668
$tester->assertCommandIsSuccessful();
@@ -94,6 +96,108 @@ public function testRunWithBusOption()
9496
'receivers' => ['dummy-receiver'],
9597
'--bus' => 'dummy-bus',
9698
'--limit' => 1,
99+
'--no-reset-services-on-message' => null,
100+
]);
101+
102+
$tester->assertCommandIsSuccessful();
103+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
104+
}
105+
106+
public function provideRunWithResetServicesOption(): iterable
107+
{
108+
yield [true];
109+
yield [false];
110+
}
111+
112+
/**
113+
* @dataProvider provideRunWithResetServicesOption
114+
*/
115+
public function testRunWithResetServicesOption(bool $shouldReset)
116+
{
117+
$envelope = new Envelope(new \stdClass());
118+
119+
$receiver = $this->createMock(ReceiverInterface::class);
120+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
121+
122+
$receiverLocator = $this->createMock(ContainerInterface::class);
123+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
124+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
125+
126+
$bus = $this->createMock(RoutableMessageBus::class);
127+
$bus->expects($this->once())->method('dispatch');
128+
129+
$servicesResetter = $this->createMock(ServicesResetter::class);
130+
$servicesResetter->expects($shouldReset ? $this->once() : $this->never())->method('reset');
131+
132+
$command = new ConsumeMessagesCommand($bus, $receiverLocator, new EventDispatcher(), null, [], $servicesResetter);
133+
134+
$application = new Application();
135+
$application->add($command);
136+
$tester = new CommandTester($application->get('messenger:consume'));
137+
$tester->execute([
138+
'receivers' => ['dummy-receiver'],
139+
'--limit' => 1,
140+
$shouldReset ? '--reset-services-on-message' : '--no-reset-services-on-message' => null,
141+
]);
142+
143+
$tester->assertCommandIsSuccessful();
144+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
145+
}
146+
147+
public function testErrorOnResetServicesWithoutResetter()
148+
{
149+
$receiver = $this->createMock(ReceiverInterface::class);
150+
$receiver->expects($this->never())->method('get');
151+
152+
$receiverLocator = $this->createConfiguredMock(ContainerInterface::class, [
153+
'has' => true,
154+
'get' => $receiver,
155+
]);
156+
157+
$command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $receiverLocator, new EventDispatcher());
158+
159+
$application = new Application();
160+
$application->add($command);
161+
$tester = new CommandTester($application->get('messenger:consume'));
162+
163+
$this->expectException(\RuntimeException::class);
164+
$this->expectExceptionMessage('Please set a $servicesResetter');
165+
166+
$tester->execute([
167+
'receivers' => ['dummy-receiver'],
168+
'--reset-services-on-message' => null,
169+
]);
170+
}
171+
172+
/**
173+
* @group legacy
174+
*/
175+
public function testBasicRunWithoutResetServicesOption()
176+
{
177+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
178+
179+
$receiver = $this->createMock(ReceiverInterface::class);
180+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
181+
182+
$receiverLocator = $this->createMock(ContainerInterface::class);
183+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
184+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
185+
186+
$bus = $this->createMock(MessageBusInterface::class);
187+
$bus->expects($this->once())->method('dispatch');
188+
189+
$busLocator = $this->createMock(ContainerInterface::class);
190+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
191+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
192+
193+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
194+
195+
$application = new Application();
196+
$application->add($command);
197+
$tester = new CommandTester($application->get('messenger:consume'));
198+
$tester->execute([
199+
'receivers' => ['dummy-receiver'],
200+
'--limit' => 1,
97201
]);
98202

99203
$tester->assertCommandIsSuccessful();

src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php

+4-15
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,16 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16-
use Symfony\Component\Messenger\Envelope;
17-
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
1816
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
1917

2018
class ResetServicesListenerTest extends TestCase
2119
{
22-
public function provideTests(): iterable
23-
{
24-
yield ['foo', true];
25-
yield ['bar', false];
26-
}
27-
28-
/** @dataProvider provideTests */
29-
public function test(string $receiverName, bool $shouldReset)
20+
public function test()
3021
{
3122
$servicesResetter = $this->createMock(ServicesResetter::class);
32-
$servicesResetter->expects($shouldReset ? $this->once() : $this->never())->method('reset');
33-
34-
$event = new class(new Envelope(new \stdClass()), $receiverName) extends AbstractWorkerMessageEvent {};
23+
$servicesResetter->expects($this->once())->method('reset');
3524

36-
$resetListener = new ResetServicesListener($servicesResetter, ['foo']);
37-
$resetListener->resetServices($event);
25+
$resetListener = new ResetServicesListener($servicesResetter);
26+
$resetListener->resetServices();
3827
}
3928
}

0 commit comments

Comments
 (0)