|
24 | 24 | use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
|
25 | 25 | use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
|
26 | 26 | use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
| 27 | +use Symfony\Component\Messenger\Exception\ValidationFailedException; |
27 | 28 | use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
28 | 29 | use Symfony\Component\Messenger\Handler\HandlersLocator;
|
29 | 30 | use Symfony\Component\Messenger\MessageBus;
|
|
32 | 33 | use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
|
33 | 34 | use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
|
34 | 35 | use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
| 36 | +use Symfony\Component\Messenger\Middleware\ValidationMiddleware; |
35 | 37 | use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
36 | 38 | use Symfony\Component\Messenger\Stamp\BusNameStamp;
|
37 | 39 | use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
|
|
42 | 44 | use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
43 | 45 | use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
|
44 | 46 | use Symfony\Component\Messenger\Worker;
|
| 47 | +use Symfony\Component\Validator\ConstraintViolation; |
| 48 | +use Symfony\Component\Validator\ConstraintViolationList; |
| 49 | +use Symfony\Component\Validator\Validator\ValidatorInterface; |
45 | 50 |
|
46 | 51 | class FailureIntegrationTest extends TestCase
|
47 | 52 | {
|
@@ -440,6 +445,87 @@ public function testStampsAddedByMiddlewaresDontDisappearWhenDelayedMessageFails
|
440 | 445 | $this->assertCount(1, $messagesWaiting);
|
441 | 446 | $this->assertSame('some.bus', $messagesWaiting[0]->last(BusNameStamp::class)?->getBusName());
|
442 | 447 | }
|
| 448 | + |
| 449 | + public function testStampsAddedByMiddlewaresDontDisappearWhenValidationFails() |
| 450 | + { |
| 451 | + $transport1 = new DummyFailureTestSenderAndReceiver(); |
| 452 | + |
| 453 | + $transports = [ |
| 454 | + 'transport1' => $transport1, |
| 455 | + ]; |
| 456 | + |
| 457 | + $locator = $this->createMock(ContainerInterface::class); |
| 458 | + $locator->expects($this->any()) |
| 459 | + ->method('has') |
| 460 | + ->willReturn(true); |
| 461 | + $locator->expects($this->any()) |
| 462 | + ->method('get') |
| 463 | + ->willReturnCallback(fn ($transportName) => $transports[$transportName]); |
| 464 | + $senderLocator = new SendersLocator([], $locator); |
| 465 | + |
| 466 | + $retryStrategyLocator = $this->createMock(ContainerInterface::class); |
| 467 | + $retryStrategyLocator->expects($this->any()) |
| 468 | + ->method('has') |
| 469 | + ->willReturn(true); |
| 470 | + $retryStrategyLocator->expects($this->any()) |
| 471 | + ->method('get') |
| 472 | + ->willReturn(new MultiplierRetryStrategy(1)); |
| 473 | + |
| 474 | + $violationList = new ConstraintViolationList([new ConstraintViolation('validation failed', null, [], null, null, null)]); |
| 475 | + $validator = $this->createMock(ValidatorInterface::class); |
| 476 | + $validator->expects($this->once())->method('validate')->willReturn($violationList); |
| 477 | + |
| 478 | + $middlewareStack = new \ArrayIterator([ |
| 479 | + new AddBusNameStampMiddleware('some.bus'), |
| 480 | + new ValidationMiddleware($validator), |
| 481 | + new SendMessageMiddleware($senderLocator), |
| 482 | + ]); |
| 483 | + |
| 484 | + $bus = new MessageBus($middlewareStack); |
| 485 | + |
| 486 | + $transport1Handler = fn () => $bus->dispatch(new \stdClass(), [new DispatchAfterCurrentBusStamp()]); |
| 487 | + |
| 488 | + $handlerLocator = new HandlersLocator([ |
| 489 | + DummyMessage::class => [new HandlerDescriptor($transport1Handler)], |
| 490 | + ]); |
| 491 | + |
| 492 | + $middlewareStack->append(new HandleMessageMiddleware($handlerLocator)); |
| 493 | + |
| 494 | + $dispatcher = new EventDispatcher(); |
| 495 | + |
| 496 | + $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator)); |
| 497 | + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); |
| 498 | + |
| 499 | + $runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable { |
| 500 | + $throwable = null; |
| 501 | + $failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) { |
| 502 | + $throwable = $event->getThrowable(); |
| 503 | + }; |
| 504 | + $dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener); |
| 505 | + |
| 506 | + $worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher); |
| 507 | + |
| 508 | + $worker->run(); |
| 509 | + |
| 510 | + $dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener); |
| 511 | + |
| 512 | + return $throwable; |
| 513 | + }; |
| 514 | + |
| 515 | + // Simulate receive from external source |
| 516 | + $transport1->send(new Envelope(new DummyMessage('API'))); |
| 517 | + |
| 518 | + // Receive the message from "transport1" |
| 519 | + $throwable = $runWorker('transport1'); |
| 520 | + |
| 521 | + $this->assertInstanceOf(ValidationFailedException::class, $throwable, $throwable->getMessage()); |
| 522 | + |
| 523 | + $messagesWaiting = $transport1->getMessagesWaitingToBeReceived(); |
| 524 | + |
| 525 | + // Stamps should not be dropped on message that's queued for retry |
| 526 | + $this->assertCount(1, $messagesWaiting); |
| 527 | + $this->assertSame('some.bus', $messagesWaiting[0]->last(BusNameStamp::class)?->getBusName()); |
| 528 | + } |
443 | 529 | }
|
444 | 530 |
|
445 | 531 | class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface
|
|
0 commit comments