[PECO-1259] Implement retry behavior for CloudFetch #211

Merged · 4 commits · Jan 17, 2024
9 changes: 8 additions & 1 deletion lib/connection/connections/HttpConnection.ts
@@ -1,14 +1,16 @@
import thrift from 'thrift';
import https from 'https';
import http from 'http';
import { HeadersInit } from 'node-fetch';
import { HeadersInit, Response } from 'node-fetch';
import { ProxyAgent } from 'proxy-agent';

import IConnectionProvider from '../contracts/IConnectionProvider';
import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
import IClientContext from '../../contracts/IClientContext';

import ThriftHttpConnection from './ThriftHttpConnection';
import IRetryPolicy from '../contracts/IRetryPolicy';
import HttpRetryPolicy from './HttpRetryPolicy';

export default class HttpConnection implements IConnectionProvider {
private readonly options: IConnectionOptions;
@@ -102,6 +104,7 @@ export default class HttpConnection implements IConnectionProvider {
url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`,
transport: thrift.TBufferedTransport,
protocol: thrift.TBinaryProtocol,
getRetryPolicy: () => this.getRetryPolicy(),
},
{
agent,
@@ -116,4 +119,8 @@

return this.connection;
}

public async getRetryPolicy(): Promise<IRetryPolicy<Response>> {
return new HttpRetryPolicy(this.context);
}
}
79 changes: 79 additions & 0 deletions lib/connection/connections/HttpRetryPolicy.ts
@@ -0,0 +1,79 @@
import { Response } from 'node-fetch';
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
import RetryError, { RetryErrorCode } from '../../errors/RetryError';

function getRetryDelay(attempt: number, config: ClientConfig): number {
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
}

function delay(milliseconds: number): Promise<void> {
return new Promise<void>((resolve) => {
setTimeout(() => resolve(), milliseconds);
});
}

export default class HttpRetryPolicy implements IRetryPolicy<Response> {
private context: IClientContext;

private readonly startTime: number; // in milliseconds

private attempt: number;

constructor(context: IClientContext) {
this.context = context;
this.startTime = Date.now();
this.attempt = 0;
}

public async shouldRetry(response: Response): Promise<ShouldRetryResult> {
if (!response.ok) {
switch (response.status) {
// On these status codes it's safe to retry the request. However,
// both status codes mean that the server is overwhelmed or even down.
// Therefore, we need to add some delay between attempts so the
// server can recover and is more likely to handle the next request.
case 429: // Too Many Requests
case 503: // Service Unavailable
this.attempt += 1;

const clientConfig = this.context.getConfig();

// Delay interval depends on the current attempt: the more attempts we make,
// the longer the interval will be.
// TODO: Respect `Retry-After` header (PECO-729)

Reviewer:

Yep, this is important. When the server responds with 429 or 503 it includes a Retry-After header that we should monitor.

Contributor Author:

Yes, that's why we have PECO-729. This PR (and this particular code) is almost untouched, just moved to a different place so it could be used for CloudFetch. After we merge this, I'm going to make it match PySQL's behavior, as you mentioned in your other comment below.
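
For reference, a minimal sketch of what honouring the header could look like once PECO-729 is picked up. This is not part of the PR; the helper name is hypothetical, and it assumes the snippet lives next to the code above, where `Response` and `ClientConfig` are already imported:

```ts
// Hypothetical helper (PECO-729 direction): prefer the server-provided
// Retry-After value (in seconds) when present; otherwise fall back to the
// computed backoff delay. The result is always capped at retryDelayMax.
// The HTTP-date form of Retry-After is not handled and falls back too.
function resolveRetryDelay(response: Response, computedDelay: number, config: ClientConfig): number {
  const retryAfterHeader = response.headers.get('Retry-After');
  const retryAfterSeconds = retryAfterHeader !== null ? Number(retryAfterHeader) : NaN;
  if (Number.isFinite(retryAfterSeconds) && retryAfterSeconds >= 0) {
    return Math.min(retryAfterSeconds * 1000, config.retryDelayMax);
  }
  return computedDelay;
}
```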

const retryDelay = getRetryDelay(this.attempt, clientConfig);

const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;

Reviewer:

Nit: I think you can bump this logic up a few lines. If we already exceeded the maximum number of attempts there's no reason to get the retry delay.

if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
}

const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, response);
}

return { shouldRetry: true, retryAfter: retryDelay };

// TODO: Here we should handle other error types (see PECO-730)

// no default
}
}

return { shouldRetry: false };
}

public async invokeWithRetry(operation: RetryableOperation<Response>): Promise<Response> {
for (;;) {
const response = await operation(); // eslint-disable-line no-await-in-loop
const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop
if (!status.shouldRetry) {
return response;
}
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
}
}
}
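
To make the backoff behaviour concrete, a small standalone sketch of the delays `getRetryDelay` produces; the config values are illustrative assumptions, not the driver's defaults:

```ts
// Standalone sketch of the exponential backoff, assuming
// retryDelayMin = 1000 ms and retryDelayMax = 10000 ms.
const config = { retryDelayMin: 1000, retryDelayMax: 10000 };

function sketchRetryDelay(attempt: number): number {
  const scale = Math.max(1, 1.5 ** (attempt - 1)); // same formula as getRetryDelay above
  return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
}

// attempt 1 -> 1000 ms, attempt 2 -> 1500 ms, attempt 3 -> 2250 ms,
// attempt 4 -> 3375 ms, ... capped at 10000 ms from attempt 7 onwards.
for (let attempt = 1; attempt <= 8; attempt += 1) {
  console.log(attempt, sketchRetryDelay(attempt));
}
```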
11 changes: 10 additions & 1 deletion lib/connection/connections/ThriftHttpConnection.ts
@@ -9,6 +9,7 @@ import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstr
import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
// @ts-expect-error TS7016: Could not find a declaration file for module
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
import IRetryPolicy from '../contracts/IRetryPolicy';

export class THTTPException extends Thrift.TApplicationException {
public readonly statusCode: unknown;
@@ -31,6 +32,7 @@ interface ThriftHttpConnectionOptions {
url: string;
transport?: TTransportType;
protocol?: TProtocolConstructor;
getRetryPolicy(): Promise<IRetryPolicy<Response>>;
}

// This type describes a shape of internals of Thrift client object.
@@ -56,6 +58,8 @@ export default class ThriftHttpConnection extends EventEmitter {
// This field is used by Thrift internally, so name and type are important
private readonly protocol: TProtocolConstructor;

private readonly getRetryPolicy: () => Promise<IRetryPolicy<Response>>;

// thrift.createClient sets this field internally
public client?: ThriftClient;

@@ -65,6 +69,7 @@
this.config = config;
this.transport = options.transport ?? TBufferedTransport;
this.protocol = options.protocol ?? TBinaryProtocol;
this.getRetryPolicy = options.getRetryPolicy;
}

public setHeaders(headers: HeadersInit) {
@@ -87,7 +92,11 @@
body: data,
};

fetch(this.url, requestConfig)
this.getRetryPolicy()
.then((retryPolicy) => {
const makeRequest = () => fetch(this.url, requestConfig);
return retryPolicy.invokeWithRetry(makeRequest);
})
.then((response) => {
if (response.status !== 200) {
throw new THTTPException(response);
5 changes: 4 additions & 1 deletion lib/connection/contracts/IConnectionProvider.ts
@@ -1,10 +1,13 @@
import http from 'http';
import { HeadersInit } from 'node-fetch';
import { HeadersInit, Response } from 'node-fetch';
import IRetryPolicy from './IRetryPolicy';

export default interface IConnectionProvider {
getThriftConnection(): Promise<any>;

getAgent(): Promise<http.Agent>;

setHeaders(headers: HeadersInit): void;

getRetryPolicy(): Promise<IRetryPolicy<Response>>;
}
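
For illustration, a hypothetical consumer of the extended contract; the import paths are assumptions, and the function itself is not part of the PR:

```ts
import fetch from 'node-fetch';
import IConnectionProvider from './IConnectionProvider'; // path assumes the contracts directory

// Any code holding a connection provider can now reuse the shared retry policy
// for its own HTTP calls, much like CloudFetchResultHandler does below.
async function fetchWithPolicy(provider: IConnectionProvider, url: string) {
  const agent = await provider.getAgent();
  const retryPolicy = await provider.getRetryPolicy();
  return retryPolicy.invokeWithRetry(() => fetch(url, { agent }));
}
```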
16 changes: 16 additions & 0 deletions lib/connection/contracts/IRetryPolicy.ts
@@ -0,0 +1,16 @@
export type ShouldRetryResult =
| {
shouldRetry: false;
}
| {
shouldRetry: true;
retryAfter: number; // in milliseconds
};

export type RetryableOperation<R> = () => Promise<R>;

export default interface IRetryPolicy<R> {
shouldRetry(response: R): Promise<ShouldRetryResult>;

invokeWithRetry(operation: RetryableOperation<R>): Promise<R>;
}
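
To show how small a conforming implementation can be, a hypothetical policy that satisfies `IRetryPolicy<R>` without ever retrying (not part of the PR; the import path assumes the contracts directory):

```ts
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from './IRetryPolicy';

// Hypothetical no-op policy: runs the operation once and never retries.
export default class NoRetryPolicy<R> implements IRetryPolicy<R> {
  public async shouldRetry(_response: R): Promise<ShouldRetryResult> {
    return { shouldRetry: false };
  }

  public async invokeWithRetry(operation: RetryableOperation<R>): Promise<R> {
    return operation();
  }
}
```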
21 changes: 21 additions & 0 deletions lib/errors/RetryError.ts
@@ -0,0 +1,21 @@
export enum RetryErrorCode {
AttemptsExceeded = 'ATTEMPTS_EXCEEDED',
TimeoutExceeded = 'TIMEOUT_EXCEEDED',
}

const errorMessages: Record<RetryErrorCode, string> = {
[RetryErrorCode.AttemptsExceeded]: 'Max retry count exceeded',
[RetryErrorCode.TimeoutExceeded]: 'Retry timeout exceeded',
};

export default class RetryError extends Error {
public readonly errorCode: RetryErrorCode;

public readonly payload: unknown;

constructor(errorCode: RetryErrorCode, payload?: unknown) {
super(errorMessages[errorCode]);
this.errorCode = errorCode;
this.payload = payload;
}
}
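
A short sketch of how a caller might react to a `RetryError`; the wrapper function is hypothetical and the import path assumes the errors directory:

```ts
import { Response } from 'node-fetch';
import RetryError, { RetryErrorCode } from './RetryError';

// Hypothetical caller: translate a RetryError into a log message, using the
// payload (the last received Response) when it is available.
async function runOrReport(operation: () => Promise<Response>): Promise<Response | undefined> {
  try {
    return await operation();
  } catch (error) {
    if (error instanceof RetryError) {
      const status = error.payload instanceof Response ? error.payload.status : 'unknown';
      const reason =
        error.errorCode === RetryErrorCode.AttemptsExceeded ? 'max retry count exceeded' : 'retry timeout exceeded';
      console.error(`Request failed (${reason}), last status: ${status}`);
      return undefined;
    }
    throw error;
  }
}
```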
83 changes: 17 additions & 66 deletions lib/hive/Commands/BaseCommand.ts
@@ -1,23 +1,8 @@
import { Thrift } from 'thrift';
import { Response } from 'node-fetch';
import TCLIService from '../../../thrift/TCLIService';
import HiveDriverError from '../../errors/HiveDriverError';
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';

interface CommandExecutionInfo {
startTime: number; // in milliseconds
attempt: number;
}

function getRetryDelay(attempt: number, config: ClientConfig): number {
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
}

function delay(milliseconds: number): Promise<void> {
return new Promise<void>((resolve) => {
setTimeout(() => resolve(), milliseconds);
});
}
import RetryError, { RetryErrorCode } from '../../errors/RetryError';
import IClientContext from '../../contracts/IClientContext';

export default abstract class BaseCommand {
protected client: TCLIService.Client;
@@ -29,57 +14,23 @@ export default abstract class BaseCommand {
this.context = context;
}

protected executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
return this.invokeWithErrorHandling<Response>(request, command, { startTime: Date.now(), attempt: 0 });
}

private async invokeWithErrorHandling<Response>(
request: object,
command: Function | void,
info: CommandExecutionInfo,
): Promise<Response> {
protected async executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
try {
return await this.invokeCommand<Response>(request, command);
} catch (error) {
if (error instanceof Thrift.TApplicationException) {
if ('statusCode' in error) {
switch (error.statusCode) {
// On these status codes it's safe to retry the request. However,
// both error codes mean that server is overwhelmed or even down.
// Therefore, we need to add some delay between attempts so
// server can recover and more likely handle next request
case 429: // Too Many Requests
case 503: // Service Unavailable
info.attempt += 1;

const clientConfig = this.context.getConfig();

// Delay interval depends on current attempt - the more attempts we do
// the longer the interval will be
// TODO: Respect `Retry-After` header (PECO-729)
const retryDelay = getRetryDelay(info.attempt, clientConfig);

const attemptsExceeded = info.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new HiveDriverError(
`Hive driver: ${error.statusCode} when connecting to resource. Max retry count exceeded.`,
);
}

const timeoutExceeded = Date.now() - info.startTime + retryDelay >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new HiveDriverError(
`Hive driver: ${error.statusCode} when connecting to resource. Retry timeout exceeded.`,
);
}

await delay(retryDelay);
return this.invokeWithErrorHandling(request, command, info);

// TODO: Here we should handle other error types (see PECO-730)

// no default
}
if (error instanceof RetryError) {
const statusCode = error.payload instanceof Response ? error.payload.status : undefined;

switch (error.errorCode) {
case RetryErrorCode.AttemptsExceeded:
throw new HiveDriverError(
`Hive driver: ${statusCode ?? 'Error'} when connecting to resource. Max retry count exceeded.`,
);
case RetryErrorCode.TimeoutExceeded:
throw new HiveDriverError(
`Hive driver: ${statusCode ?? 'Error'} when connecting to resource. Retry timeout exceeded.`,
);
// no default
}
}

7 changes: 3 additions & 4 deletions lib/result/CloudFetchResultHandler.ts
@@ -62,10 +62,9 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
private async fetch(url: RequestInfo, init?: RequestInit) {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();
const retryPolicy = await connectionProvider.getRetryPolicy();

return fetch(url, {
agent,
...init,
});
const requestConfig: RequestInit = { agent, ...init };
return retryPolicy.invokeWithRetry(() => fetch(url, requestConfig));
}
}
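
To show the intended effect of the change end to end, a test-style sketch in which a stubbed CloudFetch download succeeds only after one retried 429; the import path, context stub, and config values are assumptions for illustration:

```ts
import { Response } from 'node-fetch';
import HttpRetryPolicy from '../connection/connections/HttpRetryPolicy'; // assumed path

// Minimal IClientContext stub: only the config fields HttpRetryPolicy reads here.
const context = {
  getConfig: () => ({ retryDelayMin: 100, retryDelayMax: 1000, retryMaxAttempts: 5, retriesTimeout: 30000 }),
} as any;

async function sketch() {
  const retryPolicy = new HttpRetryPolicy(context);

  // Stubbed CloudFetch download: 429 on the first call, 200 afterwards.
  let calls = 0;
  const download = async () => {
    calls += 1;
    return new Response('chunk', { status: calls === 1 ? 429 : 200 });
  };

  const response = await retryPolicy.invokeWithRetry(download);
  console.log(response.status, calls); // expected output: 200 2
}

sketch();
```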