diff --git a/lib/DBSQLOperation/CompleteOperationHelper.ts b/lib/DBSQLOperation/CompleteOperationHelper.ts deleted file mode 100644 index 49bf74cf..00000000 --- a/lib/DBSQLOperation/CompleteOperationHelper.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { TOperationHandle, TCloseOperationResp } from '../../thrift/TCLIService_types'; -import HiveDriver from '../hive/HiveDriver'; -import Status from '../dto/Status'; - -export default class CompleteOperationHelper { - private readonly driver: HiveDriver; - - private readonly operationHandle: TOperationHandle; - - private closeOperation?: TCloseOperationResp; - - public closed: boolean = false; - - public cancelled: boolean = false; - - constructor(driver: HiveDriver, operationHandle: TOperationHandle, closeOperation?: TCloseOperationResp) { - this.driver = driver; - this.operationHandle = operationHandle; - this.closeOperation = closeOperation; - } - - public async cancel(): Promise { - if (this.cancelled) { - return Status.success(); - } - - const response = await this.driver.cancelOperation({ - operationHandle: this.operationHandle, - }); - Status.assert(response.status); - this.cancelled = true; - return new Status(response.status); - } - - public async close(): Promise { - if (!this.closed && this.closeOperation) { - Status.assert(this.closeOperation.status); - this.closed = true; - } - - if (this.closed) { - return Status.success(); - } - - const response = await this.driver.closeOperation({ - operationHandle: this.operationHandle, - }); - Status.assert(response.status); - this.closed = true; - return new Status(response.status); - } -} diff --git a/lib/DBSQLOperation/SchemaHelper.ts b/lib/DBSQLOperation/SchemaHelper.ts deleted file mode 100644 index 830d04b0..00000000 --- a/lib/DBSQLOperation/SchemaHelper.ts +++ /dev/null @@ -1,70 +0,0 @@ -import { TGetResultSetMetadataResp, TOperationHandle, TSparkRowSetType } from '../../thrift/TCLIService_types'; -import HiveDriver from '../hive/HiveDriver'; -import Status from '../dto/Status'; -import IOperationResult from '../result/IOperationResult'; -import JsonResult from '../result/JsonResult'; -import ArrowResult from '../result/ArrowResult'; -import CloudFetchResult from '../result/CloudFetchResult'; -import HiveDriverError from '../errors/HiveDriverError'; -import { definedOrError } from '../utils'; - -export default class SchemaHelper { - private readonly driver: HiveDriver; - - private readonly operationHandle: TOperationHandle; - - private metadata?: TGetResultSetMetadataResp; - - private resultHandler?: IOperationResult; - - constructor(driver: HiveDriver, operationHandle: TOperationHandle, metadata?: TGetResultSetMetadataResp) { - this.driver = driver; - this.operationHandle = operationHandle; - this.metadata = metadata; - } - - private async fetchMetadata() { - if (!this.metadata) { - const metadata = await this.driver.getResultSetMetadata({ - operationHandle: this.operationHandle, - }); - Status.assert(metadata.status); - this.metadata = metadata; - } - - return this.metadata; - } - - public async fetch() { - const metadata = await this.fetchMetadata(); - return definedOrError(metadata.schema); - } - - public async getResultHandler(): Promise { - const metadata = await this.fetchMetadata(); - const resultFormat = definedOrError(metadata.resultFormat); - - if (!this.resultHandler) { - switch (resultFormat) { - case TSparkRowSetType.COLUMN_BASED_SET: - this.resultHandler = new JsonResult(metadata.schema); - break; - case TSparkRowSetType.ARROW_BASED_SET: - this.resultHandler = new ArrowResult(metadata.schema, metadata.arrowSchema); - break; - case TSparkRowSetType.URL_BASED_SET: - this.resultHandler = new CloudFetchResult(metadata.schema); - break; - default: - this.resultHandler = undefined; - break; - } - } - - if (!this.resultHandler) { - throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`); - } - - return this.resultHandler; - } -} diff --git a/lib/DBSQLOperation/index.ts b/lib/DBSQLOperation/index.ts index 4ecb3ed8..ba51038e 100644 --- a/lib/DBSQLOperation/index.ts +++ b/lib/DBSQLOperation/index.ts @@ -11,14 +11,21 @@ import { TOperationHandle, TTableSchema, TSparkDirectResults, + TGetResultSetMetadataResp, + TSparkRowSetType, + TCloseOperationResp, } from '../../thrift/TCLIService_types'; import Status from '../dto/Status'; import OperationStatusHelper from './OperationStatusHelper'; -import SchemaHelper from './SchemaHelper'; import FetchResultsHelper from './FetchResultsHelper'; -import CompleteOperationHelper from './CompleteOperationHelper'; import IDBSQLLogger, { LogLevel } from '../contracts/IDBSQLLogger'; import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; +import IOperationResult from '../result/IOperationResult'; +import JsonResult from '../result/JsonResult'; +import ArrowResult from '../result/ArrowResult'; +import CloudFetchResult from '../result/CloudFetchResult'; +import { definedOrError } from '../utils'; +import HiveDriverError from '../errors/HiveDriverError'; const defaultMaxRows = 100000; @@ -37,11 +44,17 @@ export default class DBSQLOperation implements IOperation { private readonly _status: OperationStatusHelper; - private readonly _schema: SchemaHelper; - private readonly _data: FetchResultsHelper; - private readonly _completeOperation: CompleteOperationHelper; + private readonly closeOperation?: TCloseOperationResp; + + private closed: boolean = false; + + private cancelled: boolean = false; + + private metadata?: TGetResultSetMetadataResp; + + private resultHandler?: IOperationResult; constructor( driver: HiveDriver, @@ -56,18 +69,14 @@ export default class DBSQLOperation implements IOperation { const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation); this._status = new OperationStatusHelper(this.driver, this.operationHandle, directResults?.operationStatus); - this._schema = new SchemaHelper(this.driver, this.operationHandle, directResults?.resultSetMetadata); + this.metadata = directResults?.resultSetMetadata; this._data = new FetchResultsHelper( this.driver, this.operationHandle, [directResults?.resultSet], useOnlyPrefetchedResults, ); - this._completeOperation = new CompleteOperationHelper( - this.driver, - this.operationHandle, - directResults?.closeOperation, - ); + this.closeOperation = directResults?.closeOperation; this.logger.log(LogLevel.debug, `Operation created with id: ${this.getId()}`); } @@ -115,7 +124,7 @@ export default class DBSQLOperation implements IOperation { await this.waitUntilReady(options); const [resultHandler, data] = await Promise.all([ - this._schema.getResultHandler(), + this.getResultHandler(), this._data.fetch(options?.maxRows || defaultMaxRows), ]); @@ -145,12 +154,18 @@ export default class DBSQLOperation implements IOperation { * @throws {StatusError} */ public async cancel(): Promise { - if (this._completeOperation.closed || this._completeOperation.cancelled) { + if (this.closed || this.cancelled) { return Status.success(); } this.logger?.log(LogLevel.debug, `Cancelling operation with id: ${this.getId()}`); - const result = this._completeOperation.cancel(); + + const response = await this.driver.cancelOperation({ + operationHandle: this.operationHandle, + }); + Status.assert(response.status); + this.cancelled = true; + const result = new Status(response.status); // Cancelled operation becomes unusable, similarly to being closed this.onClose?.(); @@ -162,12 +177,20 @@ export default class DBSQLOperation implements IOperation { * @throws {StatusError} */ public async close(): Promise { - if (this._completeOperation.closed || this._completeOperation.cancelled) { + if (this.closed || this.cancelled) { return Status.success(); } this.logger?.log(LogLevel.debug, `Closing operation with id: ${this.getId()}`); - const result = await this._completeOperation.close(); + + const response = + this.closeOperation ?? + (await this.driver.closeOperation({ + operationHandle: this.operationHandle, + })); + Status.assert(response.status); + this.closed = true; + const result = new Status(response.status); this.onClose?.(); return result; @@ -180,7 +203,7 @@ export default class DBSQLOperation implements IOperation { public async hasMoreRows(): Promise { // If operation is closed or cancelled - we should not try to get data from it - if (this._completeOperation.closed || this._completeOperation.cancelled) { + if (this.closed || this.cancelled) { return false; } @@ -190,7 +213,7 @@ export default class DBSQLOperation implements IOperation { } // If we fetched all the data from server - check if there's anything buffered in result handler - const resultHandler = await this._schema.getResultHandler(); + const resultHandler = await this.getResultHandler(); return resultHandler.hasPendingData(); } @@ -204,14 +227,15 @@ export default class DBSQLOperation implements IOperation { await this.waitUntilReady(options); this.logger?.log(LogLevel.debug, `Fetching schema for operation with id: ${this.getId()}`); - return this._schema.fetch(); + const metadata = await this.fetchMetadata(); + return metadata.schema ?? null; } private async failIfClosed(): Promise { - if (this._completeOperation.closed) { + if (this.closed) { throw new OperationStateError(OperationStateErrorCode.Closed); } - if (this._completeOperation.cancelled) { + if (this.cancelled) { throw new OperationStateError(OperationStateErrorCode.Canceled); } } @@ -222,13 +246,53 @@ export default class DBSQLOperation implements IOperation { } catch (error) { if (error instanceof OperationStateError) { if (error.errorCode === OperationStateErrorCode.Canceled) { - this._completeOperation.cancelled = true; + this.cancelled = true; } if (error.errorCode === OperationStateErrorCode.Closed) { - this._completeOperation.closed = true; + this.closed = true; } } throw error; } } + + private async fetchMetadata() { + if (!this.metadata) { + const metadata = await this.driver.getResultSetMetadata({ + operationHandle: this.operationHandle, + }); + Status.assert(metadata.status); + this.metadata = metadata; + } + + return this.metadata; + } + + private async getResultHandler(): Promise { + const metadata = await this.fetchMetadata(); + const resultFormat = definedOrError(metadata.resultFormat); + + if (!this.resultHandler) { + switch (resultFormat) { + case TSparkRowSetType.COLUMN_BASED_SET: + this.resultHandler = new JsonResult(metadata.schema); + break; + case TSparkRowSetType.ARROW_BASED_SET: + this.resultHandler = new ArrowResult(metadata.schema, metadata.arrowSchema); + break; + case TSparkRowSetType.URL_BASED_SET: + this.resultHandler = new CloudFetchResult(metadata.schema); + break; + default: + this.resultHandler = undefined; + break; + } + } + + if (!this.resultHandler) { + throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`); + } + + return this.resultHandler; + } } diff --git a/tests/e2e/arrow.test.js b/tests/e2e/arrow.test.js index 924412ee..a75c3059 100644 --- a/tests/e2e/arrow.test.js +++ b/tests/e2e/arrow.test.js @@ -75,7 +75,7 @@ describe('Arrow support', () => { const result = await operation.fetchAll(); expect(result).to.deep.equal(expectedColumn); - const resultHandler = await operation._schema.getResultHandler(); + const resultHandler = await operation.getResultHandler(); expect(resultHandler).to.be.not.instanceof(ArrowResult); await operation.close(); @@ -92,7 +92,7 @@ describe('Arrow support', () => { const result = await operation.fetchAll(); expect(fixArrowResult(result)).to.deep.equal(expectedArrow); - const resultHandler = await operation._schema.getResultHandler(); + const resultHandler = await operation.getResultHandler(); expect(resultHandler).to.be.instanceof(ArrowResult); await operation.close(); @@ -109,7 +109,7 @@ describe('Arrow support', () => { const result = await operation.fetchAll(); expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes); - const resultHandler = await operation._schema.getResultHandler(); + const resultHandler = await operation.getResultHandler(); expect(resultHandler).to.be.instanceof(ArrowResult); await operation.close(); @@ -129,7 +129,7 @@ describe('Arrow support', () => { `); // We use some internals here to check that server returned response with multiple batches - const resultHandler = await operation._schema.getResultHandler(); + const resultHandler = await operation.getResultHandler(); expect(resultHandler).to.be.instanceof(ArrowResult); const rawData = await operation._data.fetch(rowsCount); diff --git a/tests/e2e/cloudfetch.test.js b/tests/e2e/cloudfetch.test.js index 267f6936..3997f6af 100644 --- a/tests/e2e/cloudfetch.test.js +++ b/tests/e2e/cloudfetch.test.js @@ -56,7 +56,7 @@ describe('CloudFetch', () => { await operation.finished(); // Check if we're actually getting data via CloudFetch - const resultHandler = await operation._schema.getResultHandler(); + const resultHandler = await operation.getResultHandler(); expect(resultHandler).to.be.instanceOf(CloudFetchResult); // Fetch first chunk and check if result handler behaves properly. diff --git a/tests/unit/DBSQLOperation.test.js b/tests/unit/DBSQLOperation.test.js index 948aae4d..1169b17c 100644 --- a/tests/unit/DBSQLOperation.test.js +++ b/tests/unit/DBSQLOperation.test.js @@ -8,6 +8,7 @@ const OperationStateError = require('../../dist/errors/OperationStateError').def const HiveDriverError = require('../../dist/errors/HiveDriverError').default; const JsonResult = require('../../dist/result/JsonResult').default; const ArrowResult = require('../../dist/result/ArrowResult').default; +const CloudFetchResult = require('../../dist/result/CloudFetchResult').default; // Create logger that won't emit // @@ -289,14 +290,14 @@ describe('DBSQLOperation', () => { sinon.spy(driver, 'cancelOperation'); const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; await operation.cancel(); expect(driver.cancelOperation.called).to.be.true; - expect(operation._completeOperation.cancelled).to.be.true; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.true; + expect(operation.closed).to.be.false; }); it('should return immediately if already cancelled', async () => { @@ -305,18 +306,18 @@ describe('DBSQLOperation', () => { sinon.spy(driver, 'cancelOperation'); const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; await operation.cancel(); expect(driver.cancelOperation.callCount).to.be.equal(1); - expect(operation._completeOperation.cancelled).to.be.true; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.true; + expect(operation.closed).to.be.false; await operation.cancel(); expect(driver.cancelOperation.callCount).to.be.equal(1); - expect(operation._completeOperation.cancelled).to.be.true; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.true; + expect(operation.closed).to.be.false; }); it('should return immediately if already closed', async () => { @@ -326,18 +327,18 @@ describe('DBSQLOperation', () => { sinon.spy(driver, 'closeOperation'); const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; await operation.close(); expect(driver.closeOperation.callCount).to.be.equal(1); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.true; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.true; await operation.cancel(); expect(driver.cancelOperation.callCount).to.be.equal(0); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.true; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.true; }); it('should throw an error in case of a status error and keep state', async () => { @@ -346,8 +347,8 @@ describe('DBSQLOperation', () => { driver.cancelOperationResp.status.statusCode = TStatusCode.ERROR_STATUS; const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; try { await operation.cancel(); @@ -357,8 +358,8 @@ describe('DBSQLOperation', () => { throw e; } expect(e).to.be.instanceOf(StatusError); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; } }); @@ -368,7 +369,7 @@ describe('DBSQLOperation', () => { const operation = new DBSQLOperation(driver, handle, logger); await operation.cancel(); - expect(operation._completeOperation.cancelled).to.be.true; + expect(operation.cancelled).to.be.true; await expectFailure(() => operation.fetchAll()); await expectFailure(() => operation.fetchChunk()); @@ -385,14 +386,14 @@ describe('DBSQLOperation', () => { sinon.spy(driver, 'closeOperation'); const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; await operation.close(); expect(driver.closeOperation.called).to.be.true; - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.true; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.true; }); it('should return immediately if already closed', async () => { @@ -401,18 +402,18 @@ describe('DBSQLOperation', () => { sinon.spy(driver, 'closeOperation'); const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; await operation.close(); expect(driver.closeOperation.callCount).to.be.equal(1); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.true; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.true; await operation.close(); expect(driver.closeOperation.callCount).to.be.equal(1); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.true; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.true; }); it('should return immediately if already cancelled', async () => { @@ -422,18 +423,18 @@ describe('DBSQLOperation', () => { sinon.spy(driver, 'cancelOperation'); const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; await operation.cancel(); expect(driver.cancelOperation.callCount).to.be.equal(1); - expect(operation._completeOperation.cancelled).to.be.true; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.true; + expect(operation.closed).to.be.false; await operation.close(); expect(driver.closeOperation.callCount).to.be.equal(0); - expect(operation._completeOperation.cancelled).to.be.true; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.true; + expect(operation.closed).to.be.false; }); it('should initialize from directResults', async () => { @@ -446,14 +447,14 @@ describe('DBSQLOperation', () => { }, }); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; await operation.close(); expect(driver.closeOperation.called).to.be.false; - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.true; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.true; expect(driver.closeOperation.callCount).to.be.equal(0); }); @@ -463,8 +464,8 @@ describe('DBSQLOperation', () => { driver.closeOperationResp.status.statusCode = TStatusCode.ERROR_STATUS; const operation = new DBSQLOperation(driver, handle, logger); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; try { await operation.close(); @@ -474,8 +475,8 @@ describe('DBSQLOperation', () => { throw e; } expect(e).to.be.instanceOf(StatusError); - expect(operation._completeOperation.cancelled).to.be.false; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.cancelled).to.be.false; + expect(operation.closed).to.be.false; } }); @@ -485,7 +486,7 @@ describe('DBSQLOperation', () => { const operation = new DBSQLOperation(driver, handle, logger); await operation.close(); - expect(operation._completeOperation.closed).to.be.true; + expect(operation.closed).to.be.true; await expectFailure(() => operation.fetchAll()); await expectFailure(() => operation.fetchChunk()); @@ -819,7 +820,7 @@ describe('DBSQLOperation', () => { driver.getResultSetMetadata.resetHistory(); const operation = new DBSQLOperation(driver, handle, logger); - const resultHandler = await operation._schema.getResultHandler(); + const resultHandler = await operation.getResultHandler(); expect(driver.getResultSetMetadata.called).to.be.true; expect(resultHandler).to.be.instanceOf(JsonResult); } @@ -829,10 +830,20 @@ describe('DBSQLOperation', () => { driver.getResultSetMetadata.resetHistory(); const operation = new DBSQLOperation(driver, handle, logger); - const resultHandler = await operation._schema.getResultHandler(); + const resultHandler = await operation.getResultHandler(); expect(driver.getResultSetMetadata.called).to.be.true; expect(resultHandler).to.be.instanceOf(ArrowResult); } + + cloudFetchHandler: { + driver.getResultSetMetadataResp.resultFormat = TSparkRowSetType.URL_BASED_SET; + driver.getResultSetMetadata.resetHistory(); + + const operation = new DBSQLOperation(driver, handle, logger); + const resultHandler = await operation.getResultHandler(); + expect(driver.getResultSetMetadata.called).to.be.true; + expect(resultHandler).to.be.instanceOf(CloudFetchResult); + } }); }); diff --git a/tests/unit/DBSQLSession.test.js b/tests/unit/DBSQLSession.test.js index 40fa97fd..8a439b87 100644 --- a/tests/unit/DBSQLSession.test.js +++ b/tests/unit/DBSQLSession.test.js @@ -436,7 +436,7 @@ describe('DBSQLSession', () => { const session = createSession(); const operation = await session.executeStatement('SELECT * FROM table'); expect(operation.onClose).to.be.not.undefined; - expect(operation._completeOperation.closed).to.be.false; + expect(operation.closed).to.be.false; expect(session.operations.items.size).to.eq(1); sinon.spy(session.operations, 'closeAll'); @@ -446,7 +446,7 @@ describe('DBSQLSession', () => { expect(operation.close.called).to.be.true; expect(session.operations.closeAll.called).to.be.true; expect(operation.onClose).to.be.undefined; - expect(operation._completeOperation.closed).to.be.true; + expect(operation.closed).to.be.true; expect(session.operations.items.size).to.eq(0); });