Skip to content

Commit 1026afe

Browse files
committed
refactor result set factory
1 parent 305295b commit 1026afe

18 files changed

+187
-328
lines changed

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetFactory.java

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717

1818
package com.dangdang.ddframe.rdb.sharding.merger;
1919

20+
import java.sql.ResultSet;
21+
import java.sql.SQLException;
22+
import java.util.List;
23+
2024
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
25+
import com.dangdang.ddframe.rdb.sharding.merger.component.ComponentResultSet;
2126
import com.dangdang.ddframe.rdb.sharding.merger.component.coupling.GroupByCouplingResultSet;
2227
import com.dangdang.ddframe.rdb.sharding.merger.component.coupling.LimitCouplingResultSet;
28+
import com.dangdang.ddframe.rdb.sharding.merger.component.coupling.MemoryOrderByCouplingResultSet;
2329
import com.dangdang.ddframe.rdb.sharding.merger.component.other.WrapperResultSet;
2430
import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.IteratorReducerResultSet;
25-
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.IndexColumn;
31+
import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.MemoryOrderByReducerResultSet;
32+
import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.StreamingOrderByReducerResultSet;
2633
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
2734
import com.google.common.base.Function;
2835
import com.google.common.base.Predicate;
@@ -32,10 +39,6 @@
3239
import lombok.RequiredArgsConstructor;
3340
import lombok.extern.slf4j.Slf4j;
3441

35-
import java.sql.ResultSet;
36-
import java.sql.SQLException;
37-
import java.util.List;
38-
3942
/**
4043
* 创建归并分片结果集的工厂.
4144
*
@@ -53,25 +56,19 @@ public final class ResultSetFactory {
5356
* @return 结果集包装
5457
*/
5558
public static ResultSet getResultSet(final List<ResultSet> resultSets, final MergeContext mergeContext) throws SQLException {
56-
// TODO 如果不filter, 直接操纵可能为空的resultSet会有什么结果, 能否统一处理
5759
List<ResultSet> filteredResultSets = filterResultSets(resultSets);
5860
if (filteredResultSets.isEmpty()) {
5961
log.trace("Sharding-JDBC: No data found in origin result sets");
6062
return resultSets.get(0);
6163
}
62-
// TODO 只有1个的情况和多个情况有何不同, 需要单独处理么
6364
if (1 == filteredResultSets.size()) {
6465
log.trace("Sharding-JDBC: Only one result set");
6566
return filteredResultSets.get(0);
6667
}
67-
setColumnIndex((WrapperResultSet) filteredResultSets.get(0), mergeContext);
68-
ResultSetPipelineBuilder builder = new ResultSetPipelineBuilder(filteredResultSets, mergeContext.getOrderByColumns());
69-
buildReducer(builder, mergeContext);
70-
buildCoupling(builder, mergeContext);
71-
return builder.build();
68+
mergeContext.buildContextWithResultSet((WrapperResultSet) filteredResultSets.get(0));
69+
return buildCoupling(buildReducer(filteredResultSets, mergeContext), mergeContext);
7270
}
7371

