|
38 | 38 | use Symfony\Component\Messenger\MessageBusInterface;
|
39 | 39 | use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
|
40 | 40 | use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
| 41 | +use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; |
| 42 | +use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; |
41 | 43 | use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
42 | 44 | use Symfony\Component\Messenger\Stamp\SentStamp;
|
43 | 45 | use Symfony\Component\Messenger\Stamp\StampInterface;
|
@@ -584,6 +586,49 @@ public function testFlushBatchOnStop()
|
584 | 586 |
|
585 | 587 | $this->assertSame($expectedMessages, $handler->processedMessages);
|
586 | 588 | }
|
| 589 | + |
| 590 | + public function testFlushRemovesNoAutoAckStampOnException() |
| 591 | + { |
| 592 | + $envelope = new Envelope(new DummyMessage('Test')); |
| 593 | + $receiver = new DummyReceiver([[$envelope]]); |
| 594 | + |
| 595 | + $bus = new class implements MessageBusInterface { |
| 596 | + public function dispatch(object $message, array $stamps = []): Envelope |
| 597 | + { |
| 598 | + $envelope = Envelope::wrap($message, $stamps); |
| 599 | + if ($envelope->last(FlushBatchHandlersStamp::class)) { |
| 600 | + throw new \RuntimeException('Flush failed'); |
| 601 | + } |
| 602 | + |
| 603 | + return $envelope; |
| 604 | + } |
| 605 | + }; |
| 606 | + |
| 607 | + $dispatcher = new EventDispatcher(); |
| 608 | + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) { |
| 609 | + static $calls = 0; |
| 610 | + if (++$calls >= 2) { |
| 611 | + $event->getWorker()->stop(); |
| 612 | + } |
| 613 | + }); |
| 614 | + |
| 615 | + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock()); |
| 616 | + |
| 617 | + $reflection = new \ReflectionClass($worker); |
| 618 | + $unacksProperty = $reflection->getProperty('unacks'); |
| 619 | + $unacks = $unacksProperty->getValue($worker); |
| 620 | + $dummyHandler = new DummyBatchHandler(); |
| 621 | + $envelopeWithNoAutoAck = $envelope->with(new NoAutoAckStamp(new HandlerDescriptor($dummyHandler))); |
| 622 | + $unacks->attach($dummyHandler, [$envelopeWithNoAutoAck, 'transport']); |
| 623 | + |
| 624 | + $worker->run(); |
| 625 | + |
| 626 | + $this->assertSame(1, $receiver->getRejectCount()); |
| 627 | + $rejectedEnvelopes = $receiver->getRejectedEnvelopes(); |
| 628 | + $this->assertCount(1, $rejectedEnvelopes); |
| 629 | + $rejectedEnvelope = $rejectedEnvelopes[0]; |
| 630 | + $this->assertNull($rejectedEnvelope->last(NoAutoAckStamp::class)); |
| 631 | + } |
587 | 632 | }
|
588 | 633 |
|
589 | 634 | class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
|
|
0 commit comments