Skip to content

DBSQLOperation Refactoring (1 of 3) #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 15, 2023
52 changes: 0 additions & 52 deletions lib/DBSQLOperation/CompleteOperationHelper.ts

This file was deleted.

70 changes: 0 additions & 70 deletions lib/DBSQLOperation/SchemaHelper.ts

This file was deleted.

110 changes: 87 additions & 23 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@ import {
TOperationHandle,
TTableSchema,
TSparkDirectResults,
TGetResultSetMetadataResp,
TSparkRowSetType,
TCloseOperationResp,
} from '../../thrift/TCLIService_types';
import Status from '../dto/Status';
import OperationStatusHelper from './OperationStatusHelper';
import SchemaHelper from './SchemaHelper';
import FetchResultsHelper from './FetchResultsHelper';
import CompleteOperationHelper from './CompleteOperationHelper';
import IDBSQLLogger, { LogLevel } from '../contracts/IDBSQLLogger';
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
import IOperationResult from '../result/IOperationResult';
import JsonResult from '../result/JsonResult';
import ArrowResult from '../result/ArrowResult';
import CloudFetchResult from '../result/CloudFetchResult';
import { definedOrError } from '../utils';
import HiveDriverError from '../errors/HiveDriverError';

const defaultMaxRows = 100000;

