17
17
18
18
package com .dangdang .ddframe .rdb .sharding .merger ;
19
19
20
+ import java .sql .ResultSet ;
21
+ import java .sql .SQLException ;
22
+ import java .util .List ;
23
+
20
24
import com .dangdang .ddframe .rdb .sharding .exception .ShardingJdbcException ;
25
+ import com .dangdang .ddframe .rdb .sharding .merger .component .ComponentResultSet ;
21
26
import com .dangdang .ddframe .rdb .sharding .merger .component .coupling .GroupByCouplingResultSet ;
22
27
import com .dangdang .ddframe .rdb .sharding .merger .component .coupling .LimitCouplingResultSet ;
28
+ import com .dangdang .ddframe .rdb .sharding .merger .component .coupling .MemoryOrderByCouplingResultSet ;
23
29
import com .dangdang .ddframe .rdb .sharding .merger .component .other .WrapperResultSet ;
24
30
import com .dangdang .ddframe .rdb .sharding .merger .component .reducer .IteratorReducerResultSet ;
25
- import com .dangdang .ddframe .rdb .sharding .parser .result .merger .IndexColumn ;
31
+ import com .dangdang .ddframe .rdb .sharding .merger .component .reducer .MemoryOrderByReducerResultSet ;
32
+ import com .dangdang .ddframe .rdb .sharding .merger .component .reducer .StreamingOrderByReducerResultSet ;
26
33
import com .dangdang .ddframe .rdb .sharding .parser .result .merger .MergeContext ;
27
34
import com .google .common .base .Function ;
28
35
import com .google .common .base .Predicate ;
32
39
import lombok .RequiredArgsConstructor ;
33
40
import lombok .extern .slf4j .Slf4j ;
34
41
35
- import java .sql .ResultSet ;
36
- import java .sql .SQLException ;
37
- import java .util .List ;
38
-
39
42
/**
40
43
* 创建归并分片结果集的工厂.
41
44
*
@@ -53,25 +56,19 @@ public final class ResultSetFactory {
53
56
* @return 结果集包装
54
57
*/
55
58
public static ResultSet getResultSet (final List <ResultSet > resultSets , final MergeContext mergeContext ) throws SQLException {
56
- // TODO 如果不filter, 直接操纵可能为空的resultSet会有什么结果, 能否统一处理
57
59
List <ResultSet > filteredResultSets = filterResultSets (resultSets );
58
60
if (filteredResultSets .isEmpty ()) {
59
61
log .trace ("Sharding-JDBC: No data found in origin result sets" );
60
62
return resultSets .get (0 );
61
63
}
62
- // TODO 只有1个的情况和多个情况有何不同, 需要单独处理么
63
64
if (1 == filteredResultSets .size ()) {
64
65
log .trace ("Sharding-JDBC: Only one result set" );
65
66
return filteredResultSets .get (0 );
66
67
}
67
- setColumnIndex ((WrapperResultSet ) filteredResultSets .get (0 ), mergeContext );
68
- ResultSetPipelineBuilder builder = new ResultSetPipelineBuilder (filteredResultSets , mergeContext .getOrderByColumns ());
69
- buildReducer (builder , mergeContext );
70
- buildCoupling (builder , mergeContext );
71
- return builder .build ();
68
+ mergeContext .buildContextWithResultSet ((WrapperResultSet ) filteredResultSets .get (0 ));
69
+ return buildCoupling (buildReducer (filteredResultSets , mergeContext ), mergeContext );
72
70
}
73
71
74
- // TODO 能否直接使用WrapperResultSet
75
72
private static List <ResultSet > filterResultSets (final List <ResultSet > resultSets ) {
76
73
return Lists .newArrayList (Collections2 .filter (Lists .transform (resultSets , new Function <ResultSet , ResultSet >() {
77
74
@@ -92,40 +89,36 @@ public boolean apply(final ResultSet input) {
92
89
}));
93
90
}
94
91
95
- private static void setColumnIndex (final WrapperResultSet resultSet , final MergeContext mergeContext ) {
96
- for ( IndexColumn each : mergeContext .getMergeFocusedColumns ()) {
97
- if (0 == each . getColumnIndex ()) {
98
- each . setColumnIndex ( resultSet . getColumnIndex ( each ) );
92
+ private static ResultSet buildReducer (final List < ResultSet > filteredResultSets , final MergeContext mergeContext ) throws SQLException {
93
+ if ( mergeContext .hasGroupBy ()) {
94
+ if (mergeContext . groupByKeysEqualsOrderByKeys ()) {
95
+ return join ( new StreamingOrderByReducerResultSet ( mergeContext . getCurrentOrderByKeys ()), filteredResultSets );
99
96
}
97
+ return join (new MemoryOrderByReducerResultSet (mergeContext .getCurrentOrderByKeys ()), filteredResultSets );
98
+ } else if (mergeContext .hasOrderBy ()) {
99
+ return join (new StreamingOrderByReducerResultSet (mergeContext .getCurrentOrderByKeys ()), filteredResultSets );
100
+ } else {
101
+ return join (new IteratorReducerResultSet (), filteredResultSets );
100
102
}
101
103
}
102
104
103
- // TODO reducer目的是什么, 是为了确定读取resultSet的next走内存还是走streaming吗, 如果是,是否抽象出两个Reducer就够了
104
- private static void buildReducer (final ResultSetPipelineBuilder builder , final MergeContext mergeContext ) throws SQLException {
105
- // TODO 判断hasGroupByOrAggregation并获取什么样的OrderByColumns, 能否封装到mergeContext对象里
105
+ private static ResultSet buildCoupling (final ResultSet preResultSet , final MergeContext mergeContext ) throws SQLException {
106
+ ResultSet currentResultSet = preResultSet ;
106
107
if (mergeContext .hasGroupByOrAggregation ()) {
107
- builder .joinSortReducer (mergeContext .transformGroupByColumnToOrderByColumn ());
108
- return ;
108
+ currentResultSet = join (new GroupByCouplingResultSet (mergeContext .getGroupByColumns (), mergeContext .getAggregationColumns ()), currentResultSet );
109
109
}
110
- if (mergeContext .hasOrderBy ()) {
111
- builder .joinSortReducer (mergeContext .getOrderByColumns ());
112
- return ;
113
- }
114
- builder .join (new IteratorReducerResultSet ());
115
- }
116
-
117
- // TODO Reducer和Coupling大致流程一致, 两个有什么区别
118
- private static void buildCoupling (final ResultSetPipelineBuilder builder , final MergeContext mergeContext ) throws SQLException {
119
- if (mergeContext .hasGroupByOrAggregation ()) {
120
- // TODO 保持一致, 都new一个CouplingResultSet
121
- builder .join (new GroupByCouplingResultSet (mergeContext .getGroupByColumns (), mergeContext .getAggregationColumns ()));
122
- }
123
- if (mergeContext .hasOrderBy ()) {
124
- // TODO 保持一致, 都new一个CouplingResultSet
125
- builder .joinSortCoupling (mergeContext .getOrderByColumns ());
110
+ if (mergeContext .needToSort ()) {
111
+ currentResultSet = join (new MemoryOrderByCouplingResultSet (mergeContext .getCurrentOrderByKeys ()), currentResultSet );
126
112
}
127
113
if (mergeContext .hasLimit ()) {
128
- builder . join (new LimitCouplingResultSet (mergeContext .getLimit ()));
114
+ currentResultSet = join (new LimitCouplingResultSet (mergeContext .getLimit ()), currentResultSet );
129
115
}
116
+ return currentResultSet ;
117
+ }
118
+
119
+ private static <T > ComponentResultSet <T > join (final ComponentResultSet <T > resultSet , final T preResultSet ) throws SQLException {
120
+ log .trace ("{} joined" , resultSet .getClass ().getSimpleName ());
121
+ resultSet .init (preResultSet );
122
+ return resultSet ;
130
123
}
131
124
}
0 commit comments