Skip to content

Commit 19a3c7f

Browse files
committed
Changing ReceiverInterface::get() to return an iterable
1 parent 9867d7c commit 19a3c7f

File tree

6 files changed

+82
-75
lines changed

6 files changed

+82
-75
lines changed

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,9 +612,9 @@ public function __invoke(DummyMessage $message): void
612612

613613
class DummyReceiver implements ReceiverInterface
614614
{
615-
public function get(string $queue = null): ?Envelope
615+
public function get(): iterable
616616
{
617-
return new Envelope(new DummyMessage('Dummy'));
617+
yield new Envelope(new DummyMessage('Dummy'));
618618
}
619619

620620
public function stop(): void

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ public function testWorkerDispatchTheReceivedMessage()
3535
$ipaMessage = new DummyMessage('IPA');
3636

3737
$receiver = new DummyReceiver([
38-
new Envelope($apiMessage),
39-
new Envelope($ipaMessage),
38+
[new Envelope($apiMessage), new Envelope($ipaMessage)]
4039
]);
4140

4241
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
@@ -58,7 +57,7 @@ public function testWorkerDispatchTheReceivedMessage()
5857
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
5958
{
6059
$envelope = new Envelope(new DummyMessage('API'));
61-
$receiver = new DummyReceiver([$envelope]);
60+
$receiver = new DummyReceiver([[$envelope]]);
6261
$envelope = $envelope->with(new ReceivedStamp());
6362

6463
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
@@ -77,7 +76,7 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
7776
public function testDispatchCausesRetry()
7877
{
7978
$receiver = new DummyReceiver([
80-
new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')),
79+
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))]
8180
]);
8281

8382
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
@@ -116,7 +115,7 @@ public function testDispatchCausesRetry()
116115
public function testDispatchCausesRejectWhenNoRetry()
117116
{
118117
$receiver = new DummyReceiver([
119-
new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')),
118+
[new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))]
120119
]);
121120

