Skip to content

Commit b343881

Browse files
committed
Improves locking behavior of merge and switch operators. ReactiveX#1344
1 parent e782081 commit b343881

File tree

4 files changed

+102
-52
lines changed

4 files changed

+102
-52
lines changed

RxSwift/Observables/Merge.swift

Lines changed: 57 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,7 @@ fileprivate final class MergeLimitedBasicSink<SourceSequence: ObservableConverti
220220

221221
fileprivate class MergeLimitedSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>
222222
: Sink<Observer>
223-
, LockOwnerType
224-
, SynchronizedOnType where Observer.E == SourceSequence.E {
223+
, ObserverType where Observer.E == SourceSequence.E {
225224
typealias QueueType = Queue<SourceSequence>
226225

227226
let _maxConcurrent: Int
@@ -267,14 +266,10 @@ fileprivate class MergeLimitedSink<SourceElement, SourceSequence: ObservableConv
267266
func performMap(_ element: SourceElement) throws -> SourceSequence {
268267
rxAbstractMethod()
269268
}
270-
271-
func on(_ event: Event<SourceElement>) {
272-
synchronizedOn(event)
273-
}
274269

275-
func _synchronized_on(_ event: Event<SourceElement>) {
276-
switch event {
277-
case .next(let element):
270+
@inline(__always)
271+
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
272+
_lock.lock(); defer { _lock.unlock() } // {
278273
let subscribe: Bool
279274
if _activeCount < _maxConcurrent {
280275
_activeCount += 1
@@ -293,17 +288,31 @@ fileprivate class MergeLimitedSink<SourceElement, SourceSequence: ObservableConv
293288

294289
if subscribe {
295290
do {
296-
let value = try performMap(element)
297-
self.subscribe(value, group: _group)
291+
return try performMap(element)
298292
} catch {
299293
forwardOn(.error(error))
300294
dispose()
301295
}
302296
}
297+
298+
return nil
299+
// }
300+
}
301+
302+
func on(_ event: Event<SourceElement>) {
303+
switch event {
304+
case .next(let element):
305+
if let sequence = self.nextElementArrived(element: element) {
306+
self.subscribe(sequence, group: _group)
307+
}
303308
case .error(let error):
309+
_lock.lock(); defer { _lock.unlock() }
310+
304311
forwardOn(.error(error))
305312
dispose()
306313
case .completed:
314+
_lock.lock(); defer { _lock.unlock() }
315+
307316
if _activeCount == 0 {
308317
forwardOn(.completed)
309318
dispose()
@@ -452,48 +461,59 @@ fileprivate class MergeSink<SourceElement, SourceSequence: ObservableConvertible
452461
func performMap(_ element: SourceElement) throws -> SourceSequence {
453462
rxAbstractMethod()
454463
}
464+
465+
@inline(__always)
466+
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
467+
_lock.lock(); defer { _lock.unlock() } // {
468+
if !subscribeNext {
469+
return nil
470+
}
471+
472+
do {
473+
let value = try performMap(element)
474+
_activeCount += 1
475+
return value
476+
}
477+
catch let e {
478+
forwardOn(.error(e))
479+
dispose()
480+
return nil
481+
}
482+
// }
483+
}
455484

456485
func on(_ event: Event<SourceElement>) {
457-
_lock.lock(); defer { _lock.unlock() } // lock {
458-
switch event {
459-
case .next(let element):
460-
if !subscribeNext {
461-
return
462-
}
463-
do {
464-
let value = try performMap(element)
465-
subscribeInner(value.asObservable())
466-
}
467-
catch let e {
468-
forwardOn(.error(e))
469-
dispose()
470-
}
471-
case .error(let error):
472-
forwardOn(.error(error))
473-
dispose()
474-
case .completed:
475-
_stopped = true
476-
_sourceSubscription.dispose()
477-
checkCompleted()
486+
switch event {
487+
case .next(let element):
488+
if let value = nextElementArrived(element: element) {
489+
subscribeInner(value.asObservable())
478490
}
479-
//}
491+
case .error(let error):
492+
_lock.lock(); defer { _lock.unlock() }
493+
forwardOn(.error(error))
494+
dispose()
495+
case .completed:
496+
_lock.lock(); defer { _lock.unlock() }
497+
_stopped = true
498+
_sourceSubscription.dispose()
499+
checkCompleted()
500+
}
480501
}
481502

482503
func subscribeInner(_ source: Observable<Observer.E>) {
483504
let iterDisposable = SingleAssignmentDisposable()
484505
if let disposeKey = _group.insert(iterDisposable) {
485-
_activeCount += 1
486506
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
487507
let subscription = source.subscribe(iter)
488508
iterDisposable.setDisposable(subscription)
489509
}
490510
}
491511

492-
func run(_ sources: [SourceElement]) -> Disposable {
493-
let _ = _group.insert(_sourceSubscription)
512+
func run(_ sources: [Observable<Observer.E>]) -> Disposable {
513+
_activeCount += sources.count
494514

495515
for source in sources {
496-
self.on(.next(source))
516+
subscribeInner(source)
497517
}
498518

499519
_stopped = true

RxSwift/Observables/Switch.swift

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,7 @@ extension ObservableType where E : ObservableConvertibleType {
4545

4646
fileprivate class SwitchSink<SourceType, S: ObservableConvertibleType, O: ObserverType>
4747
: Sink<O>
48-
, ObserverType
49-
, LockOwnerType
50-
, SynchronizedOnType where S.E == O.E {
48+
, ObserverType where S.E == O.E {
5149
typealias E = SourceType
5250

5351
fileprivate let _subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
@@ -69,39 +67,46 @@ fileprivate class SwitchSink<SourceType, S: ObservableConvertibleType, O: Observ
6967
_subscriptions.setDisposable(subscription)
7068
return Disposables.create(_subscriptions, _innerSubscription)
7169
}
72-
73-
func on(_ event: Event<E>) {
74-
synchronizedOn(event)
75-
}
7670

7771
func performMap(_ element: SourceType) throws -> S {
7872
rxAbstractMethod()
7973
}
8074

81-
func _synchronized_on(_ event: Event<E>) {
82-
switch event {
83-
case .next(let element):
75+
@inline(__always)
76+
final private func nextElementArrived(element: E) -> (Int, Observable<S.E>)? {
77+
_lock.lock(); defer { _lock.unlock() } // {
8478
do {
8579
let observable = try performMap(element).asObservable()
8680
_hasLatest = true
8781
_latest = _latest &+ 1
88-
let latest = _latest
82+
return (_latest, observable)
83+
}
84+
catch let error {
85+
forwardOn(.error(error))
86+
dispose()
87+
}
88+
89+
return nil
90+
// }
91+
}
8992

93+
func on(_ event: Event<E>) {
94+
switch event {
95+
case .next(let element):
96+
if let (latest, observable) = nextElementArrived(element: element) {
9097
let d = SingleAssignmentDisposable()
9198
_innerSubscription.disposable = d
9299

93100
let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
94101
let disposable = observable.subscribe(observer)
95102
d.setDisposable(disposable)
96103
}
97-
catch let error {
98-
forwardOn(.error(error))
99-
dispose()
100-
}
101104
case .error(let error):
105+
_lock.lock(); defer { _lock.unlock() }
102106
forwardOn(.error(error))
103107
dispose()
104108
case .completed:
109+
_lock.lock(); defer { _lock.unlock() }
105110
_stopped = true
106111

107112
_subscriptions.dispose()

Sources/AllTestz/main.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ final class AnomaliesTest_ : AnomaliesTest, RxTestCase {
5656
static var allTests: [(String, (AnomaliesTest_) -> () -> ())] { return [
5757
("test936", AnomaliesTest.test936),
5858
("test1323", AnomaliesTest.test1323),
59+
("test1344", AnomaliesTest.test1344),
5960
("testSeparationBetweenOnAndSubscriptionLocks", AnomaliesTest.testSeparationBetweenOnAndSubscriptionLocks),
6061
] }
6162
}

Tests/RxSwiftTests/Anomalies.swift

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,30 @@ extension AnomaliesTest {
101101
}
102102
}
103103

104+
func test1344(){
105+
let disposeBag = DisposeBag()
106+
let foo = Observable<Int>.create({ observer in
107+
observer.on(.next(1))
108+
Thread.sleep(forTimeInterval: 0.1)
109+
observer.on(.completed)
110+
return Disposables.create()
111+
})
112+
.flatMap { (int) -> Observable<[Int]> in
113+
return Observable.create { (observer) -> Disposable in
114+
DispatchQueue.global().async {
115+
observer.onNext([int])
116+
}
117+
self.sleep(0.1)
118+
return Disposables.create()
119+
}
120+
}
121+
122+
Observable.merge(foo, .just([42]))
123+
.subscribe { (e) in
124+
}
125+
.disposed(by: disposeBag)
126+
}
127+
104128
func testSeparationBetweenOnAndSubscriptionLocks() {
105129
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
106130
for i in 0 ..< 1 {

0 commit comments

Comments
 (0)