@@ -16,18 +16,20 @@ public class AsyncQueue<T>
16
16
private readonly Queue < TaskCompletionSource < T > > consumerQueue = new Queue < TaskCompletionSource < T > > ( ) ;
17
17
private SemaphoreSlim consumerQueueLock = new SemaphoreSlim ( 1 ) ;
18
18
19
+ public int Count => queue . Count ;
20
+
19
21
/// <summary>
20
22
/// Supports multi-threaded producers.
21
23
/// Time complexity: O(1).
22
24
/// </summary>
23
- public async Task EnqueueAsync ( T value , CancellationToken taskCancellationToken = default ( CancellationToken ) )
25
+ public async Task EnqueueAsync ( T value , int millisecondsTimeout = int . MaxValue , CancellationToken taskCancellationToken = default ( CancellationToken ) )
24
26
{
25
- await consumerQueueLock . WaitAsync ( taskCancellationToken ) ;
27
+ await consumerQueueLock . WaitAsync ( millisecondsTimeout , taskCancellationToken ) ;
26
28
27
29
if ( consumerQueue . Count > 0 )
28
30
{
29
31
var consumer = consumerQueue . Dequeue ( ) ;
30
- consumer . SetResult ( value ) ;
32
+ consumer . TrySetResult ( value ) ;
31
33
}
32
34
else
33
35
{
@@ -41,22 +43,29 @@ public class AsyncQueue<T>
41
43
/// Supports multi-threaded consumers.
42
44
/// Time complexity: O(1).
43
45
/// </summary>
44
- public async Task < T > DequeueAsync ( CancellationToken taskCancellationToken = default ( CancellationToken ) )
46
+ public async Task < T > DequeueAsync ( int millisecondsTimeout = int . MaxValue , CancellationToken taskCancellationToken = default ( CancellationToken ) )
45
47
{
46
- await consumerQueueLock . WaitAsync ( taskCancellationToken ) ;
48
+ await consumerQueueLock . WaitAsync ( millisecondsTimeout , taskCancellationToken ) ;
49
+
50
+ TaskCompletionSource < T > consumer ;
51
+
52
+ try
53
+ {
54
+ if ( queue . Count > 0 )
55
+ {
56
+ var result = queue . Dequeue ( ) ;
57
+ consumerQueueLock . Release ( ) ;
58
+ return result ;
59
+ }
47
60
48
- if ( queue . Count > 0 )
61
+ consumer = new TaskCompletionSource < T > ( ) ;
62
+ taskCancellationToken . Register ( ( ) => consumer . TrySetCanceled ( ) ) ;
63
+ consumerQueue . Enqueue ( consumer ) ;
64
+ }
65
+ finally
49
66
{
50
- var result = queue . Dequeue ( ) ;
51
67
consumerQueueLock . Release ( ) ;
52
- return result ;
53
68
}
54
-
55
- var consumer = new TaskCompletionSource < T > ( ) ;
56
- taskCancellationToken . Register ( ( ) => consumer . TrySetCanceled ( ) ) ;
57
- consumerQueue . Enqueue ( consumer ) ;
58
-
59
- consumerQueueLock . Release ( ) ;
60
69
61
70
return await consumer . Task ;
62
71
0 commit comments