Skip to content

Commit 5cd6ab6

Browse files
committed
Join - order by support on first table
1 parent 6ce55d2 commit 5cd6ab6

File tree

7 files changed

+165
-52
lines changed

7 files changed

+165
-52
lines changed

src/main/java/org/elasticsearch/plugin/nlpcn/ElasticJoinExecutor.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package org.elasticsearch.plugin.nlpcn;
22

33
import com.google.common.collect.ImmutableMap;
4+
import org.elasticsearch.action.search.SearchRequestBuilder;
45
import org.elasticsearch.action.search.SearchResponse;
6+
import org.elasticsearch.action.search.SearchType;
57
import org.elasticsearch.client.Client;
68
import org.elasticsearch.common.text.StringText;
9+
import org.elasticsearch.common.unit.TimeValue;
710
import org.elasticsearch.common.xcontent.XContentBuilder;
811
import org.elasticsearch.common.xcontent.XContentFactory;
912
import org.elasticsearch.common.xcontent.XContentType;
13+
1014
import org.elasticsearch.rest.BytesRestResponse;
1115
import org.elasticsearch.rest.RestChannel;
1216
import org.elasticsearch.rest.RestStatus;
@@ -20,6 +24,7 @@
2024
import org.nlpcn.es4sql.query.join.HashJoinElasticRequestBuilder;
2125
import org.nlpcn.es4sql.query.join.JoinRequestBuilder;
2226
import org.nlpcn.es4sql.query.join.NestedLoopsElasticRequestBuilder;
27+
import org.nlpcn.es4sql.query.join.TableInJoinRequestBuilder;
2328

