From 38a73642cba74436714ccf54fd85624d0e774227 Mon Sep 17 00:00:00 2001 From: Valentin Nazarov Date: Tue, 14 Jun 2022 15:11:13 +0300 Subject: [PATCH] [Messenger] Add per-message priority --- .../Amqp/Tests/Transport/AmqpSenderTest.php | 34 ++++++++++++++ .../Bridge/Amqp/Transport/AmqpSender.php | 7 +++ .../Tests/Transport/BeanstalkdSenderTest.php | 16 +++++++ .../Beanstalkd/Transport/BeanstalkdSender.php | 22 +++++++++- .../Beanstalkd/Transport/Connection.php | 11 ++--- src/Symfony/Component/Messenger/CHANGELOG.md | 5 +++ .../Messenger/Stamp/PriorityStamp.php | 44 +++++++++++++++++++ .../Tests/Stamp/PriorityStampTest.php | 37 ++++++++++++++++ 8 files changed, 170 insertions(+), 6 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Stamp/PriorityStamp.php create mode 100644 src/Symfony/Component/Messenger/Tests/Stamp/PriorityStampTest.php diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php index 9949a0d59413f..8d15f6fca2ed3 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php @@ -18,6 +18,8 @@ use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\PriorityStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; /** @@ -70,6 +72,38 @@ public function testItSendsTheEncodedMessageWithoutHeaders() $sender->send($envelope); } + public function testItSendsWithDelay() + { + $envelope = (new Envelope(new DummyMessage('Oy')))->with(new DelayStamp(1000)); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 1000); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } + + public function testItSendsWithPriority() + { + $envelope = (new Envelope(new DummyMessage('Oy')))->with(new PriorityStamp(255)); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $this->callback(function (AmqpStamp $stamp) { + return 255 === $stamp->getAttributes()['priority']; + })); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } + public function testContentTypeHeaderIsMovedToAttribute() { $envelope = new Envelope(new DummyMessage('Oy')); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php index d19258eb7f346..6dccba54a85ee 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php @@ -14,6 +14,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\PriorityStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -57,6 +58,12 @@ public function send(Envelope $envelope): Envelope } } + /** @var PriorityStamp|null $priorityStamp */ + $priorityStamp = $envelope->last(PriorityStamp::class); + if ($priorityStamp) { + $amqpStamp = AmqpStamp::createWithAttributes(['priority' => $priorityStamp->getPriority()], $amqpStamp); + } + $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); if ($amqpReceivedStamp instanceof AmqpReceivedStamp) { $amqpStamp = AmqpStamp::createFromAmqpEnvelope( diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php index cfc5b8fdba84f..f801c2967fd17 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php @@ -17,6 +17,7 @@ use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\PriorityStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; final class BeanstalkdSenderTest extends TestCase @@ -50,4 +51,19 @@ public function testSendWithDelay() $sender = new BeanstalkdSender($connection, $serializer); $sender->send($envelope); } + + public function testSendWithPriority() + { + $envelope = (new Envelope(new DummyMessage('Oy')))->with(new PriorityStamp(255)); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0, 0); + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $sender = new BeanstalkdSender($connection, $serializer); + $sender->send($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php index a8dd7beaa11a8..0549c4eaf47a9 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php @@ -11,8 +11,10 @@ namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport; +use Pheanstalk\Contract\PheanstalkInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\PriorityStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -42,8 +44,26 @@ public function send(Envelope $envelope): Envelope $delayStamp = $envelope->last(DelayStamp::class); $delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0; - $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs); + /** @var PriorityStamp|null $priorityStamp */ + $priorityStamp = $envelope->last(PriorityStamp::class); + $priority = $this->getPheanstalkPriority($priorityStamp); + + $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs, $priority); return $envelope; } + + /** + * Beanstalkd supports u32 priorities (0 to 2^32 - 1), with 0 being the highest. + * RabbitMQ supports u8 priorities (0 to 255), with 255 being the highest. + * To provide interoperability, use RabbitMQ model. + */ + private function getPheanstalkPriority(?PriorityStamp $stamp): int + { + if (null !== $stamp) { + return PriorityStamp::MAX_PRIORITY - $stamp->getPriority(); + } + + return PheanstalkInterface::DEFAULT_PRIORITY; + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php index 1957a458d7939..1f590907907c4 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php @@ -36,9 +36,9 @@ class Connection /** * Available options: * - * * tube_name: name of the tube - * * timeout: message reservation timeout (in seconds) - * * ttr: the message time to run before it is put back in the ready queue (in seconds) + * * tube_name: name of the tube + * * timeout: message reservation timeout (in seconds) + * * ttr: the message time to run before it is put back in the ready queue (in seconds) */ private array $configuration; private PheanstalkInterface $client; @@ -104,10 +104,11 @@ public function getTube(): string /** * @param int $delay The delay in milliseconds + * @param int $priority The priority in Beanstalkd terms (0 .. 2^32 - 1) * * @return string The inserted id */ - public function send(string $body, array $headers, int $delay = 0): string + public function send(string $body, array $headers, int $delay = 0, int $priority = PheanstalkInterface::DEFAULT_PRIORITY): string { $message = json_encode([ 'body' => $body, @@ -121,7 +122,7 @@ public function send(string $body, array $headers, int $delay = 0): string try { $job = $this->client->useTube($this->tube)->put( $message, - PheanstalkInterface::DEFAULT_PRIORITY, + $priority, $delay / 1000, $this->ttr ); diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 6bc1111fc85db..fbaf79bd71bf2 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +6.2 +--- + + * Add per-message priority for AMQP & Beanstalkd transports + 6.1 --- diff --git a/src/Symfony/Component/Messenger/Stamp/PriorityStamp.php b/src/Symfony/Component/Messenger/Stamp/PriorityStamp.php new file mode 100644 index 0000000000000..0948012f0bd01 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/PriorityStamp.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +use Symfony\Component\Messenger\Exception\InvalidArgumentException; + +/** + * Apply this stamp to provide priority for your message on a transport. + * + * @author Valentin Nazarov + */ +final class PriorityStamp implements StampInterface +{ + public const MIN_PRIORITY = 0; + public const MAX_PRIORITY = 255; + + private int $priority; + + /** + * @param int $priority The priority level + */ + public function __construct(int $priority) + { + if ($priority < self::MIN_PRIORITY || $priority > self::MAX_PRIORITY) { + throw new InvalidArgumentException(sprintf('Priority must be between %d and %d.', self::MIN_PRIORITY, self::MAX_PRIORITY)); + } + + $this->priority = $priority; + } + + public function getPriority(): int + { + return $this->priority; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/PriorityStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/PriorityStampTest.php new file mode 100644 index 0000000000000..5410be735ca0f --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Stamp/PriorityStampTest.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Stamp; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Stamp\PriorityStamp; + +/** + * @author Valentin Nazarov + */ +class PriorityStampTest extends TestCase +{ + /** + * @dataProvider invalidPriorityProvider + */ + public function testConstructorFailsOnPriorityOutOfBounds(int $priority) + { + $this->expectException(InvalidArgumentException::class); + new PriorityStamp($priority); + } + + public function invalidPriorityProvider(): iterable + { + yield [PriorityStamp::MIN_PRIORITY - 1]; + yield [PriorityStamp::MAX_PRIORITY + 1]; + } +}