Skip to content

Commit 3e3af7e

Browse files
committed
Fixes array version of merge operator completing immediatelly in case one of the observable sequences is empty. ReactiveX#1221
1 parent db883ab commit 3e3af7e

File tree

3 files changed

+66
-1
lines changed

3 files changed

+66
-1
lines changed

RxSwift/Observables/Merge.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,12 +442,13 @@ fileprivate class MergeSink<SourceType, S: ObservableConvertibleType, O: Observe
442442

443443
func run(_ sources: [SourceType]) -> Disposable {
444444
let _ = _group.insert(_sourceSubscription)
445-
_stopped = true
446445

447446
for source in sources {
448447
self.on(.next(source))
449448
}
450449

450+
_stopped = true
451+
451452
checkCompleted()
452453

453454
return _group

Sources/AllTestz/main.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1668,6 +1668,8 @@ final class ObservableMergeTest_ : ObservableMergeTest, RxTestCase {
16681668
("testMerge_MergeConcat_OuterError", ObservableMergeTest.testMerge_MergeConcat_OuterError),
16691669
("testMerge_MergeConcat_InnerError", ObservableMergeTest.testMerge_MergeConcat_InnerError),
16701670
("testMergeSync_Empty", ObservableMergeTest.testMergeSync_Empty),
1671+
("testMergeSync_EmptyData_DoesntCompleteImmediatelly", ObservableMergeTest.testMergeSync_EmptyData_DoesntCompleteImmediatelly),
1672+
("testMergeSync_EmptyEmpty_Completes", ObservableMergeTest.testMergeSync_EmptyEmpty_Completes),
16711673
("testMergeSync_Data", ObservableMergeTest.testMergeSync_Data),
16721674
("testMergeSync_ObservableOfObservable_InnerThrows", ObservableMergeTest.testMergeSync_ObservableOfObservable_InnerThrows),
16731675
("testFlatMapFirst_Complete", ObservableMergeTest.testFlatMapFirst_Complete),

Tests/RxSwiftTests/Observable+MergeTests.swift

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,68 @@ extension ObservableMergeTest {
10151015
XCTAssertEqual(res.events, messages)
10161016
}
10171017
}
1018+
1019+
func testMergeSync_EmptyData_DoesntCompleteImmediatelly() {
1020+
let factories: [(Observable<Int>, Observable<Int>) -> Observable<Int>] =
1021+
[
1022+
{ ys1, ys2 in Observable.merge(ys1, ys2) },
1023+
{ ys1, ys2 in Observable.merge(AnyCollection([ys1, ys2])) },
1024+
{ ys1, ys2 in Observable.merge([ys1, ys2]) },
1025+
]
1026+
for factory in factories {
1027+
let scheduler = TestScheduler(initialClock: 0)
1028+
1029+
let ys1 = Observable<Int>.empty()
1030+
1031+
let ys2 = scheduler.createColdObservable([
1032+
next(10, 201),
1033+
next(20, 202),
1034+
completed(50)
1035+
])
1036+
1037+
let res = scheduler.start {
1038+
factory(ys1.asObservable(), ys2.asObservable())
1039+
}
1040+
1041+
let messages = [
1042+
next(210, 201),
1043+
next(220, 202),
1044+
completed(250)
1045+
]
1046+
1047+
XCTAssertEqual(res.events, messages)
1048+
1049+
XCTAssertEqual(ys2.subscriptions, [
1050+
Subscription(200, 250),
1051+
])
1052+
}
1053+
}
1054+
1055+
func testMergeSync_EmptyEmpty_Completes() {
1056+
let factories: [(Observable<Int>, Observable<Int>) -> Observable<Int>] =
1057+
[
1058+
{ ys1, ys2 in Observable.merge(ys1, ys2) },
1059+
{ ys1, ys2 in Observable.merge(AnyCollection([ys1, ys2])) },
1060+
{ ys1, ys2 in Observable.merge([ys1, ys2]) },
1061+
]
1062+
for factory in factories {
1063+
let scheduler = TestScheduler(initialClock: 0)
1064+
1065+
let ys1 = Observable<Int>.empty()
1066+
1067+
let ys2 = Observable<Int>.empty()
1068+
1069+
let res = scheduler.start {
1070+
factory(ys1.asObservable(), ys2.asObservable())
1071+
}
1072+
1073+
let messages = [
1074+
completed(200, Int.self)
1075+
]
1076+
1077+
XCTAssertEqual(res.events, messages)
1078+
}
1079+
}
10181080

10191081
func testMergeSync_Data() {
10201082
let factories: [(Observable<Int>, Observable<Int>, Observable<Int>) -> Observable<Int>] =

0 commit comments

Comments
 (0)