Skip to content

Commit fd947cb

Browse files
committed
Changes publish, replay, replay all to clear state in case of sequence termination.
1 parent 17c2492 commit fd947cb

File tree

3 files changed

+276
-330
lines changed

3 files changed

+276
-330
lines changed

RxSwift/Observables/Multicast.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ extension ObservableType {
6060
- returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
6161
*/
6262
public func publish() -> ConnectableObservable<E> {
63-
return self.multicast(PublishSubject())
63+
return self.multicast { PublishSubject() }
6464
}
6565
}
6666

@@ -78,7 +78,7 @@ extension ObservableType {
7878
*/
7979
public func replay(_ bufferSize: Int)
8080
-> ConnectableObservable<E> {
81-
return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
81+
return self.multicast { ReplaySubject.create(bufferSize: bufferSize) }
8282
}
8383

8484
/**
@@ -92,7 +92,7 @@ extension ObservableType {
9292
*/
9393
public func replayAll()
9494
-> ConnectableObservable<E> {
95-
return self.multicast(ReplaySubject.createUnbounded())
95+
return self.multicast { ReplaySubject.createUnbounded() }
9696
}
9797
}
9898

RxTest/ColdObservable.swift

+6-1
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,19 @@ final class ColdObservable<Element>
2626

2727
let i = self.subscriptions.count - 1
2828

29+
var disposed = false
30+
2931
for recordedEvent in recordedEvents {
3032
_ = testScheduler.scheduleRelativeVirtual((), dueTime: recordedEvent.time, action: { _ in
31-
observer.on(recordedEvent.value)
33+
if !disposed {
34+
observer.on(recordedEvent.value)
35+
}
3236
return Disposables.create()
3337
})
3438
}
3539

3640
return Disposables.create {
41+
disposed = true
3742
let existing = self.subscriptions[i]
3843
self.subscriptions[i] = Subscription(existing.subscribe, self.testScheduler.clock)
3944
}

0 commit comments

Comments
 (0)