Skip to content

Commit 750ea0b

Browse files
authored
Merge pull request php-enqueue#162 from php-enqueue/delay_driver
Delay Strategy Configuration
2 parents 787a0ac + 237c10e commit 750ea0b

18 files changed

+334
-152
lines changed

phpunit.xml.dist

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@
3333
<directory>pkg/amqp-bunny/Tests</directory>
3434
</testsuite>
3535

36+
<testsuite name="amqp-lib transport">
37+
<directory>pkg/amqp-lib/Tests</directory>
38+
</testsuite>
39+
40+
<testsuite name="amqp-tools">
41+
<directory>pkg/amqp-tools/Tests</directory>
42+
</testsuite>
43+
3644
<testsuite name="fs transport">
3745
<directory>pkg/fs/Tests</directory>
3846
</testsuite>

pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\AmqpBunny\Symfony;
44

5+
use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait;
56
use Enqueue\Client\Amqp\RabbitMqDriver;
67
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
78
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -10,6 +11,8 @@
1011

1112
class RabbitMqAmqpBunnyTransportFactory extends AmqpBunnyTransportFactory
1213
{
14+
use DelayStrategyTransportFactoryTrait;
15+
1316
/**
1417
* @param string $name
1518
*/
@@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder)
2730

2831
$builder
2932
->children()
30-
->booleanNode('delay_plugin_installed')
31-
->defaultFalse()
32-
->info('The option tells whether RabbitMQ broker has delay plugin installed or not')
33+
->scalarNode('delay_strategy')
34+
->defaultValue('dlx')
35+
->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id')
3336
->end()
3437
;
3538
}
3639

40+
/**
41+
* {@inheritdoc}
42+
*/
43+
public function createConnectionFactory(ContainerBuilder $container, array $config)
44+
{
45+
$factoryId = parent::createConnectionFactory($container, $config);
46+
47+
$this->registerDelayStrategy($container, $config, $factoryId, $this->getName());
48+
49+
return $factoryId;
50+
}
51+
3752
/**
3853
* {@inheritdoc}
3954
*/

pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration()
5858
'user' => 'guest',
5959
'pass' => 'guest',
6060
'vhost' => '/',
61-
'delay_plugin_installed' => false,
61+
'delay_strategy' => 'dlx',
6262
'lazy' => true,
6363
'receive_method' => 'basic_get',
6464
'heartbeat' => 0,
@@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory()
7878
'pass' => 'guest',
7979
'vhost' => '/',
8080
'persisted' => false,
81-
'delay_plugin_installed' => false,
81+
'delay_strategy' => null,
8282
]);
8383

8484
$this->assertTrue($container->hasDefinition($serviceId));
@@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory()
9191
'pass' => 'guest',
9292
'vhost' => '/',
9393
'persisted' => false,
94-
'delay_plugin_installed' => false,
94+
'delay_strategy' => null,
9595
]], $factory->getArguments());
9696
}
9797

@@ -108,7 +108,7 @@ public function testShouldCreateContext()
108108
'pass' => 'guest',
109109
'vhost' => '/',
110110
'persisted' => false,
111-
'delay_plugin_installed' => false,
111+
'delay_strategy' => null,
112112
]);
113113

114114
$this->assertEquals('enqueue.transport.rabbitmq_amqp_bunny.context', $serviceId);

pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\AmqpExt\Symfony;
44

5+
use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait;
56
use Enqueue\Client\Amqp\RabbitMqDriver;
67
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
78
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -10,6 +11,8 @@
1011

1112
class RabbitMqAmqpTransportFactory extends AmqpTransportFactory
1213
{
14+
use DelayStrategyTransportFactoryTrait;
15+
1316
/**
1417
* @param string $name
1518
*/
@@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder)
2730

2831
$builder
2932
->children()
30-
->booleanNode('delay_plugin_installed')
31-
->defaultFalse()
32-
->info('The option tells whether RabbitMQ broker has delay plugin installed or not')
33+
->scalarNode('delay_strategy')
34+
->defaultValue('dlx')
35+
->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id')
3336
->end()
3437
;
3538
}
3639

