Skip to content

Commit 8f0976a

Browse files
committed
Java:MultiDataSource 为 APIJSON 和 SQLAuto 新增支持时序数据库 InfluxDB
1 parent 65f27e5 commit 8f0976a

File tree

5 files changed

+198
-56
lines changed

5 files changed

+198
-56
lines changed

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@
111111
<artifactId>nebula-jdbc</artifactId>
112112
<version>3.0.0</version>
113113
</dependency>
114+
<dependency>
115+
<groupId>org.influxdb</groupId>
116+
<artifactId>influxdb-java</artifactId>
117+
<version>2.23</version>
118+
</dependency>
114119
<!-- Oracle, SQLServer 等其它数据库的 JDBC 驱动,可以在这里加上 Maven 依赖或 libs 目录放 Jar 包并依赖 -->
115120
<!-- 数据库 JDBC 驱动 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -->
116121

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/boot/DemoController.java

Lines changed: 102 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,99 +1371,147 @@ public String execute(@RequestBody String request, HttpSession session) {
13711371
long startTime = System.currentTimeMillis();
13721372

13731373
JSONObject req = JSON.parseObject(request);
1374+
String sql = req == null ? null : req.getString("sql");
1375+
if (StringUtil.isEmpty(sql)) {
1376+
throw new IllegalArgumentException("SQL 不能为空!");
1377+
}
1378+
13741379
String database = req.getString("database");
1380+
String schema = req.getString("schema");
13751381
String uri = req.getString("uri");
13761382
String account = req.getString("account");
13771383
String password = req.getString("password");
1378-
String sql = req.getString("sql");
13791384
JSONArray arg = req.getJSONArray("args");
13801385
try {
1381-
String trimmedSQL = sql.trim();
1386+
if (StringUtil.isEmpty(database, true)) {
1387+
1388+
int start = uri.indexOf("://");
1389+
String prefix = uri.substring(0, start);
1390+
int mid = prefix.lastIndexOf(":");
1391+
if (mid >= 0) {
1392+
prefix = prefix.substring(mid + 1);
1393+
}
1394+
1395+
if (DemoSQLExecutor.DATABASE_INFLUXDB.equalsIgnoreCase(prefix)) {
1396+
database = prefix.toUpperCase();
1397+
1398+
int end = uri.lastIndexOf("/");
1399+
if (end >= 0) {
1400+
if (StringUtil.isEmpty(schema, true)) {
1401+
schema = uri.substring(end + 1);
1402+
}
1403+
uri = uri.substring(0, end);
1404+
}
1405+
1406+
uri = "http" + uri.substring(start);
1407+
1408+
account = "root";
1409+
password = "apijson@123";
1410+
} else if (DemoSQLExecutor.DATABASE_NEBULA.equalsIgnoreCase(prefix)) {
1411+
database = prefix.toUpperCase();
1412+
}
1413+
}
1414+
1415+
1416+
String trimmedSQL = sql == null ? null : sql.trim();
13821417

13831418
List<Object> valueList = arg;
13841419

13851420
DemoSQLExecutor executor = new DemoSQLExecutor();
13861421
DemoSQLConfig config = new DemoSQLConfig();
13871422

1388-
config.setDatabase(database); // "NEBULA"); //
1423+
config.setDatabase(database);
1424+
config.setSchema(schema);
13891425
config.setDBUri(uri);
13901426
config.setDBAccount(account);
13911427
config.setDBPassword(password);
13921428
config.setPrepared(true);
13931429
config.setPreparedValueList(valueList);
1430+
config.setSql(sql);
13941431

1395-
String sqlPrefix = trimmedSQL.length() >= 7 ? trimmedSQL.substring(0, 7).toUpperCase() : "";
1432+
String sqlPrefix = trimmedSQL == null || trimmedSQL.length() < 7 ? "" : trimmedSQL.substring(0, 7).toUpperCase();
13961433
boolean isWrite = sqlPrefix.startsWith("INSERT ") || sqlPrefix.startsWith("UPDATE ") || sqlPrefix.startsWith("DELETE ");
13971434

1398-
long executeStartTime = System.currentTimeMillis();
1435+
JSONArray arr = null;
13991436

1437+
long updateCount = 0;
1438+
long executeDuration = 0;
1439+
long cursorDuration = 0;
1440+
long rsDuration = 0;
1441+
long executeStartTime = System.currentTimeMillis();
14001442

1401-
Statement statement = executor.getStatement(config, trimmedSQL);
1402-
if (statement instanceof PreparedStatement) {
1403-
if (EXECUTE_STRICTLY) {
1404-
if (isWrite) {
1405-
((PreparedStatement) statement).executeUpdate();
1443+
if (DemoSQLExecutor.DATABASE_INFLUXDB.equals(database)) {
1444+
JSONObject result = executor.execute(config, false);
1445+
if (isWrite) {
1446+
updateCount = result == null ? 0 : result.getIntValue(JSONResponse.KEY_COUNT);
1447+
} else {
1448+
arr = result == null ? null : result.getJSONArray(DemoSQLExecutor.KEY_RAW_LIST);
1449+
}
1450+
} else {
1451+
Statement statement = executor.getStatement(config, trimmedSQL);
1452+
if (statement instanceof PreparedStatement) {
1453+
if (EXECUTE_STRICTLY) {
1454+
if (isWrite) {
1455+
((PreparedStatement) statement).executeUpdate();
1456+
} else {
1457+
((PreparedStatement) statement).executeQuery();
1458+
}
14061459
} else {
1407-
((PreparedStatement) statement).executeQuery();
1460+
((PreparedStatement) statement).execute();
1461+
}
1462+
} else {
1463+
if (arg != null && !arg.isEmpty()) {
1464+
throw new UnsupportedOperationException("非预编译模式不允许传参 arg !");
14081465
}
1409-
}
1410-
else {
1411-
((PreparedStatement) statement).execute();
1412-
}
1413-
} else {
1414-
if (arg != null && ! arg.isEmpty()) {
1415-
throw new UnsupportedOperationException("非预编译模式不允许传参 arg !");
1416-
}
14171466

1418-
if (EXECUTE_STRICTLY) {
1419-
if (isWrite) {
1420-
statement.executeUpdate(sql);
1467+
if (EXECUTE_STRICTLY) {
1468+
if (isWrite) {
1469+
statement.executeUpdate(sql);
1470+
} else {
1471+
statement.executeQuery(sql);
1472+
}
14211473
} else {
1422-
statement.executeQuery(sql);
1474+
statement.execute(sql);
14231475
}
14241476
}
1425-
else {
1426-
statement.execute(sql);
1427-
}
1428-
}
14291477

1430-
long executeDuration = System.currentTimeMillis() - executeStartTime;
1478+
executeDuration = System.currentTimeMillis() - executeStartTime;
14311479

1432-
ResultSet rs = statement.getResultSet();
1433-
ResultSetMetaData rsmd = rs == null ? null : rs.getMetaData();
1434-
int length = rsmd == null ? 0 : rsmd.getColumnCount();
1480+
arr = new JSONArray();
1481+
ResultSet rs = statement.getResultSet();
1482+
ResultSetMetaData rsmd = rs == null ? null : rs.getMetaData();
1483+
int length = rsmd == null ? 0 : rsmd.getColumnCount();
14351484

1436-
JSONArray arr = new JSONArray();
14371485

1438-
long cursorDuration = 0;
1439-
long rsDuration = 0;
1486+
long cursorStartTime = System.currentTimeMillis();
1487+
while (rs != null && rs.next()) {
1488+
cursorDuration += System.currentTimeMillis() - cursorStartTime;
14401489

1441-
long cursorStartTime = System.currentTimeMillis();
1442-
while (rs != null && rs.next()) {
1443-
cursorDuration += System.currentTimeMillis() - cursorStartTime;
1490+
JSONObject obj = new JSONObject(true);
1491+
for (int i = 1; i <= length; i++) {
1492+
long sqlStartTime = System.currentTimeMillis();
1493+
String name = rsmd.getColumnName(i); // rsmd.getColumnLable(i);
1494+
Object value = rs.getObject(i);
1495+
rsDuration += System.currentTimeMillis() - sqlStartTime;
14441496

1445-
JSONObject obj = new JSONObject(true);
1446-
for (int i = 1; i <= length; i++) {
1447-
long sqlStartTime = System.currentTimeMillis();
1448-
String name = rsmd.getColumnName(i); // rsmd.getColumnLable(i);
1449-
Object value = rs.getObject(i);
1450-
rsDuration += System.currentTimeMillis() - sqlStartTime;
1497+
obj.put(name, value);
1498+
}
14511499

1452-
obj.put(name, value);
1500+
arr.add(obj);
14531501
}
14541502

1455-
arr.add(obj);
1503+
// try {
1504+
updateCount = statement.getUpdateCount();
1505+
// } catch (Throwable e) {
1506+
// e.printStackTrace();
1507+
// }
14561508
}
14571509

14581510
JSONObject result = DemoParser.newSuccessResult();
14591511
result.put("sql", sql);
14601512
result.put("args", arg);
14611513
if (isWrite) {
1462-
// try {
1463-
result.put("count", statement.getUpdateCount());
1464-
// } catch (Throwable e) {
1465-
// e.printStackTrace();
1466-
// }
1514+
result.put("count", updateCount);
14671515
}
14681516
result.put("list", arr);
14691517

@@ -1475,7 +1523,10 @@ public String execute(@RequestBody String request, HttpSession session) {
14751523

14761524
result.put("time:start|duration|end|parse|sql", startTime + "|" + duration + "|" + endTime + "|" + parseDuration + "|" + sqlDuration);
14771525

1478-
// return result.toJSONString();
1526+
if (DemoSQLExecutor.DATABASE_NEBULA.equalsIgnoreCase(database) == false) {
1527+
return result.toJSONString();
1528+
}
1529+
14791530
return com.alibaba.fastjson.JSON.toJSONString(result, new ValueFilter() {
14801531
@Override
14811532
public Object process(Object o, String key, Object val) {

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoFunctionParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package apijson.demo;
1616

17+
import java.io.IOException;
1718
import java.util.ArrayList;
1819
import java.util.Arrays;
1920
import java.util.Collection;
@@ -246,5 +247,11 @@ public Object verifyAccess(@NotNull JSONObject curObj) throws Exception {
246247
return null;
247248
}
248249

249-
250+
// apijson-framework 5.4.0 以下取消注释,兼容 Function 表中 name = getMethodDefinition 的记录(或者删除这条记录,如果使用 UnitAuto,则版本要在 2.7.2 以下)
251+
// public String getMethodDefinition(JSONObject request) throws IllegalArgumentException, ClassNotFoundException, IOException {
252+
// return super.getMethodDefination(request);
253+
// }
254+
// public String getMethodDefinition(JSONObject request, String method, String arguments, String type, String exceptions, String language) throws IllegalArgumentException, ClassNotFoundException, IOException {
255+
// return super.getMethodDefination(request, method, arguments, type, exceptions, language);
256+
// }
250257
}

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,22 @@ public String getDBPassword() {
294294
return "";
295295
}
296296

297+
private String sql;
298+
public String getSQL() throws Exception {
299+
return getSQL(isPrepared());
300+
}
301+
@Override
302+
public String getSQL(boolean prepared) throws Exception {
303+
if (StringUtil.isNotEmpty(sql)) {
304+
return sql;
305+
}
306+
return super.getSQL(prepared);
307+
}
308+
309+
public void setSql(String sql) {
310+
this.sql = sql;
311+
}
312+
297313
// 取消注释后,默认的 APIJSON 配置表会由业务表所在 数据库类型 database 和 数据库模式 schema 改为自定义的
298314
// @Override
299315
// public String getConfigDatabase() {

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLExecutor.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,31 @@
1414

1515
package apijson.demo;
1616

17-
import apijson.JSON;
18-
import apijson.RequestMethod;
17+
import apijson.*;
1918
import com.alibaba.druid.pool.DruidDataSource;
2019
import com.alibaba.fastjson.JSONObject;
2120
import com.vesoft.nebula.jdbc.impl.NebulaDriver;
2221
import com.zaxxer.hikari.HikariDataSource;
2322

2423
import java.io.Serializable;
2524
import java.sql.Connection;
25+
import java.sql.SQLException;
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Properties;
2929
import java.util.concurrent.TimeUnit;
3030

3131
import javax.sql.DataSource;
3232

33-
import apijson.Log;
3433
import apijson.boot.DemoApplication;
3534
import apijson.framework.APIJSONSQLExecutor;
3635
import apijson.orm.SQLConfig;
36+
import org.influxdb.BatchOptions;
37+
import org.influxdb.InfluxDB;
38+
import org.influxdb.InfluxDBFactory;
39+
import org.influxdb.dto.Point;
40+
import org.influxdb.dto.Query;
41+
import org.influxdb.dto.QueryResult;
3742
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
3843
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
3944
import org.springframework.data.redis.core.RedisTemplate;
@@ -117,11 +122,13 @@ public synchronized void removeCache(String sql, SQLConfig config) {
117122

118123
// Redis 缓存 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
119124

125+
public static final String DATABASE_NEBULA = "NEBULA";
126+
public static final String DATABASE_INFLUXDB = "INFLUXDB";
120127

121128
// 适配连接池,如果这里能拿到连接池的有效 Connection,则 SQLConfig 不需要配置 dbVersion, dbUri, dbAccount, dbPassword
122129
@Override
123130
public Connection getConnection(SQLConfig config) throws Exception {
124-
if ("NEBULA".equals(config.getDatabase())) { // 3.0.0 及以下要这样连接
131+
if (DATABASE_NEBULA.equals(config.getDatabase())) { // 3.0.0 及以下要这样连接
125132
String uri = config.getDBUri();
126133

127134
int start = uri.indexOf("://");
@@ -186,6 +193,62 @@ public Connection getConnection(SQLConfig config) throws Exception {
186193
return super.getConnection(config);
187194
}
188195

196+
197+
@Override
198+
public JSONObject execute(SQLConfig config, boolean unknownType) throws Exception {
199+
if (DATABASE_INFLUXDB.equals(config.getDatabase())) {
200+
InfluxDB influxDB = InfluxDBFactory.connect(config.getDBUri(), config.getDBAccount(), config.getDBPassword());
201+
202+
influxDB.setDatabase(config.getSchema());
203+
204+
String sql = config.getSQL(config.isPrepared());
205+
String trimmedSQL = sql == null ? null : sql.trim();
206+
String sqlPrefix = trimmedSQL == null || trimmedSQL.length() < 7 ? "" : trimmedSQL.substring(0, 7).toUpperCase();
207+
boolean isWrite = sqlPrefix.startsWith("INSERT ") || sqlPrefix.startsWith("UPDATE ") || sqlPrefix.startsWith("DELETE ");
208+
209+
if (isWrite) {
210+
influxDB.enableBatch(
211+
BatchOptions.DEFAULTS
212+
.threadFactory(runnable -> {
213+
Thread thread = new Thread(runnable);
214+
thread.setDaemon(true);
215+
return thread;
216+
})
217+
);
218+
219+
Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));
220+
221+
influxDB.write(sql);
222+
223+
JSONObject result = new JSONObject(true);
224+
result.put(JSONResponse.KEY_COUNT, 1); // FIXME
225+
return result;
226+
}
227+
228+
QueryResult qr = influxDB.query(new Query(sql));
229+
230+
String err = qr == null ? null : qr.getError();
231+
if (StringUtil.isNotEmpty(qr, true)) {
232+
throw new SQLException(err);
233+
}
234+
235+
List<QueryResult.Result> list = qr == null ? null : qr.getResults();
236+
if (list == null || list.isEmpty()) {
237+
return new JSONObject(true);
238+
}
239+
240+
JSONObject result = JSON.parseObject(list.get(0));
241+
if (list.size() > 0) {
242+
result.put(KEY_RAW_LIST, list);
243+
}
244+
245+
return result;
246+
}
247+
248+
249+
return super.execute(config, unknownType);
250+
}
251+
189252
// 取消注释支持 !key 反选字段 和 字段名映射,需要先依赖插件 https://github.com/APIJSON/apijson-column
190253
// @Override
191254
// protected String getKey(SQLConfig config, ResultSet rs, ResultSetMetaData rsmd, int tablePosition, JSONObject table,

0 commit comments

Comments
 (0)