2429
import java.io.IOException;
2530
import java.util.*;
@@ -173,33 +178,37 @@ protected Object deepSearchInMap(Map<String, Object> fieldsMap, String name) {
173178
protected void addUnmatchedResults(List<InternalSearchHit> combinedResults, Collection<SearchHitsResult> firstTableSearchHits, List<Field> secondTableReturnedFields,int currentNumOfIds, int totalLimit,String t1Alias,String t2Alias) {
174179
boolean limitReached = false;
175180
for(SearchHitsResult hitsResult : firstTableSearchHits){
176-
if(!hitsResult.isMatchedWithOtherTable()){
177-
for(SearchHit hit: hitsResult.getSearchHits() ) {
181+
if(!hitsResult.isMatchedWithOtherTable())
182+
for (InternalSearchHit hit : hitsResult.getSearchHits()) {
178183

179184
//todo: decide which id to put or type. or maby its ok this way. just need to doc.
180-
addUnmachedResult(combinedResults, secondTableReturnedFields, currentNumOfIds, t1Alias, t2Alias, hit);
185+
InternalSearchHit unmachedResult = createUnmachedResult(secondTableReturnedFields, hit.docId(), t1Alias, t2Alias, hit);
186+
combinedResults.add(unmachedResult);
181187
currentNumOfIds++;
182-
if(currentNumOfIds >= totalLimit){
188+
if (currentNumOfIds >= totalLimit) {
183189
limitReached = true;
184190
break;
185191
}
186192

187193
}
188-
}
189194
if(limitReached) break;
190195
}
191196
}
192197

193-
protected void addUnmachedResult(List<InternalSearchHit> combinedResults, List<Field> secondTableReturnedFields, int currentNumOfIds, String t1Alias, String t2Alias, SearchHit hit) {
194-
InternalSearchHit searchHit = new InternalSearchHit(currentNumOfIds, hit.id() + "|0", new StringText(hit.getType() + "|null"), hit.getFields());
198+
protected InternalSearchHit createUnmachedResult( List<Field> secondTableReturnedFields, int docId, String t1Alias, String t2Alias, SearchHit hit) {
199+
String unmatchedId = hit.id() + "|0";
200+
StringText unamatchedType = new StringText(hit.getType() + "|null");
201+
202+
InternalSearchHit searchHit = new InternalSearchHit(docId, unmatchedId, unamatchedType, hit.getFields());
203+
195204
searchHit.sourceRef(hit.getSourceRef());
196205
searchHit.sourceAsMap().clear();
197206
searchHit.sourceAsMap().putAll(hit.sourceAsMap());
198207
Map<String,Object> emptySecondTableHitSource = createNullsSource(secondTableReturnedFields);
199208

200209
mergeSourceAndAddAliases(emptySecondTableHitSource, searchHit,t1Alias,t2Alias);
201210

202-
combinedResults.add(searchHit);
211+
return searchHit;
203212
}
204213

205214
protected Map<String, Object> createNullsSource(List<Field> secondTableReturnedFields) {
@@ -217,5 +226,18 @@ protected void updateMetaSearchResults( SearchResponse searchResponse) {
217226
this.metaResults.updateTimeOut(searchResponse.isTimedOut());
218227
}
219228

229+
protected SearchResponse scrollOneTimeWithMax(Client client,TableInJoinRequestBuilder tableRequest) {
230+
SearchResponse responseWithHits;SearchRequestBuilder scrollRequest = tableRequest.getRequestBuilder()
231+
.setScroll(new TimeValue(60000))
232+
.setSize(MAX_RESULTS_ON_ONE_FETCH);
233+
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
234+
if(!ordered) scrollRequest.setSearchType(SearchType.SCAN);
235+
responseWithHits = scrollRequest.get();
236+
//on ordered select - not using SCAN , elastic returns hits on first scroll
237+
if(!ordered)
238+
responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId()).setScroll(new TimeValue(600000)).get();
239+
return responseWithHits;
240+
}
241+
220242

221243
}

src/main/java/org/elasticsearch/plugin/nlpcn/HashJoinComparisonStructure.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package org.elasticsearch.plugin.nlpcn;
22

3-
import org.elasticsearch.common.text.StringText;
4-
import org.elasticsearch.search.SearchHit;
53
import org.elasticsearch.search.internal.InternalSearchHit;
64
import org.nlpcn.es4sql.domain.Field;
75

src/main/java/org/elasticsearch/plugin/nlpcn/HashJoinElasticExecutor.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public List<InternalSearchHit> innerRun() throws IOException, SqlParseException
5252
Map<String, Map<String, List<Object>>> optimizationTermsFilterStructure =
5353
initOptimizationStructure();
5454

55-
5655
updateFirstTableLimitIfNeeded();
5756
TableInJoinRequestBuilder firstTableRequest = requestBuilder.getFirstTable();
5857
createKeyToResultsAndFillOptimizationStructure(optimizationTermsFilterStructure, firstTableRequest);
@@ -76,6 +75,14 @@ public List<InternalSearchHit> innerRun() throws IOException, SqlParseException
7675
t1Alias,
7776
t2Alias);
7877
}
78+
if(firstTableRequest.getOriginalSelect().isOrderdSelect()){
79+
combinedResult.sort(new Comparator<InternalSearchHit>() {
80+
@Override
81+
public int compare(InternalSearchHit o1, InternalSearchHit o2) {
82+
return o1.docId() - o2.docId();
83+
}
84+
});
85+
}
7986
return combinedResult;
8087
}
8188

