Skip to content

Commit 6d69eef

Browse files
committed
[client] Rename MessageProducer classes to Producer
1 parent 88a8291 commit 6d69eef

File tree

7 files changed

+242
-194
lines changed

7 files changed

+242
-194
lines changed

pkg/enqueue-bundle/Resources/config/client.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,21 @@ services:
33
class: 'Enqueue\Client\Config'
44
public: false
55

6+
# deprecated use enqueue.client.producer
67
enqueue.client.message_producer:
78
class: 'Enqueue\Client\MessageProducer'
89
arguments: ['@enqueue.client.driver']
910

10-
enqueue.message_producer:
11+
enqueue.client.producer:
1112
alias: 'enqueue.client.message_producer'
1213

14+
# deprecated use enqueue.producer
15+
enqueue.message_producer:
16+
alias: 'enqueue.client.producer'
17+
18+
enqueue.producer:
19+
alias: 'enqueue.client.producer'
20+
1321
enqueue.client.router_processor:
1422
class: 'Enqueue\Client\RouterProcessor'
1523
public: true

pkg/enqueue/Client/MessageProducer.php

Lines changed: 4 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -2,117 +2,9 @@
22

33
namespace Enqueue\Client;
44

5-
use Enqueue\Util\JSON;
6-
use Enqueue\Util\UUID;
7-
8-
class MessageProducer implements MessageProducerInterface
5+
/**
6+
* @deprecated use Producer
7+
*/
8+
class MessageProducer extends Producer
99
{
10-
/**
11-
* @var DriverInterface
12-
*/
13-
protected $driver;
14-
15-
/**
16-
* @param DriverInterface $driver
17-
*/
18-
public function __construct(DriverInterface $driver)
19-
{
20-
$this->driver = $driver;
21-
}
22-
23-
/**
24-
* {@inheritdoc}
25-
*/
26-
public function send($topic, $message)
27-
{
28-
if (false == $message instanceof Message) {
29-
$body = $message;
30-
$message = new Message();
31-
$message->setBody($body);
32-
}
33-
34-
$this->prepareBody($message);
35-
36-
$message->setProperty(Config::PARAMETER_TOPIC_NAME, $topic);
37-
38-
if (!$message->getMessageId()) {
39-
$message->setMessageId(UUID::generate());
40-
}
41-
42-
if (!$message->getTimestamp()) {
43-
$message->setTimestamp(time());
44-
}
45-
46-
if (!$message->getPriority()) {
47-
$message->setPriority(MessagePriority::NORMAL);
48-
}
49-
50-
if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) {
51-
if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
52-
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME));
53-
}
54-
if ($message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
55-
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME));
56-
}
57-
58-
$this->driver->sendToRouter($message);
59-
} elseif (Message::SCOPE_APP == $message->getScope()) {
60-
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
61-
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName());
62-
}
63-
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
64-
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName());
65-
}
66-
67-
$this->driver->sendToProcessor($message);
68-
} else {
69-
throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope()));
70-
}
71-
}
72-
73-
/**
74-
* @param Message $message
75-
*/
76-
private function prepareBody(Message $message)
77-
{
78-
$body = $message->getBody();
79-
$contentType = $message->getContentType();
80-
81-
if (is_scalar($body) || null === $body) {
82-
$contentType = $contentType ?: 'text/plain';
83-
$body = (string) $body;
84-
} elseif (is_array($body)) {
85-
if ($contentType && $contentType !== 'application/json') {
86-
throw new \LogicException(sprintf('Content type "application/json" only allowed when body is array'));
87-
}
88-
89-
// only array of scalars is allowed.
90-
array_walk_recursive($body, function ($value) {
91-
if (!is_scalar($value) && null !== $value) {
92-
throw new \LogicException(sprintf(
93-
'The message\'s body must be an array of scalars. Found not scalar in the array: %s',
94-
is_object($value) ? get_class($value) : gettype($value)
95-
));
96-
}
97-
});
98-
99-
$contentType = 'application/json';
100-
$body = JSON::encode($body);
101-
} elseif ($body instanceof \JsonSerializable) {
102-
if ($contentType && $contentType !== 'application/json') {
103-
throw new \LogicException(sprintf('Content type "application/json" only allowed when body is array'));
104-
}
105-
106-
$contentType = 'application/json';
107-
$body = JSON::encode($body);
108-
} else {
109-
throw new \InvalidArgumentException(sprintf(
110-
'The message\'s body must be either null, scalar, array or object (implements \JsonSerializable). Got: %s',
111-
is_object($body) ? get_class($body) : gettype($body)
112-
));
113-
}
114-
115-
$message->setContentType($contentType);
116-
$message->setBody($body);
117-
}
11810
}

pkg/enqueue/Client/MessageProducerInterface.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace Enqueue\Client;
44

