diff --git a/.gitignore b/.gitignore index b9809c577..57ae5ec2e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ bin/phpunit bin/sql-formatter vendor .php_cs -.php_cs.cache \ No newline at end of file +.php_cs.cache +composer.lock \ No newline at end of file diff --git a/pkg/amqp-ext/Client/AmqpDriver.php b/pkg/amqp-ext/Client/AmqpDriver.php index 560ef9036..52fd8d87b 100644 --- a/pkg/amqp-ext/Client/AmqpDriver.php +++ b/pkg/amqp-ext/Client/AmqpDriver.php @@ -9,9 +9,8 @@ use Enqueue\Client\Config; use Enqueue\Client\DriverInterface; use Enqueue\Client\Message; -use Enqueue\Client\MessagePriority; use Enqueue\Client\Meta\QueueMetaRegistry; -use Enqueue\Psr\DeliveryMode; +use Enqueue\AmqpExt\DeliveryMode; use Enqueue\Psr\Message as TransportMessage; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -33,11 +32,6 @@ class AmqpDriver implements DriverInterface */ private $queueMetaRegistry; - /** - * @var array - */ - private $priorityMap; - /** * @param AmqpContext $context * @param Config $config @@ -48,14 +42,6 @@ public function __construct(AmqpContext $context, Config $config, QueueMetaRegis $this->context = $context; $this->config = $config; $this->queueMetaRegistry = $queueMetaRegistry; - - $this->priorityMap = [ - MessagePriority::VERY_LOW => 0, - MessagePriority::LOW => 1, - MessagePriority::NORMAL => 2, - MessagePriority::HIGH => 3, - MessagePriority::VERY_HIGH => 4, - ]; } /** @@ -131,7 +117,6 @@ public function createQueue($queueName) { $queue = $this->context->createQueue($this->config->createTransportQueueName($queueName)); $queue->addFlag(AMQP_DURABLE); - $queue->setArguments(['x-max-priority' => 4]); return $queue; } @@ -152,17 +137,6 @@ public function createTransportMessage(Message $message) $headers['expiration'] = (string) ($message->getExpire() * 1000); } - if ($priority = $message->getPriority()) { - if (false == array_key_exists($priority, $this->priorityMap)) { - throw new \InvalidArgumentException(sprintf( - 'Given priority could not be converted to client\'s one. Got: %s', - $priority - )); - } - - $headers['priority'] = $this->priorityMap[$priority]; - } - $headers['delivery_mode'] = DeliveryMode::PERSISTENT; $transportMessage = $this->context->createMessage(); @@ -198,14 +172,6 @@ public function createClientMessage(TransportMessage $message) $clientMessage->setExpire((int) ((int) $expiration) / 1000); } - if ($priority = $message->getHeader('priority')) { - if (false === $clientPriority = array_search($priority, $this->priorityMap, true)) { - throw new \LogicException(sprintf('Cant convert transport priority to client: "%s"', $priority)); - } - - $clientMessage->setPriority($clientPriority); - } - $clientMessage->setMessageId($message->getMessageId()); $clientMessage->setTimestamp($message->getTimestamp()); diff --git a/pkg/amqp-ext/Client/RabbitMqDriver.php b/pkg/amqp-ext/Client/RabbitMqDriver.php index cd6b189e3..dbd3af9fb 100644 --- a/pkg/amqp-ext/Client/RabbitMqDriver.php +++ b/pkg/amqp-ext/Client/RabbitMqDriver.php @@ -8,6 +8,7 @@ use Enqueue\AmqpExt\AmqpTopic; use Enqueue\Client\Config; use Enqueue\Client\Message; +use Enqueue\Client\MessagePriority; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Consumption\Exception\LogicException; use Enqueue\Psr\Message as TransportMessage; @@ -31,6 +32,11 @@ class RabbitMqDriver extends AmqpDriver */ private $queueMetaRegistry; + /** + * @var array + */ + private $priorityMap; + /** * @param AmqpContext $context * @param Config $config @@ -43,6 +49,14 @@ public function __construct(AmqpContext $context, Config $config, QueueMetaRegis $this->config = $config; $this->context = $context; $this->queueMetaRegistry = $queueMetaRegistry; + + $this->priorityMap = [ + MessagePriority::VERY_LOW => 0, + MessagePriority::LOW => 1, + MessagePriority::NORMAL => 2, + MessagePriority::HIGH => 3, + MessagePriority::VERY_HIGH => 4, + ]; } /** @@ -68,6 +82,19 @@ public function sendToProcessor(Message $message) $this->context->createProducer()->send($destination, $transportMessage); } + /** + * {@inheritdoc} + * + * @return AmqpQueue + */ + public function createQueue($queueName) + { + $queue = parent::createQueue($queueName); + $queue->setArguments(['x-max-priority' => 4]); + + return $queue; + } + /** * {@inheritdoc} * @@ -77,6 +104,17 @@ public function createTransportMessage(Message $message) { $transportMessage = parent::createTransportMessage($message); + if ($priority = $message->getPriority()) { + if (false == array_key_exists($priority, $this->priorityMap)) { + throw new \InvalidArgumentException(sprintf( + 'Given priority could not be converted to client\'s one. Got: %s', + $priority + )); + } + + $transportMessage->setHeader('priority', $this->priorityMap[$priority]); + } + if ($message->getDelay()) { if (false == $this->config->getTransportOption('delay_plugin_installed', false)) { throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.'); @@ -85,6 +123,7 @@ public function createTransportMessage(Message $message) $transportMessage->setProperty('x-delay', (string) ($message->getDelay() * 1000)); } + return $transportMessage; } @@ -97,6 +136,14 @@ public function createClientMessage(TransportMessage $message) { $clientMessage = parent::createClientMessage($message); + if ($priority = $message->getHeader('priority')) { + if (false === $clientPriority = array_search($priority, $this->priorityMap, true)) { + throw new \LogicException(sprintf('Cant convert transport priority to client: "%s"', $priority)); + } + + $clientMessage->setPriority($clientPriority); + } + if ($delay = $message->getProperty('x-delay')) { if (false == is_numeric($delay)) { throw new \LogicException(sprintf('x-delay header is not numeric. "%s"', $delay)); diff --git a/pkg/psr-queue/DeliveryMode.php b/pkg/amqp-ext/DeliveryMode.php similarity index 77% rename from pkg/psr-queue/DeliveryMode.php rename to pkg/amqp-ext/DeliveryMode.php index e7631e785..3dac593d6 100644 --- a/pkg/psr-queue/DeliveryMode.php +++ b/pkg/amqp-ext/DeliveryMode.php @@ -1,6 +1,6 @@ assertSame($expectedQueue, $queue); $this->assertSame('queue-name', $queue->getQueueName()); - $this->assertSame(['x-max-priority' => 4], $queue->getArguments()); + $this->assertSame([], $queue->getArguments()); $this->assertSame(2, $queue->getFlags()); $this->assertNull($queue->getConsumerTag()); $this->assertSame([], $queue->getBindArguments()); @@ -74,7 +73,6 @@ public function testShouldConvertTransportMessageToClientMessage() $transportMessage->setProperties(['key' => 'val']); $transportMessage->setHeader('content_type', 'ContentType'); $transportMessage->setHeader('expiration', '12345000'); - $transportMessage->setHeader('priority', 3); $transportMessage->setMessageId('MessageId'); $transportMessage->setTimestamp(1000); @@ -92,7 +90,6 @@ public function testShouldConvertTransportMessageToClientMessage() 'hkey' => 'hval', 'content_type' => 'ContentType', 'expiration' => '12345000', - 'priority' => 3, 'message_id' => 'MessageId', 'timestamp' => 1000, ], $clientMessage->getHeaders()); @@ -103,7 +100,6 @@ public function testShouldConvertTransportMessageToClientMessage() $this->assertSame(12345, $clientMessage->getExpire()); $this->assertSame('ContentType', $clientMessage->getContentType()); $this->assertSame(1000, $clientMessage->getTimestamp()); - $this->assertSame(MessagePriority::HIGH, $clientMessage->getPriority()); } public function testShouldThrowExceptionIfExpirationIsNotNumeric() @@ -123,40 +119,6 @@ public function testShouldThrowExceptionIfExpirationIsNotNumeric() $driver->createClientMessage($transportMessage); } - public function testShouldThrowExceptionIfCantConvertTransportPriorityToClientPriority() - { - $transportMessage = new AmqpMessage(); - $transportMessage->setHeader('priority', 'unknown'); - - $driver = new AmqpDriver( - $this->createPsrContextMock(), - new Config('', '', '', '', '', ''), - $this->createQueueMetaRegistryMock() - ); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Cant convert transport priority to client: "unknown"'); - - $driver->createClientMessage($transportMessage); - } - - public function testShouldThrowExceptionIfCantConvertClientPriorityToTransportPriority() - { - $clientMessage = new Message(); - $clientMessage->setPriority('unknown'); - - $driver = new AmqpDriver( - $this->createPsrContextMock(), - new Config('', '', '', '', '', ''), - $this->createQueueMetaRegistryMock() - ); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Given priority could not be converted to client\'s one. Got: unknown'); - - $driver->createTransportMessage($clientMessage); - } - public function testShouldConvertClientMessageToTransportMessage() { $clientMessage = new Message(); @@ -165,7 +127,6 @@ public function testShouldConvertClientMessageToTransportMessage() $clientMessage->setProperties(['key' => 'val']); $clientMessage->setContentType('ContentType'); $clientMessage->setExpire(123); - $clientMessage->setPriority(MessagePriority::VERY_HIGH); $clientMessage->setMessageId('MessageId'); $clientMessage->setTimestamp(1000); @@ -190,7 +151,6 @@ public function testShouldConvertClientMessageToTransportMessage() 'hkey' => 'hval', 'content_type' => 'ContentType', 'expiration' => '123000', - 'priority' => 4, 'delivery_mode' => 2, 'message_id' => 'MessageId', 'timestamp' => 1000, diff --git a/pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php b/pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php index db88f30be..e17464d82 100644 --- a/pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php +++ b/pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php @@ -171,8 +171,15 @@ public function testShouldThrowExceptionIfCantConvertClientPriorityToTransportPr $clientMessage = new Message(); $clientMessage->setPriority('unknown'); + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new AmqpMessage()) + ; + $driver = new RabbitMqDriver( - $this->createPsrContextMock(), + $context, new Config('', '', '', '', '', ''), $this->createQueueMetaRegistryMock() ); @@ -217,10 +224,10 @@ public function testShouldConvertClientMessageToTransportMessage() 'hkey' => 'hval', 'content_type' => 'ContentType', 'expiration' => '123000', - 'priority' => 4, 'delivery_mode' => 2, 'message_id' => 'MessageId', 'timestamp' => 1000, + 'priority' => 4, ], $transportMessage->getHeaders()); $this->assertSame([ 'key' => 'val', diff --git a/pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php b/pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php index 7f1b49d79..f6169080a 100644 --- a/pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php +++ b/pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php @@ -45,6 +45,9 @@ public function onPreReceived(Context $context) if (false == $message->isRedelivered()) { return; } + if (false != $context->getResult()) { + return; + } $delayedMessage = $this->driver->createClientMessage($message); diff --git a/pkg/enqueue/Client/Message.php b/pkg/enqueue/Client/Message.php index da4cd74f5..b2117e16c 100644 --- a/pkg/enqueue/Client/Message.php +++ b/pkg/enqueue/Client/Message.php @@ -68,7 +68,7 @@ public function getBody() } /** - * @param null|string $body + * @param null|string|number|array|\JsonSerializable $body */ public function setBody($body) { diff --git a/pkg/enqueue/Consumption/ExtensionInterface.php b/pkg/enqueue/Consumption/ExtensionInterface.php index d05109450..5fbfe1819 100644 --- a/pkg/enqueue/Consumption/ExtensionInterface.php +++ b/pkg/enqueue/Consumption/ExtensionInterface.php @@ -24,8 +24,8 @@ public function onBeforeReceive(Context $context); /** * Executed when a new message is received from a broker but before it was passed to processor * The context contains a message. - * The extension may set a status. - * The consumption could be interrupted at this step but it will done only after the message is processed. + * The extension may set a status. If the status is set the exception is thrown + * The consumption could be interrupted at this step but it exits after the message is processed. * * @param Context $context */ @@ -33,8 +33,8 @@ public function onPreReceived(Context $context); /** * Executed when a message is processed by a processor. - * The context contains a message status and could be changed - * The consumption could be interrupted at this step but it will done only after the message is processed. + * The context contains a status, which could not be changed. + * The consumption could be interrupted at this step but it exits after the message is processed. * * @param Context $context */ diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index 002fcd8b7..e0739ff92 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -222,8 +222,6 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $extension->onPostReceived($context); } else { - $logger->debug(sprintf('Idle')); - usleep($this->idleMicroseconds); $extension->onIdle($context); } diff --git a/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php b/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php index 3e2a99a41..f248a4b9f 100644 --- a/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php +++ b/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php @@ -97,6 +97,25 @@ public function testShouldDoNothingIfMessageIsNotRedelivered() self::assertNull($context->getResult()); } + public function testShouldDoNothingIfMessageIsRedeliveredButResultWasAlreadySetOnContext() + { + $message = new NullMessage(); + $message->setRedelivered(true); + + $driver = $this->createDriverMock(); + $driver + ->expects(self::never()) + ->method('sendToProcessor') + ; + + $context = new Context($this->createPsrContextMock()); + $context->setPsrMessage($message); + $context->setResult('aStatus'); + + $extension = new DelayRedeliveredMessageExtension($driver, 12345); + $extension->onPreReceived($context); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface */ diff --git a/pkg/psr-queue/InvalidDeliveryModeException.php b/pkg/psr-queue/InvalidDeliveryModeException.php deleted file mode 100644 index 57edfba04..000000000 --- a/pkg/psr-queue/InvalidDeliveryModeException.php +++ /dev/null @@ -1,23 +0,0 @@ -assertClassExtends(ExceptionInterface::class, InvalidDeliveryModeException::class); - } - - public function testCouldBeConstructedWithoutAnyArguments() - { - new InvalidDeliveryModeException(); - } - - public function testThrowIfDeliveryModeIsNotValid() - { - $this->expectException(InvalidDeliveryModeException::class); - $this->expectExceptionMessage('The delivery mode must be one of [2,1].'); - - InvalidDeliveryModeException::assertValidDeliveryMode('is-not-valid'); - } - - public function testShouldDoNothingIfDeliveryModeIsValid() - { - InvalidDeliveryModeException::assertValidDeliveryMode(DeliveryMode::PERSISTENT); - InvalidDeliveryModeException::assertValidDeliveryMode(DeliveryMode::NON_PERSISTENT); - } -}