Skip to content

Commit f42fbdb

Browse files
committed
[client] Add replyTo and correlationId support to client's message.
1 parent ce45938 commit f42fbdb

File tree

16 files changed

+297
-75
lines changed

16 files changed

+297
-75
lines changed

pkg/amqp-ext/Client/AmqpDriver.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ public function createTransportMessage(Message $message)
145145
$transportMessage->setProperties($properties);
146146
$transportMessage->setMessageId($message->getMessageId());
147147
$transportMessage->setTimestamp($message->getTimestamp());
148+
$transportMessage->setReplyTo($message->getReplyTo());
149+
$transportMessage->setCorrelationId($message->getCorrelationId());
148150

149151
return $transportMessage;
150152
}
@@ -174,6 +176,8 @@ public function createClientMessage(PsrMessage $message)
174176

175177
$clientMessage->setMessageId($message->getMessageId());
176178
$clientMessage->setTimestamp($message->getTimestamp());
179+
$clientMessage->setReplyTo($message->getReplyTo());
180+
$clientMessage->setCorrelationId($message->getCorrelationId());
177181

178182
return $clientMessage;
179183
}

pkg/amqp-ext/Tests/Client/AmqpDriverTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public function testShouldConvertTransportMessageToClientMessage()
7575
$transportMessage->setHeader('expiration', '12345000');
7676
$transportMessage->setMessageId('MessageId');
7777
$transportMessage->setTimestamp(1000);
78+
$transportMessage->setReplyTo('theReplyTo');
79+
$transportMessage->setCorrelationId('theCorrelationId');
7880

7981
$driver = new AmqpDriver(
8082
$this->createPsrContextMock(),
@@ -92,6 +94,8 @@ public function testShouldConvertTransportMessageToClientMessage()
9294
'expiration' => '12345000',
9395
'message_id' => 'MessageId',
9496
'timestamp' => 1000,
97+
'reply_to' => 'theReplyTo',
98+
'correlation_id' => 'theCorrelationId',
9599
], $clientMessage->getHeaders());
96100
$this->assertSame([
97101
'key' => 'val',
@@ -100,6 +104,8 @@ public function testShouldConvertTransportMessageToClientMessage()
100104
$this->assertSame(12345, $clientMessage->getExpire());
101105
$this->assertSame('ContentType', $clientMessage->getContentType());
102106
$this->assertSame(1000, $clientMessage->getTimestamp());
107+
$this->assertSame('theReplyTo', $clientMessage->getReplyTo());
108+
$this->assertSame('theCorrelationId', $clientMessage->getCorrelationId());
103109
}
104110

105111
public function testShouldThrowExceptionIfExpirationIsNotNumeric()
@@ -129,6 +135,8 @@ public function testShouldConvertClientMessageToTransportMessage()
129135
$clientMessage->setExpire(123);
130136
$clientMessage->setMessageId('MessageId');
131137
$clientMessage->setTimestamp(1000);
138+
$clientMessage->setReplyTo('theReplyTo');
139+
$clientMessage->setCorrelationId('theCorrelationId');
132140

133141
$context = $this->createPsrContextMock();
134142
$context
@@ -154,12 +162,16 @@ public function testShouldConvertClientMessageToTransportMessage()
154162
'delivery_mode' => 2,
155163
'message_id' => 'MessageId',
156164
'timestamp' => 1000,
165+
'reply_to' => 'theReplyTo',
166+
'correlation_id' => 'theCorrelationId',
157167
], $transportMessage->getHeaders());
158168
$this->assertSame([
159169
'key' => 'val',
160170
], $transportMessage->getProperties());
161171
$this->assertSame('MessageId', $transportMessage->getMessageId());
162172
$this->assertSame(1000, $transportMessage->getTimestamp());
173+
$this->assertSame('theReplyTo', $transportMessage->getReplyTo());
174+
$this->assertSame('theCorrelationId', $transportMessage->getCorrelationId());
163175
}
164176

165177
public function testShouldSendMessageToRouter()

pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public function testShouldConvertTransportMessageToClientMessage()
8484
$transportMessage->setHeader('priority', 3);
8585
$transportMessage->setMessageId('MessageId');
8686
$transportMessage->setTimestamp(1000);
87+
$transportMessage->setReplyTo('theReplyTo');
88+
$transportMessage->setCorrelationId('theCorrelationId');
8789

