@@ -190,12 +190,7 @@ extension ObservableType {
190
190
*/
191
191
public func shareReplay( _ bufferSize: Int )
192
192
-> Observable < E > {
193
- if bufferSize == 1 {
194
- return ShareReplay1 ( source: self . asObservable ( ) )
195
- }
196
- else {
197
- return self . replay ( bufferSize) . refCount ( )
198
- }
193
+ return self . replay ( bufferSize) . refCount ( )
199
194
}
200
195
}
201
196
@@ -268,25 +263,30 @@ fileprivate final class ShareReplay1WhileConnectedConnection<Element>
268
263
_parent. _connection = nil
269
264
}
270
265
_observers = Observers ( )
271
- _subscription. dispose ( )
272
266
}
273
267
274
268
final func synchronizedUnsubscribe( _ disposeKey: DisposeKey ) {
275
269
_lock. lock ( )
276
- _synchronized_unsubscribe ( disposeKey)
270
+ let shouldDisconnect = _synchronized_unsubscribe ( disposeKey)
277
271
_lock. unlock ( )
272
+ if shouldDisconnect {
273
+ _subscription. dispose ( )
274
+ }
278
275
}
279
276
280
277
@inline ( __always)
281
- final private func _synchronized_unsubscribe( _ disposeKey: DisposeKey ) {
278
+ final private func _synchronized_unsubscribe( _ disposeKey: DisposeKey ) -> Bool {
282
279
// if already unsubscribed, just return
283
280
if self . _observers. removeKey ( disposeKey) == nil {
284
- return
281
+ return false
285
282
}
286
283
287
284
if _observers. count == 0 {
288
285
_synchronized_dispose ( )
286
+ return true
289
287
}
288
+
289
+ return false
290
290
}
291
291
292
292
#if TRACE_RESOURCES
@@ -319,13 +319,13 @@ final fileprivate class ShareReplay1WhileConnected<Element>
319
319
let count = connection. _observers. count
320
320
321
321
let disposable = connection. _synchronized_subscribe ( observer)
322
+
323
+ _lock. unlock ( )
322
324
323
325
if count == 0 {
324
326
connection. connect ( )
325
327
}
326
328
327
- _lock. unlock ( )
328
-
329
329
return disposable
330
330
}
331
331
@@ -411,25 +411,30 @@ fileprivate final class ShareWhileConnectedConnection<Element>
411
411
_parent. _connection = nil
412
412
}
413
413
_observers = Observers ( )
414
- _subscription. dispose ( )
415
414
}
416
415
417
416
final func synchronizedUnsubscribe( _ disposeKey: DisposeKey ) {
418
417
_lock. lock ( )
419
- _synchronized_unsubscribe ( disposeKey)
418
+ let shouldDisconnect = _synchronized_unsubscribe ( disposeKey)
420
419
_lock. unlock ( )
420
+ if shouldDisconnect {
421
+ _subscription. dispose ( )
422
+ }
421
423
}
422
424
423
425
@inline ( __always)
424
- final private func _synchronized_unsubscribe( _ disposeKey: DisposeKey ) {
426
+ final private func _synchronized_unsubscribe( _ disposeKey: DisposeKey ) -> Bool {
425
427
// if already unsubscribed, just return
426
428
if self . _observers. removeKey ( disposeKey) == nil {
427
- return
429
+ return false
428
430
}
429
431
430
432
if _observers. count == 0 {
431
433
_synchronized_dispose ( )
434
+ return true
432
435
}
436
+
437
+ return false
433
438
}
434
439
435
440
#if TRACE_RESOURCES
@@ -463,12 +468,12 @@ final fileprivate class ShareWhileConnected<Element>
463
468
464
469
let disposable = connection. _synchronized_subscribe ( observer)
465
470
471
+ _lock. unlock ( )
472
+
466
473
if count == 0 {
467
474
connection. connect ( )
468
475
}
469
476
470
- _lock. unlock ( )
471
-
472
477
return disposable
473
478
}
474
479
@@ -489,101 +494,3 @@ final fileprivate class ShareWhileConnected<Element>
489
494
return connection
490
495
}
491
496
}
492
-
493
- // optimized version of share replay for most common case
494
- final fileprivate class ShareReplay1 < Element>
495
- : Observable < Element >
496
- , ObserverType
497
- , SynchronizedUnsubscribeType {
498
-
499
- typealias Observers = AnyObserver < Element > . s
500
- typealias DisposeKey = Observers . KeyType
501
-
502
- private let _source : Observable < Element >
503
-
504
- private let _lock = RecursiveLock ( )
505
-
506
- private var _connection : SingleAssignmentDisposable ?
507
- private var _element : Element ?
508
- private var _stopped = false
509
- private var _stopEvent = nil as Event < Element > ?
510
- private var _observers = Observers ( )
511
-
512
- init ( source: Observable < Element > ) {
513
- self . _source = source
514
- }
515
-
516
- override func subscribe< O : ObserverType > ( _ observer: O ) -> Disposable where O. E == E {
517
- _lock. lock ( )
518
- let result = _synchronized_subscribe ( observer)
519
- _lock. unlock ( )
520
- return result
521
- }
522
-
523
- func _synchronized_subscribe< O : ObserverType > ( _ observer: O ) -> Disposable where O. E == E {
524
- if let element = self . _element {
525
- observer. on ( . next( element) )
526
- }
527
-
528
- if let stopEvent = self . _stopEvent {
529
- observer. on ( stopEvent)
530
- return Disposables . create ( )
531
- }
532
-
533
- let initialCount = self . _observers. count
534
-
535
- let disposeKey = self . _observers. insert ( observer. on)
536
-
537
- if initialCount == 0 {
538
- let connection = SingleAssignmentDisposable ( )
539
- _connection = connection
540
-
541
- connection. setDisposable ( self . _source. subscribe ( self ) )
542
- }
543
-
544
- return SubscriptionDisposable ( owner: self , key: disposeKey)
545
- }
546
-
547
- func synchronizedUnsubscribe( _ disposeKey: DisposeKey ) {
548
- _lock. lock ( )
549
- _synchronized_unsubscribe ( disposeKey)
550
- _lock. unlock ( )
551
- }
552
-
553
- func _synchronized_unsubscribe( _ disposeKey: DisposeKey ) {
554
- // if already unsubscribed, just return
555
- if self . _observers. removeKey ( disposeKey) == nil {
556
- return
557
- }
558
-
559
- if _observers. count == 0 {
560
- _connection? . dispose ( )
561
- _connection = nil
562
- }
563
- }
564
-
565
- func on( _ event: Event < E > ) {
566
- dispatch ( _synchronized_on ( event) , event)
567
- }
568
-
569
- func _synchronized_on( _ event: Event < E > ) -> Observers {
570
- _lock. lock ( ) ; defer { _lock. unlock ( ) }
571
- if _stopped {
572
- return Observers ( )
573
- }
574
-
575
- switch event {
576
- case . next( let element) :
577
- _element = element
578
- return _observers
579
- case . error, . completed:
580
- _stopEvent = event
581
- _stopped = true
582
- let observers = _observers
583
- _observers = Observers ( )
584
- _connection? . dispose ( )
585
- _connection = nil
586
- return observers
587
- }
588
- }
589
- }
0 commit comments