[PECO-1259] Implement retry behavior for CloudFetch #211
Changes from all commits: ceb0f0a, 95cafb4, 78d3bb0, 8fc0a19
HttpRetryPolicy.ts (new file)
@@ -0,0 +1,79 @@
import { Response } from 'node-fetch';
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
import RetryError, { RetryErrorCode } from '../../errors/RetryError';

function getRetryDelay(attempt: number, config: ClientConfig): number {
  const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
  return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
}

function delay(milliseconds: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => resolve(), milliseconds);
  });
}

export default class HttpRetryPolicy implements IRetryPolicy<Response> {
  private context: IClientContext;

  private readonly startTime: number; // in milliseconds

  private attempt: number;

  constructor(context: IClientContext) {
    this.context = context;
    this.startTime = Date.now();
    this.attempt = 0;
  }

  public async shouldRetry(response: Response): Promise<ShouldRetryResult> {
    if (!response.ok) {
      switch (response.status) {
        // On these status codes it's safe to retry the request. However,
        // both status codes mean that the server is overwhelmed or even down.
        // Therefore, we need to add some delay between attempts so the
        // server can recover and will more likely handle the next request.
        case 429: // Too Many Requests
        case 503: // Service Unavailable
          this.attempt += 1;

          const clientConfig = this.context.getConfig();

          // The delay interval depends on the current attempt - the more attempts we make,
          // the longer the interval will be.
          // TODO: Respect `Retry-After` header (PECO-729)
          const retryDelay = getRetryDelay(this.attempt, clientConfig);

          const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
Review comment: Nit: I think you can bump this logic up a few lines. If we already exceeded the maximum number of attempts, there's no reason to get the retry delay.
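// A rough sketch of what the suggested reordering could look like
// (illustration only; names as in the surrounding code):
//
//   this.attempt += 1;
//   const clientConfig = this.context.getConfig();
//   if (this.attempt >= clientConfig.retryMaxAttempts) {
//     throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
//   }
//   const retryDelay = getRetryDelay(this.attempt, clientConfig);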
          if (attemptsExceeded) {
            throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
          }

          const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout;
          if (timeoutExceeded) {
            throw new RetryError(RetryErrorCode.TimeoutExceeded, response);
          }

          return { shouldRetry: true, retryAfter: retryDelay };

        // TODO: Here we should handle other error types (see PECO-730)
Review comment: For future reference, here's the retry policy used in the python connector: https://github.com/databricks/databricks-sql-python/blob/3f6834c9797503132cb0d1b9b770acc36cd22d42/src/databricks/sql/auth/retry.py#L308
        // no default
      }
    }

    return { shouldRetry: false };
  }

  public async invokeWithRetry(operation: RetryableOperation<Response>): Promise<Response> {
    for (;;) {
      const response = await operation(); // eslint-disable-line no-await-in-loop
      const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop
      if (!status.shouldRetry) {
        return response;
      }
      await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
    }
  }
}
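For a feel of the resulting backoff, here is a self-contained sketch of the delay schedule computed by getRetryDelay; the config values are assumptions for illustration, the real values come from ClientConfig:

// Standalone sketch: same formula as getRetryDelay above, with assumed config values.
const demoConfig = { retryDelayMin: 1000, retryDelayMax: 60000 }; // milliseconds (assumed)

function demoRetryDelay(attempt: number): number {
  const scale = Math.max(1, 1.5 ** (attempt - 1));
  return Math.min(demoConfig.retryDelayMin * scale, demoConfig.retryDelayMax);
}

for (let attempt = 1; attempt <= 5; attempt += 1) {
  console.log(`attempt ${attempt}: ${demoRetryDelay(attempt)} ms`);
}
// attempt 1: 1000 ms, attempt 2: 1500 ms, attempt 3: 2250 ms,
// attempt 4: 3375 ms, attempt 5: 5062.5 ms - growing by 1.5x until capped at retryDelayMax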
IConnectionProvider.ts
@@ -1,10 +1,13 @@
 import http from 'http';
-import { HeadersInit } from 'node-fetch';
+import { HeadersInit, Response } from 'node-fetch';
+import IRetryPolicy from './IRetryPolicy';

 export default interface IConnectionProvider {
   getThriftConnection(): Promise<any>;

   getAgent(): Promise<http.Agent>;

   setHeaders(headers: HeadersInit): void;
+
+  getRetryPolicy(): Promise<IRetryPolicy<Response>>;
 }
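For context on how this ties into CloudFetch, here is a rough sketch of how download code might use the new method; downloadChunk, the import path, and the URL handling are hypothetical, only getRetryPolicy and invokeWithRetry come from this PR:

import fetch, { Response } from 'node-fetch';
import IConnectionProvider from './IConnectionProvider'; // path assumed for illustration

// Hypothetical CloudFetch-style download: the retry policy wraps the raw
// fetch call, so 429/503 responses are retried with backoff before giving up.
async function downloadChunk(provider: IConnectionProvider, url: string): Promise<Response> {
  const retryPolicy = await provider.getRetryPolicy();
  return retryPolicy.invokeWithRetry(() => fetch(url));
}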
IRetryPolicy.ts (new file)
@@ -0,0 +1,16 @@
export type ShouldRetryResult =
  | {
      shouldRetry: false;
    }
  | {
      shouldRetry: true;
      retryAfter: number; // in milliseconds
    };

export type RetryableOperation<R> = () => Promise<R>;

export default interface IRetryPolicy<R> {
  shouldRetry(response: R): Promise<ShouldRetryResult>;

  invokeWithRetry(operation: RetryableOperation<R>): Promise<R>;
}
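To illustrate the contract, a minimal hypothetical implementation that never retries could look like this; it is not part of the PR, just a sketch of how the two methods compose:

import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from './IRetryPolicy'; // path assumed

// Hypothetical no-op policy: every response is accepted as-is,
// so invokeWithRetry performs exactly one attempt.
export class NoRetryPolicy<R> implements IRetryPolicy<R> {
  public async shouldRetry(_response: R): Promise<ShouldRetryResult> {
    return { shouldRetry: false };
  }

  public async invokeWithRetry(operation: RetryableOperation<R>): Promise<R> {
    return operation();
  }
}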
RetryError.ts (new file)
@@ -0,0 +1,21 @@
export enum RetryErrorCode {
  AttemptsExceeded = 'ATTEMPTS_EXCEEDED',
  TimeoutExceeded = 'TIMEOUT_EXCEEDED',
}

const errorMessages: Record<RetryErrorCode, string> = {
  [RetryErrorCode.AttemptsExceeded]: 'Max retry count exceeded',
  [RetryErrorCode.TimeoutExceeded]: 'Retry timeout exceeded',
};

export default class RetryError extends Error {
  public readonly errorCode: RetryErrorCode;

  public readonly payload: unknown;

  constructor(errorCode: RetryErrorCode, payload?: unknown) {
    super(errorMessages[errorCode]);
    this.errorCode = errorCode;
    this.payload = payload;
  }
}
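Callers can tell the two failure modes apart via errorCode; a rough sketch follows, where the wrapper function is hypothetical:

import RetryError, { RetryErrorCode } from './RetryError'; // path assumed for illustration

// Hypothetical wrapper showing how a caller might react to retry exhaustion.
async function runWithDiagnostics<R>(operation: () => Promise<R>): Promise<R> {
  try {
    return await operation();
  } catch (error) {
    if (error instanceof RetryError) {
      // `payload` carries the last HTTP response that kept the retry loop going
      if (error.errorCode === RetryErrorCode.AttemptsExceeded) {
        console.error('Giving up: retryMaxAttempts reached', error.payload);
      } else if (error.errorCode === RetryErrorCode.TimeoutExceeded) {
        console.error('Giving up: retriesTimeout exceeded', error.payload);
      }
    }
    throw error;
  }
}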
Review comment: Yep, this is important. When the server responds with 429 or 503, it includes a Retry-After header that we should monitor.
Reply: Yes, that's why we have PECO-729. This PR (and this particular code) is almost untouched; it was just moved to a different place so it can be used for CloudFetch. After we merge this, I'm going to make it match PySQL's behavior, as you mentioned in your other comment.
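That Retry-After handling is tracked in PECO-729 rather than this PR, but as a rough sketch, honoring the header with node-fetch could look like this (only the delta-seconds form is handled; the HTTP-date form is ignored):

import { Response } from 'node-fetch';

// Returns a delay in milliseconds if the server sent a usable Retry-After
// header in its delta-seconds form, otherwise undefined.
function getRetryAfterMs(response: Response): number | undefined {
  const header = response.headers.get('Retry-After');
  if (header === null) {
    return undefined;
  }
  const seconds = Number(header);
  return Number.isFinite(seconds) && seconds >= 0 ? seconds * 1000 : undefined;
}

// Inside shouldRetry, the server-provided value could then take precedence
// over the computed backoff, for example:
//   const retryDelay = getRetryAfterMs(response) ?? getRetryDelay(this.attempt, clientConfig);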