[Messenger][Redis] Worker stops handling messages on first empty message #48166

Closed
@jvmanji

Symfony version(s) affected

5.4, 6.* and probably others

Description

We use Messenger with the Redis transport in one of our projects. On one of our staging environments we trimmed a queue, probably while the consumer of that queue was failing in parallel. This left us in a strange state: a message was listed in XPENDING but no longer existed in the corresponding stream. Because the pending list stores only IDs, Messenger cannot decode the body when it consumes that message.

The underlying problem is of course our own fault and most probably should not happen. Still, the way Messenger behaves is odd: it returns an empty array, as if there were no messages in the queue, without XACK-ing the message, so the worker keeps looping on the same entry. In my opinion, it should either XACK the message or throw an exception with enough information for someone using the library to handle it manually. It took us a long time to discover what was going on, because the worker was stuck, not consuming messages from the queue, and no errors were logged anywhere.
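The stuck state described above can be modelled without Redis. The sketch below is only an illustration: `FakeStream`, `readPending()` and `receive()` are hypothetical stand-ins, not Symfony or Redis APIs, and the model assumes that a pending ID whose stream entry was trimmed comes back with no body. `receive()` repeats the two early returns from the `RedisReceiver::get()` excerpt quoted in this report.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory stand-in for a Redis stream and the pending list
// of its consumer group. Not a real Redis or Symfony class.
final class FakeStream
{
    /** @var array<string, array<string, string>> stream entries keyed by ID */
    public array $entries = [];

    /** @var list<string> IDs that were delivered but never acknowledged */
    public array $pending = [];

    /** Returns the oldest pending message, or null when nothing is pending. */
    public function readPending(): ?array
    {
        if ([] === $this->pending) {
            return null;
        }

        $id = $this->pending[0];

        // The pending list holds only the ID; a trimmed entry has no body left.
        return ['id' => $id, 'data' => $this->entries[$id] ?? null];
    }
}

// Repeats the early returns of the quoted get() method.
function receive(FakeStream $stream): array
{
    $message = $stream->readPending();
    if (null === $message) {
        return [];
    }

    $redisEnvelope = json_decode($message['data']['message'] ?? '', true);
    if (null === $redisEnvelope) {
        return []; // nothing is acknowledged and nothing is logged
    }

    return [$redisEnvelope];
}

$stream = new FakeStream();
$stream->entries['1-0'] = ['message' => json_encode(['body' => 'hello'])];
$stream->pending[] = '1-0';

unset($stream->entries['1-0']); // simulate trimming the stream

$first = receive($stream);  // looks like an empty queue
$second = receive($stream); // same result: the ID is still pending
```

In this model every call to `receive()` returns an empty array while `$stream->pending` still contains `1-0`, which matches the silent loop described in the report.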

How to reproduce

I don't have a quick way to reproduce, but the problem lies in RedisReceiver.php:

    public function get(): iterable
    {
        $message = $this->connection->get();

        if (null === $message) {
            return [];
        }

        $redisEnvelope = json_decode($message['data']['message'] ?? '', true);

        if (null === $redisEnvelope) {
            // Problem: the undecodable message is neither acknowledged nor reported.
            return [];
        }

In my opinion, when $redisEnvelope is null, the library should either throw an exception containing the message ID or ACK the message so that it is not delivered again.
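A minimal sketch of the two options suggested above, assuming PHP 8.0 or later. Every name here is hypothetical and introduced only for illustration: `PendingConnection`, `InMemoryConnection`, `UndecodableMessageException` and `receiveOrReport()` are not part of Symfony, and the real transport connection may expose a different interface.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the transport connection; not Symfony's class.
interface PendingConnection
{
    /** @return array{id: string, data: ?array}|null */
    public function get(): ?array;

    public function ack(string $id): void;
}

// Hypothetical exception type that carries the message ID in its text.
final class UndecodableMessageException extends \RuntimeException
{
}

// In-memory implementation, used only to exercise the sketch.
final class InMemoryConnection implements PendingConnection
{
    /** @param list<array{id: string, data: ?array}> $pending */
    public function __construct(public array $pending)
    {
    }

    public function get(): ?array
    {
        return $this->pending[0] ?? null;
    }

    public function ack(string $id): void
    {
        // Drop the acknowledged ID from the pending list.
        $this->pending = array_values(array_filter(
            $this->pending,
            static fn (array $m): bool => $m['id'] !== $id
        ));
    }
}

/**
 * Returns the decoded message, if any. A message without a decodable body is
 * either acknowledged (so it is not delivered again) or reported by ID.
 */
function receiveOrReport(PendingConnection $connection, bool $ackUndecodable): array
{
    $message = $connection->get();
    if (null === $message) {
        return [];
    }

    $redisEnvelope = json_decode($message['data']['message'] ?? '', true);
    if (null === $redisEnvelope) {
        if ($ackUndecodable) {
            $connection->ack($message['id']); // option 1: drop the broken message

            return [];
        }

        // Option 2: surface the ID so the caller can handle it manually.
        throw new UndecodableMessageException(
            sprintf('Redis message "%s" has no decodable body.', $message['id'])
        );
    }

    return [$redisEnvelope];
}
```

Either branch ends the silent loop: acknowledging removes the ID from the pending list, while the exception gives the caller an ID to log or act on.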

Possible Solution

No response

Additional Context

No response
