Skip to content

Commit bf918cb

Browse files
committed
fix testcase
1 parent 9598381 commit bf918cb

File tree

14 files changed

+93
-46
lines changed

14 files changed

+93
-46
lines changed

src/main/java/org/elasticsearch/plugin/nlpcn/ComperableHitResult.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.base.Joiner;
44
import org.elasticsearch.search.SearchHit;
5+
import org.elasticsearch.search.lookup.Source;
56
import org.nlpcn.es4sql.Util;
67

78
import java.util.ArrayList;
@@ -19,7 +20,7 @@ public class ComperableHitResult {
1920
private Map<String,Object> flattenMap;
2021
public ComperableHitResult(SearchHit hit , String[] fieldsOrder ,String seperator) {
2122
this.hit = hit;
22-
Map<String, Object> hitAsMap = hit.getSourceAsMap();
23+
Map<String, Object> hitAsMap = Source.fromBytes(hit.getSourceRef()).source();
2324
this.flattenMap = new HashMap<>();
2425
List<String> results = new ArrayList<>();
2526
this.isAllNull = true;

src/main/java/org/elasticsearch/plugin/nlpcn/ElasticJoinExecutor.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.elasticsearch.action.search.SearchResponse;
66
import org.elasticsearch.client.internal.Client;
77
import org.elasticsearch.rest.RestResponse;
8+
import org.elasticsearch.search.lookup.Source;
89
import org.elasticsearch.xcontent.XContentBuilder;
910
import org.elasticsearch.core.TimeValue;
1011
import org.elasticsearch.rest.RestChannel;
@@ -88,11 +89,11 @@ else if (requestBuilder instanceof NestedLoopsElasticRequestBuilder){
8889
}
8990
}
9091

91-
protected void mergeSourceAndAddAliases(Map<String,Object> secondTableHitSource, SearchHit searchHit,String t1Alias,String t2Alias) {
92-
Map<String,Object> results = mapWithAliases(searchHit.getSourceAsMap(), t1Alias);
92+
protected void mergeSourceAndAddAliases(Map<String,Object> secondTableHitSource, Map<String, Object> hitSource, String t1Alias, String t2Alias) {
93+
Map<String,Object> results = mapWithAliases(hitSource, t1Alias);
9394
results.putAll(mapWithAliases(secondTableHitSource, t2Alias));
94-
searchHit.getSourceAsMap().clear();
95-
searchHit.getSourceAsMap().putAll(results);
95+
hitSource.clear();
96+
hitSource.putAll(results);
9697
}
9798

9899
protected Map<String,Object> mapWithAliases(Map<String, Object> source, String alias) {
@@ -169,11 +170,14 @@ protected SearchHit createUnmachedResult( List<Field> secondTableReturnedFields,
169170
SearchHit searchHit = SearchHit.unpooled(docId, unmatchedId);
170171
searchHit.addDocumentFields(hit.getDocumentFields(), Collections.emptyMap());
171172
searchHit.sourceRef(hit.getSourceRef());
172-
searchHit.getSourceAsMap().clear();
173-
searchHit.getSourceAsMap().putAll(hit.getSourceAsMap());
173+
Source source = Source.fromBytes(searchHit.getSourceRef());
174+
Map<String, Object> hitSource = source.source();
175+
hitSource.clear();
176+
hitSource.putAll(Source.fromBytes(hit.getSourceRef()).source());
174177
Map<String,Object> emptySecondTableHitSource = createNullsSource(secondTableReturnedFields);
175178

176-
mergeSourceAndAddAliases(emptySecondTableHitSource, searchHit,t1Alias,t2Alias);
179+
mergeSourceAndAddAliases(emptySecondTableHitSource, hitSource, t1Alias,t2Alias);
180+
searchHit.sourceRef(Source.fromMap(hitSource, source.sourceContentType()).internalSourceRef());
177181

178182
return searchHit;
179183
}

src/main/java/org/elasticsearch/plugin/nlpcn/ElasticResultHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package org.elasticsearch.plugin.nlpcn;
22

33
import org.elasticsearch.search.SearchHit;
4+
import org.elasticsearch.search.lookup.Source;
5+
46
import java.util.Map;
57

68
/**
79
* Created by Eliran on 3/10/2015.
810
*/
911
public class ElasticResultHandler {
1012
public static Object getFieldValue(SearchHit hit,String field){
11-
return deepSearchInMap(hit.getSourceAsMap(),field);
13+
return deepSearchInMap(Source.fromBytes(hit.getSourceRef()).source(),field);
1214
}
1315

1416
private static Object deepSearchInMap(Map<String, Object> fieldsMap, String name) {

src/main/java/org/elasticsearch/plugin/nlpcn/ElasticUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.elasticsearch.action.search.SearchRequestBuilder;
66
import org.elasticsearch.action.search.SearchResponse;
77
import org.elasticsearch.client.internal.Client;
8+
import org.elasticsearch.search.lookup.Source;
89
import org.elasticsearch.xcontent.XContentBuilder;
910
import org.elasticsearch.xcontent.XContentFactory;
1011
import org.elasticsearch.xcontent.XContentType;
@@ -49,7 +50,7 @@ public static XContentBuilder hitsAsXContentBuilder(SearchHits results, MetaSear
4950
HashMap<String,Object> value = new HashMap<>();
5051
value.put("_id",hit.getId());
5152
value.put("_score", hit.getScore());
52-
value.put("_source", hit.getSourceAsMap());
53+
value.put("_source", Source.fromBytes(hit.getSourceRef()).source());
5354
searchHits[i] = value;
5455
i++;
5556
}

src/main/java/org/elasticsearch/plugin/nlpcn/HashJoinElasticExecutor.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.elasticsearch.index.query.BoolQueryBuilder;
99
import org.elasticsearch.index.query.QueryBuilders;
1010
import org.elasticsearch.search.SearchHit;
11+
import org.elasticsearch.search.lookup.Source;
1112
import org.nlpcn.es4sql.domain.Field;
1213
import org.nlpcn.es4sql.domain.Select;
1314
import org.nlpcn.es4sql.domain.Where;
@@ -137,11 +138,11 @@ private List<SearchHit> createCombinedResults( TableInJoinRequestBuilder secondT
137138
if (limitReached) break;
138139
//todo: need to run on comparisons. for each comparison check if exists and add.
139140
HashMap<String, List<Map.Entry<Field, Field>>> comparisons = this.hashJoinComparisonStructure.getComparisons();
140-
141+
Map<String, Object> secondTableHitSource = Source.fromBytes(secondTableHit.getSourceRef()).source();
141142
for (Map.Entry<String, List<Map.Entry<Field, Field>>> comparison : comparisons.entrySet()) {
142143
String comparisonID = comparison.getKey();
143144
List<Map.Entry<Field, Field>> t1ToT2FieldsComparison = comparison.getValue();
144-
String key = getComparisonKey(t1ToT2FieldsComparison, secondTableHit, false, null);
145+
String key = getComparisonKey(t1ToT2FieldsComparison, secondTableHitSource, false, null);
145146

146147
SearchHitsResult searchHitsResult = this.hashJoinComparisonStructure.searchForMatchingSearchHits(comparisonID, key);
147148

@@ -159,19 +160,22 @@ private List<SearchHit> createCombinedResults( TableInJoinRequestBuilder secondT
159160
}
160161

161162
Map<String,Object> copiedSource = new HashMap<String,Object>();
162-
copyMaps(copiedSource,secondTableHit.getSourceAsMap());
163+
copyMaps(copiedSource, secondTableHitSource);
163164
onlyReturnedFields(copiedSource, secondTableRequest.getReturnedFields(),secondTableRequest.getOriginalSelect().isSelectAll());
164165

165166

166167

167168
SearchHit searchHit = SearchHit.unpooled(matchingHit.docId(), combinedId);
168169
searchHit.addDocumentFields(matchingHit.getDocumentFields(), Collections.emptyMap());
169170
searchHit.sourceRef(matchingHit.getSourceRef());
170-
searchHit.getSourceAsMap().clear();
171-
searchHit.getSourceAsMap().putAll(matchingHit.getSourceAsMap());
171+
Source source = Source.fromBytes(searchHit.getSourceRef());
172+
Map<String, Object> hitSource = source.source();
173+
hitSource.clear();
174+
hitSource.putAll(Source.fromBytes(matchingHit.getSourceRef()).source());
172175
String t1Alias = requestBuilder.getFirstTable().getAlias();
173176
String t2Alias = requestBuilder.getSecondTable().getAlias();
174-
mergeSourceAndAddAliases(copiedSource, searchHit, t1Alias, t2Alias);
177+
mergeSourceAndAddAliases(copiedSource, hitSource, t1Alias, t2Alias);
178+
searchHit.sourceRef(Source.fromMap(hitSource, source.sourceContentType()).internalSourceRef());
175179

176180
combinedResult.add(searchHit);
177181
resultIds++;
@@ -205,18 +209,22 @@ private void createKeyToResultsAndFillOptimizationStructure(Map<String,Map<Strin
205209
int resultIds = 1;
206210
for (SearchHit hit : firstTableHits) {
207211
HashMap<String, List<Map.Entry<Field, Field>>> comparisons = this.hashJoinComparisonStructure.getComparisons();
212+
Map<String, Object> hitSource = Source.fromBytes(hit.getSourceRef()).source();
208213
for (Map.Entry<String, List<Map.Entry<Field, Field>>> comparison : comparisons.entrySet()) {
209214
String comparisonID = comparison.getKey();
210215
List<Map.Entry<Field, Field>> t1ToT2FieldsComparison = comparison.getValue();
211216

212-
String key = getComparisonKey(t1ToT2FieldsComparison, hit, true, optimizationTermsFilterStructure.get(comparisonID));
217+
String key = getComparisonKey(t1ToT2FieldsComparison, hitSource, true, optimizationTermsFilterStructure.get(comparisonID));
213218

214219
//int docid , id
215220
SearchHit searchHit = SearchHit.unpooled(resultIds, hit.getId());
216221
searchHit.addDocumentFields(hit.getDocumentFields(), Collections.emptyMap());
217222
searchHit.sourceRef(hit.getSourceRef());
218223

219-
onlyReturnedFields(searchHit.getSourceAsMap(), firstTableRequest.getReturnedFields(),firstTableRequest.getOriginalSelect().isSelectAll());
224+
Source source = Source.fromBytes(searchHit.getSourceRef());
225+
Map<String, Object> searchHitSource = source.source();
226+
onlyReturnedFields(searchHitSource, firstTableRequest.getReturnedFields(),firstTableRequest.getOriginalSelect().isSelectAll());
227+
searchHit.sourceRef(Source.fromMap(searchHitSource, source.sourceContentType()).internalSourceRef());
220228
resultIds++;
221229
this.hashJoinComparisonStructure.insertIntoComparisonHash(comparisonID, key, searchHit);
222230
}
@@ -296,9 +304,8 @@ private void updateRequestWithTermsFilter(Map<String,Map<String, List<Object>>>
296304
secondTableRequest.getRequestBuilder().setQuery(boolQuery);
297305
}
298306

299-
private String getComparisonKey(List<Map.Entry<Field, Field>> t1ToT2FieldsComparison, SearchHit hit, boolean firstTable, Map<String, List<Object>> optimizationTermsFilterStructure) {
307+
private String getComparisonKey(List<Map.Entry<Field, Field>> t1ToT2FieldsComparison, Map<String, Object> sourceAsMap, boolean firstTable, Map<String, List<Object>> optimizationTermsFilterStructure) {
300308
String key = "";
301-
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
302309
for (Map.Entry<Field, Field> t1ToT2 : t1ToT2FieldsComparison) {
303310
//todo: change to our function find if key contains '.'
304311
String name;

src/main/java/org/elasticsearch/plugin/nlpcn/IntersectExecutor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.elasticsearch.action.search.SearchResponse;
77
import org.elasticsearch.search.SearchHit;
88
import org.elasticsearch.search.SearchHits;
9+
import org.elasticsearch.search.lookup.Source;
910
import org.nlpcn.es4sql.domain.Field;
1011
import org.nlpcn.es4sql.query.multi.MultiQueryRequestBuilder;
1112

@@ -71,7 +72,9 @@ private void fillIntersectHitsFromResults(Set<ComperableHitResult> comparableHit
7172
SearchHit searchHit = SearchHit.unpooled(currentId, originalHit.getId());
7273
searchHit.addDocumentFields(originalHit.getDocumentFields(), Collections.emptyMap());
7374
searchHit.sourceRef(originalHit.getSourceRef());
74-
searchHit.getSourceAsMap().clear();
75+
Source source = Source.fromBytes(searchHit.getSourceRef());
76+
Map<String, Object> hitSource = source.source();
77+
hitSource.clear();
7578
Map<String, Object> sourceAsMap = result.getFlattenMap();
7679
for (Map.Entry<String, String> entry : firstTableFieldToAlias) {
7780
if (sourceAsMap.containsKey(entry.getKey())) {
@@ -81,7 +84,8 @@ private void fillIntersectHitsFromResults(Set<ComperableHitResult> comparableHit
8184
}
8285
}
8386

84-
searchHit.getSourceAsMap().putAll(sourceAsMap);
87+
hitSource.putAll(sourceAsMap);
88+
searchHit.sourceRef(Source.fromMap(hitSource, source.sourceContentType()).internalSourceRef());
8589
currentId++;
8690
intersectHitsList.add(searchHit);
8791
}

src/main/java/org/elasticsearch/plugin/nlpcn/MinusExecutor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.elasticsearch.core.TimeValue;
88
import org.elasticsearch.search.SearchHit;
99
import org.elasticsearch.search.SearchHits;
10+
import org.elasticsearch.search.lookup.Source;
1011
import org.nlpcn.es4sql.Util;
1112
import org.nlpcn.es4sql.domain.Condition;
1213
import org.nlpcn.es4sql.domain.Field;
@@ -118,10 +119,13 @@ private void fillMinusHitsFromOneField(String fieldName, Set<Object> fieldValues
118119
SearchHit searchHit = SearchHit.unpooled(currentId, currentId + "");
119120
searchHit.addDocumentFields(fields, Collections.emptyMap());
120121
searchHit.sourceRef(someHit.getSourceRef());
121-
searchHit.getSourceAsMap().clear();
122+
Source source = Source.fromBytes(searchHit.getSourceRef());
123+
Map<String, Object> hitSource = source.source();
124+
hitSource.clear();
122125
Map<String, Object> sourceAsMap = new HashMap<>();
123126
sourceAsMap.put(fieldName,result);
124-
searchHit.getSourceAsMap().putAll(sourceAsMap);
127+
hitSource.putAll(sourceAsMap);
128+
searchHit.sourceRef(Source.fromMap(hitSource, source.sourceContentType()).internalSourceRef());
125129
currentId++;
126130
minusHitsList.add(searchHit);
127131
}
@@ -140,7 +144,9 @@ private void fillMinusHitsFromResults(Set<ComperableHitResult> comperableHitResu
140144
SearchHit searchHit = SearchHit.unpooled(currentId, originalHit.getId());
141145
searchHit.addDocumentFields(originalHit.getDocumentFields(), Collections.emptyMap());
142146
searchHit.sourceRef(originalHit.getSourceRef());
143-
searchHit.getSourceAsMap().clear();
147+
Source source = Source.fromBytes(searchHit.getSourceRef());
148+
Map<String, Object> hitSource = source.source();
149+
hitSource.clear();
144150
Map<String, Object> sourceAsMap = result.getFlattenMap();
145151
for(Map.Entry<String,String> entry : this.builder.getFirstTableFieldToAlias().entrySet()){
146152
if(sourceAsMap.containsKey(entry.getKey())){
@@ -150,7 +156,8 @@ private void fillMinusHitsFromResults(Set<ComperableHitResult> comperableHitResu
150156
}
151157
}
152158

153-
searchHit.getSourceAsMap().putAll(sourceAsMap);
159+
hitSource.putAll(sourceAsMap);
160+
searchHit.sourceRef(Source.fromMap(hitSource, source.sourceContentType()).internalSourceRef());
154161
currentId++;
155162
minusHitsList.add(searchHit);
156163
}
@@ -342,7 +349,7 @@ private Where buildTermsFilterFromResults(Set<Object> results,String fieldName)
342349
}
343350

344351
private Object getFieldValue(SearchHit hit, String fieldName) {
345-
Map<String,Object> sourceAsMap = hit.getSourceAsMap();
352+
Map<String,Object> sourceAsMap = Source.fromBytes(hit.getSourceRef()).source();
346353
if(fieldName.contains(".")){
347354
String[] split = fieldName.split("\\.");
348355
return Util.searchPathInMap(sourceAsMap, split);

src/main/java/org/elasticsearch/plugin/nlpcn/NestedLoopsElasticExecutor.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.core.TimeValue;
1010
import org.elasticsearch.search.SearchHit;
1111
import org.elasticsearch.search.SearchHits;
12+
import org.elasticsearch.search.lookup.Source;
1213
import org.nlpcn.es4sql.domain.Condition;
1314
import org.nlpcn.es4sql.domain.Select;
1415
import org.nlpcn.es4sql.domain.Where;
@@ -88,7 +89,10 @@ private int combineResultsFromMultiResponses(List<SearchHit> combinedResults, in
8889

8990
for(int j =0 ; j < responses.length && currentCombinedResults < totalLimit ; j++){
9091
SearchHit hitFromFirstTable = hits[currentIndex+j];
91-
onlyReturnedFields(hitFromFirstTable.getSourceAsMap(), nestedLoopsRequest.getFirstTable().getReturnedFields(),nestedLoopsRequest.getFirstTable().getOriginalSelect().isSelectAll());
92+
Source source = Source.fromBytes(hitFromFirstTable.getSourceRef());
93+
Map<String, Object> hitSource = source.source();
94+
onlyReturnedFields(hitSource, nestedLoopsRequest.getFirstTable().getReturnedFields(),nestedLoopsRequest.getFirstTable().getOriginalSelect().isSelectAll());
95+
hitFromFirstTable.sourceRef(Source.fromMap(hitSource, source.sourceContentType()).internalSourceRef());
9296

9397
SearchResponse multiItemResponse = responses[j].getResponse();
9498
updateMetaSearchResults(multiItemResponse);
@@ -116,21 +120,27 @@ private int combineResultsFromMultiResponses(List<SearchHit> combinedResults, in
116120
}
117121

118122
private SearchHit getMergedHit(int currentCombinedResults, String t1Alias, String t2Alias, SearchHit hitFromFirstTable, SearchHit matchedHit) {
119-
onlyReturnedFields(matchedHit.getSourceAsMap(), nestedLoopsRequest.getSecondTable().getReturnedFields(),nestedLoopsRequest.getSecondTable().getOriginalSelect().isSelectAll());
123+
Source source = Source.fromBytes(matchedHit.getSourceRef());
124+
Map<String, Object> matchedHitSource = source.source();
125+
onlyReturnedFields(matchedHitSource, nestedLoopsRequest.getSecondTable().getReturnedFields(),nestedLoopsRequest.getSecondTable().getOriginalSelect().isSelectAll());
126+
matchedHit.sourceRef(Source.fromMap(matchedHitSource, source.sourceContentType()).internalSourceRef());
120127
SearchHit searchHit = SearchHit.unpooled(currentCombinedResults, hitFromFirstTable.getId() + "|" + matchedHit.getId());
121128
searchHit.addDocumentFields(hitFromFirstTable.getDocumentFields(), Collections.emptyMap());
122129
searchHit.sourceRef(hitFromFirstTable.getSourceRef());
123-
searchHit.getSourceAsMap().clear();
124-
searchHit.getSourceAsMap().putAll(hitFromFirstTable.getSourceAsMap());
130+
source = Source.fromBytes(searchHit.getSourceRef());
131+
Map<String, Object> hitSource = source.source();
132+
hitSource.clear();
133+
hitSource.putAll(Source.fromBytes(hitFromFirstTable.getSourceRef()).source());
125134

126-
mergeSourceAndAddAliases(matchedHit.getSourceAsMap(), searchHit, t1Alias, t2Alias);
135+
mergeSourceAndAddAliases(Source.fromBytes(matchedHit.getSourceRef()).source(), hitSource, t1Alias, t2Alias);
136+
searchHit.sourceRef(Source.fromMap(hitSource, source.sourceContentType()).internalSourceRef());
127137
return searchHit;
128138
}
129139

130140
private MultiSearchRequest createMultiSearchRequest(int multiSearchMaxSize, Where connectedWhere, SearchHit[] hits, Select secondTableSelect, Where originalWhere, int currentIndex) throws SqlParseException {
131141
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
132142
for(int i = currentIndex ; i < currentIndex + multiSearchMaxSize && i< hits.length ; i++ ){
133-
Map<String, Object> hitFromFirstTableAsMap = hits[i].getSourceAsMap();
143+
Map<String, Object> hitFromFirstTableAsMap = Source.fromBytes(hits[i].getSourceRef()).source();
134144
Where newWhere = Where.newInstance();
135145
if(originalWhere!=null) newWhere.addWhere(originalWhere);
136146
if(connectedWhere!=null){

0 commit comments

Comments
 (0)