Skip to content

[Messenger] Allow to close the transport connection #59862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CHANGELOG
7.3
---

* Implement the `CloseableTransportInterface` to allow closing the transport
* Add new `queue_attributes` and `queue_tags` options for SQS queue creation

7.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
Expand All @@ -27,7 +28,7 @@
/**
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface, ResetInterface
{
private SerializerInterface $serializer;

Expand Down Expand Up @@ -91,6 +92,11 @@ public function reset(): void
}
}

public function close(): void
{
$this->reset();
}

private function getReceiver(): MessageCountAwareInterface&ReceiverInterface
{
return $this->receiver ??= new AmazonSqsReceiver($this->connection, $this->serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"php": ">=8.2",
"async-aws/core": "^1.7",
"async-aws/sqs": "^1.0|^2.0",
"symfony/messenger": "^7.2",
"symfony/messenger": "^7.3",
"symfony/service-contracts": "^2.5|^3",
"psr/log": "^1|^2|^3"
},
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CHANGELOG
7.1
---

* Implement the `CloseableTransportInterface` to allow closing the AMQP connection
* Add option `delay[arguments]` in the transport definition

6.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand All @@ -22,7 +23,7 @@
/**
* @author Nicolas Grekas <p@tchwork.com>
*/
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface
{
private SerializerInterface $serializer;
private AmqpReceiver $receiver;
Expand Down Expand Up @@ -70,6 +71,11 @@ public function getMessageCount(): int
return $this->getReceiver()->getMessageCount();
}

public function close(): void
{
$this->connection->clear();
}

private function getReceiver(): AmqpReceiver
{
return $this->receiver ??= new AmqpReceiver($this->connection, $this->serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ private function clearWhenDisconnected(): void
}
}

private function clear(): void
public function clear(): void
{
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
$this->amqpQueues = [];
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/Bridge/Amqp/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"require": {
"php": ">=8.2",
"ext-amqp": "*",
"symfony/messenger": "^6.4|^7.0"
"symfony/messenger": "^7.3"
},
"require-dev": {
"symfony/event-dispatcher": "^6.4|^7.0",
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CHANGELOG
7.3
---

* Implement the `CloseableTransportInterface` to allow closing the Redis connection
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Redis that the job is still being processed, in order to avoid timeouts

6.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class Connection
'ssl' => null, // see https://php.net/context.ssl
];

private \Redis|Relay|\RedisCluster|\Closure $redis;
private \Redis|Relay|\RedisCluster|null $redis = null;
private \Closure $redisInitializer;
private string $stream;
private string $queue;
private string $group;
Expand Down Expand Up @@ -112,9 +113,9 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster|null $red

if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
$this->redisInitializer = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
} else {
$this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
$this->redisInitializer = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
if (null !== $sentinelMaster) {
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
$hostIndex = 0;
Expand Down Expand Up @@ -737,10 +738,15 @@ private function rawCommand(string $command, ...$arguments): mixed

private function getRedis(): \Redis|Relay|\RedisCluster
{
if ($this->redis instanceof \Closure) {
$this->redis = ($this->redis)();
if (!$this->redis) {
$this->redis = ($this->redisInitializer)();
}

return $this->redis;
}

public function close(): void
{
$this->redis = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand All @@ -23,7 +24,7 @@
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface
class RedisTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface
{
private SerializerInterface $serializer;
private RedisReceiver $receiver;
Expand Down Expand Up @@ -71,6 +72,11 @@ public function getMessageCount(): int
return $this->getReceiver()->getMessageCount();
}

public function close(): void
{
$this->connection->close();
}

private function getReceiver(): RedisReceiver
{
return $this->receiver ??= new RedisReceiver($this->connection, $this->serializer);
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/Bridge/Redis/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"require": {
"php": ">=8.2",
"ext-redis": "*",
"symfony/messenger": "^7.2"
"symfony/messenger": "^7.3"
},
"require-dev": {
"symfony/property-access": "^6.4|^7.0",
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CHANGELOG
7.3
---

* Add `CloseableTransportInterface` to allow closing the transport
* Add `SentForRetryStamp` that identifies whether a failed message was sent for retry
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport;

interface CloseableTransportInterface
{
public function close(): void;
}
Loading