Skip to content

Commit 1cdf048

Browse files
committed
Ensure message is handled only once per handler
Add check to ensure that a message is only handled once per handler Add try...catch to run all handlers before throwing exception
1 parent 755f411 commit 1cdf048

File tree

7 files changed

+246
-16
lines changed

7 files changed

+246
-16
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Symfony\Component\Messenger\Exception;
15+
16+
use Symfony\Component\Messenger\Envelope;
17+
18+
class ChainedHandlerFailedException extends \RuntimeException implements ExceptionInterface
19+
{
20+
/**
21+
* @var \Throwable[]
22+
*/
23+
private $nested;
24+
25+
/**
26+
* @var Envelope
27+
*/
28+
private $envelope;
29+
30+
public function __construct(Envelope $envelope, \Throwable ...$nested)
31+
{
32+
parent::__construct($this->constructMessage($nested));
33+
$this->envelope = $envelope;
34+
$this->nested = $nested;
35+
}
36+
37+
public function getEnvelope(): Envelope
38+
{
39+
return $this->envelope;
40+
}
41+
42+
/**
43+
* @return \Throwable[]
44+
*/
45+
public function getNestedExceptions(): array
46+
{
47+
return $this->nested;
48+
}
49+
50+
/**
51+
* @param \Throwable[] $nested
52+
*
53+
* @return string
54+
*/
55+
private function constructMessage(array $nested): string
56+
{
57+
return 1 === \count($nested) ?
58+
$nested[0]->getMessage() :
59+
sprintf('%d MessageHandler failed. First one failed with Message: %s', \count($nested), $nested[0]->getMessage());
60+
}
61+
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

+30-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Psr\Log\LoggerAwareTrait;
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
1718
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
1819
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
1920
use Symfony\Component\Messenger\Stamp\HandledStamp;
@@ -52,10 +53,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5253
'class' => \get_class($message),
5354
];
5455

56+
$exceptions = [];
5557
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
56-
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), \is_string($alias) ? $alias : null);
57-
$envelope = $envelope->with($handledStamp);
58-
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
58+
$alias = \is_string($alias) ? $alias : null;
59+
60+
if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
61+
continue;
62+
}
63+
64+
try {
65+
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
66+
$envelope = $envelope->with($handledStamp);
67+
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
68+
} catch (\Throwable $e) {
69+
$exceptions[] = $e;
70+
}
5971
}
6072

6173
if (null === $handler) {
@@ -66,6 +78,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6678
$this->logger->info('No handler for message "{class}"', $context);
6779
}
6880

81+
if (\count($exceptions)) {
82+
throw new ChainedHandlerFailedException($envelope, ...$exceptions);
83+
}
84+
6985
return $stack->next()->handle($envelope, $stack);
7086
}
87+
88+
private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
89+
{
90+
$some = array_filter($envelope
91+
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
92+
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
93+
$stamp->getHandlerAlias() === $alias;
94+
});
95+
96+
return \count($some) > 0;
97+
}
7198
}

src/Symfony/Component/Messenger/Stamp/HandledStamp.php

+13-8
Original file line numberDiff line numberDiff line change
@@ -40,33 +40,38 @@ public function __construct($result, string $callableName, string $handlerAlias
4040
/**
4141
* @param mixed $result The returned value of the message handler
4242
*/
43-
public static function fromCallable(callable $handler, $result, string $handlerAlias = null): self
43+
public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self
44+
{
45+
return new self($result, self::getNameFromCallable($handler), $handlerAlias);
46+
}
47+
48+
public static function getNameFromCallable(callable $handler): string
4449
{
4550
if (\is_array($handler)) {
4651
if (\is_object($handler[0])) {
47-
return new self($result, \get_class($handler[0]).'::'.$handler[1], $handlerAlias);
52+
return \get_class($handler[0]).'::'.$handler[1];
4853
}
4954

50-
return new self($result, $handler[0].'::'.$handler[1], $handlerAlias);
55+
return $handler[0].'::'.$handler[1];
5156
}
5257

5358
if (\is_string($handler)) {
54-
return new self($result, $handler, $handlerAlias);
59+
return $handler;
5560
}
5661

5762
if ($handler instanceof \Closure) {
5863
$r = new \ReflectionFunction($handler);
5964
if (false !== strpos($r->name, '{closure}')) {
60-
return new self($result, 'Closure', $handlerAlias);
65+
return 'Closure';
6166
}
6267
if ($class = $r->getClosureScopeClass()) {
63-
return new self($result, $class->name.'::'.$r->name, $handlerAlias);
68+
return $class->name.'::'.$r->name;
6469
}
6570

66-
return new self($result, $r->name, $handlerAlias);
71+
return $r->name;
6772
}
6873

69-
return new self($result, \get_class($handler).'::__invoke', $handlerAlias);
74+
return \get_class($handler).'::__invoke';
7075
}
7176

7277
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Fixtures;
13+
14+
class MessageHandlerFailingFirstTimes
15+
{
16+
private $remainingFailures;
17+
18+
private $called = 0;
19+
20+
public function __construct(int $throwExceptionOnFirstTries = 0)
21+
{
22+
$this->remainingFailures = $throwExceptionOnFirstTries;
23+
}
24+
25+
public function __invoke(DummyMessage $message)
26+
{
27+
if ($this->remainingFailures > 0) {
28+
--$this->remainingFailures;
29+
throw new \Exception('Handler should throw Exception.');
30+
}
31+
32+
++$this->called;
33+
}
34+
35+
public function getTimesCalledWithoutThrowing(): int
36+
{
37+
return $this->called;
38+
}
39+
}

src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php

+33-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Middleware;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
1516
use Symfony\Component\Messenger\Handler\HandlersLocator;
1617
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
1718
use Symfony\Component\Messenger\Middleware\StackMiddleware;
@@ -40,7 +41,7 @@ public function testItCallsTheHandlerAndNextMiddleware()
4041
/**
4142
* @dataProvider itAddsHandledStampsProvider
4243
*/
43-
public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
44+
public function testItAddsHandledStamps(array $handlers, array $expectedStamps, bool $nextIsCalled)
4445
{
4546
$message = new DummyMessage('Hey');
4647
$envelope = new Envelope($message);
@@ -49,7 +50,11 @@ public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
4950
DummyMessage::class => $handlers,
5051
]));
5152

