Skip to content

Commit 18d2270

Browse files
Herberto Gracaqshurick
authored andcommitted
Add batch delivery
Currently, the doctrine transport only delivers one message at a time, resulting in low performance and a very high amount of hits to the DB. (one hit per message) With batch delivery, the transport will retrieve several messages in a single query. Co-authored-by: Alexander Malyk <shu.rick.ifmo@gmail.com>
1 parent 902d0fe commit 18d2270

10 files changed

+230
-55
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

+91-23
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
use Doctrine\DBAL\Platforms\MariaDBPlatform;
1919
use Doctrine\DBAL\Platforms\MySQL57Platform;
2020
use Doctrine\DBAL\Platforms\OraclePlatform;
21+
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
2122
use Doctrine\DBAL\Platforms\SQLServer2012Platform;
23+
use Doctrine\DBAL\Query\Expression\ExpressionBuilder;
2224
use Doctrine\DBAL\Query\QueryBuilder;
2325
use Doctrine\DBAL\Result;
2426
use Doctrine\DBAL\Schema\AbstractSchemaManager;
@@ -36,11 +38,19 @@ public function testGetAMessageWillChangeItsStatus()
3638
{
3739
$queryBuilder = $this->getQueryBuilderMock();
3840
$driverConnection = $this->getDBALConnectionMock();
39-
$stmt = $this->getResultMock([
40-
'id' => 1,
41-
'body' => '{"message":"Hi"}',
42-
'headers' => json_encode(['type' => DummyMessage::class]),
43-
]);
41+
$resultMock = [
42+
[
43+
'id' => 1,
44+
'body' => '{"message":"Hi"}',
45+
'headers' => json_encode(['type' => DummyMessage::class]),
46+
],
47+
[
48+
'id' => 2,
49+
'body' => '{"message":"There"}',
50+
'headers' => json_encode(['type' => DummyMessage::class]),
51+
],
52+
];
53+
$stmt = $this->getResultMockForGet($resultMock);
4454

4555
$driverConnection
4656
->method('createQueryBuilder')
@@ -62,17 +72,19 @@ public function testGetAMessageWillChangeItsStatus()
6272
->willReturn(1);
6373

6474
$connection = new Connection([], $driverConnection);
65-
$doctrineEnvelope = $connection->get();
66-
$this->assertEquals(1, $doctrineEnvelope['id']);
67-
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
68-
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']);
75+
$doctrineEnvelopeList = $connection->get();
76+
foreach ($doctrineEnvelopeList as $key => $doctrineEnvelope) {
77+
$this->assertEquals($resultMock[$key]['id'], $doctrineEnvelope['id']);
78+
$this->assertEquals($resultMock[$key]['body'], $doctrineEnvelope['body']);
79+
$this->assertEquals(json_decode($resultMock[$key]['headers'], true), $doctrineEnvelope['headers']);
80+
}
6981
}
7082

71-
public function testGetWithNoPendingMessageWillReturnNull()
83+
public function testGetWithNoPendingMessageWillReturnEmptyList()
7284
{
7385
$queryBuilder = $this->getQueryBuilderMock();
7486
$driverConnection = $this->getDBALConnectionMock();
75-
$stmt = $this->getResultMock(false);
87+
$stmt = $this->getResultMockForGet([]);
7688

7789
$queryBuilder
7890
->method('getParameters')
@@ -90,8 +102,9 @@ public function testGetWithNoPendingMessageWillReturnNull()
90102
->willReturn($stmt);
91103

92104
$connection = new Connection([], $driverConnection);
93-
$doctrineEnvelope = $connection->get();
94-
$this->assertNull($doctrineEnvelope);
105+
$doctrineEnvelopeList = $connection->get();
106+
$this->assertIsArray($doctrineEnvelopeList);
107+
$this->assertEmpty($doctrineEnvelopeList);
95108
}
96109

97110
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
@@ -151,11 +164,27 @@ private function getQueryBuilderMock()
151164
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
152165
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
153166
$queryBuilder->method('setParameters')->willReturn($queryBuilder);
167+
$queryBuilder->method('addOrderBy')->willReturn($queryBuilder);
168+
169+
$expressionBuilderMock = $this->createMock(ExpressionBuilder::class);
170+
$expressionBuilderMock->method('in')->willReturn('');
171+
$queryBuilder->method('expr')->willReturn($expressionBuilderMock);
154172

