chatbot-rag-app: recover from timeouts on first use of ELSER
Through investigation, I found that timeout errors on first use were ultimately
caused by a buildup of ML jobs, which prevent operation until they complete.
This change catches the first error during bulk indexing, which occurs about
10 seconds in. Then, the code waits for the ML jobs to settle, which is a less
frustrating experience than making users wait several minutes themselves. I use
`warn` when this happens so that status is visible even when tools like
docker compose hide or buffer stdout.
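
In short, the recovery flow is: attempt bulk indexing once; on a
ConnectionTimeout (or a 408 ApiError), warn, wait for the ML task queue to
drain, then delete the partial index and retry. A condensed sketch of that
flow, simplified from the diff below (the client setup and the `store` and
`index` arguments are illustrative assumptions):

    import time
    from warnings import warn

    from elasticsearch import ApiError, Elasticsearch
    from elastic_transport._exceptions import ConnectionTimeout

    es = Elasticsearch(hosts=["http://localhost:9200"])  # assumed local cluster

    def await_ml_tasks(max_timeout=600, interval=5):
        """Block until no ML tasks remain, or raise TimeoutError."""
        deadline = time.time() + max_timeout
        while time.time() < deadline:
            resp = es.tasks.list(detailed=True, actions=["cluster:monitor/xpack/ml/*"])
            if not any(n.get("tasks") for n in resp["nodes"].values()):
                return
            time.sleep(interval)
        raise TimeoutError("ML tasks are still running")

    def index_with_recovery(store, docs, index):
        try:
            store.add_documents(docs)
        except (ConnectionTimeout, ApiError) as e:
            # Anything other than a request timeout (408) is a real failure.
            if isinstance(e, ApiError) and e.status_code != 408:
                raise
            warn(f"Error occurred, will retry after ML jobs complete: {e}")
            await_ml_tasks()
            es.indices.delete(index=index, ignore_unavailable=True)  # drop partial index
            store.add_documents(docs)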

While I was here, I tuned the docker compose setup slightly. At first, I
thought there was a memory issue, but there isn't: 2GB is in fact plenty,
and a smaller heap causes less worry considering how many other containers
are running. I also matched the health check behavior with upstream work in
kibana. Finally, I updated dependencies for hygiene.
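
The readiness condition the new health check encodes (Elasticsearch is up once
it answers at all, even if the answer is an authentication error) can be
mirrored in Python roughly like this. This is a stdlib-only sketch; the URL is
an assumption, and the retry count and interval echo the compose change below:

    import time
    import urllib.error
    import urllib.request

    def wait_for_elasticsearch(url="http://localhost:9200", retries=120, interval=1):
        """Ready once ES responds, even with HTTP 401 (security enabled)."""
        for _ in range(retries):
            try:
                with urllib.request.urlopen(url, timeout=10) as resp:
                    body = resp.read().decode()
            except urllib.error.HTTPError as e:
                body = e.read().decode()  # a 401 still proves the server is up
            except OSError:
                time.sleep(interval)  # not listening yet; keep waiting
                continue
            if "missing authentication credentials" in body:
                return
            time.sleep(interval)
        raise TimeoutError(f"Elasticsearch at {url} never became ready")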

Fixes #307

Signed-off-by: Adrian Cole <[email protected]>
codefromthecrypt committed Feb 19, 2025
1 parent 2868e7d commit dd4a381
Showing 3 changed files with 162 additions and 93 deletions.
13 changes: 10 additions & 3 deletions docker/docker-compose-elastic.yml
@@ -16,15 +16,22 @@ services:
- xpack.security.http.ssl.enabled=false
- xpack.security.transport.ssl.enabled=false
- xpack.license.self_generated.type=trial
- ES_JAVA_OPTS=-Xmx8g
# Use minimum heap required by ELSER
- ES_JAVA_OPTS=-Xms2g -Xmx2g
ulimits:
memlock:
soft: -1
hard: -1
healthcheck:
test: ["CMD-SHELL", "curl -s http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=500ms"]
retries: 300
test: # readiness probe taken from kbn-health-gateway-server script
[
"CMD-SHELL",
"curl -s http://localhost:9200 | grep -q 'missing authentication credentials'",
]
start_period: 10s
interval: 1s
timeout: 10s
retries: 120

elasticsearch_settings:
depends_on:
116 changes: 89 additions & 27 deletions example-apps/chatbot-rag-app/data/index_data.py
@@ -1,8 +1,16 @@
import json
import os
import time
from warnings import warn

from elasticsearch import (
ApiError,
Elasticsearch,
NotFoundError,
BadRequestError,
)
from elastic_transport._exceptions import ConnectionTimeout

from elasticsearch import Elasticsearch, NotFoundError
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_elasticsearch import ElasticsearchStore
@@ -18,42 +26,46 @@
ELSER_MODEL = os.getenv("ELSER_MODEL", ".elser_model_2")

if ELASTICSEARCH_USER:
elasticsearch_client = Elasticsearch(
es = Elasticsearch(
hosts=[ELASTICSEARCH_URL],
basic_auth=(ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD),
)
elif ELASTICSEARCH_API_KEY:
elasticsearch_client = Elasticsearch(
hosts=[ELASTICSEARCH_URL], api_key=ELASTICSEARCH_API_KEY
)
es = Elasticsearch(hosts=[ELASTICSEARCH_URL], api_key=ELASTICSEARCH_API_KEY)
else:
raise ValueError(
"Please provide either ELASTICSEARCH_USER or ELASTICSEARCH_API_KEY"
)


def install_elser():
# Step 1: Ensure ELSER_MODEL is defined
try:
elasticsearch_client.ml.get_trained_models(model_id=ELSER_MODEL)
print(f'"{ELSER_MODEL}" model is available')
es.ml.get_trained_models(model_id=ELSER_MODEL)
except NotFoundError:
print(f'"{ELSER_MODEL}" model not available, downloading it now')
elasticsearch_client.ml.put_trained_model(
es.ml.put_trained_model(
model_id=ELSER_MODEL, input={"field_names": ["text_field"]}
)
while True:
status = elasticsearch_client.ml.get_trained_models(
model_id=ELSER_MODEL, include="definition_status"
)
if status["trained_model_configs"][0]["fully_defined"]:
# model is ready
break
time.sleep(1)
while True:
status = es.ml.get_trained_models(
model_id=ELSER_MODEL, include="definition_status"
)
if status["trained_model_configs"][0]["fully_defined"]:
break
time.sleep(1)

print("Model downloaded, starting deployment")
elasticsearch_client.ml.start_trained_model_deployment(
# Step 2: Ensure ELSER_MODEL is deployed
try:
es.ml.start_trained_model_deployment(
model_id=ELSER_MODEL, wait_for="fully_allocated"
)
print(f'"{ELSER_MODEL}" model is deployed')
except BadRequestError:
# This error means the deployment already exists
pass

print(f'"{ELSER_MODEL}" model is ready')


def main():
@@ -84,19 +96,69 @@ def main():

print(f"Creating Elasticsearch sparse vector store in {ELASTICSEARCH_URL}")

elasticsearch_client.indices.delete(index=INDEX, ignore_unavailable=True)

ElasticsearchStore.from_documents(
docs,
es_connection=elasticsearch_client,
store = ElasticsearchStore(
es_connection=es,
index_name=INDEX,
strategy=ElasticsearchStore.SparseVectorRetrievalStrategy(model_id=ELSER_MODEL),
bulk_kwargs={
"request_timeout": 60,
},
)

# The first call creates ML tasks to support the index, and typically fails
# with the default 10-second timeout, at least when Elasticsearch is a
# container running on Apple Silicon.
#
# Once elastic/elasticsearch#107077 is fixed, we can use bulk_kwargs to
# adjust the timeout.
try:
es.indices.delete(index=INDEX, ignore_unavailable=True)
store.add_documents(list(docs))
except BadRequestError:
# This error means the index already exists
pass
except (ConnectionTimeout, ApiError) as e:
if isinstance(e, ApiError) and e.status_code != 408:
raise
warn(f"Error occurred, will retry after ML jobs complete: {e}")
await_ml_tasks()
es.indices.delete(index=INDEX, ignore_unavailable=True)
store.add_documents(list(docs))


def await_ml_tasks(max_timeout=600, interval=5):
"""
Waits for all machine learning tasks to complete within a specified timeout period.
Parameters:
max_timeout (int): Maximum time to wait for tasks to complete, in seconds.
interval (int): Time to wait between status checks, in seconds.
Raises:
TimeoutError: If the timeout is reached and machine learning tasks are still running.
"""
start_time = time.time()

tasks = [] # Initialize tasks list
previous_task_count = 0 # Track the previous number of tasks
while time.time() - start_time < max_timeout:
tasks = []
resp = es.tasks.list(detailed=True, actions=["cluster:monitor/xpack/ml/*"])
for node_id, node_info in resp["nodes"].items():
node_tasks = node_info.get("tasks", {})
for task_id, task_info in node_tasks.items():
tasks.append(task_info["action"])
if not tasks:
break
current_task_count = len(tasks)
if current_task_count != previous_task_count:
warn(f"Awaiting {current_task_count} ML tasks")
previous_task_count = current_task_count
time.sleep(interval)

if tasks:
raise TimeoutError(
f"Timeout reached. ML tasks are still running: {', '.join(tasks)}"
)


# Unless we run through flask, we can miss critical settings or telemetry signals.
if __name__ == "__main__":
raise RuntimeError("Run via the parent directory: 'flask create-index'")
main()