Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 229 additions & 17 deletions oxygent/databases/db_es/local_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import asyncio
import json
from datetime import datetime
import locale
import logging
import os
Expand Down Expand Up @@ -103,9 +104,30 @@ async def _read_json_safe(self, path: str) -> Optional[Dict[str, Any]]:
# Public ES‑like API
# ------------------------------------------------------------------

async def index_exists(self, index_name: str) -> bool:
    """Report whether the file backing ``index_name`` is present.

    Args:
        index_name: Name of the index to look up.

    Returns:
        True when the path produced by ``_index_path`` exists, False otherwise.
    """
    return await aiofiles.os.path.exists(self._index_path(index_name))

async def create_index(
self, index_name: str, body: dict[str, Any]
) -> dict[str, bool]:
"""Create a new index with the given name and mapping body.

Args:
index_name: Name of the index to create
body: Index configuration including mappings and settings

Returns:
Dictionary with acknowledged flag
"""
if not index_name or not body:
raise ValueError("index_name and body must not be empty")

Expand Down Expand Up @@ -175,16 +197,67 @@ async def exists(self, index_name: str, doc_id: str) -> bool:

async def search(self, index_name: str, body: dict[str, Any]):
    """Run a naive Elasticsearch-style search against a local index file.

    Recognised ``body`` keys:
        query: Passed to ``_filter_docs``; an empty/missing query is passed
            through as ``{}``.
        sort: Passed to ``_sort_docs``.
        size: Maximum number of hits to return (default 10).
        _source: List of field names handed to ``_apply_source_filtering``;
            ignored unless it is a non-empty list.
        aggs / aggregations: Mapping of aggregation name to config. Only
            ``terms``, ``filter`` and ``top_hits`` are handled; anything else
            is silently skipped.

    Args:
        index_name: Name of the index to search.
        body: Elasticsearch-like request body.

    Returns:
        A dict shaped like an ES response. ``hits.hits`` is the requested
        page, ``hits.total.value`` is the number of documents that matched
        the query before ``size`` was applied, and ``aggregations`` is
        present only when at least one supported aggregation was computed.
    """
    data = await self._read_json_safe(self._index_path(index_name)) or {}

    # Filter and sort exactly once; the unsliced match list is kept because
    # both the total count and the aggregations need it.
    matched_docs = self._filter_docs(self._build_docs(data), body.get("query", {}))
    sorted_docs = self._sort_docs(matched_docs, body.get("sort", []))
    page = sorted_docs[: body.get("size", 10)]

    # _source filtering is applied to the returned page only.
    source_fields = body.get("_source")
    if source_fields and isinstance(source_fields, list):
        page = self._apply_source_filtering(page, source_fields)

    result = {"hits": {"hits": page, "total": {"value": len(matched_docs)}}}

    # Elasticsearch accepts both spellings of the aggregation key.
    aggs = body.get("aggs") or body.get("aggregations") or {}
    agg_results = {}
    for agg_name, agg_config in aggs.items():
        if "terms" in agg_config:
            # Count documents per distinct (stringified) field value.
            field_parts = agg_config["terms"]["field"].split(".")
            buckets = {}
            for doc in matched_docs:
                value = doc.get("_source", {})
                # Walk dotted paths; a non-dict on the way means "missing".
                for part in field_parts:
                    if not isinstance(value, dict):
                        value = ""
                        break
                    value = value.get(part, "")
                key = "" if value is None else str(value)
                buckets[key] = buckets.get(key, 0) + 1
            agg_results[agg_name] = {
                "buckets": [
                    {"key": key, "doc_count": count}
                    for key, count in buckets.items()
                ]
            }
        elif "filter" in agg_config:
            # Count of matched docs that also satisfy the nested query.
            agg_results[agg_name] = {
                "doc_count": len(
                    self._filter_docs(matched_docs, agg_config["filter"])
                )
            }
        elif "top_hits" in agg_config:
            # Simplified: first N of the sorted matches, without _source
            # filtering.
            top_size = agg_config["top_hits"].get("size", 1)
            agg_results[agg_name] = {"hits": {"hits": sorted_docs[:top_size]}}

    if agg_results:
        result["aggregations"] = agg_results

    return result

