Skip to content

[PECO-1260] Support results compression #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

useCloudFetch: false,
cloudFetchConcurrentDownloads: 10,

useLZ4Compression: true,
};
}

Expand Down
4 changes: 2 additions & 2 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,14 @@ export default class DBSQLOperation implements IOperation {
case TSparkRowSetType.ARROW_BASED_SET:
resultSource = new ArrowResultConverter(
this.context,
new ArrowResultHandler(this.context, this._data, metadata.arrowSchema),
new ArrowResultHandler(this.context, this._data, metadata.arrowSchema, metadata.lz4Compressed),
metadata.schema,
);
break;
case TSparkRowSetType.URL_BASED_SET:
resultSource = new ArrowResultConverter(
this.context,
new CloudFetchResultHandler(this.context, this._data),
new CloudFetchResultHandler(this.context, this._data, metadata.lz4Compressed),
metadata.schema,
);
break;
Expand Down
1 change: 1 addition & 0 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export default class DBSQLSession implements IDBSQLSession {
...getArrowOptions(clientConfig),
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters),
canDecompressLZ4Result: clientConfig.useLZ4Compression,
});
const response = await this.handleResponse(operationPromise);
const operation = this.createOperation(response);
Expand Down
2 changes: 2 additions & 0 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export interface ClientConfig {

useCloudFetch: boolean;
cloudFetchConcurrentDownloads: number;

useLZ4Compression: boolean;
}

export default interface IClientContext {
Expand Down
18 changes: 13 additions & 5 deletions lib/result/ArrowResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Buffer } from 'buffer';
import LZ4 from 'lz4';
import { TRowSet } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
Expand All @@ -10,10 +10,18 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer

private readonly arrowSchema?: Buffer;

constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, arrowSchema?: Buffer) {
private readonly isLZ4Compressed: boolean;

constructor(
context: IClientContext,
source: IResultsProvider<TRowSet | undefined>,
arrowSchema?: Buffer,
isLZ4Compressed?: boolean,
) {
this.context = context;
this.source = source;
this.arrowSchema = arrowSchema;
this.isLZ4Compressed = isLZ4Compressed ?? false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's a difference between pysql and golang: pysql enables lz4 compression by default. golang does not. Is there a compelling reason to not enable this by default in nodejs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No good reason. I'll change it to be enabled by default

Copy link
Contributor Author

@kravets-levko kravets-levko Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I just realized that it already defaults to true - the default value is set in DBSQLClient.getDefaultConfig. This ?? false handles a case when server responds with metadata.lz4Compressed = undefined (which means that actual result is not compressed)

}

public async hasMore() {
Expand All @@ -31,9 +39,9 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer
const rowSet = await this.source.fetchNext(options);

const batches: Array<Buffer> = [];
rowSet?.arrowBatches?.forEach((arrowBatch) => {
if (arrowBatch.batch) {
batches.push(arrowBatch.batch);
rowSet?.arrowBatches?.forEach(({ batch }) => {
if (batch) {
batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch);
}
});

Expand Down
14 changes: 11 additions & 3 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Buffer } from 'buffer';
import LZ4 from 'lz4';
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
import { TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
Expand All @@ -9,13 +9,16 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B

private readonly source: IResultsProvider<TRowSet | undefined>;

private readonly isLZ4Compressed: boolean;

private pendingLinks: Array<TSparkArrowResultLink> = [];

private downloadTasks: Array<Promise<Buffer>> = [];

constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>) {
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, isLZ4Compressed?: boolean) {
this.context = context;
this.source = source;
this.isLZ4Compressed = isLZ4Compressed ?? false;
}

public async hasMore() {
Expand All @@ -42,7 +45,12 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
}

const batch = await this.downloadTasks.shift();
return batch ? [batch] : [];
const batches = batch ? [batch] : [];

if (this.isLZ4Compressed) {
return batches.map((buffer) => LZ4.decode(buffer));
}
return batches;
}

private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
Expand Down
Loading