This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathwindowed.js
78 lines (67 loc) · 2.46 KB
/
windowed.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Observable that drains a source in fixed-size windows: it requests
 * `windowSize` items from the source when subscribed to, and requests the next
 * window each time `windowSize` items have been forwarded to the observer.
 */
var WindowedObservable = (function (__super__) {
  inherits(WindowedObservable, __super__);

  /**
   * @param {Object} source Source exposing `subscribe(observer)` and `request(count)`.
   * @param {Number} windowSize Number of items requested per window.
   */
  function WindowedObservable(source, windowSize) {
    __super__.call(this);
    this.source = source;
    this.windowSize = windowSize;
  }

  // Scheduler action: ask the source for the first window of items.
  function scheduleMethod(s, self) {
    return self.source.request(self.windowSize);
  }

  /**
   * Subscribes the observer to the source and schedules the request for the
   * first window.
   * @param {Object} o Observer that receives the forwarded notifications.
   * @returns {BinaryDisposable} Disposable holding the source subscription and
   * the scheduled initial request.
   */
  WindowedObservable.prototype._subscribe = function (o) {
    // The observer must cancel *its own* subscription when it terminates. The
    // subscription does not exist until subscribe() returns, so the observer is
    // created without one and receives it afterwards. (Passing
    // `this.subscription` here would hand over `undefined` on the first
    // subscribe and the previous subscriber's subscription on later ones.)
    var observer = new WindowedObserver(o, this, null);
    var subscription = this.source.subscribe(observer);

    if (observer.isDisposed) {
      // The source terminated synchronously inside subscribe(); the observer
      // could not cancel anything at that point, so release the subscription now.
      if (subscription) {
        subscription.dispose();
      }
    } else {
      observer.cancel = subscription;
    }

    // Still published on the instance for any code that reads it.
    this.subscription = subscription;

    return new BinaryDisposable(
      subscription,
      defaultScheduler.schedule(this, scheduleMethod)
    );
  };

  /**
   * Observer that forwards notifications to the wrapped observer and requests
   * a new window from the source after every `windowSize` values.
   */
  var WindowedObserver = (function (__sub__) {
    inherits(WindowedObserver, __sub__);

    /**
     * @param {Object} observer Downstream observer to forward notifications to.
     * @param {WindowedObservable} observable Owner; supplies `source` and `windowSize`.
     * @param {Object} cancel Disposable for the source subscription; may be
     * null and assigned later.
     */
    function WindowedObserver(observer, observable, cancel) {
      this.observer = observer;
      this.observable = observable;
      this.cancel = cancel;
      this.received = 0; // values seen in the current window
      this.scheduleDisposable = null; // pending "request next window" action
      this.isDisposed = false; // set once dispose() has run
      __sub__.call(this);
    }

    WindowedObserver.prototype.completed = function () {
      this.observer.onCompleted();
      this.dispose();
    };

    WindowedObserver.prototype.error = function (error) {
      this.observer.onError(error);
      this.dispose();
    };

    // Scheduler action: ask the source for the next window of items.
    function innerScheduleMethod(s, self) {
      return self.observable.source.request(self.observable.windowSize);
    }

    WindowedObserver.prototype.next = function (value) {
      this.observer.onNext(value);
      // Count modulo the window size; wrapping to 0 means the window is full.
      this.received = ++this.received % this.observable.windowSize;
      if (this.received === 0) {
        this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
      }
    };

    /**
     * Releases the downstream observer, the source subscription and any
     * pending window request, then delegates to the base class `dispose`.
     */
    WindowedObserver.prototype.dispose = function () {
      this.isDisposed = true;
      this.observer = null;
      if (this.cancel) {
        this.cancel.dispose();
        this.cancel = null;
      }
      if (this.scheduleDisposable) {
        this.scheduleDisposable.dispose();
        this.scheduleDisposable = null;
      }
      __sub__.prototype.dispose.call(this);
    };

    return WindowedObserver;
  }(AbstractObserver));

  return WindowedObservable;
}(Observable));
/**
 * Wraps this controlled observable in a WindowedObservable that requests
 * items from it in windows of the given size.
 * @param {Number} windowSize The number of items to request per window.
 * @returns {Observable} A new WindowedObservable built from this observable and `windowSize`.
 */
ControlledObservable.prototype.windowed = function (windowSize) {
  var controlled = this;
  var windowedObservable = new WindowedObservable(controlled, windowSize);
  return windowedObservable;
};