# ------------------------------------------------------------------
# Helpers for naive query execution
Expand All @@ -208,10 +281,44 @@ def _apply_source_filtering(docs: list[dict[str, Any]], source_fields: list[str]
filtered_docs.append(filtered_doc)
return filtered_docs

def _filter_docs(self, docs: list[dict[str, Any]], query: dict[str, Any]):
def _filter_docs(self, docs: list[dict[str, Any]], query: dict[str, Any]) -> list[dict[str, Any]]:
if not query:
return docs

# Support match query for full-text search (case-insensitive substring match)
if "match" in query:
match_query = query["match"]
# Handle both simple match: {"field": "value"} and complex match: {"field": {"query": "value", "operator": "and"}}
if isinstance(match_query, dict):
field_name = None
search_value = None
operator = "or" # default operator

for k, v in match_query.items():
if isinstance(v, dict):
field_name = k
search_value = v.get("query", "")
operator = v.get("operator", "or")
else:
field_name = k
search_value = v

if field_name and search_value:
search_terms = str(search_value).lower().split()
filtered_docs = []
for doc in docs:
field_value = str(doc["_source"].get(field_name, "")).lower()
if operator == "and":
# All terms must be present
if all(term in field_value for term in search_terms):
filtered_docs.append(doc)
else:
# At least one term must be present (or)
if any(term in field_value for term in search_terms):
filtered_docs.append(doc)
return filtered_docs
return docs

if "term" in query:
k, v = next(iter(query["term"].items()))
if k == "_id":
Expand All @@ -224,15 +331,21 @@ def _filter_docs(self, docs: list[dict[str, Any]], query: dict[str, Any]):

if "bool" in query:
bool_query = query["bool"]
filtered_docs = docs.copy()

# Process "must" conditions (must match - affects scoring)
if "must" in bool_query:
must_conditions = bool_query["must"]
filtered_docs = docs.copy()

for condition in must_conditions:
filtered_docs = self._filter_docs(filtered_docs, condition)
return filtered_docs

# Process "filter" conditions (filter context - no scoring, but must match)
if "filter" in bool_query:
filter_conditions = bool_query["filter"]
for condition in filter_conditions:
filtered_docs = self._filter_docs(filtered_docs, condition)

# Process "should" conditions (should match - OR logic)
if "should" in bool_query:
should_conditions = bool_query["should"]
filtered_docs = []
Expand All @@ -243,18 +356,100 @@ def _filter_docs(self, docs: list[dict[str, Any]], query: dict[str, Any]):
break
return filtered_docs

# Process "must_not" conditions (must not match - exclude)
if "must_not" in bool_query:
must_not_conditions = bool_query["must_not"]
filtered_docs = []
for doc in docs:
for doc in filtered_docs:
exclude = False
for cond in must_not_conditions:
if self._match_single_condition(doc, cond):
exclude = True
break
if not exclude:
filtered_docs.append(doc)
return filtered_docs

return filtered_docs

if "range" in query:
range_conditions = query["range"]
filtered_docs = []
for doc in docs:
match = True
for field, range_params in range_conditions.items():
value = doc["_source"].get(field, "")
value_str = str(value)

# Parse document timestamp
parsed_dt = None
try:
# Try full format with microseconds (26 chars like "2025-12-30 18:19:38.050895")
# Try without microseconds (19 chars like "2025-12-30 18:19:38")
time_formats = [
"%Y-%m-%d %H:%M:%S.%f", # Full format with 6 microseconds
"%Y-%m-%d %H:%M:%S", # Without microseconds
]
for fmt in time_formats:
try:
parsed_dt = datetime.strptime(value_str, fmt)
break
except Exception:
continue
except Exception:
pass

# Actual range filtering logic
try:
if parsed_dt:
for op, threshold in range_params.items():
threshold_str = str(threshold)

# Parse threshold timestamp - try multiple formats
threshold_dt = None
for fmt in time_formats:
try:
threshold_dt = datetime.strptime(threshold_str, fmt)
break
except Exception:
continue

if threshold_dt is None:
continue

if op == "gte" and parsed_dt < threshold_dt:
match = False
break
elif op == "lte" and parsed_dt > threshold_dt:
match = False
break
elif op == "gt" and parsed_dt <= threshold_dt:
match = False
break
elif op == "lt" and parsed_dt >= threshold_dt:
match = False
break

if not match:
break
except Exception:
# Fallback to string comparison
for op, threshold in range_params.items():
if op == "gte" and str(value) < str(threshold):
match = False
break
if op == "lte" and str(value) > str(threshold):
match = False
break
if op == "gt" and str(value) <= str(threshold):
match = False
break
if op == "lt" and str(value) >= str(threshold):
match = False
break
if match:
filtered_docs.append(doc)

return filtered_docs

return docs

Expand Down Expand Up @@ -297,13 +492,30 @@ def _match_single_condition(
k, vlist = next(iter(condition["terms"].items()))
return doc["_source"].get(k) in vlist

# Support match query in single condition context
if "match" in condition:
k, v = next(iter(condition["match"].items()))
if k == "_id":
field_value = str(doc["_id"])
else:
field_value = str(doc["_source"].get(k, ""))
return v.lower() in field_value.lower()
match_query = condition["match"]
if isinstance(match_query, dict):
field_name = None
search_value = None
operator = "or"

for k, v in match_query.items():
if isinstance(v, dict):
field_name = k
search_value = v.get("query", "")
operator = v.get("operator", "or")
else:
field_name = k
search_value = v

if field_name and search_value:
field_value = str(doc["_source"].get(field_name, "")).lower()
search_terms = str(search_value).lower().split()
if operator == "and":
return all(term in field_value for term in search_terms)
else:
return any(term in field_value for term in search_terms)

return False

Expand Down