40+
/**
41+
* {@inheritdoc}
42+
*/
43+
public function createConnectionFactory(ContainerBuilder $container, array $config)
44+
{
45+
$factoryId = parent::createConnectionFactory($container, $config);
46+
47+
$this->registerDelayStrategy($container, $config, $factoryId, $this->getName());
48+
49+
return $factoryId;
50+
}
51+
3752
/**
3853
* {@inheritdoc}
3954
*/

pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public function testShouldAllowAddConfiguration()
5959
'pass' => 'guest',
6060
'vhost' => '/',
6161
'persisted' => false,
62-
'delay_plugin_installed' => false,
62+
'delay_strategy' => 'dlx',
6363
'lazy' => true,
6464
'receive_method' => 'basic_get',
6565
], $config);
@@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory()
7878
'pass' => 'guest',
7979
'vhost' => '/',
8080
'persisted' => false,
81-
'delay_plugin_installed' => false,
81+
'delay_strategy' => null,
8282
]);
8383

8484
$this->assertTrue($container->hasDefinition($serviceId));
@@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory()
9191
'pass' => 'guest',
9292
'vhost' => '/',
9393
'persisted' => false,
94-
'delay_plugin_installed' => false,
94+
'delay_strategy' => null,
9595
]], $factory->getArguments());
9696
}
9797

@@ -108,7 +108,7 @@ public function testShouldCreateContext()
108108
'pass' => 'guest',
109109
'vhost' => '/',
110110
'persisted' => false,
111-
'delay_plugin_installed' => false,
111+
'delay_strategy' => null,
112112
]);
113113

114114
$this->assertEquals('enqueue.transport.rabbitmq_amqp.context', $serviceId);

pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\AmqpLib\Symfony;
44

5+
use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait;
56
use Enqueue\Client\Amqp\RabbitMqDriver;
67
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
78
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -10,6 +11,8 @@
1011

1112
class RabbitMqAmqpLibTransportFactory extends AmqpLibTransportFactory
1213
{
14+
use DelayStrategyTransportFactoryTrait;
15+
1316
/**
1417
* @param string $name
1518
*/
@@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder)
2730

2831
$builder
2932
->children()
30-
->booleanNode('delay_plugin_installed')
31-
->defaultFalse()
32-
->info('The option tells whether RabbitMQ broker has delay plugin installed or not')
33+
->scalarNode('delay_strategy')
34+
->defaultValue('dlx')
35+
->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id')
3336
->end()
3437
;
3538
}
3639

40+
/**
41+
* {@inheritdoc}
42+
*/
43+
public function createConnectionFactory(ContainerBuilder $container, array $config)
44+
{
45+
$factoryId = parent::createConnectionFactory($container, $config);
46+
47+
$this->registerDelayStrategy($container, $config, $factoryId, $this->getName());
48+
49+
return $factoryId;
50+
}
51+
3752
/**
3853
* {@inheritdoc}
3954
*/

pkg/amqp-lib/Tests/AmqpConsumerTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public function testShouldReturnMessageOnReceiveNoWait()
103103
$amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag';
104104
$amqpMessage->delivery_info['routing_key'] = 'routing-key';
105105
$amqpMessage->delivery_info['redelivered'] = true;
106+
$amqpMessage->delivery_info['routing_key'] = 'routing-key';
106107

107108
$channel = $this->createChannelMock();
108109
$channel
@@ -121,6 +122,7 @@ public function testShouldReturnMessageOnReceiveNoWait()
121122
$this->assertInstanceOf(AmqpMessage::class, $message);
122123
$this->assertSame('body', $message->getBody());
123124
$this->assertSame('delivery-tag', $message->getDeliveryTag());
125+
$this->assertSame('routing-key', $message->getRoutingKey());
124126
$this->assertTrue($message->isRedelivered());
125127
}
126128

@@ -148,6 +150,7 @@ public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet()
148150
$this->assertInstanceOf(AmqpMessage::class, $message);
149151
$this->assertSame('body', $message->getBody());
150152
$this->assertSame('delivery-tag', $message->getDeliveryTag());
153+
$this->assertSame('routing-key', $message->getRoutingKey());
151154
$this->assertTrue($message->isRedelivered());
152155
}
153156

