Skip to content

[Messenger] make TraceableMiddleware decorate a StackInterface instead of each middleware to free the callstack from noisy frames #29006

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1542,11 +1542,15 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
}

foreach ($middleware as $middlewareItem) {
if (!$validationConfig['enabled'] && 'messenger.middleware.validation' === $middlewareItem['id']) {
if (!$validationConfig['enabled'] && \in_array($middlewareItem['id'], array('validation', 'messenger.middleware.validation'), true)) {
throw new LogicException('The Validation middleware is only available when the Validator component is installed and enabled. Try running "composer require symfony/validator".');
}
}

if ($container->getParameter('kernel.debug') && class_exists(Stopwatch::class)) {
array_unshift($middleware, array('id' => 'traceable', 'arguments' => array($busId)));
}

$container->setParameter($busId.'.middleware', $middleware);
$container->register($busId, MessageBus::class)->addArgument(array())->addTag('messenger.bus');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
<argument type="service" id="validator" />
</service>

<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
<argument type="service" id="debug.stopwatch" />
</service>

<!-- Logging -->
<service id="messenger.middleware.logging" class="Symfony\Component\Messenger\Middleware\LoggingMiddleware" abstract="true">
<argument type="service" id="logger" />

<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="logger" />
</service>

<!-- Discovery -->
Expand All @@ -56,10 +59,9 @@
</service>

<service id="messenger.transport.amqp.factory" class="Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory">
<tag name="messenger.transport_factory" />
<argument type="service" id="messenger.transport.serializer" />
<argument>%kernel.debug%</argument>

<tag name="messenger.transport_factory" />
</service>
</services>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\DependencyInjection;

use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait;
Expand All @@ -22,7 +23,6 @@
use Symfony\Component\Messenger\Handler\ChainHandler;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
Expand All @@ -38,15 +38,13 @@ class MessengerPass implements CompilerPassInterface
private $busTag;
private $senderTag;
private $receiverTag;
private $debugStopwatchId;

public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver', string $debugStopwatchId = 'debug.stopwatch')
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
{
$this->handlerTag = $handlerTag;
$this->busTag = $busTag;
$this->senderTag = $senderTag;
$this->receiverTag = $receiverTag;
$this->debugStopwatchId = $debugStopwatchId;
}

/**
Expand Down Expand Up @@ -306,7 +304,6 @@ private function registerBusToCollector(ContainerBuilder $container, string $bus

private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
{
$debug = $container->getParameter('kernel.debug') && $container->has($this->debugStopwatchId);
$middlewareReferences = array();
foreach ($middlewareCollection as $middlewareItem) {
$id = $middlewareItem['id'];
Expand All @@ -319,7 +316,7 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
}

if ($isDefinitionAbstract = ($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
if (($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$count = \count($definition->getArguments());
foreach (array_values($arguments ?? array()) as $key => $argument) {
Expand All @@ -333,24 +330,9 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
}

if ($debug) {
$container->register($debugMiddlewareId = '.messenger.debug.traced.'.$messengerMiddlewareId, TraceableMiddleware::class)
// Decorates with a high priority so it's applied the earliest:
->setDecoratedService($messengerMiddlewareId, null, 100)
->setArguments(array(
new Reference($debugMiddlewareId.'.inner'),
new Reference($this->debugStopwatchId),
// In case the definition isn't abstract,
// we cannot be sure the service instance is used by one bus only.
// So we only inject the bus name when the original definition is abstract.
$isDefinitionAbstract ? $busId : null,
))
;
}

$middlewareReferences[] = new Reference($messengerMiddlewareId);
}

$container->getDefinition($busId)->replaceArgument(0, $middlewareReferences);
$container->getDefinition($busId)->replaceArgument(0, new IteratorArgument($middlewareReferences));
}
}
58 changes: 27 additions & 31 deletions src/Symfony/Component/Messenger/Middleware/TraceableMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
*/
class TraceableMiddleware implements MiddlewareInterface
{
private $inner;
private $stopwatch;
private $busName;
private $eventCategory;

public function __construct(MiddlewareInterface $inner, Stopwatch $stopwatch, string $busName = null, string $eventCategory = 'messenger.middleware')
public function __construct(Stopwatch $stopwatch, string $busName, string $eventCategory = 'messenger.middleware')
{
$this->inner = $inner;
$this->stopwatch = $stopwatch;
$this->busName = $busName;
$this->eventCategory = $eventCategory;
Expand All @@ -39,64 +37,62 @@ public function __construct(MiddlewareInterface $inner, Stopwatch $stopwatch, st
*/
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$class = \get_class($this->inner);
$eventName = 'c' === $class[0] && 0 === strpos($class, "class@anonymous\0") ? get_parent_class($class).'@anonymous' : $class;

if ($this->busName) {
$eventName .= " (bus: {$this->busName})";
}

$this->stopwatch->start($eventName, $this->eventCategory);
$stack = new TraceableStack($stack, $this->stopwatch, $this->busName, $this->eventCategory);

try {
return $this->inner->handle($envelope, new TraceableInnerMiddleware($stack, $this->stopwatch, $eventName, $this->eventCategory));
return $stack->next()->handle($envelope, $stack);
} finally {
if ($this->stopwatch->isStarted($eventName)) {
$this->stopwatch->stop($eventName);
}
$stack->stop();
}
}
}

/**
* @internal
*/
class TraceableInnerMiddleware implements MiddlewareInterface, StackInterface
class TraceableStack implements StackInterface
{
private $stack;
private $stopwatch;
private $eventName;
private $busName;
private $eventCategory;
private $currentEvent;

public function __construct(StackInterface $stack, Stopwatch $stopwatch, string $eventName, string $eventCategory)
public function __construct(StackInterface $stack, Stopwatch $stopwatch, string $busName, string $eventCategory)
{
$this->stack = $stack;
$this->stopwatch = $stopwatch;
$this->eventName = $eventName;
$this->busName = $busName;
$this->eventCategory = $eventCategory;
}

/**
* {@inheritdoc}
*/
public function handle(Envelope $envelope, StackInterface $stack): Envelope
public function next(): MiddlewareInterface
{
$this->stopwatch->stop($this->eventName);
if ($this === $stack) {
$envelope = $this->stack->next()->handle($envelope, $this->stack);
if (null !== $this->currentEvent) {
$this->stopwatch->stop($this->currentEvent);
}

if ($this->stack === $nextMiddleware = $this->stack->next()) {
$this->currentEvent = 'Tail';
} else {
$envelope = $stack->next()->handle($envelope, $stack);
$class = \get_class($nextMiddleware);
$this->currentEvent = sprintf('"%s"', 'c' === $class[0] && 0 === strpos($class, "class@anonymous\0") ? get_parent_class($class).'@anonymous' : $class);
}
$this->stopwatch->start($this->eventName, $this->eventCategory);
$this->currentEvent .= sprintf(' on "%s"', $this->busName);

$this->stopwatch->start($this->currentEvent, $this->eventCategory);

return $envelope;
return $nextMiddleware;
}

/**
* {@inheritdoc}
*/
public function next(): MiddlewareInterface
public function stop()
{
return $this;
if (null !== $this->currentEvent && $this->stopwatch->isStarted($this->currentEvent)) {
$this->stopwatch->stop($this->currentEvent);
}
$this->currentEvent = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Stopwatch\Stopwatch;

class MessengerPassTest extends TestCase
{
Expand Down Expand Up @@ -524,7 +523,7 @@ public function testRegistersMiddlewareFromServices()
new Reference(UselessMiddleware::class),
new Reference($factoryChildMiddlewareId),
new Reference($factoryWithDefaultChildMiddlewareId),
), $container->getDefinition($fooBusId)->getArgument(0));
), $container->getDefinition($fooBusId)->getArgument(0)->getValues());
$this->assertFalse($container->hasParameter($middlewareParameter));
}

Expand Down Expand Up @@ -557,39 +556,6 @@ public function testMiddlewareFactoryDefinitionMustBeAbstract()
(new MessengerPass())->process($container);
}