8890
$driver = new RabbitMqDriver(
8991
$this->createPsrContextMock(),
@@ -102,6 +104,8 @@ public function testShouldConvertTransportMessageToClientMessage()
102104
'priority' => 3,
103105
'message_id' => 'MessageId',
104106
'timestamp' => 1000,
107+
'reply_to' => 'theReplyTo',
108+
'correlation_id' => 'theCorrelationId',
105109
], $clientMessage->getHeaders());
106110
$this->assertSame([
107111
'key' => 'val',
@@ -113,6 +117,8 @@ public function testShouldConvertTransportMessageToClientMessage()
113117
$this->assertSame('ContentType', $clientMessage->getContentType());
114118
$this->assertSame(1000, $clientMessage->getTimestamp());
115119
$this->assertSame(MessagePriority::HIGH, $clientMessage->getPriority());
120+
$this->assertSame('theReplyTo', $clientMessage->getReplyTo());
121+
$this->assertSame('theCorrelationId', $clientMessage->getCorrelationId());
116122
}
117123

118124
public function testShouldThrowExceptionIfXDelayIsNotNumeric()
@@ -202,6 +208,8 @@ public function testShouldConvertClientMessageToTransportMessage()
202208
$clientMessage->setDelay(432);
203209
$clientMessage->setMessageId('MessageId');
204210
$clientMessage->setTimestamp(1000);
211+
$clientMessage->setReplyTo('theReplyTo');
212+
$clientMessage->setCorrelationId('theCorrelationId');
205213

206214
$context = $this->createPsrContextMock();
207215
$context
@@ -227,6 +235,8 @@ public function testShouldConvertClientMessageToTransportMessage()
227235
'delivery_mode' => 2,
228236
'message_id' => 'MessageId',
229237
'timestamp' => 1000,
238+
'reply_to' => 'theReplyTo',
239+
'correlation_id' => 'theCorrelationId',
230240
'priority' => 4,
231241
], $transportMessage->getHeaders());
232242
$this->assertSame([
@@ -235,6 +245,8 @@ public function testShouldConvertClientMessageToTransportMessage()
235245
], $transportMessage->getProperties());
236246
$this->assertSame('MessageId', $transportMessage->getMessageId());
237247
$this->assertSame(1000, $transportMessage->getTimestamp());
248+
$this->assertSame('theReplyTo', $transportMessage->getReplyTo());
249+
$this->assertSame('theCorrelationId', $transportMessage->getCorrelationId());
238250
}
239251

240252
public function testThrowIfDelayNotSupportedOnConvertClientMessageToTransportMessage()

pkg/enqueue-bundle/Resources/config/client.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ services:
1313
enqueue.client.rpc_client:
1414
class: 'Enqueue\Client\RpcClient'
1515
arguments:
16-
- '@enqueue.client.driver'
1716
- '@enqueue.client.producer'
1817
- '@enqueue.transport.context'
1918

pkg/enqueue/Client/Message.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ class Message
5353
*/
5454
private $delay;
5555

56+
/**
57+
* @var string
58+
*/
59+
private $replyTo;
60+
61+
/**
62+
* @var string
63+
*/
64+
private $correlationId;
65+
5666
/**
5767
* @var array
5868
*/
@@ -204,6 +214,38 @@ public function getScope()
204214
return $this->scope;
205215
}
206216

217+
/**
218+
* @return string
219+
*/
220+
public function getReplyTo()
221+
{
222+
return $this->replyTo;
223+
}
224+
225+
/**
226+
* @param string $replyTo
227+
*/
228+
public function setReplyTo($replyTo)
229+
{
230+
$this->replyTo = $replyTo;
231+
}
232+
233+
/**
234+
* @return string
235+
*/
236+
public function getCorrelationId()
237+
{
238+
return $this->correlationId;
239+
}
240+
241+
/**
242+
* @param string $correlationId
243+
*/
244+
public function setCorrelationId($correlationId)
245+
{
246+
$this->correlationId = $correlationId;
247+
}
248+
207249
/**
208250
* @return array
209251
*/

pkg/enqueue/Client/NullDriver.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public function createTransportMessage(Message $message)
4949
$transportMessage->setProperties($message->getProperties());
5050
$transportMessage->setTimestamp($message->getTimestamp());
5151
$transportMessage->setMessageId($message->getMessageId());
52+
$transportMessage->setReplyTo($message->getReplyTo());
53+
$transportMessage->setCorrelationId($message->getCorrelationId());
5254

