Skip to content

Commit ce45938

Browse files
committed
[client] Add RpcClient on client level.
It provides a same level of abstraction as client's producer.
1 parent df7afac commit ce45938

File tree

11 files changed

+368
-8
lines changed

11 files changed

+368
-8
lines changed

pkg/enqueue-bundle/Resources/config/client.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ services:
1010
enqueue.producer:
1111
alias: 'enqueue.client.producer'
1212

13+
enqueue.client.rpc_client:
14+
class: 'Enqueue\Client\RpcClient'
15+
arguments:
16+
- '@enqueue.client.driver'
17+
- '@enqueue.client.producer'
18+
- '@enqueue.transport.context'
19+
1320
enqueue.client.router_processor:
1421
class: 'Enqueue\Client\RouterProcessor'
1522
public: true

pkg/enqueue-bundle/Resources/config/services.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,8 @@ services:
1717
- '@enqueue.consumption.queue_consumer'
1818
tags:
1919
- { name: 'console.command' }
20+
21+
enqueue.transport.rpc_client:
22+
class: 'Enqueue\Rpc\RpcClient'
23+
arguments:
24+
- '@enqueue.transport.context'
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional;
4+
5+
/**
6+
* @group functional
7+
*/
8+
class RpcClientTest extends WebTestCase
9+
{
10+
public function testTransportRpcClientCouldBeGetFromContainerAsService()
11+
{
12+
$connection = $this->container->get('enqueue.transport.rpc_client');
13+
14+
$this->assertInstanceOf(\Enqueue\Rpc\RpcClient::class, $connection);
15+
}
16+
17+
public function testClientRpcClientCouldBeGetFromContainerAsService()
18+
{
19+
$connection = $this->container->get('enqueue.client.rpc_client');
20+
21+
$this->assertInstanceOf(\Enqueue\Client\RpcClient::class, $connection);
22+
}
23+
}

pkg/enqueue/Client/NullDriver.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ class NullDriver implements DriverInterface
2121
protected $config;
2222

2323
/**
24-
* @param NullContext $session
24+
* @param NullContext $context
2525
* @param Config $config
2626
*/
27-
public function __construct(NullContext $session, Config $config)
27+
public function __construct(NullContext $context, Config $config)
2828
{
29-
$this->context = $session;
29+
$this->context = $context;
3030
$this->config = $config;
3131
}
3232

pkg/enqueue/Client/RpcClient.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php
2+
namespace Enqueue\Client;
3+
4+
use Enqueue\Psr\PsrContext;
5+
use Enqueue\Psr\PsrMessage;
6+
use Enqueue\Rpc\Promise;
7+
use Enqueue\Util\UUID;
8+
9+
class RpcClient
10+
{
11+
/**
12+
* @var DriverInterface
13+
*/
14+
private $driver;
15+
16+
/**
17+
* @var ProducerInterface
18+
*/
19+
private $producer;
20+
21+
/**
22+
* @var PsrContext
23+
*/
24+
private $context;
25+
26+
/**
27+
* @param DriverInterface $driver
28+
* @param ProducerInterface $producer
29+
* @param PsrContext $context
30+
*/
31+
public function __construct(DriverInterface $driver, ProducerInterface $producer, PsrContext $context)
32+
{
33+
$this->driver = $driver;
34+
$this->context = $context;
35+
$this->producer = $producer;
36+
}
37+
38+
/**
39+
* @param string $topic
40+
* @param string|array|Message $message
41+
* @param int $timeout
42+
*
43+
* @return PsrMessage
44+
*/
45+
public function call($topic, $message, $timeout)
46+
{
47+
return $this->callAsync($topic, $message, $timeout)->getMessage();
48+
}
49+
50+
/**
51+
* @param string $topic
52+
* @param string|array|Message $message $message
53+
* @param int $timeout
54+
*
55+
* @return Promise
56+
*/
57+
public function callAsync($topic, $message, $timeout)
58+
{
59+
if ($timeout < 1) {
60+
throw new \InvalidArgumentException(sprintf('Timeout must be positive not zero integer. Got %s', $timeout));
61+
}
62+
63+
if (false == $message instanceof Message) {
64+
$body = $message;
65+
$message = new Message();
66+
$message->setBody($body);
67+
}
68+
69+
$transportMessage = $this->driver->createTransportMessage($message);
70+
if ($transportMessage->getReplyTo()) {
71+
$replyQueue = $this->context->createQueue($transportMessage->getReplyTo());
72+
} else {
73+
$replyQueue = $this->context->createTemporaryQueue();
74+
$transportMessage->setReplyTo($replyQueue->getQueueName());
75+
}
76+
77+
if (false == $transportMessage->getCorrelationId()) {
78+
$transportMessage->setCorrelationId(UUID::generate());
79+
}
80+
81+
$message = $this->driver->createClientMessage($transportMessage);
82+
83+
$this->producer->send($topic, $message);
84+
85+
return new Promise(
86+
$this->context->createConsumer($replyQueue),
87+
$transportMessage->getCorrelationId(),
88+
$timeout
89+
);
90+
}
91+
}

pkg/enqueue/Rpc/Promise.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Rpc;
44

55
use Enqueue\Psr\PsrConsumer;
6+
use Enqueue\Psr\PsrMessage;
67

78
class Promise
89
{
@@ -15,6 +16,7 @@ class Promise
1516
* @var int
1617
*/
1718
private $timeout;
19+
1820
/**
1921
* @var string
2022
*/
@@ -32,6 +34,11 @@ public function __construct(PsrConsumer $consumer, $correlationId, $timeout)
3234
$this->correlationId = $correlationId;
3335
}
3436

37+
/**
38+
* @throws TimeoutException if the wait timeout is reached
39+
*
40+
* @return PsrMessage
41+
*/
3542
public function getMessage()
3643
{
3744
$endTime = time() + $this->timeout;
@@ -47,7 +54,7 @@ public function getMessage()
4754
}
4855
}
4956

50-
throw new \LogicException(sprintf('Time outed without receiving reply message. Timeout: %s, CorrelationId: %s', $this->timeout, $this->correlationId));
57+
throw TimeoutException::create($this->timeout, $this->correlationId);
5158
}
5259

5360
/**

pkg/enqueue/Rpc/RpcClient.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ public function __construct(PsrContext $context)
2525
/**
2626
* @param PsrDestination $destination
2727
* @param PsrMessage $message
28-
* @param $timeout
28+
* @param int $timeout
29+
*
30+
* @throws TimeoutException if the wait timeout is reached
2931
*
3032
* @return PsrMessage
3133
*/
@@ -37,7 +39,7 @@ public function call(PsrDestination $destination, PsrMessage $message, $timeout)
3739
/**
3840
* @param PsrDestination $destination
3941
* @param PsrMessage $message
40-
* @param $timeout
42+
* @param int $timeout
4143
*
4244
* @return Promise
4345
*/

pkg/enqueue/Rpc/TimeoutException.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php
2+
namespace Enqueue\Rpc;
3+
4+
class TimeoutException extends \LogicException
5+
{
6+
/**
7+
* @param int $timeout
8+
* @param string $correlationId
9+
*
10+
* @return static
11+
*/
12+
public static function create($timeout, $correlationId)
13+
{
14+
return new static(sprintf('Rpc call timeout is reached without receiving a reply message. Timeout: %s, CorrelationId: %s', $timeout, $correlationId));
15+
}
16+
}

0 commit comments

Comments
 (0)