Skip to content

Commit 2790234

Browse files
committed
feat: datasource athena
1 parent d61487d commit 2790234

File tree

10 files changed

+876
-12
lines changed

10 files changed

+876
-12
lines changed

server/node-service/.yarnrc.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
nodeLinker: node-modules
2-
2+
npmRegistryServer: "https://registry.npmmirror.com"
33
yarnPath: .yarn/releases/yarn-3.3.1.cjs

server/node-service/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
},
2323
"dependencies": {
2424
"@apidevtools/swagger-parser": "^10.1.0",
25+
"@aws-sdk/client-athena": "^3.271.0",
2526
"@aws-sdk/client-dynamodb": "^3.266.1",
2627
"@aws-sdk/client-s3": "^3.238.0",
2728
"@aws-sdk/s3-request-presigner": "^3.241.0",
@@ -43,7 +44,7 @@
4344
"morgan": "^1.10.0",
4445
"openapi-types": "^12.1.0",
4546
"openblocks-core": "^0.0.5",
46-
"openblocks-sdk": "^0.0.34",
47+
"openblocks-sdk": "^0.0.35",
4748
"stylis": "^4.1.3",
4849
"swagger-client": "^3.18.5",
4950
"typescript": "^4.9.3",
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { ConfigToType } from "openblocks-sdk/dataSource";
2+
3+
const dataSourceConfig = {
4+
type: "dataSource",
5+
params: [
6+
{
7+
key: "region",
8+
type: "textInput",
9+
label: "Region",
10+
rules: [{ required: true, message: "Please input the AWS Region" }],
11+
defaultValue: "us-west-1",
12+
},
13+
{
14+
key: "accessKey",
15+
label: "Access key ID",
16+
type: "textInput",
17+
placeholder: "<Your Access key ID>",
18+
rules: [{ required: true, message: "Please input the Access Key ID" }],
19+
},
20+
{
21+
key: "secretKey",
22+
label: "Secret key",
23+
type: "password",
24+
rules: [{ required: true, message: "Please input the Secret Key" }],
25+
},
26+
{
27+
key: "s3Location",
28+
type: "textInput",
29+
label: "Results Location",
30+
placeholder: "s3://bucket/prefix/object",
31+
tooltip: "A AWS S3 folder to save query results.",
32+
},
33+
],
34+
} as const;
35+
36+
export default dataSourceConfig;
37+
38+
export type DataSourceDataType = ConfigToType<typeof dataSourceConfig>;
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { DataSourcePlugin } from "openblocks-sdk/dataSource";
2+
import dataSourceConfig, { DataSourceDataType } from "./dataSourceConfig";
3+
import queryConfig, { ActionDataType } from "./queryConfig";
4+
import {
5+
GetQueryExecutionCommandOutput,
6+
QueryExecutionState,
7+
ResultSet,
8+
AthenaClient,
9+
GetQueryExecutionCommand,
10+
GetQueryResultsCommand,
11+
StartQueryExecutionCommand,
12+
} from "@aws-sdk/client-athena";
13+
import _ from "lodash";
14+
import { ServiceError } from "../../common/error";
15+
16+
function parseResultSet(resultSet?: ResultSet) {
17+
if (!resultSet) {
18+
return [];
19+
}
20+
const rows: any[] = [];
21+
const [header, ...data] = resultSet.Rows || [];
22+
const columns = header.Data?.map((i) => i.VarCharValue) || [];
23+
data?.forEach((row) => {
24+
const entries: [string, string][] = [];
25+
row.Data?.forEach((data, idx) => {
26+
const column = columns[idx];
27+
if (!column) {
28+
entries.push([`_col_${idx}`, data.VarCharValue || ""]);
29+
} else {
30+
entries.push([column, data.VarCharValue || ""]);
31+
}
32+
});
33+
rows.push(Object.fromEntries(entries));
34+
});
35+
return rows;
36+
}
37+
38+
// fixme: should use the timeout config filled in frontend
39+
const timeout = 1000 * 60; // 60s
40+
41+
const athenaPlugin: DataSourcePlugin<ActionDataType, DataSourceDataType> = {
42+
id: "athena",
43+
name: "Athena",
44+
category: "api",
45+
icon: "athena.svg",
46+
dataSourceConfig,
47+
queryConfig,
48+
run: async function (actionData, dataSourceConfig): Promise<any> {
49+
const { accessKey, secretKey, region } = dataSourceConfig;
50+
const client = new AthenaClient({
51+
credentials: {
52+
accessKeyId: accessKey,
53+
secretAccessKey: secretKey,
54+
},
55+
region,
56+
});
57+
if (actionData.actionName === "Query") {
58+
const startRet = await client.send(
59+
new StartQueryExecutionCommand({
60+
QueryString: actionData.queryString,
61+
ResultConfiguration: {
62+
OutputLocation: dataSourceConfig.s3Location,
63+
},
64+
})
65+
);
66+
67+
const start = Date.now();
68+
69+
let execution: GetQueryExecutionCommandOutput | undefined = undefined;
70+
waitLoop: while (Date.now() - start < timeout) {
71+
execution = await client.send(
72+
new GetQueryExecutionCommand({ QueryExecutionId: startRet.QueryExecutionId })
73+
);
74+
const { State, StateChangeReason } = execution.QueryExecution?.Status || {};
75+
switch (State) {
76+
case QueryExecutionState.RUNNING:
77+
case QueryExecutionState.QUEUED:
78+
await new Promise((r) => setTimeout(r, 2000));
79+
break;
80+
case QueryExecutionState.CANCELLED:
81+
case QueryExecutionState.FAILED:
82+
throw new ServiceError(`query execution state: ${State}, reason: ${StateChangeReason}`);
83+
case QueryExecutionState.SUCCEEDED:
84+
break waitLoop;
85+
}
86+
}
87+
88+
const result = await client.send(
89+
new GetQueryResultsCommand({ QueryExecutionId: startRet.QueryExecutionId })
90+
);
91+
92+
return parseResultSet(result.ResultSet);
93+
}
94+
},
95+
};
96+
97+
export default athenaPlugin;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { ConfigToType } from "openblocks-sdk/dataSource";
2+
3+
const queryConfig = {
4+
type: "query",
5+
label: "Action",
6+
actions: [
7+
{
8+
actionName: "Query",
9+
label: "Query",
10+
params: [
11+
{
12+
label: "Query String",
13+
key: "queryString",
14+
type: "sqlInput",
15+
},
16+
],
17+
},
18+
],
19+
} as const;
20+
21+
export type ActionDataType = ConfigToType<typeof queryConfig>;
22+
23+
export default queryConfig;

server/node-service/src/plugins/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import openApiPlugin from "./openApi";
66
import dynamoDBPlugin from "./dynamodb";
77
import firebasePlugin from "./firebase";
88
import couchdbPlugin from "./couchdb";
9-
import woocommercePlugin from "./woocommerce";
9+
import wooCommercePlugin from "./woocommerce";
1010
import openAiPlugin from "./openAi";
11+
import athenaPlugin from "./athena";
1112

1213
const plugins: (DataSourcePlugin | DataSourcePluginFactory)[] = [
1314
// helloWorldPlugin,
@@ -17,8 +18,9 @@ const plugins: (DataSourcePlugin | DataSourcePluginFactory)[] = [
1718
dynamoDBPlugin,
1819
firebasePlugin,
1920
couchdbPlugin,
20-
woocommercePlugin,
21+
wooCommercePlugin,
2122
openAiPlugin,
23+
athenaPlugin,
2224
];
2325

2426
export default plugins;

server/node-service/src/plugins/woocommerce/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ const parseOptions: ParseOpenApiOptions = {
5959
},
6060
};
6161

62-
const woocommercePlugin: DataSourcePlugin<any, DataSourceConfigType> = {
62+
const wooCommercePlugin: DataSourcePlugin<any, DataSourceConfigType> = {
6363
id: "woocommerce",
6464
name: "WooCommerce",
6565
icon: "woocommerce.png",
@@ -88,4 +88,4 @@ const woocommercePlugin: DataSourcePlugin<any, DataSourceConfigType> = {
8888
},
8989
};
9090

91-
export default woocommercePlugin;
91+
export default wooCommercePlugin;

server/node-service/src/services/plugin.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,12 @@ export async function evalToValue<T extends Config>(
7474
};
7575
}
7676

77-
if (config.type === "textInput" || config.type === "select" || config.type === "password") {
77+
if (
78+
config.type === "textInput" ||
79+
config.type === "select" ||
80+
config.type === "password" ||
81+
config.type === "sqlInput"
82+
) {
7883
return toString(evalCodeToValue(dsl, context));
7984
}
8085

Lines changed: 19 additions & 0 deletions
Loading

0 commit comments

Comments
 (0)