Skip to content

Commit e3d654a

Browse files
author
kushidhar-in
committed
Add Langfuse tracing across RAG ingestion pipeline with datapack-scoped project resolution
Instrument the RAG agent pipeline end-to-end with Langfuse spans/observations so ingestion, chunking, embedding generation, vector writes, and pipeline state updates are traceable in a single observability flow. This adds method-level @observe coverage to core pipeline layers (chunking, embedding generation, user-prompt embedding, PDF processing, the vector store, and active pipeline tracking DB methods), and introduces a wrapped SQL executor in yugabytedb_vector_store.py to capture query metadata and row counts for database operations. In the task-processing module, add datapack-aware Langfuse key resolution from the document URI and datapack configuration, then bind the active public key context so nested observations are routed to the correct Langfuse project. Also wrap top-level task processing in an observation span and ensure structured success/error output is emitted to tracing. Improve failure handling by updating document status to FAILED only when the status-tracking dependency is available. Update dependencies by adding langfuse to support the new tracing instrumentation.
1 parent 13d2386 commit e3d654a

10 files changed

Lines changed: 290 additions & 142 deletions

File tree

python/ai/rag_agent/db/active_pipeline_tracking.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from enum import Enum
88
from db.connection_pool import ConnectionPool
99

10-
10+
from langfuse import observe
1111
class PipelineStatus(Enum):
1212
"""Enum for pipeline status values"""
1313
PROCESSING = 'PROCESSING'
@@ -38,6 +38,7 @@ def _return_connection(self, conn):
3838
"""Return a connection to the pool."""
3939
self.pool.return_connection(conn)
4040

