20
20
import com .linkedin .data .ByteString ;
21
21
import com .linkedin .r2 .message .stream .entitystream .WriteHandle ;
22
22
23
- import org .mockito .Mockito ;
24
- import org .mockito .invocation .InvocationOnMock ;
25
- import org .mockito .stubbing .Answer ;
26
- import org .mockito .stubbing .OngoingStubbing ;
27
- import org .testng .Assert ;
28
- import org .testng .annotations .DataProvider ;
29
- import org .testng .annotations .Test ;
30
-
31
23
import java .io .ByteArrayInputStream ;
32
24
import java .io .ByteArrayOutputStream ;
33
25
import java .io .IOException ;
38
30
import java .util .concurrent .TimeUnit ;
39
31
import java .util .concurrent .TimeoutException ;
40
32
33
+ import org .testng .Assert ;
34
+ import org .testng .annotations .DataProvider ;
35
+ import org .testng .annotations .Test ;
36
+
37
+ import org .mockito .Mockito ;
38
+ import org .mockito .invocation .InvocationOnMock ;
39
+ import org .mockito .stubbing .Answer ;
40
+ import org .mockito .stubbing .OngoingStubbing ;
41
+
41
42
import static org .mockito .Matchers .isA ;
42
- import static org .mockito .Mockito .*;
43
+ import static org .mockito .Mockito .doAnswer ;
44
+ import static org .mockito .Mockito .never ;
45
+ import static org .mockito .Mockito .spy ;
46
+ import static org .mockito .Mockito .times ;
47
+ import static org .mockito .Mockito .verify ;
48
+ import static org .mockito .Mockito .verifyNoMoreInteractions ;
49
+ import static org .mockito .Mockito .when ;
43
50
44
51
45
52
/**
@@ -125,6 +132,39 @@ public int read(byte[] b) throws IOException
125
132
}
126
133
}
127
134
135
+ //Simulates an input stream that times out after a specified number of reads.
136
+ private static class TimeoutByteArrayInputStream extends StrictByteArrayInputStream
137
+ {
138
+ private int _numberOfReadsBeforeTimeout ;
139
+ private final CountDownLatch _latch ;
140
+
141
+ private TimeoutByteArrayInputStream (final byte [] bytes , final int numberOfReadsBeforeTimeout ,
142
+ final CountDownLatch latchToWaitOnTimeout )
143
+ {
144
+ super (bytes );
145
+ _numberOfReadsBeforeTimeout = numberOfReadsBeforeTimeout ;
146
+ _latch = latchToWaitOnTimeout ;
147
+ }
148
+
149
+ @ Override
150
+ public int read (byte [] b ) throws IOException
151
+ {
152
+ try
153
+ {
154
+ if (_numberOfReadsBeforeTimeout == 0 )
155
+ {
156
+ _latch .await ();
157
+ }
158
+ }
159
+ catch (InterruptedException interruptException )
160
+ {
161
+ Assert .fail ();
162
+ }
163
+ _numberOfReadsBeforeTimeout --;
164
+ return super .read (b );
165
+ }
166
+ }
167
+
128
168
//Simulates an input stream that throws IOException after a configurable number of reads.
129
169
private static class ExceptionThrowingByteArrayInputStream extends StrictByteArrayInputStream
130
170
{
@@ -394,7 +434,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable
394
434
395
435
///////////////////////////////////////////////////////////////////////////////////////
396
436
397
- //This test will verify that timeouts on reads are handled properly
437
+ //This test will verify that timeouts on reads are handled properly.
398
438
@ DataProvider (name = "timeoutDataSources" )
399
439
public Object [][] timeoutDataSources () throws Exception
400
440
{
@@ -409,35 +449,41 @@ public Object[][] timeoutDataSources() throws Exception
409
449
}
410
450
411
451
final byte [] largeInputData = builder .toString ().getBytes ();
412
- final SlowByteArrayInputStream timeoutFirst = new SlowByteArrayInputStream (largeInputData , 500 , 0 );
413
- final SlowByteArrayInputStream timeoutSubsequently = new SlowByteArrayInputStream (largeInputData , 0 , 10 );
452
+ final CountDownLatch timeoutFirstLatch = new CountDownLatch (1 );
453
+ final CountDownLatch timeoutSubsequentlyLatch = new CountDownLatch (1 );
454
+ final TimeoutByteArrayInputStream timeoutFirst = new TimeoutByteArrayInputStream (largeInputData , 0 , timeoutFirstLatch );
455
+ final TimeoutByteArrayInputStream timeoutSubsequently = new TimeoutByteArrayInputStream (largeInputData , 5 , timeoutSubsequentlyLatch );
414
456
415
457
//TEST_CHUNK_SIZE * 5 writes should be how much data was copied over
416
458
final byte [] largeInputDataPartial = Arrays .copyOf (largeInputData , TEST_CHUNK_SIZE * 5 );
417
459
418
460
return new Object [][]
419
461
{
420
- //Timeout on first read. Nothing should have been read. One call on writeHandle.remaining() should have been seen.
421
- {timeoutFirst , 0 , 1 , new byte [0 ]},
422
- //Timeout on the 6th read. We should expect 5 writes. Six calls on writeHandle.remaining() should have been seen.
423
- {timeoutSubsequently , 5 , 6 , largeInputDataPartial }
462
+ //Timeout on first read. Nothing should have been read. One call on writeHandle.remaining() should have been seen.
463
+ {timeoutFirst , 0 , 1 , new byte [0 ], timeoutFirstLatch },
464
+ //Timeout on the 6th read. We should expect 5 writes. Six calls on writeHandle.remaining() should have been seen.
465
+ {timeoutSubsequently , 5 , 6 , largeInputDataPartial , timeoutSubsequentlyLatch }
424
466
};
425
467
}
426
468
427
469
@ Test (dataProvider = "timeoutDataSources" )
428
- public void testTimeoutDataSources (final SlowByteArrayInputStream slowByteArrayInputStream ,
429
- final int expectedTotalWrites , final int expectedWriteHandleRemainingCalls , final byte [] expectedDataWritten )
470
+ public void testTimeoutDataSources (final TimeoutByteArrayInputStream timeoutByteArrayInputStream ,
471
+ final int expectedTotalWrites , final int expectedWriteHandleRemainingCalls ,
472
+ final byte [] expectedDataWritten , final CountDownLatch latch )
430
473
{
431
- //Setup:
474
+ //Setup
432
475
final WriteHandle writeHandle = Mockito .mock (WriteHandle .class );
476
+ //For the maximum blocking time, we choose some value that isn't too short for a read to occur from the in memory
477
+ //InputStream, but also not too long to prevent the test from taking a while to finish (due to waiting for the timeout
478
+ //to occur).
433
479
final MultiPartMIMEInputStream multiPartMIMEInputStream =
434
- new MultiPartMIMEInputStream .Builder (slowByteArrayInputStream , _scheduledExecutorService ,
480
+ new MultiPartMIMEInputStream .Builder (timeoutByteArrayInputStream , _scheduledExecutorService ,
435
481
Collections .<String , String >emptyMap ()).withWriteChunkSize (TEST_CHUNK_SIZE )
436
- .withMaximumBlockingTime (45 )
482
+ .withMaximumBlockingTime (100 )
437
483
.build ();
438
484
439
485
//Doesn't matter what we return here as long as its constant and above 0.
440
- when (writeHandle .remaining ()).thenReturn (500 );
486
+ when (writeHandle .remaining ()).thenReturn (1 );
441
487
442
488
final ByteArrayOutputStream byteArrayOutputStream = setupMockWriteHandleToOutputStream (writeHandle );
443
489
@@ -464,6 +510,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable
464
510
try
465
511
{
466
512
boolean successful = errorLatch .await (_testTimeout , TimeUnit .MILLISECONDS );
513
+
514
+ //Unblock the thread in the thread pool.
515
+ latch .countDown ();;
516
+
467
517
if (!successful )
468
518
{
469
519
Assert .fail ("Timeout when waiting for input stream to completely transfer" );
@@ -478,7 +528,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable
478
528
//Assert
479
529
Assert .assertEquals (byteArrayOutputStream .toByteArray (), expectedDataWritten ,
480
530
"Partial data should have been transferred in the case of a timeout" );
481
- Assert .assertEquals (slowByteArrayInputStream .isClosed (), true );
531
+ Assert .assertEquals (timeoutByteArrayInputStream .isClosed (), true );
482
532
483
533
//Mock verifies:
484
534
verify (writeHandle , times (expectedTotalWrites )).write (isA (ByteString .class ));
0 commit comments