public function testDecoratesWithTraceableMiddlewareOnDebug()
{
$container = $this->getContainerBuilder();

$container->register($busId = 'message_bus', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus');
$container->register('abstract_middleware', UselessMiddleware::class)->setAbstract(true);
$container->register('concrete_middleware', UselessMiddleware::class);

$container->setParameter($middlewareParameter = $busId.'.middleware', array(
array('id' => 'abstract_middleware'),
array('id' => 'concrete_middleware'),
));

$container->setParameter('kernel.debug', true);
$container->register('debug.stopwatch', Stopwatch::class);

(new MessengerPass())->process($container);

$this->assertNotNull($concreteDef = $container->getDefinition('.messenger.debug.traced.concrete_middleware'));
$this->assertEquals(array(
new Reference('.messenger.debug.traced.concrete_middleware.inner'),
new Reference('debug.stopwatch'),
null,
), $concreteDef->getArguments());

$this->assertNotNull($abstractDef = $container->getDefinition(".messenger.debug.traced.$busId.middleware.abstract_middleware"));
$this->assertEquals(array(
new Reference(".messenger.debug.traced.$busId.middleware.abstract_middleware.inner"),
new Reference('debug.stopwatch'),
$busId,
), $abstractDef->getArguments());
}

public function testItRegistersTheDebugCommand()
{
$container = $this->getContainerBuilder($commandBusId = 'command_bus');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Middleware\StackMiddleware;
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
Expand All @@ -27,7 +28,7 @@ class TraceableMiddlewareTest extends MiddlewareTestCase
public function testHandle()
{
$busId = 'command_bus';
$envelope = new Envelope($message = new DummyMessage('Hello'));
$envelope = new Envelope(new DummyMessage('Hello'));

$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
Expand All @@ -42,16 +43,22 @@ public function testHandle()
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
$stopwatch->expects($this->exactly(2))
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
->withConsecutive(
array($this->matches('"%sMiddlewareInterface%s" on "command_bus"'), 'messenger.middleware'),
array('Tail on "command_bus"', 'messenger.middleware')
)
;
$stopwatch->expects($this->exactly(2))
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
->withConsecutive(
array($this->matches('"%sMiddlewareInterface%s" on "command_bus"')),
array('Tail on "command_bus"')
)
;

$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
$traced = new TraceableMiddleware($stopwatch, $busId);

$traced->handle($envelope, $this->getStackMock());
$traced->handle($envelope, new StackMiddleware(new \ArrayIterator(array(null, $middleware))));
}

/**
Expand All @@ -61,32 +68,26 @@ public function testHandle()
public function testHandleWithException()
{
$busId = 'command_bus';
$envelope = new Envelope($message = new DummyMessage('Hello'));

$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
->method('handle')
->with($envelope, $this->anything())
->will($this->returnCallback(function ($envelope, StackInterface $stack) {
return $stack->next()->handle($envelope, $stack);
}))
->willThrowException(new \RuntimeException('Thrown from next middleware.'))
;

$stack = $this->getThrowingStackMock();

$stopwatch = $this->createMock(Stopwatch::class);
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
// Start is only expected to be called once, as an exception is thrown by the next callable:
$stopwatch->expects($this->exactly(1))
// Start/stop are expected to be called once, as an exception is thrown by the next callable
$stopwatch->expects($this->once())
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
->with($this->matches('"%sMiddlewareInterface%s" on "command_bus"'), 'messenger.middleware')
;
$stopwatch->expects($this->exactly(2))
$stopwatch->expects($this->once())
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
->with($this->matches('"%sMiddlewareInterface%s" on "command_bus"'))
;

$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
$traced->handle($envelope, $stack);
$traced = new TraceableMiddleware($stopwatch, $busId);
$traced->handle(new Envelope(new DummyMessage('Hello')), new StackMiddleware(new \ArrayIterator(array(null, $middleware))));
}
}