Skip to content

[Messenger] dispatch event when a message is retried #36152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
abstract_arg('senders service locator'),
service('messenger.retry_strategy_locator'),
service('logger')->ignoreOnInvalid(),
service('event_dispatcher'),
])
->tag('kernel.event_subscriber')
->tag('monolog.logger', ['channel' => 'messenger'])
Expand Down
2 changes: 2 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ CHANGELOG
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
* Added factory methods to `DelayStamp`.
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
* Added `WorkerMessageRetriedEvent`
* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable

5.1.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;

abstract class AbstractWorkerMessageEvent
{
Expand All @@ -36,4 +37,9 @@ public function getReceiverName(): string
{
return $this->receiverName;
}

public function addStamps(StampInterface ...$stamps): void
{
$this->envelope = $this->envelope->with(...$stamps);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

/**
* Dispatched after a message has been sent for retry.
*
* The event name is the class name.
*/
final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException;
Expand All @@ -24,6 +25,7 @@
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* @author Tobias Schultze <http://tobion.de>
Expand All @@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
private $sendersLocator;
private $retryStrategyLocator;
private $logger;
private $eventDispatcher;
private $historySize;

public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
$this->historySize = $historySize;
}

Expand Down Expand Up @@ -74,6 +78,10 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)

// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);

if (null !== $this->eventDispatcher) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
}
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

class SendFailedMessageForRetryListenerTest extends TestCase
{
Expand Down Expand Up @@ -107,7 +108,10 @@ public function testEnvelopeIsSentToTransportOnRetry()
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
$eventDispatcher = $this->createMock(EventDispatcherInterface::class);
$eventDispatcher->expects($this->once())->method('dispatch');

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher);

$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

Expand Down
39 changes: 39 additions & 0 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
Expand Down Expand Up @@ -243,14 +244,45 @@ public function testWorkerWithMultipleReceivers()
// make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}

public function testWorkerMessageReceivedEventMutability()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new DummyReceiver([[$envelope]]);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturnArgument(0);

$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

$stamp = new class() implements StampInterface {
};
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
$event->addStamps($stamp);
};

$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);

$worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run();

$envelope = current($receiver->getAcknowledgedEnvelopes());
$this->assertCount(1, $envelope->all(\get_class($stamp)));
}
}

class DummyReceiver implements ReceiverInterface
{
private $deliveriesOfEnvelopes;
private $acknowledgedEnvelopes;
private $rejectedEnvelopes;
private $acknowledgeCount = 0;
private $rejectCount = 0;

/**
* @param Envelope[][] $deliveriesOfEnvelopes
*/
public function __construct(array $deliveriesOfEnvelopes)
{
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
Expand All @@ -266,11 +298,13 @@ public function get(): iterable
public function ack(Envelope $envelope): void
{
++$this->acknowledgeCount;
$this->acknowledgedEnvelopes[] = $envelope;
}

public function reject(Envelope $envelope): void
{
++$this->rejectCount;
$this->rejectedEnvelopes[] = $envelope;
}

public function getAcknowledgeCount(): int
Expand All @@ -282,4 +316,9 @@ public function getRejectCount(): int
{
return $this->rejectCount;
}

public function getAcknowledgedEnvelopes(): array
{
return $this->acknowledgedEnvelopes;
}
}
9 changes: 7 additions & 2 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
{
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event);
$envelope = $event->getEnvelope();

if (!$event->shouldHandle()) {
return;
Expand All @@ -123,7 +124,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
$envelope = $throwable->getEnvelope();
}

$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
$this->dispatchEvent($failedEvent);
$envelope = $failedEvent->getEnvelope();

if (!$rejectFirst) {
$receiver->reject($envelope);
Expand All @@ -132,7 +135,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
return;
}

$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
$this->dispatchEvent($handledEvent);
$envelope = $handledEvent->getEnvelope();

if (null !== $this->logger) {
$message = $envelope->getMessage();
Expand Down