From cede63f33a25715ab72bc147ba0ffd5848af2325 Mon Sep 17 00:00:00 2001 From: Kevin Bond Date: Wed, 29 Mar 2023 17:46:57 -0400 Subject: [PATCH] [Scheduler] allow `MessageGenerator`'s to return `RecurringMessage[]` - Add `RecurringMessage` to `ScheduledStamp`. --- .../Scheduler/Generator/MessageGenerator.php | 14 +++++++++----- .../Generator/MessageGeneratorInterface.php | 4 +++- .../Scheduler/Messenger/ScheduledStamp.php | 4 ++++ .../Scheduler/Messenger/SchedulerTransport.php | 7 ++++++- src/Symfony/Component/Scheduler/Scheduler.php | 4 ++++ .../Tests/Generator/MessageGeneratorTest.php | 3 ++- .../Tests/Messenger/SchedulerTransportTest.php | 12 ++++++++---- 7 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php b/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php index 2f9743c59bfc2..6ada8fd21689e 100644 --- a/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php +++ b/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php @@ -13,6 +13,7 @@ use Psr\Clock\ClockInterface; use Symfony\Component\Clock\Clock; +use Symfony\Component\Scheduler\RecurringMessage; use Symfony\Component\Scheduler\Schedule; use Symfony\Component\Scheduler\Trigger\TriggerInterface; @@ -37,6 +38,9 @@ public function __construct( $this->checkpoint = $checkpoint; } + /** + * @return \Generator + */ public function getMessages(): \Generator { if (!$this->waitUntil @@ -54,8 +58,8 @@ public function getMessages(): \Generator /** @var TriggerInterface $trigger */ /** @var int $index */ /** @var \DateTimeImmutable $time */ - /** @var object $message */ - [$time, $index, $trigger, $message] = $heap->extract(); + /** @var object $recurringMessage */ + [$time, $index, $trigger, $recurringMessage] = $heap->extract(); $yield = true; if ($time < $lastTime) { @@ -66,11 +70,11 @@ public function getMessages(): \Generator } if ($nextTime = $trigger->getNextRunDate($time)) { - $heap->insert([$nextTime, $index, $trigger, $message]); + $heap->insert([$nextTime, $index, $trigger, $recurringMessage]); } if ($yield) { - yield $message; + yield $recurringMessage; $this->checkpoint->save($time, $index); } } @@ -93,7 +97,7 @@ private function heap(\DateTimeImmutable $time): TriggerHeap continue; } - $heap->insert([$nextTime, $index, $recurringMessage->getTrigger(), $recurringMessage->getMessage()]); + $heap->insert([$nextTime, $index, $recurringMessage->getTrigger(), $recurringMessage]); } return $this->triggerHeap = $heap; diff --git a/src/Symfony/Component/Scheduler/Generator/MessageGeneratorInterface.php b/src/Symfony/Component/Scheduler/Generator/MessageGeneratorInterface.php index 8b2f7eeacc4ce..bd22fb9037afd 100644 --- a/src/Symfony/Component/Scheduler/Generator/MessageGeneratorInterface.php +++ b/src/Symfony/Component/Scheduler/Generator/MessageGeneratorInterface.php @@ -11,13 +11,15 @@ namespace Symfony\Component\Scheduler\Generator; +use Symfony\Component\Scheduler\RecurringMessage; + /** * @experimental */ interface MessageGeneratorInterface { /** - * @return iterable + * @return iterable */ public function getMessages(): iterable; } diff --git a/src/Symfony/Component/Scheduler/Messenger/ScheduledStamp.php b/src/Symfony/Component/Scheduler/Messenger/ScheduledStamp.php index 4b1b5cf1b577d..f72f614b8d372 100644 --- a/src/Symfony/Component/Scheduler/Messenger/ScheduledStamp.php +++ b/src/Symfony/Component/Scheduler/Messenger/ScheduledStamp.php @@ -12,10 +12,14 @@ namespace Symfony\Component\Scheduler\Messenger; use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; +use Symfony\Component\Scheduler\RecurringMessage; /** * @experimental */ final class ScheduledStamp implements NonSendableStampInterface { + public function __construct(public readonly RecurringMessage $recurringMessage) + { + } } diff --git a/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php b/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php index af2a8f9adc1b2..20ef76cd1f0f8 100644 --- a/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php +++ b/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Scheduler\Exception\LogicException; use Symfony\Component\Scheduler\Generator\MessageGeneratorInterface; +use Symfony\Component\Scheduler\RecurringMessage; /** * @experimental @@ -29,7 +30,11 @@ public function __construct( public function get(): iterable { foreach ($this->messageGenerator->getMessages() as $message) { - yield Envelope::wrap($message, [new ScheduledStamp()]); + if (!$message instanceof RecurringMessage) { + throw new LogicException(sprintf('Messages from "%s" must be instances of "%s". Got "%s".', __CLASS__, RecurringMessage::class, get_debug_type($message))); + } + + yield Envelope::wrap($message->getMessage(), [new ScheduledStamp($message)]); } } diff --git a/src/Symfony/Component/Scheduler/Scheduler.php b/src/Symfony/Component/Scheduler/Scheduler.php index 8c9e8fac7b3ca..c93dbc169a8df 100644 --- a/src/Symfony/Component/Scheduler/Scheduler.php +++ b/src/Symfony/Component/Scheduler/Scheduler.php @@ -66,6 +66,10 @@ public function run(array $options = []): void $ran = false; foreach ($this->generators as $generator) { foreach ($generator->getMessages() as $message) { + if ($message instanceof RecurringMessage) { + $message = $message->getMessage(); + } + $this->handlers[$message::class]($message); $ran = true; } diff --git a/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php b/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php index 8b6900b8bb037..bf3eee15a4616 100644 --- a/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php +++ b/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php @@ -47,7 +47,8 @@ public function testGetMessages(string $startTime, array $runs, array $schedule) foreach ($runs as $time => $expected) { $now = self::makeDateTime($time); - $this->assertSame($expected, iterator_to_array($scheduler->getMessages())); + $messages = array_map(fn (RecurringMessage $m) => $m->getMessage(), iterator_to_array($scheduler->getMessages())); + $this->assertSame($expected, $messages); } } diff --git a/src/Symfony/Component/Scheduler/Tests/Messenger/SchedulerTransportTest.php b/src/Symfony/Component/Scheduler/Tests/Messenger/SchedulerTransportTest.php index 2b31cb67062d1..0fa0925eeed20 100644 --- a/src/Symfony/Component/Scheduler/Tests/Messenger/SchedulerTransportTest.php +++ b/src/Symfony/Component/Scheduler/Tests/Messenger/SchedulerTransportTest.php @@ -17,14 +17,15 @@ use Symfony\Component\Scheduler\Generator\MessageGeneratorInterface; use Symfony\Component\Scheduler\Messenger\ScheduledStamp; use Symfony\Component\Scheduler\Messenger\SchedulerTransport; +use Symfony\Component\Scheduler\RecurringMessage; class SchedulerTransportTest extends TestCase { public function testGetFromIterator() { $messages = [ - (object) ['id' => 'first'], - (object) ['id' => 'second'], + RecurringMessage::cron('* * * * *', (object) ['id' => 'first']), + RecurringMessage::cron('* * * * *', (object) ['id' => 'second']), ]; $generator = $this->createConfiguredMock(MessageGeneratorInterface::class, [ 'getMessages' => $messages, @@ -32,9 +33,12 @@ public function testGetFromIterator() $transport = new SchedulerTransport($generator); foreach ($transport->get() as $envelope) { + $message = array_shift($messages); + $this->assertInstanceOf(Envelope::class, $envelope); - $this->assertNotNull($envelope->last(ScheduledStamp::class)); - $this->assertSame(array_shift($messages), $envelope->getMessage()); + $this->assertEquals($message, $envelope->last(ScheduledStamp::class)?->recurringMessage); + $this->assertSame($message->getMessage(), $envelope->getMessage()); + $this->assertSame($message->getMessage(), $envelope->getMessage()); } $this->assertEmpty($messages);