diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php index c36270d81498e..9ebea2d115439 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php @@ -16,6 +16,7 @@ use Pheanstalk\Contract\PheanstalkSubscriberInterface; use Pheanstalk\Exception; use Pheanstalk\Exception\ClientException; +use Pheanstalk\Exception\ConnectionException; use Pheanstalk\Exception\DeadlineSoonException; use Pheanstalk\Exception\ServerException; use Pheanstalk\Pheanstalk; @@ -131,6 +132,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN() public function testGet() { $id = '1234'; + $id2 = '1235'; $beanstalkdEnvelope = [ 'body' => 'foo', 'headers' => 'bar', @@ -140,13 +142,52 @@ public function testGet() $timeout = 44; $tubeList = new TubeList($tubeName = new TubeName($tube), $tubeNameDefault = new TubeName('default')); - $job = new Job(new JobId($id), json_encode($beanstalkdEnvelope)); $client = $this->createMock(PheanstalkInterface::class); $client->expects($this->once())->method('watch')->with($tubeName)->willReturn(2); $client->expects($this->once())->method('listTubesWatched')->willReturn($tubeList); $client->expects($this->once())->method('ignore')->with($tubeNameDefault)->willReturn(1); - $client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn($job); + $client->expects($this->exactly(2))->method('reserveWithTimeout')->with($timeout)->willReturnOnConsecutiveCalls( + new Job(new JobId($id), json_encode($beanstalkdEnvelope)), + new Job(new JobId($id2), json_encode($beanstalkdEnvelope)), + ); + + $connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client); + + $envelope = $connection->get(); + + $this->assertSame($id, $envelope['id']); + $this->assertSame($beanstalkdEnvelope['body'], $envelope['body']); + $this->assertSame($beanstalkdEnvelope['headers'], $envelope['headers']); + + $envelope = $connection->get(); + + $this->assertSame($id2, $envelope['id']); + $this->assertSame($beanstalkdEnvelope['body'], $envelope['body']); + $this->assertSame($beanstalkdEnvelope['headers'], $envelope['headers']); + } + + public function testGetOnReconnect() + { + $id = '1234'; + $beanstalkdEnvelope = [ + 'body' => 'foo', + 'headers' => 'bar', + ]; + + $tube = 'baz'; + $timeout = 44; + + $tubeList = new TubeList($tubeName = new TubeName($tube), $tubeNameDefault = new TubeName('default')); + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->exactly(2))->method('watch')->with($tubeName)->willReturn(2); + $client->expects($this->exactly(2))->method('listTubesWatched')->willReturn($tubeList); + $client->expects($this->exactly(2))->method('ignore')->with($tubeNameDefault)->willReturn(1); + $client->expects($this->exactly(2))->method('reserveWithTimeout')->with($timeout)->willReturnOnConsecutiveCalls( + $this->throwException(new ConnectionException('123', 'foobar')), + new Job(new JobId($id), json_encode($beanstalkdEnvelope)), + ); $connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client); @@ -370,10 +411,11 @@ public function testSend() $expectedDelay = $delay / 1000; $id = '110'; + $id2 = '111'; $client = $this->createMock(PheanstalkInterface::class); $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); - $client->expects($this->once())->method('put')->with( + $client->expects($this->exactly(2))->method('put')->with( $this->callback(function (string $data) use ($body, $headers): bool { $expectedMessage = json_encode([ 'body' => $body, @@ -385,7 +427,51 @@ public function testSend() 1024, $expectedDelay, 90 - )->willReturn(new Job(new JobId($id), 'foobar')); + )->willReturnOnConsecutiveCalls( + new Job(new JobId($id), 'foobar'), + new Job(new JobId($id2), 'foobar'), + ); + + $connection = new Connection(['tube_name' => $tube], $client); + + $returnedId = $connection->send($body, $headers, $delay); + + $this->assertSame($id, $returnedId); + + $returnedId = $connection->send($body, $headers, $delay); + + $this->assertSame($id2, $returnedId); + } + + public function testSendOnReconnect() + { + $tube = 'xyz'; + + $body = 'foo'; + $headers = ['test' => 'bar']; + $delay = 1000; + $expectedDelay = $delay / 1000; + + $id = '110'; + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->exactly(2))->method('useTube')->with(new TubeName($tube)); + $client->expects($this->exactly(2))->method('put')->with( + $this->callback(function (string $data) use ($body, $headers): bool { + $expectedMessage = json_encode([ + 'body' => $body, + 'headers' => $headers, + ]); + + return $expectedMessage === $data; + }), + 1024, + $expectedDelay, + 90 + )->willReturnOnConsecutiveCalls( + $this->throwException(new ConnectionException('123', 'foobar')), + new Job(new JobId($id), 'foobar'), + ); $connection = new Connection(['tube_name' => $tube], $client); @@ -520,4 +606,5 @@ public function testSendWithRoundedDelay() interface PheanstalkInterface extends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface { + public function disconnect(): void; } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php index 232d8596336cf..380186445889f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php @@ -18,7 +18,6 @@ use Pheanstalk\Exception; use Pheanstalk\Exception\ConnectionException; use Pheanstalk\Pheanstalk; -use Pheanstalk\Values\Job as PheanstalkJob; use Pheanstalk\Values\JobId; use Pheanstalk\Values\TubeName; use Symfony\Component\Messenger\Exception\InvalidArgumentException; @@ -45,6 +44,9 @@ class Connection private int $ttr; private bool $buryOnReject; + private bool $usingTube = false; + private bool $watchingTube = false; + /** * Constructor. * @@ -139,7 +141,7 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit } return $this->withReconnect(function () use ($message, $delay, $priority) { - $this->client->useTube($this->tube); + $this->useTube(); $job = $this->client->put( $message, $priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY, @@ -153,7 +155,11 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit public function get(): ?array { - $job = $this->getFromTube(); + $job = $this->withReconnect(function () { + $this->watchTube(); + + return $this->client->reserveWithTimeout($this->timeout); + }); if (null === $job) { return null; @@ -174,25 +180,10 @@ public function get(): ?array ]; } - private function getFromTube(): ?PheanstalkJob - { - return $this->withReconnect(function () { - if ($this->client->watch($this->tube) > 1) { - foreach ($this->client->listTubesWatched() as $tube) { - if ((string) $tube !== (string) $this->tube) { - $this->client->ignore($tube); - } - } - } - - return $this->client->reserveWithTimeout($this->timeout); - }); - } - public function ack(string $id): void { $this->withReconnect(function () use ($id) { - $this->client->useTube($this->tube); + $this->useTube(); $this->client->delete(new JobId($id)); }); } @@ -200,7 +191,7 @@ public function ack(string $id): void public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void { $this->withReconnect(function () use ($id, $priority, $forceDelete) { - $this->client->useTube($this->tube); + $this->useTube(); if (!$forceDelete && $this->buryOnReject) { $this->client->bury(new JobId($id), $priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY); @@ -213,7 +204,7 @@ public function reject(string $id, ?int $priority = null, bool $forceDelete = fa public function keepalive(string $id): void { $this->withReconnect(function () use ($id) { - $this->client->useTube($this->tube); + $this->useTube(); $this->client->touch(new JobId($id)); }); } @@ -221,7 +212,7 @@ public function keepalive(string $id): void public function getMessageCount(): int { return $this->withReconnect(function () { - $this->client->useTube($this->tube); + $this->useTube(); $tubeStats = $this->client->statsTube($this->tube); return $tubeStats->currentJobsReady; @@ -237,6 +228,33 @@ public function getMessagePriority(string $id): int }); } + private function useTube(): void + { + if ($this->usingTube) { + return; + } + + $this->client->useTube($this->tube); + $this->usingTube = true; + } + + private function watchTube(): void + { + if ($this->watchingTube) { + return; + } + + if ($this->client->watch($this->tube) > 1) { + foreach ($this->client->listTubesWatched() as $tube) { + if ((string) $tube !== (string) $this->tube) { + $this->client->ignore($tube); + } + } + } + + $this->watchingTube = true; + } + private function withReconnect(callable $command): mixed { try { @@ -245,6 +263,9 @@ private function withReconnect(callable $command): mixed } catch (ConnectionException) { $this->client->disconnect(); + $this->usingTube = false; + $this->watchingTube = false; + return $command(); } } catch (Exception $exception) {