-
Notifications
You must be signed in to change notification settings - Fork 26.2k
/
Copy pathpending_tasks.ts
160 lines (145 loc) · 5.11 KB
/
pending_tasks.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.dev/license
*/
import {BehaviorSubject, Observable} from 'rxjs';
import {inject} from './di/injector_compatibility';
import {ɵɵdefineInjectable} from './di/interface/defs';
import {OnDestroy} from './interface/lifecycle_hooks';
import {
ChangeDetectionScheduler,
NotificationSource,
} from './change_detection/scheduling/zoneless_scheduling';
import {INTERNAL_APPLICATION_ERROR_HANDLER} from './error_handler';
/**
* Internal implementation of the pending tasks service.
*/
export class PendingTasksInternal implements OnDestroy {
private taskId = 0;
private pendingTasks = new Set<number>();
private destroyed = false;
private pendingTask = new BehaviorSubject<boolean>(false);
get hasPendingTasks(): boolean {
// Accessing the value of a closed `BehaviorSubject` throws an error.
return this.destroyed ? false : this.pendingTask.value;
}
/**
* In case the service is about to be destroyed, return a self-completing observable.
* Otherwise, return the observable that emits the current state of pending tasks.
*/
get hasPendingTasksObservable(): Observable<boolean> {
if (this.destroyed) {
// Manually creating the observable pulls less symbols from RxJS than `of(false)`.
return new Observable<boolean>((subscriber) => {
subscriber.next(false);
subscriber.complete();
});
}
return this.pendingTask;
}
add(): number {
// Emitting a value to a closed subject throws an error.
if (!this.hasPendingTasks && !this.destroyed) {
this.pendingTask.next(true);
}
const taskId = this.taskId++;
this.pendingTasks.add(taskId);
return taskId;
}
has(taskId: number): boolean {
return this.pendingTasks.has(taskId);
}
remove(taskId: number): void {
this.pendingTasks.delete(taskId);
if (this.pendingTasks.size === 0 && this.hasPendingTasks) {
this.pendingTask.next(false);
}
}
ngOnDestroy(): void {
this.pendingTasks.clear();
if (this.hasPendingTasks) {
this.pendingTask.next(false);
}
// We call `unsubscribe()` to release observers, as users may forget to
// unsubscribe manually when subscribing to `isStable`. We do not call
// `complete()` because it is unsafe; if someone subscribes using the `first`
// operator and the observable completes before emitting a value,
// RxJS will throw an error.
this.destroyed = true;
this.pendingTask.unsubscribe();
}
/** @nocollapse */
static ɵprov = /** @pureOrBreakMyCode */ /* @__PURE__ */ ɵɵdefineInjectable({
token: PendingTasksInternal,
providedIn: 'root',
factory: () => new PendingTasksInternal(),
});
}
/**
* Service that keeps track of pending tasks contributing to the stableness of Angular
* application. While several existing Angular services (ex.: `HttpClient`) will internally manage
* tasks influencing stability, this API gives control over stability to library and application
* developers for specific cases not covered by Angular internals.
*
* The concept of stability comes into play in several important scenarios:
* - SSR process needs to wait for the application stability before serializing and sending rendered
* HTML;
* - tests might want to delay assertions until the application becomes stable;
*
* @usageNotes
* ```ts
* const pendingTasks = inject(PendingTasks);
* const taskCleanup = pendingTasks.add();
* // do work that should block application's stability and then:
* taskCleanup();
* ```
*
* @publicApi
*/
export class PendingTasks {
private readonly internalPendingTasks = inject(PendingTasksInternal);
private readonly scheduler = inject(ChangeDetectionScheduler);
private readonly errorHandler = inject(INTERNAL_APPLICATION_ERROR_HANDLER);
/**
* Adds a new task that should block application's stability.
* @returns A cleanup function that removes a task when called.
*/
add(): () => void {
const taskId = this.internalPendingTasks.add();
return () => {
if (!this.internalPendingTasks.has(taskId)) {
// This pending task has already been cleared.
return;
}
// Notifying the scheduler will hold application stability open until the next tick.
this.scheduler.notify(NotificationSource.PendingTaskRemoved);
this.internalPendingTasks.remove(taskId);
};
}
/**
* Runs an asynchronous function and blocks the application's stability until the function completes.
*
* ```ts
* pendingTasks.run(async () => {
* const userData = await fetch('/api/user');
* this.userData.set(userData);
* });
* ```
*
* @param fn The asynchronous function to execute
* @developerPreview
*/
run(fn: () => Promise<unknown>): void {
const removeTask = this.add();
fn().catch(this.errorHandler).finally(removeTask);
}
/** @nocollapse */
static ɵprov = /** @pureOrBreakMyCode */ /* @__PURE__ */ ɵɵdefineInjectable({
token: PendingTasks,
providedIn: 'root',
factory: () => new PendingTasks(),
});
}