Expand All @@ -37,11 +44,17 @@ export default class DBSQLOperation implements IOperation {

private readonly _status: OperationStatusHelper;

private readonly _schema: SchemaHelper;

private readonly _data: FetchResultsHelper;

private readonly _completeOperation: CompleteOperationHelper;
private readonly closeOperation?: TCloseOperationResp;

private closed: boolean = false;

private cancelled: boolean = false;

private metadata?: TGetResultSetMetadataResp;

private resultHandler?: IOperationResult;

constructor(
driver: HiveDriver,
Expand All @@ -56,18 +69,14 @@ export default class DBSQLOperation implements IOperation {
const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation);

this._status = new OperationStatusHelper(this.driver, this.operationHandle, directResults?.operationStatus);
this._schema = new SchemaHelper(this.driver, this.operationHandle, directResults?.resultSetMetadata);
this.metadata = directResults?.resultSetMetadata;
this._data = new FetchResultsHelper(
this.driver,
this.operationHandle,
[directResults?.resultSet],
useOnlyPrefetchedResults,
);
this._completeOperation = new CompleteOperationHelper(
this.driver,
this.operationHandle,
directResults?.closeOperation,
);
this.closeOperation = directResults?.closeOperation;
this.logger.log(LogLevel.debug, `Operation created with id: ${this.getId()}`);
}

Expand Down Expand Up @@ -115,7 +124,7 @@ export default class DBSQLOperation implements IOperation {
await this.waitUntilReady(options);

const [resultHandler, data] = await Promise.all([
this._schema.getResultHandler(),
this.getResultHandler(),
this._data.fetch(options?.maxRows || defaultMaxRows),
]);

Expand Down Expand Up @@ -145,12 +154,18 @@ export default class DBSQLOperation implements IOperation {
* @throws {StatusError}
*/
public async cancel(): Promise<Status> {
if (this._completeOperation.closed || this._completeOperation.cancelled) {
if (this.closed || this.cancelled) {
return Status.success();
}

this.logger?.log(LogLevel.debug, `Cancelling operation with id: ${this.getId()}`);
const result = this._completeOperation.cancel();

const response = await this.driver.cancelOperation({
operationHandle: this.operationHandle,
});
Status.assert(response.status);
this.cancelled = true;
const result = new Status(response.status);

// Cancelled operation becomes unusable, similarly to being closed
this.onClose?.();
Expand All @@ -162,12 +177,20 @@ export default class DBSQLOperation implements IOperation {
* @throws {StatusError}
*/
public async close(): Promise<Status> {
if (this._completeOperation.closed || this._completeOperation.cancelled) {
if (this.closed || this.cancelled) {
return Status.success();
}

this.logger?.log(LogLevel.debug, `Closing operation with id: ${this.getId()}`);
const result = await this._completeOperation.close();

const response =
this.closeOperation ??
(await this.driver.closeOperation({
operationHandle: this.operationHandle,
}));
Status.assert(response.status);
this.closed = true;
const result = new Status(response.status);

this.onClose?.();
return result;
Expand All @@ -180,7 +203,7 @@ export default class DBSQLOperation implements IOperation {

public async hasMoreRows(): Promise<boolean> {
// If operation is closed or cancelled - we should not try to get data from it
if (this._completeOperation.closed || this._completeOperation.cancelled) {
if (this.closed || this.cancelled) {
return false;
}

Expand All @@ -190,7 +213,7 @@ export default class DBSQLOperation implements IOperation {
}

// If we fetched all the data from server - check if there's anything buffered in result handler
const resultHandler = await this._schema.getResultHandler();
const resultHandler = await this.getResultHandler();
return resultHandler.hasPendingData();
}

Expand All @@ -204,14 +227,15 @@ export default class DBSQLOperation implements IOperation {
await this.waitUntilReady(options);

this.logger?.log(LogLevel.debug, `Fetching schema for operation with id: ${this.getId()}`);
return this._schema.fetch();
const metadata = await this.fetchMetadata();
return metadata.schema ?? null;
}

private async failIfClosed(): Promise<void> {
if (this._completeOperation.closed) {
if (this.closed) {
throw new OperationStateError(OperationStateErrorCode.Closed);
}
if (this._completeOperation.cancelled) {
if (this.cancelled) {
throw new OperationStateError(OperationStateErrorCode.Canceled);
}
}
Expand All @@ -222,13 +246,53 @@ export default class DBSQLOperation implements IOperation {
} catch (error) {
if (error instanceof OperationStateError) {
if (error.errorCode === OperationStateErrorCode.Canceled) {
this._completeOperation.cancelled = true;
this.cancelled = true;
}
if (error.errorCode === OperationStateErrorCode.Closed) {
this._completeOperation.closed = true;
this.closed = true;
}
}
throw error;
}
}

/**
 * Returns the result set metadata for this operation.
 *
 * If metadata is already stored on the instance it is returned as is.
 * Otherwise it is requested from the driver, its status is passed through
 * `Status.assert`, and the response is stored for subsequent calls.
 */
private async fetchMetadata() {
  // Reuse whatever is already cached on the instance
  if (this.metadata) {
    return this.metadata;
  }

  const response = await this.driver.getResultSetMetadata({
    operationHandle: this.operationHandle,
  });
  // Store the response only after its status has been checked
  Status.assert(response.status);
  this.metadata = response;

  return this.metadata;
}

/**
 * Returns the result handler matching the result format reported in the
 * operation's metadata, creating it on first use and reusing it afterwards.
 *
 * Metadata is always resolved first (and `resultFormat` is required to be
 * defined via `definedOrError`), even when a handler has already been created.
 *
 * @returns the cached or newly created result handler
 * @throws {HiveDriverError} when the result format has no matching handler
 */
private async getResultHandler(): Promise<IOperationResult> {
  const resultSetMetadata = await this.fetchMetadata();
  const format = definedOrError(resultSetMetadata.resultFormat);

  // A handler created by a previous call is reused
  const existing = this.resultHandler;
  if (existing) {
    return existing;
  }

  let created: IOperationResult | undefined;
  if (format === TSparkRowSetType.COLUMN_BASED_SET) {
    created = new JsonResult(resultSetMetadata.schema);
  } else if (format === TSparkRowSetType.ARROW_BASED_SET) {
    created = new ArrowResult(resultSetMetadata.schema, resultSetMetadata.arrowSchema);
  } else if (format === TSparkRowSetType.URL_BASED_SET) {
    created = new CloudFetchResult(resultSetMetadata.schema);
  }

  // None of the known formats matched - nothing is cached in this case
  if (!created) {
    throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[format]}`);
  }

  this.resultHandler = created;
  return created;
}
}
8 changes: 4 additions & 4 deletions tests/e2e/arrow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ describe('Arrow support', () => {
const result = await operation.fetchAll();
expect(result).to.deep.equal(expectedColumn);

const resultHandler = await operation._schema.getResultHandler();
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.not.instanceof(ArrowResult);

await operation.close();
Expand All @@ -92,7 +92,7 @@ describe('Arrow support', () => {
const result = await operation.fetchAll();
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);

const resultHandler = await operation._schema.getResultHandler();
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);

await operation.close();
Expand All @@ -109,7 +109,7 @@ describe('Arrow support', () => {
const result = await operation.fetchAll();
expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);

const resultHandler = await operation._schema.getResultHandler();
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);

await operation.close();
Expand All @@ -129,7 +129,7 @@ describe('Arrow support', () => {
`);

// We use some internals here to check that server returned response with multiple batches
const resultHandler = await operation._schema.getResultHandler();
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);

const rawData = await operation._data.fetch(rowsCount);
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/cloudfetch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('CloudFetch', () => {
await operation.finished();

// Check if we're actually getting data via CloudFetch
const resultHandler = await operation._schema.getResultHandler();
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceOf(CloudFetchResult);

// Fetch first chunk and check if result handler behaves properly.
Expand Down
Loading