Skip to content

Commit 0cf3e57

Browse files
author
Valentin Nazarov
committed
[Messenger] Add per-message priority
1 parent 883766e commit 0cf3e57

File tree

8 files changed

+170
-9
lines changed

8 files changed

+170
-9
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
1919
use Symfony\Component\Messenger\Envelope;
2020
use Symfony\Component\Messenger\Exception\TransportException;
21+
use Symfony\Component\Messenger\Stamp\DelayStamp;
22+
use Symfony\Component\Messenger\Stamp\PriorityStamp;
2123
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2224

2325
/**
@@ -118,4 +120,36 @@ public function testItThrowsATransportExceptionIfItCannotSendTheMessage()
118120
$sender = new AmqpSender($connection, $serializer);
119121
$sender->send($envelope);
120122
}
123+
124+
public function testSendWithDelay()
125+
{
126+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new DelayStamp(1000));
127+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
128+
129+
$serializer = $this->createMock(SerializerInterface::class);
130+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
131+
132+
$connection = $this->createMock(Connection::class);
133+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 1000);
134+
135+
$sender = new AmqpSender($connection, $serializer);
136+
$sender->send($envelope);
137+
}
138+
139+
public function testSendWithPriority()
140+
{
141+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new PriorityStamp(255));
142+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
143+
144+
$serializer = $this->createMock(SerializerInterface::class);
145+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
146+
147+
$connection = $this->createMock(Connection::class);
148+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $this->callback(function (AmqpStamp $stamp) {
149+
return 255 === $stamp->getAttributes()['priority'];
150+
}));
151+
152+
$sender = new AmqpSender($connection, $serializer);
153+
$sender->send($envelope);
154+
}
121155
}

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
17+
use Symfony\Component\Messenger\Stamp\PriorityStamp;
1718
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1819
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1920
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -57,6 +58,12 @@ public function send(Envelope $envelope): Envelope
5758
}
5859
}
5960

61+
/** @var PriorityStamp|null $priorityStamp */
62+
$priorityStamp = $envelope->last(PriorityStamp::class);
63+
if ($priorityStamp) {
64+
$amqpStamp = AmqpStamp::createWithAttributes(['priority' => $priorityStamp->getPriority()], $amqpStamp);
65+
}
66+
6067
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
6168
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
6269
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Transport;
1313

14+
use Pheanstalk\Contract\PheanstalkInterface;
1415
use PHPUnit\Framework\TestCase;
1516
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdSender;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Stamp\DelayStamp;
21+
use Symfony\Component\Messenger\Stamp\PriorityStamp;
2022
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2123

2224
final class BeanstalkdSenderTest extends TestCase
@@ -27,7 +29,7 @@ public function testSend()
2729
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
2830

2931
$connection = $this->createMock(Connection::class);
30-
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0);
32+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0, PheanstalkInterface::DEFAULT_PRIORITY);
3133

3234
$serializer = $this->createMock(SerializerInterface::class);
3335
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
@@ -42,7 +44,22 @@ public function testSendWithDelay()
4244
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
4345

4446
$connection = $this->createMock(Connection::class);
45-
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500);
47+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500, PheanstalkInterface::DEFAULT_PRIORITY);
48+
49+
$serializer = $this->createMock(SerializerInterface::class);
50+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
51+
52+
$sender = new BeanstalkdSender($connection, $serializer);
53+
$sender->send($envelope);
54+
}
55+
56+
public function testSendWithPriority()
57+
{
58+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new PriorityStamp(255));
59+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
60+
61+
$connection = $this->createMock(Connection::class);
62+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0, 0);
4663

4764
$serializer = $this->createMock(SerializerInterface::class);
4865
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
1313

14+
use Pheanstalk\Contract\PheanstalkInterface;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Stamp\DelayStamp;
17+
use Symfony\Component\Messenger\Stamp\PriorityStamp;
1618
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1719
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1820
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -42,8 +44,26 @@ public function send(Envelope $envelope): Envelope
4244
$delayStamp = $envelope->last(DelayStamp::class);
4345
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
4446

