diff --git a/python/logging-samples/distributed-tracing-otel-python-java/.env.example b/python/logging-samples/distributed-tracing-otel-python-java/.env.example new file mode 100644 index 00000000..9f58ca22 --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/.env.example @@ -0,0 +1,17 @@ +# --- Required --- +OPENAI_API_KEY=sk-... +GALILEO_API_KEY=... +GALILEO_CONSOLE_URL=https://app.galileo.ai + +# --- Galileo project routing (used by the collector) --- +GALILEO_PROJECT=distributed-tracing-demo +GALILEO_LOG_STREAM=main + +# --- Galileo API URL (derived from console URL; adjust if different) --- +GALILEO_API_URL=https://api.galileo.ai + +# --- OTel Collector endpoint --- +# docker-compose overrides this to http://otel-collector:4318 (the bundled +# demo collector). For your real deployment, set it to the OTLP/HTTP endpoint +# of the collector you already run, e.g. https://otel-collector.internal:4318. +OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 diff --git a/python/logging-samples/distributed-tracing-otel-python-java/.gitignore b/python/logging-samples/distributed-tracing-otel-python-java/.gitignore new file mode 100644 index 00000000..0a7f0d3d --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/.gitignore @@ -0,0 +1,17 @@ +# Secrets +.env + +# Python +python-service/__pycache__/ +python-service/*.py[cod] +python-service/.venv/ + +# Java / Maven +java-service/target/ +java-service/*.class + +# ChromaDB local data +chroma-data/ + +# OS +.DS_Store diff --git a/python/logging-samples/distributed-tracing-otel-python-java/README.md b/python/logging-samples/distributed-tracing-otel-python-java/README.md new file mode 100644 index 00000000..16e1fde3 --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/README.md @@ -0,0 +1,164 @@ +# Distributed Tracing: Java + Python with Galileo via OTEL + +A runnable example showing how to trace a request across a **Java Spring Boot gateway** and a **Python FastAPI + LangGraph RAG service** — with a single trace ID stitched together end-to-end in [Galileo](https://galileo.ai). + +``` +User ──POST /ask──▶ Java Gateway ──POST /process──▶ Python RAG Service + │ Spring AI (classify) │ LangGraph (route → retrieve → generate) + │ OTel Java Agent │ LangChainInstrumentor + RetrieverSpan + └──────────────────┬─────────────┘ + │ OTLP HTTP + ▼ + OpenTelemetry Collector + │ OTLP HTTP (with Galileo auth) + ▼ + Galileo +``` + +## What this demonstrates + +- **Cross-language trace propagation** — the OTel Java agent injects a W3C `traceparent` header on every outbound HTTP call; the Python service continues the same trace automatically +- **Auto-instrumentation + targeted manual spans** — `LangChainInstrumentor` handles LangGraph nodes automatically; a Galileo `RetrieverSpan` surfaces retrieved source documents in the trace UI; the Java service adds `gen_ai.*` attributes for proper agent hierarchy rendering +- **Collector-agnostic** — both services export plain OTLP/HTTP to `OTEL_EXPORTER_OTLP_ENDPOINT`. Point them at the bundled demo collector, or at whatever OTel Collector you already run. +- **Financial services domain** — ChromaDB is pre-seeded with realistic financial policy documents (wire transfers, KYC/AML, mortgage guidelines, etc.) + +## Expected trace in Galileo + +``` +invoke_agent financial-assistant ← Java: outer agent span (manual) + chat classify_question ← Java: LLM classification (manual span) + POST /process ← Java agent: auto-instrumented HTTP call + rag_pipeline ← Python: WorkflowSpan (Galileo) + LangGraph.invoke ← Python: auto-instrumented by LangChainInstrumentor + route ← Python: LangGraph node (auto) + retrieve ← Python: LangGraph node (auto) + chromadb_search ← Python: RetrieverSpan with source docs (Galileo) + generate ← Python: LangGraph node (auto) + openai.chat ← Python: LLM span (auto, OpenAIInstrumentor) +``` + +## Prerequisites + +| Requirement | Version | +|---|---| +| Docker + Docker Compose | 24+ | +| OpenAI API key | — | +| Galileo account + API key | [console.galileo.ai](https://console.galileo.ai) | + +## Quick start + +**1. Clone and configure** + +```bash +git clone https://github.com/rungalileo/sdk-examples.git +cd sdk-examples/python/logging-samples/distributed-tracing-otel-python-java +cp .env.example .env +``` + +Edit `.env` and fill in your keys: + +```ini +OPENAI_API_KEY=sk-... +GALILEO_API_KEY=... +GALILEO_PROJECT=distributed-tracing-demo +GALILEO_LOG_STREAM=main +GALILEO_API_URL=https://api.galileo.ai +``` + +**2. Start all services** + +```bash +docker compose up --build +``` + +On first run this builds the Java and Python images (~2–3 min) and seeds ChromaDB. Once you see `Started GatewayApplication`, everything is ready. + +**3. Send a question** + +```bash +curl -s -X POST http://localhost:8089/ask \ + -H "Content-Type: application/json" \ + -d '{"question": "What are the wire transfer limits?"}' | jq +``` + +Expected response: + +```json +{ + "answer": "Domestic wire transfers are limited to $50,000 per business day...", + "category": "FINANCIAL", + "sources": ["wire-transfer-policy"] +} +``` + +**4. View traces in Galileo** + +Open your Galileo project and navigate to the log stream you configured in `.env`. You should see a trace with the full span hierarchy shown above. + +## Service ports + +| Service | Host port | Purpose | +|---|---|---| +| Java gateway | 8089 | `POST /ask` — entry point | +| Python RAG service | 8099 | `POST /process` — internal | +| ChromaDB | 8299 | Vector store | + +## Architecture + +### Java gateway (`java-service/`) + +- **Spring Boot 3.4.1 + Spring AI 1.0.0** — handles routing and LLM classification +- **OTel Java agent** (bundled in Docker image) — auto-instruments all HTTP calls and injects `traceparent` headers +- **Manual spans** — two spans with `gen_ai.*` attributes tell Galileo to render this as an agent with an LLM sub-call; Spring AI doesn't yet emit these automatically + +### Python RAG service (`python-service/`) + +- **FastAPI + LangGraph** — three-node graph: `route → retrieve → generate` +- **`LangChainInstrumentor` + `OpenAIInstrumentor`** — auto-create spans for each LangGraph node and every OpenAI call +- **`RetrieverSpan`** (Galileo) — wraps the ChromaDB query so retrieved documents appear as source context in Galileo +- **`WorkflowSpan`** (Galileo) — top-level span for the `/process` endpoint + +### How spans reach Galileo + +Both services export OTLP/HTTP to whatever endpoint `OTEL_EXPORTER_OTLP_ENDPOINT` points at. That endpoint is always an **OpenTelemetry Collector** — the collector batches, retries on failure, and forwards to Galileo with the right auth headers. Galileo credentials live in the collector config, not in the application code. + +#### Using the bundled demo collector + +For running this repo end-to-end on a laptop, `docker compose up` starts a minimal collector alongside the two services. Its config is in [`otel-collector/config.yaml`](./otel-collector/config.yaml). Services automatically point at it: + +``` +OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 +``` + +This is a demo convenience — not something you deploy. + +#### Using your own existing collector + +In any real environment you almost certainly already run an OTel Collector. To point the services at it instead: + +1. Add a Galileo exporter to your existing collector config (see [`otel-collector/config.yaml`](./otel-collector/config.yaml) for the `otlphttp/galileo` block to copy). +2. Set `OTEL_EXPORTER_OTLP_ENDPOINT` in `.env` (or your deployment config) to your collector's OTLP/HTTP endpoint, e.g. `https://otel-collector.internal.example.com:4318`. +3. Remove the `otel-collector` service from `docker-compose.yml` (or just stop pointing at it). + +No application code changes. The instrumentation stays identical. + +## Troubleshooting + +**Java service fails to start** +- Check that `OPENAI_API_KEY` is set in `.env` +- Maven downloads dependencies on first build; allow 2–3 min + +**No traces in Galileo** +- Verify `GALILEO_API_KEY` and `GALILEO_API_URL` in `.env` +- Check `GALILEO_PROJECT` and `GALILEO_LOG_STREAM` match what you're looking at in the UI + +**ChromaDB connection refused** +- Python service waits up to 60 s for ChromaDB; check `docker compose logs python-service` + +**Empty answers** +- Confirm `OPENAI_API_KEY` is valid and has quota +- Try a question in the FINANCIAL category (e.g., "What is the minimum credit score for a mortgage?") + +## Further reading + +[Distributed Tracing with OpenTelemetry — Galileo Docs](https://docs.galileo.ai/sdk-api/logging/distributed-tracing-otel) diff --git a/python/logging-samples/distributed-tracing-otel-python-java/docker-compose.yml b/python/logging-samples/distributed-tracing-otel-python-java/docker-compose.yml new file mode 100644 index 00000000..98011a4c --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/docker-compose.yml @@ -0,0 +1,60 @@ +services: + + chromadb: + image: chromadb/chroma:0.6.3 + ports: + - "8299:8000" + volumes: + - chroma-data:/chroma/chroma + environment: + - ANONYMIZED_TELEMETRY=false + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.120.0 + command: ["--config", "/etc/otel-collector/config.yaml"] + volumes: + - ./otel-collector/config.yaml:/etc/otel-collector/config.yaml:ro + environment: + - GALILEO_API_URL=${GALILEO_API_URL} + - GALILEO_API_KEY=${GALILEO_API_KEY} + - GALILEO_PROJECT=${GALILEO_PROJECT} + - GALILEO_LOG_STREAM=${GALILEO_LOG_STREAM} + ports: + - "4317:4317" # OTLP gRPC + - "4318:4318" # OTLP HTTP + + python-service: + build: ./python-service + ports: + - "8099:8099" + environment: + - OPENAI_API_KEY=${OPENAI_API_KEY} + - OTEL_SERVICE_NAME=python-rag-service + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 + - CHROMADB_HOST=chromadb + - CHROMADB_PORT=8000 + # Silences a harmless posthog telemetry error in chromadb-client. + - ANONYMIZED_TELEMETRY=False + depends_on: + - chromadb + - otel-collector + + java-service: + build: ./java-service + ports: + - "8089:8089" + environment: + - OPENAI_API_KEY=${OPENAI_API_KEY} + - OTEL_SERVICE_NAME=java-gateway + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 + - OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + - OTEL_TRACES_EXPORTER=otlp + - OTEL_METRICS_EXPORTER=none + - OTEL_LOGS_EXPORTER=none + - PYTHON_SERVICE_URL=http://python-service:8099 + depends_on: + - python-service + - otel-collector + +volumes: + chroma-data: diff --git a/python/logging-samples/distributed-tracing-otel-python-java/java-service/Dockerfile b/python/logging-samples/distributed-tracing-otel-python-java/java-service/Dockerfile new file mode 100644 index 00000000..479bd6ab --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/java-service/Dockerfile @@ -0,0 +1,20 @@ +FROM eclipse-temurin:21-jdk AS build +WORKDIR /app + +RUN apt-get update && apt-get install -y curl && \ + curl -fsSL https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.9/apache-maven-3.9.9-bin.tar.gz \ + | tar xz -C /opt && \ + ln -s /opt/apache-maven-3.9.9/bin/mvn /usr/local/bin/mvn + +COPY pom.xml . +RUN mvn dependency:resolve -q || true +COPY src ./src +RUN mvn clean package -DskipTests -q + +FROM eclipse-temurin:21-jre +WORKDIR /app +COPY --from=build /app/target/*.jar app.jar +ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v2.12.0/opentelemetry-javaagent.jar /app/opentelemetry-javaagent.jar + +EXPOSE 8089 +ENTRYPOINT ["java", "-javaagent:/app/opentelemetry-javaagent.jar", "-jar", "app.jar"] diff --git a/python/logging-samples/distributed-tracing-otel-python-java/java-service/pom.xml b/python/logging-samples/distributed-tracing-otel-python-java/java-service/pom.xml new file mode 100644 index 00000000..0f0ae2bc --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/java-service/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.4.1 + + + com.example + gateway-service + 0.0.1-SNAPSHOT + Gateway Service + + + 21 + 1.0.0 + + + + + + org.springframework.ai + spring-ai-bom + ${spring-ai.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.ai + spring-ai-starter-model-openai + ${spring-ai.version} + + + io.opentelemetry + opentelemetry-api + 1.46.0 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/java/com/example/gateway/AskController.java b/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/java/com/example/gateway/AskController.java new file mode 100644 index 00000000..a5a30e4d --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/java/com/example/gateway/AskController.java @@ -0,0 +1,123 @@ +package com.example.gateway; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.ai.chat.client.ChatClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.client.RestTemplate; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; + +@RestController +public class AskController { + + private static final Logger log = LoggerFactory.getLogger(AskController.class); + + private final ChatClient chatClient; + private final RestTemplate restTemplate; + private final String pythonServiceUrl; + + public AskController( + ChatClient.Builder chatClientBuilder, + RestTemplate restTemplate, + @Value("${python-service.url}") String pythonServiceUrl) { + this.chatClient = chatClientBuilder + .defaultSystem("You are a routing classifier for a financial services AI system. " + + "Classify questions into exactly one category: FINANCIAL, TECHNICAL, or GENERAL. " + + "Reply with ONLY the category name, nothing else.") + .build(); + this.restTemplate = restTemplate; + this.pythonServiceUrl = pythonServiceUrl; + } + + @PostMapping("/ask") + @SuppressWarnings("unchecked") + public ResponseEntity> ask(@RequestBody Map request) { + String question = request.get("question"); + log.info("Received question: {}", question); + + Tracer tracer = GlobalOpenTelemetry.getTracer("java-gateway"); + + // Wrap the full agent flow in a span with Galileo-recognized gen_ai attributes. + // The OTel Java agent auto-instruments Spring Boot/HTTP, but these attributes + // are needed for Galileo to render this as an "agent" span in the trace UI. + Span agentSpan = tracer.spanBuilder("invoke_agent financial-assistant") + .setAttribute("gen_ai.system", "spring-ai") + .setAttribute("gen_ai.operation.name", "invoke_agent") + .setAttribute("gen_ai.agent.name", "financial-assistant") + .setAttribute(AttributeKey.stringKey("gen_ai.input.messages"), + "[{\"role\": \"user\", \"content\": " + jsonEscape(question) + "}]") + .startSpan(); + + try (Scope ignored = agentSpan.makeCurrent()) { + // Step 1: Classify the question using Spring AI + OpenAI + String category = classifyQuestion(tracer, question); + log.info("Classified as: {}", category); + + // Step 2: Call Python RAG service — the OTel Java agent automatically + // injects the W3C traceparent header, propagating the trace context. + Map pythonRequest = new HashMap<>(); + pythonRequest.put("question", question); + pythonRequest.put("category", category); + + Map ragResponse = restTemplate.postForObject( + pythonServiceUrl + "/process", + pythonRequest, + Map.class); + + Map response = new HashMap<>(); + response.put("answer", ragResponse.get("answer")); + response.put("category", category); + response.put("sources", ragResponse.get("sources")); + + agentSpan.setAttribute(AttributeKey.stringKey("gen_ai.output.messages"), + "[{\"role\": \"assistant\", \"content\": " + jsonEscape((String) ragResponse.get("answer")) + "}]"); + + return ResponseEntity.ok(response); + } finally { + agentSpan.end(); + } + } + + private String classifyQuestion(Tracer tracer, String question) { + // Manual LLM span — Spring AI doesn't yet auto-emit gen_ai.operation.name=chat, + // so we create it ourselves for proper Galileo trace rendering. + Span llmSpan = tracer.spanBuilder("chat classify_question") + .setAttribute("gen_ai.system", "spring-ai") + .setAttribute("gen_ai.operation.name", "chat") + .setAttribute("gen_ai.request.model", "gpt-4o-mini") + .setAttribute(AttributeKey.stringKey("gen_ai.input.messages"), + "[{\"role\": \"user\", \"content\": " + jsonEscape(question) + "}]") + .startSpan(); + + try (Scope ignored = llmSpan.makeCurrent()) { + String category = chatClient.prompt() + .user(question) + .call() + .content() + .trim(); + + llmSpan.setAttribute(AttributeKey.stringKey("gen_ai.output.messages"), + "[{\"role\": \"assistant\", \"content\": " + jsonEscape(category) + "}]"); + return category; + } finally { + llmSpan.end(); + } + } + + private static String jsonEscape(String value) { + if (value == null) return "\"\""; + return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\""; + } +} diff --git a/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/java/com/example/gateway/GatewayApplication.java b/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/java/com/example/gateway/GatewayApplication.java new file mode 100644 index 00000000..eb33dccd --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/java/com/example/gateway/GatewayApplication.java @@ -0,0 +1,19 @@ +package com.example.gateway; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.web.client.RestTemplate; + +@SpringBootApplication +public class GatewayApplication { + + public static void main(String[] args) { + SpringApplication.run(GatewayApplication.class, args); + } + + @Bean + public RestTemplate restTemplate() { + return new RestTemplate(); + } +} diff --git a/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/resources/application.yml b/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/resources/application.yml new file mode 100644 index 00000000..c7642402 --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/java-service/src/main/resources/application.yml @@ -0,0 +1,16 @@ +server: + port: 8089 + +spring: + application: + name: java-gateway + ai: + openai: + api-key: ${OPENAI_API_KEY} + chat: + options: + model: gpt-4o-mini + temperature: 0.0 + +python-service: + url: ${PYTHON_SERVICE_URL:http://localhost:8099} diff --git a/python/logging-samples/distributed-tracing-otel-python-java/otel-collector/config.yaml b/python/logging-samples/distributed-tracing-otel-python-java/otel-collector/config.yaml new file mode 100644 index 00000000..fb965107 --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/otel-collector/config.yaml @@ -0,0 +1,57 @@ +# OpenTelemetry Collector config used by the docker-compose demo. +# +# In production, point OTEL_EXPORTER_OTLP_ENDPOINT at your own collector +# instead — copy the `otlphttp/galileo` exporter block below into its config +# and add it to your traces pipeline. +# +# Pipeline: receive OTLP from both services, batch, forward to Galileo. +# Galileo credentials live here, not in the application services. + +receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + grpc: + endpoint: 0.0.0.0:4317 + +processors: + # Keep batches small and frequent so the Galileo UI fills in quickly during + # demos. Tune up for real workloads. + batch: + timeout: 1s + send_batch_size: 512 + +exporters: + otlphttp/galileo: + endpoint: ${env:GALILEO_API_URL}/otel + headers: + Galileo-API-Key: ${env:GALILEO_API_KEY} + project: ${env:GALILEO_PROJECT} + logstream: ${env:GALILEO_LOG_STREAM} + # Retry on transient failures so a blip to Galileo doesn't drop half a trace. + retry_on_failure: + enabled: true + initial_interval: 1s + max_interval: 10s + max_elapsed_time: 60s + sending_queue: + enabled: true + num_consumers: 4 + queue_size: 1000 + + # Prints every received span to the collector's stdout. Useful for demos and + # debugging — remove this exporter (and its reference in the pipeline below) + # for production. + debug: + verbosity: detailed + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlphttp/galileo, debug] + telemetry: + logs: + level: info diff --git a/python/logging-samples/distributed-tracing-otel-python-java/python-service/Dockerfile b/python/logging-samples/distributed-tracing-otel-python-java/python-service/Dockerfile new file mode 100644 index 00000000..bb37a51b --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/python-service/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . +RUN chmod +x entrypoint.sh + +EXPOSE 8099 +ENTRYPOINT ["./entrypoint.sh"] diff --git a/python/logging-samples/distributed-tracing-otel-python-java/python-service/app.py b/python/logging-samples/distributed-tracing-otel-python-java/python-service/app.py new file mode 100644 index 00000000..29ebd316 --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/python-service/app.py @@ -0,0 +1,223 @@ +import logging +import os +from typing import TypedDict + +import chromadb +import chromadb.utils.embedding_functions as ef +import openai +from fastapi import FastAPI +from galileo.otel import start_galileo_span +from galileo_core.schemas.logging.span import RetrieverSpan +from galileo_core.schemas.shared.document import Document +from langgraph.graph import END, START, StateGraph +from openinference.instrumentation.langchain import LangChainInstrumentor +from openinference.instrumentation.openai import OpenAIInstrumentor +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from pydantic import BaseModel + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# OTel setup — export to the local OpenTelemetry Collector, which forwards +# to Galileo. The collector owns Galileo credentials; this service does not. +# +# Defaults to http://localhost:4318 for running outside Docker. In +# docker-compose, OTEL_EXPORTER_OTLP_ENDPOINT is overridden to the collector +# service address. +# --------------------------------------------------------------------------- +_otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") +_otlp_exporter = OTLPSpanExporter(endpoint=f"{_otlp_endpoint.rstrip('/')}/v1/traces") + +provider = TracerProvider(resource=Resource.create()) +# BatchSpanProcessor batches spans and flushes them on a repeating timer. +# The default interval is 5000ms, which means Python spans can appear in +# Galileo up to 5 seconds after Java spans for the same request. Reducing +# this to 1000ms keeps the trace UI consistent without adding per-request latency. +provider.add_span_processor(BatchSpanProcessor(_otlp_exporter, schedule_delay_millis=1000)) +trace.set_tracer_provider(provider) + +# --------------------------------------------------------------------------- +# Instrumentation strategy +# --------------------------------------------------------------------------- +# We rely on auto-instrumentation wherever possible and only add manual +# Galileo spans when auto-instrumentation can't express what we need: +# +# Auto-instrumented (no code changes required): +# - FastAPI server spans -> opentelemetry-instrumentation-fastapi +# - LangGraph workflow + node spans -> openinference-instrumentation-langchain +# (LangChain instrumentor covers LangGraph; there is no separate +# LangGraph instrumentor package on PyPI.) +# - OpenAI chat / embeddings spans -> openinference-instrumentation-openai +# +# Manual Galileo spans (see retrieve_documents below): +# - RetrieverSpan around ChromaDB queries — needed for two reasons: +# 1. ChromaDB has no OTel/OpenInference instrumentation package, so +# without a manual span the retrieval step would be invisible. +# 2. RetrieverSpan is a Galileo-specific span type that renders +# retrieved documents as source context in the trace UI. A +# generic OTel span cannot produce this UI treatment. +# --------------------------------------------------------------------------- +LangChainInstrumentor().instrument(tracer_provider=provider) +OpenAIInstrumentor().instrument(tracer_provider=provider) + +# --------------------------------------------------------------------------- +# Clients +# --------------------------------------------------------------------------- +oai = openai.OpenAI() +chroma = chromadb.HttpClient( + host=os.getenv("CHROMADB_HOST", "localhost"), + port=int(os.getenv("CHROMADB_PORT", "8000")), +) +embedding_fn = ef.OpenAIEmbeddingFunction( + api_key=os.getenv("OPENAI_API_KEY"), + model_name="text-embedding-3-small", +) + +CHROMA_COLLECTION = "financial_docs" + + +# --------------------------------------------------------------------------- +# LangGraph state & nodes +# --------------------------------------------------------------------------- +class GraphState(TypedDict): + question: str + category: str + documents: list[str] + sources: list[str] + answer: str + + +def route_question(state: GraphState) -> GraphState: + """Entry node — logs the category and passes state through.""" + logger.info("Routing: category=%s", state["category"]) + return {**state, "documents": [], "sources": []} + + +def retrieve_documents(state: GraphState) -> GraphState: + """Query ChromaDB for relevant documents. + + This is the one place we create a manual Galileo span. See the + "Instrumentation strategy" comment above for the rationale. The + RetrieverSpan's `output` is populated with a list of Documents so the + Galileo UI can render each retrieved chunk as inline source context. + """ + query = state["question"] + retriever_span = RetrieverSpan(name="chromadb_search", input=query) + + with start_galileo_span(retriever_span) as span: + try: + collection = chroma.get_collection(CHROMA_COLLECTION, embedding_function=embedding_fn) + results = collection.query(query_texts=[query], n_results=3) + docs = results["documents"][0] if results["documents"] else [] + ids = results["ids"][0] if results["ids"] else [] + except Exception: + logger.exception("ChromaDB query failed, continuing with empty context") + docs, ids = [], [] + + retriever_span.output = [Document(content=d, metadata={"id": i}) for d, i in zip(docs, ids)] + span.set_attribute("retriever.result_count", len(docs)) + logger.info("Retrieved %d documents", len(docs)) + + return {**state, "documents": docs, "sources": ids} + + +def generate_answer(state: GraphState) -> GraphState: + """Call OpenAI with retrieved context to produce the final answer. + + OpenAIInstrumentor automatically creates an LLM span for this call, + capturing model, token usage, and input/output content. + """ + question = state["question"] + docs = state.get("documents", []) + + context_block = "\n\n---\n\n".join(docs) if docs else "No additional context available." + messages = [ + { + "role": "system", + "content": ( + "You are a knowledgeable financial services assistant. " + "Answer the user's question using ONLY the provided context. " + "If the context doesn't contain enough information, say so." + ), + }, + { + "role": "user", + "content": f"Context:\n{context_block}\n\nQuestion: {question}", + }, + ] + + response = oai.chat.completions.create( + model="gpt-4o-mini", + messages=messages, + temperature=0.2, + ) + # message.content is Optional[str] — guard against content filters or + # tool-call responses where the model returns no text. + answer = response.choices[0].message.content or "" + logger.info("Generated answer (%d chars)", len(answer)) + return {**state, "answer": answer} + + +# --------------------------------------------------------------------------- +# Build the LangGraph workflow +# --------------------------------------------------------------------------- +graph_builder = StateGraph(GraphState) +graph_builder.add_node("route", route_question) +graph_builder.add_node("retrieve", retrieve_documents) +graph_builder.add_node("generate", generate_answer) + +graph_builder.add_edge(START, "route") +graph_builder.add_edge("route", "retrieve") +graph_builder.add_edge("retrieve", "generate") +graph_builder.add_edge("generate", END) + +workflow = graph_builder.compile() + +# --------------------------------------------------------------------------- +# FastAPI app +# --------------------------------------------------------------------------- +app = FastAPI(title="Financial RAG Service") + +# FastAPIInstrumentor extracts the W3C traceparent from incoming HTTP requests +# and creates a server span that becomes the parent for all spans created +# during the request — this is what stitches Python's trace onto Java's. +# Call WITHOUT the tracer_provider kwarg so it uses the global provider set +# above; passing tracer_provider explicitly can silently skip middleware +# registration in recent opentelemetry-instrumentation-fastapi versions. +FastAPIInstrumentor.instrument_app(app) + + +class ProcessRequest(BaseModel): + question: str + category: str = "GENERAL" + + +@app.post("/process") +def process(req: ProcessRequest): + """Main RAG endpoint — called by the Java gateway. + + No manual workflow span is needed here: LangChainInstrumentor emits a + `LangGraph` span covering the full `workflow.invoke(...)` call, which + serves as the workflow container in the Galileo trace UI. + """ + result = workflow.invoke( + { + "question": req.question, + "category": req.category, + "documents": [], + "sources": [], + "answer": "", + } + ) + return {"answer": result["answer"], "sources": result.get("sources", [])} + + +@app.get("/health") +def health(): + return {"status": "ok"} diff --git a/python/logging-samples/distributed-tracing-otel-python-java/python-service/entrypoint.sh b/python/logging-samples/distributed-tracing-otel-python-java/python-service/entrypoint.sh new file mode 100644 index 00000000..cdf38c21 --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/python-service/entrypoint.sh @@ -0,0 +1,8 @@ +#!/bin/sh +set -e + +echo "Seeding ChromaDB..." +python seed_chroma.py + +echo "Starting RAG service..." +exec uvicorn app:app --host 0.0.0.0 --port 8099 diff --git a/python/logging-samples/distributed-tracing-otel-python-java/python-service/requirements.txt b/python/logging-samples/distributed-tracing-otel-python-java/python-service/requirements.txt new file mode 100644 index 00000000..5101d7bd --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/python-service/requirements.txt @@ -0,0 +1,12 @@ +fastapi==0.115.6 +uvicorn==0.34.0 +openai==1.82.0 +langgraph==0.4.1 +chromadb-client==0.6.3 +galileo[otel] +opentelemetry-api +opentelemetry-sdk +opentelemetry-exporter-otlp-proto-http +opentelemetry-instrumentation-fastapi +openinference-instrumentation-langchain +openinference-instrumentation-openai diff --git a/python/logging-samples/distributed-tracing-otel-python-java/python-service/seed_chroma.py b/python/logging-samples/distributed-tracing-otel-python-java/python-service/seed_chroma.py new file mode 100644 index 00000000..c5813290 --- /dev/null +++ b/python/logging-samples/distributed-tracing-otel-python-java/python-service/seed_chroma.py @@ -0,0 +1,116 @@ +"""Seed ChromaDB with sample financial services documents.""" + +import os +import sys +import time + +import chromadb +import chromadb.utils.embedding_functions as ef + +COLLECTION = "financial_docs" + +DOCUMENTS = [ + { + "id": "wire-transfer-policy", + "text": ( + "Wire Transfer Policy: Account holders may initiate domestic wire transfers " + "up to $50,000 per business day through online banking or by visiting a branch. " + "International wires require additional verification and are limited to $25,000 " + "per transaction. All wire transfers are subject to fraud screening and may be " + "held for up to 24 hours for compliance review. Fees: $25 domestic, $45 international." + ), + }, + { + "id": "fraud-detection-alerts", + "text": ( + "Fraud Detection & Alerts: Our real-time fraud monitoring system analyzes " + "transactions using machine learning models that evaluate spending patterns, " + "geographic location, merchant category, and transaction velocity. Alerts are " + "triggered when anomaly scores exceed the configured threshold. Customers receive " + "SMS and push notifications within 30 seconds of a flagged transaction. False " + "positive rates are maintained below 2% through continuous model retraining." + ), + }, + { + "id": "account-types-overview", + "text": ( + "Account Types Overview: We offer Checking, Savings, Money Market, and Certificate " + "of Deposit (CD) accounts. Checking accounts include Standard (no minimum balance, " + "$12/month fee) and Preferred ($5,000 minimum, no fee, 0.01% APY). Savings accounts " + "earn 0.05% APY with a $300 minimum. Money Market accounts require $10,000 minimum " + "and earn tiered rates from 0.10% to 0.25% APY. CDs range from 3-month to 5-year terms." + ), + }, + { + "id": "mortgage-lending-guidelines", + "text": ( + "Mortgage Lending Guidelines: Conventional mortgage applications require a minimum " + "credit score of 620, debt-to-income ratio not exceeding 43%, and a down payment of " + "at least 3% for first-time buyers or 5% for subsequent purchases. Jumbo loans " + "(above $726,200) require 10% down and a 700+ credit score. Rate locks are available " + "for 30, 45, or 60 days. Pre-approval letters are valid for 90 days from issuance." + ), + }, + { + "id": "api-integration-guide", + "text": ( + "API Integration Guide: Our Open Banking API follows the FDX 6.0 standard and provides " + "RESTful endpoints for account information, transaction history, and payment initiation. " + "Authentication uses OAuth 2.0 with PKCE flow. Rate limits: 100 requests/minute for " + "read operations, 20 requests/minute for write operations. Webhooks are available for " + "real-time transaction notifications. SDKs are provided for Java, Python, and Node.js." + ), + }, + { + "id": "compliance-kyc-aml", + "text": ( + "KYC/AML Compliance: All new accounts require identity verification through our " + "tiered KYC process. Tier 1 (basic) accepts government-issued photo ID and SSN " + "verification. Tier 2 (enhanced) adds proof of address and source of funds " + "documentation for accounts exceeding $250,000. Suspicious Activity Reports (SARs) " + "are filed for transactions over $10,000 in cash or patterns suggesting structuring. " + "Customer risk scores are reassessed quarterly." + ), + }, +] + + +def seed(): + host = os.getenv("CHROMADB_HOST", "localhost") + port = int(os.getenv("CHROMADB_PORT", "8000")) + + # Wait for ChromaDB to be ready + client = None + for attempt in range(30): + try: + client = chromadb.HttpClient(host=host, port=port) + client.heartbeat() + break + except Exception: + print(f"Waiting for ChromaDB at {host}:{port}... (attempt {attempt + 1})") + time.sleep(2) + + if client is None: + print("ERROR: Could not connect to ChromaDB", file=sys.stderr) + sys.exit(1) + + # Delete existing collection if present, then create fresh + try: + client.delete_collection(COLLECTION) + except Exception: + pass + + embedding_fn = ef.OpenAIEmbeddingFunction( + api_key=os.getenv("OPENAI_API_KEY"), + model_name="text-embedding-3-small", + ) + collection = client.create_collection(name=COLLECTION, embedding_function=embedding_fn) + collection.add( + ids=[d["id"] for d in DOCUMENTS], + documents=[d["text"] for d in DOCUMENTS], + ) + print(f"Seeded {len(DOCUMENTS)} documents into '{COLLECTION}' collection.") + + +if __name__ == "__main__": + seed()