Skip to content

[Messenger] Messages with AmqpStamp($routingKey) and direct RabbitMQ exchange are not retry-able #32994

Closed
@srigi

Description

@srigi

Symfony version(s) affected: 4.3.3
RabbitMQ version(s) affected: 3.7.17

Description
Symfony Messenger by default creates fanout exchange Messages and all queues are receiving any messages published to the bus. In our project this is not acceptable. We thought that Messenger routing would be enough to place particular messages into corresponding queues. It was not.

To route some particular message into (and only to) configured queue, you need to configure Messenger to create a direct exchange, bind queues with routing key and finally to dispatch message with AmqpStamp() with said routing key:

framework:
    messenger:
        transports:
            auth_signup:
                dsn: '%rabbitmq_connection%'
                options:
                    exchange: { type: direct }
                    queues:
                        auth_signup:
                            binding_keys: [!php/const App\RoutingKeysDirect::AUTH_SIGNUP]
            emails_transactional:
                dsn: '%rabbitmq_connection%'
                options:
                    exchange: { type: direct }
                    queues:
                        emails_transactional:
                            binding_keys: [!php/const App\RoutingKeysDirect::EMAILS_TRANSACTIONAL]

        routing:
            App\RegistrationFinishedMessage: auth_signup
            App\EmailMessage: emails_transactional

With this setup we defined two queues bound to direct default exchange with routing key. We can inspect setup via management:

exchanges

queues

We can inspect binding of the queue in detail, we see it is really bound to exchange with our routing-key:

sm_qas

Now we dispatch message with routing-key like this:

$message = new App\RegistrationFinishedMessage('bla bla');
$messageRoutingKey = App\RoutingKeysDirect::AUTH_SIGNUP;

$bus->dispatch($message, [new AmqpStamp($messageRoutingKey)]);

Problem is, this setup will not correctly handle message retrying with selected retryPolicy. I step-debugged code today and found that for correct enqueue to delayed retry queue there must by routing-key present in form of AmqpStamp in Envelope object.

This Stamp is however not serialized into message because it is implementing NonSendableStampInterface. I don't know exact reasons behind this, but changing interface to StampInterface will solve this problem!

However a new problem will now emerge - shoveling the message into failed transport after message reaches configured retryCount in retryPolicy (default is 3).
After that, message should be enqueued to "failed" transport if configured. In our setup (see above) it will fail with error

Server channel error: 406, message: PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'm
  essages' in vhost '/': received 'fanout' but current is 'direct'

This is solvable pretty easily, again by defining exchange type for failed transport to direct

framework:
    messenger:
        failure_transport: failed
        transports:
            failed:
                dsn: '%rabbitmq_connection%'
                options:
                    exchange: { type: direct }
                    queues:
                        failed: ~

            ... other transports

However our message will still not be enqueued to failed transport after reaching retryCount. Thats because our previous fix (changing interface of AmqpStamp) is now doing bad things when enqueuing failed message - message have routing-key attached to the Envelope which is not wanted!

To solve this second problem we must strip this Stamp from Envelope in SendFailedMessageToFailureTransportListener. Code should look like this:

        $envelope = $envelope->withoutAll(ReceivedStamp::class)
            ->withoutAll(TransportMessageIdStamp::class)
            ->withoutAll(AmqpStamp::class)  // THIS LINE ADDED
            ->with(new SentToFailureTransportStamp($event->getReceiverName()))
            ->with(new DelayStamp(0))
            ->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException));

Now everything is working as expected!

To sumarize

  • Symfony Messenger creates by default fanout exchange
  • that eliminates routing to specified queues
  • to achieve above we must configure direct default exchange
  • now we need to attach AmqpStamp($routingKey) with message when dispatching
  • for messages in this setup retrying is not working
  • it can be solved by changing interface of AmpqStamp from NonSendableStampInterface to StampInterface
  • second problem will emerge with this setup - failed transport is not working when configured
  • we must define direct exchange for failed transport as well
  • we must strip AmqpStamp from envelope in listener from message to be correctly enqueued to failed transport

Hope this bug report is explaining our problems in understable way.
Wish all the best.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions