Skip to content

Commit 6ffad7b

Browse files
committed
feature #30759 [Messenger] Adding the "sync" transport to call handlers synchronously (weaverryan)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Adding the "sync" transport to call handlers synchronously | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | none | License | MIT | Doc PR | symfony/symfony-docs#11236 This adds a `sync://` transport that just calls the handlers immediately. Why? This allows you to route your messages to some "async" transport. But then, when developing locally or running your tests, you can choose to run them synchronously instead: ```yml # config/packages/messenger.yaml framework: messenger: transports: async: '%env(MESSENGER_TRANSPORT_DSN)%' routing: 'App\Message\SmsNotification': async 'App\Message\OtherMessage': async ``` ``` # .env # by default, handle this sync MESSENGER_TRANSPORT_DSN=sync:// ``` ``` # .env.local on production (or set this via real env vars) # on production, use amqp MESSENGER_TRANSPORT_DSN=amqp://....... ``` Cheers! Commits ------- 3da5a43 Adding the "sync" transport to call handlers synchronously
2 parents 3edb79f + 3da5a43 commit 6ffad7b

File tree

7 files changed

+144
-2
lines changed

7 files changed

+144
-2
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@
6767
<argument type="service" id="messenger.transport.serializer" />
6868
</service>
6969

70+
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
71+
<tag name="messenger.transport_factory" />
72+
</service>
73+
7074
<!-- retry -->
7175
<service id="messenger.retry_strategy_locator">
7276
<tag name="container.service_locator" />

src/Symfony/Component/Messenger/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
8+
explicitly handle messages asynchronously.
79
* Added optional parameter `prefetch_count` in connection configuration,
810
to setup channel prefetch count
911
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`

src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php

+8-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
18+
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
1819
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1920
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2021
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -81,7 +82,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
8182
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
8283
}
8384

84-
// on a redelivery, never call local handlers
85+
// if the message was marked (usually by SyncTransport) that it handlers
86+
// MUST be called, mark them to be handled.
87+
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
88+
$handle = true;
89+
}
90+
91+
// on a redelivery, only send back to queue: never call local handlers
8592
if (null !== $redeliveryStamp) {
8693
$handle = false;
8794
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* Stamp marks that the handlers *should* be called immediately.
16+
*
17+
* This is used by the SyncTransport to indicate to the
18+
* SendMessageMiddleware that handlers *should* be called
19+
* immediately, even though a transport was set.
20+
*
21+
* @experimental in 4.3
22+
*
23+
* @author Ryan Weaver <ryan@symfonycasts.com>
24+
*/
25+
class ForceCallHandlersStamp implements StampInterface
26+
{
27+
}

src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php

+18-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
1616
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
17+
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
1718
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1819
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1920
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -86,6 +87,8 @@ public function testItSendsTheMessageToMultipleSenders()
8687
public function testItSendsToOnlyOneSenderOnRedelivery()
8788
{
8889
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
90+
// even with a ForceCallHandlersStamp, the next middleware won't be called
91+
$envelope = $envelope->with(new ForceCallHandlersStamp());
8992
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
9093
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
9194

@@ -237,7 +240,7 @@ public function testItDoesNotDispatchWithNoSenders()
237240
$middleware->handle($envelope, $this->getStackMock());
238241
}
239242

240-
public function testItDoesNotDispatchOnRetry()
243+
public function testItDoesNotDispatchOnRedeliver()
241244
{
242245
$envelope = new Envelope(new DummyMessage('original envelope'));
243246
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
@@ -251,4 +254,18 @@ public function testItDoesNotDispatchOnRetry()
251254

252255
$middleware->handle($envelope, $this->getStackMock(false));
253256
}
257+
258+
public function testItHandlesWithForceCallHandlersStamp()
259+
{
260+
$envelope = new Envelope(new DummyMessage('original envelope'));
261+
$envelope = $envelope->with(new ForceCallHandlersStamp());
262+
263+
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
264+
$sender->expects($this->once())->method('send')->willReturn($envelope);
265+
266+
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
267+
268+
// next handler *should* be called
269+
$middleware->handle($envelope, $this->getStackMock(true));
270+
}
254271
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Sync;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
17+
use Symfony\Component\Messenger\Transport\TransportInterface;
18+
19+
/**
20+
* A "fake" transport that marks messages to be handled immediately.
21+
*
22+
* @experimental in 4.3
23+
*
24+
* @author Ryan Weaver <ryan@symfonycasts.com>
25+
*/
26+
class SyncTransport implements TransportInterface
27+
{
28+
public function receive(callable $handler): void
29+
{
30+
throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.');
31+
}
32+
33+
public function stop(): void
34+
{
35+
throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.');
36+
}
37+
38+
public function ack(Envelope $envelope): void
39+
{
40+
throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.');
41+
}
42+
43+
public function reject(Envelope $envelope): void
44+
{
45+
throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.');
46+
}
47+
48+
public function send(Envelope $envelope): Envelope
49+
{
50+
return $envelope->with(new ForceCallHandlersStamp());
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Sync;
13+
14+
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
15+
use Symfony\Component\Messenger\Transport\TransportInterface;
16+
17+
/**
18+
* @experimental in 4.3
19+
*
20+
* @author Ryan Weaver <ryan@symfonycasts.com>
21+
*/
22+
class SyncTransportFactory implements TransportFactoryInterface
23+
{
24+
public function createTransport(string $dsn, array $options): TransportInterface
25+
{
26+
return new SyncTransport();
27+
}
28+
29+
public function supports(string $dsn, array $options): bool
30+
{
31+
return 0 === strpos($dsn, 'sync://');
32+
}
33+
}

0 commit comments

Comments
 (0)