Skip to content

Commit fd083fb

Browse files
authored
Merge pull request php-enqueue#1195 from onatskyy/snsqs_attributes
[SNSQS] added possibility to send message attributes using snsqs transport
2 parents 5f49256 + d99b2e1 commit fd083fb

File tree

3 files changed

+51
-13
lines changed

3 files changed

+51
-13
lines changed

pkg/snsqs/SnsQsMessage.php

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,27 @@ class SnsQsMessage implements Message
1717
*/
1818
private $sqsMessage;
1919

20-
public function __construct(string $body = '', array $properties = [], array $headers = [])
21-
{
20+
/**
21+
* @var array|null
22+
*/
23+
private $messageAttributes;
24+
25+
/**
26+
* See AWS documentation for message attribute structure.
27+
*
28+
* @see https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sns-2010-03-31.html#shape-messageattributevalue
29+
*/
30+
public function __construct(
31+
string $body = '',
32+
array $properties = [],
33+
array $headers = [],
34+
array $messageAttributes = null
35+
) {
2236
$this->body = $body;
2337
$this->properties = $properties;
2438
$this->headers = $headers;
2539
$this->redelivered = false;
40+
$this->messageAttributes = $messageAttributes;
2641
}
2742

2843
public function setSqsMessage(SqsMessage $message): void
@@ -34,4 +49,14 @@ public function getSqsMessage(): SqsMessage
3449
{
3550
return $this->sqsMessage;
3651
}
52+
53+
public function getMessageAttributes(): ?array
54+
{
55+
return $this->messageAttributes;
56+
}
57+
58+
public function setMessageAttributes(?array $messageAttributes): void
59+
{
60+
$this->messageAttributes = $messageAttributes;
61+
}
3762
}

pkg/snsqs/SnsQsProducer.php

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,7 @@ public function send(Destination $destination, Message $message): void
5151
InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class);
5252

5353
if (false == $destination instanceof SnsQsTopic && false == $destination instanceof SnsQsQueue) {
54-
throw new InvalidDestinationException(sprintf(
55-
'The destination must be an instance of [%s|%s] but got %s.',
56-
SnsQsTopic::class, SnsQsQueue::class,
57-
is_object($destination) ? get_class($destination) : gettype($destination)
58-
));
54+
throw new InvalidDestinationException(sprintf('The destination must be an instance of [%s|%s] but got %s.', SnsQsTopic::class, SnsQsQueue::class, is_object($destination) ? get_class($destination) : gettype($destination)));
5955
}
6056

6157
if ($destination instanceof SnsQsTopic) {
@@ -64,6 +60,7 @@ public function send(Destination $destination, Message $message): void
6460
$message->getProperties(),
6561
$message->getHeaders()
6662
);
63+
$snsMessage->setMessageAttributes($message->getMessageAttributes());
6764

6865
$this->getSnsProducer()->send($destination, $snsMessage);
6966
} else {
@@ -79,10 +76,6 @@ public function send(Destination $destination, Message $message): void
7976

8077
/**
8178
* Delivery delay is supported by SQSProducer.
82-
*
83-
* @param int|null $deliveryDelay
84-
*
85-
* @return Producer
8679
*/
8780
public function setDeliveryDelay(int $deliveryDelay = null): Producer
8881
{
@@ -93,8 +86,6 @@ public function setDeliveryDelay(int $deliveryDelay = null): Producer
9386

9487
/**
9588
* Delivery delay is supported by SQSProducer.
96-
*
97-
* @return int|null
9889
*/
9990
public function getDeliveryDelay(): ?int
10091
{

pkg/snsqs/Tests/SnsQsProducerTest.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\SnsQs\Tests;
44

55
use Enqueue\Sns\SnsContext;
6+
use Enqueue\Sns\SnsMessage;
67
use Enqueue\Sns\SnsProducer;
78
use Enqueue\SnsQs\SnsQsMessage;
89
use Enqueue\SnsQs\SnsQsProducer;
@@ -91,6 +92,7 @@ public function testShouldGetDeliveryDelayFromSQSProducer()
9192
public function testShouldSendSnsTopicMessageToSnsProducer()
9293
{
9394
$snsMock = $this->createSnsContextMock();
95+
$snsMock->method('createMessage')->willReturn(new SnsMessage());
9496
$destination = new SnsQsTopic('');
9597

9698
$snsProducerStub = $this->prophesize(SnsProducer::class);
@@ -102,6 +104,26 @@ public function testShouldSendSnsTopicMessageToSnsProducer()
102104
$producer->send($destination, new SnsQsMessage());
103105
}
104106

107+
public function testShouldSendSnsTopicMessageWithAttributesToSnsProducer()
108+
{
109+
$snsMock = $this->createSnsContextMock();
110+
$snsMock->method('createMessage')->willReturn(new SnsMessage());
111+
$destination = new SnsQsTopic('');
112+
113+
$snsProducerStub = $this->prophesize(SnsProducer::class);
114+
$snsProducerStub->send(
115+
$destination,
116+
Argument::that(function (SnsMessage $snsMessage) {
117+
return $snsMessage->getMessageAttributes() === ['foo' => 'bar'];
118+
})
119+
)->shouldBeCalledOnce();
120+
121+
$snsMock->method('createProducer')->willReturn($snsProducerStub->reveal());
122+
123+
$producer = new SnsQsProducer($snsMock, $this->createSqsContextMock());
124+
$producer->send($destination, new SnsQsMessage('', [], [], ['foo' => 'bar']));
125+
}
126+
105127
public function testShouldSendSqsMessageToSqsProducer()
106128
{
107129
$sqsMock = $this->createSqsContextMock();

0 commit comments

Comments
 (0)