diff --git a/lib/connection/connections/HttpConnection.ts b/lib/connection/connections/HttpConnection.ts index fc33f509..9a413ade 100644 --- a/lib/connection/connections/HttpConnection.ts +++ b/lib/connection/connections/HttpConnection.ts @@ -1,7 +1,7 @@ import thrift from 'thrift'; import https from 'https'; import http from 'http'; -import { HeadersInit } from 'node-fetch'; +import { HeadersInit, Response } from 'node-fetch'; import { ProxyAgent } from 'proxy-agent'; import IConnectionProvider from '../contracts/IConnectionProvider'; @@ -9,6 +9,8 @@ import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOption import IClientContext from '../../contracts/IClientContext'; import ThriftHttpConnection from './ThriftHttpConnection'; +import IRetryPolicy from '../contracts/IRetryPolicy'; +import HttpRetryPolicy from './HttpRetryPolicy'; export default class HttpConnection implements IConnectionProvider { private readonly options: IConnectionOptions; @@ -102,6 +104,7 @@ export default class HttpConnection implements IConnectionProvider { url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`, transport: thrift.TBufferedTransport, protocol: thrift.TBinaryProtocol, + getRetryPolicy: () => this.getRetryPolicy(), }, { agent, @@ -116,4 +119,8 @@ export default class HttpConnection implements IConnectionProvider { return this.connection; } + + public async getRetryPolicy(): Promise> { + return new HttpRetryPolicy(this.context); + } } diff --git a/lib/connection/connections/HttpRetryPolicy.ts b/lib/connection/connections/HttpRetryPolicy.ts new file mode 100644 index 00000000..6d649555 --- /dev/null +++ b/lib/connection/connections/HttpRetryPolicy.ts @@ -0,0 +1,79 @@ +import { Response } from 'node-fetch'; +import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy'; +import IClientContext, { ClientConfig } from '../../contracts/IClientContext'; +import RetryError, { RetryErrorCode } from '../../errors/RetryError'; + +function getRetryDelay(attempt: number, config: ClientConfig): number { + const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1 + return Math.min(config.retryDelayMin * scale, config.retryDelayMax); +} + +function delay(milliseconds: number): Promise { + return new Promise((resolve) => { + setTimeout(() => resolve(), milliseconds); + }); +} + +export default class HttpRetryPolicy implements IRetryPolicy { + private context: IClientContext; + + private readonly startTime: number; // in milliseconds + + private attempt: number; + + constructor(context: IClientContext) { + this.context = context; + this.startTime = Date.now(); + this.attempt = 0; + } + + public async shouldRetry(response: Response): Promise { + if (!response.ok) { + switch (response.status) { + // On these status codes it's safe to retry the request. However, + // both error codes mean that server is overwhelmed or even down. + // Therefore, we need to add some delay between attempts so + // server can recover and more likely handle next request + case 429: // Too Many Requests + case 503: // Service Unavailable + this.attempt += 1; + + const clientConfig = this.context.getConfig(); + + // Delay interval depends on current attempt - the more attempts we do + // the longer the interval will be + // TODO: Respect `Retry-After` header (PECO-729) + const retryDelay = getRetryDelay(this.attempt, clientConfig); + + const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts; + if (attemptsExceeded) { + throw new RetryError(RetryErrorCode.AttemptsExceeded, response); + } + + const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout; + if (timeoutExceeded) { + throw new RetryError(RetryErrorCode.TimeoutExceeded, response); + } + + return { shouldRetry: true, retryAfter: retryDelay }; + + // TODO: Here we should handle other error types (see PECO-730) + + // no default + } + } + + return { shouldRetry: false }; + } + + public async invokeWithRetry(operation: RetryableOperation): Promise { + for (;;) { + const response = await operation(); // eslint-disable-line no-await-in-loop + const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop + if (!status.shouldRetry) { + return response; + } + await delay(status.retryAfter); // eslint-disable-line no-await-in-loop + } + } +} diff --git a/lib/connection/connections/ThriftHttpConnection.ts b/lib/connection/connections/ThriftHttpConnection.ts index 1864b957..27612ce0 100644 --- a/lib/connection/connections/ThriftHttpConnection.ts +++ b/lib/connection/connections/ThriftHttpConnection.ts @@ -9,6 +9,7 @@ import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstr import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch'; // @ts-expect-error TS7016: Could not find a declaration file for module import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error'; +import IRetryPolicy from '../contracts/IRetryPolicy'; export class THTTPException extends Thrift.TApplicationException { public readonly statusCode: unknown; @@ -31,6 +32,7 @@ interface ThriftHttpConnectionOptions { url: string; transport?: TTransportType; protocol?: TProtocolConstructor; + getRetryPolicy(): Promise>; } // This type describes a shape of internals of Thrift client object. @@ -56,6 +58,8 @@ export default class ThriftHttpConnection extends EventEmitter { // This field is used by Thrift internally, so name and type are important private readonly protocol: TProtocolConstructor; + private readonly getRetryPolicy: () => Promise>; + // thrift.createClient sets this field internally public client?: ThriftClient; @@ -65,6 +69,7 @@ export default class ThriftHttpConnection extends EventEmitter { this.config = config; this.transport = options.transport ?? TBufferedTransport; this.protocol = options.protocol ?? TBinaryProtocol; + this.getRetryPolicy = options.getRetryPolicy; } public setHeaders(headers: HeadersInit) { @@ -87,7 +92,11 @@ export default class ThriftHttpConnection extends EventEmitter { body: data, }; - fetch(this.url, requestConfig) + this.getRetryPolicy() + .then((retryPolicy) => { + const makeRequest = () => fetch(this.url, requestConfig); + return retryPolicy.invokeWithRetry(makeRequest); + }) .then((response) => { if (response.status !== 200) { throw new THTTPException(response); diff --git a/lib/connection/contracts/IConnectionProvider.ts b/lib/connection/contracts/IConnectionProvider.ts index d0eb7b4e..ffd8e5be 100644 --- a/lib/connection/contracts/IConnectionProvider.ts +++ b/lib/connection/contracts/IConnectionProvider.ts @@ -1,5 +1,6 @@ import http from 'http'; -import { HeadersInit } from 'node-fetch'; +import { HeadersInit, Response } from 'node-fetch'; +import IRetryPolicy from './IRetryPolicy'; export default interface IConnectionProvider { getThriftConnection(): Promise; @@ -7,4 +8,6 @@ export default interface IConnectionProvider { getAgent(): Promise; setHeaders(headers: HeadersInit): void; + + getRetryPolicy(): Promise>; } diff --git a/lib/connection/contracts/IRetryPolicy.ts b/lib/connection/contracts/IRetryPolicy.ts new file mode 100644 index 00000000..d389795f --- /dev/null +++ b/lib/connection/contracts/IRetryPolicy.ts @@ -0,0 +1,16 @@ +export type ShouldRetryResult = + | { + shouldRetry: false; + } + | { + shouldRetry: true; + retryAfter: number; // in milliseconds + }; + +export type RetryableOperation = () => Promise; + +export default interface IRetryPolicy { + shouldRetry(response: R): Promise; + + invokeWithRetry(operation: RetryableOperation): Promise; +} diff --git a/lib/errors/RetryError.ts b/lib/errors/RetryError.ts new file mode 100644 index 00000000..e2fa7143 --- /dev/null +++ b/lib/errors/RetryError.ts @@ -0,0 +1,21 @@ +export enum RetryErrorCode { + AttemptsExceeded = 'ATTEMPTS_EXCEEDED', + TimeoutExceeded = 'TIMEOUT_EXCEEDED', +} + +const errorMessages: Record = { + [RetryErrorCode.AttemptsExceeded]: 'Max retry count exceeded', + [RetryErrorCode.TimeoutExceeded]: 'Retry timeout exceeded', +}; + +export default class RetryError extends Error { + public readonly errorCode: RetryErrorCode; + + public readonly payload: unknown; + + constructor(errorCode: RetryErrorCode, payload?: unknown) { + super(errorMessages[errorCode]); + this.errorCode = errorCode; + this.payload = payload; + } +} diff --git a/lib/hive/Commands/BaseCommand.ts b/lib/hive/Commands/BaseCommand.ts index f059d9e1..c211bc38 100644 --- a/lib/hive/Commands/BaseCommand.ts +++ b/lib/hive/Commands/BaseCommand.ts @@ -1,23 +1,8 @@ -import { Thrift } from 'thrift'; +import { Response } from 'node-fetch'; import TCLIService from '../../../thrift/TCLIService'; import HiveDriverError from '../../errors/HiveDriverError'; -import IClientContext, { ClientConfig } from '../../contracts/IClientContext'; - -interface CommandExecutionInfo { - startTime: number; // in milliseconds - attempt: number; -} - -function getRetryDelay(attempt: number, config: ClientConfig): number { - const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1 - return Math.min(config.retryDelayMin * scale, config.retryDelayMax); -} - -function delay(milliseconds: number): Promise { - return new Promise((resolve) => { - setTimeout(() => resolve(), milliseconds); - }); -} +import RetryError, { RetryErrorCode } from '../../errors/RetryError'; +import IClientContext from '../../contracts/IClientContext'; export default abstract class BaseCommand { protected client: TCLIService.Client; @@ -29,57 +14,23 @@ export default abstract class BaseCommand { this.context = context; } - protected executeCommand(request: object, command: Function | void): Promise { - return this.invokeWithErrorHandling(request, command, { startTime: Date.now(), attempt: 0 }); - } - - private async invokeWithErrorHandling( - request: object, - command: Function | void, - info: CommandExecutionInfo, - ): Promise { + protected async executeCommand(request: object, command: Function | void): Promise { try { return await this.invokeCommand(request, command); } catch (error) { - if (error instanceof Thrift.TApplicationException) { - if ('statusCode' in error) { - switch (error.statusCode) { - // On these status codes it's safe to retry the request. However, - // both error codes mean that server is overwhelmed or even down. - // Therefore, we need to add some delay between attempts so - // server can recover and more likely handle next request - case 429: // Too Many Requests - case 503: // Service Unavailable - info.attempt += 1; - - const clientConfig = this.context.getConfig(); - - // Delay interval depends on current attempt - the more attempts we do - // the longer the interval will be - // TODO: Respect `Retry-After` header (PECO-729) - const retryDelay = getRetryDelay(info.attempt, clientConfig); - - const attemptsExceeded = info.attempt >= clientConfig.retryMaxAttempts; - if (attemptsExceeded) { - throw new HiveDriverError( - `Hive driver: ${error.statusCode} when connecting to resource. Max retry count exceeded.`, - ); - } - - const timeoutExceeded = Date.now() - info.startTime + retryDelay >= clientConfig.retriesTimeout; - if (timeoutExceeded) { - throw new HiveDriverError( - `Hive driver: ${error.statusCode} when connecting to resource. Retry timeout exceeded.`, - ); - } - - await delay(retryDelay); - return this.invokeWithErrorHandling(request, command, info); - - // TODO: Here we should handle other error types (see PECO-730) - - // no default - } + if (error instanceof RetryError) { + const statusCode = error.payload instanceof Response ? error.payload.status : undefined; + + switch (error.errorCode) { + case RetryErrorCode.AttemptsExceeded: + throw new HiveDriverError( + `Hive driver: ${statusCode ?? 'Error'} when connecting to resource. Max retry count exceeded.`, + ); + case RetryErrorCode.TimeoutExceeded: + throw new HiveDriverError( + `Hive driver: ${statusCode ?? 'Error'} when connecting to resource. Retry timeout exceeded.`, + ); + // no default } } diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts index f1743628..f7b3f4cd 100644 --- a/lib/result/CloudFetchResultHandler.ts +++ b/lib/result/CloudFetchResultHandler.ts @@ -62,10 +62,9 @@ export default class CloudFetchResultHandler implements IResultsProvider fetch(url, requestConfig)); } } diff --git a/tests/unit/hive/commands/BaseCommand.test.js b/tests/unit/hive/commands/BaseCommand.test.js index d6b286cf..bfee82de 100644 --- a/tests/unit/hive/commands/BaseCommand.test.js +++ b/tests/unit/hive/commands/BaseCommand.test.js @@ -1,20 +1,30 @@ const { expect, AssertionError } = require('chai'); +const { Response } = require('node-fetch'); const { Thrift } = require('thrift'); const HiveDriverError = require('../../../../dist/errors/HiveDriverError').default; const BaseCommand = require('../../../../dist/hive/Commands/BaseCommand').default; +const HttpRetryPolicy = require('../../../../dist/connection/connections/HttpRetryPolicy').default; const DBSQLClient = require('../../../../dist/DBSQLClient').default; class ThriftClientMock { - constructor(methodHandler) { + constructor(context, methodHandler) { + this.context = context; this.methodHandler = methodHandler; } CustomMethod(request, callback) { try { - const response = this.methodHandler(); - return callback(undefined, response !== undefined ? response : ThriftClientMock.defaultResponse); + const retryPolicy = new HttpRetryPolicy(this.context); + retryPolicy + .invokeWithRetry(this.methodHandler) + .then((response) => { + callback(undefined, response?.body ?? ThriftClientMock.defaultResponse); + }) + .catch((error) => { + callback(error); + }); } catch (error) { - return callback(error); + callback(error); } } } @@ -101,11 +111,11 @@ describe('BaseCommand', () => { let methodCallCount = 0; const command = new CustomCommand( - new ThriftClientMock(() => { + new ThriftClientMock(context, () => { methodCallCount += 1; - const error = new Thrift.TApplicationException(); - error.statusCode = statusCode; - throw error; + return new Response(undefined, { + status: statusCode, + }); }), context, ); @@ -138,11 +148,11 @@ describe('BaseCommand', () => { let methodCallCount = 0; const command = new CustomCommand( - new ThriftClientMock(() => { + new ThriftClientMock(context, () => { methodCallCount += 1; - const error = new Thrift.TApplicationException(); - error.statusCode = statusCode; - throw error; + return new Response(undefined, { + status: statusCode, + }); }), context, ); @@ -178,14 +188,19 @@ describe('BaseCommand', () => { let methodCallCount = 0; const command = new CustomCommand( - new ThriftClientMock(() => { + new ThriftClientMock(context, () => { methodCallCount += 1; if (methodCallCount <= 3) { - const error = new Thrift.TApplicationException(); - error.statusCode = statusCode; - throw error; + return new Response(undefined, { + status: statusCode, + }); } - return ThriftClientMock.defaultResponse; + + const response = new Response(undefined, { + status: 200, + }); + response.body = ThriftClientMock.defaultResponse; + return response; }), context, ); @@ -207,7 +222,7 @@ describe('BaseCommand', () => { }; const command = new CustomCommand( - new ThriftClientMock(() => { + new ThriftClientMock(context, () => { const error = new Thrift.TApplicationException(undefined, errorMessage); error.statusCode = 500; throw error; @@ -237,7 +252,7 @@ describe('BaseCommand', () => { }; const command = new CustomCommand( - new ThriftClientMock(() => { + new ThriftClientMock(context, () => { throw new Thrift.TApplicationException(undefined, errorMessage); }), context, @@ -265,7 +280,7 @@ describe('BaseCommand', () => { }; const command = new CustomCommand( - new ThriftClientMock(() => { + new ThriftClientMock(context, () => { throw new Error(errorMessage); }), context,