|
3 | 3 | import co.elastic.clients.elasticsearch.ElasticsearchClient; |
4 | 4 | import co.elastic.clients.elasticsearch._types.FieldValue; |
5 | 5 | import co.elastic.clients.elasticsearch._types.Refresh; |
6 | | -import co.elastic.clients.elasticsearch._types.SortOptions; |
7 | 6 | import co.elastic.clients.elasticsearch._types.SortOrder; |
8 | 7 | import co.elastic.clients.elasticsearch._types.aggregations.Aggregate; |
9 | 8 | import co.elastic.clients.elasticsearch._types.aggregations.Aggregation; |
|
39 | 38 |
|
40 | 39 | @Service |
41 | 40 | @Slf4j |
| 41 | +@SuppressWarnings({"unchecked","deprecation"}) // deprecation: legacy factory usage for ES6 compatibility; unchecked: dynamic query map casting |
42 | 42 | public class EsUtilServiceImpl implements EsUtilService{ |
43 | 43 |
|
44 | | - private final EsConfig esConfig; |
45 | 44 | private final ElasticsearchClient elasticsearchClient; |
46 | 45 | private final Logger logger = LogManager.getLogger(getClass()); |
47 | 46 |
|
48 | 47 | private static final Map<String, Map<String, Object>> schemaCache = new ConcurrentHashMap<>(); |
49 | 48 |
|
50 | 49 | public EsUtilServiceImpl(EsConfig esConfig, ElasticsearchClient elasticsearchClient) { |
51 | | - this.esConfig = esConfig; |
| 50 | + // esConfig retained in ctor signature for backward compatibility / bean wiring even if not directly used now |
52 | 51 | this.elasticsearchClient = elasticsearchClient; |
53 | 52 | } |
54 | 53 |
|
@@ -90,29 +89,56 @@ public String addDocument( |
90 | 89 | @Override |
91 | 90 | public String updateDocument( |
92 | 91 | String index, String indexType, String entityId, Map<String, Object> updatedDocument, String JsonFilePath) { |
| 92 | + /* |
| 93 | + * NOTE (ES 6.8 compatibility): |
| 94 | + * The official Java client 8.x uses typeless endpoints ( /{index}/_update/{id} ). |
| 95 | + * When talking to an ES 6.8 cluster this path is interpreted as {index}/{type}/{id} |
| 96 | + * and the segment "_update" is considered a type, which is invalid (starts with '_'). |
| 97 | + * This causes: invalid_type_name_exception Document mapping type name can't start with '_', found: [_update] |
| 98 | + * |
| 99 | + * To remain backward compatible without downgrading the whole client right now, we emulate a partial update: |
| 100 | + * 1. Fetch existing document (if any) |
| 101 | + * 2. Merge provided fields (overwrite only keys present in updatedDocument) |
| 102 | + * 3. Re-index the merged document using the regular index API (which still works with ES 6.8) |
| 103 | + * This gives near-semantic parity with a partial update (no deletion of unspecified fields) while avoiding the _update endpoint. |
| 104 | + */ |
93 | 105 | try { |
| 106 | + // 1. Filter incoming map using schema (same logic as addDocument) |
94 | 107 | JsonSchemaFactory schemaFactory = JsonSchemaFactory.getInstance(); |
95 | | - InputStream schemaStream = schemaFactory.getClass().getResourceAsStream(JsonFilePath); |
96 | | - Map<String, Object> map = objectMapper.readValue(schemaStream, |
97 | | - new TypeReference<Map<String, Object>>() { |
98 | | - }); |
99 | | - Iterator<Map.Entry<String, Object>> iterator = updatedDocument.entrySet().iterator(); |
100 | | - while (iterator.hasNext()) { |
101 | | - Map.Entry<String, Object> entry = iterator.next(); |
102 | | - String key = entry.getKey(); |
103 | | - if (!map.containsKey(key)) { |
104 | | - iterator.remove(); |
| 108 | + try (InputStream schemaStream = schemaFactory.getClass().getResourceAsStream(JsonFilePath)) { |
| 109 | + Map<String, Object> schemaMap = objectMapper.readValue(schemaStream, new TypeReference<Map<String, Object>>() {}); |
| 110 | + updatedDocument.entrySet().removeIf(e -> !schemaMap.containsKey(e.getKey())); |
| 111 | + } |
| 112 | + |
| 113 | + Map<String, Object> merged = new HashMap<>(); |
| 114 | + boolean existingFound = false; |
| 115 | + try { |
| 116 | + GetResponse<Object> existing = elasticsearchClient.get(builder -> builder.index(index).id(entityId), Object.class); |
| 117 | + if (existing.found()) { |
| 118 | + Object src = existing.source(); |
| 119 | + if (src instanceof Map) { |
| 120 | + merged.putAll((Map<String,Object>) src); |
| 121 | + existingFound = true; |
| 122 | + } |
105 | 123 | } |
| 124 | + } catch (Exception getEx) { |
| 125 | + log.debug("ES get (for merge) failed for index={}, id={}, treating as upsert. Cause: {}", index, entityId, getEx.getMessage()); |
106 | 126 | } |
107 | | - IndexRequest<Map<String, Object>> indexRequest = new IndexRequest.Builder<Map<String, Object>>() |
| 127 | + |
| 128 | + // 2. Merge (overwrite / add updated fields only) |
| 129 | + merged.putAll(updatedDocument); |
| 130 | + |
| 131 | + // 3. Index (acts as create or replace). We want refresh so subsequent reads see changes. |
| 132 | + IndexRequest<Map<String,Object>> indexRequest = new IndexRequest.Builder<Map<String, Object>>() |
108 | 133 | .index(index) |
109 | 134 | .id(entityId) |
110 | | - .document(updatedDocument) |
| 135 | + .document(merged) |
111 | 136 | .refresh(Refresh.True) |
112 | 137 | .build(); |
113 | 138 | IndexResponse response = elasticsearchClient.index(indexRequest); |
114 | | - return response.result().jsonValue(); |
115 | | - } catch (IOException e) { |
| 139 | + return (existingFound ? "updated" : "created") + ":" + response.result().jsonValue(); |
| 140 | + } catch (Exception e) { |
| 141 | + log.error("Error performing merge+index update for index={}, id={}: {}", index, entityId, e.getMessage(), e); |
116 | 142 | return null; |
117 | 143 | } |
118 | 144 | } |
@@ -142,7 +168,15 @@ public SearchResult searchDocuments(String esIndexName, SearchCriteria searchCri |
142 | 168 | SearchResult searchResult = new SearchResult(); |
143 | 169 | searchResult.setData(paginatedResult); |
144 | 170 | searchResult.setFacets(fieldAggregations); |
145 | | - searchResult.setTotalCount(paginatedSearchResponse.hits().total().value()); |
| 171 | + long totalHits = 0L; |
| 172 | + var hitsMeta = paginatedSearchResponse.hits(); |
| 173 | + if (hitsMeta != null) { |
| 174 | + var totalObj = hitsMeta.total(); |
| 175 | + if (totalObj != null) { |
| 176 | + totalHits = totalObj.value(); |
| 177 | + } |
| 178 | + } |
| 179 | + searchResult.setTotalCount(totalHits); |
146 | 180 | return searchResult; |
147 | 181 | } catch (IOException e) { |
148 | 182 | log.error("Error while fetching details from elastic search"); |
|
0 commit comments