Skip to content

Commit 2473d1f

Browse files
committed
[Scheduler] add RecurringMessage::getId() and prevent duplicates
1 parent 65af407 commit 2473d1f

File tree

8 files changed

+93
-19
lines changed

8 files changed

+93
-19
lines changed

src/Symfony/Component/Scheduler/Generator/MessageContext.php

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ final class MessageContext
2222
{
2323
public function __construct(
2424
public readonly string $name,
25+
public readonly string $id,
2526
public readonly TriggerInterface $trigger,
2627
public readonly \DateTimeImmutable $triggeredAt,
2728
public readonly ?\DateTimeImmutable $nextTriggerAt = null,

src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

+10-8
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
use Psr\Clock\ClockInterface;
1515
use Symfony\Component\Clock\Clock;
16+
use Symfony\Component\Scheduler\RecurringMessage;
1617
use Symfony\Component\Scheduler\Schedule;
17-
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
1818

1919
/**
2020
* @experimental
@@ -49,11 +49,13 @@ public function getMessages(): \Generator
4949
$heap = $this->heap($lastTime);
5050

5151
while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
52-
/** @var TriggerInterface $trigger */
53-
/** @var int $index */
5452
/** @var \DateTimeImmutable $time */
55-
/** @var object $message */
56-
[$time, $index, $trigger, $message] = $heap->extract();
53+
/** @var int $index */
54+
/** @var RecurringMessage $recurringMessage */
55+
[$time, $index, $recurringMessage] = $heap->extract();
56+
$id = $recurringMessage->getId();
57+
$message = $recurringMessage->getMessage();
58+
$trigger = $recurringMessage->getTrigger();
5759
$yield = true;
5860

5961
if ($time < $lastTime) {
@@ -64,11 +66,11 @@ public function getMessages(): \Generator
6466
}
6567

6668
if ($nextTime = $trigger->getNextRunDate($time)) {
67-
$heap->insert([$nextTime, $index, $trigger, $message]);
69+
$heap->insert([$nextTime, $index, $recurringMessage]);
6870
}
6971

7072
if ($yield) {
71-
yield (new MessageContext($this->name, $trigger, $time, $nextTime)) => $message;
73+
yield (new MessageContext($this->name, $id, $trigger, $time, $nextTime)) => $message;
7274
$this->checkpoint->save($time, $index);
7375
}
7476
}
@@ -91,7 +93,7 @@ private function heap(\DateTimeImmutable $time): TriggerHeap
9193
continue;
9294
}
9395

94-
$heap->insert([$nextTime, $index, $recurringMessage->getTrigger(), $recurringMessage->getMessage()]);
96+
$heap->insert([$nextTime, $index, $recurringMessage]);
9597
}
9698

9799
return $this->triggerHeap = $heap;

src/Symfony/Component/Scheduler/Generator/TriggerHeap.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111

1212
namespace Symfony\Component\Scheduler\Generator;
1313

14-
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
14+
use Symfony\Component\Scheduler\RecurringMessage;
1515

1616
/**
1717
* @internal
1818
*
19-
* @extends \SplHeap<array{\DateTimeImmutable, int, TriggerInterface, object}>
19+
* @extends \SplHeap<array{\DateTimeImmutable, int, RecurringMessage}>
2020
*
2121
* @experimental
2222
*/
@@ -28,8 +28,8 @@ public function __construct(
2828
}
2929

