Skip to content

Commit f81f82b

Browse files
committed
Adding support for record messages
1 parent 5b08019 commit f81f82b

File tree

8 files changed

+397
-0
lines changed

8 files changed

+397
-0
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
<argument type="service" id="validator" />
3535
</service>
3636

37+
<service id="messenger.middleware.record_messages" class="Symfony\Component\Messenger\Middleware\HandleRecordedMessageMiddleware" abstract="true">
38+
<argument /> <!-- Message bus -->
39+
<argument type="service" id="messenger.recorder" />
40+
</service>
41+
3742
<!-- Logging -->
3843
<service id="messenger.middleware.logging" class="Symfony\Component\Messenger\Middleware\LoggingMiddleware" abstract="true">
3944
<argument type="service" id="logger" />
@@ -62,5 +67,11 @@
6267

6368
<tag name="messenger.transport_factory" />
6469
</service>
70+
71+
<!-- Message recorder -->
72+
<service id="messenger.recorder" class="Symfony\Component\Messenger\MessageRecorder">
73+
<tag name="kernel.reset" method="reset" />
74+
</service>
75+
<service id="Symfony\Component\Messenger\MessageRecorderInterface" alias="messenger.recorder" />
6576
</services>
6677
</container>
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* When handling messages, some handlers caused an exception. This exception
16+
* contains all those handler exceptions.
17+
*
18+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
19+
*/
20+
class MessageHandlingException extends \RuntimeException implements ExceptionInterface
21+
{
22+
private $exceptions = array();
23+
24+
public function __construct(array $exceptions)
25+
{
26+
$message = sprintf(
27+
"Some handlers for recorded messages threw an exception. Their messages were: \n\n%s",
28+
implode(", \n", array_map(function (\Throwable $e) {
29+
return $e->getMessage();
30+
}, $exceptions))
31+
);
32+
33+
$this->exceptions = $exceptions;
34+
parent::__construct($message);
35+
}
36+
37+
public function getExceptions(): array
38+
{
39+
return $this->exceptions;
40+
}
41+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Symfony\Contracts\Service\ResetInterface;
15+
16+
/**
17+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
18+
* @author Matthias Noback <matthiasnoback@gmail.com>
19+
*/
20+
class MessageRecorder implements MessageRecorderInterface, RecordedMessageCollectionInterface, ResetInterface
21+
{
22+
private $messages = array();
23+
24+
/**
25+
* {@inheritdoc}
26+
*/
27+
public function getRecordedMessages(): array
28+
{
29+
return $this->messages;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function resetRecordedMessages(): void
36+
{
37+
$this->reset();
38+
}
39+
40+
/**
41+
* {@inheritdoc}
42+
*/
43+
public function reset()
44+
{
45+
$this->messages = array();
46+
}
47+
48+
/**
49+
* {@inheritdoc}
50+
*/
51+
public function record($message): void
52+
{
53+
$this->messages[] = $message;
54+
}
55+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
/**
15+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
16+
* @author Matthias Noback <matthiasnoback@gmail.com>
17+
*/
18+
interface MessageRecorderInterface
19+
{
20+
/**
21+
* Record a message.
22+
*
23+
* @param object $message
24+
*/
25+
public function record($message);
26+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Exception\MessageHandlingException;
15+
use Symfony\Component\Messenger\MessageBusInterface;
16+
use Symfony\Component\Messenger\RecordedMessageCollectionInterface;
17+
18+
/**
19+
* A middleware that takes all recorded messages and dispatch them to the bus.
20+
*
21+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
22+
* @author Matthias Noback <matthiasnoback@gmail.com>
23+
*/
24+
class HandleRecordedMessageMiddleware implements MiddlewareInterface
25+
{
26+
private $messageRecorder;
27+
private $messageBus;
28+
29+
public function __construct(MessageBusInterface $messageBus, RecordedMessageCollectionInterface $messageRecorder)
30+
{
31+
$this->messageRecorder = $messageRecorder;
32+
$this->messageBus = $messageBus;
33+
}
34+
35+
public function handle($message, callable $next)
36+
{
37+
// Make sure the recorder is empty before we begin
38+
$this->messageRecorder->resetRecordedMessages();
39+
40+
try {
41+
$returnData = $next($message);
42+
} catch (\Throwable $exception) {
43+
$this->messageRecorder->resetRecordedMessages();
44+
45+
throw $exception;
46+
}
47+
48+
$exceptions = array();
49+
while (!empty($recordedMessages = $this->messageRecorder->getRecordedMessages())) {
50+
$this->messageRecorder->resetRecordedMessages();
51+
// Assert: The message recorder is empty, all messages are in $recordedMessages
52+
53+
foreach ($recordedMessages as $recordedMessage) {
54+
try {
55+
$this->messageBus->dispatch($recordedMessage);
56+
} catch (\Throwable $exception) {
57+
$exceptions[] = $exception;
58+
}
59+
}
60+
}
61+
62+
if (!empty($exceptions)) {
63+
if (1 === \count($exceptions)) {
64+
throw $exceptions[0];
65+
}
66+
throw new MessageHandlingException($exceptions);
67+
}
68+
69+
return $returnData;
70+
}
71+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
/**
15+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
16+
* @author Matthias Noback <matthiasnoback@gmail.com>
17+
*/
18+
interface RecordedMessageCollectionInterface
19+
{
20+
/**
21+
* Fetch recorded messages.
22+
*
23+
* @return object[]
24+
*/
25+
public function getRecordedMessages(): array;
26+
27+
/**
28+
* Remove all recorded messages.
29+
*/
30+
public function resetRecordedMessages(): void;
31+
}

0 commit comments

Comments
 (0)