From a7effdc547015125952e4161cfe1a8fb2837a9f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillermo=20Lengemann=20Garc=C3=A9s?= Date: Thu, 15 Sep 2022 20:39:37 -0500 Subject: [PATCH 1/3] [Messenger][Amqp-messenger] Support content encoding and compression --- .../Amqp/Compressor/CompressorFactory.php | 28 +++++++++ .../Amqp/Compressor/CompressorInterface.php | 19 ++++++ .../Bridge/Amqp/Compressor/Deflate.php | 30 ++++++++++ .../Messenger/Bridge/Amqp/Compressor/Gzip.php | 30 ++++++++++ .../Amqp/Tests/Transport/AmqpReceiverTest.php | 58 +++++++++++++++++++ .../Bridge/Amqp/Transport/AmqpReceiver.php | 7 +++ .../Bridge/Amqp/Transport/Connection.php | 6 ++ .../Messenger/Bridge/Amqp/composer.json | 3 +- 8 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php create mode 100644 src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorInterface.php create mode 100644 src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php create mode 100644 src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php new file mode 100644 index 0000000000000..f60ef32766d12 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php @@ -0,0 +1,28 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor; + +use Symfony\Component\Messenger\Exception\InvalidArgumentException; + +class CompressorFactory +{ + public static function createCompressor(string $mimeContentEncoding) + { + if ('gzip' === $mimeContentEncoding) { + return new Gzip(); + } elseif ('deflate') { + return new Deflate(); + } + + throw new InvalidArgumentException(sprintf('The MIME content encoding of the message cannot be decompressed "%s".', $mimeContentEncoding)); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorInterface.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorInterface.php new file mode 100644 index 0000000000000..25da35094cc31 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorInterface.php @@ -0,0 +1,19 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor; + +interface CompressorInterface +{ + public function compress(mixed $data): string; + + public function decompress(mixed $data): mixed; +} diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php new file mode 100644 index 0000000000000..9db52ea5b80e6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php @@ -0,0 +1,30 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor; + +class Deflate implements CompressorInterface +{ + public function compress(mixed $data): string + { + return gzdeflate($data); + } + + public function decompress(mixed $data): mixed + { + $decompressData = gzinflate($data); + if (false === $decompressData) { + return $data; + } + + return $decompressData; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php new file mode 100644 index 0000000000000..0df742d530ff7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php @@ -0,0 +1,30 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor; + +class Gzip implements CompressorInterface +{ + public function compress(mixed $data): string + { + return gzencode($data); + } + + public function decompress(mixed $data): mixed + { + $decompressData = gzdecode($data); + if (false === $decompressData) { + return $data; + } + + return $decompressData; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php index 9dd86dcd07b42..a89da8215d3a6 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php @@ -46,6 +46,40 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testItReturnsTheGzipDecompressMessageToTheHandler() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $amqpEnvelope = $this->createAMQPEnvelopeWithContentEncodingGzip(); + $connection = $this->createMock(Connection::class); + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('get')->with('queueName')->willReturn($amqpEnvelope); + + $receiver = new AmqpReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + } + + public function testItReturnsTheDeflateDecompressMessageToTheHandler() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $amqpEnvelope = $this->createAMQPEnvelopeWithContentEncodingGzip(); + $connection = $this->createMock(Connection::class); + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('get')->with('queueName')->willReturn($amqpEnvelope); + + $receiver = new AmqpReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + } + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { $this->expectException(TransportException::class); @@ -84,4 +118,28 @@ private function createAMQPEnvelope(): \AMQPEnvelope return $envelope; } + + private function createAMQPEnvelopeWithContentEncodingGzip(): \AMQPEnvelope + { + $envelope = $this->createMock(\AMQPEnvelope::class); + $envelope->method('getBody')->willReturn(gzencode('{"message": "Hi"}')); + $envelope->method('getContentEncoding')->willReturn('gzip'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + return $envelope; + } + + private function createAMQPEnvelopeWithContentEncodingDeflate(): \AMQPEnvelope + { + $envelope = $this->createMock(\AMQPEnvelope::class); + $envelope->method('getBody')->willReturn(gzdeflate('{"message": "Hi"}')); + $envelope->method('getContentEncoding')->willReturn('gzip'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + return $envelope; + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index fecf39b29afc3..a16cb370570cc 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Transport; +use Symfony\Component\Messenger\Bridge\Amqp\Compressor\CompressorFactory; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; @@ -63,6 +64,12 @@ private function getEnvelope(string $queueName): iterable $body = $amqpEnvelope->getBody(); try { + $contentEncoding = $amqpEnvelope->getContentEncoding(); + if ($contentEncoding) { + $compressor = CompressorFactory::createCompressor($contentEncoding); + $body = $compressor->decompress($body); + } + $envelope = $this->serializer->decode([ 'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351 'headers' => $amqpEnvelope->getHeaders(), diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index ec71f60f1394a..2530e41018643 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Transport; +use Symfony\Component\Messenger\Bridge\Amqp\Compressor\CompressorFactory; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\LogicException; @@ -335,6 +336,11 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string $this->lastActivityTime = time(); + if (isset($attributes['content_encoding'])) { + $compressor = CompressorFactory::createCompressor($attributes['content_encoding']); + $body = $compressor->compress($body); + } + $exchange->publish( $body, $routingKey, diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json b/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json index 90d71960061df..9b1a60fe3993f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json @@ -24,7 +24,8 @@ "symfony/event-dispatcher": "^5.4|^6.0", "symfony/process": "^5.4|^6.0", "symfony/property-access": "^5.4|^6.0", - "symfony/serializer": "^5.4|^6.0" + "symfony/serializer": "^5.4|^6.0", + "ext-zlib": "*" }, "autoload": { "psr-4": { "Symfony\\Component\\Messenger\\Bridge\\Amqp\\": "" }, From fff81c61c71633dd361087196b8a8a3c8f6abc4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillermo=20Lengemann=20Garc=C3=A9s?= Date: Sun, 25 Sep 2022 23:18:58 -0500 Subject: [PATCH 2/3] Add constants, convert `if` to `match` and some tests --- .../Amqp/Compressor/CompressorFactory.php | 12 +++++------ .../Bridge/Amqp/Compressor/Deflate.php | 9 +++++---- .../Messenger/Bridge/Amqp/Compressor/Gzip.php | 9 +++++---- .../Amqp/Tests/Compressor/DeflateTest.php | 20 +++++++++++++++++++ .../Bridge/Amqp/Tests/Compressor/GzipTest.php | 20 +++++++++++++++++++ 5 files changed, 55 insertions(+), 15 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/DeflateTest.php create mode 100644 src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/GzipTest.php diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php index f60ef32766d12..00f53d3f1435a 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/CompressorFactory.php @@ -17,12 +17,10 @@ class CompressorFactory { public static function createCompressor(string $mimeContentEncoding) { - if ('gzip' === $mimeContentEncoding) { - return new Gzip(); - } elseif ('deflate') { - return new Deflate(); - } - - throw new InvalidArgumentException(sprintf('The MIME content encoding of the message cannot be decompressed "%s".', $mimeContentEncoding)); + return match ($mimeContentEncoding) { + Gzip::CONTENT_ENCODING => new Gzip(), + Deflate::CONTENT_ENCODING => new Deflate(), + default => throw new InvalidArgumentException(sprintf('The MIME content encoding of the message cannot be decompressed "%s".', $mimeContentEncoding)), + }; } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php index 9db52ea5b80e6..016e4a079ea82 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Deflate.php @@ -13,6 +13,8 @@ class Deflate implements CompressorInterface { + public const CONTENT_ENCODING = 'deflate'; + public function compress(mixed $data): string { return gzdeflate($data); @@ -20,11 +22,10 @@ public function compress(mixed $data): string public function decompress(mixed $data): mixed { - $decompressData = gzinflate($data); - if (false === $decompressData) { - return $data; + if (\function_exists('gzinflate')) { + return @gzinflate($data) ?: $data; } - return $decompressData; + return $data; } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php index 0df742d530ff7..b09a5a6afcbc2 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Compressor/Gzip.php @@ -13,6 +13,8 @@ class Gzip implements CompressorInterface { + public const CONTENT_ENCODING = 'gzip'; + public function compress(mixed $data): string { return gzencode($data); @@ -20,11 +22,10 @@ public function compress(mixed $data): string public function decompress(mixed $data): mixed { - $decompressData = gzdecode($data); - if (false === $decompressData) { - return $data; + if (\function_exists('gzdecode')) { + return @gzdecode($data) ?: $data; } - return $decompressData; + return $data; } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/DeflateTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/DeflateTest.php new file mode 100644 index 0000000000000..4f0a51b7bf884 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/DeflateTest.php @@ -0,0 +1,20 @@ +decompress('string no compressed'); + $this->assertEquals('string no compressed', $actual); + + $actual = $compressor->decompress(gzdeflate('string compressed')); + $this->assertEquals('string compressed', $actual); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/GzipTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/GzipTest.php new file mode 100644 index 0000000000000..4183dbe9dc434 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Compressor/GzipTest.php @@ -0,0 +1,20 @@ +decompress('string no compressed'); + $this->assertEquals('string no compressed', $actual); + + $actual = $compressor->decompress(gzencode('string compressed')); + $this->assertEquals('string compressed', $actual); + } +} From ecaf750a5f4c49663eb0b21cca9e803b1ba8a06d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillermo=20Lengemann=20Garc=C3=A9s?= Date: Thu, 20 Oct 2022 20:01:44 -0500 Subject: [PATCH 3/3] Draft: implement compression/decompression using a middleware --- .../FrameworkExtension.php | 1 + .../Resources/config/messenger.php | 4 ++ .../Bridge/Amqp/Transport/AmqpReceiver.php | 2 + .../Bridge/Amqp/Transport/Connection.php | 4 +- .../Middleware/CompressMiddleware.php | 61 +++++++++++++++++++ 5 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Middleware/CompressMiddleware.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 876ba35c40f8d..01636887f3067 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -2016,6 +2016,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder ['id' => 'failed_message_processing_middleware'], ], 'after' => [ + ['id' => 'compress_middleware'], ['id' => 'send_message'], ['id' => 'handle_message'], ], diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 13e8dad627835..6d02639059855 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -25,6 +25,7 @@ use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener; use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware; +use Symfony\Component\Messenger\Middleware\CompressMiddleware; use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware; use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; @@ -110,6 +111,9 @@ service('router'), ]) + ->set('messenger.middleware.compress_middleware', CompressMiddleware::class) + ->abstract() + // Discovery ->set('messenger.receiver_locator', ServiceLocator::class) ->args([ diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index a16cb370570cc..70c073b986194 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -64,11 +64,13 @@ private function getEnvelope(string $queueName): iterable $body = $amqpEnvelope->getBody(); try { + /* $contentEncoding = $amqpEnvelope->getContentEncoding(); if ($contentEncoding) { $compressor = CompressorFactory::createCompressor($contentEncoding); $body = $compressor->decompress($body); } + */ $envelope = $this->serializer->decode([ 'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351 diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 2530e41018643..6d9916b6cd26b 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -335,12 +335,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string $attributes['timestamp'] ??= time(); $this->lastActivityTime = time(); - + /* if (isset($attributes['content_encoding'])) { $compressor = CompressorFactory::createCompressor($attributes['content_encoding']); $body = $compressor->compress($body); } - + */ $exchange->publish( $body, $routingKey, diff --git a/src/Symfony/Component/Messenger/Middleware/CompressMiddleware.php b/src/Symfony/Component/Messenger/Middleware/CompressMiddleware.php new file mode 100644 index 0000000000000..9bcec26c46374 --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/CompressMiddleware.php @@ -0,0 +1,61 @@ +last(AmqpReceivedStamp::class)) { + $contentEncoding ??= $amqpReceivedStamp->getAmqpEnvelope()->getContentEncoding(); + if (!$contentEncoding) { + return $stack->next()->handle($envelope, $stack); + } + + $message = $envelope->getMessage(); + $compress = $this->decompress($contentEncoding, $message); + $envelope = new Envelope( + $compress, + $envelope->all() + ); + } else { + $amqpStamp = $envelope->last(AmqpStamp::class); + $contentEncoding = $amqpStamp->getAttributes()['content_encoding'] ?? null; + if (!$contentEncoding) { + return $stack->next()->handle($envelope, $stack); + } + $message = $envelope->getMessage(); + // We need a string, but in this point we have an object + $compress = $this->compress($contentEncoding, $message); + $envelope = new Envelope( + $compress, + $envelope->all() + ); + } + + return $stack->next()->handle($envelope, $stack); + } + + public function compress(string $contentEncoding, mixed $data): string + { + return match ($contentEncoding) { + 'gzip' => gzencode($data), + 'deflate' => gzdeflate($data), + default => throw new InvalidArgumentException(sprintf('The MIME content encoding of the message cannot be decompressed "%s".', $contentEncoding)), + }; + } + + public function decompress(string $contentEncoding, mixed $data): mixed + { + return match ($contentEncoding) { + 'gzip' => gzdecode($data) ?: $data, + 'deflate' => gzinflate($data) ?: $data, + }; + + return $data; + } +}