Skip to content

Commit f0b2acd

Browse files
committed
Allows to register handlers on a specific transport (and get rid of this handler alias)
1 parent 96a7907 commit f0b2acd

23 files changed

+426
-209
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ CHANGELOG
8181
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
8282
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
8383
* [BC BREAK] A `HandlerFailedException` exception will be thrown if one or more handler fails.
84+
* [BC BREAK] The `HandlersLocationInterface::getHandlers` method needs to return `HandlerDescriptor`
85+
instances instead of callables.
86+
* [BC BREAK] The `HandledStamp` stamp has changed: `handlerAlias` has been renamed to `handlerName`,
87+
`getCallableName` has been removed and its constructor only has 2 arguments now.
88+
* [BC BREAK] The `ReceivedStamp` needs to exposes the name of the transport from which the message
89+
has been received.
8490

8591
4.2.0
8692
-----

src/Symfony/Component/Messenger/Command/DebugCommand.php

+17-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
8484
foreach ($handlersByMessage as $message => $handlers) {
8585
$tableRows[] = [sprintf('<fg=cyan>%s</fg=cyan>', $message)];
8686
foreach ($handlers as $handler) {
87-
$tableRows[] = [sprintf(' handled by <info>%s</>', $handler)];
87+
$tableRows[] = [
88+
sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
89+
];
8890
}
8991
}
9092

@@ -97,4 +99,18 @@ protected function execute(InputInterface $input, OutputInterface $output)
9799
}
98100
}
99101
}
102+
103+
private function formatConditions(array $options): string
104+
{
105+
if (!$options) {
106+
return '';
107+
}
108+
109+
$optionsMapping = [];
110+
foreach ($options as $key => $value) {
111+
$optionsMapping[] = ' '.$key.'='.$value;
112+
}
113+
114+
return ' (when'.implode(', ', $optionsMapping).')';
115+
}
100116
}

src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

+28-21
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Symfony\Component\DependencyInjection\Definition;
2020
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
2121
use Symfony\Component\DependencyInjection\Reference;
22+
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2223
use Symfony\Component\Messenger\Handler\HandlersLocator;
2324
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
2425
use Symfony\Component\Messenger\TraceableMessageBus;
@@ -94,32 +95,33 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
9495
$message = null;
9596
$handlerBuses = (array) ($tag['bus'] ?? $busIds);
9697

97-
foreach ($handles as $message => $method) {
98+
foreach ($handles as $message => $options) {
9899
$buses = $handlerBuses;
100+
99101
if (\is_int($message)) {
100-
$message = $method;
101-
$method = '__invoke';
102+
if (\is_string($options)) {
103+
$message = $options;
104+
$options = [];
105+
} else {
106+
throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', \gettype($options), $message, $serviceId));
107+
}
102108
}
103109

104-
if (\is_array($message)) {
105-
list($message, $priority) = $message;
106-
} else {
107-
$priority = $tag['priority'] ?? 0;
110+
if (\is_string($options)) {
111+
$options = ['method' => $options];
108112
}
109113

110-
if (\is_array($method)) {
111-
if (isset($method['bus'])) {
112-
if (!\in_array($method['bus'], $busIds)) {
113-
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
114+
$priority = $tag['priority'] ?? $options['priority'] ?? 0;
115+
$method = $options['method'] ?? '__invoke';
114116

115-
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $method['bus']));
116-
}
117+
if (isset($options['bus'])) {
118+
if (!\in_array($options['bus'], $busIds)) {
119+
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
117120

118-
$buses = [$method['bus']];
121+
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
119122
}
120123

121-
$priority = $method['priority'] ?? $priority;
122-
$method = $method['method'] ?? '__invoke';
124+
$buses = [$options['bus']];
123125
}
124126

125127
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@@ -141,7 +143,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
141143
}
142144

143145
foreach ($buses as $handlerBus) {
144-
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = $definitionId;
146+
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options];
145147
}
146148
}
147149

@@ -154,15 +156,20 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
154156
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
155157
foreach ($handlersByMessage as $message => $handlersByPriority) {
156158
krsort($handlersByPriority);
157-
$handlersByBusAndMessage[$bus][$message] = array_unique(array_merge(...$handlersByPriority));
159+
$handlersByBusAndMessage[$bus][$message] = array_merge(...$handlersByPriority);
158160
}
159161
}
160162

161163
$handlersLocatorMappingByBus = [];
162164
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
163-
foreach ($handlersByMessage as $message => $handlerIds) {
164-
$handlers = array_map(function (string $handlerId) { return new Reference($handlerId); }, $handlerIds);
165-
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlers);
165+
foreach ($handlersByMessage as $message => $handlers) {
166+
$handlerDescriptors = [];
167+
foreach ($handlers as $handler) {
168+
$definitions[$definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0])] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]]);
169+
$handlerDescriptors[] = new Reference($definitionId);
170+
}
171+
172+
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlerDescriptors);
166173
}
167174
}
168175
$container->addDefinitions($definitions);

src/Symfony/Component/Messenger/HandleTrait.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private function handle($message)
5252

