Skip to content

Commit d5a754d

Browse files
committed
Fixes deadlock with shareReplayWhileLatest. ReactiveX#1323
1 parent d215a2a commit d5a754d

File tree

3 files changed

+63
-120
lines changed

3 files changed

+63
-120
lines changed

RxSwift/Observables/ShareReplayScope.swift

Lines changed: 23 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,7 @@ extension ObservableType {
190190
*/
191191
public func shareReplay(_ bufferSize: Int)
192192
-> Observable<E> {
193-
if bufferSize == 1 {
194-
return ShareReplay1(source: self.asObservable())
195-
}
196-
else {
197-
return self.replay(bufferSize).refCount()
198-
}
193+
return self.replay(bufferSize).refCount()
199194
}
200195
}
201196

@@ -268,25 +263,30 @@ fileprivate final class ShareReplay1WhileConnectedConnection<Element>
268263
_parent._connection = nil
269264
}
270265
_observers = Observers()
271-
_subscription.dispose()
272266
}
273267

274268
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
275269
_lock.lock()
276-
_synchronized_unsubscribe(disposeKey)
270+
let shouldDisconnect = _synchronized_unsubscribe(disposeKey)
277271
_lock.unlock()
272+
if shouldDisconnect {
273+
_subscription.dispose()
274+
}
278275
}
279276

280277
@inline(__always)
281-
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
278+
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
282279
// if already unsubscribed, just return
283280
if self._observers.removeKey(disposeKey) == nil {
284-
return
281+
return false
285282
}
286283

287284
if _observers.count == 0 {
288285
_synchronized_dispose()
286+
return true
289287
}
288+
289+
return false
290290
}
291291

292292
#if TRACE_RESOURCES
@@ -319,13 +319,13 @@ final fileprivate class ShareReplay1WhileConnected<Element>
319319
let count = connection._observers.count
320320

321321
let disposable = connection._synchronized_subscribe(observer)
322+
323+
_lock.unlock()
322324

323325
if count == 0 {
324326
connection.connect()
325327
}
326328

327-
_lock.unlock()
328-
329329
return disposable
330330
}
331331

@@ -411,25 +411,30 @@ fileprivate final class ShareWhileConnectedConnection<Element>
411411
_parent._connection = nil
412412
}
413413
_observers = Observers()
414-
_subscription.dispose()
415414
}
416415

417416
final func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
418417
_lock.lock()
419-
_synchronized_unsubscribe(disposeKey)
418+
let shouldDisconnect = _synchronized_unsubscribe(disposeKey)
420419
_lock.unlock()
420+
if shouldDisconnect {
421+
_subscription.dispose()
422+
}
421423
}
422424

423425
@inline(__always)
424-
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
426+
final private func _synchronized_unsubscribe(_ disposeKey: DisposeKey) -> Bool {
425427
// if already unsubscribed, just return
426428
if self._observers.removeKey(disposeKey) == nil {
427-
return
429+
return false
428430
}
429431

430432
if _observers.count == 0 {
431433
_synchronized_dispose()
434+
return true
432435
}
436+
437+
return false
433438
}
434439

435440
#if TRACE_RESOURCES
@@ -463,12 +468,12 @@ final fileprivate class ShareWhileConnected<Element>
463468

464469
let disposable = connection._synchronized_subscribe(observer)
465470

471+
_lock.unlock()
472+
466473
if count == 0 {
467474
connection.connect()
468475
}
469476

470-
_lock.unlock()
471-
472477
return disposable
473478
}
474479

