Skip to content

Commit 14942db

Browse files
committed
feature #36152 [Messenger] dispatch event when a message is retried (nikophil)
This PR was squashed before being merged into the 5.2-dev branch. Discussion ---------- [Messenger] dispatch event when a message is retried | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | License | MIT Hello, i'm working on a bundle which helps to monitor messenger queues (add some stats for queues/transports + ability to manage failed messages from the browser) https://github.com/SymfonyCasts/messenger-monitor-bundle/ and we're missing some hooks in the messaging system: 1. a way to know when a message has been retried (fixed by dispatching a new `WorkerMessageRetriedEvent` in `SendFailedMessageForRetryListener::onMessageFailed()`) 2. a way to update the enveloppe in worker message events (fixed by adding `AbstractWorkerMessageEvent::setEnvelope()`) if needed i can provide some precise use cases. thanks. Commits ------- 55bddcb [Messenger] dispatch event when a message is retried
2 parents 1b88b8b + 55bddcb commit 14942db

File tree

8 files changed

+90
-4
lines changed

8 files changed

+90
-4
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@
152152
abstract_arg('senders service locator'),
153153
service('messenger.retry_strategy_locator'),
154154
service('logger')->ignoreOnInvalid(),
155+
service('event_dispatcher'),
155156
])
156157
->tag('kernel.event_subscriber')
157158
->tag('monolog.logger', ['channel' => 'messenger'])

src/Symfony/Component/Messenger/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ CHANGELOG
77
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
88
* Added factory methods to `DelayStamp`.
99
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
10+
* Added `WorkerMessageRetriedEvent`
11+
* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable
1012

1113
5.1.0
1214
-----

src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Event;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\StampInterface;
1516

1617
abstract class AbstractWorkerMessageEvent
1718
{
@@ -36,4 +37,9 @@ public function getReceiverName(): string
3637
{
3738
return $this->receiverName;
3839
}
40+
41+
public function addStamps(StampInterface ...$stamps): void
42+
{
43+
$this->envelope = $this->envelope->with(...$stamps);
44+
}
3945
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched after a message has been sent for retry.
16+
*
17+
* The event name is the class name.
18+
*/
19+
final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent
20+
{
21+
}

src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php

+9-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
1819
use Symfony\Component\Messenger\Exception\HandlerFailedException;
1920
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
2021
use Symfony\Component\Messenger\Exception\RuntimeException;
@@ -24,6 +25,7 @@
2425
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2526
use Symfony\Component\Messenger\Stamp\StampInterface;
2627
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
28+
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
2729

2830
/**
2931
* @author Tobias Schultze <http://tobion.de>
@@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
3335
private $sendersLocator;
3436
private $retryStrategyLocator;
3537
private $logger;
38+
private $eventDispatcher;
3639
private $historySize;
3740

38-
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
41+
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
3942
{
4043
$this->sendersLocator = $sendersLocator;
4144
$this->retryStrategyLocator = $retryStrategyLocator;
4245
$this->logger = $logger;
46+
$this->eventDispatcher = $eventDispatcher;
4347
$this->historySize = $historySize;
4448
}
4549

@@ -74,6 +78,10 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
7478

7579
// re-send the message for retry
7680
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
81+
82+
if (null !== $this->eventDispatcher) {
83+
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
84+
}
7785
} else {
7886
if (null !== $this->logger) {
7987
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);

src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php

+5-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Symfony\Component\Messenger\Stamp\DelayStamp;
2222
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2323
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
24+
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
2425

2526
class SendFailedMessageForRetryListenerTest extends TestCase
2627
{
@@ -107,7 +108,10 @@ public function testEnvelopeIsSentToTransportOnRetry()
107108
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
108109
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
109110

110-
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
111+
$eventDispatcher = $this->createMock(EventDispatcherInterface::class);
112+
$eventDispatcher->expects($this->once())->method('dispatch');
113+
114+
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher);
111115

112116
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
113117

src/Symfony/Component/Messenger/Tests/WorkerTest.php

+39
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2626
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2727
use Symfony\Component\Messenger\Stamp\SentStamp;
28+
use Symfony\Component\Messenger\Stamp\StampInterface;
2829
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
2930
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3031
use Symfony\Component\Messenger\Worker;
@@ -243,14 +244,45 @@ public function testWorkerWithMultipleReceivers()
243244
// make sure they were processed in the correct order
244245
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
245246
}
247+
248+
public function testWorkerMessageReceivedEventMutability()
249+
{
250+
$envelope = new Envelope(new DummyMessage('Hello'));
251+
$receiver = new DummyReceiver([[$envelope]]);
252+
253+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
254+
$bus->method('dispatch')->willReturnArgument(0);
255+
256+
$eventDispatcher = new EventDispatcher();
257+
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
258+
259+
$stamp = new class() implements StampInterface {
260+
};
261+
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
262+
$event->addStamps($stamp);
263+
};
264+
265+
$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
266+
267+
$worker = new Worker([$receiver], $bus, $eventDispatcher);
268+
$worker->run();
269+
270+
$envelope = current($receiver->getAcknowledgedEnvelopes());
271+
$this->assertCount(1, $envelope->all(\get_class($stamp)));
272+
}
246273
}
247274

248275
class DummyReceiver implements ReceiverInterface
249276
{
250277
private $deliveriesOfEnvelopes;
278+
private $acknowledgedEnvelopes;
279+
private $rejectedEnvelopes;
251280
private $acknowledgeCount = 0;
252281
private $rejectCount = 0;
253282

283+
/**
284+
* @param Envelope[][] $deliveriesOfEnvelopes
285+
*/
254286
public function __construct(array $deliveriesOfEnvelopes)
255287
{
256288
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
@@ -266,11 +298,13 @@ public function get(): iterable
266298
public function ack(Envelope $envelope): void
267299
{
268300
++$this->acknowledgeCount;
301+
$this->acknowledgedEnvelopes[] = $envelope;
269302
}
270303

271304
public function reject(Envelope $envelope): void
272305
{
273306
++$this->rejectCount;
307+
$this->rejectedEnvelopes[] = $envelope;
274308
}
275309

276310
public function getAcknowledgeCount(): int
@@ -282,4 +316,9 @@ public function getRejectCount(): int
282316
{
283317
return $this->rejectCount;
284318
}
319+
320+
public function getAcknowledgedEnvelopes(): array
321+
{
322+
return $this->acknowledgedEnvelopes;
323+
}
285324
}

src/Symfony/Component/Messenger/Worker.php

+7-2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
104104
{
105105
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
106106
$this->dispatchEvent($event);
107+
$envelope = $event->getEnvelope();
107108

108109
if (!$event->shouldHandle()) {
109110
return;
@@ -123,7 +124,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
123124
$envelope = $throwable->getEnvelope();
124125
}
125126

126-
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
127+
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
128+
$this->dispatchEvent($failedEvent);
129+
$envelope = $failedEvent->getEnvelope();
127130

128131
if (!$rejectFirst) {
129132
$receiver->reject($envelope);
@@ -132,7 +135,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
132135
return;
133136
}
134137

135-
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
138+
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
139+
$this->dispatchEvent($handledEvent);
140+
$envelope = $handledEvent->getEnvelope();
136141

137142
if (null !== $this->logger) {
138143
$message = $envelope->getMessage();

0 commit comments

Comments
 (0)