Skip to content

[Messenger] Dispatching the message inside a handler causes a socket error sometimes #42825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
vuras opened this issue Sep 1, 2021 · 7 comments

Comments

@vuras
Copy link

vuras commented Sep 1, 2021

Symfony version(s) affected: 5.3.6

Description
Hello,

I have a system that needs to dispatch message after handling another message and sometimes not every time I get an error: Library error: a socket error occurred when messenger is trying to dispatch it. After throwing the exception message that was handled is being marked as failed and is retried which is spamming my database because everything was handled successfully up until the messenger trying to dispatch a message after successfully handling it.
I tried to play with AMQP options but with no success. Is it a bug or am I missing something? Maybe someone has any suggestions on how to workaround it?

How to reproduce
Dispatch a message inside a handler.

@vuras vuras added the Bug label Sep 1, 2021
@vuras vuras changed the title Dispatching the message inside a handler causes a socket error sometimes [Messenger] Dispatching the message inside a handler causes a socket error sometimes Sep 1, 2021
@Tobion
Copy link
Contributor

Tobion commented Sep 2, 2021

See #32357. don't think there is a good solution that works in every case.

@brzuchal
Copy link
Contributor

brzuchal commented Nov 25, 2021

@Tobion I've spent all my day investigating the issue, and I think there is a real bug at line https://github.com/symfony/symfony/blob/5.4/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php#L555

A small test script shows that in case of using AmqpTransport as a sender on the environment with more than one Transport (whatever kind it is) for pulling but the AmqpTransport INSTANCE only occasionally publishing may cause a connection to be dropped silently by a broker which on above-mentioned line gives false positive.
A solution might be to call the AMQPConnection::(p)?reconnect method and instead of checking isConnect on a channel (which still gives a false positive after reconnecting) check the isConnect on the channel's connection!! (IMPORTANT) which should then give false.

This affects many Symfony versions I guess. Please ping me for more details if you have any doubts.


Ofc executing AMQPConnection::(p)?reconnect is an additional cost but I guess this has to be concerned when the transport is used only occasionally in senders mode. There could be a calculation of heartbeat considered or ask for help the maintainers of php-amqp as there are also some issues similar php-amqp/php-amqp#305

Hope I'm on the right track.

@brzuchal
Copy link
Contributor

brzuchal commented Nov 25, 2021

For me this worked in Connection class:

    private function clearWhenDisconnected(): void
    {
        $connection = $this->channel()->getConnection();
        if ($this->amqpChannel) {
            if ($connection->isPersistent()) {
                $connection->preconnect();
            } else {
                $connection->reconnect();
            }
            $this->amqpChannel = null;
            $this->amqpQueues = [];
            $this->amqpExchange = null;
            $this->amqpDelayExchange = null;

            return;
        }

        if (!($connection->isConnected())) {
            $this->amqpChannel = null;
            $this->amqpQueues = [];
            $this->amqpExchange = null;
            $this->amqpDelayExchange = null;
        }
    }

Note! I'm aware this is an ugly hack, presenting it here only to show that it might work to reconnect especially when only the sender is used.

@nikophil
Copy link
Contributor

nikophil commented Dec 2, 2021

nice to see this fix
on our side we've fixed by forcing the connection to disconnect when a TransportException occurs in AMQPTransport

    public function send(Envelope $envelope): Envelope
    {
        try {
            return $this->decorated->send($envelope);
        } catch (TransportException $e) {
            /** @var RetryAfterAMQPExceptionStamp|null $stamp */
            $stamp = $envelope->last(RetryAfterAMQPExceptionStamp::class);

            if (null === $stamp) {
                $stamp = new RetryAfterAMQPExceptionStamp();
                $envelope = $envelope->with($stamp);
            } else {
                $stamp->increaseRetryAttempts();
            }

            if ($stamp->getRetryAttempts() < 3) {
                /*
                 * We only disconnect: the Connection object will be cleaned and AmqpConnection reconnected afterwards
                 * @see \Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection::clearWhenDisconnected()
                 */
                $this->connection->channel()->getConnection()->disconnect();

                return $this->send($envelope);
            }

            throw $e;
        }
    }

but I think @brzuchal is cleaner

@klewi1983
Copy link

For me this worked in Connection class:

    private function clearWhenDisconnected(): void
    {
        $connection = $this->channel()->getConnection();
        if ($this->amqpChannel) {
            if ($connection->isPersistent()) {
                $connection->preconnect();
            } else {
                $connection->reconnect();
            }
            $this->amqpChannel = null;
            $this->amqpQueues = [];
            $this->amqpExchange = null;
            $this->amqpDelayExchange = null;

            return;
        }

        if (!($connection->isConnected())) {
            $this->amqpChannel = null;
            $this->amqpQueues = [];
            $this->amqpExchange = null;
            $this->amqpDelayExchange = null;
        }
    }

Note! I'm aware this is an ugly hack, presenting it here only to show that it might work to reconnect especially when only the sender is used.

@brzuchal for me, your code works on symfony 5.4 too. Can you do a pull request for this code?

@brzuchal
Copy link
Contributor

brzuchal commented Mar 5, 2022

For me this worked in Connection class:

    private function clearWhenDisconnected(): void
    {
        $connection = $this->channel()->getConnection();
        if ($this->amqpChannel) {
            if ($connection->isPersistent()) {
                $connection->preconnect();
            } else {
                $connection->reconnect();
            }
            $this->amqpChannel = null;
            $this->amqpQueues = [];
            $this->amqpExchange = null;
            $this->amqpDelayExchange = null;

            return;
        }

        if (!($connection->isConnected())) {
            $this->amqpChannel = null;
            $this->amqpQueues = [];
            $this->amqpExchange = null;
            $this->amqpDelayExchange = null;
        }
    }

Note! I'm aware this is an ugly hack, presenting it here only to show that it might work to reconnect especially when only the sender is used.

@brzuchal for me, your code works on symfony 5.4 too. Can you do a pull request for this code?

I'll see if I find the time to od so next week.

@dmitryuk
Copy link
Contributor

Any updates on issue? I got the same bug

@fabpot fabpot closed this as completed Oct 15, 2022
fabpot added a commit that referenced this issue Oct 15, 2022
This PR was merged into the 5.4 branch.

Discussion
----------

[Messenger] Fix amqp socket lost

| Q             | A
| ------------- | ---
| Branch?       | 5.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       | #42825
| License       | MIT
| Doc PR        | N/A

I'm taking the work done from someone else and and adding the remarks mentionned because there is no activity on this PR

PR: #47574

Network can fail in many ways, sometimes pretty subtle.

To avoid error messages "Library error: a socket error occurred", we detect when heartbeat as expired, and disconnect the chanmel connection, and force a reconnect.

Here a explanation to justify the condition :
https://www.rabbitmq.com/heartbeats.html#:~:text=Heartbeat%20frames%20are%20sent%20about,TCP%20connection%20will%20be%20closed.

Commits
-------

bdeef74 [Messenger] Fix amqp socket lost
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

10 participants