Description
Symfony version(s) affected: 4.3.3
RabbitMQ version(s) affected: 3.7.17
By default, Symfony Messenger creates a fanout exchange named messages, so every queue receives every message published to the bus. This is not acceptable in our project. We expected Messenger routing to be enough to place particular messages into their corresponding queues, but it was not.
To route a particular message into (and only into) its configured queue, you need to configure Messenger to create a direct exchange, bind the queues with a routing key, and finally dispatch the message with an AmqpStamp carrying that routing key:
framework:
    messenger:
        transports:
            auth_signup:
                dsn: '%rabbitmq_connection%'
                options:
                    exchange: { type: direct }
                    queues:
                        auth_signup:
                            binding_keys: [!php/const App\RoutingKeysDirect::AUTH_SIGNUP]
            emails_transactional:
                dsn: '%rabbitmq_connection%'
                options:
                    exchange: { type: direct }
                    queues:
                        emails_transactional:
                            binding_keys: [!php/const App\RoutingKeysDirect::EMAILS_TRANSACTIONAL]
        routing:
            App\RegistrationFinishedMessage: auth_signup
            App\EmailMessage: emails_transactional
With this setup we have defined two queues, each bound to the default exchange (now of type direct) with its own routing key. The setup can be inspected in the RabbitMQ management UI, and the detailed binding view of a queue shows that it really is bound to the exchange with our routing key.
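The difference between the default fanout exchange and the direct exchange configured above can be illustrated with a small toy model. This is only a sketch of the routing rules, not Symfony or RabbitMQ code: the function routeMessage() is hypothetical, and the routing key strings are assumed values, since the actual values of the App\RoutingKeysDirect constants are not shown in this report.

```php
<?php
// Toy model of AMQP exchange routing; not Symfony or RabbitMQ code.

/**
 * Returns the names of the queues that would receive a message.
 *
 * @param string                  $exchangeType 'fanout' or 'direct'
 * @param array<string, string[]> $bindings     queue name => binding keys
 */
function routeMessage(string $exchangeType, array $bindings, string $routingKey): array
{
    $delivered = [];
    foreach ($bindings as $queue => $bindingKeys) {
        // A fanout exchange ignores routing keys entirely;
        // a direct exchange requires an exact match with a binding key.
        if ($exchangeType === 'fanout' || in_array($routingKey, $bindingKeys, true)) {
            $delivered[] = $queue;
        }
    }

    return $delivered;
}

// Assumed routing key values, for illustration only.
$bindings = [
    'auth_signup' => ['auth_signup'],
    'emails_transactional' => ['emails_transactional'],
];

$fanout = routeMessage('fanout', $bindings, 'auth_signup');   // both queues
$direct = routeMessage('direct', $bindings, 'auth_signup');   // only auth_signup
$unrouted = routeMessage('direct', $bindings, '');            // no queue at all
```

The last call hints at why the routing key matters so much in this setup: with a direct exchange, a message published without a matching routing key reaches none of the bound queues.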
Now we dispatch the message with the routing key like this:
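use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;

$message = new App\RegistrationFinishedMessage('bla bla');
$messageRoutingKey = App\RoutingKeysDirect::AUTH_SIGNUP;
// The AmqpStamp carries the routing key used when publishing to the direct exchange.
$bus->dispatch($message, [new AmqpStamp($messageRoutingKey)]);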
The problem is that this setup does not correctly handle message retries with the configured retry strategy. I step-debugged the code today and found that, for the message to be enqueued correctly into the delayed retry queue, the routing key must be present in the Envelope object in the form of an AmqpStamp.
However, this stamp is not serialized into the message because it implements NonSendableStampInterface. I don't know the exact reasons behind this, but changing the interface to StampInterface solves this problem!
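The effect of the marker interface can be pictured with a toy model. It assumes that stamps marked as non-sendable are dropped before the message is encoded, which is consistent with what the step-debugging showed. All class and function names below are hypothetical stand-ins, not the real Symfony Messenger types.

```php
<?php
// Toy model only: simplified stand-ins, not the real Symfony Messenger classes.

interface ToyStamp {}

// Marker interface: stamps implementing it are assumed to be dropped on send.
interface ToyNonSendableStamp extends ToyStamp {}

final class ToyRoutingKeyStamp implements ToyNonSendableStamp
{
    public $routingKey;

    public function __construct(string $routingKey)
    {
        $this->routingKey = $routingKey;
    }
}

final class ToyDelayStamp implements ToyStamp
{
    public $delay;

    public function __construct(int $delay)
    {
        $this->delay = $delay;
    }
}

/**
 * Returns the class names of the stamps that would survive sending.
 *
 * @param ToyStamp[] $stamps
 */
function stampsSurvivingTransport(array $stamps): array
{
    $kept = [];
    foreach ($stamps as $stamp) {
        // Non-sendable stamps never reach the serialized message.
        if (!$stamp instanceof ToyNonSendableStamp) {
            $kept[] = get_class($stamp);
        }
    }

    return $kept;
}

$sent = stampsSurvivingTransport([
    new ToyRoutingKeyStamp('auth_signup'), // assumed routing key value
    new ToyDelayStamp(1000),
]);
```

In this model the routing key stamp is lost after the first trip through the transport, so a later retry has no routing key to publish with. Implementing only the plain stamp interface would keep it in the list.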
However, a new problem now emerges: moving the message to the failed transport after it reaches the configured maximum number of retries in the retry strategy (default is 3).
At that point the message should be enqueued to the "failed" transport, if one is configured. In our setup (see above) this fails with the error:
Server channel error: 406, message: PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'messages' in vhost '/': received 'fanout' but current is 'direct'
This is solved fairly easily, again by setting the exchange type of the failed transport to direct:
framework:
    messenger:
        failure_transport: failed
        transports:
            failed:
                dsn: '%rabbitmq_connection%'
                options:
                    exchange: { type: direct }
                    queues:
                        failed: ~
            # ... other transports
However, our message is still not enqueued to the failed transport after reaching the retry limit. That is because our previous fix (changing the interface of AmqpStamp) now has an unwanted side effect when the failed message is enqueued: the Envelope still carries the routing key, which is not wanted here!
To solve this second problem we must strip this stamp from the Envelope in SendFailedMessageToFailureTransportListener. The code should look like this:
$envelope = $envelope->withoutAll(ReceivedStamp::class)
    ->withoutAll(TransportMessageIdStamp::class)
    ->withoutAll(AmqpStamp::class) // THIS LINE ADDED
    ->with(new SentToFailureTransportStamp($event->getReceiverName()))
    ->with(new DelayStamp(0))
    ->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException));
Now everything is working as expected!
To summarize
- Symfony Messenger creates a fanout exchange by default, which rules out routing to specific queues
- to achieve that routing we must configure the default exchange as direct
- we then need to attach AmqpStamp($routingKey) to the message when dispatching
- for messages in this setup, retrying does not work
- this can be solved by changing the interface of AmqpStamp from NonSendableStampInterface to StampInterface
- a second problem then emerges: the failed transport does not work when configured
- we must define a direct exchange for the failed transport as well
- we must strip AmqpStamp from the envelope in the listener for the message to be correctly enqueued to the failed transport
I hope this bug report explains our problems in an understandable way.
Wishing you all the best.