Skip to content

[Messenger] Re-introduce wrapped message configuration (with fix) #27182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@

use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class SendMessageMiddleware implements MiddlewareInterface
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $senderLocator;

Expand All @@ -32,17 +34,19 @@ public function __construct(SenderLocatorInterface $senderLocator)
*/
public function handle($message, callable $next)
{
if ($message instanceof ReceivedMessage) {
return $next($message->getMessage());
$envelope = Envelope::wrap($message);
if ($envelope->get(ReceivedMessage::class)) {
// It's a received message. Do not send it back:
return $next($message);
}

if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) {
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
foreach ($senders as $sender) {
if (null === $sender) {
continue;
}

$sender->send($message);
$sender->send($envelope);
}

if (!\in_array(null, $senders, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
namespace Symfony\Component\Messenger\Asynchronous\Transport;

use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\EnvelopeItemInterface;

/**
* Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify
* Marker config for a received message.
* This is mainly used by the `SendMessageMiddleware` middleware to identify
* a message should not be sent if it was just received.
*
* @see SendMessageMiddleware
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
final class ReceivedMessage
final class ReceivedMessage implements EnvelopeItemInterface
{
private $message;

public function __construct($message)
public function serialize()
{
$this->message = $message;
return '';
}

public function getMessage()
public function unserialize($serialized)
{
return $this->message;
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Asynchronous\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\ReceiverInterface;

/**
Expand All @@ -27,12 +28,12 @@ public function __construct(ReceiverInterface $decoratedConsumer)

public function receive(callable $handler): void
{
$this->decoratedReceiver->receive(function ($message) use ($handler) {
if (null !== $message) {
$message = new ReceivedMessage($message);
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
if (null !== $envelope) {
$envelope = $envelope->with(new ReceivedMessage());
}

$handler($message);
$handler($envelope);
});
}

Expand Down
89 changes: 89 additions & 0 deletions src/Symfony/Component/Messenger/Envelope.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

/**
* A message wrapped in an envelope with items (configurations, markers, ...).
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class Envelope
{
private $items = array();
private $message;

/**
* @param object $message
* @param EnvelopeItemInterface[] $items
*/
public function __construct($message, array $items = array())
{
$this->message = $message;
foreach ($items as $item) {
$this->items[\get_class($item)] = $item;
}
}

/**
* Wrap a message into an envelope if not already wrapped.
*
* @param Envelope|object $message
*/
public static function wrap($message): self
{
return $message instanceof self ? $message : new self($message);
}

/**
* @return Envelope a new Envelope instance with additional item
*/
public function with(EnvelopeItemInterface $item): self
{
$cloned = clone $this;

$cloned->items[\get_class($item)] = $item;

return $cloned;
}

public function withMessage($message): self
{
$cloned = clone $this;

$cloned->message = $message;

return $cloned;
}

public function get(string $itemFqcn): ?EnvelopeItemInterface
{
return $this->items[$itemFqcn] ?? null;
}

/**
* @return EnvelopeItemInterface[] indexed by fqcn
*/
public function all(): array
{
return $this->items;
}

/**
* @return object The original message contained in the envelope
*/
public function getMessage()
{
return $this->message;
}
}
23 changes: 23 additions & 0 deletions src/Symfony/Component/Messenger/EnvelopeAwareInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

/**
* A Messenger protagonist aware of the message envelope and its content.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeAwareInterface
{
}
24 changes: 24 additions & 0 deletions src/Symfony/Component/Messenger/EnvelopeItemInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

/**
* An envelope item related to a message.
* This item must be serializable for transport.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
interface EnvelopeItemInterface extends \Serializable
{
}
19 changes: 15 additions & 4 deletions src/Symfony/Component/Messenger/MessageBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ public function dispatch($message)
throw new InvalidArgumentException(sprintf('Invalid type for message argument. Expected object, but got "%s".', \gettype($message)));
}

return \call_user_func($this->callableForNextMiddleware(0), $message);
return \call_user_func($this->callableForNextMiddleware(0, Envelope::wrap($message)), $message);
}

private function callableForNextMiddleware(int $index): callable
private function callableForNextMiddleware(int $index, Envelope $currentEnvelope): callable
{
if (null === $this->indexedMiddlewareHandlers) {
$this->indexedMiddlewareHandlers = \is_array($this->middlewareHandlers) ? array_values($this->middlewareHandlers) : iterator_to_array($this->middlewareHandlers, false);
Expand All @@ -59,8 +59,19 @@ private function callableForNextMiddleware(int $index): callable

$middleware = $this->indexedMiddlewareHandlers[$index];

return function ($message) use ($middleware, $index) {
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1));
return function ($message) use ($middleware, $index, $currentEnvelope) {
if ($message instanceof Envelope) {
$currentEnvelope = $message;
} else {
$message = $currentEnvelope->withMessage($message);
}

if (!$middleware instanceof EnvelopeAwareInterface) {
// Do not provide the envelope if the middleware cannot read it:
$message = $message->getMessage();
}

return $middleware->handle($message, $this->callableForNextMiddleware($index + 1, $currentEnvelope));
};
}
}
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/MessageBusInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ interface MessageBusInterface
*
* The bus can return a value coming from handlers, but is not required to do so.
*
* @param object $message
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
*
* @return mixed
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Middleware\Configuration;

use Symfony\Component\Messenger\EnvelopeItemInterface;
use Symfony\Component\Validator\Constraints\GroupSequence;

/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*
* @experimental in 4.1
*/
final class ValidationConfiguration implements EnvelopeItemInterface
{
private $groups;

/**
* @param string[]|GroupSequence $groups
*/
public function __construct($groups)
{
$this->groups = $groups;
}

public function getGroups()
{
return $this->groups;
}

public function serialize()
{
$isGroupSequence = $this->groups instanceof GroupSequence;

return serialize(array(
'groups' => $isGroupSequence ? $this->groups->groups : $this->groups,
'is_group_sequence' => $isGroupSequence,
));
}

public function unserialize($serialized)
{
list(
'groups' => $groups,
'is_group_sequence' => $isGroupSequence
) = unserialize($serialized, array('allowed_classes' => false));

$this->__construct($isGroupSequence ? new GroupSequence($groups) : $groups);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Psr\Log\LoggerInterface;

/**
Expand Down Expand Up @@ -51,10 +50,6 @@ public function handle($message, callable $next)

private function createContext($message): array
{
if ($message instanceof ReceivedMessage) {
$message = $message->getMessage();
}

return array(
'message' => $message,
'class' => \get_class($message),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Exception\ValidationFailedException;
use Symfony\Component\Messenger\Middleware\Configuration\ValidationConfiguration;
use Symfony\Component\Validator\Validator\ValidatorInterface;

/**
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class ValidationMiddleware implements MiddlewareInterface
class ValidationMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $validator;

Expand All @@ -28,9 +31,17 @@ public function __construct(ValidatorInterface $validator)

public function handle($message, callable $next)
{
$violations = $this->validator->validate($message);
$envelope = Envelope::wrap($message);
$subject = $envelope->getMessage();
$groups = null;
/** @var ValidationConfiguration|null $validationConfig */
if ($validationConfig = $envelope->get(ValidationConfiguration::class)) {
$groups = $validationConfig->getGroups();
}

$violations = $this->validator->validate($subject, null, $groups);
if (\count($violations)) {
throw new ValidationFailedException($message, $violations);
throw new ValidationFailedException($subject, $violations);
}

return $next($message);
Expand Down
Loading