Skip to content

Commit e8522a5

Browse files
committed
add timeout param
1 parent 3f83f35 commit e8522a5

File tree

1 file changed

+23
-14
lines changed

1 file changed

+23
-14
lines changed

Advanced.Algorithms/Distributed/AsyncQueue.cs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@ public class AsyncQueue<T>
1616
private readonly Queue<TaskCompletionSource<T>> consumerQueue = new Queue<TaskCompletionSource<T>>();
1717
private SemaphoreSlim consumerQueueLock = new SemaphoreSlim(1);
1818

19+
public int Count => queue.Count;
20+
1921
/// <summary>
2022
/// Supports multi-threaded producers.
2123
/// Time complexity: O(1).
2224
/// </summary>
23-
public async Task EnqueueAsync(T value, CancellationToken taskCancellationToken = default(CancellationToken))
25+
public async Task EnqueueAsync(T value, int millisecondsTimeout = int.MaxValue, CancellationToken taskCancellationToken = default(CancellationToken))
2426
{
25-
await consumerQueueLock.WaitAsync(taskCancellationToken);
27+
await consumerQueueLock.WaitAsync(millisecondsTimeout, taskCancellationToken);
2628

2729
if(consumerQueue.Count > 0)
2830
{
2931
var consumer = consumerQueue.Dequeue();
30-
consumer.SetResult(value);
32+
consumer.TrySetResult(value);
3133
}
3234
else
3335
{
@@ -41,22 +43,29 @@ public class AsyncQueue<T>
4143
/// Supports multi-threaded consumers.
4244
/// Time complexity: O(1).
4345
/// </summary>
44-
public async Task<T> DequeueAsync(CancellationToken taskCancellationToken = default(CancellationToken))
46+
public async Task<T> DequeueAsync(int millisecondsTimeout = int.MaxValue, CancellationToken taskCancellationToken = default(CancellationToken))
4547
{
46-
await consumerQueueLock.WaitAsync(taskCancellationToken);
48+
await consumerQueueLock.WaitAsync(millisecondsTimeout, taskCancellationToken);
49+
50+
TaskCompletionSource<T> consumer;
51+
52+
try
53+
{
54+
if (queue.Count > 0)
55+
{
56+
var result = queue.Dequeue();
57+
consumerQueueLock.Release();
58+
return result;
59+
}
4760

48-
if (queue.Count > 0)
61+
consumer = new TaskCompletionSource<T>();
62+
taskCancellationToken.Register(() => consumer.TrySetCanceled());
63+
consumerQueue.Enqueue(consumer);
64+
}
65+
finally
4966
{
50-
var result = queue.Dequeue();
5167
consumerQueueLock.Release();
52-
return result;
5368
}
54-
55-
var consumer = new TaskCompletionSource<T>();
56-
taskCancellationToken.Register(() => consumer.TrySetCanceled());
57-
consumerQueue.Enqueue(consumer);
58-
59-
consumerQueueLock.Release();
6069

6170
return await consumer.Task;
6271

0 commit comments

Comments
 (0)