Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/db/services/dialog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ def decorate_answer(answer):
return {"answer": think + answer, "reference": refs, "prompt": re.sub(r"\n", " \n", prompt), "created_at": time.time()}

if langfuse_tracer:
langfuse_generation = langfuse_tracer.start_generation(
langfuse_generation = langfuse_tracer.start_observation(as_type="generation",
trace_context=trace_context, name="chat", model=llm_model_config["llm_name"],
input={"prompt": prompt, "prompt4citation": prompt4citation, "messages": msg}
)
Comment on lines 784 to 788
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether `decorate_answer` directly closes over `langfuse_generation`.
python - <<'PY'
import ast
from pathlib import Path

path = Path("api/db/services/dialog_service.py")
tree = ast.parse(path.read_text())

async_chat = next(
    node for node in tree.body
    if isinstance(node, ast.AsyncFunctionDef) and node.name == "async_chat"
)
decorate_answer = next(
    node for node in async_chat.body
    if isinstance(node, ast.FunctionDef) and node.name == "decorate_answer"
)

names = {n.id for n in ast.walk(decorate_answer) if isinstance(n, ast.Name)}
string_check = any(
    isinstance(n, ast.Constant) and n.value == "langfuse_generation"
    for n in ast.walk(decorate_answer)
)

print("direct_name_reference =", "langfuse_generation" in names)
print("string_check_present =", string_check)
PY

Repository: infiniflow/ragflow

Length of output: 118


🏁 Script executed:

#!/bin/bash
# First, verify the file exists and check the relevant lines
head -n 800 api/db/services/dialog_service.py | tail -n 50

Repository: infiniflow/ragflow

Length of output: 2836


🏁 Script executed:

#!/bin/bash
# Search for the cleanup path that checks langfuse_generation
rg -n "langfuse_generation" api/db/services/dialog_service.py -A 2 -B 2

Repository: infiniflow/ragflow

Length of output: 1006


Fix the langfuse observation finalization guard condition.

The cleanup path at line 771 guards with "langfuse_generation" in locals(), which is fragile: when the langfuse_tracer branch is never taken, the name is never assigned, and relying on locals() introspection to detect that is easy to break (especially once the cleanup runs inside a nested function, where the enclosing scope's names are not part of that function's locals()). Initialize the variable up front and test it directly instead:

Suggested fix
     langfuse_tracer = None
+    langfuse_generation = None
     trace_context = {}
@@
-        if langfuse_tracer and "langfuse_generation" in locals():
+        if langfuse_generation is not None:
             langfuse_output = "\n" + re.sub(r"^.*?(### Query:.*)", r"\1", prompt, flags=re.DOTALL)
             langfuse_output = {"time_elapsed:": re.sub(r"\n", "  \n", langfuse_output), "created_at": time.time()}
             langfuse_generation.update(output=langfuse_output)
             langfuse_generation.end()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/db/services/dialog_service.py` around lines 779 - 783, The cleanup guard
should not rely on "langfuse_generation" appearing in locals(); initialize
langfuse_generation = None before the if langfuse_tracer block (where you call
langfuse_tracer.start_observation) and then in the finalization/cleanup use a
direct test like "if langfuse_generation is not None:" (or truthiness) to decide
whether to call finish/close on langfuse_generation; this ensures references to
langfuse_generation (created by start_observation) are safe in the nested
cleanup code.

Expand Down
24 changes: 12 additions & 12 deletions api/db/services/llm_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def bind_tools(self, toolcall_session, tools):

def encode(self, texts: list):
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode", model=self.model_config["llm_name"], input={"texts": texts})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="encode", model=self.model_config["llm_name"], input={"texts": texts})

safe_texts = []
for text in texts:
Expand All @@ -119,7 +119,7 @@ def encode(self, texts: list):

def encode_queries(self, query: str):
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="encode_queries", model=self.model_config["llm_name"], input={"query": query})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="encode_queries", model=self.model_config["llm_name"], input={"query": query})

emd, used_tokens = self.mdl.encode_queries(query)
if self.model_config["llm_factory"] == "Builtin":
Expand All @@ -135,7 +135,7 @@ def encode_queries(self, query: str):

def similarity(self, query: str, texts: list):
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="similarity", model=self.model_config["llm_name"], input={"query": query, "texts": texts})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="similarity", model=self.model_config["llm_name"], input={"query": query, "texts": texts})

sim, used_tokens = self.mdl.similarity(query, texts)
if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
Expand All @@ -149,7 +149,7 @@ def similarity(self, query: str, texts: list):

def describe(self, image, max_tokens=300):
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="describe", metadata={"model": self.model_config["llm_name"]})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="describe", metadata={"model": self.model_config["llm_name"]})

txt, used_tokens = self.mdl.describe(image)
if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
Expand All @@ -163,7 +163,7 @@ def describe(self, image, max_tokens=300):

def describe_with_prompt(self, image, prompt):
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="describe_with_prompt", metadata={"model": self.model_config["llm_name"], "prompt": prompt})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="describe_with_prompt", metadata={"model": self.model_config["llm_name"], "prompt": prompt})

txt, used_tokens = self.mdl.describe_with_prompt(image, prompt)
if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
Expand All @@ -177,7 +177,7 @@ def describe_with_prompt(self, image, prompt):

def transcription(self, audio):
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="transcription", metadata={"model": self.model_config["llm_name"]})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="transcription", metadata={"model": self.model_config["llm_name"]})

txt, used_tokens = self.mdl.transcription(audio)
if not TenantLLMService.increase_usage_by_id(self.model_config["id"], used_tokens):
Expand All @@ -194,7 +194,7 @@ def stream_transcription(self, audio):
supports_stream = hasattr(mdl, "stream_transcription") and callable(getattr(mdl, "stream_transcription"))
if supports_stream:
if self.langfuse:
generation = self.langfuse.start_generation(
generation = self.langfuse.start_observation(as_type="generation",
trace_context=self.trace_context,
name="stream_transcription",
metadata={"model": self.model_config["llm_name"]},
Expand Down Expand Up @@ -228,7 +228,7 @@ def stream_transcription(self, audio):
return

if self.langfuse:
generation = self.langfuse.start_generation(
generation = self.langfuse.start_observation(as_type="generation",
trace_context=self.trace_context,
name="stream_transcription",
metadata={"model": self.model_config["llm_name"]},
Expand All @@ -253,7 +253,7 @@ def stream_transcription(self, audio):

def tts(self, text: str) -> Generator[bytes, None, None]:
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="tts", input={"text": text})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="tts", input={"text": text})

for chunk in self.mdl.tts(text):
if isinstance(chunk, int):
Expand Down Expand Up @@ -376,7 +376,7 @@ async def async_chat(self, system: str, history: list, gen_conf: dict = {}, **kw

generation = None
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat", model=self.model_config["llm_name"], input={"system": system, "history": history})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="chat", model=self.model_config["llm_name"], input={"system": system, "history": history})

chat_partial = partial(base_fn, system, history, gen_conf)
use_kwargs = self._clean_param(chat_partial, **kwargs)
Expand Down Expand Up @@ -417,7 +417,7 @@ async def async_chat_streamly(self, system: str, history: list, gen_conf: dict =

generation = None
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})

if stream_fn:
chat_partial = partial(stream_fn, system, history, gen_conf)
Expand Down Expand Up @@ -460,7 +460,7 @@ async def async_chat_streamly_delta(self, system: str, history: list, gen_conf:

generation = None
if self.langfuse:
generation = self.langfuse.start_generation(trace_context=self.trace_context, name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})
generation = self.langfuse.start_observation(trace_context=self.trace_context, as_type="generation", name="chat_streamly", model=self.model_config["llm_name"], input={"system": system, "history": history})

if stream_fn:
chat_partial = partial(stream_fn, system, history, gen_conf)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies = [
"infinity-emb>=0.0.66,<0.0.67",
"jira==3.10.5",
"json-repair==0.35.0",
"langfuse>=2.60.0",
"langfuse>=4.0.1",
"mammoth>=1.11.0",
"markdown==3.6",
"markdown-to-json==2.1.1",
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.