@@ -65,9 +65,27 @@ public final class StatementExecuteBackendHandler implements BackendHandler {
65
65
66
66
private final PreparedStatementRoutingEngine routingEngine ;
67
67
68
+ private List <Connection > connections ;
69
+
70
+ private List <ResultSet > resultSets ;
71
+
72
+ private MergedResult mergedResult ;
73
+
74
+ private int currentSequenceId ;
75
+
76
+ private int columnCount ;
77
+
78
+ private final List <ColumnType > columnTypes ;
79
+
80
+ private boolean hasMoreResultValueFlag ;
81
+
68
82
public StatementExecuteBackendHandler (final List <PreparedStatementParameter > preparedStatementParameters , final int statementId , final DatabaseType databaseType , final boolean showSQL ) {
69
83
this .preparedStatementParameters = preparedStatementParameters ;
70
84
routingEngine = new PreparedStatementRoutingEngine (PreparedStatementRegistry .getInstance ().getSQL (statementId ), ShardingRuleRegistry .getInstance ().getShardingRule (), databaseType , showSQL );
85
+ connections = new ArrayList <>(1024 );
86
+ resultSets = new ArrayList <>(1024 );
87
+ columnTypes = new ArrayList <>(32 );
88
+ hasMoreResultValueFlag = true ;
71
89
}
72
90
73
91
@ Override
@@ -77,24 +95,23 @@ public CommandResponsePackets execute() {
77
95
if (routeResult .getExecutionUnits ().isEmpty ()) {
78
96
return new CommandResponsePackets (new OKPacket (1 , 0 , 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue (), 0 , "" ));
79
97
}
80
- List <ColumnType > columnTypes = new ArrayList <>(32 );
81
98
List <CommandResponsePackets > result = new LinkedList <>();
82
99
for (SQLExecutionUnit each : routeResult .getExecutionUnits ()) {
83
100
// TODO multiple threads
84
- result .add (execute (routeResult .getSqlStatement (), each , columnTypes ));
101
+ result .add (execute (routeResult .getSqlStatement (), each ));
85
102
}
86
- return merge (routeResult .getSqlStatement (), result , columnTypes );
103
+ return merge (routeResult .getSqlStatement (), result );
87
104
}
88
105
89
- private CommandResponsePackets execute (final SQLStatement sqlStatement , final SQLExecutionUnit sqlExecutionUnit , final List < ColumnType > columnTypes ) {
106
+ private CommandResponsePackets execute (final SQLStatement sqlStatement , final SQLExecutionUnit sqlExecutionUnit ) {
90
107
switch (sqlStatement .getType ()) {
91
108
case DQL :
92
- return executeQuery (ShardingRuleRegistry .getInstance ().getDataSourceMap ().get (sqlExecutionUnit .getDataSource ()), sqlExecutionUnit .getSql (), columnTypes );
109
+ return executeQuery (ShardingRuleRegistry .getInstance ().getDataSourceMap ().get (sqlExecutionUnit .getDataSource ()), sqlExecutionUnit .getSql ());
93
110
case DML :
94
111
case DDL :
95
112
return executeUpdate (ShardingRuleRegistry .getInstance ().getDataSourceMap ().get (sqlExecutionUnit .getDataSource ()), sqlExecutionUnit .getSql (), sqlStatement );
96
113
default :
97
- return executeCommon (ShardingRuleRegistry .getInstance ().getDataSourceMap ().get (sqlExecutionUnit .getDataSource ()), sqlExecutionUnit .getSql (), columnTypes );
114
+ return executeCommon (ShardingRuleRegistry .getInstance ().getDataSourceMap ().get (sqlExecutionUnit .getDataSource ()), sqlExecutionUnit .getSql ());
98
115
}
99
116
}
100
117
@@ -112,13 +129,16 @@ private void setJDBCPreparedStatementParameters(final PreparedStatement prepared
112
129
}
113
130
}
114
131
115
- private CommandResponsePackets executeQuery (final DataSource dataSource , final String sql , final List <ColumnType > columnTypes ) {
116
- try (
117
- Connection connection = dataSource .getConnection ();
118
- PreparedStatement preparedStatement = connection .prepareStatement (sql )) {
132
+ private CommandResponsePackets executeQuery (final DataSource dataSource , final String sql ) {
133
+ PreparedStatement preparedStatement ;
134
+ try {
135
+ Connection connection = dataSource .getConnection ();
136
+ connections .add (connection );
137
+ preparedStatement = connection .prepareStatement (sql );
138
+ preparedStatement .setFetchSize (Integer .MIN_VALUE );
119
139
setJDBCPreparedStatementParameters (preparedStatement );
120
- ResultSet resultSet = preparedStatement .executeQuery ();
121
- return getDatabaseProtocolPackets (resultSet , columnTypes );
140
+ resultSets . add ( preparedStatement .executeQuery () );
141
+ return getDatabaseProtocolPackets ();
122
142
} catch (final SQLException ex ) {
123
143
return new CommandResponsePackets (new ErrPacket (1 , ex .getErrorCode (), "" , ex .getSQLState (), ex .getMessage ()));
124
144
}
@@ -144,24 +164,24 @@ private CommandResponsePackets executeUpdate(final DataSource dataSource, final
144
164
} catch (final SQLException ex ) {
145
165
return new CommandResponsePackets (new ErrPacket (1 , ex .getErrorCode (), "" , ex .getSQLState (), ex .getMessage ()));
146
166
} finally {
147
- if (preparedStatement != null ) {
167
+ if (null != preparedStatement ) {
148
168
try {
149
169
preparedStatement .close ();
150
170
} catch (final SQLException ignore ) {
151
171
}
152
172
}
153
173
}
154
-
155
174
}
156
175
157
- private CommandResponsePackets executeCommon (final DataSource dataSource , final String sql , final List < ColumnType > columnTypes ) {
176
+ private CommandResponsePackets executeCommon (final DataSource dataSource , final String sql ) {
158
177
try (
159
178
Connection connection = dataSource .getConnection ();
160
179
PreparedStatement preparedStatement = connection .prepareStatement (sql )) {
161
180
setJDBCPreparedStatementParameters (preparedStatement );
162
181
boolean hasResultSet = preparedStatement .execute ();
163
182
if (hasResultSet ) {
164
- return getDatabaseProtocolPackets (preparedStatement .getResultSet (), columnTypes );
183
+ resultSets .add (preparedStatement .getResultSet ());
184
+ return getDatabaseProtocolPackets ();
165
185
} else {
166
186
return new CommandResponsePackets (new OKPacket (1 , preparedStatement .getUpdateCount (), 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue (), 0 , "" ));
167
187
}
@@ -170,11 +190,11 @@ private CommandResponsePackets executeCommon(final DataSource dataSource, final
170
190
}
171
191
}
172
192
173
- private CommandResponsePackets getDatabaseProtocolPackets (final ResultSet resultSet , final List < ColumnType > columnTypes ) throws SQLException {
193
+ private CommandResponsePackets getDatabaseProtocolPackets () throws SQLException {
174
194
CommandResponsePackets result = new CommandResponsePackets ();
175
195
int currentSequenceId = 0 ;
176
- ResultSetMetaData resultSetMetaData = resultSet .getMetaData ();
177
- int columnCount = resultSetMetaData .getColumnCount ();
196
+ ResultSetMetaData resultSetMetaData = resultSets . get ( resultSets . size () - 1 ) .getMetaData ();
197
+ columnCount = resultSetMetaData .getColumnCount ();
178
198
if (0 == columnCount ) {
179
199
result .addPacket (new OKPacket (++currentSequenceId , 0 , 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue (), 0 , "" ));
180
200
return result ;
@@ -188,14 +208,6 @@ private CommandResponsePackets getDatabaseProtocolPackets(final ResultSet result
188
208
columnTypes .add (columnType );
189
209
}
190
210
result .addPacket (new EofPacket (++currentSequenceId , 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue ()));
191
- while (resultSet .next ()) {
192
- List <Object > data = new ArrayList <>(columnCount );
193
- for (int i = 1 ; i <= columnCount ; i ++) {
194
- data .add (resultSet .getObject (i ));
195
- }
196
- result .addPacket (new BinaryResultSetRowPacket (++currentSequenceId , columnCount , data , columnTypes ));
197
- }
198
- result .addPacket (new EofPacket (++currentSequenceId , 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue ()));
199
211
return result ;
200
212
}
201
213
@@ -208,7 +220,7 @@ private long getGeneratedKey(final PreparedStatement preparedStatement) throws S
208
220
return result ;
209
221
}
210
222
211
- private CommandResponsePackets merge (final SQLStatement sqlStatement , final List <CommandResponsePackets > packets , final List < ColumnType > columnTypes ) {
223
+ private CommandResponsePackets merge (final SQLStatement sqlStatement , final List <CommandResponsePackets > packets ) {
212
224
if (1 == packets .size ()) {
213
225
return packets .iterator ().next ();
214
226
}
@@ -225,7 +237,7 @@ private CommandResponsePackets merge(final SQLStatement sqlStatement, final List
225
237
return mergeDML (headPackets );
226
238
}
227
239
if (SQLType .DQL == sqlStatement .getType () || SQLType .DAL == sqlStatement .getType ()) {
228
- return mergeDQLorDAL (sqlStatement , packets , columnTypes );
240
+ return mergeDQLorDAL (sqlStatement , packets );
229
241
}
230
242
return packets .get (0 );
231
243
}
@@ -241,44 +253,88 @@ private CommandResponsePackets mergeDML(final CommandResponsePackets firstPacket
241
253
return new CommandResponsePackets (new OKPacket (1 , affectedRows , 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue (), 0 , "" ));
242
254
}
243
255
244
- private CommandResponsePackets mergeDQLorDAL (final SQLStatement sqlStatement , final List <CommandResponsePackets > packets , final List < ColumnType > columnTypes ) {
256
+ private CommandResponsePackets mergeDQLorDAL (final SQLStatement sqlStatement , final List <CommandResponsePackets > packets ) {
245
257
List <QueryResult > queryResults = new ArrayList <>(packets .size ());
246
- for (CommandResponsePackets each : packets ) {
258
+ for (int i = 0 ; i < packets . size (); i ++ ) {
247
259
// TODO replace to a common PacketQueryResult
248
- queryResults .add (new MySQLPacketStatementExecuteQueryResult (each ));
260
+ queryResults .add (new MySQLPacketStatementExecuteQueryResult (packets . get ( i ), resultSets . get ( i ), columnTypes ));
249
261
}
250
- MergedResult mergedResult ;
251
262
try {
252
263
mergedResult = MergeEngineFactory .newInstance (ShardingRuleRegistry .getInstance ().getShardingRule (), queryResults , sqlStatement ).merge ();
253
264
} catch (final SQLException ex ) {
254
265
return new CommandResponsePackets (new ErrPacket (1 , ex .getErrorCode (), "" , ex .getSQLState (), ex .getMessage ()));
255
266
}
256
- return buildPackets (packets , mergedResult , columnTypes );
267
+ return buildPackets (packets );
257
268
}
258
269
259
- private CommandResponsePackets buildPackets (final List <CommandResponsePackets > packets , final MergedResult mergedResult , final List < ColumnType > columnTypes ) {
270
+ private CommandResponsePackets buildPackets (final List <CommandResponsePackets > packets ) {
260
271
CommandResponsePackets result = new CommandResponsePackets ();
261
272
Iterator <DatabaseProtocolPacket > databaseProtocolPacketsSampling = packets .iterator ().next ().getDatabaseProtocolPackets ().iterator ();
262
273
FieldCountPacket fieldCountPacketSampling = (FieldCountPacket ) databaseProtocolPacketsSampling .next ();
263
274
result .addPacket (fieldCountPacketSampling );
264
- int columnCount = fieldCountPacketSampling . getColumnCount () ;
275
+ ++ currentSequenceId ;
265
276
for (int i = 0 ; i < columnCount ; i ++) {
266
277
result .addPacket (databaseProtocolPacketsSampling .next ());
278
+ ++currentSequenceId ;
267
279
}
268
280
result .addPacket (databaseProtocolPacketsSampling .next ());
269
- int currentSequenceId = result .size ();
281
+ ++currentSequenceId ;
282
+ return result ;
283
+ }
284
+
285
+ /**
286
+ * Has more Result value.
287
+ *
288
+ * @return has more result value
289
+ * @throws SQLException sql exception
290
+ */
291
+ public boolean hasMoreResultValue () throws SQLException {
292
+ if (!hasMoreResultValueFlag ) {
293
+ return false ;
294
+ }
295
+ if (!mergedResult .next ()) {
296
+ hasMoreResultValueFlag = false ;
297
+ cleanJDBCResources ();
298
+ }
299
+ return true ;
300
+ }
301
+
302
+ /**
303
+ * Get result value.
304
+ *
305
+ * @return database protocol packet
306
+ */
307
+ public DatabaseProtocolPacket getResultValue () {
308
+ if (!hasMoreResultValueFlag ) {
309
+ return new EofPacket (++currentSequenceId , 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue ());
310
+ }
270
311
try {
271
- while (mergedResult .next ()) {
272
- List <Object > data = new ArrayList <>(columnCount );
273
- for (int i = 1 ; i <= columnCount ; i ++) {
274
- data .add (mergedResult .getValue (i , Object .class ));
275
- }
276
- result .addPacket (new BinaryResultSetRowPacket (++currentSequenceId , columnCount , data , columnTypes ));
312
+ List <Object > data = new ArrayList <>(columnCount );
313
+ for (int i = 1 ; i <= columnCount ; i ++) {
314
+ data .add (mergedResult .getValue (i , Object .class ));
277
315
}
316
+ return new BinaryResultSetRowPacket (++currentSequenceId , columnCount , data , columnTypes );
278
317
} catch (final SQLException ex ) {
279
- return new CommandResponsePackets (new ErrPacket (1 , ex .getErrorCode (), "" , ex .getSQLState (), ex .getMessage ()));
318
+ return new ErrPacket (1 , ex .getErrorCode (), "" , ex .getSQLState (), ex .getMessage ());
319
+ }
320
+ }
321
+
322
+ private void cleanJDBCResources () {
323
+ for (ResultSet each : resultSets ) {
324
+ if (null != each ) {
325
+ try {
326
+ each .close ();
327
+ } catch (final SQLException ignore ) {
328
+ }
329
+ }
330
+ }
331
+ for (Connection each : connections ) {
332
+ if (null != each ) {
333
+ try {
334
+ each .close ();
335
+ } catch (final SQLException ignore ) {
336
+ }
337
+ }
280
338
}
281
- result .addPacket (new EofPacket (++currentSequenceId , 0 , StatusFlag .SERVER_STATUS_AUTOCOMMIT .getValue ()));
282
- return result ;
283
339
}
284
340
}
0 commit comments