5353
if (\count($handledStamps) > 1) {
5454
$handlers = implode(', ', array_map(function (HandledStamp $stamp): string {
55-
return sprintf('"%s"', $stamp->getHandlerAlias() ?? $stamp->getCallableName());
55+
return sprintf('"%s"', $stamp->getHandlerName());
5656
}, $handledStamps));
5757

5858
throw new LogicException(sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__, \count($handledStamps), $handlers));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* Describes a handler and the possible associated options, such as `from_transport`, `bus`, etc.
16+
*
17+
* @author Samuel Roze <samuel.roze@gmail.com>
18+
*
19+
* @experimental in 4.3
20+
*/
21+
final class HandlerDescriptor
22+
{
23+
private $handler;
24+
private $options;
25+
26+
public function __construct(callable $handler, array $options = [])
27+
{
28+
$this->handler = $handler;
29+
$this->options = $options;
30+
}
31+
32+
public function getHandler(): callable
33+
{
34+
return $this->handler;
35+
}
36+
37+
public function getName(): string
38+
{
39+
$name = $this->callableName($this->handler);
40+
$alias = $this->options['alias'] ?? null;
41+
42+
if (null !== $alias) {
43+
$name .= '@'.$alias;
44+
}
45+
46+
return $name;
47+
}
48+
49+
public function getOption(string $option)
50+
{
51+
return $this->options[$option] ?? null;
52+
}
53+
54+
private function callableName(callable $handler)
55+
{
56+
if (\is_array($handler)) {
57+
if (\is_object($handler[0])) {
58+
return \get_class($handler[0]).'::'.$handler[1];
59+
}
60+
61+
return $handler[0].'::'.$handler[1];
62+
}
63+
64+
if (\is_string($handler)) {
65+
return $handler;
66+
}
67+
68+
if ($handler instanceof \Closure) {
69+
$r = new \ReflectionFunction($handler);
70+
if (false !== strpos($r->name, '{closure}')) {
71+
return 'Closure';
72+
}
73+
if ($class = $r->getClosureScopeClass()) {
74+
return $class->name.'::'.$r->name;
75+
}
76+
77+
return $r->name;
78+
}
79+
80+
return \get_class($handler).'::__invoke';
81+
}
82+
}

src/Symfony/Component/Messenger/Handler/HandlersLocator.php

+32-4
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
namespace Symfony\Component\Messenger\Handler;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1516

1617
/**
1718
* Maps a message to a list of handlers.
1819
*
1920
* @author Nicolas Grekas <p@tchwork.com>
21+
* @author Samuel Roze <samuel.roze@gmail.com>
2022
*
2123
* @experimental in 4.2
2224
*/
@@ -25,7 +27,7 @@ class HandlersLocator implements HandlersLocatorInterface
2527
private $handlers;
2628

2729
/**
28-
* @param callable[][] $handlers
30+
* @param HandlerDescriptor[][]|callable[][] $handlers
2931
*/
3032
public function __construct(array $handlers)
3133
{
@@ -40,10 +42,23 @@ public function getHandlers(Envelope $envelope): iterable
4042
$seen = [];
4143

4244
foreach (self::listTypes($envelope) as $type) {
43-
foreach ($this->handlers[$type] ?? [] as $alias => $handler) {
44-
if (!\in_array($handler, $seen, true)) {
45-
yield $alias => $seen[] = $handler;
45+
foreach ($this->handlers[$type] ?? [] as $handlerDescriptor) {
46+
if (\is_callable($handlerDescriptor)) {
47+
$handlerDescriptor = new HandlerDescriptor($handlerDescriptor);
4648
}
49+
50+
if (!$this->shouldHandle($envelope, $handlerDescriptor)) {
51+
continue;
52+
}
53+
54+
$name = $handlerDescriptor->getName();
55+
if (\in_array($name, $seen)) {
56+
continue;
57+
}
58+
59+
$seen[] = $name;
60+
61+
yield $handlerDescriptor;
4762
}
4863
}
4964
}
@@ -60,4 +75,17 @@ public static function listTypes(Envelope $envelope): array
6075
+ class_implements($class)
6176
+ ['*' => '*'];
6277
}
78+
79+
private function shouldHandle(Envelope $envelope, HandlerDescriptor $handlerDescriptor)
80+
{
81+
if (null === $received = $envelope->last(ReceivedStamp::class)) {
82+
return true;
83+
}
84+
85+
if (null === $expectedTransport = $handlerDescriptor->getOption('from_transport')) {
86+
return true;
87+
}
88+
89+
return $received->getTransportName() === $expectedTransport;
90+
}
6391
}

src/Symfony/Component/Messenger/Handler/HandlersLocatorInterface.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ interface HandlersLocatorInterface
2525
/**
2626
* Returns the handlers for the given message name.
2727
*
28-
* @return iterable|callable[] Indexed by handler alias if available
28+
* @return iterable|HandlerDescriptor[] Indexed by handler alias if available
2929
*/
3030
public function getHandlers(Envelope $envelope): iterable;
3131
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

+9-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
1818
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
19+
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
1920
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
2021
use Symfony\Component\Messenger\Stamp\HandledStamp;
2122

@@ -54,17 +55,16 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5455
];
5556

5657
$exceptions = [];
57-
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
58-
$alias = \is_string($alias) ? $alias : null;
59-
60-
if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
58+
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
59+
if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
6160
continue;
6261
}
6362

6463
try {
65-
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
64+
$handler = $handlerDescriptor->getHandler();
65+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
6666
$envelope = $envelope->with($handledStamp);
67-
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
67+
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getHandlerName()]);
6868
} catch (\Throwable $e) {
6969
$exceptions[] = $e;
7070
}
@@ -85,12 +85,11 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
8585
return $stack->next()->handle($envelope, $stack);
8686
}
8787

88-
private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
88+
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
8989
{
9090
$some = array_filter($envelope
91-
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
92-
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
93-
$stamp->getHandlerAlias() === $alias;
91+
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
92+
return $stamp->getHandlerName() === $handlerDescriptor->getName();
9493
});
9594

9695
return \count($some) > 0;

0 commit comments

Comments
 (0)