Skip to content

[Messenger] Proper message decoding error handling #50043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public function testGetAfterReject()

public function testItProperlyHandlesEmptyMessages()
{
$redisReceiver = new RedisReceiver($this->connection, new Serializer());
$redisReceiver = new RedisReceiver($this->connection, Serializer::create());

$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
Expand All @@ -352,7 +352,7 @@ public function testItProperlyHandlesEmptyMessages()
// The consumer died during handling a message while performing xtrim in parallel process
$this->redis = new \Redis();
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis);
$redisReceiver = new RedisReceiver($this->connection, new Serializer());
$redisReceiver = new RedisReceiver($this->connection, Serializer::create());

/** @var Envelope[] $envelope */
$envelope = $redisReceiver->get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\VarDumper\Caster\Caster;
use Symfony\Component\VarDumper\Caster\TraceStub;
use Symfony\Component\VarDumper\Cloner\ClonerInterface;
Expand All @@ -46,15 +45,12 @@ abstract class AbstractFailedMessagesCommand extends Command
protected const DEFAULT_TRANSPORT_OPTION = 'choose';

protected ServiceProviderInterface $failureTransports;
protected ?PhpSerializer $phpSerializer;

private ?string $globalFailureReceiverName;

public function __construct(?string $globalFailureReceiverName, ServiceProviderInterface $failureTransports, PhpSerializer $phpSerializer = null)
public function __construct(?string $globalFailureReceiverName, ServiceProviderInterface $failureTransports)
{
$this->failureTransports = $failureTransports;
$this->globalFailureReceiverName = $globalFailureReceiverName;
$this->phpSerializer = $phpSerializer;

parent::__construct();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,7 @@ private function removeMessages(string $failureTransportName, array $ids, Receiv
}

foreach ($ids as $id) {
$this->phpSerializer?->acceptPhpIncompleteClass();
try {
$envelope = $receiver->find($id);
} finally {
$this->phpSerializer?->rejectPhpIncompleteClass();
}
$envelope = $receiver->find($id);

if (null === $envelope) {
$io->error(sprintf('The message with id "%s" was not found.', $id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Contracts\Service\ServiceProviderInterface;

Expand All @@ -42,13 +41,13 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
private MessageBusInterface $messageBus;
private ?LoggerInterface $logger;

public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, PhpSerializer $phpSerializer = null)
public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
{
$this->eventDispatcher = $eventDispatcher;
$this->messageBus = $messageBus;
$this->logger = $logger;

parent::__construct($globalReceiverName, $failureTransports, $phpSerializer);
parent::__construct($globalReceiverName, $failureTransports);
}

protected function configure(): void
Expand Down Expand Up @@ -135,14 +134,9 @@ private function runInteractive(string $failureTransportName, SymfonyStyle $io,
// handling the message
while (true) {
$envelopes = [];
$this->phpSerializer?->acceptPhpIncompleteClass();
try {
foreach ($receiver->all(1) as $envelope) {
++$count;
$envelopes[] = $envelope;
}
} finally {
$this->phpSerializer?->rejectPhpIncompleteClass();
foreach ($receiver->all(1) as $envelope) {
++$count;
$envelopes[] = $envelope;
}

// break the loop if all messages are consumed
Expand Down Expand Up @@ -212,12 +206,7 @@ private function retrySpecificIds(string $failureTransportName, array $ids, Symf
}

foreach ($ids as $id) {
$this->phpSerializer?->acceptPhpIncompleteClass();
try {
$envelope = $receiver->find($id);
} finally {
$this->phpSerializer?->rejectPhpIncompleteClass();
}
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,24 @@ private function listMessages(?string $failedTransportName, SymfonyStyle $io, in
$io->comment(sprintf('Displaying only \'%s\' messages', $classFilter));
}

$this->phpSerializer?->acceptPhpIncompleteClass();
try {
foreach ($envelopes as $envelope) {
$currentClassName = $envelope->getMessage()::class;

if ($classFilter && $classFilter !== $currentClassName) {
continue;
}

/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
/** @var ErrorDetailsStamp|null $lastErrorDetailsStamp */
$lastErrorDetailsStamp = $envelope->last(ErrorDetailsStamp::class);

$rows[] = [
$this->getMessageId($envelope),
$currentClassName,
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'),
$lastErrorDetailsStamp?->getExceptionMessage() ?? '',
];
foreach ($envelopes as $envelope) {
$currentClassName = $envelope->getMessage()::class;

if ($classFilter && $classFilter !== $currentClassName) {
continue;
}
} finally {
$this->phpSerializer?->rejectPhpIncompleteClass();

/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
/** @var ErrorDetailsStamp|null $lastErrorDetailsStamp */
$lastErrorDetailsStamp = $envelope->last(ErrorDetailsStamp::class);

$rows[] = [
$this->getMessageId($envelope),
$currentClassName,
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'),
$lastErrorDetailsStamp?->getExceptionMessage() ?? '',
];
}

$rowsCount = \count($rows);
Expand Down Expand Up @@ -148,19 +143,14 @@ private function listMessagesPerClass(?string $failedTransportName, SymfonyStyle

$countPerClass = [];

$this->phpSerializer?->acceptPhpIncompleteClass();
try {
foreach ($envelopes as $envelope) {
$c = $envelope->getMessage()::class;

if (!isset($countPerClass[$c])) {
$countPerClass[$c] = [$c, 0];
}
foreach ($envelopes as $envelope) {
$c = $envelope->getMessage()::class;

++$countPerClass[$c][1];
if (!isset($countPerClass[$c])) {
$countPerClass[$c] = [$c, 0];
}
} finally {
$this->phpSerializer?->rejectPhpIncompleteClass();

++$countPerClass[$c][1];
}

if (0 === \count($countPerClass)) {
Expand All @@ -176,12 +166,7 @@ private function showMessage(?string $failedTransportName, string $id, SymfonySt
{
/** @var ListableReceiverInterface $receiver */
$receiver = $this->getReceiver($failedTransportName);
$this->phpSerializer?->acceptPhpIncompleteClass();
try {
$envelope = $receiver->find($id);
} finally {
$this->phpSerializer?->rejectPhpIncompleteClass();
}
$envelope = $receiver->find($id);
if (null === $envelope) {
throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@
*/
class MessageDecodingFailedStamp implements StampInterface
{
public function __construct(private readonly string $message = '')
{
}

public function getMessage(): string
{
return $this->message;
}
}
11 changes: 9 additions & 2 deletions src/Symfony/Component/Messenger/Stamp/SerializedMessageStamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,19 @@

final class SerializedMessageStamp implements NonSendableStampInterface
{
public function __construct(private string $serializedMessage)
{
public function __construct(
private readonly string $serializedMessage,
private readonly string $messageType,
) {
}

public function getSerializedMessage(): string
{
return $this->serializedMessage;
}

public function getType(): string
{
return $this->messageType;
}
}
Loading