Skip to content

Commit 488bb88

Browse files
committed
[Mesenger] Add support for resetting container services after each messenger message.
Without this patch, services are not resetted. For example Monolog Finger Cross handler is never reset nor flushed. So if the first message trigger and "error" level message, all others message will log and overflow the buffer. Usage with framework: ```yaml framework: messenger: transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' reset_on_message: true failed: 'doctrine://default?queue_name=failed' sync: 'sync://' ```
1 parent 1ee9727 commit 488bb88

File tree

12 files changed

+125
-1
lines changed

12 files changed

+125
-1
lines changed

src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ CHANGELOG
1010
* Deprecate `get()`, `has()`, `getDoctrine()`, and `dispatchMessage()` in `AbstractController`, use method/constructor injection instead
1111
* Add `MicroKernelTrait::getBundlesPath` method to get bundles config path
1212
* Deprecate the `cache.adapter.doctrine` service
13+
* Add support for resetting container services after each messenger message.
1314

1415
5.3
1516
---

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

+4
Original file line numberDiff line numberDiff line change
@@ -1333,6 +1333,10 @@ function ($a) {
13331333
->fixXmlConfig('option')
13341334
->children()
13351335
->scalarNode('dsn')->end()
1336+
->booleanNode('reset_on_message')
1337+
->defaultFalse()
1338+
->info('Reset container services after each message. Turn it on when the transport is async and run in a worker.')
1339+
->end()
13361340
->scalarNode('serializer')->defaultNull()->info('Service id of a custom serializer to use.')->end()
13371341
->arrayNode('options')
13381342
->normalizeKeys(false)

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+13
Original file line numberDiff line numberDiff line change
@@ -2013,6 +2013,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20132013

20142014
$senderAliases = [];
20152015
$transportRetryReferences = [];
2016+
$transportNamesForResetServices = [];
20162017
foreach ($config['transports'] as $name => $transport) {
20172018
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
20182019
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2041,6 +2042,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20412042

20422043
$transportRetryReferences[$name] = new Reference($retryServiceId);
20432044
}
2045+
if ($transport['reset_on_message']) {
2046+
$transportNamesForResetServices[] = $name;
2047+
}
2048+
}
2049+
2050+
if ($transportNamesForResetServices) {
2051+
$container
2052+
->getDefinition('messenger.listener.reset_services')
2053+
->replaceArgument(1, $transportNamesForResetServices)
2054+
;
2055+
} else {
2056+
$container->removeDefinition('messenger.listener.reset_services');
20442057
}
20452058

20462059
$senderReferences = [];

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2122
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2223
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
2324
use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener;
@@ -195,6 +196,12 @@
195196
->tag('kernel.event_subscriber')
196197

197198
->set('messenger.listener.stop_worker_on_stop_exception_listener', StopWorkerOnCustomStopExceptionListener::class)
199+
200+
->set('messenger.listener.reset_services', ResetServicesListener::class)
201+
->args([
202+
service('services_resetter'),
203+
abstract_arg('receivers names'),
204+
])
198205
->tag('kernel.event_subscriber')
199206

200207
->set('messenger.routable_message_bus', RoutableMessageBus::class)

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

+1
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,7 @@
505505
<xsd:attribute name="serializer" type="xsd:string" />
506506
<xsd:attribute name="dsn" type="xsd:string" />
507507
<xsd:attribute name="failure-transport" type="xsd:string" />
508+
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
508509
</xsd:complexType>
509510

510511
<xsd:complexType name="messenger_retry_strategy">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
'default' => 'amqp://localhost/%2f/messages',
1212
'customised' => [
1313
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
14+
'reset_on_message' => true,
1415
'options' => ['queue' => ['name' => 'Queue']],
1516
'serializer' => 'messenger.transport.native_php_serializer',
1617
'retry_strategy' => [

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<framework:messenger failure-transport="failed">
1111
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
1212
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
13-
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
13+
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" reset-on-message="true">
1414
<framework:options>
1515
<framework:queue>
1616
<framework:name>Queue</framework:name>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ framework:
88
default: 'amqp://localhost/%2f/messages'
99
customised:
1010
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
11+
reset_on_message: true
1112
options:
1213
queue:
1314
name: Queue

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

+4
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ public function testMessenger()
723723
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
724724
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
725725
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
726+
$this->assertFalse($container->hasDefinition('messenger.listener.reset_services'));
726727
}
727728

728729
public function testMessengerMultipleFailureTransports()
@@ -867,6 +868,9 @@ public function testMessengerTransports()
867868
return array_shift($values);
868869
}, $failureTransports);
869870
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
871+
872+
$this->assertTrue($container->hasDefinition('messenger.listener.reset_services'));
873+
$this->assertSame(['customised'], $container->getDefinition('messenger.listener.reset_services')->getArgument(1));
870874
}
871875

872876
public function testMessengerRouting()

src/Symfony/Component/Messenger/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
8+
* Add support for resetting container services after each messenger message.
89

910
5.3
1011
---
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\EventListener;
13+
14+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
15+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16+
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
17+
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
19+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
20+
21+
/**
22+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
23+
*/
24+
class ResetServicesListener implements EventSubscriberInterface
25+
{
26+
private $servicesResetter;
27+
private $receiversName;
28+
29+
public function __construct(ServicesResetter $servicesResetter, array $receiversName)
30+
{
31+
$this->servicesResetter = $servicesResetter;
32+
$this->receiversName = $receiversName;
33+
}
34+
35+
public function resetServices(AbstractWorkerMessageEvent $event)
36+
{
37+
if (!\in_array($event->getReceiverName(), $this->receiversName, true)) {
38+
return;
39+
}
40+
41+
$this->servicesResetter->reset();
42+
}
43+
44+
public static function getSubscribedEvents()
45+
{
46+
return [
47+
WorkerMessageHandledEvent::class => ['resetServices'],
48+
WorkerMessageFailedEvent::class => ['resetServices'],
49+
WorkerRunningEvent::class => ['resetServices'],
50+
];
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\EventListener;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
18+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
19+
20+
class ResetServicesListenerTest extends TestCase
21+
{
22+
public function provideTests(): iterable
23+
{
24+
yield ['foo', true];
25+
yield ['bar', false];
26+
}
27+
28+
/** @dataProvider provideTests */
29+
public function test(string $receiverName, bool $shouldReset)
30+
{
31+
$servicesResetter = $this->createMock(ServicesResetter::class);
32+
$servicesResetter->expects($shouldReset ? $this->once() : $this->never())->method('reset');
33+
34+
$event = new class(new Envelope(new \stdClass()), $receiverName) extends AbstractWorkerMessageEvent {};
35+
36+
$resetListener = new ResetServicesListener($servicesResetter, ['foo']);
37+
$resetListener->resetServices($event);
38+
}
39+
}

0 commit comments

Comments
 (0)