Commit 4b52cfa

fix: enable Elasticsearch full text search support (#2947)
1 parent bfa1b11 commit 4b52cfa

3 files changed: +148 -13 lines

packages/dbgpt-core/src/dbgpt/storage/full_text/base.py

Lines changed: 34 additions & 1 deletion
@@ -20,6 +20,37 @@ def __init__(self, executor: Optional[Executor] = None):
         """Initialize vector store."""
         super().__init__(executor)
 
+    def is_support_full_text_search(self) -> bool:
+        # Newly added to the abstract base class
+        """Support full text search.
+
+        Full text stores should support full text search by default.
+
+        Return:
+            bool: True, full text stores always support full text search.
+        """
+        return True  # Full text stores should always support full text search
+
+    def full_text_search(
+        self, text: str, topk: int, filters: Optional[MetadataFilters] = None
+    ) -> List[Chunk]:
+        # Newly added to the abstract base class
+        """Full text search.
+
+        Args:
+            text (str): The query text.
+            topk (int): Number of results to return. Default is 10.
+
+        Returns:
+            List[Chunk]: Search results as chunks.
+        """
+        # Delegate to the abstract similar_search_with_scores, ignoring the
+        # score threshold; subclasses may implement dedicated full text logic.
+
+        return self.similar_search_with_scores(
+            text, topk, score_threshold=0.0, filters=filters
+        )
+
     @abstractmethod
     def load_document(self, chunks: List[Chunk]) -> List[str]:
         """Load document in index database.
@@ -30,7 +61,9 @@ def load_document(self, chunks: List[Chunk]) -> List[str]:
             List[str]: chunk ids.
         """
 
-    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
+    async def aload_document(
+        self, chunks: List[Chunk], file_id: Optional[str] = None
+    ) -> List[str]:
        """Async load document in index database.
 
         Args:
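
For context on how this new default behaves: any concrete full text store now advertises support and gets a working full_text_search for free, as long as it implements similar_search_with_scores. A minimal consumer sketch (the search_chunks helper is hypothetical, not part of this commit):

from typing import List, Optional

from dbgpt.core import Chunk
from dbgpt.storage.full_text.base import FullTextStoreBase
from dbgpt.storage.vector_store.filters import MetadataFilters


def search_chunks(
    store: FullTextStoreBase, query: str, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
    """Hypothetical helper showing the new default code path."""
    # All FullTextStoreBase subclasses now report True here.
    if store.is_support_full_text_search():
        # The base implementation forwards to similar_search_with_scores
        # with score_threshold=0.0, so no results are dropped by score.
        return store.full_text_search(query, topk=10, filters=filters)
    return []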

packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py

Lines changed: 111 additions & 11 deletions
@@ -8,7 +8,11 @@
 from dbgpt.core import Chunk
 from dbgpt.storage.base import IndexStoreConfig, logger
 from dbgpt.storage.full_text.base import FullTextStoreBase
-from dbgpt.storage.vector_store.filters import MetadataFilters
+from dbgpt.storage.vector_store.filters import (
+    FilterCondition,
+    FilterOperator,
+    MetadataFilters,
+)
 from dbgpt.util import string_utils
 from dbgpt.util.executor_utils import blocking_func_to_async
 from dbgpt_ext.storage.vector_store.elastic_store import ElasticsearchStoreConfig
@@ -81,7 +85,9 @@ def __init__(
                     "similarity": "custom_bm25",
                 },
                 "metadata": {
-                    "type": "keyword",
+                    # Use object so metadata fields stay queryable for filters
+                    "type": "object",
+                    "dynamic": True,
                 },
             }
         }
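
This mapping change is what makes metadata filters possible: with "type": "keyword", the metadata was indexed as one opaque string, so a per-field clause could never match. With a dynamic object mapping, each metadata key becomes its own queryable sub-field. A sketch of the resulting document and query shape (field values are illustrative):

# An indexed document now keeps metadata as structured fields:
doc = {
    "content": "full text body ...",
    "metadata": {"file_id": "doc-1", "source": "manual.pdf"},
}

# ...so Elasticsearch can combine relevance with an exact metadata match:
query = {
    "query": {
        "bool": {
            "must": [
                {"match": {"content": "full text"}},
                {"term": {"metadata.file_id": "doc-1"}},
            ]
        }
    }
}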
@@ -94,6 +100,35 @@ def __init__(
         )
         self._executor = executor or ThreadPoolExecutor()
 
+    def is_support_full_text_search(self) -> bool:
+        # Overridden rather than inheriting the parent's default implementation
+        """Support full text search.
+
+        Elasticsearch supports full text search.
+
+        Return:
+            bool: True if full text search is supported.
+        """
+        return True  # Elasticsearch supports full text search
+
+    def full_text_search(
+        self, text: str, topk: int, filters: Optional[MetadataFilters] = None
+    ) -> List[Chunk]:
+        # Implemented via the existing similar_search_with_scores method
+        """Full text search in Elasticsearch.
+
+        Args:
+            text (str): The query text.
+            topk (int): Number of results to return. Default is 10.
+
+        Returns:
+            List[Chunk]: Search results as chunks.
+        """
+        score_threshold = 0.0
+        return self.similar_search_with_scores(
+            text=text, top_k=topk, score_threshold=score_threshold, filters=filters
+        )
+
     def get_config(self) -> IndexStoreConfig:
         """Get the es store config."""
         return self._es_config
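
A hedged usage sketch of the new entry point. It assumes a configured store instance (here called store) and that the filters module exposes a MetadataFilter item type alongside MetadataFilters, as the _build_metadata_filter helper later in this diff expects:

from dbgpt.storage.vector_store.filters import MetadataFilter, MetadataFilters

# BM25-ranked full text search, restricted to chunks of one file.
# full_text_search forwards to similar_search_with_scores with a
# score_threshold of 0.0, so ordering is purely by Elasticsearch score.
filters = MetadataFilters(filters=[MetadataFilter(key="file_id", value="doc-1")])
chunks = store.full_text_search("how to configure bm25", topk=5, filters=filters)
for chunk in chunks:
    print(chunk.chunk_id, chunk.score, chunk.content[:80])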
@@ -114,7 +149,7 @@ def load_document(self, chunks: List[Chunk]) -> List[str]:
         es_requests = []
         ids = []
         contents = [chunk.content for chunk in chunks]
-        metadatas = [json.dumps(chunk.metadata) for chunk in chunks]
+        metadatas = [self._normalize_metadata(chunk.metadata) for chunk in chunks]
         chunk_ids = [chunk.chunk_id for chunk in chunks]
         for i, content in enumerate(contents):
             es_request = {
@@ -143,19 +178,22 @@ def similar_search(
         Return:
             List[Chunk]: similar text.
         """
-        es_query = {"query": {"match": {"content": text}}}
-        res = self._es_client.search(index=self._index_name, body=es_query)
+        es_query = self._build_query(text, filters)
+        res = self._es_client.search(
+            index=self._index_name, body=es_query, size=topk, track_total_hits=False
+        )
 
         chunks = []
         for r in res["hits"]["hits"]:
+            metadata = self._normalize_metadata(r["_source"].get("metadata"))
             chunks.append(
                 Chunk(
                     chunk_id=r["_id"],
                     content=r["_source"]["content"],
-                    metadata=json.loads(r["_source"]["metadata"]),
+                    metadata=metadata,
                 )
             )
-        return chunks[:topk]
+        return chunks
 
     def similar_search_with_scores(
         self,
@@ -175,17 +213,20 @@ def similar_search_with_scores(
         Return:
             List[Tuple[str, float]]: similar text with scores.
         """
-        es_query = {"query": {"match": {"content": text}}}
-        res = self._es_client.search(index=self._index_name, body=es_query)
+        es_query = self._build_query(text, filters)
+        res = self._es_client.search(
+            index=self._index_name, body=es_query, size=top_k, track_total_hits=False
+        )
 
         chunks_with_scores = []
         for r in res["hits"]["hits"]:
             if r["_score"] >= score_threshold:
+                metadata = self._normalize_metadata(r["_source"].get("metadata"))
                 chunks_with_scores.append(
                     Chunk(
                         chunk_id=r["_id"],
                         content=r["_source"]["content"],
-                        metadata=json.loads(r["_source"]["metadata"]),
+                        metadata=metadata,
                         score=r["_score"],
                     )
                 )
@@ -196,14 +237,23 @@ def similar_search_with_scores(
         )
         return chunks_with_scores[:top_k]
 
-    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
+    async def aload_document(
+        self, chunks: List[Chunk], file_id: Optional[str] = None
+    ) -> List[str]:
         """Async load document in elasticsearch.
 
         Args:
             chunks(List[Chunk]): document chunks.
         Return:
             List[str]: chunk ids.
         """
+        # New in this change: inject the file_id into each chunk's metadata
+        if file_id:
+            # Ensure the metadata field exists, then add or update file_id
+            for chunk in chunks:
+                if not hasattr(chunk, "metadata"):
+                    chunk.metadata = {}
+                chunk.metadata["file_id"] = file_id
         return await blocking_func_to_async(self._executor, self.load_document, chunks)
 
     def delete_by_ids(self, ids: str) -> List[str]:
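
The file_id injection above is what ties indexed chunks back to their source file. A sketch of the intended call pattern (chunk contents and the store instance are illustrative):

import asyncio

from dbgpt.core import Chunk


async def index_file(store, file_id: str) -> None:
    chunks = [
        Chunk(content="first chunk of the file", metadata={"source": "a.md"}),
        Chunk(content="second chunk of the file", metadata={"source": "a.md"}),
    ]
    # Each chunk's metadata gains {"file_id": file_id} before indexing,
    # so later lookups can match {"term": {"metadata.file_id": file_id}}.
    ids = await store.aload_document(chunks, file_id=file_id)
    print(f"indexed {len(ids)} chunks for {file_id}")


# asyncio.run(index_file(store, "doc-1"))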
@@ -229,3 +279,53 @@ def delete_vector_name(self, index_name: str):
             index_name(str): The name of index to delete.
         """
         self._es_client.indices.delete(index=self._index_name)
+
+    def _build_query(self, text: str, filters: Optional[MetadataFilters]):
+        must_clauses = [{"match": {"content": text}}]
+        filter_clause = self._build_metadata_filter(filters)
+        if filter_clause:
+            must_clauses.append(filter_clause)
+        return {"query": {"bool": {"must": must_clauses}}}
+
+    def _build_metadata_filter(self, filters: Optional[MetadataFilters]):
+        """Translate MetadataFilters to an elasticsearch bool clause."""
+        if not filters or not filters.filters:
+            return None
+
+        clauses = []
+        for f in filters.filters:
+            field_name = f"metadata.{f.key}"
+            if f.operator == FilterOperator.EQ:
+                clauses.append({"term": {field_name: f.value}})
+            elif f.operator == FilterOperator.IN:
+                values = f.value if isinstance(f.value, list) else [f.value]
+                clauses.append({"terms": {field_name: values}})
+            elif f.operator == FilterOperator.NE:
+                clauses.append({"bool": {"must_not": {"term": {field_name: f.value}}}})
+            else:
+                logger.warning(
+                    "Unsupported filter operator %s for elastic full text search",
+                    f.operator,
+                )
+        if not clauses:
+            return None
+        if filters.condition == FilterCondition.OR:
+            return {"bool": {"should": clauses, "minimum_should_match": 1}}
+        return {"bool": {"must": clauses}}
+
+    def _normalize_metadata(self, metadata):
+        """Ensure metadata is stored as a dict for downstream consumers."""
+        if metadata is None:
+            return {}
+        if isinstance(metadata, dict):
+            return metadata
+        if isinstance(metadata, str):
+            try:
+                return json.loads(metadata)
+            except Exception:
+                # Fallback to wrapping the raw string to avoid breaking callers
+                return {"value": metadata}
+        try:
+            return dict(metadata)
+        except Exception:
+            return {"value": metadata}
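
To make the translation concrete, here is the query _build_query assembles for a two-clause AND filter, derived directly from the branches above (filter values are illustrative):

# Input (using the shapes _build_metadata_filter reads: key, value, operator):
#   filters = MetadataFilters(filters=[
#       MetadataFilter(key="file_id", value="doc-1"),      # EQ (the default)
#       MetadataFilter(key="lang", value=["en", "zh"],
#                      operator=FilterOperator.IN),        # IN
#   ])
#
# _build_query("query text", filters) returns:
expected_query = {
    "query": {
        "bool": {
            "must": [
                {"match": {"content": "query text"}},
                {
                    "bool": {
                        "must": [
                            {"term": {"metadata.file_id": "doc-1"}},
                            {"terms": {"metadata.lang": ["en", "zh"]}},
                        ]
                    }
                },
            ]
        }
    }
}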

packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/opensearch.py

Lines changed: 3 additions & 1 deletion
@@ -20,7 +20,9 @@ def load_document(self, chunks: List[Chunk]) -> List[str]:
         """
         pass
 
-    def aload_document(self, chunks: List[Chunk]) -> List[str]:
+    def aload_document(
+        self, chunks: List[Chunk], file_id: Optional[str] = None
+    ) -> List[str]:
         """Async load document in index database.
 
         Args:
