Skip to content

Commit e81606c

Browse files
martinsikalxhub
authored andcommitted
fix(core): fix proper propagation of subscriptions in EventEmitter (#22016)
Closes #21999 PR Close #22016
1 parent f791e9f commit e81606c

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

packages/core/src/event_emitter.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
import {Subject} from 'rxjs/Subject';
10+
import {Subscription} from 'rxjs/Subscription';
1011

1112
/**
1213
* Use by directives and components to emit custom Events.
@@ -111,6 +112,12 @@ export class EventEmitter<T> extends Subject<T> {
111112
}
112113
}
113114

114-
return super.subscribe(schedulerFn, errorFn, completeFn);
115+
const sink = super.subscribe(schedulerFn, errorFn, completeFn);
116+
117+
if (generatorOrNext instanceof Subscription) {
118+
generatorOrNext.add(sink);
119+
}
120+
121+
return sink;
115122
}
116123
}

packages/core/test/event_emitter_spec.ts

+55
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*/
88

99
import {AsyncTestCompleter, beforeEach, describe, expect, inject, it} from '@angular/core/testing/src/testing_internal';
10+
import {filter} from 'rxjs/operators';
11+
1012
import {EventEmitter} from '../src/event_emitter';
1113

1214
{
@@ -125,6 +127,59 @@ import {EventEmitter} from '../src/event_emitter';
125127
expect(e.observers.length > 0).toBe(true);
126128
});
127129

130+
it('remove a subscriber subscribed directly to EventEmitter', () => {
131+
const sub = emitter.subscribe();
132+
expect(emitter.observers.length).toBe(1);
133+
sub.unsubscribe();
134+
expect(emitter.observers.length).toBe(0);
135+
});
136+
137+
it('remove a subscriber subscribed after applying operators with pipe()', () => {
138+
const sub = emitter.pipe(filter(() => true)).subscribe();
139+
expect(emitter.observers.length).toBe(1);
140+
sub.unsubscribe();
141+
expect(emitter.observers.length).toBe(0);
142+
});
143+
144+
it('unsubscribing a subscriber invokes the dispose method', () => {
145+
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
146+
const sub = emitter.subscribe();
147+
sub.add(() => async.done());
148+
sub.unsubscribe();
149+
});
150+
});
151+
152+
it('unsubscribing a subscriber after applying operators with pipe() invokes the dispose method',
153+
() => {
154+
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
155+
const sub = emitter.pipe(filter(() => true)).subscribe();
156+
sub.add(() => async.done());
157+
sub.unsubscribe();
158+
});
159+
});
160+
161+
it('error thrown inside an Rx chain propagates to the error handler and disposes the chain',
162+
() => {
163+
let errorPropagated = false;
164+
emitter.pipe(filter(() => { throw new Error(); }), )
165+
.subscribe(() => {}, err => errorPropagated = true, );
166+
167+
emitter.next(1);
168+
169+
expect(errorPropagated).toBe(true);
170+
expect(emitter.observers.length).toBe(0);
171+
});
172+
173+
it('error sent by EventEmitter should dispose the Rx chain and remove subscribers', () => {
174+
let errorPropagated = false;
175+
emitter.pipe(filter(() => true)).subscribe(() => {}, err => errorPropagated = true, );
176+
177+
emitter.error(1);
178+
179+
expect(errorPropagated).toBe(true);
180+
expect(emitter.observers.length).toBe(0);
181+
});
182+
128183
// TODO: vsavkin: add tests cases
129184
// should call dispose on the subscription if generator returns {done:true}
130185
// should call dispose on the subscription on throw

0 commit comments

Comments
 (0)