@@ -44,7 +44,7 @@ public function testWorkerDispatchTheReceivedMessage()
44
44
$ bus ->expects ($ this ->at (1 ))->method ('dispatch ' )->with ($ envelope = new Envelope ($ ipaMessage , new ReceivedStamp ()))->willReturn ($ envelope );
45
45
46
46
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' );
47
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
47
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
48
48
// stop after the messages finish
49
49
if (null === $ envelope ) {
50
50
$ worker ->stop ();
@@ -65,7 +65,7 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
65
65
$ retryStrategy = $ this ->getMockBuilder (RetryStrategyInterface::class)->getMock ();
66
66
67
67
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' , $ retryStrategy );
68
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
68
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
69
69
// stop after the messages finish
70
70
if (null === $ envelope ) {
71
71
$ worker ->stop ();
@@ -101,7 +101,7 @@ public function testDispatchCausesRetry()
101
101
$ retryStrategy ->expects ($ this ->once ())->method ('isRetryable ' )->willReturn (true );
102
102
103
103
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' , $ retryStrategy );
104
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
104
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
105
105
// stop after the messages finish
106
106
if (null === $ envelope ) {
107
107
$ worker ->stop ();
@@ -125,7 +125,7 @@ public function testDispatchCausesRejectWhenNoRetry()
125
125
$ retryStrategy ->expects ($ this ->once ())->method ('isRetryable ' )->willReturn (false );
126
126
127
127
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' , $ retryStrategy );
128
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
128
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
129
129
// stop after the messages finish
130
130
if (null === $ envelope ) {
131
131
$ worker ->stop ();
@@ -148,7 +148,7 @@ public function testDispatchCausesRejectOnUnrecoverableMessage()
148
148
$ retryStrategy ->expects ($ this ->never ())->method ('isRetryable ' );
149
149
150
150
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' , $ retryStrategy );
151
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
151
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
152
152
// stop after the messages finish
153
153
if (null === $ envelope ) {
154
154
$ worker ->stop ();
@@ -168,7 +168,7 @@ public function testWorkerDoesNotSendNullMessagesToTheBus()
168
168
$ retryStrategy = $ this ->getMockBuilder (RetryStrategyInterface::class)->getMock ();
169
169
170
170
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' , $ retryStrategy );
171
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
171
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
172
172
// stop after the messages finish
173
173
if (null === $ envelope ) {
174
174
$ worker ->stop ();
@@ -195,7 +195,7 @@ public function testWorkerDispatchesEventsOnSuccess()
195
195
);
196
196
197
197
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' , $ retryStrategy , $ eventDispatcher );
198
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
198
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
199
199
// stop after the messages finish
200
200
if (null === $ envelope ) {
201
201
$ worker ->stop ();
@@ -223,13 +223,49 @@ public function testWorkerDispatchesEventsOnError()
223
223
);
224
224
225
225
$ worker = new Worker ($ receiver , $ bus , 'receiver_id ' , $ retryStrategy , $ eventDispatcher );
226
- $ worker ->run (function (?Envelope $ envelope ) use ($ worker ) {
226
+ $ worker ->run ([ ' sleep ' => 0 ], function (?Envelope $ envelope ) use ($ worker ) {
227
227
// stop after the messages finish
228
228
if (null === $ envelope ) {
229
229
$ worker ->stop ();
230
230
}
231
231
});
232
232
}
233
+
234
+ public function testTimeoutIsConfigurable ()
235
+ {
236
+ $ apiMessage = new DummyMessage ('API ' );
237
+ $ receiver = new DummyReceiver ([
238
+ [new Envelope ($ apiMessage ), new Envelope ($ apiMessage )],
239
+ null , // will cause a wait
240
+ null , // will cause a wait
241
+ [new Envelope ($ apiMessage )],
242
+ [new Envelope ($ apiMessage )],
243
+ null , // will cause a wait
244
+ [new Envelope ($ apiMessage )],
245
+ ]);
246
+
247
+ $ bus = $ this ->getMockBuilder (MessageBusInterface::class)->getMock ();
248
+
249
+ $ worker = new Worker ($ receiver , $ bus , 'receiver_id ' );
250
+ $ receivedCount = 0 ;
251
+ $ startTime = microtime (true );
252
+ // sleep .1 after each idle
253
+ $ worker ->run (['sleep ' => 100000 ], function (?Envelope $ envelope ) use ($ worker , &$ receivedCount , $ startTime ) {
254
+ if (null !== $ envelope ) {
255
+ $ receivedCount ++;
256
+ }
257
+
258
+ if (5 === $ receivedCount ) {
259
+ $ worker ->stop ();
260
+ $ duration = microtime (true ) - $ startTime ;
261
+
262
+ // wait time should be .3 seconds, so execution should
263
+ // be only a bit more than that
264
+ $ this ->assertGreaterThanOrEqual (.3 , $ duration );
265
+ $ this ->assertLessThan (.5 , $ duration );
266
+ }
267
+ });
268
+ }
233
269
}
234
270
235
271
class DummyReceiver implements ReceiverInterface
0 commit comments