Skip to content

Commit f18bd98

Browse files
committed
Ensure message is handled only once per handler
Add check to ensure that a message is only handled once per handler Add try...catch to run all handlers before throwing exception
1 parent 0034a0f commit f18bd98

File tree

8 files changed

+237
-16
lines changed

8 files changed

+237
-16
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Symfony\Component\Messenger\Exception;
15+
16+
use Symfony\Component\Messenger\Envelope;
17+
18+
class ChainedHandlerFailedException extends \RuntimeException implements ExceptionInterface
19+
{
20+
/**
21+
* @var \Throwable[]
22+
*/
23+
private $nested;
24+
25+
/**
26+
* @var Envelope
27+
*/
28+
private $envelope;
29+
30+
public function __construct(Envelope $envelope, \Throwable ...$nested)
31+
{
32+
$this->envelope = $envelope;
33+
$this->nested = $nested;
34+
parent::__construct();
35+
}
36+
37+
public function getEnvelope(): Envelope
38+
{
39+
return $this->envelope;
40+
}
41+
42+
/**
43+
* @return \Throwable[]
44+
*/
45+
public function getNestedExceptions(): array
46+
{
47+
return $this->nested;
48+
}
49+
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

+30-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Psr\Log\LoggerAwareTrait;
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
1718
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
1819
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
1920
use Symfony\Component\Messenger\Stamp\HandledStamp;
@@ -52,10 +53,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5253
'class' => \get_class($message),
5354
];
5455

56+
$exceptions = [];
5557
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
56-
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), \is_string($alias) ? $alias : null);
57-
$envelope = $envelope->with($handledStamp);
58-
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
58+
$alias = \is_string($alias) ? $alias : null;
59+
60+
if ($this->hasMessageSeen($envelope, $handler, $alias)) {
61+
continue;
62+
}
63+
64+
try {
65+
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
66+
$envelope = $envelope->with($handledStamp);
67+
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
68+
} catch (\Throwable $e) {
69+
$exceptions[] = $e;
70+
}
5971
}
6072

6173
if (null === $handler) {
@@ -66,6 +78,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6678
$this->logger->info('No handler for message "{class}"', $context);
6779
}
6880

81+
if (\count($exceptions)) {
82+
throw new ChainedHandlerFailedException($envelope, ...$exceptions);
83+
}
84+
6985
return $stack->next()->handle($envelope, $stack);
7086
}
87+
88+
private function hasMessageSeen(Envelope $envelope, callable $handler, ?string $alias): bool
89+
{
90+
$some = array_filter($envelope
91+
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
92+
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
93+
$stamp->getHandlerAlias() === $alias;
94+
});
95+
96+
return \count($some) > 0;
97+
}
7198
}

src/Symfony/Component/Messenger/Stamp/HandledStamp.php

+13-8
Original file line numberDiff line numberDiff line change
@@ -40,33 +40,38 @@ public function __construct($result, string $callableName, string $handlerAlias
4040
/**
4141
* @param mixed $result The returned value of the message handler
4242
*/
43-
public static function fromCallable(callable $handler, $result, string $handlerAlias = null): self
43+
public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self
44+
{
45+
return new self($result, self::getNameFromCallable($handler), $handlerAlias);
46+
}
47+
48+
public static function getNameFromCallable(callable $handler): string
4449
{
4550
if (\is_array($handler)) {
4651
if (\is_object($handler[0])) {
47-
return new self($result, \get_class($handler[0]).'::'.$handler[1], $handlerAlias);
52+
return \get_class($handler[0]).'::'.$handler[1];
4853
}
4954

50-
return new self($result, $handler[0].'::'.$handler[1], $handlerAlias);
55+
return $handler[0].'::'.$handler[1];
5156
}
5257

5358
if (\is_string($handler)) {
54-
return new self($result, $handler, $handlerAlias);
59+
return $handler;
5560
}
5661

5762
if ($handler instanceof \Closure) {
5863
$r = new \ReflectionFunction($handler);
5964
if (false !== strpos($r->name, '{closure}')) {
60-
return new self($result, 'Closure', $handlerAlias);
65+
return 'Closure';
6166
}
6267
if ($class = $r->getClosureScopeClass()) {
63-
return new self($result, $class->name.'::'.$r->name, $handlerAlias);
68+
return $class->name.'::'.$r->name;
6469
}
6570

66-
return new self($result, $r->name, $handlerAlias);
71+
return $r->name;
6772
}
6873

69-
return new self($result, \get_class($handler).'::__invoke', $handlerAlias);
74+
return \get_class($handler).'::__invoke';
7075
}
7176

7277
/**

src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public function __construct(callable $callable)
1616
$this->callable = $callable;
1717
}
1818

19+
public function setCallable(callable $callable): void
20+
{
21+
$this->callable = $callable;
22+
}
23+
1924
public function receive(callable $handler): void
2025
{
2126
$callable = $this->callable;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Fixtures;
13+
14+
class DummyMessageHandler
15+
{
16+
/**
17+
* @var int
18+
*/
19+
private $remainingExceptionCount;
20+
21+
/**
22+
* @var int
23+
*/
24+
private $called = 0;
25+
26+
public function __construct(int $throwExceptionOnFirstTries = 0)
27+
{
28+
$this->remainingExceptionCount = $throwExceptionOnFirstTries;
29+
}
30+
31+
public function __invoke(DummyMessage $message)
32+
{
33+
if ($this->remainingExceptionCount > 0) {
34+
$this->remainingExceptionCount --;
35+
throw new \Exception('Handler should throw Exception.');
36+
}
37+
38+
$this->called ++;
39+
}
40+
41+
public function getTimesCalledWithoutThrowing(): int
42+
{
43+
return $this->called;
44+
}
45+
}