74-
// TODO 能否直接使用WrapperResultSet
7572
private static List<ResultSet> filterResultSets(final List<ResultSet> resultSets) {
7673
return Lists.newArrayList(Collections2.filter(Lists.transform(resultSets, new Function<ResultSet, ResultSet>() {
7774

@@ -92,40 +89,36 @@ public boolean apply(final ResultSet input) {
9289
}));
9390
}
9491

95-
private static void setColumnIndex(final WrapperResultSet resultSet, final MergeContext mergeContext) {
96-
for (IndexColumn each : mergeContext.getMergeFocusedColumns()) {
97-
if (0 == each.getColumnIndex()) {
98-
each.setColumnIndex(resultSet.getColumnIndex(each));
92+
private static ResultSet buildReducer(final List<ResultSet> filteredResultSets, final MergeContext mergeContext) throws SQLException {
93+
if (mergeContext.hasGroupBy()) {
94+
if (mergeContext.groupByKeysEqualsOrderByKeys()) {
95+
return join(new StreamingOrderByReducerResultSet(mergeContext.getCurrentOrderByKeys()), filteredResultSets);
9996
}
97+
return join(new MemoryOrderByReducerResultSet(mergeContext.getCurrentOrderByKeys()), filteredResultSets);
98+
} else if (mergeContext.hasOrderBy()) {
99+
return join(new StreamingOrderByReducerResultSet(mergeContext.getCurrentOrderByKeys()), filteredResultSets);
100+
} else {
101+
return join(new IteratorReducerResultSet(), filteredResultSets);
100102
}
101103
}
102104

103-
// TODO reducer目的是什么, 是为了确定读取resultSet的next走内存还是走streaming吗, 如果是,是否抽象出两个Reducer就够了
104-
private static void buildReducer(final ResultSetPipelineBuilder builder, final MergeContext mergeContext) throws SQLException {
105-
// TODO 判断hasGroupByOrAggregation并获取什么样的OrderByColumns, 能否封装到mergeContext对象里
105+
private static ResultSet buildCoupling(final ResultSet preResultSet, final MergeContext mergeContext) throws SQLException {
106+
ResultSet currentResultSet = preResultSet;
106107
if (mergeContext.hasGroupByOrAggregation()) {
107-
builder.joinSortReducer(mergeContext.transformGroupByColumnToOrderByColumn());
108-
return;
108+
currentResultSet = join(new GroupByCouplingResultSet(mergeContext.getGroupByColumns(), mergeContext.getAggregationColumns()), currentResultSet);
109109
}
110-
if (mergeContext.hasOrderBy()) {
111-
builder.joinSortReducer(mergeContext.getOrderByColumns());
112-
return;
113-
}
114-
builder.join(new IteratorReducerResultSet());
115-
}
116-
117-
// TODO Reducer和Coupling大致流程一致, 两个有什么区别
118-
private static void buildCoupling(final ResultSetPipelineBuilder builder, final MergeContext mergeContext) throws SQLException {
119-
if (mergeContext.hasGroupByOrAggregation()) {
120-
// TODO 保持一致, 都new一个CouplingResultSet
121-
builder.join(new GroupByCouplingResultSet(mergeContext.getGroupByColumns(), mergeContext.getAggregationColumns()));
122-
}
123-
if (mergeContext.hasOrderBy()) {
124-
// TODO 保持一致, 都new一个CouplingResultSet
125-
builder.joinSortCoupling(mergeContext.getOrderByColumns());
110+
if (mergeContext.needToSort()) {
111+
currentResultSet = join(new MemoryOrderByCouplingResultSet(mergeContext.getCurrentOrderByKeys()), currentResultSet);
126112
}
127113
if (mergeContext.hasLimit()) {
128-
builder.join(new LimitCouplingResultSet(mergeContext.getLimit()));
114+
currentResultSet = join(new LimitCouplingResultSet(mergeContext.getLimit()), currentResultSet);
129115
}
116+
return currentResultSet;
117+
}
118+
119+
private static <T> ComponentResultSet<T> join(final ComponentResultSet<T> resultSet, final T preResultSet) throws SQLException {
120+
log.trace("{} joined", resultSet.getClass().getSimpleName());
121+
resultSet.init(preResultSet);
122+
return resultSet;
130123
}
131124
}

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetPipelineBuilder.java

Lines changed: 0 additions & 122 deletions
This file was deleted.

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/ResultSetUtil.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717

1818
package com.dangdang.ddframe.rdb.sharding.merger.common;
1919

20-
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
21-
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn.OrderByType;
22-
import lombok.AccessLevel;
23-
import lombok.RequiredArgsConstructor;
24-
2520
import java.math.BigDecimal;
2621
import java.sql.Time;
2722
import java.sql.Timestamp;
2823
import java.util.Date;
2924

25+
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
26+
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn.OrderByType;
27+
import lombok.AccessLevel;
28+
import lombok.RequiredArgsConstructor;
29+
3030
/**
3131
* 结果集处理工具类.
3232
*
@@ -95,7 +95,6 @@ private static Object convertNumberValue(final Object value, final Class<?> conv
9595
return number.floatValue();
9696
case "java.math.BigDecimal":
9797
return new BigDecimal(number.toString());
98-
// TODO Object和String不是number, 不会进入switch
9998
case "java.lang.Object":
10099
return value;
101100
case "java.lang.String":

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/ComponentResultSet.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,18 @@
2121
import java.sql.SQLException;
2222

2323
/**
24-
* 组件接口.
24+
* 管道化组件接口.
2525
*
26-
* @author gaohongtao
26+
* @param <T> 前置组件类型
2727
*/
28-
// TODO 泛型的javadoc
29-
// TODO ComponentResultSet是什么
3028
public interface ComponentResultSet<T> extends ResultSet {
3129

32-
// TODO 注释
33-
// TODO 接口参数不需要加final
34-
void init(final T preResultSet) throws SQLException;
30+
/**
31+
* 初始化管道组件.
32+
*
33+
* @param preComponent 前置管道组件
34+
* @return 返回初始化完成的管道组件
35+
* @throws SQLException 访问组件可能抛出异常
36+
*/
37+
ComponentResultSet init(T preComponent) throws SQLException;
3538
}

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/CouplingResultSet.java renamed to sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/CouplingResultSet.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@
1515
* </p>
1616
*/
1717

18-
package com.dangdang.ddframe.rdb.sharding.merger.component;
18+
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
1919

2020
import java.sql.ResultSet;
2121

22+
import com.dangdang.ddframe.rdb.sharding.merger.component.ComponentResultSet;
23+
2224
/**
2325
* 节点结果集.
2426
*
2527
* @author gaohongtao
2628
*/
27-
// TODO 放到coupling包
2829
public interface CouplingResultSet extends ComponentResultSet<ResultSet> {
2930

3031
}

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/GroupByCouplingResultSet.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.List;
2424

2525
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
26-
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
26+
import com.dangdang.ddframe.rdb.sharding.merger.component.ComponentResultSet;
2727
import com.dangdang.ddframe.rdb.sharding.merger.row.GroupByRow;
2828
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
2929
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
@@ -49,8 +49,9 @@ public class GroupByCouplingResultSet extends AbstractRowSetResultSetAdapter imp
4949
private boolean hasNext;
5050

5151
@Override
52-
public void init(final ResultSet preResultSet) {
52+
public ComponentResultSet init(final ResultSet preResultSet) {
5353
setResultSets(Collections.singletonList(preResultSet));
54+
return this;
5455
}
5556

5657
@Override

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/LimitCouplingResultSet.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.sql.SQLException;
2222

2323
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractForwardingResultSetAdapter;
24-
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
24+
import com.dangdang.ddframe.rdb.sharding.merger.component.ComponentResultSet;
2525
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
2626
import lombok.extern.slf4j.Slf4j;
2727

@@ -46,9 +46,10 @@ public LimitCouplingResultSet(final Limit limit) {
4646
}
4747

4848
@Override
49-
public void init(final ResultSet preResultSet) {
49+
public ComponentResultSet init(final ResultSet preResultSet) {
5050
setDelegate(preResultSet);
5151
this.preResultSet = preResultSet;
52+
return this;
5253
}
5354

5455
@Override
@@ -78,9 +79,4 @@ private boolean skipOffset() throws SQLException {
7879
initial = true;
7980
return result;
8081
}
81-
82-
@Override
83-
public String toString() {
84-
return String.format("Limit row number:%d limit size:%d result set stat:%s", rowNumber, limit.getRowCount(), super.toString());
85-
}
8682
}

sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/MemoryOrderByCouplingResultSet.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323

24-
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
24+
import com.dangdang.ddframe.rdb.sharding.merger.component.ComponentResultSet;
2525
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
2626
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
2727

@@ -37,7 +37,8 @@ public MemoryOrderByCouplingResultSet(final List<OrderByColumn> expectOrderList)
3737
}
3838

3939
@Override
40-
public void init(final ResultSet preResultSet) {
40+
public ComponentResultSet init(final ResultSet preResultSet) {
4141
setResultSets(Collections.singletonList(preResultSet));
42+
return this;
4243
}
4344
}

0 commit comments

Comments
 (0)