diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index d8be5f33f36f8..f2de05282acc1 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,6 +4,8 @@ CHANGELOG 4.3.0 ----- + * Added optional `MessageCountAwareInterface` that receivers can implement + to give information about how many messages are waiting to be processed. * [BC BREAK] The `Envelope::__construct()` signature changed: you can no longer pass an unlimited number of stamps as the second, third, fourth, arguments etc: stamps are now an array passed to the diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index b2ebfe4ab5043..d3737007291ec 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -167,6 +167,24 @@ public function testItReceivesSignals() , $process->getOutput()); } + public function testItCountsMessagesInQueue() + { + $serializer = $this->createSerializer(); + + $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN')); + $connection->setup(); + $connection->queue()->purge(); + + $sender = new AmqpSender($connection, $serializer); + + $sender->send($first = new Envelope(new DummyMessage('First'))); + $sender->send($second = new Envelope(new DummyMessage('Second'))); + $sender->send($second = new Envelope(new DummyMessage('Third'))); + + sleep(1); // give amqp a moment to have the messages ready + $this->assertSame(3, $connection->countMessagesInQueue()); + } + private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10) { $timedOutTime = time() + $timeoutInSeconds; diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php index 26878e3647bfe..3d8bf82b53eb9 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php @@ -37,6 +37,9 @@ public function testGetAMessageWillChangeItsStatus() $queryBuilder ->method('getSQL') ->willReturn(''); + $queryBuilder + ->method('getParameters') + ->willReturn([]); $driverConnection ->method('prepare') ->willReturn($stmt); @@ -54,6 +57,9 @@ public function testGetWithNoPendingMessageWillReturnNull() $driverConnection = $this->getDBALConnectionMock(); $stmt = $this->getStatementMock(false); + $queryBuilder + ->method('getParameters') + ->willReturn([]); $driverConnection->expects($this->once()) ->method('createQueryBuilder') ->willReturn($queryBuilder); @@ -119,6 +125,7 @@ private function getQueryBuilderMock() $queryBuilder->method('orderBy')->willReturn($queryBuilder); $queryBuilder->method('setMaxResults')->willReturn($queryBuilder); $queryBuilder->method('setParameter')->willReturn($queryBuilder); + $queryBuilder->method('setParameters')->willReturn($queryBuilder); return $queryBuilder; } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php index ffcde2039306d..bc826832c593b 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php @@ -100,6 +100,46 @@ public function testItRetrieveTheFirstAvailableMessage() $this->assertEquals('{"message": "Hi available"}', $encoded['body']); } + public function testItCountMessages() + { + // insert messages + // one currently handled + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi handled"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))), + ]); + // one available later + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi delayed"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')), + ]); + // one available + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi available"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + ]); + // another available + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi available"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + ]); + + $this->assertSame(2, $this->connection->getMessageCount()); + } + public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout() { $twoHoursAgo = new \DateTime('now'); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index 9f1e25a40267a..e81b6b9431120 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -26,7 +27,7 @@ * * @experimental in 4.2 */ -class AmqpReceiver implements ReceiverInterface +class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface { private $serializer; private $connection; @@ -87,6 +88,14 @@ public function reject(Envelope $envelope): void $this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope)); } + /** + * {@inheritdoc} + */ + public function getMessageCount(): int + { + return $this->connection->countMessagesInQueue(); + } + private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void { try { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php index 586e16c4d86dd..91c9fe45c0c3a 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\SetupableTransportInterface; @@ -22,7 +23,7 @@ * * @experimental in 4.2 */ -class AmqpTransport implements TransportInterface, SetupableTransportInterface +class AmqpTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface { private $serializer; private $connection; @@ -75,6 +76,14 @@ public function setup(): void $this->connection->setup(); } + /** + * {@inheritdoc} + */ + public function getMessageCount(): int + { + return ($this->receiver ?? $this->getReceiver())->getMessageCount(); + } + private function getReceiver() { return $this->receiver = new AmqpReceiver($this->connection, $this->serializer); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 4dad0735b93e9..512193a5e98d6 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -184,6 +184,14 @@ public function publish(string $body, array $headers = [], int $delay = 0): void $this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes); } + /** + * Returns an approximate count of the messages in a queue. + */ + public function countMessagesInQueue(): int + { + return $this->queue()->declareQueue(); + } + /** * @throws \AMQPException */ diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php index faf6b8919b19a..f73b756d9ebdb 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php @@ -14,6 +14,7 @@ use Doctrine\DBAL\Connection as DBALConnection; use Doctrine\DBAL\DBALException; use Doctrine\DBAL\Exception\TableNotFoundException; +use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer; use Doctrine\DBAL\Types\Type; @@ -128,25 +129,14 @@ public function get(): ?array { $this->driverConnection->beginTransaction(); try { - $query = $this->driverConnection->createQueryBuilder() - ->select('m.*') - ->from($this->configuration['table_name'], 'm') - ->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit') - ->andWhere('m.available_at <= :now') - ->andWhere('m.queue_name = :queue_name') + $query = $this->createAvailableMessagesQueryBuilder() ->orderBy('available_at', 'ASC') ->setMaxResults(1); - $now = \DateTime::createFromFormat('U.u', microtime(true)); - $redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); // use SELECT ... FOR UPDATE to lock table $doctrineEnvelope = $this->executeQuery( $query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(), - [ - ':now' => self::formatDateTime($now), - ':queue_name' => $this->configuration['queue_name'], - ':redeliver_limit' => self::formatDateTime($redeliverLimit), - ] + $query->getParameters() )->fetch(); if (false === $doctrineEnvelope) { @@ -161,6 +151,7 @@ public function get(): ?array ->update($this->configuration['table_name']) ->set('delivered_at', ':delivered_at') ->where('id = :id'); + $now = \DateTime::createFromFormat('U.u', microtime(true)); $this->executeQuery($queryBuilder->getSQL(), [ ':id' => $doctrineEnvelope['id'], ':delivered_at' => self::formatDateTime($now), @@ -200,6 +191,33 @@ public function setup(): void $synchronizer->updateSchema($this->getSchema(), true); } + public function getMessageCount(): int + { + $queryBuilder = $this->createAvailableMessagesQueryBuilder() + ->select('COUNT(m.id) as message_count') + ->setMaxResults(1); + + return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn(); + } + + private function createAvailableMessagesQueryBuilder(): QueryBuilder + { + $now = \DateTime::createFromFormat('U.u', microtime(true)); + $redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); + + return $this->driverConnection->createQueryBuilder() + ->select('m.*') + ->from($this->configuration['table_name'], 'm') + ->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit') + ->andWhere('m.available_at <= :now') + ->andWhere('m.queue_name = :queue_name') + ->setParameters([ + ':now' => self::formatDateTime($now), + ':queue_name' => $this->configuration['queue_name'], + ':redeliver_limit' => self::formatDateTime($redeliverLimit), + ]); + } + private function executeQuery(string $sql, array $parameters = []) { $stmt = null; diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php index 3198e143dca84..65f2eacdc89b4 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -25,7 +26,7 @@ * * @experimental in 4.3 */ -class DoctrineReceiver implements ReceiverInterface +class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface { private $connection; private $serializer; @@ -81,6 +82,14 @@ public function reject(Envelope $envelope): void $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId()); } + /** + * {@inheritdoc} + */ + public function getMessageCount(): int + { + return $this->connection->getMessageCount(); + } + private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp { /** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */ diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/MessageCountAwareInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/MessageCountAwareInterface.php new file mode 100644 index 0000000000000..69f66e6084dd5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/MessageCountAwareInterface.php @@ -0,0 +1,28 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +/** + * @author Samuel Roze + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +interface MessageCountAwareInterface +{ + /** + * Returns the number of messages waiting to be handled. + * + * In some systems, this may be an approximate number. + */ + public function getMessageCount(): int; +}