Skip to content

[Messenger] use Envelope internally, return void, add EnvelopeHandlerInterface and other cleanups #28881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Tobias Schultze <http://tobion.de>
*/
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
class SendMessageMiddleware implements MiddlewareInterface
{
private $senderLocator;
private $messagesToSendAndHandleMapping;
Expand All @@ -34,32 +33,28 @@ public function __construct(SenderLocatorInterface $senderLocator, array $messag
}

/**
* @param Envelope $envelope
*
* {@inheritdoc}
*/
public function handle($envelope, callable $next)
public function handle(Envelope $envelope, callable $next): void
{
if ($envelope->get(ReceivedMessage::class)) {
// It's a received message. Do not send it back:
return $next($envelope);
$next($envelope);

return;
}

$sender = $this->senderLocator->getSenderForMessage($envelope->getMessage());
$sender = $this->senderLocator->getSender($envelope);

if ($sender) {
$sender->send($envelope);

if (!$this->mustSendAndHandle($envelope->getMessage())) {
if (!AbstractSenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $envelope)) {
// message has no corresponding handler
return;
}
}

return $next($envelope);
}

private function mustSendAndHandle($message): bool
{
return (bool) AbstractSenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $message);
$next($envelope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,39 @@

namespace Symfony\Component\Messenger\Asynchronous\Routing;

use Symfony\Component\Messenger\Envelope;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @internal
*/
abstract class AbstractSenderLocator implements SenderLocatorInterface
{
public static function getValueFromMessageRouting(array $mapping, $message)
public static function getValueFromMessageRouting(array $mapping, Envelope $envelope)
{
if (isset($mapping[\get_class($message)])) {
return $mapping[\get_class($message)];
$name = $envelope->getMessageName();

if (null !== $name && isset($mapping[$name])) {
return $mapping[$name];
}
if ($parentsMapping = array_intersect_key($mapping, class_parents($message))) {
return current($parentsMapping);

if (isset($mapping[$class = \get_class($envelope->getMessage())])) {
return $mapping[$class];
}
if ($interfaceMapping = array_intersect_key($mapping, class_implements($message))) {
return current($interfaceMapping);

foreach (class_parents($class) as $name) {
if (isset($mapping[$name])) {
return $mapping[$name];
}
}
if (isset($mapping['*'])) {
return $mapping['*'];

foreach (class_implements($class) as $name) {
if (isset($mapping[$name])) {
return $mapping[$name];
}
}

return null;
return $mapping['*'] ?? null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Asynchronous\Routing;

use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;

/**
Expand All @@ -31,9 +32,9 @@ public function __construct(ContainerInterface $senderServiceLocator, array $mes
/**
* {@inheritdoc}
*/
public function getSenderForMessage($message): ?SenderInterface
public function getSender(Envelope $envelope): ?SenderInterface
{
$senderId = self::getValueFromMessageRouting($this->messageToSenderIdMapping, $message);
$senderId = self::getValueFromMessageRouting($this->messageToSenderIdMapping, $envelope);

return $senderId ? $this->senderServiceLocator->get($senderId) : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Asynchronous\Routing;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Transport\SenderInterface;

Expand All @@ -29,15 +30,15 @@ public function __construct(array $messageToSenderMapping)
/**
* {@inheritdoc}
*/
public function getSenderForMessage($message): ?SenderInterface
public function getSender(Envelope $envelope): ?SenderInterface
{
$sender = self::getValueFromMessageRouting($this->messageToSenderMapping, $message);
$sender = self::getValueFromMessageRouting($this->messageToSenderMapping, $envelope);
if (null === $sender) {
return null;
}

if (!$sender instanceof SenderInterface) {
throw new RuntimeException(sprintf('The sender instance provided for message "%s" should be of type "%s" but got "%s".', \get_class($message), SenderInterface::class, \is_object($sender) ? \get_class($sender) : \gettype($sender)));
throw new RuntimeException(sprintf('The sender instance provided for message "%s" should be of type "%s" but got "%s".', $envelope->getMessageName() ?? \get_class($envelope->getMessage()), SenderInterface::class, \is_object($sender) ? \get_class($sender) : \gettype($sender)));
}

return $sender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Asynchronous\Routing;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface;

/**
Expand All @@ -21,10 +22,6 @@ interface SenderLocatorInterface
{
/**
* Gets the sender (if applicable) for the given message object.
*
* @param object $message
*
* @return SenderInterface|null
*/
public function getSenderForMessage($message): ?SenderInterface;
public function getSender(Envelope $envelope): ?SenderInterface;
}
7 changes: 7 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CHANGELOG
-----

* The component is not experimental anymore
* [BC BREAK] `MessageBusInterface::dispatch()` and `MiddlewareInterface::handle()` now return `void`
* [BC BREAK] The signature of `Amqp*` classes changed to take a `Connection` as a first argument and an optional
`Serializer` as a second argument.
* [BC BREAK] `SenderLocator` has been renamed to `ContainerSenderLocator`
Expand All @@ -19,6 +20,12 @@ CHANGELOG
* [BC BREAK] The `EncoderInterface` and `DecoderInterface` have been replaced by a unified `Symfony\Component\Messenger\Transport\Serialization\SerializerInterface`.
* [BC BREAK] The locator passed to `ContainerHandlerLocator` should not prefix its keys by "handler." anymore
* [BC BREAK] The `AbstractHandlerLocator::getHandler()` method uses `?callable` as return type
* [BC BREAK] `SenderLocatorInterface::getSenderForMessage()` has been replaced by `getSender(Envelope $envelope)`
* [BC BREAK] `MessengerDataCollector::getMessages()` returns an iterable, not just an array anymore
* [BC BREAK] `AbstractHandlerLocator` is now internal
* [BC BREAK] `HandlerLocatorInterface::resolve()` has been replaced by `getHandler(Envelope $envelope, bool $allowNoHandler = false)`
* [BC BREAK] `SenderLocatorInterface::getSenderForMessage()` has been replaced by `getSender(Envelope $envelope)`
* [BC BREAK] `SenderInterface::send()` returns `void`

4.1.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Symfony\Component\HttpKernel\DataCollector\LateDataCollectorInterface;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\VarDumper\Caster\ClassStub;
use Symfony\Component\VarDumper\Cloner\Data;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand Down Expand Up @@ -55,14 +54,10 @@ public function lateCollect()
}

// Order by call time
usort($messages, function (array $a, array $b): int {
return $a[1] > $b[1] ? 1 : -1;
});
usort($messages, function ($a, $b) { return $a[1] <=> $b[1]; });

// Keep the messages clones only
$this->data['messages'] = array_map(function (array $item): Data {
return $item[0];
}, $messages);
$this->data['messages'] = array_column($messages, 0);
}

/**
Expand Down Expand Up @@ -98,14 +93,6 @@ private function collectMessage(string $busName, array $tracedMessage)
'caller' => $tracedMessage['caller'],
);

if (array_key_exists('result', $tracedMessage)) {
$result = $tracedMessage['result'];
$debugRepresentation['result'] = array(
'type' => \is_object($result) ? \get_class($result) : \gettype($result),
'value' => $result,
);
}

if (isset($tracedMessage['exception'])) {
$exception = $tracedMessage['exception'];

Expand All @@ -120,18 +107,21 @@ private function collectMessage(string $busName, array $tracedMessage)

public function getExceptionsCount(string $bus = null): int
{
return array_reduce($this->getMessages($bus), function (int $carry, Data $message) {
return $carry += isset($message['exception']) ? 1 : 0;
}, 0);
$count = 0;
foreach ($this->getMessages($bus) as $message) {
$count += (int) isset($message['exception']);
}

return $count;
}

public function getMessages(string $bus = null): array
public function getMessages(string $bus = null): iterable
{
$messages = $this->data['messages'] ?? array();

return $bus ? array_filter($messages, function (Data $message) use ($bus): bool {
return $bus === $message['bus'];
}) : $messages;
foreach ($this->data['messages'] ?? array() as $message) {
if (null === $bus || $bus === $message['bus']) {
yield $message;
}
}
}

public function getBuses(): array
Expand Down
34 changes: 15 additions & 19 deletions src/Symfony/Component/Messenger/Envelope.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ final class Envelope
*/
public function __construct($message, array $items = array())
{
if (!\is_object($message)) {
throw new \TypeError(sprintf('Invalid argument provided to "%s()": expected object but got %s.', __METHOD__, \gettype($message)));
}
$this->message = $message;
foreach ($items as $item) {
$this->items[\get_class($item)] = $item;
Expand All @@ -38,9 +41,15 @@ public function __construct($message, array $items = array())
*
* @param Envelope|object $message
*/
public static function wrap($message): self
public static function wrap($message, string $name = null): self
{
return $message instanceof self ? $message : new self($message);
$envelope = $message instanceof self ? clone $message : new self($message);
if (null !== $name) {
return $envelope->with(new MessageConfiguration($name));
}
unset($envelope->items[MessageConfiguration::class]);

return $envelope;
}

/**
Expand All @@ -55,15 +64,6 @@ public function with(EnvelopeItemInterface $item): self
return $cloned;
}

public function withMessage($message): self
{
$cloned = clone $this;

$cloned->message = $message;

return $cloned;
}

public function get(string $itemFqcn): ?EnvelopeItemInterface
{
return $this->items[$itemFqcn] ?? null;
Expand All @@ -85,14 +85,10 @@ public function getMessage()
return $this->message;
}

/**
* @param object $target
*
* @return Envelope|object The original message or the envelope if the target supports it
* (i.e implements {@link EnvelopeAwareInterface}).
*/
public function getMessageFor($target)
public function getMessageName(): ?string
{
return $target instanceof EnvelopeAwareInterface ? $this : $this->message;
$config = $this->items[MessageConfiguration::class] ?? null;

return $config ? $config->getName() : null;
}
}
21 changes: 0 additions & 21 deletions src/Symfony/Component/Messenger/EnvelopeAwareInterface.php

This file was deleted.

11 changes: 5 additions & 6 deletions src/Symfony/Component/Messenger/Handler/ChainHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Handler;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;

/**
Expand All @@ -30,21 +31,19 @@ class ChainHandler
*/
public function __construct(array $handlers)
{
if (empty($handlers)) {
if (!$handlers) {
throw new InvalidArgumentException('A collection of message handlers requires at least one handler.');
}

$this->handlers = $handlers;
}

public function __invoke($message)
public function __invoke(Envelope $envelope)
{
$results = array();
$message = $envelope->getMessage();

foreach ($this->handlers as $handler) {
$results[] = $handler($message);
$handler($message);
}

return $results;
}
}
Loading