From 1add384c77e53d79aeb94b2cee25af785baadca6 Mon Sep 17 00:00:00 2001 From: Jeroeny Date: Mon, 4 Sep 2023 14:05:26 +0200 Subject: [PATCH] [Scheduler] Allow modifying the schedule at runtime and recalculate heap --- src/Symfony/Component/Scheduler/CHANGELOG.md | 1 + .../Scheduler/Generator/MessageGenerator.php | 8 ++- src/Symfony/Component/Scheduler/Schedule.php | 49 +++++++++++++- .../Tests/Generator/MessageGeneratorTest.php | 66 +++++++++++++++++++ 4 files changed, 121 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Scheduler/CHANGELOG.md b/src/Symfony/Component/Scheduler/CHANGELOG.md index c1fa4a6406e7f..4dbde09635bcf 100644 --- a/src/Symfony/Component/Scheduler/CHANGELOG.md +++ b/src/Symfony/Component/Scheduler/CHANGELOG.md @@ -10,6 +10,7 @@ CHANGELOG * Add `AbstractTriggerDecorator` * Make `ScheduledStamp` "send-able" * Add `ScheduledStamp` to `RedispatchMessage` + * Allow modifying the Schedule at runtime 6.3 --- diff --git a/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php b/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php index a2f59beedeb48..b4ee9c6ab406d 100644 --- a/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php +++ b/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php @@ -19,7 +19,7 @@ final class MessageGenerator implements MessageGeneratorInterface { - private Schedule $schedule; + private ?Schedule $schedule = null; private TriggerHeap $triggerHeap; private ?\DateTimeImmutable $waitUntil; @@ -36,6 +36,12 @@ public function getMessages(): \Generator { $checkpoint = $this->checkpoint(); + if ($this->schedule?->shouldRestart()) { + unset($this->triggerHeap); + $this->waitUntil = new \DateTimeImmutable('@0'); + $this->schedule->setRestart(false); + } + if (!$this->waitUntil || $this->waitUntil > ($now = $this->clock->now()) || !$checkpoint->acquire($now) diff --git a/src/Symfony/Component/Scheduler/Schedule.php b/src/Symfony/Component/Scheduler/Schedule.php index 500f93ea9ee61..422aa4dc74d2e 100644 --- a/src/Symfony/Component/Scheduler/Schedule.php +++ b/src/Symfony/Component/Scheduler/Schedule.php @@ -21,20 +21,55 @@ final class Schedule implements ScheduleProviderInterface private array $messages = []; private ?LockInterface $lock = null; private ?CacheInterface $state = null; + private bool $shouldRestart = false; + + public static function with(RecurringMessage $message, RecurringMessage ...$messages): static + { + return static::doAdd(new self(), $message, ...$messages); + } /** * @return $this */ public function add(RecurringMessage $message, RecurringMessage ...$messages): static + { + $this->setRestart(true); + + return static::doAdd($this, $message, ...$messages); + } + + private static function doAdd(self $schedule, RecurringMessage $message, RecurringMessage ...$messages): static { foreach ([$message, ...$messages] as $m) { - if (isset($this->messages[$m->getId()])) { + if (isset($schedule->messages[$m->getId()])) { throw new LogicException('Duplicated schedule message.'); } - $this->messages[$m->getId()] = $m; + $schedule->messages[$m->getId()] = $m; } + return $schedule; + } + + /** + * @return $this + */ + public function remove(RecurringMessage $message): static + { + unset($this->messages[$message->getId()]); + $this->setRestart(true); + + return $this; + } + + /** + * @return $this + */ + public function clear(): static + { + $this->messages = []; + $this->setRestart(true); + return $this; } @@ -83,4 +118,14 @@ public function getSchedule(): static { return $this; } + + public function shouldRestart(): bool + { + return $this->shouldRestart; + } + + public function setRestart(bool $shouldRestart): bool + { + return $this->shouldRestart = $shouldRestart; + } } diff --git a/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php b/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php index eb35766cf3961..01522288f2a93 100644 --- a/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php +++ b/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php @@ -95,6 +95,72 @@ public function getSchedule(): Schedule } } + public function testGetMessagesFromScheduleProviderWithRestart() + { + $first = (object) ['id' => 'first']; + $startTime = '22:12:00'; + $runs = [ + '22:12:00' => [], + '22:12:01' => [], + '22:13:00' => [$first], + '22:13:01' => [], + ]; + $schedule = [[$first, '22:13:00', '22:14:00']]; + + // for referencing + $now = self::makeDateTime($startTime); + + $clock = $this->createMock(ClockInterface::class); + $clock->method('now')->willReturnReference($now); + + foreach ($schedule as $i => $s) { + if (\is_array($s)) { + $schedule[$i] = $this->createMessage(...$s); + } + } + + $scheduleProvider = new class($schedule) implements ScheduleProviderInterface { + private Schedule $schedule; + + public function __construct(array $schedule) + { + $this->schedule = Schedule::with(...$schedule); + $this->schedule->stateful(new ArrayAdapter()); + } + + public function getSchedule(): Schedule + { + return $this->schedule; + } + + public function add(RecurringMessage $message): self + { + $this->schedule->add($message); + + return $this; + } + }; + + $scheduler = new MessageGenerator($scheduleProvider, 'dummy', $clock); + + // Warmup. The first run always returns nothing. + $this->assertSame([], iterator_to_array($scheduler->getMessages(), false)); + + $toAdd = (object) ['id' => 'added-after-start']; + + foreach ($runs as $time => $expected) { + $now = self::makeDateTime($time); + $this->assertSame($expected, iterator_to_array($scheduler->getMessages(), false)); + } + + $scheduleProvider->add($this->createMessage($toAdd, '22:13:10', '22:13:11')); + + $this->assertSame([], iterator_to_array($scheduler->getMessages(), false)); + + $now = self::makeDateTime('22:13:10'); + $this->assertSame([$toAdd], iterator_to_array($scheduler->getMessages(), false)); + } + public function testYieldedContext() { // for referencing