Skip to content

[Messenger] Add a memory limit option for ConsumeMessagesCommand #26975

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument type="service" id="message_bus" />
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />

<tag name="console.command" command="messenger:consume-messages" />
</service>
Expand Down
48 changes: 43 additions & 5 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
namespace Symfony\Component\Messenger\Command;

use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;

Expand All @@ -33,24 +35,27 @@ class ConsumeMessagesCommand extends Command

private $bus;
private $receiverLocator;
private $logger;

public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null)
{
parent::__construct();

$this->bus = $bus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
}

/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDefinition(array(
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
Expand All @@ -61,6 +66,10 @@ protected function configure()
Use the --limit option to limit the number of messages received:

<info>php %command.full_name% <receiver-name> --limit=10</info>

Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:

<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
EOF
)
;
Expand All @@ -69,7 +78,7 @@ protected function configure()
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): void
{
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
Expand All @@ -80,10 +89,39 @@ protected function execute(InputInterface $input, OutputInterface $output)
}

if ($limit = $input->getOption('limit')) {
$receiver = new MaximumCountReceiver($receiver, $limit);
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
}

if ($memoryLimit = $input->getOption('memory-limit')) {
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
}

$worker = new Worker($receiver, $this->bus);
$worker->run();
}

private function convertToBytes(string $memoryLimit): int
{
$memoryLimit = strtolower($memoryLimit);
$max = strtolower(ltrim($memoryLimit, '+'));
if (0 === strpos($max, '0x')) {
$max = intval($max, 16);
} elseif (0 === strpos($max, '0')) {
$max = intval($max, 8);
} else {
$max = (int) $max;
}

switch (substr($memoryLimit, -1)) {
case 't': $max *= 1024;
// no break
case 'g': $max *= 1024;
// no break
case 'm': $max *= 1024;
// no break
case 'k': $max *= 1024;
}

return $max;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
Expand Down Expand Up @@ -58,7 +57,7 @@ public function testItSendsAndReceivesMessages()
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);

if (2 == ++$receivedMessages) {
if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
Expand Down Expand Up @@ -116,9 +115,15 @@ public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
$connection->queue()->purge();

$sender = new AmqpSender($serializer, $connection);
$receiver = new MaximumCountReceiver(new AmqpReceiver($serializer, $connection), 2);
$receiver->receive(function ($message) {
$receiver = new AmqpReceiver($serializer, $connection);

$receivedMessages = 0;
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
$this->assertNull($message);

if (2 === ++$receivedMessages) {
$receiver->stop();
}
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace Symfony\Component\Messenger\Tests\Fixtures;

use Symfony\Component\Messenger\Transport\ReceiverInterface;

class CallbackReceiver implements ReceiverInterface
{
private $callable;

public function __construct(callable $callable)
{
$this->callable = $callable;
}

public function receive(callable $handler): void
{
$callable = $this->callable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

($this->callable)($handler);

$callable($handler);
}

public function stop(): void
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;

use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;

class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
{
/**
* @dataProvider memoryProvider
*/
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
};

$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();

$decoratedReceiver->expects($this->once())->method('receive');
if (true === $shouldStop) {
$decoratedReceiver->expects($this->once())->method('stop');
} else {
$decoratedReceiver->expects($this->never())->method('stop');
}

$memoryResolver = function () use ($memoryUsage) {
return $memoryUsage;
};

$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver);
$memoryLimitReceiver->receive(function () {});
}

public function memoryProvider()
{
yield array(2048, 1024, true);
yield array(1024, 1024, false);
yield array(1024, 2048, false);
}

public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('API'));
};

$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();

$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');

$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => 64 * 1024 * 1024));

$memoryResolver = function () {
return 70 * 1024 * 1024;
};

$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver);
$memoryLimitReceiver->receive(function () {});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;

use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;

class StopWhenMessageCountIsExceededReceiverTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
$handler(new DummyMessage('Second message'));
$handler(new DummyMessage('Third message'));
};

$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();

$decoratedReceiver->expects($this->once())->method('receive');
if (true === $shouldStop) {
$decoratedReceiver->expects($this->any())->method('stop');
} else {
$decoratedReceiver->expects($this->never())->method('stop');
}

$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max);
$maximumCountReceiver->receive(function () {});
}

public function countProvider()
{
yield array(1, true);
yield array(2, true);
yield array(3, true);
yield array(4, false);
}

public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage()
{
$callable = function ($handler) {
$handler(null);
$handler(null);
$handler(null);
$handler(null);
};

$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();

$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->never())->method('stop');

$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1);
$maximumCountReceiver->receive(function () {});
}

public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
{
$callable = function ($handler) {
$handler(new DummyMessage('First message'));
};

$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
->setConstructorArgs(array($callable))
->enableProxyingToOriginalMethods()
->getMock();

$decoratedReceiver->expects($this->once())->method('receive');
$decoratedReceiver->expects($this->once())->method('stop');

$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Receiver stopped due to maximum count of {count} exceeded'),
$this->equalTo(array('count' => 1))
);

$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger);
$maximumCountReceiver->receive(function () {});
}
}
22 changes: 1 addition & 21 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;

class WorkerTest extends TestCase
Expand Down Expand Up @@ -83,23 +83,3 @@ public function testWorkerDoesNotSendNullMessagesToTheBus()
$worker->run();
}
}

class CallbackReceiver implements ReceiverInterface
{
private $callable;

public function __construct(callable $callable)
{
$this->callable = $callable;
}

public function receive(callable $handler): void
{
$callable = $this->callable;
$callable($handler);
}

public function stop(): void
{
}
}
Loading