Skip to content

Commit ff63bb3

Browse files
committed
feature #32783 [Messenger] InMemoryTransport handle acknowledged and rejected messages (tienvx)
This PR was merged into the 4.4 branch. Discussion ---------- [Messenger] InMemoryTransport handle acknowledged and rejected messages | Q | A | ------------- | --- | Branch? | 4.4 <!-- see below --> | Bug fix? | no | New feature? | yes <!-- please update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | Fixed tickets | #... <!-- #-prefixed issue number(s), if any --> | License | MIT | Doc PR | symfony/symfony-docs/pull/12045 <!-- required for new features --> This PR do 2 things: * Limit receiving messages from InMemoryTransport to 1 (reverted, another PR will fix the bug: worker does not stop when receiver return more messages than the number specify by the `--limit` option) * Handle acknowledged and rejected messages in InMemoryTransport. Currently, it does not care about acknowledged and rejected messages. So it always return all messages that have been sent. So if we run console command `messenger:consume`, it will never stop, even though we set the `--limit` option. For more information, please check the [reproduction](https://github.com/tienvx/symfony-messenger-in-memory-reproduction) project for the expected behavior. See also my [messenger-memory-transport](https://github.com/tienvx/messenger-memory-transport) project Commits ------- 71e7bdf [Messenger] InMemoryTransport handle acknowledged and rejected messages
2 parents 9130c6a + 71e7bdf commit ff63bb3

File tree

3 files changed

+37
-3
lines changed

3 files changed

+37
-3
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
88
pass a `RoutableMessageBus` instance instead.
99
* Added support for auto trimming of Redis streams.
10+
* `InMemoryTransport` handle acknowledged and rejected messages.
1011

1112
4.3.0
1213
-----

src/Symfony/Component/Messenger/Tests/Transport/InMemoryTransportTest.php

+15-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,20 @@ public function testSend()
3434
{
3535
$envelope = new Envelope(new \stdClass());
3636
$this->transport->send($envelope);
37-
$this->assertSame([$envelope], $this->transport->get());
37+
$this->assertSame([$envelope], $this->transport->getSent());
38+
}
39+
40+
public function testQueue()
41+
{
42+
$envelope1 = new Envelope(new \stdClass());
43+
$this->transport->send($envelope1);
44+
$envelope2 = new Envelope(new \stdClass());
45+
$this->transport->send($envelope2);
46+
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
47+
$this->transport->ack($envelope1);
48+
$this->assertSame([$envelope2], $this->transport->get());
49+
$this->transport->reject($envelope2);
50+
$this->assertSame([], $this->transport->get());
3851
}
3952

4053
public function testAck()
@@ -63,5 +76,6 @@ public function testReset()
6376
$this->assertEmpty($this->transport->get(), 'Should be empty after reset');
6477
$this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset');
6578
$this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset');
79+
$this->assertEmpty($this->transport->getSent(), 'Should be empty after reset');
6680
}
6781
}

src/Symfony/Component/Messenger/Transport/InMemoryTransport.php

+21-2
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,17 @@ class InMemoryTransport implements TransportInterface, ResetInterface
3636
*/
3737
private $rejected = [];
3838

39+
/**
40+
* @var Envelope[]
41+
*/
42+
private $queue = [];
43+
3944
/**
4045
* {@inheritdoc}
4146
*/
4247
public function get(): iterable
4348
{
44-
return $this->sent;
49+
return array_values($this->queue);
4550
}
4651

4752
/**
@@ -50,6 +55,8 @@ public function get(): iterable
5055
public function ack(Envelope $envelope): void
5156
{
5257
$this->acknowledged[] = $envelope;
58+
$id = spl_object_hash($envelope);
59+
unset($this->queue[$id]);
5360
}
5461

5562
/**
@@ -58,6 +65,8 @@ public function ack(Envelope $envelope): void
5865
public function reject(Envelope $envelope): void
5966
{
6067
$this->rejected[] = $envelope;
68+
$id = spl_object_hash($envelope);
69+
unset($this->queue[$id]);
6170
}
6271

6372
/**
@@ -66,13 +75,15 @@ public function reject(Envelope $envelope): void
6675
public function send(Envelope $envelope): Envelope
6776
{
6877
$this->sent[] = $envelope;
78+
$id = spl_object_hash($envelope);
79+
$this->queue[$id] = $envelope;
6980

7081
return $envelope;
7182
}
7283

7384
public function reset()
7485
{
75-
$this->sent = $this->rejected = $this->acknowledged = [];
86+
$this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
7687
}
7788

7889
/**
@@ -90,4 +101,12 @@ public function getRejected(): array
90101
{
91102
return $this->rejected;
92103
}
104+
105+
/**
106+
* @return Envelope[]
107+
*/
108+
public function getSent(): array
109+
{
110+
return $this->sent;
111+
}
93112
}

0 commit comments

Comments
 (0)