5+
/**
6+
* @deprecated use ProducerInterface
7+
*/
58
interface MessageProducerInterface
69
{
710
/**

pkg/enqueue/Client/Producer.php

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
use Enqueue\Util\JSON;
6+
use Enqueue\Util\UUID;
7+
8+
class Producer implements MessageProducerInterface
9+
{
10+
/**
11+
* @var DriverInterface
12+
*/
13+
protected $driver;
14+
15+
/**
16+
* @param DriverInterface $driver
17+
*/
18+
public function __construct(DriverInterface $driver)
19+
{
20+
$this->driver = $driver;
21+
}
22+
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public function send($topic, $message)
27+
{
28+
if (false == $message instanceof Message) {
29+
$body = $message;
30+
$message = new Message();
31+
$message->setBody($body);
32+
}
33+
34+
$this->prepareBody($message);
35+
36+
$message->setProperty(Config::PARAMETER_TOPIC_NAME, $topic);
37+
38+
if (!$message->getMessageId()) {
39+
$message->setMessageId(UUID::generate());
40+
}
41+
42+
if (!$message->getTimestamp()) {
43+
$message->setTimestamp(time());
44+
}
45+
46+
if (!$message->getPriority()) {
47+
$message->setPriority(MessagePriority::NORMAL);
48+
}
49+
50+
if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) {
51+
if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
52+
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME));
53+
}
54+
if ($message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
55+
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME));
56+
}
57+
58+
$this->driver->sendToRouter($message);
59+
} elseif (Message::SCOPE_APP == $message->getScope()) {
60+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
61+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName());
62+
}
63+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
64+
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName());
65+
}
66+
67+
$this->driver->sendToProcessor($message);
68+
} else {
69+
throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope()));
70+
}
71+
}
72+
73+
/**
74+
* @param Message $message
75+
*/
76+
private function prepareBody(Message $message)
77+
{
78+
$body = $message->getBody();
79+
$contentType = $message->getContentType();
80+
81+
if (is_scalar($body) || null === $body) {
82+
$contentType = $contentType ?: 'text/plain';
83+
$body = (string) $body;
84+
} elseif (is_array($body)) {
85+
if ($contentType && $contentType !== 'application/json') {
86+
throw new \LogicException(sprintf('Content type "application/json" only allowed when body is array'));
87+
}
88+
89+
// only array of scalars is allowed.
90+
array_walk_recursive($body, function ($value) {
91+
if (!is_scalar($value) && null !== $value) {
92+
throw new \LogicException(sprintf(
93+
'The message\'s body must be an array of scalars. Found not scalar in the array: %s',
94+
is_object($value) ? get_class($value) : gettype($value)
95+
));
96+
}
97+
});
98+
99+
$contentType = 'application/json';
100+
$body = JSON::encode($body);
101+
} elseif ($body instanceof \JsonSerializable) {
102+
if ($contentType && $contentType !== 'application/json') {
103+
throw new \LogicException(sprintf('Content type "application/json" only allowed when body is array'));
104+
}
105+
106+
$contentType = 'application/json';
107+
$body = JSON::encode($body);
108+
} else {
109+
throw new \InvalidArgumentException(sprintf(
110+
'The message\'s body must be either null, scalar, array or object (implements \JsonSerializable). Got: %s',
111+
is_object($body) ? get_class($body) : gettype($body)
112+
));
113+
}
114+
115+
$message->setContentType($contentType);
116+
$message->setBody($body);
117+
}
118+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
interface ProducerInterface
6+
{
7+
/**
8+
* Sends a message to a topic. There are some message processor may be subscribed to a topic.
9+
*
10+
* @param string $topic
11+
* @param string|array|Message $message
12+
*
13+
* @throws \Enqueue\Psr\Exception - if the producer fails to send
14+
* the message due to some internal error
15+
*/
16+
public function send($topic, $message);
17+
}

pkg/enqueue/Client/TraceableMessageProducer.php

Lines changed: 4 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -2,86 +2,9 @@
22

33
namespace Enqueue\Client;
44

5-
class TraceableMessageProducer implements MessageProducerInterface
5+
/**
6+
* @deprecated use TraceableProducer
7+
*/
8+
class TraceableMessageProducer extends TraceableProducer
69
{
7-
/**
8-
* @var array
9-
*/
10-
protected $traces = [];
11-
/**
12-
* @var MessageProducerInterface
13-
*/
14-
private $messageProducer;
15-
16-
/**
17-
* @param MessageProducerInterface $messageProducer
18-
*/
19-
public function __construct(MessageProducerInterface $messageProducer)
20-
{
21-
$this->messageProducer = $messageProducer;
22-
}
23-
24-
/**
25-
* {@inheritdoc}
26-
*/
27-
public function send($topic, $message)
28-
{
29-
$this->messageProducer->send($topic, $message);
30-
31-
$trace = [
32-
'topic' => $topic,
33-
'body' => $message,
34-
'headers' => [],
35-
'properties' => [],
36-
'priority' => null,
37-
'expire' => null,
38-
'delay' => null,
39-
'timestamp' => null,
40-
'contentType' => null,
41-
'messageId' => null,
42-
];
43-
if ($message instanceof Message) {
44-
$trace['body'] = $message->getBody();
45-
$trace['headers'] = $message->getHeaders();
46-
$trace['properties'] = $message->getProperties();
47-
$trace['priority'] = $message->getPriority();
48-
$trace['expire'] = $message->getExpire();
49-
$trace['delay'] = $message->getDelay();
50-
$trace['timestamp'] = $message->getTimestamp();
51-
$trace['contentType'] = $message->getContentType();
52-
$trace['messageId'] = $message->getMessageId();
53-
}
54-
55-
$this->traces[] = $trace;
56-
}
57-
58-
/**
59-
* @param string $topic
60-
*
61-
* @return array
62-
*/
63-
public function getTopicTraces($topic)
64-
{
65-
$topicTraces = [];
66-
foreach ($this->traces as $trace) {
67-
if ($topic == $trace['topic']) {
68-
$topicTraces[] = $trace;
69-
}
70-
}
71-
72-
return $topicTraces;
73-
}
74-
75-
/**
76-
* @return array
77-
*/
78-
public function getTraces()
79-
{
80-
return $this->traces;
81-
}
82-
83-
public function clearTraces()
84-
{
85-
$this->traces = [];
86-
}
8710
}

0 commit comments

Comments
 (0)