Skip to content

[Messenger] Adding failure transport support #30970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,10 @@ function ($a) {
->end()
->end()
->end()
->scalarNode('failure_transport')
->defaultNull()
->info('Transport name to send failed messages to (after all retries have failed).')
->end()
->scalarNode('default_bus')->defaultNull()->end()
->arrayNode('buses')
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\DependencyInjection\Alias;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Definition;
Expand Down Expand Up @@ -284,6 +284,9 @@ public function load(array $configs, ContainerBuilder $container)
$container->removeDefinition('console.command.messenger_debug');
$container->removeDefinition('console.command.messenger_stop_workers');
$container->removeDefinition('console.command.messenger_setup_transports');
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
}

$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
Expand Down Expand Up @@ -1743,22 +1746,48 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
}
$senders = [];

// make sure senderAliases contains all senders
foreach ($messageConfiguration['senders'] as $sender) {
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
if (!isset($senderAliases[$sender])) {
$senderAliases[$sender] = $sender;
}
}

$messageToSendersMapping[$message] = new IteratorArgument($senders);
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
}

$senderReferences = [];
foreach ($senderAliases as $alias => $serviceId) {
$senderReferences[$alias] = new Reference($serviceId);
}

$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, $messagesToSendAndHandle)
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
->replaceArgument(2, $messagesToSendAndHandle)
;

$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);

$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(1, $config['failure_transport']);

if ($config['failure_transport']) {
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport'])
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $config['failure_transport']);
} else {
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
}
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
25 changes: 25 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,31 @@
<tag name="console.command" command="messenger:stop-workers" />
</service>

<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->
<argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="event_dispatcher" />
<argument /> <!-- Retry strategy -->
<argument type="service" id="logger" />

<tag name="console.command" command="messenger:failed:retry" />
</service>

<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->

<tag name="console.command" command="messenger:failed:show" />
</service>

<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->

<tag name="console.command" command="messenger:failed:remove" />
</service>

<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

<!-- Asynchronous -->
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
<argument type="collection" /> <!-- Per message sender iterators -->
<argument type="collection" /> <!-- Per message senders map -->
<argument /> <!-- senders locator -->
<argument type="collection" /> <!-- Messages to send and handle -->
</service>
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
Expand Down Expand Up @@ -91,5 +92,19 @@
<argument /> <!-- multiplier -->
<argument /> <!-- max delay ms -->
</service>

<!-- failed handling -->
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="messenger.routable_message_bus" />
<argument /> <!-- Failure transport name -->
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<!-- routable message bus -->
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
<argument /> <!-- Message bus locator -->
</service>
</services>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class),
'routing' => [],
'transports' => [],
'failure_transport' => null,
'serializer' => [
'default_serializer' => 'messenger.transport.native_php_serializer',
'symfony_serializer' => [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,12 +721,16 @@ public function testMessengerRouting()
'*' => false,
];

$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
$sendersMapping = $senderLocatorDefinition->getArgument(0);
$this->assertEquals([
'amqp' => new Reference('messenger.transport.amqp'),
'audit' => new Reference('audit'),
], $sendersMapping[DummyMessage::class]->getValues());
'amqp',
'audit',
], $sendersMapping[DummyMessage::class]);
$sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1));
$this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0)));
$this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]);
$this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]);
}

public function testMessengerTransportConfiguration()
Expand Down
7 changes: 7 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ CHANGELOG
4.3.0
-----

* [BC BREAK] `SendersLocatorInterface` has an additional method:
`getSenderByAlias()`.
* A new `ListableReceiverInterface` was added, which a receiver
can implement (when applicable) to enable listing and fetching
individual messages by id (used in the new "Failed Messages" commands).
* Both `SenderInterface::send()` and `ReceiverInterface::get()`
should now (when applicable) add a `TransportMessageIdStamp`.
* Added `WorkerStoppedEvent` dispatched when a worker is stopped.
* Added optional `MessageCountAwareInterface` that receivers can implement
to give information about how many messages are waiting to be processed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @internal
* @experimental in 4.3
*/
abstract class AbstractFailedMessagesCommand extends Command
{
private $receiverName;
private $receiver;

public function __construct(string $receiverName, ReceiverInterface $receiver)
{
$this->receiverName = $receiverName;
$this->receiver = $receiver;

parent::__construct();
}

protected function getReceiverName()
{
return $this->receiverName;
}

/**
* @return mixed|null
*/
protected function getMessageId(Envelope $envelope)
{
/** @var TransportMessageIdStamp $stamp */
$stamp = $envelope->last(TransportMessageIdStamp::class);

return null !== $stamp ? $stamp->getId() : null;
}

protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
{
$io->title('Failed Message Details');

/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);

$rows = [
['Class', \get_class($envelope->getMessage())],
];

if (null !== $id = $this->getMessageId($envelope)) {
$rows[] = ['Message Id', $id];
}

if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$rows = array_merge($rows, [
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
}

$io->table([], $rows);

if ($io->isVeryVerbose()) {
$io->title('Message:');
$dump = new Dumper($io);
$io->writeln($dump($envelope->getMessage()));
$io->title('Exception:');
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
} else {
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
}
}

protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
{
if ($receiver instanceof MessageCountAwareInterface) {
if (1 === $receiver->getMessageCount()) {
$io->writeln('There is <comment>1</comment> message pending in the failure transport.');
} else {
$io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
}
}
}

protected function getReceiver(): ReceiverInterface
{
return $this->receiver;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Command;

use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:remove';

/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setDefinition([
new InputArgument('id', InputArgument::REQUIRED, 'Specific message id to remove'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
])
->setDescription('Remove a message from the failure transport.')
->setHelp(<<<'EOF'
The <info>%command.name%</info> removes a message that is pending in the failure transport.

<info>php %command.full_name% {id}</info>

The specific id can be found via the messenger:failed:show command.
EOF
)
;
}

/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

$receiver = $this->getReceiver();

$shouldForce = $input->getOption('force');
$this->removeSingleMessage($input->getArgument('id'), $receiver, $io, $shouldForce);
}

private function removeSingleMessage($id, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
{
if (!$receiver instanceof ListableReceiverInterface) {
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName()));
}

$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id));
}
$this->displaySingleMessage($envelope, $io);

if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
$receiver->reject($envelope);

$io->success('Message removed.');
} else {
$io->note('Message not removed.');
}
}
}
Loading