5355
return $transportMessage;
5456
}
@@ -66,6 +68,8 @@ public function createClientMessage(PsrMessage $message)
6668
$clientMessage->setProperties($message->getProperties());
6769
$clientMessage->setTimestamp($message->getTimestamp());
6870
$clientMessage->setMessageId($message->getMessageId());
71+
$clientMessage->setReplyTo($message->getReplyTo());
72+
$clientMessage->setCorrelationId($message->getCorrelationId());
6973

7074
if ($contentType = $message->getHeader('content_type')) {
7175
$clientMessage->setContentType($contentType);

pkg/enqueue/Client/RpcClient.php

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@
88

99
class RpcClient
1010
{
11-
/**
12-
* @var DriverInterface
13-
*/
14-
private $driver;
15-
1611
/**
1712
* @var ProducerInterface
1813
*/
@@ -24,13 +19,11 @@ class RpcClient
2419
private $context;
2520

2621
/**
27-
* @param DriverInterface $driver
2822
* @param ProducerInterface $producer
2923
* @param PsrContext $context
3024
*/
31-
public function __construct(DriverInterface $driver, ProducerInterface $producer, PsrContext $context)
25+
public function __construct(ProducerInterface $producer, PsrContext $context)
3226
{
33-
$this->driver = $driver;
3427
$this->context = $context;
3528
$this->producer = $producer;
3629
}
@@ -66,25 +59,22 @@ public function callAsync($topic, $message, $timeout)
6659
$message->setBody($body);
6760
}
6861

69-
$transportMessage = $this->driver->createTransportMessage($message);
70-
if ($transportMessage->getReplyTo()) {
71-
$replyQueue = $this->context->createQueue($transportMessage->getReplyTo());
62+
if ($message->getReplyTo()) {
63+
$replyQueue = $this->context->createQueue($message->getReplyTo());
7264
} else {
7365
$replyQueue = $this->context->createTemporaryQueue();
74-
$transportMessage->setReplyTo($replyQueue->getQueueName());
66+
$message->setReplyTo($replyQueue->getQueueName());
7567
}
7668

77-
if (false == $transportMessage->getCorrelationId()) {
78-
$transportMessage->setCorrelationId(UUID::generate());
69+
if (false == $message->getCorrelationId()) {
70+
$message->setCorrelationId(UUID::generate());
7971
}
8072

81-
$message = $this->driver->createClientMessage($transportMessage);
82-
8373
$this->producer->send($topic, $message);
8474

8575
return new Promise(
8676
$this->context->createConsumer($replyQueue),
87-
$transportMessage->getCorrelationId(),
77+
$message->getCorrelationId(),
8878
$timeout
8979
);
9080
}

pkg/enqueue/Consumption/Extension/ReplyExtension.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,21 @@ class ReplyExtension implements ExtensionInterface
1717
public function onPostReceived(Context $context)
1818
{
1919
$replyTo = $context->getPsrMessage()->getReplyTo();
20-
$correlationId = $context->getPsrMessage()->getCorrelationId();
2120
if (false == $replyTo) {
2221
return;
2322
}
2423

2524
/** @var Result $result */
2625
$result = $context->getResult();
2726
if (false == $result instanceof Result) {
28-
throw new \LogicException('To send a reply an instance of Result class has to returned from a Processor.');
27+
return;
2928
}
3029

3130
if (false == $result->getReply()) {
32-
throw new \LogicException('To send a reply the Result must contain a reply message.');
31+
return;
3332
}
3433

34+
$correlationId = $context->getPsrMessage()->getCorrelationId();
3535
$replyMessage = clone $result->getReply();
3636
$replyMessage->setCorrelationId($correlationId);
3737

pkg/enqueue/Tests/Client/MessageTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,24 @@ public function testShouldSetMessageBusScopeInConstructor()
9797
self::assertSame(Message::SCOPE_MESSAGE_BUS, $message->getScope());
9898
}
9999

100+
public function testShouldAllowGetPreviouslySetReplyTo()
101+
{
102+
$message = new Message();
103+
104+
$message->setReplyTo('theReplyTo');
105+
106+
self::assertSame('theReplyTo', $message->getReplyTo());
107+
}
108+
109+
public function testShouldAllowGetPreviouslySetCorrelationId()
110+
{
111+
$message = new Message();
112+
113+
$message->setCorrelationId('theCorrelationId');
114+
115+
self::assertSame('theCorrelationId', $message->getCorrelationId());
116+
}
117+
100118
public function testShouldAllowGetPreviouslySetHeaders()
101119
{
102120
$message = new Message();

0 commit comments

Comments
 (0)