Description
Symfony version(s) affected
6.x, 5.x
Description
Everytime a message is retried, the DoctrineSender adds another TransportMessageIdStamp
with the new id to the envelope. This id (the latest id) is needed for the bin/console messenger:failed:show
command.
If a message is retried many times, the new id is appended as a new stamp and the stamps get more and more, which leads to a bigger and bigger serialized envelope in the database. Ultimately this can lead to very big binlogs in MySQL.
This is especially noticeable, when you need to check a condition every second and therefore use the Messenger component. If that condition is not yet met, the message is retried by throwing a RecoverableMessageHandlingException
inside a transport with a retry multiplier of 1 (see "How to reproduce").
How to reproduce
Example uses Symfony 6.2:
# config/packages/messenger.yaml
framework:
messenger:
transports:
check:
dsn: 'doctrine://default?queue_name=check'
retry_strategy:
delay: 1000
multiplier: 1
# src/Message/CheckMessage.php
namespace App\Message;
class CheckMessage
{
public function __construct()
{
}
}
# src/Handler/CheckHandler.php
namespace App\Handler;
use App\Message\CheckMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
#[AsMessageHandler]
class CheckHandler
{
public function __invoke(CheckMessage $message): never
{
throw new RecoverableMessageHandlingException();
}
}
# src/Command/QueueCommand.php
namespace App\Command;
use App\Message\CheckMessage;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsCommand(name: 'app:check:queue')]
class QueueCommand extends Command
{
public function __construct(
private readonly MessageBusInterface $bus,
)
{
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->bus->dispatch(new CheckMessage());
return Command::SUCCESS;
}
}
- Insert a message into the queue:
bin/console app:check:queue
- Handle this message 100 times:
bin/console messenger:consume check -l 100
- Open the
body
of the message from themessenger_messages
table. You see 100TransportMessageIdStamp
s:
(...) s:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\";a:100:{i:0;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311314;}i:1;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311315;}i:2;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311316;}i:3;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311317;}i:4;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311318;}i:5;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311319;}i:6;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311320;}i:7;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311321;}i:8;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311322;}i:9;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311323;}i:10;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311324;}i:11;O:57:\"Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\":1:{s:61:\"\0Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp\0id\";i:13311325;}(...)
Possible Solution
As far as I can see, the id from the TransportMessageIdStamp
is only used for the messenger:failed:show/retry/remove commands to be able to show/retry/remove specific messages.
For this command, the DoctrineReceiver
is used and always adds its own TransportMessageIdStamp
with the current id to the envelope. The stamp from the received message from the database is not used at all.
It should be enough to only keep the latest TransportMessageIdStamp
and remove all previous TransportMessageIdStamp
s inside the DoctrineSender
and DoctrineReceiver
.
Additional Context
No response