Skip to content

Commit ed112fb

Browse files
committed
KG-178. Update Open Telemetry feature
1 parent 7f5f406 commit ed112fb

38 files changed

+2443
-1602
lines changed

agents/agents-core/src/commonMain/kotlin/ai/koog/agents/core/agent/StatefulSingleUseAIAgent.kt

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,30 +85,32 @@ public abstract class StatefulSingleUseAIAgent<Input, Output, TContext : AIAgent
8585
}
8686

8787
val runId = Uuid.random().toString()
88-
val eventId = Uuid.random().toString()
89-
val context = prepareContext(agentInput, runId, eventId)
9088

91-
return context.withPreparedPipeline {
89+
// Unique identifier for a group of agent-run events
90+
val agentRunEventId = Uuid.random().toString()
91+
val context = prepareContext(agentInput, runId, agentRunEventId)
92+
93+
return withPreparedPipeline(context, agentRunEventId) {
9294
agentStateMutex.withLock {
9395
@OptIn(InternalAgentsApi::class)
9496
state = State.Running(context.parentContext ?: context)
9597
}
9698

9799
logger.debug { formatLog(id, runId, "Starting agent execution") }
98-
pipeline.onAgentStarting<Input, Output>(eventId, context.executionInfo, runId, this@StatefulSingleUseAIAgent, context)
100+
pipeline.onAgentStarting<Input, Output>(agentRunEventId, context.executionInfo, runId, this@StatefulSingleUseAIAgent, context)
99101

100102
val result = try {
101103
@Suppress("UNCHECKED_CAST")
102104
strategy.execute(context = context, input = agentInput)
103105
} catch (e: Throwable) {
104106
logger.error(e) { "Execution exception reported by server!" }
105-
pipeline.onAgentExecutionFailed(eventId, context.executionInfo, id, runId, e)
107+
pipeline.onAgentExecutionFailed(agentRunEventId, context.executionInfo, id, runId, e)
106108
agentStateMutex.withLock { state = State.Failed(e) }
107109
throw e
108110
}
109111

110112
logger.debug { formatLog(id, runId, "Finished agent execution") }
111-
pipeline.onAgentCompleted(eventId, context.executionInfo, id, runId, result)
113+
pipeline.onAgentCompleted(agentRunEventId, context.executionInfo, id, runId, result)
112114

113115
agentStateMutex.withLock {
114116
state = if (result != null) {
@@ -170,33 +172,34 @@ public abstract class StatefulSingleUseAIAgent<Input, Output, TContext : AIAgent
170172
protected fun formatLog(agentId: String, runId: String, message: String): String =
171173
"[agent id: $agentId, run id: $runId] $message"
172174

175+
//region Private Methods
176+
173177
/**
174178
* Executes the provided block within a prepared pipeline context.
175179
* Ensures that the necessary feature resources are initialized before the block is invoked
176180
* and properly cleaned up after finishing the block.
177181
*
178182
* @return The result of the block's execution.
179183
*/
180-
private suspend fun <T> AIAgentContext.withPreparedPipeline(block: suspend (eventId: String) -> T): T {
181-
require(executionInfo.parent == null) {
184+
private suspend fun <T> withPreparedPipeline(context: AIAgentContext, eventId: String, block: suspend () -> T): T {
185+
require(context.executionInfo.parent == null) {
182186
"withPreparedPipeline() should be called from a top level agent context."
183187
}
184188

185-
// Unique id for a group of agent-related events
186-
val eventId = Uuid.random().toString()
187-
188189
return try {
189190
pipeline.prepareFeatures()
190-
block.invoke(eventId)
191+
block.invoke()
191192
} finally {
192193
pipeline.onAgentClosing(
193194
eventId = eventId,
194-
executionInfo = executionInfo.parent ?: executionInfo,
195+
executionInfo = context.executionInfo.parent ?: context.executionInfo,
195196
agentId = id
196197
)
197198
pipeline.closeAllFeaturesMessageProcessors()
198199
}
199200
}
201+
202+
//endregion Private Methods
200203
}
201204

202205
/**

agents/agents-core/src/commonTest/kotlin/ai/koog/agents/core/feature/AIAgentPipelineTest.kt

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,75 @@ class AIAgentPipelineTest {
777777
assertContentEquals(expectedEvents, actualEvents)
778778
}
779779

780+
@Test
781+
fun `test AgentExecutionInfo path with loop nodes invocation`() = runTest {
782+
val interceptedEvents = mutableListOf<String>()
783+
val interceptedRunIds = mutableListOf<String>()
784+
785+
val agentId = "test-agent-id"
786+
val agentInput = "test input"
787+
788+
val strategyName = "test-strategy"
789+
val nodeRootName = "node-root"
790+
val nodeRootOutput = "Root node output"
791+
val nodeExecuteName = "node-execute"
792+
val nodeExecuteOutput = "Execute output"
793+
794+
var isExecuted = false
795+
796+
val strategy = strategy(strategyName) {
797+
val nodeRoot by node<String, String>(nodeRootName) { it }
798+
val nodeExecute by node<String, String>(nodeExecuteName) {
799+
isExecuted = true
800+
it
801+
}
802+
803+
edge(nodeStart forwardTo nodeRoot)
804+
edge(nodeRoot forwardTo nodeExecute onCondition { !isExecuted })
805+
edge(nodeRoot forwardTo nodeFinish onCondition { isExecuted } transformed { nodeRootOutput })
806+
edge(nodeExecute forwardTo nodeRoot transformed { nodeExecuteOutput })
807+
}
808+
809+
createAgent(
810+
strategy = strategy,
811+
promptExecutor = getMockExecutor { }
812+
) {
813+
install(TestFeature) {
814+
events = interceptedEvents
815+
runIds = interceptedRunIds
816+
}
817+
}.use { agent ->
818+
agent.run(agentInput)
819+
}
820+
821+
val actualEvents = interceptedEvents.filter { collectedEvent ->
822+
collectedEvent.startsWith(NodeExecutionStarting::class.simpleName.toString()) ||
823+
collectedEvent.startsWith(NodeExecutionCompleted::class.simpleName.toString()) ||
824+
collectedEvent.startsWith(NodeExecutionFailed::class.simpleName.toString())
825+
}
826+
827+
val expectedEvents = listOf(
828+
"${NodeExecutionStarting::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, START_NODE_PREFIX)}, name: $START_NODE_PREFIX, input: $agentInput)",
829+
"${NodeExecutionCompleted::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, START_NODE_PREFIX)}, name: $START_NODE_PREFIX, input: $agentInput, output: $agentInput)",
830+
"${NodeExecutionStarting::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, nodeRootName)}, name: $nodeRootName, input: $agentInput)",
831+
"${NodeExecutionCompleted::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, nodeRootName)}, name: $nodeRootName, input: $agentInput, output: $agentInput)",
832+
"${NodeExecutionStarting::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, nodeExecuteName)}, name: $nodeExecuteName, input: $agentInput)",
833+
"${NodeExecutionCompleted::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, nodeExecuteName)}, name: $nodeExecuteName, input: $agentInput, output: $agentInput)",
834+
"${NodeExecutionStarting::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, nodeRootName)}, name: $nodeRootName, input: $nodeExecuteOutput)",
835+
"${NodeExecutionCompleted::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, nodeRootName)}, name: $nodeRootName, input: $nodeExecuteOutput, output: $nodeExecuteOutput)",
836+
"${NodeExecutionStarting::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, FINISH_NODE_PREFIX)}, name: $FINISH_NODE_PREFIX, input: $nodeRootOutput)",
837+
"${NodeExecutionCompleted::class.simpleName} (path: ${agentExecutionPath(agentId, strategyName, FINISH_NODE_PREFIX)}, name: $FINISH_NODE_PREFIX, input: $nodeRootOutput, output: $nodeRootOutput)",
838+
)
839+
840+
assertEquals(
841+
expectedEvents.size,
842+
actualEvents.size,
843+
"Miss intercepted node events. Expected ${expectedEvents.size}, but received: ${actualEvents.size}"
844+
)
845+
846+
assertContentEquals(expectedEvents, actualEvents)
847+
}
848+
780849
//endregion Execution Info
781850

782851
//region Private Methods

agents/agents-features/agents-features-opentelemetry/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ The OpenTelemetry feature automatically adds various attributes to spans followi
158158
Some custom attributes specific to Koog are also added:
159159

160160
- `koog.agent.strategy.name`: the name of the agent strategy
161-
- `koog.node.name`: the name of the node being executed
161+
- `koog.node.id`: the name of the node being executed
162162

163163
### Span Types
164164

@@ -193,7 +193,7 @@ The OpenTelemetry feature creates different types of spans for various operation
193193
- Useful for: Understanding strategy flow and attributing LLM or tool work to a specific node.
194194
- Key attributes:
195195
- `gen_ai.conversation.id`
196-
- `koog.node.name`
196+
- `koog.node.id`
197197

198198
4. **Inference Span**
199199
- Purpose: A single LLM call (prompt execution).

0 commit comments

Comments
 (0)