Skip to content

Commit f153fb0

Browse files
fix: propagate OpenTelemetry trace context to subagent calls (#704)
## Problem When using a subagent as a tool, the subagent's OpenTelemetry spans have a different traceId than the main agent's spans, breaking distributed trace continuity. ## Root Cause - `AgentBase.createEventStream()` synchronously calls `callSupplier.get()` outside of Reactor subscription context - `SubAgentTool` methods don't propagate Reactor Context containing trace information to downstream agent invocations ## Solution Use `Mono.defer()` and `.contextWrite()` to propagate Reactor Context containing trace information to agent calls. ## Changes - **AgentBase.createEventStream()**: Use `Mono.defer()` to delay execution until subscription when trace context is available - **SubAgentTool.executeWithStreaming()**: Add `.contextWrite(context -> context.putAll(ctxView))` to propagate trace context - **SubAgentTool.executeWithoutStreaming()**: Add `.contextWrite(context -> context.putAll(ctxView))` to propagate trace context ## Impact This ensures main agent and subagent share the same traceId in distributed tracing systems like Langfuse and Jaeger. ## Testing - Verified with LangfuseExample that main agent and subagent now share the same traceId - All spans show proper parent-child relationships in the trace visualization --- **Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>** --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 467df7b commit f153fb0

File tree

2 files changed

+77
-59
lines changed

2 files changed

+77
-59
lines changed

agentscope-core/src/main/java/io/agentscope/core/agent/AgentBase.java

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -664,40 +664,46 @@ public final Flux<Event> stream(
664664
* @return Flux of events emitted during execution
665665
*/
666666
private Flux<Event> createEventStream(StreamOptions options, Supplier<Mono<Msg>> callSupplier) {
667-
return Flux.<Event>create(
668-
sink -> {
669-
// Create streaming hook with options
670-
StreamingHook streamingHook = new StreamingHook(sink, options);
671-
672-
// Add temporary hook
673-
hooks.add(streamingHook);
674-
675-
// Execute call and manage hook lifecycle
676-
callSupplier
677-
.get()
678-
.doFinally(
679-
signalType -> {
680-
// Remove temporary hook
681-
hooks.remove(streamingHook);
682-
})
683-
.subscribe(
684-
finalMsg -> {
685-
if (options.shouldStream(EventType.AGENT_RESULT)) {
686-
Event finalEvent =
687-
new Event(
688-
EventType.AGENT_RESULT,
689-
finalMsg,
690-
true);
691-
sink.next(finalEvent);
692-
}
693-
694-
// Complete the stream
695-
sink.complete();
696-
},
697-
sink::error);
698-
},
699-
FluxSink.OverflowStrategy.BUFFER)
700-
.publishOn(Schedulers.boundedElastic());
667+
return Flux.deferContextual(
668+
ctxView ->
669+
Flux.<Event>create(
670+
sink -> {
671+
// Create streaming hook with options
672+
StreamingHook streamingHook =
673+
new StreamingHook(sink, options);
674+
675+
// Add temporary hook
676+
hooks.add(streamingHook);
677+
678+
// Use Mono.defer to ensure trace context propagation
679+
// while maintaining streaming hook functionality
680+
Mono.defer(() -> callSupplier.get())
681+
.contextWrite(
682+
context -> context.putAll(ctxView))
683+
.doFinally(
684+
signalType -> {
685+
// Remove temporary hook
686+
hooks.remove(streamingHook);
687+
})
688+
.subscribe(
689+
finalMsg -> {
690+
if (options.shouldStream(
691+
EventType.AGENT_RESULT)) {
692+
sink.next(
693+
new Event(
694+
EventType
695+
.AGENT_RESULT,
696+
finalMsg,
697+
true));
698+
}
699+
700+
// Complete the stream
701+
sink.complete();
702+
},
703+
sink::error);
704+
},
705+
FluxSink.OverflowStrategy.BUFFER)
706+
.publishOn(Schedulers.boundedElastic()));
701707
}
702708

703709
@Override

agentscope-core/src/main/java/io/agentscope/core/tool/subagent/SubAgentTool.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ public Mono<ToolResultBlock> callAsync(ToolCallParam param) {
122122
* @return A Mono emitting the tool result block
123123
*/
124124
private Mono<ToolResultBlock> executeConversation(ToolCallParam param) {
125-
return Mono.defer(
126-
() -> {
125+
return Mono.deferContextual(
126+
(ctxView) -> {
127127
try {
128128
Map<String, Object> input = param.getInput();
129129

@@ -246,21 +246,28 @@ private Mono<ToolResultBlock> executeWithStreaming(
246246
? config.getStreamOptions()
247247
: StreamOptions.defaults();
248248

249-
return agent.stream(List.of(userMsg), streamOptions)
250-
.doOnNext(event -> forwardEvent(event, emitter, agent, sessionId))
251-
.filter(Event::isLast)
252-
.last()
253-
.map(
254-
lastEvent -> {
255-
Msg response = lastEvent.getMessage();
256-
return buildResult(response, sessionId);
257-
})
258-
.onErrorResume(
259-
e -> {
260-
logger.error("Error in streaming execution: {}", e.getMessage(), e);
261-
return Mono.just(
262-
ToolResultBlock.error("Execution error: " + e.getMessage()));
263-
});
249+
return Mono.deferContextual(
250+
ctxView ->
251+
agent.stream(List.of(userMsg), streamOptions)
252+
.doOnNext(event -> forwardEvent(event, emitter, agent, sessionId))
253+
.filter(Event::isLast)
254+
.last()
255+
.map(
256+
lastEvent -> {
257+
Msg response = lastEvent.getMessage();
258+
return buildResult(response, sessionId);
259+
})
260+
.contextWrite(context -> context.putAll(ctxView))
261+
.onErrorResume(
262+
e -> {
263+
logger.error(
264+
"Error in streaming execution:" + " {}",
265+
e.getMessage(),
266+
e);
267+
return Mono.just(
268+
ToolResultBlock.error(
269+
"Execution error: " + e.getMessage()));
270+
}));
264271
}
265272

266273
/**
@@ -276,14 +283,19 @@ private Mono<ToolResultBlock> executeWithStreaming(
276283
private Mono<ToolResultBlock> executeWithoutStreaming(
277284
Agent agent, Msg userMsg, String sessionId) {
278285

279-
return agent.call(List.of(userMsg))
280-
.map(response -> buildResult(response, sessionId))
281-
.onErrorResume(
282-
e -> {
283-
logger.error("Error in execution: {}", e.getMessage(), e);
284-
return Mono.just(
285-
ToolResultBlock.error("Execution error: " + e.getMessage()));
286-
});
286+
return Mono.deferContextual(
287+
ctxView ->
288+
agent.call(List.of(userMsg))
289+
.map(response -> buildResult(response, sessionId))
290+
.onErrorResume(
291+
e -> {
292+
logger.error(
293+
"Error in execution: {}", e.getMessage(), e);
294+
return Mono.just(
295+
ToolResultBlock.error(
296+
"Execution error: " + e.getMessage()));
297+
})
298+
.contextWrite(context -> context.putAll(ctxView)));
287299
}
288300

289301
/**

0 commit comments

Comments
 (0)