From ce103f1d2cb77d4c4cf77a7e58a42f19507fb1f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Tobo=C5=82a?= Date: Sat, 31 Dec 2022 02:17:29 +0100 Subject: [PATCH] [Messenger] [Redis] Fixed problem where worker stops handling messages on first empty message --- .../Transport/RedisExtIntegrationTest.php | 28 +++++++++++++++++++ .../Bridge/Redis/Transport/RedisReceiver.php | 13 +++++++++ 2 files changed, 41 insertions(+) diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php index 1bfc79657ba9a..5a418b3204644 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php @@ -14,10 +14,14 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection; +use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisReceiver; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; /** * @requires extension redis + * * @group time-sensitive * @group integration */ @@ -318,6 +322,30 @@ public function testGetAfterReject() $redis->del('messenger-rejectthenget'); } + public function testItProperlyHandlesEmptyMessages() + { + $redisReceiver = new RedisReceiver($this->connection, new Serializer()); + + $this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]); + $this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]); + + $redisReceiver->get(); + $this->redis->xtrim('messages', 1); + + // The consumer died during handling a message while performing xtrim in parallel process + $this->redis = new \Redis(); + $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis); + $redisReceiver = new RedisReceiver($this->connection, new Serializer()); + + /** @var Envelope[] $envelope */ + $envelope = $redisReceiver->get(); + $this->assertCount(1, $envelope); + + $message = $envelope[0]->getMessage(); + $this->assertInstanceOf(DummyMessage::class, $message); + $this->assertEquals('Hi2', $message->getMessage()); + } + private function getConnectionGroup(Connection $connection): string { $property = (new \ReflectionClass(Connection::class))->getProperty('group'); diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php index 1a63d334db0dd..4b80aaa3f7d3b 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php @@ -14,6 +14,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -44,6 +45,18 @@ public function get(): iterable return []; } + if (null === $message['data']) { + try { + $this->connection->reject($message['id']); + } catch (TransportException $e) { + if ($e->getPrevious()) { + throw $e; + } + } + + return $this->get(); + } + $redisEnvelope = json_decode($message['data']['message'] ?? '', true); if (null === $redisEnvelope) {