Skip to content

[Messenger] Add a redis stream transport #30917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Refractor redis transport using redis streams
  • Loading branch information
alexander-schranz committed Apr 25, 2019
commit ff0b8554ea82df9b6e4da8657c250db25e7c00d4
13 changes: 9 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages

matrix:
include:
Expand Down Expand Up @@ -55,8 +56,8 @@ before_install:

- |
# Start Redis cluster
docker pull grokzen/redis-cluster:4.0.8
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
docker pull grokzen/redis-cluster:5.0.4
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'

- |
Expand Down Expand Up @@ -116,6 +117,7 @@ before_install:
local ext_name=$1
local ext_so=$2
local INI=$3
local input=${4:-yes}
local ext_dir=$(php -r "echo ini_get('extension_dir');")
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name

Expand All @@ -124,7 +126,7 @@ before_install:
else
rm ~/.pearrc /tmp/pear 2>/dev/null || true
mkdir -p $ext_cache
echo yes | pecl install -f $ext_name &&
echo $input | pecl install -f $ext_name &&
cp $ext_dir/$ext_so $ext_cache
fi
}
Expand All @@ -147,7 +149,6 @@ before_install:
echo session.gc_probability = 0 >> $INI
echo opcache.enable_cli = 1 >> $INI
echo apc.enable_cli = 1 >> $INI
echo extension = redis.so >> $INI
echo extension = memcached.so >> $INI
done

Expand All @@ -166,7 +167,11 @@ before_install:
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
done
- |
# List all php extensions with versions
- php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'

- |
# Load fixtures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (empty($config['transports'])) {
$container->removeDefinition('messenger.transport.symfony_serializer');
$container->removeDefinition('messenger.transport.amqp.factory');
$container->removeDefinition('messenger.transport.redis.factory');
} else {
$container->getDefinition('messenger.transport.symfony_serializer')
->replaceArgument(1, $config['serializer']['symfony_serializer']['format'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
<tag name="messenger.transport_factory" />
</service>

<service id="messenger.transport.redis.factory" class="Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory">
<tag name="messenger.transport_factory" />
</service>

<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
<tag name="messenger.transport_factory" />
</service>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
'options' => ['queue' => ['name' => 'Queue']],
'serializer' => 'messenger.transport.native_php_serializer',
],
'redis' => 'redis://127.0.0.1:6379/messages',
],
],
]);
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
</framework:queue>
</framework:options>
</framework:transport>
<framework:transport name="redis" dsn="redis://127.0.0.1:6379/messages" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ framework:
queue:
name: Queue
serializer: 'messenger.transport.native_php_serializer'
redis: 'redis://127.0.0.1:6379/messages'
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ public function testMessenger()
$this->assertTrue($container->hasAlias('messenger.default_bus'));
$this->assertTrue($container->getAlias('messenger.default_bus')->isPublic());
$this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory'));
$this->assertFalse($container->hasDefinition('messenger.transport.redis.factory'));
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
}
Expand All @@ -697,6 +698,16 @@ public function testMessengerTransports()
$this->assertEquals(new Reference('messenger.transport.native_php_serializer'), $transportArguments[2]);

$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));

$this->assertTrue($container->hasDefinition('messenger.transport.redis'));
$transportFactory = $container->getDefinition('messenger.transport.redis')->getFactory();
$transportArguments = $container->getDefinition('messenger.transport.redis')->getArguments();

$this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory);
$this->assertCount(3, $transportArguments);
$this->assertSame('redis://127.0.0.1:6379/messages', $transportArguments[0]);

$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
}

public function testMessengerRouting()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,104 @@
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;

/**
* @requires extension redis
*/
class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
public function testFromInvalidDsn()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');

Connection::fromDsn('redis://');
}

public function testItGetsParametersFromTheDsn()
public function testFromDsn()
{
$this->assertEquals(
new Connection('queue', array(
new Connection(['stream' => 'queue'], [
'host' => 'localhost',
'port' => 6379,
)),
]),
Connection::fromDsn('redis://localhost/queue')
);
}

public function testOverrideOptionsViaQueryParameters()
public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection('queue', array(
'host' => '127.0.0.1',
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
'host' => 'localhost',
'port' => 6379,
), array(
'processing_ttl' => '8000',
)),
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
], [
'blocking_timeout' => 30,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
);
}

public function testFromDsnWithQueryOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
'host' => 'localhost',
'port' => 6379,
], [
'blocking_timeout' => 30,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
);
}

public function testKeepGettingPendingMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$redis->expects($this->exactly(3))->method('xreadgroup')
->with('symfony', 'consumer', ['queue' => 0], 1, null)
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
}

public function testFirstGetPendingMessagesThenNewMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$count = 0;

$redis->expects($this->exactly(2))->method('xreadgroup')
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
++$count;

if (1 === $count) {
return '0' === $arr_streams['queue'];
}

return '>' === $arr_streams['queue'];
}), 1, null)
->willReturn(['queue' => []]);

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}

public function testUnexpectedRedisError()
{
$this->expectException(LogicException::class);
$this->expectExceptionMessage('Redis error happens');
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
}

This file was deleted.

Loading