155173
return $queryBuilder;
156174
}
157175

158-
private function getResultMock($expectedResult)
176+
private function getResultMockForGet($expectedResult)
177+
{
178+
$stmt = $this->createMock(class_exists(Result::class) ? Result::class : ResultStatement::class);
179+
180+
$stmt->expects($this->once())
181+
->method(class_exists(Result::class) ? 'fetchAllAssociative' : 'fetchAll')
182+
->willReturn($expectedResult);
183+
184+
return $stmt;
185+
}
186+
187+
private function getResultMockForFind($expectedResult)
159188
{
160189
$stmt = $this->createMock(class_exists(Result::class) ? Result::class : ResultStatement::class);
161190

@@ -255,14 +284,14 @@ public static function buildConfigurationProvider(): iterable
255284
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined()
256285
{
257286
$this->expectException(InvalidArgumentException::class);
258-
$this->expectExceptionMessage('Unknown option found: [new_option]. Allowed options are [table_name, queue_name, redeliver_timeout, auto_setup]');
287+
$this->expectExceptionMessage('Unknown option found: [new_option]. Allowed options are [table_name, queue_name, batch_size, redeliver_timeout, auto_setup]');
259288
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']);
260289
}
261290

262291
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
263292
{
264293
$this->expectException(InvalidArgumentException::class);
265-
$this->expectExceptionMessage('Unknown option found in DSN: [new_option]. Allowed options are [table_name, queue_name, redeliver_timeout, auto_setup]');
294+
$this->expectExceptionMessage('Unknown option found in DSN: [new_option]. Allowed options are [table_name, queue_name, batch_size, redeliver_timeout, auto_setup]');
266295
Connection::buildConfiguration('doctrine://default?new_option=woops');
267296
}
268297

@@ -271,7 +300,7 @@ public function testFind()
271300
$queryBuilder = $this->getQueryBuilderMock();
272301
$driverConnection = $this->getDBALConnectionMock();
273302
$id = 1;
274-
$stmt = $this->getResultMock([
303+
$stmt = $this->getResultMockForFind([
275304
'id' => $id,
276305
'body' => '{"message":"Hi"}',
277306
'headers' => json_encode(['type' => DummyMessage::class]),
@@ -355,7 +384,7 @@ public function testFindAll()
355384
/**
356385
* @dataProvider providePlatformSql
357386
*/
358-
public function testGeneratedSql(AbstractPlatform $platform, string $expectedSql)
387+
public function testGeneratedSql(AbstractPlatform $platform, int $batchSize, string $expectedSql)
359388
{
360389
$driverConnection = $this->createMock(DBALConnection::class);
361390
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
@@ -378,33 +407,72 @@ public function testGeneratedSql(AbstractPlatform $platform, string $expectedSql
378407
;
379408
$driverConnection->expects($this->once())->method('commit');
380409

381-
$connection = new Connection([], $driverConnection);
410+
$connection = new Connection(['batch_size' => $batchSize], $driverConnection);
382411
$connection->get();
383412
}
384413

385414
public static function providePlatformSql(): iterable
386415
{
387-
yield 'MySQL' => [
416+
yield 'MySQL|batch_size=1' => [
388417
new MySQL57Platform(),
418+
1,
389419
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
390420
];
391421

422+
yield 'MySQL|batch_size=50' => [
423+
new MySQL57Platform(),
424+
50,
425+
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
426+
];
427+
428+
yield 'Postgres|batch_size=1' => [
429+
new PostgreSQLPlatform(),
430+
1,
431+
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
432+
];
433+
434+
yield 'Postgres|batch_size=50' => [
435+
new PostgreSQLPlatform(),
436+
50,
437+
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
438+
];
439+
392440
if (class_exists(MariaDBPlatform::class)) {
393-
yield 'MariaDB' => [
441+
yield 'MariaDB|batch_size=1' => [
394442
new MariaDBPlatform(),
443+
1,
395444
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE',
396445
];
446+
yield 'MariaDB|batch_size=50' => [
447+
new MariaDBPlatform(),
448+
50,
449+
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 50 FOR UPDATE',
450+
];
397451
}
398452

399-
yield 'SQL Server' => [
453+
yield 'SQL Server|batch_size=1' => [
400454
new SQLServer2012Platform(),
455+
1,
401456
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
402457
];
403458

404-
yield 'Oracle' => [
459+
yield 'SQL Server|batch_size=50' => [
460+
new SQLServer2012Platform(),
461+
50,
462+
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 50 ROWS ONLY ',
463+
];
464+
465+
yield 'Oracle|batch_size=1' => [
405466
new OraclePlatform(),
467+
1,
406468
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
407469
];
470+
471+
yield 'Oracle|batch_size=50' => [
472+
new OraclePlatform(),
473+
50,
474+
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 50) FOR UPDATE',
475+
];
408476
}
409477

410478
public function testConfigureSchema()

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineIntegrationTest.php

+9-7
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ protected function tearDown(): void
5050
public function testConnectionSendAndGet()
5151
{
5252
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
53-
$encoded = $this->connection->get();
54-
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
55-
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
53+
$encodedList = $this->connection->get()[0];
54+
$this->assertEquals('{"message": "Hi"}', $encodedList['body']);
55+
$this->assertEquals(['type' => DummyMessage::class], $encodedList['headers']);
5656
}
5757

5858
public function testSendWithDelay()
@@ -106,7 +106,7 @@ public function testItRetrieveTheFirstAvailableMessage()
106106
'available_at' => $this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:30:00', new \DateTimeZone('UTC'))),
107107
]);
108108

109-
$encoded = $this->connection->get();
109+
$encoded = $this->connection->get()[0];
110110
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
111111
}
112112

@@ -171,18 +171,20 @@ public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
171171
'available_at' => $this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:30:00', new \DateTimeZone('UTC'))),
172172
]);
173173

174-
$next = $this->connection->get();
174+
$next = $this->connection->get()[0];
175175
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
176176
$this->connection->reject($next['id']);
177177
}
178178

179179
public function testTheTransportIsSetupOnGet()
180180
{
181181
$this->assertFalse($this->createSchemaManager()->tablesExist(['messenger_messages']));
182-
$this->assertNull($this->connection->get());
182+
$result = $this->connection->get();
183+
$this->assertIsArray($result);
184+
$this->assertEmpty($result);
183185

184186
$this->connection->send('the body', ['my' => 'header']);
185-
$envelope = $this->connection->get();
187+
$envelope = $this->connection->get()[0];
186188
$this->assertEquals('the body', $envelope['body']);
187189
}
188190

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ public function testPostgreSqlConnectionSendAndGet()
5959
{
6060
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
6161

62-
$encoded = $this->connection->get();
62+
$encoded = $this->connection->get()[0];
6363
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
6464
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
6565

66-
$this->assertNull($this->connection->get());
66+
$this->assertEmpty($this->connection->get());
6767
}
6868

6969
private function createSchemaManager(): AbstractSchemaManager

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public function testItReturnsTheDecodedMessageToTheHandler()
3737

3838
$doctrineEnvelope = $this->createDoctrineEnvelope();
3939
$connection = $this->createMock(Connection::class);
40-
$connection->method('get')->willReturn($doctrineEnvelope);
40+
$connection->method('get')->willReturn([$doctrineEnvelope]);
4141

4242
$receiver = new DoctrineReceiver($connection, $serializer);
4343
$actualEnvelopes = $receiver->get();
@@ -60,7 +60,7 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
6060

6161
$doctrineEnvelop = $this->createDoctrineEnvelope();
6262
$connection = $this->createMock(Connection::class);
63-
$connection->method('get')->willReturn($doctrineEnvelop);
63+
$connection->method('get')->willReturn([$doctrineEnvelop]);
6464
$connection->expects($this->once())->method('reject');
6565

6666
$receiver = new DoctrineReceiver($connection, $serializer);

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function testReceivesMessages()
4646
];
4747

4848
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
49-
$connection->method('get')->willReturn($doctrineEnvelope);
49+
$connection->method('get')->willReturn([$doctrineEnvelope]);
5050

5151
$envelopes = $transport->get();
5252
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

0 commit comments

Comments
 (0)