122121
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
@@ -139,7 +138,7 @@ public function testDispatchCausesRejectWhenNoRetry()
139138
public function testDispatchCausesRejectOnUnrecoverableMessage()
140139
{
141140
$receiver = new DummyReceiver([
142-
new Envelope(new DummyMessage('Hello')),
141+
[new Envelope(new DummyMessage('Hello'))],
143142
]);
144143

145144
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
@@ -180,7 +179,7 @@ public function testWorkerDoesNotSendNullMessagesToTheBus()
180179
public function testWorkerDispatchesEventsOnSuccess()
181180
{
182181
$envelope = new Envelope(new DummyMessage('Hello'));
183-
$receiver = new DummyReceiver([$envelope]);
182+
$receiver = new DummyReceiver([[$envelope]]);
184183

185184
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
186185
$bus->method('dispatch')->willReturn($envelope);
@@ -207,7 +206,7 @@ public function testWorkerDispatchesEventsOnSuccess()
207206
public function testWorkerDispatchesEventsOnError()
208207
{
209208
$envelope = new Envelope(new DummyMessage('Hello'));
210-
$receiver = new DummyReceiver([$envelope]);
209+
$receiver = new DummyReceiver([[$envelope]]);
211210

212211
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
213212
$exception = new \InvalidArgumentException('Oh no!');
@@ -235,18 +234,20 @@ public function testWorkerDispatchesEventsOnError()
235234

236235
class DummyReceiver implements ReceiverInterface
237236
{
238-
private $envelopes;
237+
private $deliveriesOfEnvelopes;
239238
private $acknowledgeCount = 0;
240239
private $rejectCount = 0;
241240

242-
public function __construct(array $envelopes)
241+
public function __construct(array $deliveriesOfEnvelopes)
243242
{
244-
$this->envelopes = $envelopes;
243+
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
245244
}
246245

247-
public function get(): ?Envelope
246+
public function get(): iterable
248247
{
249-
return array_shift($this->envelopes);
248+
$val = array_shift($this->deliveriesOfEnvelopes);
249+
250+
return null === $val ? [] : $val;
250251
}
251252

252253
public function ack(Envelope $envelope): void

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public function __construct(Connection $connection, SerializerInterface $seriali
4040
/**
4141
* {@inheritdoc}
4242
*/
43-
public function get(): ?Envelope
43+
public function get(): iterable
4444
{
4545
try {
4646
$amqpEnvelope = $this->connection->get();
@@ -49,7 +49,7 @@ public function get(): ?Envelope
4949
}
5050

5151
if (null === $amqpEnvelope) {
52-
return null;
52+
return [];
5353
}
5454

5555
try {
@@ -64,7 +64,7 @@ public function get(): ?Envelope
6464
throw $exception;
6565
}
6666

67-
return $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
67+
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
6868
}
6969

7070
public function ack(Envelope $envelope): void

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public function __construct(Connection $connection, SerializerInterface $seriali
3838
/**
3939
* {@inheritdoc}
4040
*/
41-
public function get(string $queue = null): ?Envelope
41+
public function get(): iterable
4242
{
43-
return ($this->receiver ?? $this->getReceiver())->get($queue);
43+
return ($this->receiver ?? $this->getReceiver())->get();
4444
}
4545

4646
/**

src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ interface ReceiverInterface
3030
* and a MessageDecodingFailedException should be thrown.
3131
*
3232
* @throws TransportException If there is an issue communicating with the transport
33+
* @return Envelope[]
3334
*/
34-
public function get(): ?Envelope;
35+
public function get(): iterable;
3536

3637
/**
3738
* Acknowledge that the passed message was handled.

src/Symfony/Component/Messenger/Worker.php

Lines changed: 59 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -79,74 +79,79 @@ public function run(callable $onHandledCallback = null): void
7979
};
8080

8181
while (false === $this->shouldStop) {
82-
$envelope = $this->receiver->get();
82+
$envelopes = $this->receiver->get();
8383

84-
if (null === $envelope) {
84+
$envelopeHandled = false;
85+
foreach ($envelopes as $envelope) {
86+
$envelopeHandled = true;
87+
88+
$this->handleMessage($envelope);
89+
$handled($envelope);
90+
}
91+
92+
if (false === $envelopeHandled) {
8593
$handled(null);
8694

8795
usleep(1000000);
88-
89-
continue;
9096
}
97+
}
98+
}
9199

92-
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
93-
94-
$message = $envelope->getMessage();
95-
$context = [
96-
'message' => $message,
97-
'class' => \get_class($message),
98-
];
99-
100-
try {
101-
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
102-
} catch (\Throwable $throwable) {
103-
$shouldRetry = $this->shouldRetry($throwable, $envelope);
104-
105-
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
106-
107-
if ($shouldRetry) {
108-
if (null === $this->retryStrategy) {
109-
// not logically allowed, but check just in case
110-
throw new LogicException('Retrying is not supported without a retry strategy.');
111-
}
112-
113-
$retryCount = $this->getRetryCount($envelope) + 1;
114-
if (null !== $this->logger) {
115-
$this->logger->info('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
116-
}
117-
118-
// add the delay and retry stamp info + remove ReceivedStamp
119-
$retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope)))
120-
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
121-
->withoutAll(ReceivedStamp::class);
122-
123-
// re-send the message
124-
$this->bus->dispatch($retryEnvelope);
125-
// acknowledge the previous message has received
126-
$this->receiver->ack($envelope);
127-
} else {
128-
if (null !== $this->logger) {
129-
$this->logger->info('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
130-
}
131-
132-
$this->receiver->reject($envelope);
133-
}
100+
private function handleMessage(Envelope $envelope)
101+
{
102+
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
134103

135-
$handled($envelope);
104+
$message = $envelope->getMessage();
105+
$context = [
106+
'message' => $message,
107+
'class' => \get_class($message),
108+
];
136109

137-
continue;
138-
}
110+
try {
111+
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
112+
} catch (\Throwable $throwable) {
113+
$shouldRetry = $this->shouldRetry($throwable, $envelope);
139114

140-
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
115+
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
141116

142-
if (null !== $this->logger) {
143-
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
117+
if ($shouldRetry) {
118+
if (null === $this->retryStrategy) {
119+
// not logically allowed, but check just in case
120+
throw new LogicException('Retrying is not supported without a retry strategy.');
121+
}
122+
123+
$retryCount = $this->getRetryCount($envelope) + 1;
124+
if (null !== $this->logger) {
125+
$this->logger->info('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
126+
}
127+
128+
// add the delay and retry stamp info + remove ReceivedStamp
129+
$retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope)))
130+
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
131+
->withoutAll(ReceivedStamp::class);
132+
133+
// re-send the message
134+
$this->bus->dispatch($retryEnvelope);
135+
// acknowledge the previous message has received
136+
$this->receiver->ack($envelope);
137+
} else {
138+
if (null !== $this->logger) {
139+
$this->logger->info('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
140+
}
141+
142+
$this->receiver->reject($envelope);
144143
}
145144

146-
$this->receiver->ack($envelope);
145+
return;
146+
}
147147

148-
$handled($envelope);
148+
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
149+
150+
if (null !== $this->logger) {
151+
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
149152
}
153+
154+
$this->receiver->ack($envelope);
150155
}
151156

152157
public function stop(): void

0 commit comments

Comments
 (0)