Skip to content

Improvements and fixes #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ bin/phpunit
bin/sql-formatter
vendor
.php_cs
.php_cs.cache
.php_cs.cache
composer.lock
36 changes: 1 addition & 35 deletions pkg/amqp-ext/Client/AmqpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Psr\DeliveryMode;
use Enqueue\AmqpExt\DeliveryMode;
use Enqueue\Psr\Message as TransportMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
Expand All @@ -33,11 +32,6 @@ class AmqpDriver implements DriverInterface
*/
private $queueMetaRegistry;

/**
* @var array
*/
private $priorityMap;

/**
* @param AmqpContext $context
* @param Config $config
Expand All @@ -48,14 +42,6 @@ public function __construct(AmqpContext $context, Config $config, QueueMetaRegis
$this->context = $context;
$this->config = $config;
$this->queueMetaRegistry = $queueMetaRegistry;

$this->priorityMap = [
MessagePriority::VERY_LOW => 0,
MessagePriority::LOW => 1,
MessagePriority::NORMAL => 2,
MessagePriority::HIGH => 3,
MessagePriority::VERY_HIGH => 4,
];
}

/**
Expand Down Expand Up @@ -131,7 +117,6 @@ public function createQueue($queueName)
{
$queue = $this->context->createQueue($this->config->createTransportQueueName($queueName));
$queue->addFlag(AMQP_DURABLE);
$queue->setArguments(['x-max-priority' => 4]);

return $queue;
}
Expand All @@ -152,17 +137,6 @@ public function createTransportMessage(Message $message)
$headers['expiration'] = (string) ($message->getExpire() * 1000);
}

if ($priority = $message->getPriority()) {
if (false == array_key_exists($priority, $this->priorityMap)) {
throw new \InvalidArgumentException(sprintf(
'Given priority could not be converted to client\'s one. Got: %s',
$priority
));
}

$headers['priority'] = $this->priorityMap[$priority];
}

$headers['delivery_mode'] = DeliveryMode::PERSISTENT;

$transportMessage = $this->context->createMessage();
Expand Down Expand Up @@ -198,14 +172,6 @@ public function createClientMessage(TransportMessage $message)
$clientMessage->setExpire((int) ((int) $expiration) / 1000);
}

if ($priority = $message->getHeader('priority')) {
if (false === $clientPriority = array_search($priority, $this->priorityMap, true)) {
throw new \LogicException(sprintf('Cant convert transport priority to client: "%s"', $priority));
}

$clientMessage->setPriority($clientPriority);
}

$clientMessage->setMessageId($message->getMessageId());
$clientMessage->setTimestamp($message->getTimestamp());

Expand Down
47 changes: 47 additions & 0 deletions pkg/amqp-ext/Client/RabbitMqDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Enqueue\AmqpExt\AmqpTopic;
use Enqueue\Client\Config;
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Consumption\Exception\LogicException;
use Enqueue\Psr\Message as TransportMessage;
Expand All @@ -31,6 +32,11 @@ class RabbitMqDriver extends AmqpDriver
*/
private $queueMetaRegistry;

/**
* @var array
*/
private $priorityMap;

/**
* @param AmqpContext $context
* @param Config $config
Expand All @@ -43,6 +49,14 @@ public function __construct(AmqpContext $context, Config $config, QueueMetaRegis
$this->config = $config;
$this->context = $context;
$this->queueMetaRegistry = $queueMetaRegistry;

$this->priorityMap = [
MessagePriority::VERY_LOW => 0,
MessagePriority::LOW => 1,
MessagePriority::NORMAL => 2,
MessagePriority::HIGH => 3,
MessagePriority::VERY_HIGH => 4,
];
}

/**
Expand All @@ -68,6 +82,19 @@ public function sendToProcessor(Message $message)
$this->context->createProducer()->send($destination, $transportMessage);
}

/**
* {@inheritdoc}
*
* @return AmqpQueue
*/
public function createQueue($queueName)
{
$queue = parent::createQueue($queueName);
$queue->setArguments(['x-max-priority' => 4]);

return $queue;
}

