Skip to content

Commit 937f9cd

Browse files
authored
Merge pull request php-enqueue#7 from php-enqueue/amqp-fix-receive-wait-time
[bug][amqp-ext] Receive timeout parameter is miliseconds
2 parents b477918 + 6c6fe8c commit 937f9cd

File tree

2 files changed

+31
-7
lines changed

2 files changed

+31
-7
lines changed

pkg/amqp-ext/AmqpConsumer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public function receive($timeout = 0)
6868

6969
$originalTimeout = $extConnection->getReadTimeout();
7070
try {
71-
$extConnection->setReadTimeout($timeout);
71+
$extConnection->setReadTimeout($timeout / 1000);
7272

7373
if (false == $this->isInit) {
7474
$this->getExtQueue()->consume(null, AMQP_NOPARAM, $this->consumerId);

pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public function testWaitsForTwoSecondsAndReturnNullOnReceive()
4141
$startAt = microtime(true);
4242

4343
$consumer = $this->amqpContext->createConsumer($queue);
44-
$message = $consumer->receive(2);
44+
$message = $consumer->receive(2000);
4545

4646
$endAt = microtime(true);
4747

@@ -83,7 +83,7 @@ public function testProduceAndReceiveOneMessageSentDirectlyToQueue()
8383
$producer->send($queue, $message);
8484

8585
$consumer = $this->amqpContext->createConsumer($queue);
86-
$message = $consumer->receive(1);
86+
$message = $consumer->receive(1000);
8787

8888
$this->assertInstanceOf(AmqpMessage::class, $message);
8989
$consumer->acknowledge($message);
@@ -115,7 +115,7 @@ public function testProduceAndReceiveOneMessageSentDirectlyToTemporaryQueue()
115115
$producer->send($queue, $message);
116116

117117
$consumer = $this->amqpContext->createConsumer($queue);
118-
$message = $consumer->receive(1);
118+
$message = $consumer->receive(1000);
119119

120120
$this->assertInstanceOf(AmqpMessage::class, $message);
121121
$consumer->acknowledge($message);
@@ -140,7 +140,7 @@ public function testProduceAndReceiveOneMessageSentDirectlyToTopic()
140140
$producer->send($topic, $message);
141141

142142
$consumer = $this->amqpContext->createConsumer($queue);
143-
$message = $consumer->receive(1);
143+
$message = $consumer->receive(1000);
144144

145145
$this->assertInstanceOf(AmqpMessage::class, $message);
146146
$consumer->acknowledge($message);
@@ -157,13 +157,37 @@ public function testConsumerReceiveMessageFromTopicDirectly()
157157

158158
$consumer = $this->amqpContext->createConsumer($topic);
159159
//guard
160-
$this->assertNull($consumer->receive(1));
160+
$this->assertNull($consumer->receive(1000));
161161

162162
$message = $this->amqpContext->createMessage(__METHOD__);
163163

164164
$producer = $this->amqpContext->createProducer();
165165
$producer->send($topic, $message);
166-
$actualMessage = $consumer->receive(1);
166+
$actualMessage = $consumer->receive(1000);
167+
168+
$this->assertInstanceOf(AmqpMessage::class, $actualMessage);
169+
$consumer->acknowledge($message);
170+
171+
$this->assertEquals(__METHOD__, $message->getBody());
172+
}
173+
174+
public function testConsumerReceiveMessageWithZeroTimeout()
175+
{
176+
$topic = $this->amqpContext->createTopic('amqp_ext.test_exchange');
177+
$topic->setType(AMQP_EX_TYPE_FANOUT);
178+
179+
$this->amqpContext->declareTopic($topic);
180+
181+
$consumer = $this->amqpContext->createConsumer($topic);
182+
//guard
183+
$this->assertNull($consumer->receive(1000));
184+
185+
$message = $this->amqpContext->createMessage(__METHOD__);
186+
187+
$producer = $this->amqpContext->createProducer();
188+
$producer->send($topic, $message);
189+
usleep(100);
190+
$actualMessage = $consumer->receive(0);
167191

168192
$this->assertInstanceOf(AmqpMessage::class, $actualMessage);
169193
$consumer->acknowledge($message);

0 commit comments

Comments
 (0)