Skip to content
This repository was archived by the owner on Oct 17, 2024. It is now read-only.

Commit cde00b8

Browse files
authored
Fix a StreamGroup bug when a component stream's listen() throws (#173)
This would put the StreamGroup into an inconsistent state where it would believe itself to be active, but only some streams would have subscriptions. This was exacerbated by dart-lang/sdk#45815, which meant that even though _onListen threw an error a StreamSubscription was created and returned, so further callbacks could still be called. Now instead of going into an inconsistent state, the StreamGroup simply cancels itself.
1 parent 2d77b8e commit cde00b8

File tree

4 files changed

+97
-7
lines changed

4 files changed

+97
-7
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 2.6.1
2+
3+
* When `StreamGroup.stream.listen()` is called, gracefully handle component
4+
streams throwing errors when their `Stream.listen()` methods are called.
5+
16
## 2.6.0
27

38
* Add a `StreamCloser` class, which is a `StreamTransformer` that allows the

lib/src/stream_group.dart

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import 'dart:async';
66

7+
import 'package:collection/collection.dart';
8+
79
/// A collection of streams whose events are unified and sent through a central
810
/// stream.
911
///
@@ -185,13 +187,24 @@ class StreamGroup<T> implements Sink<Stream<T>> {
185187
/// This is called for both single-subscription and broadcast groups.
186188
void _onListen() {
187189
_state = _StreamGroupState.listening;
188-
_subscriptions.forEach((stream, subscription) {
190+
191+
for (var entry in _subscriptions.entries.toList()) {
189192
// If this is a broadcast group and this isn't the first time it's been
190193
// listened to, there may still be some subscriptions to
191194
// single-subscription streams.
192-
if (subscription != null) return;
193-
_subscriptions[stream] = _listenToStream(stream);
194-
});
195+
if (entry.value != null) return;
196+
197+
var stream = entry.key;
198+
try {
199+
_subscriptions[stream] = _listenToStream(stream);
200+
} catch (error) {
201+
// If [Stream.listen] throws a synchronous error (for example because
202+
// the stream has already been listened to), cancel all subscriptions
203+
// and rethrow the error.
204+
_onCancel()?.catchError((_) {});
205+
rethrow;
206+
}
207+
}
195208
}
196209

197210
/// A callback called when [stream] is paused.
@@ -216,8 +229,17 @@ class StreamGroup<T> implements Sink<Stream<T>> {
216229
Future? _onCancel() {
217230
_state = _StreamGroupState.canceled;
218231

219-
var futures = _subscriptions.values
220-
.map((subscription) => subscription!.cancel())
232+
var futures = _subscriptions.entries
233+
.map((entry) {
234+
var subscription = entry.value;
235+
if (subscription != null) return subscription.cancel();
236+
try {
237+
return entry.key.listen(null).cancel();
238+
} catch (_) {
239+
return null;
240+
}
241+
})
242+
.whereNotNull()
221243
.toList();
222244

223245
_subscriptions.clear();

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: async
2-
version: 2.6.0
2+
version: 2.6.1
33

44
description: Utility functions and classes related to the 'dart:async' library.
55
repository: https://github.com/dart-lang/async

test/stream_group_test.dart

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,69 @@ void main() {
246246
expect(fired, isTrue);
247247
});
248248
});
249+
250+
group('when listen() throws an error', () {
251+
late Stream<String> alreadyListened;
252+
setUp(() {
253+
alreadyListened = Stream.value('foo')..listen(null);
254+
});
255+
256+
group('listen()', () {
257+
test('rethrows that error', () {
258+
streamGroup.add(alreadyListened);
259+
260+
// We can't use expect(..., throwsStateError) here bceause of
261+
// dart-lang/sdk#45815.
262+
runZonedGuarded(
263+
() => streamGroup.stream.listen(expectAsync1((_) {}, count: 0)),
264+
expectAsync2((error, _) => expect(error, isStateError)));
265+
});
266+
267+
test('cancels other subscriptions', () async {
268+
var firstCancelled = false;
269+
var first =
270+
StreamController<String>(onCancel: () => firstCancelled = true);
271+
streamGroup.add(first.stream);
272+
273+
streamGroup.add(alreadyListened);
274+
275+
var lastCancelled = false;
276+
var last =
277+
StreamController<String>(onCancel: () => lastCancelled = true);
278+
streamGroup.add(last.stream);
279+
280+
runZonedGuarded(() => streamGroup.stream.listen(null), (_, __) {});
281+
282+
expect(firstCancelled, isTrue);
283+
expect(lastCancelled, isTrue);
284+
});
285+
286+
// There really shouldn't even be a subscription here, but due to
287+
// dart-lang/sdk#45815 there is.
288+
group('canceling the subscription is a no-op', () {
289+
test('synchronously', () {
290+
streamGroup.add(alreadyListened);
291+
292+
var subscription = runZonedGuarded(
293+
() => streamGroup.stream.listen(null),
294+
expectAsync2((_, __) {}, count: 1));
295+
296+
expect(subscription!.cancel(), completes);
297+
});
298+
299+
test('asynchronously', () async {
300+
streamGroup.add(alreadyListened);
301+
302+
var subscription = runZonedGuarded(
303+
() => streamGroup.stream.listen(null),
304+
expectAsync2((_, __) {}, count: 1));
305+
306+
await pumpEventQueue();
307+
expect(subscription!.cancel(), completes);
308+
});
309+
});
310+
});
311+
});
249312
});
250313

251314
group('broadcast', () {

0 commit comments

Comments
 (0)