diff --git a/lib/DBSQLOperation/OperationStatusHelper.ts b/lib/DBSQLOperation/OperationStatusHelper.ts deleted file mode 100644 index 81fb7760..00000000 --- a/lib/DBSQLOperation/OperationStatusHelper.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { TGetOperationStatusResp, TOperationHandle, TOperationState } from '../../thrift/TCLIService_types'; -import HiveDriver from '../hive/HiveDriver'; -import Status from '../dto/Status'; -import { WaitUntilReadyOptions } from '../contracts/IOperation'; -import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; - -async function delay(ms?: number): Promise { - return new Promise((resolve) => { - setTimeout(() => { - resolve(); - }, ms); - }); -} - -export default class OperationStatusHelper { - private readonly driver: HiveDriver; - - private readonly operationHandle: TOperationHandle; - - private state: number = TOperationState.INITIALIZED_STATE; - - // Once operation is finished or fails - cache status response, because subsequent calls - // to `getOperationStatus()` may fail with irrelevant errors, e.g. HTTP 404 - private operationStatus?: TGetOperationStatusResp; - - public hasResultSet: boolean = false; - - constructor(driver: HiveDriver, operationHandle: TOperationHandle, operationStatus?: TGetOperationStatusResp) { - this.driver = driver; - this.operationHandle = operationHandle; - this.hasResultSet = operationHandle.hasResultSet; - - if (operationStatus) { - this.processOperationStatusResponse(operationStatus); - } - } - - private isInProgress(response: TGetOperationStatusResp) { - switch (response.operationState) { - case TOperationState.INITIALIZED_STATE: - case TOperationState.PENDING_STATE: - case TOperationState.RUNNING_STATE: - return true; - default: - return false; - } - } - - private processOperationStatusResponse(response: TGetOperationStatusResp) { - Status.assert(response.status); - - this.state = response.operationState ?? this.state; - - if (typeof response.hasResultSet === 'boolean') { - this.hasResultSet = response.hasResultSet; - } - - if (!this.isInProgress(response)) { - this.operationStatus = response; - } - - return response; - } - - public async status(progress: boolean): Promise { - if (this.operationStatus) { - return this.operationStatus; - } - - const response = await this.driver.getOperationStatus({ - operationHandle: this.operationHandle, - getProgressUpdate: progress, - }); - - return this.processOperationStatusResponse(response); - } - - private async isReady(options?: WaitUntilReadyOptions): Promise { - const response = await this.status(Boolean(options?.progress)); - - if (options?.callback) { - await Promise.resolve(options.callback(response)); - } - - switch (response.operationState) { - case TOperationState.INITIALIZED_STATE: - return false; - case TOperationState.PENDING_STATE: - return false; - case TOperationState.RUNNING_STATE: - return false; - case TOperationState.FINISHED_STATE: - return true; - case TOperationState.CANCELED_STATE: - throw new OperationStateError(OperationStateErrorCode.Canceled, response); - case TOperationState.CLOSED_STATE: - throw new OperationStateError(OperationStateErrorCode.Closed, response); - case TOperationState.ERROR_STATE: - throw new OperationStateError(OperationStateErrorCode.Error, response); - case TOperationState.TIMEDOUT_STATE: - throw new OperationStateError(OperationStateErrorCode.Timeout, response); - case TOperationState.UKNOWN_STATE: - default: - throw new OperationStateError(OperationStateErrorCode.Unknown, response); - } - } - - public async waitUntilReady(options?: WaitUntilReadyOptions): Promise { - if (this.state === TOperationState.FINISHED_STATE) { - return; - } - const isReady = await this.isReady(options); - if (!isReady) { - await delay(100); // add some delay between status requests - return this.waitUntilReady(options); - } - } -} diff --git a/lib/DBSQLOperation/index.ts b/lib/DBSQLOperation/index.ts index c2cf05c6..31690224 100644 --- a/lib/DBSQLOperation/index.ts +++ b/lib/DBSQLOperation/index.ts @@ -14,9 +14,9 @@ import { TGetResultSetMetadataResp, TSparkRowSetType, TCloseOperationResp, + TOperationState, } from '../../thrift/TCLIService_types'; import Status from '../dto/Status'; -import OperationStatusHelper from './OperationStatusHelper'; import FetchResultsHelper from './FetchResultsHelper'; import IDBSQLLogger, { LogLevel } from '../contracts/IDBSQLLogger'; import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; @@ -33,6 +33,14 @@ interface DBSQLOperationConstructorOptions { logger: IDBSQLLogger; } +async function delay(ms?: number): Promise { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, ms); + }); +} + export default class DBSQLOperation implements IOperation { private readonly driver: HiveDriver; @@ -42,8 +50,6 @@ export default class DBSQLOperation implements IOperation { public onClose?: () => void; - private readonly _status: OperationStatusHelper; - private readonly _data: FetchResultsHelper; private readonly closeOperation?: TCloseOperationResp; @@ -54,6 +60,14 @@ export default class DBSQLOperation implements IOperation { private metadata?: TGetResultSetMetadataResp; + private state: number = TOperationState.INITIALIZED_STATE; + + // Once operation is finished or fails - cache status response, because subsequent calls + // to `getOperationStatus()` may fail with irrelevant errors, e.g. HTTP 404 + private operationStatus?: TGetOperationStatusResp; + + private hasResultSet: boolean = false; + private resultHandler?: IOperationResult; constructor( @@ -68,7 +82,11 @@ export default class DBSQLOperation implements IOperation { const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation); - this._status = new OperationStatusHelper(this.driver, this.operationHandle, directResults?.operationStatus); + this.hasResultSet = operationHandle.hasResultSet; + if (directResults?.operationStatus) { + this.processOperationStatusResponse(directResults.operationStatus); + } + this.metadata = directResults?.resultSetMetadata; this._data = new FetchResultsHelper( this.driver, @@ -117,7 +135,7 @@ export default class DBSQLOperation implements IOperation { public async fetchChunk(options?: FetchOptions): Promise> { await this.failIfClosed(); - if (!this._status.hasResultSet) { + if (!this.hasResultSet) { return []; } @@ -146,7 +164,17 @@ export default class DBSQLOperation implements IOperation { public async status(progress: boolean = false): Promise { await this.failIfClosed(); this.logger?.log(LogLevel.debug, `Fetching status for operation with id: ${this.getId()}`); - return this._status.status(progress); + + if (this.operationStatus) { + return this.operationStatus; + } + + const response = await this.driver.getOperationStatus({ + operationHandle: this.operationHandle, + getProgressUpdate: progress, + }); + + return this.processOperationStatusResponse(response); } /** @@ -220,7 +248,7 @@ export default class DBSQLOperation implements IOperation { public async getSchema(options?: GetSchemaOptions): Promise { await this.failIfClosed(); - if (!this._status.hasResultSet) { + if (!this.hasResultSet) { return null; } @@ -247,18 +275,58 @@ export default class DBSQLOperation implements IOperation { } private async waitUntilReady(options?: WaitUntilReadyOptions) { - try { - await this._status.waitUntilReady(options); - } catch (error) { - if (error instanceof OperationStateError) { - if (error.errorCode === OperationStateErrorCode.Canceled) { + if (this.state === TOperationState.FINISHED_STATE) { + return; + } + + let isReady = false; + + while (!isReady) { + // eslint-disable-next-line no-await-in-loop + const response = await this.status(Boolean(options?.progress)); + + if (options?.callback) { + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(options.callback(response)); + } + + switch (response.operationState) { + // For these states do nothing and continue waiting + case TOperationState.INITIALIZED_STATE: + case TOperationState.PENDING_STATE: + case TOperationState.RUNNING_STATE: + break; + + // Operation is completed, so exit the loop + case TOperationState.FINISHED_STATE: + isReady = true; + break; + + // Operation was cancelled, so set a flag and exit the loop (throw an error) + case TOperationState.CANCELED_STATE: this.cancelled = true; - } - if (error.errorCode === OperationStateErrorCode.Closed) { + throw new OperationStateError(OperationStateErrorCode.Canceled, response); + + // Operation was closed, so set a flag and exit the loop (throw an error) + case TOperationState.CLOSED_STATE: this.closed = true; - } + throw new OperationStateError(OperationStateErrorCode.Closed, response); + + // Error states - throw and exit the loop + case TOperationState.ERROR_STATE: + throw new OperationStateError(OperationStateErrorCode.Error, response); + case TOperationState.TIMEDOUT_STATE: + throw new OperationStateError(OperationStateErrorCode.Timeout, response); + case TOperationState.UKNOWN_STATE: + default: + throw new OperationStateError(OperationStateErrorCode.Unknown, response); + } + + // If not ready yet - make some delay before the next status requests + if (!isReady) { + // eslint-disable-next-line no-await-in-loop + await delay(100); } - throw error; } } @@ -301,4 +369,26 @@ export default class DBSQLOperation implements IOperation { return this.resultHandler; } + + private processOperationStatusResponse(response: TGetOperationStatusResp) { + Status.assert(response.status); + + this.state = response.operationState ?? this.state; + + if (typeof response.hasResultSet === 'boolean') { + this.hasResultSet = response.hasResultSet; + } + + const isInProgress = [ + TOperationState.INITIALIZED_STATE, + TOperationState.PENDING_STATE, + TOperationState.RUNNING_STATE, + ].includes(this.state); + + if (!isInProgress) { + this.operationStatus = response; + } + + return response; + } } diff --git a/tests/unit/DBSQLOperation.test.js b/tests/unit/DBSQLOperation.test.js index 1169b17c..b0654170 100644 --- a/tests/unit/DBSQLOperation.test.js +++ b/tests/unit/DBSQLOperation.test.js @@ -113,8 +113,8 @@ describe('DBSQLOperation', () => { const driver = new DriverMock(); const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._status.state).to.equal(TOperationState.INITIALIZED_STATE); - expect(operation._status.hasResultSet).to.be.true; + expect(operation.state).to.equal(TOperationState.INITIALIZED_STATE); + expect(operation.hasResultSet).to.be.true; }); it('should pick up state from directResults', async () => { @@ -129,8 +129,8 @@ describe('DBSQLOperation', () => { }, }); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.hasResultSet).to.be.true; + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.hasResultSet).to.be.true; }); it('should fetch status and update internal state', async () => { @@ -144,15 +144,15 @@ describe('DBSQLOperation', () => { const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._status.state).to.equal(TOperationState.INITIALIZED_STATE); - expect(operation._status.hasResultSet).to.be.false; + expect(operation.state).to.equal(TOperationState.INITIALIZED_STATE); + expect(operation.hasResultSet).to.be.false; const status = await operation.status(); expect(driver.getOperationStatus.called).to.be.true; expect(status.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.hasResultSet).to.be.true; + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.hasResultSet).to.be.true; }); it('should request progress', async () => { @@ -181,8 +181,8 @@ describe('DBSQLOperation', () => { const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._status.state).to.equal(TOperationState.INITIALIZED_STATE); - expect(operation._status.hasResultSet).to.be.false; + expect(operation.state).to.equal(TOperationState.INITIALIZED_STATE); + expect(operation.hasResultSet).to.be.false; // First call - should fetch data and cache driver.getOperationStatusResp = { @@ -193,8 +193,8 @@ describe('DBSQLOperation', () => { expect(driver.getOperationStatus.callCount).to.equal(1); expect(status1.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.hasResultSet).to.be.true; + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.hasResultSet).to.be.true; // Second call - should return cached data driver.getOperationStatusResp = { @@ -205,8 +205,8 @@ describe('DBSQLOperation', () => { expect(driver.getOperationStatus.callCount).to.equal(1); expect(status2.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.hasResultSet).to.be.true; + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.hasResultSet).to.be.true; }); it('should fetch status if directResults status is not finished', async () => { @@ -226,15 +226,15 @@ describe('DBSQLOperation', () => { }, }); - expect(operation._status.state).to.equal(TOperationState.RUNNING_STATE); // from directResults - expect(operation._status.hasResultSet).to.be.false; + expect(operation.state).to.equal(TOperationState.RUNNING_STATE); // from directResults + expect(operation.hasResultSet).to.be.false; const status = await operation.status(false); expect(driver.getOperationStatus.called).to.be.true; expect(status.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.hasResultSet).to.be.true; + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.hasResultSet).to.be.true; }); it('should not fetch status if directResults status is finished', async () => { @@ -254,15 +254,15 @@ describe('DBSQLOperation', () => { }, }); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); // from directResults - expect(operation._status.hasResultSet).to.be.false; + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); // from directResults + expect(operation.hasResultSet).to.be.false; const status = await operation.status(false); expect(driver.getOperationStatus.called).to.be.false; expect(status.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); - expect(operation._status.hasResultSet).to.be.false; + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.hasResultSet).to.be.false; }); it('should throw an error in case of a status error', async () => { @@ -516,12 +516,12 @@ describe('DBSQLOperation', () => { const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._status.state).to.equal(TOperationState.INITIALIZED_STATE); + expect(operation.state).to.equal(TOperationState.INITIALIZED_STATE); await operation.finished(); expect(driver.getOperationStatus.callCount).to.be.equal(attemptsUntilFinished); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); }); }, ); @@ -676,7 +676,7 @@ describe('DBSQLOperation', () => { expect(driver.getOperationStatus.called).to.be.true; expect(schema).to.deep.equal(driver.getResultSetMetadataResp.schema); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); }); it('should request progress', async () => { @@ -887,7 +887,7 @@ describe('DBSQLOperation', () => { expect(driver.getOperationStatus.called).to.be.true; expect(results).to.deep.equal([]); - expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE); + expect(operation.state).to.equal(TOperationState.FINISHED_STATE); }); it('should request progress', async () => {