Skip to content

Commit 278ff6f

Browse files
committed
[Messenger][Amqp-messenger] Support content encoding and compression
1 parent 4d09837 commit 278ff6f

File tree

8 files changed

+180
-1
lines changed

8 files changed

+180
-1
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
15+
16+
class CompressorFactory
17+
{
18+
public static function createCompressor(string $mimeContentEncoding)
19+
{
20+
if ('gzip' === $mimeContentEncoding) {
21+
return new Gzip();
22+
} elseif ('deflate') {
23+
return new Deflate();
24+
}
25+
26+
throw new InvalidArgumentException(sprintf('The MIME content encoding of the message cannot be decompressed "%s".', $mimeContentEncoding));
27+
}
28+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
interface CompressorInterface
15+
{
16+
public function compress(mixed $data): string;
17+
18+
public function decompress(mixed $data): mixed;
19+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
class Deflate implements CompressorInterface
15+
{
16+
public function compress(mixed $data): string
17+
{
18+
return gzdeflate($data);
19+
}
20+
21+
public function decompress(mixed $data): mixed
22+
{
23+
$decompressData = gzinflate($data);
24+
if (false === $decompressData) {
25+
return $data;
26+
}
27+
28+
return $decompressData;
29+
}
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
class Gzip implements CompressorInterface
15+
{
16+
public function compress(mixed $data): string
17+
{
18+
return gzencode($data);
19+
}
20+
21+
public function decompress(mixed $data): mixed
22+
{
23+
$decompressData = gzdecode($data);
24+
if (false === $decompressData) {
25+
return $data;
26+
}
27+
28+
return $decompressData;
29+
}
30+
}

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,40 @@ public function testItReturnsTheDecodedMessageToTheHandler()
4646
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
4747
}
4848

49+
public function testItReturnsTheGzipDecompressMessageToTheHandler()
50+
{
51+
$serializer = new Serializer(
52+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
53+
);
54+
55+
$amqpEnvelope = $this->createAMQPEnvelopeWithContentEncodingGzip();
56+
$connection = $this->createMock(Connection::class);
57+
$connection->method('getQueueNames')->willReturn(['queueName']);
58+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
59+
60+
$receiver = new AmqpReceiver($connection, $serializer);
61+
$actualEnvelopes = iterator_to_array($receiver->get());
62+
$this->assertCount(1, $actualEnvelopes);
63+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
64+
}
65+
66+
public function testItReturnsTheDeflateDecompressMessageToTheHandler()
67+
{
68+
$serializer = new Serializer(
69+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
70+
);
71+
72+
$amqpEnvelope = $this->createAMQPEnvelopeWithContentEncodingGzip();
73+
$connection = $this->createMock(Connection::class);
74+
$connection->method('getQueueNames')->willReturn(['queueName']);
75+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
76+
77+
$receiver = new AmqpReceiver($connection, $serializer);
78+
$actualEnvelopes = iterator_to_array($receiver->get());
79+
$this->assertCount(1, $actualEnvelopes);
80+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
81+
}
82+
4983
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
5084
{
5185
$this->expectException(TransportException::class);
@@ -84,4 +118,28 @@ private function createAMQPEnvelope(): \AMQPEnvelope
84118

85119
return $envelope;
86120
}
121+
122+
private function createAMQPEnvelopeWithContentEncodingGzip(): \AMQPEnvelope
123+
{
124+
$envelope = $this->createMock(\AMQPEnvelope::class);
125+
$envelope->method('getBody')->willReturn(gzencode('{"message": "Hi"}'));
126+
$envelope->method('getContentEncoding')->willReturn('gzip');
127+
$envelope->method('getHeaders')->willReturn([
128+
'type' => DummyMessage::class,
129+
]);
130+
131+
return $envelope;
132+
}
133+
134+
private function createAMQPEnvelopeWithContentEncodingDeflate(): \AMQPEnvelope
135+
{
136+
$envelope = $this->createMock(\AMQPEnvelope::class);
137+
$envelope->method('getBody')->willReturn(gzdeflate('{"message": "Hi"}'));
138+
$envelope->method('getContentEncoding')->willReturn('gzip');
139+
$envelope->method('getHeaders')->willReturn([
140+
'type' => DummyMessage::class,
141+
]);
142+
143+
return $envelope;
144+
}
87145
}

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

14+
use Symfony\Component\Messenger\Bridge\Amqp\Compressor\CompressorFactory;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Exception\LogicException;
1617
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -63,6 +64,12 @@ private function getEnvelope(string $queueName): iterable
6364
$body = $amqpEnvelope->getBody();
6465

6566
try {
67+
$contentEncoding = $amqpEnvelope->getContentEncoding();
68+
if ($contentEncoding) {
69+
$compressor = CompressorFactory::createCompressor($contentEncoding);
70+
$body = $compressor->decompress($body);
71+
}
72+
6673
$envelope = $this->serializer->decode([
6774
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
6875
'headers' => $amqpEnvelope->getHeaders(),

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

14+
use Symfony\Component\Messenger\Bridge\Amqp\Compressor\CompressorFactory;
1415
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
1516
use Symfony\Component\Messenger\Exception\LogicException;
1617

@@ -332,6 +333,11 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
332333
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
333334
$attributes['timestamp'] = $attributes['timestamp'] ?? time();
334335

336+
if (isset($attributes['content_encoding'])) {
337+
$compressor = CompressorFactory::createCompressor($attributes['content_encoding']);
338+
$body = $compressor->compress($body);
339+
}
340+
335341
$exchange->publish(
336342
$body,
337343
$routingKey,

src/Symfony/Component/Messenger/Bridge/Amqp/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
"symfony/event-dispatcher": "^5.4|^6.0",
2525
"symfony/process": "^5.4|^6.0",
2626
"symfony/property-access": "^5.4|^6.0",
27-
"symfony/serializer": "^5.4|^6.0"
27+
"symfony/serializer": "^5.4|^6.0",
28+
"ext-zlib": "*"
2829
},
2930
"autoload": {
3031
"psr-4": { "Symfony\\Component\\Messenger\\Bridge\\Amqp\\": "" },

0 commit comments

Comments
 (0)