Description
Currently, failed messages are sent to the Failure Transport only when they fail while being consumed by the worker (by running messenger:consume).
This is a request to also send messages to the Failure Transport when dispatching them to the asynchronous transport fails.
Use case
A user tries to sign up. The application successfully commits a transaction and stores the user data in storage.
After that, the application sends a SendSignUpEmailMessage message to an async queue (let's say Kafka) to send an email. This queue is consumed by another process via messenger:consume, and that process actually sends the emails.
When Kafka is not available (due to a network error or something else), we get an exception that fails the whole request and shows the user a 500 error page.
Instead, the SendSignUpEmailMessage message that failed to be dispatched could be sent to the Failure Transport (let's say MySQL, doctrine://default?queue_name=failed) and then replayed with the messenger:failed:retry command.
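The intended behaviour can be sketched in plain PHP, independent of Symfony. This is only an illustration of the idea: every class and function below (SimpleTransport, UnavailableTransport, InMemoryFailureTransport, TransportUnavailableException, dispatchWithFallback) is a hypothetical stand-in and not part of Symfony Messenger.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for illustration only; these are NOT Symfony Messenger classes.
final class TransportUnavailableException extends \RuntimeException
{
}

interface SimpleTransport
{
    public function send(object $message): void;
}

// Simulates a broker (for example Kafka) that cannot be reached.
final class UnavailableTransport implements SimpleTransport
{
    public function send(object $message): void
    {
        throw new TransportUnavailableException('Kafka is not available');
    }
}

// Simulates a failure transport by keeping messages in memory.
final class InMemoryFailureTransport implements SimpleTransport
{
    /** @var object[] */
    public array $messages = [];

    public function send(object $message): void
    {
        $this->messages[] = $message;
    }
}

final class SendSignUpEmailMessage
{
    public string $email;

    public function __construct(string $email)
    {
        $this->email = $email;
    }
}

/**
 * Tries the primary transport first. If it is unavailable, the message is
 * stored in the failure transport instead of failing the whole request.
 * Returns true when the primary transport accepted the message.
 */
function dispatchWithFallback(object $message, SimpleTransport $primary, SimpleTransport $failure): bool
{
    try {
        $primary->send($message);

        return true;
    } catch (TransportUnavailableException $e) {
        $failure->send($message);

        return false;
    }
}

// Usage: the sign-up request survives even though the broker is down.
$failureTransport = new InMemoryFailureTransport();
$sentToPrimary = dispatchWithFallback(
    new SendSignUpEmailMessage('user@example.com'),
    new UnavailableTransport(),
    $failureTransport
);
```

Only the "transport unavailable" exception is caught here, so any other error still propagates and fails the request as before.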
Working example
We have already implemented a custom middleware that does exactly that: it catches a specific exception thrown by the Kafka transport and redispatches the message to the Failure Transport.
After the message is retried, it is either removed from the Failure Transport or added back (this is supported by the middleware as well).
Middleware code:
use Symfony\Component\ErrorHandler\Exception\FlattenException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
// AsyncTransportIsNotAvailableException, FailedToBeSentToAsyncQueue and Assert come from the application's own namespaces and dependencies.

final class SendFailedAsyncMessageToFailureTransportMiddleware implements MiddlewareInterface
{
    /**
     * @var MessageBusInterface
     */
    private MessageBusInterface $messageBus;

    /**
     * @var SendersLocatorInterface
     */
    private SendersLocatorInterface $sendersLocator;

    public function __construct(MessageBusInterface $messageBus, SendersLocatorInterface $sendersLocator)
    {
        $this->messageBus = $messageBus;
        $this->sendersLocator = $sendersLocator;
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $resultEnvelope = $envelope;

        try {
            if ($this->isReceivedFromFailureTransport($envelope)) {
                $resultEnvelope = $envelope
                    // to avoid handling message but instead use senders to redispatch to original transport, see SendMessageMiddleware
                    ->withoutAll(ReceivedStamp::class)
                    // process this message as a new and allow it to be sent to failure transport again in case of new errors
                    ->withoutAll(SentToFailureTransportStamp::class)
                    // to avoid redelivering to failed queue again. We need to process message by original sender (QueueInterop), see SendMessageMiddleware::getSenders
                    ->withoutAll(RedeliveryStamp::class);
            }

            return $stack->next()->handle($resultEnvelope, $stack);
        } catch (AsyncTransportIsNotAvailableException $e) {
            // if the envelope already carries a SentToFailureTransportStamp, the exception is swallowed without redispatching
            if (!$this->isAlreadySentToFailedSender($resultEnvelope)) {
                $originalReceiverName = $this->getOriginalReceiverName($resultEnvelope);

                // taken from built-in SendFailedMessageToFailureTransportListener
                $flattenedException = class_exists(FlattenException::class) ? FlattenException::createFromThrowable($e) : null;

                $newEnvelope = $resultEnvelope
                    ->withoutAll(ReceivedStamp::class)
                    ->withoutAll(TransportMessageIdStamp::class)
                    ->with(new SentToFailureTransportStamp($originalReceiverName))
                    ->with(new DelayStamp(0))
                    ->with(new FailedToBeSentToAsyncQueue())
                    ->with(new RedeliveryStamp(0, 'failed', $e->getMessage(), $flattenedException));

                $this->messageBus->dispatch($newEnvelope);
            }
        }

        return $resultEnvelope;
    }

    /**
     * Returns `true` when the failed message is consumed from failure transport,
     * for example by running `bin/console messenger:failed:retry`
     *
     * @param Envelope $envelope
     *
     * @return bool
     */
    private function isReceivedFromFailureTransport(Envelope $envelope): bool
    {
        $failedToBeSentToAsyncQueueStamps = $envelope->all(FailedToBeSentToAsyncQueue::class);
        $receivedStamps = $envelope->all(ReceivedStamp::class);

        return \count($failedToBeSentToAsyncQueueStamps) > 0 && \count($receivedStamps) > 0;
    }

    private function isAlreadySentToFailedSender(Envelope $envelope): bool
    {
        return $envelope->last(SentToFailureTransportStamp::class) !== null;
    }

    /**
     * Returns the alias of the single sender configured for the message.
     */
    private function getOriginalReceiverName(Envelope $resultEnvelope): string
    {
        $senders = $this->sendersLocator->getSenders($resultEnvelope);
        $aliases = [];

        foreach ($senders as $alias => $sender) {
            $aliases[] = $alias;
        }

        // exactly one sender is expected for the message
        Assert::same(\count($aliases), 1);

        return $aliases[0];
    }
}
And here is a config with a functional test showing that the message is sent to the Failure Transport after an exception during dispatching:
# config/packages/test/messenger.yaml
framework:
    messenger:
        failure_transport: failed
        buses:
            messenger.bus.default:
                middleware:
                    - Core\Messenger\Middleware\SendFailedAsyncMessageToFailureTransportMiddleware
        transports:
            failed:
                dsn: 'in-memory:///'
            unavailable_kafka:
                dsn: 'unavailable-kafka://'
        routing:
            'Core\Tests\Fixtures\Message\DummyMessageToThrowException': unavailable_kafka
Transport:
final class UnavailableKafkaTransport implements TransportInterface
{
    public function send(Envelope $envelope): Envelope
    {
        throw AsyncTransportIsNotAvailableException::withMessage('Kafka Fake Transport: Kafka is not available');
    }

    // ...
}
And the test:
public function test_it_dispatches_message_to_failure_transport_in_case_of_async_queue_exceptions(): void
{
    /** @var MessageBusInterface $messageBus */
    $messageBus = static::$container->get(MessageBusInterface::class);

    /** @var InMemoryTransport $failureTransport */
    $failureTransport = static::$container->get('messenger.transport.failed');

    // action
    $messageBus->dispatch(new DummyMessageToThrowException());

    /** @var Envelope[] $sentEnvelopesToFailureTransport */
    $sentEnvelopesToFailureTransport = $failureTransport->get();
    $envelope = $sentEnvelopesToFailureTransport[0];

    /** @var RedeliveryStamp $redeliveryStamp */
    $redeliveryStamp = $envelope->last(RedeliveryStamp::class);

    /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
    $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);

    self::assertCount(1, $sentEnvelopesToFailureTransport);
    self::assertCount(1, $envelope->all(FailedToBeSentToAsyncQueue::class));
    self::assertCount(1, $envelope->all(SentToFailureTransportStamp::class));
    self::assertSame('Kafka Fake Transport: Kafka is not available', $redeliveryStamp->getExceptionMessage());
    self::assertSame('failed', $redeliveryStamp->getSenderClassOrAlias());
    self::assertSame('unavailable_kafka', $sentToFailureTransportStamp->getOriginalReceiverName());
}
Do you think it would be useful to add a similar middleware to the core? Of course, it should be possible to configure which exceptions it catches (for example, as is done in #35481):
middleware:
    - send_dispatched_messages_to_failure_transport: [['App\Exception\AsyncTransportIsNotAvailableException', 'Another\Exception']]
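With such a configuration, the middleware would need to decide whether a caught exception matches one of the configured classes. A minimal sketch of that check in plain PHP follows. The function name shouldSendToFailureTransport is hypothetical and not an existing Symfony API, and the built-in exception classes only stand in for the configured ones.

```php
<?php

declare(strict_types=1);

/**
 * Returns true when $error is an instance of any configured class or interface.
 * Subclasses match as well, and unknown class names simply never match.
 *
 * @param string[] $exceptionClasses fully qualified class names from the configuration
 */
function shouldSendToFailureTransport(\Throwable $error, array $exceptionClasses): bool
{
    foreach ($exceptionClasses as $exceptionClass) {
        if ($error instanceof $exceptionClass) {
            return true;
        }
    }

    return false;
}

// Usage: \RuntimeException stands in for a configured transport exception.
$configured = [\RuntimeException::class, 'Another\Exception'];

$matchesSubclass = shouldSendToFailureTransport(new \UnexpectedValueException('broker down'), $configured);
$matchesUnrelated = shouldSendToFailureTransport(new \LogicException('bug'), $configured);
```

Matching with instanceof rather than comparing class names exactly lets a single configured base exception cover all of a transport's more specific exceptions.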