/**
* {@inheritdoc}
*
Expand All @@ -77,6 +104,17 @@ public function createTransportMessage(Message $message)
{
$transportMessage = parent::createTransportMessage($message);

if ($priority = $message->getPriority()) {
if (false == array_key_exists($priority, $this->priorityMap)) {
throw new \InvalidArgumentException(sprintf(
'Given priority could not be converted to client\'s one. Got: %s',
$priority
));
}

$transportMessage->setHeader('priority', $this->priorityMap[$priority]);
}

if ($message->getDelay()) {
if (false == $this->config->getTransportOption('delay_plugin_installed', false)) {
throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.');
Expand All @@ -85,6 +123,7 @@ public function createTransportMessage(Message $message)
$transportMessage->setProperty('x-delay', (string) ($message->getDelay() * 1000));
}


return $transportMessage;
}

Expand All @@ -97,6 +136,14 @@ public function createClientMessage(TransportMessage $message)
{
$clientMessage = parent::createClientMessage($message);

if ($priority = $message->getHeader('priority')) {
if (false === $clientPriority = array_search($priority, $this->priorityMap, true)) {
throw new \LogicException(sprintf('Cant convert transport priority to client: "%s"', $priority));
}

$clientMessage->setPriority($clientPriority);
}

if ($delay = $message->getProperty('x-delay')) {
if (false == is_numeric($delay)) {
throw new \LogicException(sprintf('x-delay header is not numeric. "%s"', $delay));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace Enqueue\Psr;
namespace Enqueue\AmqpExt;

final class DeliveryMode
{
Expand Down
42 changes: 1 addition & 41 deletions pkg/amqp-ext/Tests/Client/AmqpDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Psr\Producer;
use Enqueue\Test\ClassExtensionTrait;
Expand Down Expand Up @@ -60,7 +59,7 @@ public function testShouldCreateAndReturnQueueInstance()

$this->assertSame($expectedQueue, $queue);
$this->assertSame('queue-name', $queue->getQueueName());
$this->assertSame(['x-max-priority' => 4], $queue->getArguments());
$this->assertSame([], $queue->getArguments());
$this->assertSame(2, $queue->getFlags());
$this->assertNull($queue->getConsumerTag());
$this->assertSame([], $queue->getBindArguments());
Expand All @@ -74,7 +73,6 @@ public function testShouldConvertTransportMessageToClientMessage()
$transportMessage->setProperties(['key' => 'val']);
$transportMessage->setHeader('content_type', 'ContentType');
$transportMessage->setHeader('expiration', '12345000');
$transportMessage->setHeader('priority', 3);
$transportMessage->setMessageId('MessageId');
$transportMessage->setTimestamp(1000);

Expand All @@ -92,7 +90,6 @@ public function testShouldConvertTransportMessageToClientMessage()
'hkey' => 'hval',
'content_type' => 'ContentType',
'expiration' => '12345000',
'priority' => 3,
'message_id' => 'MessageId',
'timestamp' => 1000,
], $clientMessage->getHeaders());
Expand All @@ -103,7 +100,6 @@ public function testShouldConvertTransportMessageToClientMessage()
$this->assertSame(12345, $clientMessage->getExpire());
$this->assertSame('ContentType', $clientMessage->getContentType());
$this->assertSame(1000, $clientMessage->getTimestamp());
$this->assertSame(MessagePriority::HIGH, $clientMessage->getPriority());
}

public function testShouldThrowExceptionIfExpirationIsNotNumeric()
Expand All @@ -123,40 +119,6 @@ public function testShouldThrowExceptionIfExpirationIsNotNumeric()
$driver->createClientMessage($transportMessage);
}

public function testShouldThrowExceptionIfCantConvertTransportPriorityToClientPriority()
{
$transportMessage = new AmqpMessage();
$transportMessage->setHeader('priority', 'unknown');

$driver = new AmqpDriver(
$this->createPsrContextMock(),
new Config('', '', '', '', '', ''),
$this->createQueueMetaRegistryMock()
);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Cant convert transport priority to client: "unknown"');

$driver->createClientMessage($transportMessage);
}

public function testShouldThrowExceptionIfCantConvertClientPriorityToTransportPriority()
{
$clientMessage = new Message();
$clientMessage->setPriority('unknown');

$driver = new AmqpDriver(
$this->createPsrContextMock(),
new Config('', '', '', '', '', ''),
$this->createQueueMetaRegistryMock()
);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Given priority could not be converted to client\'s one. Got: unknown');

$driver->createTransportMessage($clientMessage);
}

public function testShouldConvertClientMessageToTransportMessage()
{
$clientMessage = new Message();
Expand All @@ -165,7 +127,6 @@ public function testShouldConvertClientMessageToTransportMessage()
$clientMessage->setProperties(['key' => 'val']);
$clientMessage->setContentType('ContentType');
$clientMessage->setExpire(123);
$clientMessage->setPriority(MessagePriority::VERY_HIGH);
$clientMessage->setMessageId('MessageId');
$clientMessage->setTimestamp(1000);

Expand All @@ -190,7 +151,6 @@ public function testShouldConvertClientMessageToTransportMessage()
'hkey' => 'hval',
'content_type' => 'ContentType',
'expiration' => '123000',
'priority' => 4,
'delivery_mode' => 2,
'message_id' => 'MessageId',
'timestamp' => 1000,
Expand Down
11 changes: 9 additions & 2 deletions pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,15 @@ public function testShouldThrowExceptionIfCantConvertClientPriorityToTransportPr
$clientMessage = new Message();
$clientMessage->setPriority('unknown');

$context = $this->createPsrContextMock();
$context
->expects($this->once())
->method('createMessage')
->willReturn(new AmqpMessage())
;

$driver = new RabbitMqDriver(
$this->createPsrContextMock(),
$context,
new Config('', '', '', '', '', ''),
$this->createQueueMetaRegistryMock()
);
Expand Down Expand Up @@ -217,10 +224,10 @@ public function testShouldConvertClientMessageToTransportMessage()
'hkey' => 'hval',
'content_type' => 'ContentType',
'expiration' => '123000',
'priority' => 4,
'delivery_mode' => 2,
'message_id' => 'MessageId',
'timestamp' => 1000,
'priority' => 4,
], $transportMessage->getHeaders());
$this->assertSame([
'key' => 'val',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public function onPreReceived(Context $context)
if (false == $message->isRedelivered()) {
return;
}
if (false != $context->getResult()) {
return;
}

$delayedMessage = $this->driver->createClientMessage($message);

Expand Down
2 changes: 1 addition & 1 deletion pkg/enqueue/Client/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function getBody()
}

/**
* @param null|string $body
* @param null|string|number|array|\JsonSerializable $body
*/
public function setBody($body)
{
Expand Down
8 changes: 4 additions & 4 deletions pkg/enqueue/Consumption/ExtensionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ public function onBeforeReceive(Context $context);
/**
* Executed when a new message is received from a broker but before it was passed to processor
* The context contains a message.
* The extension may set a status.
* The consumption could be interrupted at this step but it will done only after the message is processed.
* The extension may set a status. If the status is set the exception is thrown
* The consumption could be interrupted at this step but it exits after the message is processed.
*
* @param Context $context
*/
public function onPreReceived(Context $context);

/**
* Executed when a message is processed by a processor.
* The context contains a message status and could be changed
* The consumption could be interrupted at this step but it will done only after the message is processed.
* The context contains a status, which could not be changed.
* The consumption could be interrupted at this step but it exits after the message is processed.
*
* @param Context $context
*/
Expand Down
2 changes: 0 additions & 2 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,6 @@ protected function doConsume(ExtensionInterface $extension, Context $context)

$extension->onPostReceived($context);
} else {
$logger->debug(sprintf('Idle'));

usleep($this->idleMicroseconds);
$extension->onIdle($context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,25 @@ public function testShouldDoNothingIfMessageIsNotRedelivered()
self::assertNull($context->getResult());
}

public function testShouldDoNothingIfMessageIsRedeliveredButResultWasAlreadySetOnContext()
{
$message = new NullMessage();
$message->setRedelivered(true);

$driver = $this->createDriverMock();
$driver
->expects(self::never())
->method('sendToProcessor')
;

$context = new Context($this->createPsrContextMock());
$context->setPsrMessage($message);
$context->setResult('aStatus');

$extension = new DelayRedeliveredMessageExtension($driver, 12345);
$extension->onPreReceived($context);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface
*/
Expand Down
Loading