Skip to content

Commit bf89cd6

Browse files
committed
Making sleep configurable
1 parent 19a3c7f commit bf89cd6

File tree

7 files changed

+63
-18
lines changed

7 files changed

+63
-18
lines changed

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ protected function configure(): void
7474
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
7575
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
7676
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
77+
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7778
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
7879
])
7980
->setDescription('Consumes messages')
@@ -184,7 +185,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
184185
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
185186
}
186187

187-
$worker->run();
188+
$worker->run([
189+
'sleep' => $input->getOption('sleep') * 1000000
190+
]);
188191
}
189192

190193
private function convertToBytes(string $memoryLimit): int

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public function testWorkerDispatchTheReceivedMessage()
4444
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
4545

4646
$worker = new Worker($receiver, $bus, 'receiver_id');
47-
$worker->run(function (?Envelope $envelope) use ($worker) {
47+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
4848
// stop after the messages finish
4949
if (null === $envelope) {
5050
$worker->stop();
@@ -65,7 +65,7 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
6565
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
6666

6767
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
68-
$worker->run(function (?Envelope $envelope) use ($worker) {
68+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
6969
// stop after the messages finish
7070
if (null === $envelope) {
7171
$worker->stop();
@@ -101,7 +101,7 @@ public function testDispatchCausesRetry()
101101
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
102102

103103
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
104-
$worker->run(function (?Envelope $envelope) use ($worker) {
104+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
105105
// stop after the messages finish
106106
if (null === $envelope) {
107107
$worker->stop();
@@ -125,7 +125,7 @@ public function testDispatchCausesRejectWhenNoRetry()
125125
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
126126

127127
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
128-
$worker->run(function (?Envelope $envelope) use ($worker) {
128+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
129129
// stop after the messages finish
130130
if (null === $envelope) {
131131
$worker->stop();
@@ -148,7 +148,7 @@ public function testDispatchCausesRejectOnUnrecoverableMessage()
148148
$retryStrategy->expects($this->never())->method('isRetryable');
149149

150150
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
151-
$worker->run(function (?Envelope $envelope) use ($worker) {
151+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
152152
// stop after the messages finish
153153
if (null === $envelope) {
154154
$worker->stop();
@@ -168,7 +168,7 @@ public function testWorkerDoesNotSendNullMessagesToTheBus()
168168
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
169169

170170
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
171-
$worker->run(function (?Envelope $envelope) use ($worker) {
171+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
172172
// stop after the messages finish
173173
if (null === $envelope) {
174174
$worker->stop();
@@ -195,7 +195,7 @@ public function testWorkerDispatchesEventsOnSuccess()
195195
);
196196

197197
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
198-
$worker->run(function (?Envelope $envelope) use ($worker) {
198+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
199199
// stop after the messages finish
200200
if (null === $envelope) {
201201
$worker->stop();
@@ -223,13 +223,49 @@ public function testWorkerDispatchesEventsOnError()
223223
);
224224

225225
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
226-
$worker->run(function (?Envelope $envelope) use ($worker) {
226+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
227227
// stop after the messages finish
228228
if (null === $envelope) {
229229
$worker->stop();
230230
}
231231
});
232232
}
233+
234+
public function testTimeoutIsConfigurable()
235+
{
236+
$apiMessage = new DummyMessage('API');
237+
$receiver = new DummyReceiver([
238+
[new Envelope($apiMessage), new Envelope($apiMessage)],
239+
null, // will cause a wait
240+
null, // will cause a wait
241+
[new Envelope($apiMessage)],
242+
[new Envelope($apiMessage)],
243+
null, // will cause a wait
244+
[new Envelope($apiMessage)],
245+
]);
246+
247+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
248+
249+
$worker = new Worker($receiver, $bus, 'receiver_id');
250+
$receivedCount = 0;
251+
$startTime = microtime(true);
252+
// sleep .1 after each idle
253+
$worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) {
254+
if (null !== $envelope) {
255+
$receivedCount++;
256+
}
257+
258+
if (5 === $receivedCount) {
259+
$worker->stop();
260+
$duration = microtime(true) - $startTime;
261+
262+
// wait time should be .3 seconds, so execution should
263+
// be only a bit more than that
264+
$this->assertGreaterThanOrEqual(.3, $duration);
265+
$this->assertLessThan(.5, $duration);
266+
}
267+
});
268+
}
233269
}
234270

235271
class DummyReceiver implements ReceiverInterface

src/Symfony/Component/Messenger/Worker.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,12 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu
6060
/**
6161
* Receive the messages and dispatch them to the bus.
6262
*/
63-
public function run(callable $onHandledCallback = null): void
63+
public function run(array $options = [], callable $onHandledCallback = null): void
6464
{
65+
$options = array_merge([
66+
'sleep' => 1000000,
67+
], $options);
68+
6569
if (\function_exists('pcntl_signal')) {
6670
pcntl_signal(SIGTERM, function () {
6771
$this->stop();
@@ -92,7 +96,7 @@ public function run(callable $onHandledCallback = null): void
9296
if (false === $envelopeHandled) {
9397
$handled(null);
9498

95-
usleep(1000000);
99+
usleep($options['sleep']);
96100
}
97101
}
98102
}

src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit,
3737
};
3838
}
3939

40-
public function run(callable $onHandledCallback = null): void
40+
public function run(array $options = [], callable $onHandledCallback = null): void
4141
{
42-
$this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback) {
42+
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) {
4343
if (null !== $onHandledCallback) {
4444
$onHandledCallback($envelope);
4545
}

src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ public function __construct(WorkerInterface $decoratedWorker, int $maximumNumber
3333
$this->logger = $logger;
3434
}
3535

36-
public function run(callable $onHandledCallback = null): void
36+
public function run(array $options = [], callable $onHandledCallback = null): void
3737
{
3838
$receivedMessages = 0;
3939

40-
$this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
40+
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
4141
if (null !== $onHandledCallback) {
4242
$onHandledCallback($envelope);
4343
}

src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSe
3333
$this->logger = $logger;
3434
}
3535

36-
public function run(callable $onHandledCallback = null): void
36+
public function run(array $options = [], callable $onHandledCallback = null): void
3737
{
3838
$startTime = microtime(true);
3939
$endTime = $startTime + $this->timeLimitInSeconds;
4040

41-
$this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
41+
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
4242
if (null !== $onHandledCallback) {
4343
$onHandledCallback($envelope);
4444
}

src/Symfony/Component/Messenger/WorkerInterface.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ interface WorkerInterface
2424
*
2525
* The $onHandledCallback will be passed the Envelope that was just
2626
* handled or null if nothing was handled.
27+
*
28+
* @param mixed[] $options Options used to control worker behavior.
2729
*/
28-
public function run(callable $onHandledCallback = null): void;
30+
public function run(array $options = [], callable $onHandledCallback = null): void;
2931

3032
/**
3133
* Stop receiving messages.

0 commit comments

Comments
 (0)