From 93235c826c6c4e8ddc2b5da069ac18fa9cf4fb65 Mon Sep 17 00:00:00 2001 From: Miquel Fontana Date: Mon, 28 Apr 2025 11:56:56 +0200 Subject: [PATCH 1/6] issues/57867 delayed queue for days to allow quorum queues with high expire --- .../Bridge/Amqp/Transport/Connection.php | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 65650cedd72e2..0c3a75c9e18db 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -79,6 +79,7 @@ class Connection 'flags', 'arguments', ]; + private const BASE_EXPIRATION = 24 * 60 * 60 * 1000; private AmqpFactory $amqpFactory; private mixed $autoSetupExchange; @@ -143,6 +144,11 @@ public function __construct( * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%") * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays") * * arguments: array of extra delay queue arguments (for example: ['x-queue-type' => 'classic', 'x-message-deduplication' => true,]) + * * daily_delay_queues: When true, delay queues will be created with names including the current date + * (e.g., 'delay_queue_2024_03_21'). These queues are automatically deleted by RabbitMQ after they + * expire (x-expires argument), the x-expires argument is set to 24 hours (24 * 60 * 60 * 1000) plus delay. This is useful for quorum queues. + * because quorum queues do not redeclare expire time. + * (Default: false) * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) * * * Connection tuning options (see http://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.tune for details): @@ -389,11 +395,14 @@ private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetry $queue = $this->amqpFactory->createQueue($this->channel()); $queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt)); $queue->setFlags(\AMQP_DURABLE); + $queueExpirationBase = ($this->connectionOptions['delay']['daily_delay_queues']?? false) ? + self::BASE_EXPIRATION : 0; $queue->setArguments(array_merge([ 'x-message-ttl' => $delay, - // delete the delay queue 10 seconds after the message expires - // publishing another message redeclares the queue which renews the lease - 'x-expires' => $delay + 10000, + // The delay queue will be automatically deleted by RabbitMQ after being empty for 10 seconds (x-expires). + // Each time a new message is published, the queue is redeclared for classic queues, which resets the expiry timer. + // For quorum queues, redeclaration is not allowed, so using daily_delay_queues=true is recommended to manage cleanup. + 'x-expires' => $queueExpirationBase + $delay + 10000, // message should be broadcast to all consumers during delay, but to only one queue during retry // empty name is default direct exchange 'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'], @@ -408,12 +417,13 @@ private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetry private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string { $action = $isRetryAttempt ? '_retry' : '_delay'; + $date = ($this->connectionOptions['delay']['daily_delay_queues']?? false) ? '_' . (new \DateTimeImmutable())->format('Y-m-d') : ''; return str_replace( - ['%delay%', '%exchange_name%', '%routing_key%'], - [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], - $this->connectionOptions['delay']['queue_name_pattern'] - ).$action; + ['%delay%', '%exchange_name%', '%routing_key%'], + [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], + $this->connectionOptions['delay']['queue_name_pattern'] + ).$action.$date; } /** From 846d52c77cc18ad5d73e75c08e41e522f39a5f19 Mon Sep 17 00:00:00 2001 From: Miquel Fontana Date: Mon, 28 Apr 2025 12:01:06 +0200 Subject: [PATCH 2/6] issues/57867 improve commend message --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 0c3a75c9e18db..5c97c3493b469 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -399,9 +399,10 @@ private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetry self::BASE_EXPIRATION : 0; $queue->setArguments(array_merge([ 'x-message-ttl' => $delay, - // The delay queue will be automatically deleted by RabbitMQ after being empty for 10 seconds (x-expires). - // Each time a new message is published, the queue is redeclared for classic queues, which resets the expiry timer. + // delete the delay queue 10 seconds after the message expires + // publishing another message redeclares the queue which renews the lease // For quorum queues, redeclaration is not allowed, so using daily_delay_queues=true is recommended to manage cleanup. + // It will create a new queue for each day, with x-expires set to 24 hours (24 * 60 * 60 * 1000) plus delay. 'x-expires' => $queueExpirationBase + $delay + 10000, // message should be broadcast to all consumers during delay, but to only one queue during retry // empty name is default direct exchange From 181a108a9e6016a3f87ec1fdc00c8f43ca72c723 Mon Sep 17 00:00:00 2001 From: Miquel Fontana Date: Tue, 29 Apr 2025 09:49:22 +0200 Subject: [PATCH 3/6] issues/57867 improve commend message --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 5c97c3493b469..3428ded8b00bb 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -145,7 +145,7 @@ public function __construct( * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays") * * arguments: array of extra delay queue arguments (for example: ['x-queue-type' => 'classic', 'x-message-deduplication' => true,]) * * daily_delay_queues: When true, delay queues will be created with names including the current date - * (e.g., 'delay_queue_2024_03_21'). These queues are automatically deleted by RabbitMQ after they + * (e.g., '%queue_name_pattern%_%current_date%'). These queues are automatically deleted by RabbitMQ after they * expire (x-expires argument), the x-expires argument is set to 24 hours (24 * 60 * 60 * 1000) plus delay. This is useful for quorum queues. * because quorum queues do not redeclare expire time. * (Default: false) From 8fa00f62b7c1db589506789fb245694a2dd70652 Mon Sep 17 00:00:00 2001 From: Miquel Fontana Date: Tue, 29 Apr 2025 10:34:16 +0200 Subject: [PATCH 4/6] issues/57867 add tests --- .../Amqp/Tests/Transport/ConnectionTest.php | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index f61c14cab0663..a4e3b2b3f8c80 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -507,6 +507,22 @@ public function testItDelaysTheMessage() $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); } + public function testItDelaysTheMessageWithDailyDelayQueues() + { + $delayExchange = $this->createMock(\AMQPExchange::class); + $date = (new \DateTimeImmutable())->format('Y-m-d'); + $delayExchange->expects($this->once()) + ->method('publish') + ->with('{}', "delay_messages__5000_delay_$date", \AMQP_NOPARAM, [ + 'headers' => ['x-some-headers' => 'foo'], + 'delivery_mode' => 2, + 'timestamp' => time(), + ]); + $connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, "delay_messages__5000_delay_$date", true); + + $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); + } + public function testItRetriesTheMessage() { $delayExchange = $this->createMock(\AMQPExchange::class); @@ -520,6 +536,20 @@ public function testItRetriesTheMessage() $connection->publish('{}', [], 5000, $amqpStamp); } + public function testItRetriesTheMessageWithDailyDelayQueues() + { + $delayExchange = $this->createMock(\AMQPExchange::class); + $date = (new \DateTimeImmutable())->format('Y-m-d'); + $delayExchange->expects($this->once()) + ->method('publish') + ->with('{}', "delay_messages__5000_retry_$date", \AMQP_NOPARAM); + $connection = $this->createDelayOrRetryConnection($delayExchange, '', "delay_messages__5000_retry_$date", true); + + $amqpEnvelope = $this->createMock(\AMQPEnvelope::class); + $amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, ''); + $connection->publish('{}', [], 5000, $amqpStamp); + } + public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs() { $amqpConnection = $this->createMock(\AMQPConnection::class); @@ -849,7 +879,7 @@ public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown( $connection->publish('body'); } - private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection + private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName, bool $dailyDelayQueues = false): Connection { $amqpConnection = $this->createMock(\AMQPConnection::class); $amqpChannel = $this->createMock(\AMQPChannel::class); @@ -861,19 +891,20 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri $delayQueue = $this->createMock(\AMQPQueue::class); $factory->method('createQueue')->willReturn($this->createMock(\AMQPQueue::class), $delayQueue); $factory->method('createExchange')->willReturn($this->createMock(\AMQPExchange::class), $delayExchange); - + $baseExpire = $dailyDelayQueues ? 86400 * 1000 : 0; $delayQueue->expects($this->once())->method('setName')->with($delayQueueName); $delayQueue->expects($this->once())->method('setArguments')->with([ 'x-message-ttl' => 5000, - 'x-expires' => 5000 + 10000, + 'x-expires' => 5000 + 10000 + $baseExpire, 'x-dead-letter-exchange' => $deadLetterExchangeName, 'x-dead-letter-routing-key' => '', ]); $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', $delayQueueName); + $options = $dailyDelayQueues ? ['delay' => ['daily_delay_queues' => true]] : []; - return Connection::fromDsn('amqp://localhost', [], $factory); + return Connection::fromDsn('amqp://localhost', $options, $factory); } } From a0c066048f6e7963f5d45615da09d69d64acc02b Mon Sep 17 00:00:00 2001 From: Miquel Fontana Date: Tue, 29 Apr 2025 12:33:18 +0200 Subject: [PATCH 5/6] issues/57867 fix coding standard issues --- .../Messenger/Bridge/Amqp/Transport/Connection.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 3428ded8b00bb..477b4ee4de6cd 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -395,7 +395,7 @@ private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetry $queue = $this->amqpFactory->createQueue($this->channel()); $queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt)); $queue->setFlags(\AMQP_DURABLE); - $queueExpirationBase = ($this->connectionOptions['delay']['daily_delay_queues']?? false) ? + $queueExpirationBase = ($this->connectionOptions['delay']['daily_delay_queues'] ?? false) ? self::BASE_EXPIRATION : 0; $queue->setArguments(array_merge([ 'x-message-ttl' => $delay, @@ -418,13 +418,13 @@ private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetry private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string { $action = $isRetryAttempt ? '_retry' : '_delay'; - $date = ($this->connectionOptions['delay']['daily_delay_queues']?? false) ? '_' . (new \DateTimeImmutable())->format('Y-m-d') : ''; + $date = ($this->connectionOptions['delay']['daily_delay_queues'] ?? false) ? '_' . (new \DateTimeImmutable())->format('Y-m-d') : ''; return str_replace( - ['%delay%', '%exchange_name%', '%routing_key%'], - [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], - $this->connectionOptions['delay']['queue_name_pattern'] - ).$action.$date; + ['%delay%', '%exchange_name%', '%routing_key%'], + [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], + $this->connectionOptions['delay']['queue_name_pattern'] + ).$action.$date; } /** From 8482fe9b0968b822c8220ad9973c86eb9a36050c Mon Sep 17 00:00:00 2001 From: Miquel Fontana Date: Tue, 29 Apr 2025 12:34:29 +0200 Subject: [PATCH 6/6] issues/57867 fix coding standard issues --- .../Component/Messenger/Bridge/Amqp/Transport/Connection.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 477b4ee4de6cd..30cdd557dcd67 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -418,7 +418,7 @@ private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetry private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string { $action = $isRetryAttempt ? '_retry' : '_delay'; - $date = ($this->connectionOptions['delay']['daily_delay_queues'] ?? false) ? '_' . (new \DateTimeImmutable())->format('Y-m-d') : ''; + $date = ($this->connectionOptions['delay']['daily_delay_queues'] ?? false) ? '_'.(new \DateTimeImmutable())->format('Y-m-d') : ''; return str_replace( ['%delay%', '%exchange_name%', '%routing_key%'],