This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathobserver.js
75 lines (68 loc) · 3.08 KB
/
observer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
 * Base constructor for observers: the consumer side of push-style iteration
 * over an observable sequence.
 *
 * The constructor takes no arguments and performs no initialization; its
 * behaviour comes from the members attached to `Observer` and
 * `Observer.prototype`. The same function is published as `Rx.Observer` and
 * kept in the local `Observer` variable.
 */
var Observer;
Observer = Rx.Observer = function () {};
/**
 * Builds a notification callback bound to this observer.
 * @returns {Function} A function that takes a notification and returns the
 * result of calling that notification's `accept` method with this observer.
 */
Observer.prototype.toNotifier = function () {
  var target = this;

  return function (notification) {
    return notification.accept(target);
  };
};
/**
 * Hides the identity of this observer by wrapping it in a new
 * AnonymousObserver.
 *
 * Each handler of the wrapper forwards to the matching method on this
 * observer (`onNext`, `onError`, `onCompleted`), looked up at call time.
 * Whatever those methods return is discarded.
 * @returns An observer that forwards to, but is not identical to, this observer.
 */
Observer.prototype.asObserver = function () {
  var source = this;

  return new AnonymousObserver(
    function (value) {
      source.onNext(value);
    },
    function (error) {
      source.onError(error);
    },
    function () {
      source.onCompleted();
    }
  );
};
/**
 * Wraps this observer in a CheckedObserver.
 *
 * The wrapper is meant to check access to the observer for grammar
 * violations: multiple OnError or OnCompleted calls, and reentrancy in any of
 * the observer methods. If a violation is detected, an Error is thrown from
 * the offending observer method call; otherwise the call is forwarded to this
 * observer. (The checks themselves are implemented by CheckedObserver, which
 * is defined outside this block.)
 * @returns An observer that validates callback invocations against the observer grammar before forwarding them to this observer.
 */
Observer.prototype.checked = function () {
  var inner = this;
  return new CheckedObserver(inner);
};
/**
 * Builds an observer out of up to three plain callbacks.
 *
 * Any callback that is omitted (or otherwise falsy) is replaced by a default:
 * `noop` for onNext and onCompleted, `defaultError` for onError.
 * @param {Function} [onNext] Observer's OnNext action implementation.
 * @param {Function} [onError] Observer's OnError action implementation.
 * @param {Function} [onCompleted] Observer's OnCompleted action implementation.
 * @returns {Observer} An AnonymousObserver built from the resolved callbacks.
 */
var observerCreate = Observer.create = function (onNext, onError, onCompleted) {
  return new AnonymousObserver(
    onNext || noop,
    onError || defaultError,
    onCompleted || noop
  );
};
/**
 * Creates an observer that reports every message to a single notification
 * handler.
 *
 * The handler is first passed through `bindCallback(handler, thisArg, 1)`.
 * Each observer message is then converted to a notification
 * (`notificationCreateOnNext`, `notificationCreateOnError` or
 * `notificationCreateOnCompleted`) and handed to the resulting callback, whose
 * return value is returned from the observer handler.
 * @param {Function} handler Action that handles a notification.
 * @param {Any} [thisArg] Forwarded to `bindCallback` along with the handler;
 * presumably the `this` value the handler runs with (confirm against bindCallback).
 * @returns The observer object that invokes the specified handler using a notification corresponding to each message it receives.
 */
Observer.fromNotifier = function (handler, thisArg) {
  var notify = bindCallback(handler, thisArg, 1);

  return new AnonymousObserver(
    function (value) {
      return notify(notificationCreateOnNext(value));
    },
    function (error) {
      return notify(notificationCreateOnError(error));
    },
    function () {
      return notify(notificationCreateOnCompleted());
    }
  );
};
/**
 * Wraps this observer so that its messages are scheduled on the given
 * scheduler. The wrapper is an ObserveOnObserver constructed with the
 * scheduler and this observer, in that order.
 * @param {Scheduler} scheduler Scheduler to schedule observer messages on.
 * @returns {Observer} Observer whose messages are scheduled on the given scheduler.
 */
Observer.prototype.notifyOn = function (scheduler) {
  var downstream = this;
  return new ObserveOnObserver(scheduler, downstream);
};
/**
 * Wraps this observer's handlers in an AnonymousSafeObserver tied to the given
 * disposable.
 *
 * When this observer carries `_onNext`, `_onError` and `_onCompleted`
 * functions, those are handed to the wrapper unchanged. This method lives on
 * the base prototype, so it can also be reached from observers without those
 * fields; for each one that is not a function, a closure forwarding to the
 * public `onNext` / `onError` / `onCompleted` method is used instead, so the
 * wrapper never receives an undefined handler.
 * @param {Disposable} disposable Passed through as the fourth argument of AnonymousSafeObserver.
 * @returns {Observer} An AnonymousSafeObserver built from this observer's handlers and the disposable.
 */
Observer.prototype.makeSafe = function (disposable) {
  var self = this;

  // Prefer the raw handlers (same references as before); fall back to the
  // public methods, resolved at call time in the same way asObserver does.
  var onNext = typeof self._onNext === 'function' ?
    self._onNext :
    function (x) { self.onNext(x); };
  var onError = typeof self._onError === 'function' ?
    self._onError :
    function (err) { self.onError(err); };
  var onCompleted = typeof self._onCompleted === 'function' ?
    self._onCompleted :
    function () { self.onCompleted(); };

  return new AnonymousSafeObserver(onNext, onError, onCompleted, disposable);
};