Skip to content

Commit 73f182b

Browse files
committed
Add first pass for Apache ActiveMQ Artemis support.
1 parent caf0afa commit 73f182b

File tree

5 files changed

+99
-36
lines changed

5 files changed

+99
-36
lines changed

pkg/stomp/ExtensionType.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Stomp;
6+
7+
class ExtensionType
8+
{
9+
const ACTIVEMQ = 'activemq';
10+
const RABBITMQ = 'rabbitmq';
11+
const ARTEMIS = 'artemis';
12+
}

pkg/stomp/StompConnectionFactory.php

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313

1414
class StompConnectionFactory implements ConnectionFactory
1515
{
16-
const SCHEME_EXT_ACTIVEMQ = 'activemq';
17-
const SCHEME_EXT_RABBITMQ = 'rabbitmq';
16+
const SUPPORTED_SCHEMES = [
17+
ExtensionType::ACTIVEMQ,
18+
ExtensionType::RABBITMQ,
19+
ExtensionType::ARTEMIS,
20+
];
1821

1922
/**
2023
* @var array
@@ -71,15 +74,14 @@ public function __construct($config = 'stomp:')
7174
*/
7275
public function createContext(): Context
7376
{
74-
$useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'] ? true : false;
75-
7677
if ($this->config['lazy']) {
77-
return new StompContext(function () {
78-
return $this->establishConnection();
79-
}, $useExchangePrefix);
78+
return new StompContext(
79+
fn () => $this->establishConnection(),
80+
$this->config['target']
81+
);
8082
}
8183

82-
return new StompContext($this->establishConnection(), $useExchangePrefix);
84+
return new StompContext($this->establishConnection(), $this->config['target']);
8385
}
8486

8587
private function establishConnection(): BufferedStompClient
@@ -123,10 +125,11 @@ private function parseDsn(string $dsn): array
123125

124126
$schemeExtension = current($dsn->getSchemeExtensions());
125127
if (false === $schemeExtension) {
126-
$schemeExtension = self::SCHEME_EXT_RABBITMQ;
128+
$schemeExtension = ExtensionType::RABBITMQ;
127129
}
128-
if (self::SCHEME_EXT_ACTIVEMQ !== $schemeExtension && self::SCHEME_EXT_RABBITMQ !== $schemeExtension) {
129-
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ));
130+
131+
if (false === in_array($schemeExtension, self::SUPPORTED_SCHEMES)) {
132+
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is not supported. It must be one of %s.', $schemeExtension, implode(', ', self::SUPPORTED_SCHEMES)));
130133
}
131134

