Skip to content

Commit 9b908ca

Browse files
[Messenger] Run the cycle collector only when the Worker is idle
Co-authored-by: Nicolas Grekas <nicolas.grekas@gmail.com>
1 parent a5a6eda commit 9b908ca

File tree

2 files changed

+44
-13
lines changed

2 files changed

+44
-13
lines changed

src/Symfony/Component/Messenger/EventListener/ResetMemoryUsageListener.php

+18-8
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,45 @@
1212
namespace Symfony\Component\Messenger\EventListener;
1313

1414
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
15-
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
16-
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
1715
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
16+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
1817

1918
/**
2019
* @author Tim Düsterhus <tim@tideways-gmbh.com>
2120
*/
2221
class ResetMemoryUsageListener implements EventSubscriberInterface
2322
{
23+
/**
24+
* Indicates whether the GC already ran during the current idle phase.
25+
*/
26+
private bool $collected = false;
27+
2428
public function resetBefore(WorkerMessageReceivedEvent $event): void
2529
{
2630
// Reset the peak memory usage for accurate measurement of the
2731
// memory usage on a per-message basis.
2832
memory_reset_peak_usage();
2933
}
3034

31-
public function collectAfter(WorkerMessageHandledEvent|WorkerMessageFailedEvent $event): void
35+
public function collectAfter(WorkerRunningEvent $event): void
3236
{
33-
// Run the cycle collector after handling a message to avoid it
34-
// running while the message is reserved and in processing.
35-
gc_collect_cycles();
37+
// Try to avoid running the GC while a message is handled by running it once while the worker is idle
38+
if ($event->isWorkerIdle()) {
39+
if (!$this->collected) {
40+
gc_collect_cycles();
41+
42+
$this->collected = true;
43+
}
44+
} else {
45+
$this->collected = false;
46+
}
3647
}
3748

3849
public static function getSubscribedEvents(): array
3950
{
4051
return [
4152
WorkerMessageReceivedEvent::class => ['resetBefore', -1024],
42-
WorkerMessageFailedEvent::class => ['collectAfter', -1024],
43-
WorkerMessageHandledEvent::class => ['collectAfter', -1024],
53+
WorkerRunningEvent::class => ['collectAfter', -1024],
4454
];
4555
}
4656
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

+26-5
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ public function testFlushBatchOnStop()
587587
$this->assertSame($expectedMessages, $handler->processedMessages);
588588
}
589589

590-
public function testGcCollectCyclesIsCalledOnMessageHandle()
590+
public function testGcCollectCyclesIsCalledOnIdleWorker()
591591
{
592592
$apiMessage = new DummyMessage('API');
593593

@@ -597,14 +597,35 @@ public function testGcCollectCyclesIsCalledOnMessageHandle()
597597

598598
$dispatcher = new EventDispatcher();
599599
$dispatcher->addSubscriber(new ResetMemoryUsageListener());
600-
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
600+
$before = 0;
601+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use (&$before) {
602+
static $i = 0;
603+
604+
$after = gc_status()['runs'];
605+
if (0 === $i) {
606+
$this->assertFalse($event->isWorkerIdle());
607+
$this->assertSame(0, $after - $before);
608+
} else if (1 === $i) {
609+
$this->assertTrue($event->isWorkerIdle());
610+
$this->assertSame(1, $after - $before);
611+
} else if (3 === $i) {
612+
// Wait a few idle phases before stopping.
613+
$this->assertSame(1, $after - $before);
614+
$event->getWorker()->stop();
615+
}
616+
617+
$i++;
618+
}, PHP_INT_MIN);
619+
601620

602621
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
603-
$worker->run();
604622

605-
$gcStatus = gc_status();
623+
gc_collect_cycles();
624+
$before = gc_status()['runs'];
606625

607-
$this->assertGreaterThan(0, $gcStatus['runs']);
626+
$worker->run([
627+
'sleep' => 0,
628+
]);
608629
}
609630

610631
public function testMemoryUsageIsResetOnMessageHandle()

0 commit comments

Comments
 (0)