This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathscheduledobserver.js
64 lines (54 loc) · 1.93 KB
/
scheduledobserver.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Observer that buffers incoming notifications in a FIFO queue and replays
 * them to a wrapped observer from work scheduled on the supplied scheduler.
 *
 * Queuing a notification does not start delivery by itself: nothing in this
 * block calls ensureActive() automatically, so callers are expected to invoke
 * it after pushing notifications.
 */
var ScheduledObserver = Rx.internals.ScheduledObserver = (function (__super__) {
  /**
   * @param {Object} scheduler Must expose scheduleRecursive(state, action).
   * @param {Object} observer Target that receives onNext/onError/onCompleted.
   */
  function ScheduledObserver(scheduler, observer) {
    __super__.call(this);
    this.scheduler = scheduler;
    this.observer = observer;
    // True while a drain loop owns the queue (set in ensureActive, cleared
    // by drainStep once the queue is found empty).
    this.isAcquired = false;
    // Set once a queued notification has thrown; blocks any further drains.
    this.hasFaulted = false;
    // Pending notifications, stored as zero-argument thunks.
    this.queue = [];
    // Holds the handle of the currently scheduled drain.
    this.disposable = new SerialDisposable();
  }

  // The constructor above is a hoisted declaration, so wiring up the
  // prototype chain here is equivalent to doing it before the declaration.
  inherits(ScheduledObserver, __super__);

  // NOTE(review): next/error/completed are presumably the hooks the base
  // observer calls from onNext/onError/onCompleted -- confirm against
  // AbstractObserver, which is not visible here.

  /** Queues delivery of a value to the wrapped observer. */
  ScheduledObserver.prototype.next = function (x) {
    var target = this.observer;
    this.queue.push(function () {
      target.onNext(x);
    });
  };

  /** Queues delivery of an error to the wrapped observer. */
  ScheduledObserver.prototype.error = function (e) {
    var target = this.observer;
    this.queue.push(function () {
      target.onError(e);
    });
  };

  /** Queues delivery of the completion signal to the wrapped observer. */
  ScheduledObserver.prototype.completed = function () {
    var target = this.observer;
    this.queue.push(function () {
      target.onCompleted();
    });
  };

  /**
   * Scheduler action: runs at most one queued notification per invocation.
   *
   * @param {ScheduledObserver} state The observer whose queue is drained.
   * @param {Function} recurse Callback supplied by the scheduler; invoked
   *   with `state` to request another step (presumably re-schedules this
   *   action -- confirm against the scheduler implementation).
   */
  function drainStep(state, recurse) {
    var hasWork = state.queue.length > 0;
    if (!hasWork) {
      // Nothing left: release ownership so a later ensureActive() can
      // start a fresh drain.
      state.isAcquired = false;
      return;
    }

    var job = state.queue.shift();
    var outcome = tryCatch(job)();
    if (outcome === errorObj) {
      // The notification failed: drop everything still pending, mark the
      // observer as faulted and hand the captured error to `thrower`.
      state.queue = [];
      state.hasFaulted = true;
      return thrower(outcome.e);
    }

    recurse(state);
  }

  /**
   * Starts a drain on the scheduler unless the observer has faulted, the
   * queue is empty, or a drain is already in progress.
   */
  ScheduledObserver.prototype.ensureActive = function () {
    if (this.hasFaulted || !(this.queue.length > 0)) {
      return;
    }

    // The flag is raised before scheduling so that work which runs
    // immediately already sees the queue as owned.
    var alreadyDraining = this.isAcquired;
    this.isAcquired = true;

    if (!alreadyDraining) {
      this.disposable.setDisposable(this.scheduler.scheduleRecursive(this, drainStep));
    }
  };

  /** Runs the base class dispose logic, then disposes the scheduled drain handle. */
  ScheduledObserver.prototype.dispose = function () {
    __super__.prototype.dispose.call(this);
    this.disposable.dispose();
  };

  return ScheduledObserver;
}(AbstractObserver));