132135
return array_filter(array_replace($dsn->getQuery(), [
@@ -151,7 +154,7 @@ private function parseDsn(string $dsn): array
151154
private function defaultConfig(): array
152155
{
153156
return [
154-
'target' => self::SCHEME_EXT_RABBITMQ,
157+
'target' => ExtensionType::RABBITMQ,
155158
'host' => 'localhost',
156159
'port' => 61613,
157160
'login' => 'guest',

pkg/stomp/StompConsumer.php

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Interop\Queue\Message;
1010
use Interop\Queue\Queue;
1111
use Stomp\Client;
12+
use Stomp\Exception\ErrorFrameException;
1213
use Stomp\Transport\Frame;
1314

1415
class StompConsumer implements Consumer
@@ -96,17 +97,23 @@ public function receive(int $timeout = 0): ?Message
9697
{
9798
$this->subscribe();
9899

99-
if (0 === $timeout) {
100-
while (true) {
101-
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
102-
return $this->convertMessage($message);
100+
try {
101+
if (0 === $timeout) {
102+
while (true) {
103+
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
104+
return $this->convertMessage($message);
105+
}
103106
}
104107
}
105-
} else {
106-
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
107-
return $this->convertMessage($message);
108+
else {
109+
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
110+
return $this->convertMessage($message);
111+
}
108112
}
109113
}
114+
catch (ErrorFrameException $e) {
115+
throw new \Exception($e->getMessage() . "\n" . $e->getFrame()->getBody());
116+
}
110117

111118
return null;
112119
}
@@ -143,10 +150,11 @@ public function reject(Message $message, bool $requeue = false): void
143150

144151
$nackFrame = $this->stomp->getProtocol()->getNackFrame($message->getFrame());
145152

146-
// rabbitmq STOMP protocol extension
147-
$nackFrame->addHeaders([
148-
'requeue' => $requeue ? 'true' : 'false',
149-
]);
153+
if ($this->queue->getExtensionType() === ExtensionType::RABBITMQ) {
154+
$nackFrame->addHeaders([
155+
'requeue' => $requeue ? 'true' : 'false',
156+
]);
157+
}
150158

151159
$this->stomp->sendFrame($nackFrame);
152160
}
@@ -168,13 +176,32 @@ private function subscribe(): void
168176
$this->ackMode
169177
);
170178

171-
// rabbitmq STOMP protocol extension
172179
$headers = $this->queue->getHeaders();
173-
$headers['prefetch-count'] = $this->prefetchCount;
174-
$headers = StompHeadersEncoder::encode($headers);
175180

176-
foreach ($headers as $key => $value) {
177-
$frame[$key] = $value;
181+
if ($this->queue->getExtensionType() === ExtensionType::RABBITMQ) {
182+
183+
$headers['prefetch-count'] = $this->prefetchCount;
184+
$headers = StompHeadersEncoder::encode($headers);
185+
186+
foreach ($headers as $key => $value) {
187+
$frame[$key] = $value;
188+
}
189+
}
190+
191+
if ($this->queue->getExtensionType() === ExtensionType::ARTEMIS) {
192+
193+
$subscriptionName = "{$this->subscriptionId}-{$this->queue->getStompName()}";
194+
195+
$artemisHeaders = [];
196+
197+
$artemisHeaders['client-id'] = true ? $this->subscriptionId : null;
198+
$artemisHeaders['durable-subscription-name'] = true ? $subscriptionName : null;
199+
200+
$artemisHeaders = StompHeadersEncoder::encode(array_filter($artemisHeaders));
201+
202+
foreach ($artemisHeaders as $key => $value) {
203+
$frame[$key] = $value;
204+
}
178205
}
179206

180207
$this->stomp->sendFrame($frame);
@@ -187,7 +214,7 @@ private function convertMessage(Frame $frame): StompMessage
187214
throw new \LogicException(sprintf('Frame is not MESSAGE frame but: "%s"', $frame->getCommand()));
188215
}
189216

190-
list($headers, $properties) = StompHeadersEncoder::decode($frame->getHeaders());
217+
[$headers, $properties] = StompHeadersEncoder::decode($frame->getHeaders());
191218

192219
$redelivered = isset($headers['redelivered']) && 'true' === $headers['redelivered'];
193220

pkg/stomp/StompContext.php

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class StompContext implements Context
2323
*/
2424
private $stomp;
2525

26+
/**
27+
* @var string
28+
*/
29+
private $extensionType;
30+
2631
/**
2732
* @var bool
2833
*/
@@ -35,9 +40,9 @@ class StompContext implements Context
3540

3641
/**
3742
* @param BufferedStompClient|callable $stomp
38-
* @param bool $useExchangePrefix
43+
* @param string $extensionType
3944
*/
40-
public function __construct($stomp, $useExchangePrefix = true)
45+
public function __construct($stomp, string $extensionType)
4146
{
4247
if ($stomp instanceof BufferedStompClient) {
4348
$this->stomp = $stomp;
@@ -47,7 +52,8 @@ public function __construct($stomp, $useExchangePrefix = true)
4752
throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
4853
}
4954

50-
$this->useExchangePrefix = $useExchangePrefix;
55+
$this->extensionType = $extensionType;
56+
$this->useExchangePrefix = true;
5157
}
5258

5359
/**
@@ -64,7 +70,7 @@ public function createMessage(string $body = '', array $properties = [], array $
6470
public function createQueue(string $name): Queue
6571
{
6672
if (0 !== strpos($name, '/')) {
67-
$destination = new StompDestination();
73+
$destination = new StompDestination($this->extensionType);
6874
$destination->setType(StompDestination::TYPE_QUEUE);
6975
$destination->setStompName($name);
7076

@@ -91,7 +97,7 @@ public function createTemporaryQueue(): Queue
9197
public function createTopic(string $name): Topic
9298
{
9399
if (0 !== strpos($name, '/')) {
94-
$destination = new StompDestination();
100+
$destination = new StompDestination($this->extensionType);
95101
$destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
96102
$destination->setStompName($name);
97103

@@ -151,7 +157,7 @@ public function createDestination(string $destination): StompDestination
151157
$routingKey = $pieces[1];
152158
}
153159

154-
$destination = new StompDestination();
160+
$destination = new StompDestination($this->extensionType);
155161
$destination->setType($type);
156162
$destination->setStompName($name);
157163
$destination->setRoutingKey($routingKey);

pkg/stomp/StompDestination.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,25 @@ class StompDestination implements Topic, Queue
3939
* @var array
4040
*/
4141
private $headers;
42+
/**
43+
* @var string
44+
*/
45+
private string $extensionType;
4246

43-
public function __construct()
47+
public function __construct(string $extensionType)
4448
{
4549
$this->headers = [
4650
self::HEADER_DURABLE => false,
4751
self::HEADER_AUTO_DELETE => true,
4852
self::HEADER_EXCLUSIVE => false,
4953
];
54+
55+
$this->extensionType = $extensionType;
56+
}
57+
58+
public function getExtensionType(): string
59+
{
60+
return $this->extensionType;
5061
}
5162

5263
public function getStompName(): string
@@ -65,6 +76,10 @@ public function getQueueName(): string
6576
throw new \LogicException('Destination name is not set');
6677
}
6778

79+
if ($this->extensionType === ExtensionType::ARTEMIS) {
80+
return $this->getStompName();
81+
}
82+
6883
$name = '/'.$this->getType().'/'.$this->getStompName();
6984

7085
if ($this->getRoutingKey()) {

0 commit comments

Comments
 (0)