-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathArrowResultHandler.ts
72 lines (60 loc) · 2.16 KB
/
ArrowResultHandler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import { TGetResultSetMetadataResp, TRowSet } from '../../thrift/TCLIService_types';
import HiveDriverError from '../errors/HiveDriverError';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { ArrowBatch, hiveSchemaToArrowSchema } from './utils';
import { LZ4 } from '../utils';
/**
 * Results provider that converts row sets obtained from an upstream provider into `ArrowBatch`
 * values: the Arrow schema buffer followed by the individual batch buffers, plus the total
 * number of rows reported for those batches.
 *
 * Batches are passed through `LZ4.decode` when the result set metadata marks them as LZ4 compressed.
 */
export default class ArrowResultHandler implements IResultsProvider<ArrowBatch> {
  private readonly context: IClientContext;

  private readonly source: IResultsProvider<TRowSet | undefined>;

  private readonly arrowSchema?: Buffer;

  private readonly isLZ4Compressed: boolean;

  /**
   * @param context client context; stored on the instance
   * @param source upstream provider that yields row sets (or `undefined`)
   * @param metadata result set metadata; only `schema`, `arrowSchema` and `lz4Compressed` are read
   * @throws HiveDriverError if the result is LZ4 compressed but the `lz4` module is unavailable
   */
  constructor(
    context: IClientContext,
    source: IResultsProvider<TRowSet | undefined>,
    { schema, arrowSchema, lz4Compressed }: TGetResultSetMetadataResp,
  ) {
    // Old DBR versions do not provide an Arrow schema, and they do not support native Arrow types
    // either. For them the Arrow schema can therefore be derived from the Hive schema regardless
    // of the `useArrowNativeTypes` option
    const resolvedSchema = arrowSchema ?? hiveSchemaToArrowSchema(schema);
    const compressed = lz4Compressed ?? false;

    this.context = context;
    this.source = source;
    this.arrowSchema = resolvedSchema;
    this.isLZ4Compressed = compressed;

    // Fail early: without the optional `lz4` module compressed batches could never be decoded
    if (compressed && !LZ4) {
      throw new HiveDriverError('Cannot handle LZ4 compressed result: module `lz4` not installed');
    }
  }

  /**
   * Reports whether more data can be fetched.
   *
   * @returns `false` when no Arrow schema is available (the source is not consulted in that case),
   *   otherwise whatever the source reports
   */
  public async hasMore() {
    return this.arrowSchema ? this.source.hasMore() : false;
  }

  /**
   * Fetches the next row set from the source and repackages its Arrow batches.
   *
   * @param options forwarded unchanged to the source's `fetchNext`
   * @returns an object with `batches` (schema buffer first, then batch buffers) and `rowCount`;
   *   `batches` is empty and `rowCount` is 0 when there is no schema or no batch was received
   */
  public async fetchNext(options: ResultsProviderFetchNextOptions) {
    const schemaBuffer = this.arrowSchema;
    if (!schemaBuffer) {
      // Nothing can be produced without a schema, so the source is not queried at all
      return { batches: [], rowCount: 0 };
    }

    const rowSet = await this.source.fetchNext(options);

    const payloads: Array<Buffer> = [];
    let rowsSeen = 0;
    (rowSet?.arrowBatches ?? []).forEach((item) => {
      // Entries without a batch buffer contribute neither data nor rows
      if (!item.batch) {
        return;
      }

      let payload = item.batch;
      if (this.isLZ4Compressed) {
        // Constructor guarantees `LZ4` is available whenever compression is enabled
        payload = LZ4!.decode(payload);
      }
      payloads.push(payload);
      rowsSeen += item.rowCount.toNumber(true);
    });

    // `rowsSeen` only grows together with `payloads`, so it is still 0 when no batch was collected;
    // the schema is prepended only when there is at least one batch to accompany it
    return {
      batches: payloads.length > 0 ? [schemaBuffer, ...payloads] : [],
      rowCount: rowsSeen,
    };
  }
}