Skip to content

[Messenger] Reset peak memory usage for each message #60018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
Expand Down Expand Up @@ -218,6 +219,9 @@
service('services_resetter'),
])

->set('messenger.listener.reset_memory_usage', ResetMemoryUsageListener::class)
->tag('kernel.event_subscriber')

->set('messenger.routable_message_bus', RoutableMessageBus::class)
->args([
abstract_arg('message bus locator'),
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ CHANGELOG
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
* Add `--class-filter` option to the `messenger:failed:remove` command
* Add `$stamps` parameter to `HandleTrait::handle`
* Add `Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener` to reset PHP's peak memory usage for each processed message

7.2
---
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\EventListener;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;

/**
* @author Tim Düsterhus <tim@tideways-gmbh.com>
*/
final class ResetMemoryUsageListener implements EventSubscriberInterface
Copy link
Contributor

@94noni 94noni Apr 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt to add some debug/info log here ? can it be usefull for ppl?

so they know if they want to "disable/opt out" this subscriber in their app

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see if anyone needs this to be configurable. I don't see why it should at the moment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, i was mostly commenting to this thx

{
private bool $collect = false;

public function resetBefore(WorkerMessageReceivedEvent $event): void
{
// Reset the peak memory usage for accurate measurement of the
// memory usage on a per-message basis.
memory_reset_peak_usage();
$this->collect = true;
}

public function collectAfter(WorkerRunningEvent $event): void
{
if ($event->isWorkerIdle() && $this->collect) {
gc_collect_cycles();
$this->collect = false;
}
}

public static function getSubscribedEvents(): array
{
return [
WorkerMessageReceivedEvent::class => ['resetBefore', -1024],
WorkerRunningEvent::class => ['collectAfter', -1024],
];
}
}
57 changes: 54 additions & 3 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\RuntimeException;
Expand Down Expand Up @@ -586,7 +587,7 @@ public function testFlushBatchOnStop()
$this->assertSame($expectedMessages, $handler->processedMessages);
}

public function testGcCollectCyclesIsCalledOnMessageHandle()
public function testGcCollectCyclesIsCalledOnIdleWorker()
{
$apiMessage = new DummyMessage('API');

Expand All @@ -595,14 +596,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle()
$bus = $this->createMock(MessageBusInterface::class);

$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new ResetMemoryUsageListener());
$before = 0;
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use (&$before) {
static $i = 0;

$after = gc_status()['runs'];
if (0 === $i) {
$this->assertFalse($event->isWorkerIdle());
$this->assertSame(0, $after - $before);
} else if (1 === $i) {
$this->assertTrue($event->isWorkerIdle());
$this->assertSame(1, $after - $before);
} else if (3 === $i) {
// Wait a few idle phases before stopping.
$this->assertSame(1, $after - $before);
$event->getWorker()->stop();
}

$i++;
}, PHP_INT_MIN);


$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);

gc_collect_cycles();
$before = gc_status()['runs'];

$worker->run([
'sleep' => 0,
]);
}

public function testMemoryUsageIsResetOnMessageHandle()
{
$apiMessage = new DummyMessage('API');

$receiver = new DummyReceiver([[new Envelope($apiMessage)]]);

$bus = $this->createMock(MessageBusInterface::class);

$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new ResetMemoryUsageListener());
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

// Allocate and deallocate 4 MB. The use of random_int() is to
// prevent compile-time optimization.
$memory = str_repeat(random_int(0, 1), 4 * 1024 * 1024);
unset($memory);

$before = memory_get_peak_usage();

$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
$worker->run();

$gcStatus = gc_status();
// This should be roughly 4 MB smaller than $before.
$after = memory_get_peak_usage();

$this->assertGreaterThan(0, $gcStatus['runs']);
$this->assertTrue($after < $before);
}

/**
Expand Down
2 changes: 0 additions & 2 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ public function run(array $options = []): void
// this should prevent multiple lower priority receivers from
// blocking too long before the higher priority are checked
if ($envelopeHandled) {
gc_collect_cycles();

break;
}
}
Expand Down
Loading