Skip to content

Commit 9c13d07

Browse files
committed
2 parents 4b86715 + 2d9e21c commit 9c13d07

File tree

3 files changed

+29
-21
lines changed

3 files changed

+29
-21
lines changed

src/workflows/recipe/__init__.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
import functools
44
import logging
55
from collections.abc import Callable
6-
from opentelemetry import trace
76
from typing import Any
87

8+
from opentelemetry import trace
9+
910
from workflows.recipe.recipe import Recipe
1011
from workflows.recipe.validate import validate_recipe
1112
from workflows.recipe.wrapper import RecipeWrapper
@@ -80,16 +81,18 @@ def unwrap_recipe(header, message):
8081
environment = message.get("environment", {})
8182
if isinstance(environment, dict):
8283
recipe_id = environment.get("ID")
83-
84+
8485
if recipe_id:
8586
span.set_attribute("recipe_id", recipe_id)
86-
span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id})
87+
span.add_event(
88+
"recipe.id_extracted", attributes={"recipe_id": recipe_id}
89+
)
8790

8891
# Extract span_id and trace_id for logging
8992
span_context = span.get_span_context()
9093
if span_context and span_context.is_valid:
91-
span_id = format(span_context.span_id, '016x')
92-
trace_id = format(span_context.trace_id, '032x')
94+
span_id = format(span_context.span_id, "016x")
95+
trace_id = format(span_context.trace_id, "032x")
9396

9497
log_extra = {
9598
"span_id": span_id,

src/workflows/services/common_service.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@
99
import time
1010
from typing import Any
1111

12-
import workflows
13-
import workflows.logging
14-
1512
from opentelemetry import trace
13+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
14+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
1615
from opentelemetry.sdk.trace import TracerProvider
1716
from opentelemetry.sdk.trace.export import BatchSpanProcessor
18-
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
17+
18+
import workflows
19+
import workflows.logging
1920
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
20-
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
2121

2222

2323
class Status(enum.Enum):
@@ -216,7 +216,9 @@ def start_transport(self):
216216

217217
# Add OTELTracingMiddleware to the transport layer
218218
tracer = trace.get_tracer(__name__)
219-
otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name)
219+
otel_middleware = OTELTracingMiddleware(
220+
tracer, service_name=self._service_name
221+
)
220222
self._transport.add_middleware(otel_middleware)
221223

222224

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
from __future__ import annotations
2+
3+
import functools
4+
from collections.abc import Callable
5+
16
from opentelemetry import trace
7+
from opentelemetry.propagate import extract
8+
29
from workflows.transport.middleware import BaseTransportMiddleware
3-
from collections.abc import Callable
4-
import functools
5-
from opentelemetry.propagate import inject, extract
10+
611

712
class OTELTracingMiddleware(BaseTransportMiddleware):
813
def __init__(self, tracer: trace.Tracer, service_name: str):
@@ -14,18 +19,16 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
1419
def wrapped_callback(header, message):
1520
# Extract trace context from message headers
1621
ctx = extract(header) if header else None
17-
22+
1823
# Start a new span with the extracted context
1924
with self.tracer.start_as_current_span(
20-
"transport.subscribe",
21-
context=ctx
25+
"transport.subscribe", context=ctx
2226
) as span:
2327
span.set_attribute("service_name", self.service_name)
2428
span.set_attribute("channel", channel)
25-
26-
29+
2730
# Call the original callback
2831
return callback(header, message)
29-
32+
3033
# Call the next middleware with the wrapped callback
31-
return call_next(channel, wrapped_callback, **kwargs)
34+
return call_next(channel, wrapped_callback, **kwargs)

0 commit comments

Comments
 (0)