45-
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
47+
/** @var PriorityStamp|null $priorityStamp */
48+
$priorityStamp = $envelope->last(PriorityStamp::class);
49+
$priority = $this->getPheanstalkPriority($priorityStamp);
50+
51+
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs, $priority);
4652

4753
return $envelope;
4854
}
55+
56+
/**
57+
* Beanstalkd supports u32 priorities (0 to 2^32 - 1), with 0 being the highest.
58+
* RabbitMQ supports u8 priorities (0 to 255), with 255 being the highest.
59+
* To provide interoperability, use RabbitMQ model.
60+
*/
61+
private function getPheanstalkPriority(?PriorityStamp $stamp): int
62+
{
63+
if (null !== $stamp) {
64+
return PriorityStamp::MAX_PRIORITY - $stamp->getPriority();
65+
}
66+
67+
return PheanstalkInterface::DEFAULT_PRIORITY;
68+
}
4969
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ class Connection
3636
/**
3737
* Available options:
3838
*
39-
* * tube_name: name of the tube
40-
* * timeout: message reservation timeout (in seconds)
41-
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
39+
* * tube_name: name of the tube
40+
* * timeout: message reservation timeout (in seconds)
41+
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
4242
*/
4343
private $configuration;
4444
private $client;
@@ -103,11 +103,12 @@ public function getTube(): string
103103
}
104104

105105
/**
106-
* @param int $delay The delay in milliseconds
106+
* @param int $delay The delay in milliseconds
107+
* @param int $priority The priority in Beanstalkd terms (0 .. 2^32 - 1)
107108
*
108109
* @return string The inserted id
109110
*/
110-
public function send(string $body, array $headers, int $delay = 0): string
111+
public function send(string $body, array $headers, int $delay = 0, int $priority = PheanstalkInterface::DEFAULT_PRIORITY): string
111112
{
112113
$message = json_encode([
113114
'body' => $body,
@@ -121,7 +122,7 @@ public function send(string $body, array $headers, int $delay = 0): string
121122
try {
122123
$job = $this->client->useTube($this->tube)->put(
123124
$message,
124-
PheanstalkInterface::DEFAULT_PRIORITY,
125+
$priority,
125126
$delay / 1000,
126127
$this->ttr
127128
);

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
5.4
55
---
66

7+
* Add per-message priority for AMQP & Beanstalkd transports
78
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
89

910
5.3
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
15+
16+
/**
17+
* Apply this stamp to provide priority for your message on a transport.
18+
*
19+
* @author Valentin Nazarov <i.kozlice@protonmail.com>
20+
*/
21+
final class PriorityStamp implements StampInterface
22+
{
23+
public const MIN_PRIORITY = 0;
24+
public const MAX_PRIORITY = 255;
25+
26+
private $priority;
27+
28+
/**
29+
* @param int $priority The priority level
30+
*/
31+
public function __construct(int $priority)
32+
{
33+
if ($priority < self::MIN_PRIORITY || $priority > self::MAX_PRIORITY) {
34+
throw new InvalidArgumentException(sprintf('Priority must be between %d and %d.', self::MIN_PRIORITY, self::MAX_PRIORITY));
35+
}
36+
37+
$this->priority = $priority;
38+
}
39+
40+
public function getPriority(): int
41+
{
42+
return $this->priority;
43+
}
44+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Stamp;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Stamp\PriorityStamp;
17+
18+
/**
19+
* @author Valentin Nazarov <i.kozlice@protonmail.com>
20+
*/
21+
class PriorityStampTest extends TestCase
22+
{
23+
/**
24+
* @dataProvider invalidPriorityProvider
25+
*/
26+
public function testConstructorFailsOnPriorityOutOfBounds(int $priority)
27+
{
28+
$this->expectException(InvalidArgumentException::class);
29+
new PriorityStamp($priority);
30+
}
31+
32+
public function invalidPriorityProvider(): iterable
33+
{
34+
yield [PriorityStamp::MIN_PRIORITY - 1];
35+
yield [PriorityStamp::MAX_PRIORITY + 1];
36+
}
37+
}

0 commit comments

Comments
 (0)