|
| 1 | +using System; |
| 2 | +using System.Reactive.Disposables; |
| 3 | +using System.Threading.Tasks; |
| 4 | + |
| 5 | +namespace ReactNative.Bridge.Queue |
| 6 | +{ |
| 7 | + /// <summary> |
| 8 | + /// A queue for executing actions in order. |
| 9 | + /// </summary> |
| 10 | + /// <remarks> |
| 11 | + /// This action queue ensures that async continuations are performed on the same thread. |
| 12 | + /// </remarks> |
| 13 | + public class LimitedConcurrencyActionQueue : IActionQueue |
| 14 | + { |
| 15 | + private readonly object _dispatchGate = new object(); |
| 16 | + private readonly object _actionGate = new object(); |
| 17 | + private readonly Action<Exception> _onError; |
| 18 | + private readonly CancellationDisposable _cancellationDisposable; |
| 19 | + private readonly ConcurrentExclusiveSchedulerPair _schedulerPair; |
| 20 | + private readonly TaskFactory _taskFactory; |
| 21 | + |
| 22 | + /// <summary> |
| 23 | + /// Creates an action queue where the actions are performed on the |
| 24 | + /// <see cref="ConcurrentExclusiveSchedulerPair.ExclusiveScheduler"/>. |
| 25 | + /// </summary> |
| 26 | + /// <param name="onError">The error handler.</param> |
| 27 | + public LimitedConcurrencyActionQueue(Action<Exception> onError) |
| 28 | + { |
| 29 | + if (onError == null) |
| 30 | + throw new ArgumentNullException(nameof(onError)); |
| 31 | + |
| 32 | + _onError = onError; |
| 33 | + _cancellationDisposable = new CancellationDisposable(); |
| 34 | + _schedulerPair = new ConcurrentExclusiveSchedulerPair(); |
| 35 | + _taskFactory = new TaskFactory( |
| 36 | + _cancellationDisposable.Token, |
| 37 | + TaskCreationOptions.None, |
| 38 | + TaskContinuationOptions.None, |
| 39 | + _schedulerPair.ExclusiveScheduler); |
| 40 | + } |
| 41 | + |
| 42 | + /// <summary> |
| 43 | + /// Dispatch an action to the queue. |
| 44 | + /// </summary> |
| 45 | + /// <param name="action">The action.</param> |
| 46 | + /// <remarks> |
| 47 | + /// Returns immediately. |
| 48 | + /// </remarks> |
| 49 | + public void Dispatch(Action action) |
| 50 | + { |
| 51 | + lock (_dispatchGate) |
| 52 | + { |
| 53 | + if (!_cancellationDisposable.IsDisposed) |
| 54 | + { |
| 55 | + _taskFactory.StartNew(() => |
| 56 | + { |
| 57 | + try |
| 58 | + { |
| 59 | + lock (_actionGate) |
| 60 | + { |
| 61 | + if (!_cancellationDisposable.IsDisposed) |
| 62 | + { |
| 63 | + action(); |
| 64 | + } |
| 65 | + } |
| 66 | + } |
| 67 | + catch (Exception ex) |
| 68 | + { |
| 69 | + _onError(ex); |
| 70 | + } |
| 71 | + }); |
| 72 | + } |
| 73 | + } |
| 74 | + } |
| 75 | + |
| 76 | + /// <summary> |
| 77 | + /// Checks if the current thread is running in the context of this |
| 78 | + /// action queue. |
| 79 | + /// </summary> |
| 80 | + /// <returns> |
| 81 | + /// <code>true</code> if the current thread is running an action |
| 82 | + /// dispatched by this action queue, otherwise <code>false</code>. |
| 83 | + /// </returns> |
| 84 | + public bool IsOnThread() |
| 85 | + { |
| 86 | + return TaskScheduler.Current == _schedulerPair.ExclusiveScheduler; |
| 87 | + } |
| 88 | + |
| 89 | + /// <summary> |
| 90 | + /// Disposes the action queue. |
| 91 | + /// </summary> |
| 92 | + public void Dispose() |
| 93 | + { |
| 94 | + lock (_dispatchGate) |
| 95 | + lock (_actionGate) |
| 96 | + { |
| 97 | + _cancellationDisposable.Dispose(); |
| 98 | + } |
| 99 | + } |
| 100 | + } |
| 101 | +} |
0 commit comments