41+
@observe(name="Insert Pipeline Details / Active Pipeline Tracking", as_type="span")
4142
def insert_pipeline_details(
4243
self,
4344
document_id: str,
@@ -112,6 +113,7 @@ def insert_pipeline_details(
112113
if conn:
113114
self._return_connection(conn)
114115

116+
@observe(name="Update Pipeline Status / Active Pipeline Tracking", as_type="span")
115117
def update_pipeline_status(
116118
self,
117119
pipeline_id: str,
@@ -164,6 +166,7 @@ def update_pipeline_status(
164166
if conn:
165167
self._return_connection(conn)
166168

169+
@observe(name="Mark Pipeline Completed / Active Pipeline Tracking", as_type="span")
167170
def mark_pipeline_completed(self, pipeline_id: str) -> bool:
168171
"""
169172
Mark a pipeline as completed.
@@ -206,6 +209,7 @@ def mark_pipeline_completed(self, pipeline_id: str) -> bool:
206209
if conn:
207210
self._return_connection(conn)
208211

212+
@observe(name="Record Pipeline Error / Active Pipeline Tracking", as_type="span")
209213
def record_pipeline_error(
210214
self,
211215
pipeline_id: str,
@@ -256,6 +260,7 @@ def record_pipeline_error(
256260
if conn:
257261
self._return_connection(conn)
258262

263+
@observe(name="Update Chunks Processed / Active Pipeline Tracking", as_type="span")
259264
def update_chunks_processed(self, pipeline_id: str, chunks_count: int) -> bool:
260265
"""
261266
Update the number of chunks processed for a pipeline.
@@ -301,6 +306,7 @@ def update_chunks_processed(self, pipeline_id: str, chunks_count: int) -> bool:
301306
if conn:
302307
self._return_connection(conn)
303308

309+
@observe(name="Update Embeddings Persisted / Active Pipeline Tracking", as_type="span")
304310
def update_embeddings_persisted(self, pipeline_id: str, embeddings_count: int) -> bool:
305311
"""
306312
Update the number of embeddings persisted for a pipeline.

python/ai/rag_agent/db/yugabytedb_vector_store.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,31 @@
66
from psycopg import sql
77
from db.connection_pool import ConnectionPool
88
from db.active_pipeline_tracking import PipelineTracking
9+
from langfuse import observe, get_client
10+
11+
12+
@observe(name="Execute SQL / YugabyteDB Vector Store", as_type="span", capture_input=False, capture_output=False)
13+
def execute_sql(cur, query: str, params=None, many: bool = False) -> int:
14+
if many:
15+
cur.executemany(query, params)
16+
elif params is not None:
17+
cur.execute(query, params)
18+
else:
19+
cur.execute(query)
20+
21+
rowcount = cur.rowcount
22+
23+
get_client().update_current_span(
24+
input={
25+
"query": query.strip(),
26+
"mode": "executemany" if many else "execute",
27+
**({"batch_size": len(params)} if many and params is not None else {}),
28+
**({"param_count": len(params)} if not many and params is not None else {}),
29+
},
30+
output={"rowcount": rowcount}
31+
)
32+
33+
return rowcount
934

1035

1136
class YugabyteDBVectorStore:
@@ -40,7 +65,8 @@ def _table_exists(self, conn, table_name, table_schema):
4065
cur = conn.cursor()
4166
try:
4267
# Query information_schema to check if table exists
43-
cur.execute(
68+
execute_sql(
69+
cur,
4470
"""
4571
SELECT EXISTS (
4672
SELECT 1 FROM information_schema.tables
@@ -82,6 +108,7 @@ def _ensure_table_exists(self, table_name, schema):
82108
if conn:
83109
self._return_connection(conn)
84110

111+
@observe(name="Insert Embeddings / YugabyteDB Vector Store", as_type="span")
85112
def insert_embeddings(
86113
self,
87114
document_id,
@@ -142,8 +169,12 @@ def insert_embeddings(
142169
tenant_id, metadata_json)
143170
)
144171
if len(batch) >= batch_size:
145-
cur.executemany(insert_stmt, batch)
146-
rows_inserted = cur.rowcount
172+
rows_inserted = execute_sql(
173+
cur,
174+
insert_stmt,
175+
batch,
176+
many=True
177+
)
147178
total_inserted += rows_inserted
148179
self.pipeline_tracking.update_embeddings_persisted(
149180
pipeline_id=pipeline_id,
@@ -159,8 +190,12 @@ def insert_embeddings(
159190

160191
# Insert any remaining items in the batch
161192
if batch:
162-
cur.executemany(insert_stmt, batch)
163-
rows_inserted = cur.rowcount
193+
rows_inserted = execute_sql(
194+
cur,
195+
insert_stmt,
196+
batch,
197+
many=True
198+
)
164199
total_inserted += rows_inserted
165200
self.pipeline_tracking.update_embeddings_persisted(
166201
pipeline_id=pipeline_id,
@@ -189,6 +224,7 @@ def insert_embeddings(
189224
if conn:
190225
self._return_connection(conn)
191226

227+
@observe(name="Create Index / YugabyteDB Vector Store", as_type="span")
192228
def create_index(
193229
self,
194230
table_name,
@@ -223,7 +259,7 @@ def create_index(
223259
table=sql.Identifier(table_name),
224260
ops_class=sql.SQL(distance_metric),
225261
)
226-
cur.execute(sql_create_index)
262+
execute_sql(cur, sql_create_index)
227263

228264
conn.commit()
229265
cur.close()

python/ai/rag_agent/embeddings/embed.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from langchain_openai import OpenAIEmbeddings
33
from pdf_processing import PDFProcessor
44
from html_processing import HTMLProcessor
5+
from langfuse import observe
56
import logging
67
import psycopg
78
import os
@@ -118,6 +119,7 @@ def _generate_embeddings_for_text_files(
118119
f"empty/whitespace chunks, {yielded_count} embeddings yielded"
119120
)
120121

122+
@observe(name="Generate Embeddings for PDF Files / EmbeddingsGenerator", as_type="embedding")
121123
def _generate_embeddings_for_pdf_files(
122124
self,
123125
pipeline_id: int,
@@ -167,6 +169,7 @@ def _generate_embeddings_for_pdf_files(
167169
f"{chunk_count} total chunks, {yielded_count} embeddings yielded"
168170
)
169171

172+
@observe(name="Generate Embeddings for HTML Files / EmbeddingsGenerator", as_type="embedding")
170173
def _generate_embeddings_for_html_file(
171174
self,
172175
pipeline_id: int,
@@ -218,10 +221,12 @@ def _generate_embeddings_for_html_file(
218221
f"{chunk_count} total chunks, {yielded_count} embeddings yielded"
219222
)
220223

224+
@observe(name="Generate Embeddings for Video Files / EmbeddingsGenerator", as_type="embedding")
221225
def _generate_embeddings_for_video_files(self, file_location: str, chunk_args=None):
222226
"""Generate embeddings for video files."""
223227
pass
224228

229+
@observe(name="Generate Embeddings / EmbeddingsGenerator", as_type="chain")
225230
def generate_embeddings(self, pipeline_id: int, file_location: str, chunk_args=None):
226231
"""
227232
Generator that yields (chunk_text, embedding_vector) tuples.
@@ -275,6 +280,7 @@ def generate_embeddings(self, pipeline_id: int, file_location: str, chunk_args=N
275280
else:
276281
raise ValueError(f"Unsupported file type: {file_type}")
277282

283+
@observe(name="Generate User Prompt Embeddings / EmbeddingsGenerator", as_type="embedding")
278284
def generate_user_prompt_embeddings(self, user_prompt: str) -> list[float]:
279285
"""Generate embeddings for user prompt."""
280286

python/ai/rag_agent/embeddings/embedding_user_promt.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import psycopg
44
from langchain_openai import OpenAIEmbeddings
5+
from langfuse import observe
56

67

78
class UserPromptEmbedder:
@@ -36,6 +37,7 @@ def __init__(self,
3637
)
3738
self.logger = logging.getLogger(__name__)
3839

40+
@observe(name="Embed Prompt / UserPromptEmbedder", as_type="embedding")
3941
def embed_prompt(self, prompt: str):
4042
"""Get OpenAI embedding for a single prompt."""
4143
try:
@@ -45,6 +47,7 @@ def embed_prompt(self, prompt: str):
4547
self.logger.error(f"Failed to generate embedding for prompt: {e}")
4648
raise
4749

50+
@observe(name="Similarity Search / UserPromptEmbedder", as_type="retriever")
4851
def similarity_search(self, prompt: str):
4952
"""
5053
Embed the prompt and perform a vector similarity search using HNSW index in the PG table.

python/ai/rag_agent/pdf_processing/process_pdf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from langchain_community.document_loaders import UnstructuredPDFLoader
1010
from langchain_core.documents import Document
1111
from rag_pipeline.chunk import chunk_langchain_docs, DEFAULT_SPLITTER, DEFAULT_ARGS
12+
from langfuse import observe
1213

1314

1415
class PDFProcessor:
@@ -17,6 +18,7 @@ def __init__(self):
1718
# self.model = model
1819
pass
1920

21+
@observe(name="Load PDF from Local / PDFProcessor", as_type="span")
2022
def _load_pdf_from_local(self, file_path):
2123
logging.info(f"Loading PDF data from {file_path}")
2224
try:
@@ -118,6 +120,7 @@ def _load_pdf_from_local(self, file_path):
118120
# logging.error(f"Failed to read file from S3: {e}")
119121
# raise RuntimeError(f"Failed to read file from S3: {e}")
120122

123+
@observe(name="Load PDF from S3 / PDFProcessor", as_type="retriever")
121124
def _load_pdf_from_s3(self, file_path: str):
122125

123126
logging.debug(f"Reading file from S3: {file_path}")
@@ -179,6 +182,7 @@ def _load_pdf_from_s3(self, file_path: str):
179182
logging.error(f"Failed to read file from S3: {e}")
180183
raise RuntimeError(f"Failed to read file from S3: {e}")
181184

185+
@observe(name="Process PDF Data / PDFProcessor", as_type="chain")
182186
def process_pdf_data(
183187
self, file_path: str, chunk_args: Dict[str, Any] = {}
184188
) -> Generator[str, None, None]:

python/ai/rag_agent/rag_pipeline/chunk.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
RecursiveCharacterTextSplitter,
88
SpacyTextSplitter
99
)
10+
from langfuse import observe
1011
import json
1112
import mimetypes
1213

@@ -54,6 +55,7 @@ def get_splitter_for_filetype(file_location: str) -> tuple[str, str]:
5455
return splitter, args
5556

5657

58+
@observe(name="Chunk Text / chunk", as_type="span")
5759
def chunk(splitter, text, args):
5860
kwargs = json.loads(args)
5961

@@ -63,6 +65,7 @@ def chunk(splitter, text, args):
6365
raise ValueError("Unknown splitter: {}".format(splitter))
6466

6567

68+
@observe(name="Chunk LangChain Docs / chunk", as_type="span")
6669
def chunk_langchain_docs(splitter, docs, args):
6770
kwargs = json.loads(args)
6871

0 commit comments

Comments
 (0)