3030
/**
31-
* @param array{\DateTimeImmutable, int, TriggerInterface, object} $value1
32-
* @param array{\DateTimeImmutable, int, TriggerInterface, object} $value2
31+
* @param array{\DateTimeImmutable, int, RecurringMessage} $value1
32+
* @param array{\DateTimeImmutable, int, RecurringMessage} $value2
3333
*/
3434
protected function compare(mixed $value1, mixed $value2): int
3535
{

src/Symfony/Component/Scheduler/RecurringMessage.php

+25
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
*/
2222
final class RecurringMessage
2323
{
24+
private string $id;
25+
2426
private function __construct(
2527
private readonly TriggerInterface $trigger,
2628
private readonly object $message,
@@ -59,6 +61,29 @@ public static function trigger(TriggerInterface $trigger, object $message): self
5961
return new self($trigger, $message);
6062
}
6163

64+
/**
65+
* Unique identifier for this message's context.
66+
*/
67+
public function getId(): string
68+
{
69+
if (isset($this->id)) {
70+
return $this->id;
71+
}
72+
73+
try {
74+
$message = $this->message instanceof \Stringable ? (string) $this->message : serialize($this->message);
75+
} catch (\Exception) {
76+
$message = '';
77+
}
78+
79+
return $this->id = hash('crc32c', implode('', [
80+
$this->message::class,
81+
$message,
82+
$this->trigger::class,
83+
(string) $this->trigger,
84+
]));
85+
}
86+
6287
public function getMessage(): object
6388
{
6489
return $this->message;

src/Symfony/Component/Scheduler/Schedule.php

+10-4
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@
1212
namespace Symfony\Component\Scheduler;
1313

1414
use Symfony\Component\Lock\LockInterface;
15+
use Symfony\Component\Scheduler\Exception\LogicException;
1516
use Symfony\Contracts\Cache\CacheInterface;
1617

1718
/**
1819
* @experimental
1920
*/
2021
final class Schedule implements ScheduleProviderInterface
2122
{
22-
/** @var array<RecurringMessage> */
23+
/** @var array<string,RecurringMessage> */
2324
private array $messages = [];
2425
private ?LockInterface $lock = null;
2526
private ?CacheInterface $state = null;
@@ -29,8 +30,13 @@ final class Schedule implements ScheduleProviderInterface
2930
*/
3031
public function add(RecurringMessage $message, RecurringMessage ...$messages): static
3132
{
32-
$this->messages[] = $message;
33-
$this->messages = array_merge($this->messages, $messages);
33+
foreach ([$message, ...$messages] as $m) {
34+
if (isset($this->messages[$m->getId()])) {
35+
throw new LogicException('Duplicated schedule message.');
36+
}
37+
38+
$this->messages[$m->getId()] = $m;
39+
}
3440

3541
return $this;
3642
}
@@ -70,7 +76,7 @@ public function getState(): ?CacheInterface
7076
*/
7177
public function getRecurringMessages(): array
7278
{
73-
return $this->messages;
79+
return array_values($this->messages);
7480
}
7581

7682
/**

src/Symfony/Component/Scheduler/Tests/Messenger/SchedulerTransportTest.php

+4-3
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,17 @@ public function testGetFromIterator()
3434
$generator->method('getMessages')->willReturnCallback(function () use ($messages): \Generator {
3535
$trigger = $this->createMock(TriggerInterface::class);
3636
$triggerAt = new \DateTimeImmutable('2020-02-20T02:00:00', new \DateTimeZone('UTC'));
37-
yield (new MessageContext('default', $trigger, $triggerAt)) => $messages[0];
38-
yield (new MessageContext('default', $trigger, $triggerAt)) => $messages[1];
37+
yield (new MessageContext('default', 'id1', $trigger, $triggerAt)) => $messages[0];
38+
yield (new MessageContext('default', 'id2', $trigger, $triggerAt)) => $messages[1];
3939
});
4040
$transport = new SchedulerTransport($generator);
4141

42-
foreach ($transport->get() as $envelope) {
42+
foreach ($transport->get() as $i => $envelope) {
4343
$this->assertInstanceOf(Envelope::class, $envelope);
4444
$this->assertNotNull($stamp = $envelope->last(ScheduledStamp::class));
4545
$this->assertSame(array_shift($messages), $envelope->getMessage());
4646
$this->assertSame('default', $stamp->messageContext->name);
47+
$this->assertSame('id'.$i + 1, $stamp->messageContext->id);
4748
}
4849

4950
$this->assertEmpty($messages);

src/Symfony/Component/Scheduler/Tests/RecurringMessageTest.php

+9
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,13 @@ public function testHashedCronContextIsRequiredIfMessageIsNotStringable()
4242

4343
RecurringMessage::cron('#midnight', new \stdClass());
4444
}
45+
46+
public function testUniqueId()
47+
{
48+
$message1 = RecurringMessage::cron('* * * * *', new \stdClass());
49+
$message2 = RecurringMessage::cron('* 5 * * *', new \stdClass());
50+
51+
$this->assertSame($message1->getId(), (clone $message1)->getId());
52+
$this->assertNotSame($message1->getId(), $message2->getId());
53+
}
4554
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Tests;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Scheduler\Exception\LogicException;
16+
use Symfony\Component\Scheduler\RecurringMessage;
17+
use Symfony\Component\Scheduler\Schedule;
18+
19+
class ScheduleTest extends TestCase
20+
{
21+
public function testCannotAddDuplicateMessage()
22+
{
23+
$schedule = new Schedule();
24+
$schedule->add(RecurringMessage::cron('* * * * *', new \stdClass()));
25+
26+
$this->expectException(LogicException::class);
27+
28+
$schedule->add(RecurringMessage::cron('* * * * *', new \stdClass()));
29+
}
30+
}

0 commit comments

Comments
 (0)