src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php

+33-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Middleware;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
1516
use Symfony\Component\Messenger\Handler\HandlersLocator;
1617
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
1718
use Symfony\Component\Messenger\Middleware\StackMiddleware;
@@ -40,7 +41,7 @@ public function testItCallsTheHandlerAndNextMiddleware()
4041
/**
4142
* @dataProvider itAddsHandledStampsProvider
4243
*/
43-
public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
44+
public function testItAddsHandledStamps(array $handlers, array $expectedStamps, bool $nextIsCalled)
4445
{
4546
$message = new DummyMessage('Hey');
4647
$envelope = new Envelope($message);
@@ -49,7 +50,11 @@ public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
4950
DummyMessage::class => $handlers,
5051
]));
5152

52-
$envelope = $middleware->handle($envelope, $this->getStackMock());
53+
try {
54+
$envelope = $middleware->handle($envelope, $this->getStackMock($nextIsCalled));
55+
} catch (ChainedHandlerFailedException $e) {
56+
$envelope = $e->getEnvelope();
57+
}
5358

5459
$this->assertEquals($expectedStamps, $envelope->all(HandledStamp::class));
5560
}
@@ -64,17 +69,22 @@ public function itAddsHandledStampsProvider()
6469
$second->method('__invoke')->willReturn(null);
6570
$secondClass = \get_class($second);
6671

72+
$failing = $this->createPartialMock(\stdClass::class, ['__invoke']);
73+
$failing->method('__invoke')->will($this->throwException(new \Exception('handler failed.')));
74+
6775
yield 'A stamp is added' => [
6876
[$first],
6977
[new HandledStamp('first result', $firstClass.'::__invoke')],
78+
true,
7079
];
7180

7281
yield 'A stamp is added per handler' => [
73-
[$first, $second],
82+
['first' => $first, 'second' => $second],
7483
[
75-
new HandledStamp('first result', $firstClass.'::__invoke'),
76-
new HandledStamp(null, $secondClass.'::__invoke'),
84+
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
85+
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
7786
],
87+
true,
7888
];
7989

8090
yield 'Yielded locator alias is used' => [
@@ -83,6 +93,24 @@ public function itAddsHandledStampsProvider()
8393
new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'),
8494
new HandledStamp(null, $secondClass.'::__invoke'),
8595
],
96+
true,
97+
];
98+
99+
yield 'It tries all handlers' => [
100+
['first' => $first, 'failing' => $failing, 'second' => $second],
101+
[
102+
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
103+
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
104+
],
105+
false,
106+
];
107+
108+
yield 'It ignores duplicated handler' => [
109+
[$first, $first],
110+
[
111+
new HandledStamp('first result', $firstClass.'::__invoke'),
112+
],
113+
true,
86114
];
87115
}
88116

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?php declare(strict_types=1);
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Handler\HandlersLocator;
17+
use Symfony\Component\Messenger\MessageBus;
18+
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
19+
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
20+
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
21+
use Symfony\Component\Messenger\Stamp\SentStamp;
22+
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
23+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
24+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandler;
25+
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
26+
use Symfony\Component\Messenger\Worker;
27+
28+
class RetryIntegrationTest extends TestCase
29+
{
30+
public function testRetryMechanism()
31+
{
32+
$apiMessage = new DummyMessage('API');
33+
34+
$receiver = new CallbackReceiver(function ($handler) use ($apiMessage) {
35+
$handler(new Envelope($apiMessage, new SentStamp('Some\Sender', 'sender_alias')));
36+
});
37+
38+
$senderLocator = new SendersLocator([], ['*' => true]);
39+
40+
$handler = new DummyMessageHandler();
41+
$throwingHandler = new DummyMessageHandler(1);
42+
$handlerLocator = new HandlersLocator([
43+
DummyMessage::class => [
44+
'handler' => $handler,
45+
'throwing' => $throwingHandler
46+
]
47+
]);
48+
49+
$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
50+
51+
$worker = new Worker($receiver, $bus, 'receiver name', new MultiplierRetryStrategy());
52+
$worker->run();
53+
54+
$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
55+
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
56+
}
57+
}

src/Symfony/Component/Messenger/Worker.php

+5
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
1818
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
1919
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
20+
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
2021
use Symfony\Component\Messenger\Exception\LogicException;
2122
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
2223
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2324
use Symfony\Component\Messenger\Stamp\DelayStamp;
25+
use Symfony\Component\Messenger\Stamp\HandledStamp;
2426
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2527
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2628
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -88,6 +90,9 @@ public function run()
8890
try {
8991
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
9092
} catch (\Throwable $throwable) {
93+
if ($throwable instanceof ChainedHandlerFailedException) {
94+
$envelope = $throwable->getEnvelope();
95+
}
9196
$shouldRetry = $this->shouldRetry($throwable, $envelope);
9297

9398
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));

0 commit comments

Comments
 (0)