@@ -153,7 +160,7 @@ private List<InternalSearchHit> createCombinedResults( TableInJoinRequestBuilder
153160

154161

155162

156-
InternalSearchHit searchHit = new InternalSearchHit(resultIds, combinedId, new StringText(matchingHit.getType() + "|" + secondTableHit.getType()), matchingHit.getFields());
163+
InternalSearchHit searchHit = new InternalSearchHit(matchingHit.docId(), combinedId, new StringText(matchingHit.getType() + "|" + secondTableHit.getType()), matchingHit.getFields());
157164
searchHit.sourceRef(matchingHit.getSourceRef());
158165
searchHit.sourceAsMap().clear();
159166
searchHit.sourceAsMap().putAll(matchingHit.sourceAsMap());
@@ -188,7 +195,7 @@ private void copyMaps(Map<String, Object> into, Map<String, Object> from) {
188195
}
189196

190197
private void createKeyToResultsAndFillOptimizationStructure(Map<String,Map<String, List<Object>>> optimizationTermsFilterStructure, TableInJoinRequestBuilder firstTableRequest) {
191-
List<SearchHit> firstTableHits = fetchAllHits(firstTableRequest.getRequestBuilder(), firstTableRequest.getHintLimit());
198+
List<SearchHit> firstTableHits = fetchAllHits(firstTableRequest);
192199

193200
int resultIds = 1;
194201
for (SearchHit hit : firstTableHits) {
@@ -210,22 +217,21 @@ private void createKeyToResultsAndFillOptimizationStructure(Map<String,Map<Strin
210217
}
211218
}
212219

213-
private List<SearchHit> fetchAllHits(SearchRequestBuilder requestBuilder, Integer hintLimit) {
220+
private List<SearchHit> fetchAllHits(TableInJoinRequestBuilder tableInJoinRequest) {
221+
Integer hintLimit = tableInJoinRequest.getHintLimit();
222+
SearchRequestBuilder requestBuilder = tableInJoinRequest.getRequestBuilder();
214223
if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) {
215224
requestBuilder.setSize(hintLimit);
216225
SearchResponse searchResponse = requestBuilder.get();
217226
updateMetaSearchResults(searchResponse);
218227
return Arrays.asList(searchResponse.getHits().getHits());
219228
}
220-
return scrollTillLimit(requestBuilder, hintLimit);
229+
return scrollTillLimit(tableInJoinRequest, hintLimit);
221230
}
222231

223-
private List<SearchHit> scrollTillLimit(SearchRequestBuilder requestBuilder, Integer hintLimit) {
224-
SearchResponse scrollResp = requestBuilder.setSearchType(SearchType.SCAN)
225-
.setScroll(new TimeValue(60000))
226-
.setSize(MAX_RESULTS_ON_ONE_FETCH).get();
232+
private List<SearchHit> scrollTillLimit(TableInJoinRequestBuilder tableInJoinRequest, Integer hintLimit) {
233+
SearchResponse scrollResp = scrollOneTimeWithMax(client,tableInJoinRequest);
227234

228-
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).get();
229235
updateMetaSearchResults(scrollResp);
230236
List<SearchHit> hitsWithScan = new ArrayList<>();
231237
int curentNumOfResults = 0;

src/main/java/org/elasticsearch/plugin/nlpcn/NestedLoopsElasticExecutor.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.nlpcn.es4sql.query.join.TableInJoinRequestBuilder;
1818

1919
import java.util.ArrayList;
20-
import java.util.Collection;
2120
import java.util.List;
2221
import java.util.Map;
2322

@@ -26,9 +25,7 @@
2625
*/
2726
public class NestedLoopsElasticExecutor extends ElasticJoinExecutor {
2827

29-
private final NestedLoopsElasticRequestBuilder
30-
31-
nestedLoopsRequest;
28+
private final NestedLoopsElasticRequestBuilder nestedLoopsRequest;
3229
private final Client client;
3330

3431
public NestedLoopsElasticExecutor(Client client, NestedLoopsElasticRequestBuilder nestedLoops) {
@@ -98,7 +95,8 @@ private int combineResultsFromMultiResponses(List<InternalSearchHit> combinedRes
9895
SearchHits responseForHit = multiItemResponse.getHits();
9996

10097
if(responseForHit.getHits().length == 0 && nestedLoopsRequest.getJoinType() == SQLJoinTableSource.JoinType.LEFT_OUTER_JOIN){
101-
addUnmachedResult(combinedResults, nestedLoopsRequest.getSecondTable().getReturnedFields(), currentCombinedResults, t1Alias, t2Alias, hitFromFirstTable);
98+
InternalSearchHit unmachedResult = createUnmachedResult(nestedLoopsRequest.getSecondTable().getReturnedFields(), currentCombinedResults, t1Alias, t2Alias, hitFromFirstTable);
99+
combinedResults.add(unmachedResult);
102100
currentCombinedResults++;
103101
continue;
104102
}
@@ -182,22 +180,21 @@ private FetchWithScrollResponse firstFetch(TableInJoinRequestBuilder tableReques
182180
if(hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH){
183181

184182
responseWithHits = tableRequest.getRequestBuilder().setSize(hintLimit).get();
185-
needScrollForFirstTable=true;
183+
needScrollForFirstTable=false;
186184
}
187185
else {
188186
//scroll request with max.
189-
responseWithHits = tableRequest.getRequestBuilder().setSearchType(SearchType.SCAN)
190-
.setScroll(new TimeValue(60000))
191-
.setSize(MAX_RESULTS_ON_ONE_FETCH).get();
187+
responseWithHits = scrollOneTimeWithMax(client,tableRequest);
192188
if(responseWithHits.getHits().getTotalHits() < MAX_RESULTS_ON_ONE_FETCH)
193189
needScrollForFirstTable = true;
194-
responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId()).setScroll(new TimeValue(600000)).get();
195190
}
196191

197192
updateMetaSearchResults(responseWithHits);
198193
return new FetchWithScrollResponse(responseWithHits,needScrollForFirstTable);
199194
}
200195

196+
197+
201198
private void orderConditions(String t1Alias, String t2Alias) {
202199
orderConditionRecursive(t1Alias,t2Alias,nestedLoopsRequest.getConnectedWhere());
203200
// Collection<Condition> conditions = nestedLoopsRequest.getT1FieldToCondition().values();

src/main/java/org/nlpcn/es4sql/parse/SqlParser.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.nlpcn.es4sql.domain.hints.Hint;
1515
import org.nlpcn.es4sql.domain.hints.HintFactory;
1616
import org.nlpcn.es4sql.exception.SqlParseException;
17-
import org.nlpcn.es4sql.query.join.NestedLoopsElasticRequestBuilder;
1817
import org.nlpcn.es4sql.spatial.SpatialParamsFactory;
1918

2019
/**
@@ -350,24 +349,29 @@ private void findOrderBy(MySqlSelectQueryBlock query, Select select) throws SqlP
350349
return;
351350
}
352351
List<SQLSelectOrderByItem> items = orderBy.getItems();
353-
List<String> lists = new ArrayList<>();
354-
for (SQLSelectOrderByItem sqlSelectOrderByItem : items) {
355-
SQLExpr expr = sqlSelectOrderByItem.getExpr();
356-
lists.add(FieldMaker.makeField(expr, null,null).toString());
357-
if (sqlSelectOrderByItem.getType() == null) {
358-
sqlSelectOrderByItem.setType(SQLOrderingSpecification.ASC);
359-
}
360-
String type = sqlSelectOrderByItem.getType().toString();
361-
for (String name : lists) {
362-
name = name.replace("`", "");
363-
select.addOrderBy(name, type);
364-
}
365-
lists.clear();
366-
}
367352

368-
}
353+
addOrderByToSelect(select, items, null);
354+
355+
}
356+
357+
private void addOrderByToSelect(Select select, List<SQLSelectOrderByItem> items, String alias) throws SqlParseException {
358+
for (SQLSelectOrderByItem sqlSelectOrderByItem : items) {
359+
SQLExpr expr = sqlSelectOrderByItem.getExpr();
360+
String orderByName = FieldMaker.makeField(expr, null, null).toString();
369361

370-
private void findLimit(MySqlSelectQueryBlock.Limit limit, Select select) {
362+
if (sqlSelectOrderByItem.getType() == null) {
363+
sqlSelectOrderByItem.setType(SQLOrderingSpecification.ASC);
364+
}
365+
String type = sqlSelectOrderByItem.getType().toString();
366+
367+
orderByName = orderByName.replace("`", "");
368+
if(alias!=null) orderByName = orderByName.replaceFirst(alias+"\\.","");
369+
select.addOrderBy(orderByName, type);
370+
371+
}
372+
}
373+
374+
private void findLimit(MySqlSelectQueryBlock.Limit limit, Select select) {
371375

372376
if (limit == null) {
373377
return;
@@ -419,17 +423,38 @@ public JoinSelect parseJoinSelect(SQLQueryExpr sqlExpr) throws SqlParseException
419423
String firstTableAlias = joinedFrom.get(0).getAlias();
420424
String secondTableAlias = joinedFrom.get(1).getAlias();
421425
Map<String, Where> aliasToWhere = splitAndFindWhere(query.getWhere(), firstTableAlias, secondTableAlias);
426+
Map<String, List<SQLSelectOrderByItem>> aliasToOrderBy = splitAndFindOrder(query.getOrderBy(), firstTableAlias, secondTableAlias);
422427
List<Condition> connectedConditions = getConditionsFlatten(joinSelect.getConnectedWhere());
423428
joinSelect.setConnectedConditions(connectedConditions);
424-
fillTableSelectedJoin(joinSelect.getFirstTable(), query, joinedFrom.get(0), aliasToWhere.get(firstTableAlias), connectedConditions);
425-
fillTableSelectedJoin(joinSelect.getSecondTable(), query, joinedFrom.get(1), aliasToWhere.get(secondTableAlias), connectedConditions);
429+
fillTableSelectedJoin(joinSelect.getFirstTable(), query, joinedFrom.get(0), aliasToWhere.get(firstTableAlias),aliasToOrderBy.get(firstTableAlias), connectedConditions);
430+
fillTableSelectedJoin(joinSelect.getSecondTable(), query, joinedFrom.get(1), aliasToWhere.get(secondTableAlias), aliasToOrderBy.get(secondTableAlias),connectedConditions);
426431

427432
updateJoinLimit(query.getLimit(), joinSelect);
428433

429434
//todo: throw error feature not supported: no group bys on joins ?
430435
return joinSelect;
431436
}
432437

438+
private Map<String, List<SQLSelectOrderByItem>> splitAndFindOrder(SQLOrderBy orderBy, String firstTableAlias, String secondTableAlias) throws SqlParseException {
439+
Map<String,List<SQLSelectOrderByItem>> aliasToOrderBys = new HashMap<>();
440+
aliasToOrderBys.put(firstTableAlias,new ArrayList<SQLSelectOrderByItem>());
441+
aliasToOrderBys.put(secondTableAlias,new ArrayList<SQLSelectOrderByItem>());
442+
if(orderBy == null) return aliasToOrderBys;
443+
List<SQLSelectOrderByItem> orderByItems = orderBy.getItems();
444+
for(SQLSelectOrderByItem orderByItem : orderByItems){
445+
if(orderByItem.getExpr().toString().startsWith(firstTableAlias+".")){
446+
aliasToOrderBys.get(firstTableAlias).add(orderByItem);
447+
}
448+
else if(orderByItem.getExpr().toString().startsWith(secondTableAlias+".")){
449+
aliasToOrderBys.get(secondTableAlias).add(orderByItem);
450+
}
451+
else
452+
throw new SqlParseException("order by field on join request should have alias before, got " + orderByItem.getExpr().toString());
453+
454+
}
455+
return aliasToOrderBys;
456+
}
457+
433458
private void updateJoinLimit(MySqlSelectQueryBlock.Limit limit, JoinSelect joinSelect) {
434459
if(limit != null && limit.getRowCount()!= null) {
435460
int sizeLimit = Integer.parseInt(limit.getRowCount().toString());
@@ -463,9 +488,9 @@ private Map<String, Where> splitAndFindWhere(SQLExpr whereExpr, String firstTabl
463488
return splitWheres(where, firstTableAlias, secondTableAlias);
464489
}
465490

466-
private void fillTableSelectedJoin(TableOnJoinSelect tableOnJoin,MySqlSelectQueryBlock query, From tableFrom, Where where, List<Condition> conditions) throws SqlParseException {
491+
private void fillTableSelectedJoin(TableOnJoinSelect tableOnJoin, MySqlSelectQueryBlock query, From tableFrom, Where where, List<SQLSelectOrderByItem> orderBys, List<Condition> conditions) throws SqlParseException {
467492
String alias = tableFrom.getAlias();
468-
fillBasicTableSelectJoin(tableOnJoin, tableFrom, where, query);
493+
fillBasicTableSelectJoin(tableOnJoin, tableFrom, where,orderBys, query);
469494
tableOnJoin.setConnectedFields(getConnectedFields(conditions, alias));
470495
tableOnJoin.setSelectedFields(new ArrayList<Field>(tableOnJoin.getFields()));
471496
tableOnJoin.setAlias(alias);
@@ -493,10 +518,11 @@ private List<Field> getConnectedFields(List<Condition> conditions, String alias)
493518
return fields;
494519
}
495520

496-
private void fillBasicTableSelectJoin(TableOnJoinSelect select, From from, Where where, MySqlSelectQueryBlock query) throws SqlParseException {
521+
private void fillBasicTableSelectJoin(TableOnJoinSelect select, From from, Where where, List<SQLSelectOrderByItem> orderBys, MySqlSelectQueryBlock query) throws SqlParseException {
497522
select.getFrom().add(from);
498523
findSelect(query, select,from.getAlias());
499524
select.setWhere(where);
525+
addOrderByToSelect(select, orderBys,from.getAlias());
500526
}
501527

502528
private List<Condition> getJoinConditionsFlatten(SQLJoinTableSource from) throws SqlParseException {

src/test/java/org/nlpcn/es4sql/JoinTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,29 @@ public void joinWithOrWithTermsFilterOpt() throws SQLFeatureNotSupportedExceptio
445445
}
446446

447447

448+
@Test
449+
public void joinWithOrderbyFirstTableHASH() throws SQLFeatureNotSupportedException, IOException, SqlParseException {
450+
joinWithOrderFirstTable(false);
451+
}
452+
@Test
453+
public void joinWithOrderbyFirstTableNL() throws SQLFeatureNotSupportedException, IOException, SqlParseException {
454+
joinWithOrderFirstTable(true);
455+
}
456+
private void joinWithOrderFirstTable(boolean useNestedLoops) throws SQLFeatureNotSupportedException, IOException, SqlParseException {
457+
String query = String.format("select c.name.firstname , d.words from %s/gotCharacters c " +
458+
"JOIN %s/gotHouses d on d.name = c.house " +
459+
"order by c.name.firstname"
460+
, TEST_INDEX, TEST_INDEX);
461+
if(useNestedLoops) query = query.replace("select","select /*! USE_NL*/ ");
462+
SearchHit[] hits = joinAndGetHits(query);
463+
Assert.assertEquals(4, hits.length);
464+
Assert.assertEquals("Brandon",hits[0].sourceAsMap().get("c.name.firstname"));
465+
Assert.assertEquals("Daenerys",hits[1].sourceAsMap().get("c.name.firstname"));
466+
Assert.assertEquals("Eddard",hits[2].sourceAsMap().get("c.name.firstname"));
467+
Assert.assertEquals("Jaime",hits[3].sourceAsMap().get("c.name.firstname"));
468+
}
469+
470+
448471
private String hashJoinRunAndExplain(String query) throws IOException, SqlParseException, SQLFeatureNotSupportedException {
449472
SearchDao searchDao = MainTestSuite.getSearchDao();
450473
HashJoinElasticRequestBuilder explain = (HashJoinElasticRequestBuilder) searchDao.explain(query);

0 commit comments

Comments
 (0)