From 5356023f6137abfcade91847fb56419f0a18c1c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Pillevesse?= Date: Wed, 15 Jan 2025 10:13:40 +0100 Subject: [PATCH] improve amqp connection issues --- .../Bridge/Amqp/Transport/AmqpReceiver.php | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index 3c855e9ee46ce..48706b6432755 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -92,10 +92,16 @@ public function ack(Envelope $envelope): void try { $stamp = $this->findAmqpStamp($envelope); - $this->connection->ack( - $stamp->getAmqpEnvelope(), - $stamp->getQueueName() - ); + $this->connection->ack($stamp->getAmqpEnvelope(), $stamp->getQueueName()); + } catch (\AMQPConnectionException) { + try { + $stamp = $this->findAmqpStamp($envelope); + + $this->connection->queue($stamp->getQueueName())->getConnection()->reconnect(); + $this->connection->ack($stamp->getAmqpEnvelope(), $stamp->getQueueName()); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } } catch (\AMQPException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } @@ -124,6 +130,13 @@ private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueNa { try { $this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM); + } catch (\AMQPConnectionException) { + try { + $this->connection->queue($queueName)->getConnection()->reconnect(); + $this->connection->nack($amqpEnvelope, $queueName, \AMQP_NOPARAM); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } } catch (\AMQPException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); }