[Messenger] Reconnect after connection was closed #36538

Closed
msubotovic opened this issue Apr 23, 2020 · 18 comments · Fixed by #54167

Comments

@msubotovic

Hi, I'm getting a lot of 'Library error: connection closed unexpectedly' errors when using a RabbitMQ cluster with HAProxy in front.
The problem appears when a handler dispatches messages to another handler. The handler opens a connection to dispatch a message, but if that connection stays idle for 50 seconds, it is closed automatically.
Is it possible to renew the connection every time the handler dispatches a message?

@msubotovic msubotovic changed the title Reconnect after connection was closed Messenger | Reconnect after connection was closed Apr 23, 2020
@msubotovic msubotovic changed the title Messenger | Reconnect after connection was closed [Messenger] Reconnect after connection was closed Apr 23, 2020
@Nek-
Contributor

Nek- commented Oct 29, 2020

Hello,

First, you may be able to configure HAProxy differently to make it work. But there are also some things that are not documented at all in the Symfony documentation: you can add options to your amqp connection. Here is a list of options:

https://github.com/php-amqp/php-amqp/blob/96cd5cb5eddd3db2faaa3643dad2fe4677d7c438/stubs/AMQPConnection.php#L27-L53

You may be interested in the heartbeat option :)

This is related to symfony/symfony-docs#11792

@jkobus
Contributor

jkobus commented Nov 3, 2020

Messenger's AmqpReceiver does not use blocking calls, so the heartbeat option is useless.
See https://github.com/symfony/amqp-messenger/blob/5.x/Transport/AmqpReceiver.php#L52

@mleczakm
Contributor

mleczakm commented Dec 29, 2020

I'm hitting exactly the same problem, but while publishing messages. The ability to reset the channel manually would probably allow a fix; I'm going to check it. In fact, the whole clearWhenDisconnected() method (https://github.com/symfony/amqp-messenger/blob/a7f681b022f9cfb88febc12face05b12a47232c1/Transport/Connection.php#L551) is useless, since checking isConnected() will always return true.

@carsonbot

Thank you for this issue.
There has not been a lot of activity here for a while. Has this been resolved?

@carsonbot

Could I get a reply or should I close this?

@jkobus
Contributor

jkobus commented Jul 16, 2021

@mleczakm were you able to solve this issue somehow?

@carsonbot carsonbot removed the Stalled label Jul 16, 2021
@ghost

ghost commented Jul 16, 2021

@jkobus IIRC I overrode the transport and introduced a custom callback that is called on every handled message and resets the connection. It is definitely not the fastest way to handle it. It should probably be handled differently (on message loop events, or time-based like the worker soft-kill), and clearWhenDisconnected() still does nothing :(

@jkobus
Contributor

jkobus commented Dec 10, 2021

IMHO the solution would be to catch this exception (and other connection-related errors like it) inside \Symfony\Component\Messenger\Transport\AmqpExt\Connection and use an internal flag (for example, (bool) reconnect) rather than relying only on AMQPChannel::isConnected.

One could then catch this exception and decide whether to retry the operation. During the retry, clearWhenDisconnected() would check whether reconnect is true, in addition to what AMQPChannel::isConnected reports, and if so, it would create a new connection.

The one thing that worries me is that some connection problems are thrown as \AMQPException rather than \AMQPConnectionException.

Guys, let me know whether this would make sense?
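
The flag-based idea described above could look roughly like the following. This is only a minimal sketch under stated assumptions, not the code that was eventually merged: `FakeChannel`, `ConnectionClosedException` and `ReconnectingPublisher` are hypothetical stand-ins (for `\AMQPChannel`, `\AMQPConnectionException` and the Messenger `Connection` class) so that the example runs without ext-amqp or a broker. Only the names `clearWhenDisconnected()`, `isConnected()` and the `reconnect` flag come from the discussion.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for \AMQPConnectionException.
final class ConnectionClosedException extends \RuntimeException
{
}

// Hypothetical fake channel: isConnected() keeps answering true even after
// the peer (e.g. a proxy) has dropped the socket, as reported in this thread.
final class FakeChannel
{
    public bool $droppedByPeer = false;

    /** @var list<string> */
    public array $published = [];

    public function isConnected(): bool
    {
        return true; // stale answer: the client has not noticed the drop
    }

    public function publish(string $body): void
    {
        if ($this->droppedByPeer) {
            throw new ConnectionClosedException('Library error: connection closed unexpectedly');
        }
        $this->published[] = $body;
    }
}

// Hypothetical wrapper illustrating the proposed internal "reconnect" flag.
final class ReconnectingPublisher
{
    private ?FakeChannel $channel = null;
    private bool $reconnect = false;
    public int $channelsOpened = 0;

    public function __construct(private \Closure $channelFactory)
    {
    }

    public function publish(string $body, int $maxRetries = 1): void
    {
        for ($attempt = 0; ; ++$attempt) {
            $this->clearWhenDisconnected();
            try {
                $this->channel()->publish($body);

                return;
            } catch (ConnectionClosedException $e) {
                // Remember the failure instead of trusting isConnected().
                $this->reconnect = true;
                if ($attempt >= $maxRetries) {
                    throw $e;
                }
            }
        }
    }

    private function clearWhenDisconnected(): void
    {
        $stale = null !== $this->channel && !$this->channel->isConnected();
        if ($this->reconnect || $stale) {
            $this->channel = null; // the next channel() call opens a fresh one
            $this->reconnect = false;
        }
    }

    private function channel(): FakeChannel
    {
        if (null === $this->channel) {
            $this->channel = ($this->channelFactory)();
            ++$this->channelsOpened;
        }

        return $this->channel;
    }
}

// Usage: the second publish fails once, then succeeds on a new channel.
$channels = [];
$publisher = new ReconnectingPublisher(function () use (&$channels): FakeChannel {
    return $channels[] = new FakeChannel();
});

$publisher->publish('first');
$channels[0]->droppedByPeer = true; // simulate the idle connection being closed
$publisher->publish('second');

echo $publisher->channelsOpened, "\n";
```

The retry limit is kept small on purpose: a publish that keeps failing after a fresh channel has been opened is rethrown to the caller. The sketch only catches the dedicated connection exception, so it does not address the concern that some connection problems surface as the more general `\AMQPException`.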

@carsonbot

Thank you for this issue.
There has not been a lot of activity here for a while. Has this been resolved?

@carsonbot

Hello? This issue is about to be closed if nobody replies.

@Kravalg

Kravalg commented Jun 26, 2022

The issue is still not solved.

@carsonbot carsonbot removed the Stalled label Jun 26, 2022
@jkobus
Contributor

jkobus commented Oct 18, 2022

My workaround, if someone finds it helpful:
https://github.com/jkobus/symfony-messenger-fix

@mleczakm
Contributor

@jkobus looks great and simple! WDYT about preparing pull request?

@carsonbot

Thank you for this issue.
There has not been a lot of activity here for a while. Has this been resolved?

@Nek-
Contributor

Nek- commented May 16, 2023

I don't think this is fixed yet.

@carsonbot carsonbot removed the Stalled label May 16, 2023
@carsonbot

Thank you for this issue.
There has not been a lot of activity here for a while. Has this been resolved?

@mleczakm
Contributor

I don't think this is fixed yet.

@carsonbot carsonbot removed the Stalled label Nov 17, 2023
chalasr added a commit that referenced this issue Mar 5, 2024
…lishing a message. (jwage)

This PR was squashed before being merged into the 6.4 branch.

Discussion
----------

[Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.

| Q             | A
| ------------- | ---
| Branch?       | 6.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Issues        | Fix #36538 Fix #48241
| License       | MIT

If you have a message handler that dispatches messages to another queue, you can encounter `AMQPConnectionException` with the message "Library error: a SSL error occurred" or "a socket error occurred", depending on whether you are using TLS and whether you are running behind a load balancer.

You can reproduce this issue manually: dispatch a message whose handler dispatches another message to a different queue, then close the connection manually in the RabbitMQ admin. Dispatch another message, and when the handler tries to dispatch the second message, you will get this exception:

```
a socket error occurred
#0 /vagrant/vendor/symfony/amqp-messenger/Transport/AmqpTransport.php(60): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender->send()
#1 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(62): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransport->send()
#2 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle()
#3 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(61): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle()
#4 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle()
#5 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle()
#6 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle()
#7 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle()
#8 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch()
#9 /vagrant/src/Messenger/MessageBus.php(37): Symfony\Component\Messenger\TraceableMessageBus->dispatch()
#10 /vagrant/vendor/symfony/mailer/Mailer.php(66): App\Messenger\MessageBus->dispatch()
#11 /vagrant/src/Mailer/Mailer.php(83): Symfony\Component\Mailer\Mailer->send()
#12 /vagrant/src/Mailer/Mailer.php(96): App\Mailer\Mailer->send()
#13 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(118): App\Mailer\Mailer->sendEmail()
#14 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(72): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->handle()
#15 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(152): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->__invoke()
#16 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(91): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->callHandler()
#17 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(71): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->handle()
#18 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle()
#19 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(68): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle()
#20 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle()
#21 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle()
#22 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle()
#23 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle()
#24 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch()
#25 /vagrant/vendor/symfony/messenger/RoutableMessageBus.php(54): Symfony\Component\Messenger\TraceableMessageBus->dispatch()
#26 /vagrant/vendor/symfony/messenger/Worker.php(162): Symfony\Component\Messenger\RoutableMessageBus->dispatch()
#27 /vagrant/vendor/symfony/messenger/Worker.php(109): Symfony\Component\Messenger\Worker->handleMessage()
#28 /vagrant/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php(238): Symfony\Component\Messenger\Worker->run()
#29 /vagrant/vendor/symfony/console/Command/Command.php(326): Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute()
#30 /vagrant/vendor/symfony/console/Application.php(1096): Symfony\Component\Console\Command\Command->run()
#31 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(126): Symfony\Component\Console\Application->doRunCommand()
#32 /vagrant/vendor/symfony/console/Application.php(324): Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand()
#33 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(80): Symfony\Component\Console\Application->doRun()
#34 /vagrant/vendor/symfony/console/Application.php(175): Symfony\Bundle\FrameworkBundle\Console\Application->doRun()
#35 /vagrant/vendor/symfony/runtime/Runner/Symfony/ConsoleApplicationRunner.php(49): Symfony\Component\Console\Application->run()
#36 /vagrant/vendor/autoload_runtime.php(29): Symfony\Component\Runtime\Runner\Symfony\ConsoleApplicationRunner->run()
#37 /vagrant/bin/console(11): require_once('...')
#38 {main}
```

TODO:

- [x] Add test for retry logic when publishing messages

Commits
-------

f123370 [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.
@chalasr chalasr closed this as completed Mar 5, 2024
@Nyholm
Member

Nyholm commented Mar 6, 2024

Wohoo. So @jwage made a fix for this. Please try it out and report issues/success.
