-
Notifications
You must be signed in to change notification settings - Fork 990
/
Copy pathoperation-poller.ts
109 lines (98 loc) · 3.29 KB
/
operation-poller.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import { Client } from "./apiv2";
import { FirebaseError } from "./error";
import { Queue } from "./throttler/queue";
export interface LongRunningOperation<T> {
// The identifier of the Operation.
readonly name: string;
// Set to `true` if the Operation is done.
readonly done: boolean;
// Additional metadata about the Operation.
readonly metadata: T | undefined;
}
export interface OperationPollerOptions {
pollerName?: string;
apiOrigin: string;
apiVersion: string;
operationResourceName: string;
backoff?: number;
maxBackoff?: number;
masterTimeout?: number;
onPoll?: (operation: OperationResult<any>) => any;
doneFn?: (op: any) => boolean;
}
const DEFAULT_INITIAL_BACKOFF_DELAY_MILLIS = 250;
const DEFAULT_MASTER_TIMEOUT_MILLIS = 30000;
export interface OperationResult<T> {
done?: boolean;
response?: T;
error?: {
name: string;
message: string;
code: number;
details?: any[];
};
metadata?: {
[key: string]: any;
};
}
export class OperationPoller<T> {
/**
* Returns a promise that resolves when the operation is completed with a successful response.
* Rejects the promise if the masterTimeout runs out before the operation is "done", or when it is
* "done" with an error response, or when the api request rejects with an unrecoverable error.
* @param options poller options.
*/
async poll(options: OperationPollerOptions): Promise<T> {
const queue: Queue<() => Promise<OperationResult<T>>, OperationResult<T>> = new Queue({
name: options.pollerName || "LRO Poller",
concurrency: 1,
retries: Number.MAX_SAFE_INTEGER,
maxBackoff: options.maxBackoff,
backoff: options.backoff || DEFAULT_INITIAL_BACKOFF_DELAY_MILLIS,
});
const masterTimeout = options.masterTimeout || DEFAULT_MASTER_TIMEOUT_MILLIS;
const { response, error } = await queue.run(this.getPollingTask(options), masterTimeout);
queue.close();
if (error) {
throw error instanceof FirebaseError
? error
: new FirebaseError(error.message, { status: error.code, original: error });
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return response!;
}
private getPollingTask(options: OperationPollerOptions): () => Promise<OperationResult<T>> {
const apiClient = new Client({
urlPrefix: options.apiOrigin,
apiVersion: options.apiVersion,
auth: true,
});
return async () => {
let res;
try {
res = await apiClient.get<OperationResult<T>>(options.operationResourceName);
} catch (err: any) {
// Responses with 500 or 503 status code are treated as retriable errors.
if (err.status === 500 || err.status === 503) {
throw err;
}
return { error: err };
}
if (options.onPoll) {
options.onPoll(res.body);
}
if (options.doneFn) {
const done = options.doneFn(res.body);
if (!done) {
throw new Error("Polling incomplete, should trigger retry with backoff");
}
} else if (!res.body.done) {
throw new Error("Polling incomplete, should trigger retry with backoff");
}
return res.body;
};
}
}
export function pollOperation<T>(options: OperationPollerOptions): Promise<T> {
return new OperationPoller<T>().poll(options);
}