Skip to content

Commit ac37918

Browse files
committed
[Messenger][AMQP] Add routing key to data passed to serializer decode.
It's often useful to have the routing key available when decoding messages from AMQP. For example, sometimes the routing key contains meta information required for the processing of the message. This introduces an `extra` element which currently only contains the routing key, but there is scope to add other meta information from \AMQPEnvelope such as the exchange name for example. Fixes #43039
1 parent 934948f commit ac37918

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,35 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()
7474
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
7575
}
7676

77+
public function testItPassesExpectedDataToSerializerDecode()
78+
{
79+
$amqpEnvelope = $this->createAMQPEnvelope();
80+
81+
$connection = $this->createMock(Connection::class);
82+
$connection->method('getQueueNames')->willReturn(['queueName']);
83+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
84+
85+
$serializer = $this->createMock(SerializerInterface::class);
86+
$serializer->expects($this->once())->method('decode')->with($this->equalTo([
87+
'body' => $amqpEnvelope->getBody(),
88+
'headers' => $amqpEnvelope->getHeaders(),
89+
'extra' => [
90+
'routingKey' => $amqpEnvelope->getRoutingKey(),
91+
],
92+
]))->willReturn(new Envelope(new DummyMessage('Hi')));
93+
94+
$receiver = new AmqpReceiver($connection, $serializer);
95+
iterator_to_array($receiver->get());
96+
}
97+
7798
private function createAMQPEnvelope(): \AMQPEnvelope
7899
{
79100
$envelope = $this->createMock(\AMQPEnvelope::class);
80101
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
81102
$envelope->method('getHeaders')->willReturn([
82103
'type' => DummyMessage::class,
83104
]);
105+
$envelope->method('getRoutingKey')->willReturn('Dummy_RoutingKey');
84106

85107
return $envelope;
86108
}

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ private function getEnvelope(string $queueName): iterable
6666
$envelope = $this->serializer->decode([
6767
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
6868
'headers' => $amqpEnvelope->getHeaders(),
69+
'extra' => [
70+
'routingKey' => $amqpEnvelope->getRoutingKey(),
71+
]
6972
]);
7073
} catch (MessageDecodingFailedException $exception) {
7174
// invalid message of some type

0 commit comments

Comments
 (0)