pkg/amqp-lib/Tests/AmqpContextTest.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Interop\Amqp\Impl\AmqpTopic;
99
use PhpAmqpLib\Channel\AMQPChannel;
1010
use PhpAmqpLib\Connection\AbstractConnection;
11+
use PhpAmqpLib\Wire\AMQPTable;
1112
use PHPUnit\Framework\TestCase;
1213

1314
class AmqpContextTest extends TestCase
@@ -26,7 +27,7 @@ public function testShouldDeclareTopic()
2627
$this->isTrue(),
2728
$this->isTrue(),
2829
$this->isTrue(),
29-
$this->identicalTo(['key' => 'value']),
30+
$this->isInstanceOf(AMQPTable::class),
3031
$this->isNull()
3132
)
3233
;
@@ -94,7 +95,7 @@ public function testShouldDeclareQueue()
9495
$this->isTrue(),
9596
$this->isTrue(),
9697
$this->isTrue(),
97-
$this->identicalTo(['key' => 'value']),
98+
$this->isInstanceOf(AMQPTable::class),
9899
$this->isNull()
99100
)
100101
;

pkg/amqp-lib/Tests/AmqpProducerTest.php

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\AmqpLib\Tests;
44

5+
use Enqueue\AmqpLib\AmqpContext;
56
use Enqueue\AmqpLib\AmqpProducer;
67
use Enqueue\Test\ClassExtensionTrait;
78
use Interop\Amqp\Impl\AmqpMessage;
@@ -23,7 +24,7 @@ class AmqpProducerTest extends TestCase
2324

2425
public function testCouldBeConstructedWithRequiredArguments()
2526
{
26-
new AmqpProducer($this->createAmqpChannelMock());
27+
new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock());
2728
}
2829

2930
public function testShouldImplementPsrProducerInterface()
@@ -33,7 +34,7 @@ public function testShouldImplementPsrProducerInterface()
3334

3435
public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()
3536
{
36-
$producer = new AmqpProducer($this->createAmqpChannelMock());
37+
$producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock());
3738

3839
$this->expectException(InvalidDestinationException::class);
3940
$this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpQueue but got');
@@ -43,7 +44,7 @@ public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()
4344

4445
public function testShouldThrowExceptionWhenMessageTypeIsInvalid()
4546
{
46-
$producer = new AmqpProducer($this->createAmqpChannelMock());
47+
$producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock());
4748

4849
$this->expectException(InvalidMessageException::class);
4950
$this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but it is');
@@ -70,7 +71,7 @@ public function testShouldPublishMessageToTopic()
7071
$message = new AmqpMessage('body');
7172
$message->setRoutingKey('routing-key');
7273

73-
$producer = new AmqpProducer($channel);
74+
$producer = new AmqpProducer($channel, $this->createContextMock());
7475
$producer->send($topic, $message);
7576

7677
$this->assertEquals('body', $amqpMessage->getBody());
@@ -92,7 +93,7 @@ public function testShouldPublishMessageToQueue()
9293

9394
$queue = new AmqpQueue('queue');
9495

95-
$producer = new AmqpProducer($channel);
96+
$producer = new AmqpProducer($channel, $this->createContextMock());
9697
$producer->send($queue, new AmqpMessage('body'));
9798

9899
$this->assertEquals('body', $amqpMessage->getBody());
@@ -111,7 +112,7 @@ public function testShouldSetMessageHeaders()
111112
}))
112113
;
113114

114-
$producer = new AmqpProducer($channel);
115+
$producer = new AmqpProducer($channel, $this->createContextMock());
115116
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain']));
116117

117118
$this->assertEquals(['content_type' => 'text/plain'], $amqpMessage->get_properties());
@@ -130,7 +131,7 @@ public function testShouldSetMessageProperties()
130131
}))
131132
;
132133

133-
$producer = new AmqpProducer($channel);
134+
$producer = new AmqpProducer($channel, $this->createContextMock());
134135
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value']));
135136

136137
$properties = $amqpMessage->get_properties();
@@ -163,4 +164,12 @@ private function createAmqpChannelMock()
163164
{
164165
return $this->createMock(AMQPChannel::class);
165166
}
167+
168+
/**
169+
* @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext
170+
*/
171+
private function createContextMock()
172+
{
173+
return $this->createMock(AmqpContext::class);
174+
}
166175
}

0 commit comments

Comments
 (0)