Chapter 14 - Concurrency and Asynchrony
Threading Basics
Tasks
Principles of Asynchrony
Asynchronous Functions in C#
Asynchronous Streams (from C# 8)
Asynchronous Patterns
Cancellation
// Entry point for this sample: creates a token, schedules a cancellation request on it,
// and awaits Foo with that token.
async void Main()
{
var token = new CancellationToken();
Task.Delay (5000).ContinueWith (ant => token.Cancel()); // Request cancellation after five seconds (5000 ms).
await Foo (token);
}
// A stripped-down stand-in for the CancellationToken type in System.Threading,
// kept deliberately minimal for illustration.
class CancellationToken
{
    // True once Cancel() has been called; nothing in this class resets it.
    public bool IsCancellationRequested { get; private set; }

    // Marks the token as cancelled.
    public void Cancel() => IsCancellationRequested = true;

    // Throws OperationCanceledException when cancellation has been requested;
    // otherwise returns without doing anything.
    public void ThrowIfCancellationRequested()
    {
        if (!IsCancellationRequested) return;
        throw new OperationCanceledException();
    }
}
// Prints 0..9, pausing one second after each number. After every pause the token is
// polled, and an OperationCanceledException is thrown if cancellation was requested.
async Task Foo (CancellationToken cancellationToken)
{
    int tick = 0;
    while (tick < 10)
    {
        Console.WriteLine (tick);
        await Task.Delay (1000);
        cancellationToken.ThrowIfCancellationRequested();
        tick++;
    }
}
Using the real CancellationToken
// Entry point for this sample: creates a CancellationTokenSource, schedules Cancel() on it,
// and awaits Foo with the source's token.
async void Main()
{
var cancelSource = new CancellationTokenSource();
Task.Delay (5000).ContinueWith (ant => cancelSource.Cancel()); // Request cancellation after five seconds (5000 ms).
await Foo (cancelSource.Token);
}
// Counts from 0 to 9 on the console with a one-second pause after each number.
// The token is checked once per iteration, after the pause; a pending cancellation
// request surfaces as an exception from ThrowIfCancellationRequested.
async Task Foo (CancellationToken cancellationToken)
{
    const int iterations = 10;
    const int pauseMilliseconds = 1000;

    for (int step = 0; step < iterations; step++)
    {
        Console.WriteLine (step);
        await Task.Delay (pauseMilliseconds);
        cancellationToken.ThrowIfCancellationRequested();
    }
}
Using the real CancellationToken - improved version
// Entry point for this sample: runs Foo with a token that cancels itself after 5 seconds.
async void Main()
{
    // The constructor argument tells the source to cancel after 5000 ms. That timeout is
    // backed by a timer owned by the source, so dispose the source as soon as Foo
    // completes or throws instead of leaving it to the garbage collector.
    using (var cancelSource = new CancellationTokenSource (5000))
    {
        await Foo (cancelSource.Token);
    }
}
// Prints 0..9 with a one-second pause after each number. The token is handed straight
// to Task.Delay, so a cancellation request interrupts the pause itself rather than
// being noticed only once the pause has finished.
async Task Foo (CancellationToken cancellationToken)
{
    int count = 0;
    while (count < 10)
    {
        Console.WriteLine (count);
        await Task.Delay (1000, cancellationToken);   // Cancellation tokens propagate nicely
        count++;
    }
}
Progress reporting - with a delegate
// Entry point for this sample: runs Foo and prints each reported percentage as "<n> %".
async void Main()
{
    await Foo (percent => Console.WriteLine (percent + " %"));
}
// Runs a 1000-step loop via Task.Run and invokes the callback on every tenth step
// with step / 10, i.e. with the values 0 through 99 in order.
// The callback is invoked from inside the Task.Run delegate.
Task Foo (Action<int> onProgressPercentChanged)
{
    void Work()
    {
        for (int step = 0; step < 1000; step++)
        {
            bool atWholePercent = step % 10 == 0;
            if (atWholePercent) onProgressPercentChanged (step / 10);
            // Do something compute-bound...
        }
    }

    return Task.Run (() => Work());
}
Progress reporting - with IProgress
// Entry point for this sample: reports progress through IProgress<int> instead of a raw delegate.
async void Main()
{
    // Progress<T> captures the synchronization context that is current when it is
    // constructed (falling back to the thread pool when there is none) and invokes the
    // handler there, so the handler does not run on the worker thread that calls Report.
    var progress = new Progress<int> (i => Console.WriteLine (i + " %"));
    await Foo (progress);
}

// Runs a 1000-step loop via Task.Run and calls Report on every tenth step with
// step / 10, i.e. with the values 0 through 99.
Task Foo (IProgress<int> onProgressPercentChanged)
{
    return Task.Run (() =>
    {
        for (int i = 0; i < 1000; i++)
        {
            if (i % 10 == 0) onProgressPercentChanged.Report (i / 10);
            // Do something compute-bound...
        }
    });
}
Task combinators - WhenAny
// Entry point for this sample: starts three delayed tasks and waits for whichever
// finishes first, then prints its result.
async void Main()
{
    Task<int>[] contenders = { Delay1(), Delay2(), Delay3() };
    Task<int> first = await Task.WhenAny (contenders);

    Console.WriteLine ("Done");
    // The task returned by WhenAny has already completed, so reading Result here
    // does not wait.
    Console.WriteLine (first.Result);   // 1
}
// Completes after one second with the value 1.
async Task<int> Delay1()
{
    await Task.Delay (1000);
    return 1;
}

// Completes after two seconds with the value 2.
async Task<int> Delay2()
{
    await Task.Delay (2000);
    return 2;
}

// Completes after three seconds with the value 3.
async Task<int> Delay3()
{
    await Task.Delay (3000);
    return 3;
}
Task combinators - WhenAny - await winning task
// Entry point for this sample: waits for the first of three delayed tasks, then
// awaits that task (rather than reading Result) to obtain its value.
async void Main()
{
    Task<Task<int>> race = Task.WhenAny (Delay1(), Delay2(), Delay3());
    Task<int> winningTask = await race;

    Console.WriteLine ("Done");
    int value = await winningTask;
    Console.WriteLine (value);   // 1
}
// Yields 1 once a 1000 ms delay has elapsed.
async Task<int> Delay1()
{
    await Task.Delay (1000);
    return 1;
}

// Yields 2 once a 2000 ms delay has elapsed.
async Task<int> Delay2()
{
    await Task.Delay (2000);
    return 2;
}

// Yields 3 once a 3000 ms delay has elapsed.
async Task<int> Delay3()
{
    await Task.Delay (3000);
    return 3;
}
Task combinators - WhenAny - in one step
// Entry point for this sample: the inner await yields the first task to finish,
// the outer await yields that task's result, which is then dumped.
async void Main()
{
    (await await Task.WhenAny (Delay1(), Delay2(), Delay3())).Dump();
}
// Waits one second, then produces 1.
async Task<int> Delay1()
{
    await Task.Delay (1000);
    return 1;
}

// Waits two seconds, then produces 2.
async Task<int> Delay2()
{
    await Task.Delay (2000);
    return 2;
}

// Waits three seconds, then produces 3.
async Task<int> Delay3()
{
    await Task.Delay (3000);
    return 3;
}
Task combinators - WhenAny - timeouts
// Entry point for this sample: races SomeAsyncFunc against a 5-second delay and
// throws TimeoutException when the delay finishes first.
async void Main()
{
    Task<string> task = SomeAsyncFunc();
    Task timeout = Task.Delay (5000);

    Task winner = await Task.WhenAny (task, timeout);
    if (winner == timeout) throw new TimeoutException();

    string result = await task;   // Unwrap result/re-throw
}
// Stand-in for a slow operation: completes with "foo" after a 10-second delay.
async Task<string> SomeAsyncFunc()
{
    const int delayMilliseconds = 10000;
    await Task.Delay (delayMilliseconds);
    return "foo";
}
Task combinators - WhenAll
// Entry point for this sample: starts three delayed tasks, waits until all of them
// have finished, then dumps "Done".
async void Main()
{
    Task<int[]> allFinished = Task.WhenAll (Delay1(), Delay2(), Delay3());
    await allFinished;
    "Done".Dump();
}
// Returns 1 after a one-second delay.
async Task<int> Delay1()
{
    await Task.Delay (1000);
    return 1;
}

// Returns 2 after a two-second delay.
async Task<int> Delay2()
{
    await Task.Delay (2000);
    return 2;
}

// Returns 3 after a three-second delay.
async Task<int> Delay3()
{
    await Task.Delay (3000);
    return 3;
}
Task combinators - WhenAll - exceptions
// Entry point for this sample: combines two failing tasks with WhenAll and, after the
// await throws, inspects the combined task to count the collected exceptions.
async void Main()
{
    Task task1 = Task.Run (() => { throw null; } );
    Task task2 = Task.Run (() => { throw null; } );
    Task all = Task.WhenAll (task1, task2);

    try
    {
        await all;
    }
    catch
    {
        // The count is read from the combined task's Exception, not from the
        // exception that await threw.
        int faultCount = all.Exception.InnerExceptions.Count;
        Console.WriteLine (faultCount);   // 2
    }
}
Task combinators - WhenAll - return values
// Entry point for this sample: awaits two value-returning tasks together and dumps
// the array of their results.
async void Main()
{
    Task<int> task1 = Task.Run (() => 1);
    Task<int> task2 = Task.Run (() => 2);

    Task<int[]> both = Task.WhenAll (task1, task2);
    int[] results = await both;   // { 1, 2 }
    results.Dump();
}
Task combinators - WhenAll - web page downloads
// Entry point for this sample: splits a whitespace-separated list of URIs, asks
// GetTotalSize for the combined download size, and dumps it.
async void Main()
{
    string[] uris = "http://www.linqpad.net http://www.albahari.com http://stackoverflow.com".Split();
    int totalSize = await GetTotalSize (uris);
    totalSize.Dump();
}
// Downloads every URI with its own WebClient, waits for all downloads to finish,
// and returns the sum of the downloaded byte-array lengths.
// The query is lazy: the downloads start when Task.WhenAll enumerates it.
async Task<int> GetTotalSize (string[] uris)
{
    IEnumerable<Task<byte[]>> downloads =
        from uri in uris
        select new WebClient().DownloadDataTaskAsync (uri);

    byte[][] pages = await Task.WhenAll (downloads);
    return pages.Sum (page => page.Length);
}
Task combinators - WhenAll - web page downloads improved
// Entry point for this sample: computes the combined size of three web pages via
// GetTotalSize and dumps the result.
async void Main()
{
    const string uriList = "http://www.linqpad.net http://www.albahari.com http://stackoverflow.com";
    int totalSize = await GetTotalSize (uriList.Split());
    totalSize.Dump();
}
// Downloads every URI with its own WebClient and returns the sum of the page lengths.
// Each per-URI task yields only the length of its download, so the combined result
// is an int[] of lengths rather than an array of byte arrays.
async Task<int> GetTotalSize (string[] uris)
{
    async Task<int> LengthOf (string uri)
    {
        byte[] data = await new WebClient().DownloadDataTaskAsync (uri);
        return data.Length;
    }

    IEnumerable<Task<int>> lengthTasks = uris.Select (uri => LengthOf (uri));
    int[] lengths = await Task.WhenAll (lengthTasks);
    return lengths.Sum();
}
Custom combinators - WithTimeout
// Entry point for this sample: runs SomeAsyncFunc under a two-second timeout and
// dumps the result if it arrives in time.
async void Main()
{
    TimeSpan limit = TimeSpan.FromSeconds (2);
    Task<string> guarded = SomeAsyncFunc().WithTimeout (limit);
    string result = await guarded;
    result.Dump();
}
// Simulates a slow operation: waits 10 seconds, then returns "foo".
async Task<string> SomeAsyncFunc()
{
    const int tenSecondsInMs = 10000;
    await Task.Delay (tenSecondsInMs);
    return "foo";
}
public static class Extensions
{
    // Returns the result of 'task' if it completes within 'timeout'; otherwise throws
    // TimeoutException. If 'task' itself faults or is cancelled before the timeout,
    // awaiting it re-throws its exception.
    // The underlying task is not stopped by a timeout; it simply stops being awaited.
    public async static Task<TResult> WithTimeout<TResult> (this Task<TResult> task, TimeSpan timeout)
    {
        using (var cancelSource = new CancellationTokenSource())
        {
            Task delay = Task.Delay (timeout, cancelSource.Token);

            // Library-style code: nothing here needs the caller's synchronization context.
            Task winner = await Task.WhenAny (task, delay).ConfigureAwait (false);
            if (winner != task) throw new TimeoutException();

            // The task won the race: cancel the delay so its timer is released now
            // instead of lingering until the full timeout has elapsed.
            cancelSource.Cancel();
        }
        return await task.ConfigureAwait (false);   // Unwrap result/re-throw
    }
}
Custom combinators - WithCancellation
// Entry point for this sample: abandons the wait on SomeAsyncFunc if it has not
// finished within 3 seconds.
async void Main()
{
    // The constructor argument makes the source cancel itself after 3000 ms. That
    // timeout is backed by a timer owned by the source, so dispose the source once
    // the await has completed or thrown.
    using (var cts = new CancellationTokenSource (3000))   // Cancel after 3 seconds
    {
        string result = await SomeAsyncFunc().WithCancellation (cts.Token);
        result.Dump();
    }
}
// Placeholder for long-running work: finishes with "foo" after 10 seconds.
async Task<string> SomeAsyncFunc()
{
    const int workDurationMs = 10000;
    await Task.Delay (workDurationMs);
    return "foo";
}
public static class Extensions
{
    // Returns a task that mirrors 'task' (result, fault or cancellation) but also
    // completes as cancelled as soon as 'cancelToken' is signalled. Cancelling only
    // abandons the wait; the underlying task keeps running.
    public static Task<TResult> WithCancellation<TResult> (this Task<TResult> task, CancellationToken cancelToken)
    {
        // Cancel() invokes registered callbacks synchronously. Without this option the
        // awaiting code's continuation would run inline inside that Cancel() call.
        var tcs = new TaskCompletionSource<TResult> (TaskCreationOptions.RunContinuationsAsynchronously);

        // Pass the token so the resulting OperationCanceledException identifies it.
        var reg = cancelToken.Register (() => tcs.TrySetCanceled (cancelToken));

        task.ContinueWith (ant =>
        {
            reg.Dispose();
            if (ant.IsCanceled)
                tcs.TrySetCanceled();
            else if (ant.IsFaulted)
                // Forward every inner exception, not just the first one.
                tcs.TrySetException (ant.Exception.InnerExceptions);
            else
                tcs.TrySetResult (ant.Result);
        },
        // Pin the scheduler: the default for ContinueWith is TaskScheduler.Current,
        // which depends on whatever task happens to be calling this method.
        TaskScheduler.Default);

        return tcs.Task;
    }
}
Custom combinators - WhenAllOrError
// Entry point for this sample. The first task faults, so the await below throws
// without waiting for the second task's five-second delay.
async void Main()
{
    // The unreachable 'return 42' is there so the lambda has an int return value.
    Task<int> failing = Task.Run (() => { throw null; return 42; } );
    Task<int> slow = Task.Delay (5000).ContinueWith (ant => 53);

    Task<int[]> combined = WhenAllOrError (failing, slow);
    int[] results = await combined;
}
// Like Task.WhenAll, but completes as soon as any input task faults or is cancelled
// instead of waiting for the remaining tasks. On success it returns all results;
// on failure it surfaces the first inner exception (or cancellation) of whichever
// task reported first.
async Task<TResult[]> WhenAllOrError<TResult> (params Task<TResult>[] tasks)
{
    var killJoy = new TaskCompletionSource<TResult[]>();

    // Forwards a finished task's cancellation or fault to killJoy; successful
    // completions are ignored here and handled by Task.WhenAll below.
    void Relay (Task<TResult> finished)
    {
        if (finished.IsCanceled)
        {
            killJoy.TrySetCanceled();
            return;
        }
        if (finished.IsFaulted)
            killJoy.TrySetException (finished.Exception.InnerException);
    }

    foreach (Task<TResult> task in tasks)
        task.ContinueWith (ant => Relay (ant));

    Task<TResult[]> allDone = Task.WhenAll (tasks);
    Task<TResult[]> first = await Task.WhenAny (killJoy.Task, allDone);
    return await first;
}
C# 12
in a Nutshell
About the Book
Code Listings
C# 12 in a Nutshell
C# 10 in a Nutshell
C# 9.0 in a Nutshell
C# 8.0 in a Nutshell
C# 7.0 in a Nutshell
Extras
Contact
Buy print or Kindle edition
Buy PDF edition
Read via O'Reilly subscription