@@ -489,101 +494,3 @@ final fileprivate class ShareWhileConnected<Element>
489494
return connection
490495
}
491496
}
492-
493-
// optimized version of share replay for most common case
494-
final fileprivate class ShareReplay1<Element>
495-
: Observable<Element>
496-
, ObserverType
497-
, SynchronizedUnsubscribeType {
498-
499-
typealias Observers = AnyObserver<Element>.s
500-
typealias DisposeKey = Observers.KeyType
501-
502-
private let _source: Observable<Element>
503-
504-
private let _lock = RecursiveLock()
505-
506-
private var _connection: SingleAssignmentDisposable?
507-
private var _element: Element?
508-
private var _stopped = false
509-
private var _stopEvent = nil as Event<Element>?
510-
private var _observers = Observers()
511-
512-
init(source: Observable<Element>) {
513-
self._source = source
514-
}
515-
516-
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
517-
_lock.lock()
518-
let result = _synchronized_subscribe(observer)
519-
_lock.unlock()
520-
return result
521-
}
522-
523-
func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
524-
if let element = self._element {
525-
observer.on(.next(element))
526-
}
527-
528-
if let stopEvent = self._stopEvent {
529-
observer.on(stopEvent)
530-
return Disposables.create()
531-
}
532-
533-
let initialCount = self._observers.count
534-
535-
let disposeKey = self._observers.insert(observer.on)
536-
537-
if initialCount == 0 {
538-
let connection = SingleAssignmentDisposable()
539-
_connection = connection
540-
541-
connection.setDisposable(self._source.subscribe(self))
542-
}
543-
544-
return SubscriptionDisposable(owner: self, key: disposeKey)
545-
}
546-
547-
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
548-
_lock.lock()
549-
_synchronized_unsubscribe(disposeKey)
550-
_lock.unlock()
551-
}
552-
553-
func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
554-
// if already unsubscribed, just return
555-
if self._observers.removeKey(disposeKey) == nil {
556-
return
557-
}
558-
559-
if _observers.count == 0 {
560-
_connection?.dispose()
561-
_connection = nil
562-
}
563-
}
564-
565-
func on(_ event: Event<E>) {
566-
dispatch(_synchronized_on(event), event)
567-
}
568-
569-
func _synchronized_on(_ event: Event<E>) -> Observers {
570-
_lock.lock(); defer { _lock.unlock() }
571-
if _stopped {
572-
return Observers()
573-
}
574-
575-
switch event {
576-
case .next(let element):
577-
_element = element
578-
return _observers
579-
case .error, .completed:
580-
_stopEvent = event
581-
_stopped = true
582-
let observers = _observers
583-
_observers = Observers()
584-
_connection?.dispose()
585-
_connection = nil
586-
return observers
587-
}
588-
}
589-
}

Sources/AllTestz/main.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ final class AnomaliesTest_ : AnomaliesTest, RxTestCase {
5555

5656
static var allTests: [(String, (AnomaliesTest_) -> () -> ())] { return [
5757
("test936", AnomaliesTest.test936),
58+
("test1323", AnomaliesTest.test1323),
5859
("testSeparationBetweenOnAndSubscriptionLocks", AnomaliesTest.testSeparationBetweenOnAndSubscriptionLocks),
5960
] }
6061
}

Tests/RxSwiftTests/Anomalies.swift

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,39 @@ extension AnomaliesTest {
6868
}
6969
}
7070

71+
func test1323() {
72+
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
73+
_ = share(Observable<Int>.create({ observer in
74+
observer.on(.next(1))
75+
Thread.sleep(forTimeInterval: 0.1)
76+
observer.on(.completed)
77+
return Disposables.create()
78+
})
79+
.flatMap { (int) -> Observable<Int> in
80+
return Observable.create { (observer) -> Disposable in
81+
DispatchQueue.global().async {
82+
observer.onNext(int)
83+
observer.onCompleted()
84+
}
85+
return Disposables.create()
86+
}
87+
})
88+
.subscribe { (e) in
89+
}
90+
}
91+
92+
for op in [
93+
{ $0.share(replay: 0, scope: .whileConnected) },
94+
{ $0.share(replay: 0, scope: .forever) },
95+
{ $0.share(replay: 1, scope: .whileConnected) },
96+
{ $0.share(replay: 1, scope: .forever) },
97+
{ $0.share(replay: 2, scope: .whileConnected) },
98+
{ $0.share(replay: 2, scope: .forever) },
99+
] as [(Observable<Int>) -> Observable<Int>] {
100+
performSharingOperatorsTest(share: op)
101+
}
102+
}
103+
71104
func testSeparationBetweenOnAndSubscriptionLocks() {
72105
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
73106
for i in 0 ..< 1 {
@@ -112,10 +145,12 @@ extension AnomaliesTest {
112145
}
113146

114147
for op in [
115-
{ $0.shareReplay(1) },
116-
{ $0.replay(1).refCount() },
117-
{ $0.publish().refCount() },
118-
{ $0.shareReplayLatestWhileConnected() }
148+
{ $0.share(replay: 0, scope: .whileConnected) },
149+
{ $0.share(replay: 0, scope: .forever) },
150+
{ $0.share(replay: 1, scope: .whileConnected) },
151+
{ $0.share(replay: 1, scope: .forever) },
152+
{ $0.share(replay: 2, scope: .whileConnected) },
153+
{ $0.share(replay: 2, scope: .forever) },
119154
] as [(Observable<Int>) -> Observable<Int>] {
120155
performSharingOperatorsTest(share: op)
121156
}

0 commit comments

Comments
 (0)