|
21 | 21 | use Symfony\Component\Messenger\Event\WorkerStartedEvent;
|
22 | 22 | use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
23 | 23 | use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
| 24 | +use Symfony\Component\Messenger\Exception\RuntimeException; |
24 | 25 | use Symfony\Component\Messenger\MessageBusInterface;
|
25 | 26 | use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
26 | 27 | use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
27 | 28 | use Symfony\Component\Messenger\Stamp\SentStamp;
|
28 | 29 | use Symfony\Component\Messenger\Stamp\StampInterface;
|
29 | 30 | use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
| 31 | +use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; |
30 | 32 | use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
31 | 33 | use Symfony\Component\Messenger\Worker;
|
32 | 34 | use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
@@ -245,6 +247,41 @@ public function testWorkerWithMultipleReceivers()
|
245 | 247 | $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
|
246 | 248 | }
|
247 | 249 |
|
| 250 | + public function testWorkerLimitQueues() |
| 251 | + { |
| 252 | + $envelope = [new Envelope(new DummyMessage('message1'))]; |
| 253 | + $receiver = $this->createMock(QueueReceiverInterface::class); |
| 254 | + $receiver->expects($this->once()) |
| 255 | + ->method('getFromQueues') |
| 256 | + ->with(['foo']) |
| 257 | + ->willReturn($envelope) |
| 258 | + ; |
| 259 | + $receiver->expects($this->never()) |
| 260 | + ->method('get') |
| 261 | + ; |
| 262 | + |
| 263 | + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); |
| 264 | + |
| 265 | + $dispatcher = new EventDispatcher(); |
| 266 | + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); |
| 267 | + |
| 268 | + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); |
| 269 | + $worker->run(['queues' => ['foo']]); |
| 270 | + } |
| 271 | + |
| 272 | + public function testWorkerLimitQueuesUnsupported() |
| 273 | + { |
| 274 | + $receiver1 = $this->createMock(QueueReceiverInterface::class); |
| 275 | + $receiver2 = $this->createMock(ReceiverInterface::class); |
| 276 | + |
| 277 | + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); |
| 278 | + |
| 279 | + $worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus); |
| 280 | + $this->expectException(RuntimeException::class); |
| 281 | + $this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class)); |
| 282 | + $worker->run(['queues' => ['foo']]); |
| 283 | + } |
| 284 | + |
248 | 285 | public function testWorkerMessageReceivedEventMutability()
|
249 | 286 | {
|
250 | 287 | $envelope = new Envelope(new DummyMessage('Hello'));
|
|
0 commit comments