52-
$envelope = $middleware->handle($envelope, $this->getStackMock());
53+
try {
54+
$envelope = $middleware->handle($envelope, $this->getStackMock($nextIsCalled));
55+
} catch (ChainedHandlerFailedException $e) {
56+
$envelope = $e->getEnvelope();
57+
}
5358

5459
$this->assertEquals($expectedStamps, $envelope->all(HandledStamp::class));
5560
}
@@ -64,17 +69,22 @@ public function itAddsHandledStampsProvider()
6469
$second->method('__invoke')->willReturn(null);
6570
$secondClass = \get_class($second);
6671

72+
$failing = $this->createPartialMock(\stdClass::class, ['__invoke']);
73+
$failing->method('__invoke')->will($this->throwException(new \Exception('handler failed.')));
74+
6775
yield 'A stamp is added' => [
6876
[$first],
6977
[new HandledStamp('first result', $firstClass.'::__invoke')],
78+
true,
7079
];
7180

7281
yield 'A stamp is added per handler' => [
73-
[$first, $second],
82+
['first' => $first, 'second' => $second],
7483
[
75-
new HandledStamp('first result', $firstClass.'::__invoke'),
76-
new HandledStamp(null, $secondClass.'::__invoke'),
84+
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
85+
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
7786
],
87+
true,
7888
];
7989

8090
yield 'Yielded locator alias is used' => [
@@ -83,6 +93,24 @@ public function itAddsHandledStampsProvider()
8393
new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'),
8494
new HandledStamp(null, $secondClass.'::__invoke'),
8595
],
96+
true,
97+
];
98+
99+
yield 'It tries all handlers' => [
100+
['first' => $first, 'failing' => $failing, 'second' => $second],
101+
[
102+
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
103+
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
104+
],
105+
false,
106+
];
107+
108+
yield 'It ignores duplicated handler' => [
109+
[$first, $first],
110+
[
111+
new HandledStamp('first result', $firstClass.'::__invoke'),
112+
],
113+
true,
86114
];
87115
}
88116

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* This file is part of the Symfony package.
7+
*
8+
* (c) Fabien Potencier <fabien@symfony.com>
9+
*
10+
* For the full copyright and license information, please view the LICENSE
11+
* file that was distributed with this source code.
12+
*/
13+
14+
namespace Symfony\Component\Messenger\Tests;
15+
16+
use PHPUnit\Framework\TestCase;
17+
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Handler\HandlersLocator;
19+
use Symfony\Component\Messenger\MessageBus;
20+
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
21+
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
22+
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
23+
use Symfony\Component\Messenger\Stamp\SentStamp;
24+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
25+
use Symfony\Component\Messenger\Tests\Fixtures\MessageHandlerFailingFirstTimes;
26+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
27+
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
28+
use Symfony\Component\Messenger\Worker;
29+
30+
class RetryIntegrationTest extends TestCase
31+
{
32+
public function testRetryMechanism()
33+
{
34+
$apiMessage = new DummyMessage('API');
35+
36+
$receiver = $this->createMock(ReceiverInterface::class);
37+
$receiver->method('get')
38+
->willReturn([
39+
new Envelope($apiMessage, [
40+
new SentStamp('Some\Sender', 'sender_alias'),
41+
]),
42+
]);
43+
44+
$senderLocator = new SendersLocator([], ['*' => true]);
45+
46+
$handler = new MessageHandlerFailingFirstTimes();
47+
$throwingHandler = new MessageHandlerFailingFirstTimes(1);
48+
$handlerLocator = new HandlersLocator([
49+
DummyMessage::class => [
50+
'handler' => $handler,
51+
'throwing' => $throwingHandler,
52+
],
53+
]);
54+
55+
$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
56+
57+
$worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
58+
$worker->run([], function () use ($worker) {
59+
$worker->stop();
60+
});
61+
62+
$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
63+
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
64+
}
65+
}

src/Symfony/Component/Messenger/Worker.php

+5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
1616
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
1717
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
18+
use Symfony\Component\Messenger\Exception\ChainedHandlerFailedException;
1819
use Symfony\Component\Messenger\Exception\LogicException;
1920
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
2021
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
@@ -123,6 +124,10 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
123124
try {
124125
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
125126
} catch (\Throwable $throwable) {
127+
if ($throwable instanceof ChainedHandlerFailedException) {
128+
$envelope = $throwable->getEnvelope();
129+
}
130+
126131
$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);
127132

128133
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));

0 commit comments

Comments
 (0)