-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathevents.go
More file actions
1008 lines (931 loc) · 40.1 KB
/
events.go
File metadata and controls
1008 lines (931 loc) · 40.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package hooks
import (
"context"
"errors"
"time"
"goa.design/goa-ai/runtime/agent"
"goa.design/goa-ai/runtime/agent/model"
"goa.design/goa-ai/runtime/agent/planner"
"goa.design/goa-ai/runtime/agent/policy"
"goa.design/goa-ai/runtime/agent/prompt"
"goa.design/goa-ai/runtime/agent/rawjson"
"goa.design/goa-ai/runtime/agent/run"
"goa.design/goa-ai/runtime/agent/telemetry"
"goa.design/goa-ai/runtime/agent/toolerrors"
"goa.design/goa-ai/runtime/agent/tools"
"go.temporal.io/sdk/temporal"
)
type (
// Event is the interface all hook events must implement. The runtime publishes
// events through the Bus, and subscribers receive them via HandleEvent.
// Concrete event types carry typed payloads for each lifecycle phase.
//
// Subscribers use type switches to access event-specific fields:
//
//	func (s *MySubscriber) HandleEvent(ctx context.Context, evt Event) error {
//		switch e := evt.(type) {
//		case *RunStartedEvent:
//			log.Printf("Context: %+v", e.RunContext)
//		case *ToolResultReceivedEvent:
//			log.Printf("Tool %s took %v", e.ToolName, e.Duration)
//		}
//		return nil
//	}
Event interface {
// Type returns the specific event type constant (e.g., AwaitClarification,
// ToolAuthorization). Subscribers use this to filter events or route to
// specific handlers without type assertions.
Type() EventType
// RunID returns the unique identifier for the workflow run that produced this event.
// All events within a single run execution share the same run ID. This allows
// correlation across distributed systems and enables filtering events by run.
RunID() string
// SessionID returns the logical session identifier associated with the run.
// All events for a given run share the same session ID, providing a stable
// join key across processes and transports.
SessionID() string
// AgentID returns the agent identifier that triggered this event. Subscribers can
// use this to filter events by agent when multiple agents run in the same system.
AgentID() string
// Timestamp returns the Unix timestamp in milliseconds when the event occurred.
// Events are timestamped at creation, not at delivery, so subscribers can calculate
// durations and latencies between related events.
Timestamp() int64
// TurnID returns the conversational turn identifier if turn tracking is active,
// empty string otherwise. A turn groups events for a single user interaction cycle
// (e.g., from user message through final assistant response). UI systems use this
// to render threaded conversations.
TurnID() string
}
// RunStartedEvent fires when a run begins execution. NewRunStartedEvent takes
// the event's session ID from RunContext.SessionID rather than from a separate
// argument.
RunStartedEvent struct {
baseEvent
// RunContext carries the execution metadata (run ID, attempt, labels, caps)
// for this run invocation.
RunContext run.Context
// Input is the initial payload passed to the run, typically containing
// messages and caller-provided metadata.
Input any
}
// RunCompletedEvent fires after a run finishes, whether
// successfully or with a failure.
//
// NewRunCompletedEvent derives the error detail fields from the terminal error:
// when a *model.ProviderError is found in the error chain it supplies
// ErrorProvider, ErrorOperation, ErrorKind, ErrorCode, HTTPStatus, and
// Retryable; otherwise, for status "failed", ErrorKind and PublicError come
// from classifyNonProviderFailure and Retryable is set to true.
RunCompletedEvent struct {
baseEvent
// Status indicates the final outcome: "success", "failed", or "canceled".
Status string
// PublicError is a user-safe, deterministic summary of the terminal failure.
// It is empty on success and cancellations. On failures, it is populated
// and is intended to be rendered directly in UIs without additional parsing.
PublicError string
// Error contains any terminal error that halted the run. Nil on success.
Error error
// ErrorProvider identifies the model provider when the terminal error was
// caused by a provider failure (for example, "bedrock").
ErrorProvider string
// ErrorOperation identifies the provider operation when available.
ErrorOperation string
// ErrorKind classifies provider failures into a small set of stable categories
// suitable for retry and UX decisions (for example, "auth" or "invalid_request").
// For failed runs without a provider error, the constructor fills it from
// classifyNonProviderFailure instead.
ErrorKind string
// ErrorCode is the provider-specific error code when available.
ErrorCode string
// HTTPStatus is the provider HTTP status code when available.
HTTPStatus int
// Retryable reports whether retrying may succeed without changing the request.
// The constructor copies it from the provider error when present and sets it
// to true for other failures with status "failed".
Retryable bool
// Phase captures the terminal phase for the run. For successful runs this
// is typically PhaseCompleted; failures map to PhaseFailed; cancellations
// map to PhaseCanceled.
Phase run.Phase
}
// RunPausedEvent fires when a run is intentionally paused.
//
// NewRunPausedEvent stores Labels and Metadata exactly as passed (the maps are
// not copied), so the event shares them with the caller.
RunPausedEvent struct {
baseEvent
// Reason provides a human-readable explanation for why the run was paused.
// Examples: "user_requested", "approval_required", "manual_review_needed".
// Subscribers can use this to categorize pause events and display appropriate
// messages to end users.
Reason string
// RequestedBy identifies the actor who initiated the pause (e.g., user ID,
// service name, or "policy_engine"). This enables audit logging and attribution
// for governance workflows.
RequestedBy string
// Labels carries optional key-value metadata for categorizing the pause event.
// These labels are propagated from the pause request and can be used for filtering,
// reporting, or triggering downstream workflows. Nil if no labels were provided.
Labels map[string]string
// Metadata holds arbitrary structured data attached to the pause request for audit
// trails or workflow-specific logic (e.g., approval ticket IDs, escalation reasons).
// The runtime persists this alongside the run status. Nil if no metadata was provided.
Metadata map[string]any
}
// RunResumedEvent fires when a paused run resumes.
//
// NewRunResumedEvent stores Labels exactly as passed (the map is not copied).
RunResumedEvent struct {
baseEvent
// Notes carries optional human-readable context provided when resuming the run.
// This might include instructions for the planner ("focus on X"), approval
// summaries, or other guidance. Empty if no notes were provided with the resume request.
Notes string
// RequestedBy identifies the actor who initiated the resume (e.g., user ID,
// service name, or "approval_system"). This enables audit logging and attribution
// for governance workflows.
RequestedBy string
// Labels carries optional key-value metadata for categorizing the resume event.
// These labels are propagated from the resume request and can be used for filtering,
// reporting, or triggering downstream workflows. Nil if no labels were provided.
Labels map[string]string
// MessageCount indicates how many new conversational messages were injected when
// resuming the run. When greater than zero, these messages are appended to the
// planner's context before execution continues. Subscribers can use this to track
// human-in-the-loop interventions.
MessageCount int
}
// ChildRunLinkedEvent links a parent run/tool call to a spawned child agent run.
// It is emitted on the parent run and allows consumers to correlate child-run
// events without flattening them into the parent. The embedded baseEvent
// therefore carries the parent's run, agent, and session identifiers, while
// the Child* fields identify the nested execution.
ChildRunLinkedEvent struct {
baseEvent
// ToolName is the canonical tool identifier for the parent tool.
ToolName tools.Ident
// ToolCallID is the parent tool call identifier.
ToolCallID string
// ChildRunID is the run identifier of the nested agent execution.
ChildRunID string
// ChildAgentID is the identifier of the nested agent.
ChildAgentID agent.Ident
}
// RunPhaseChangedEvent fires when a run transitions between lifecycle phases
// (prompted, planning, executing_tools, synthesizing, completed, failed,
// canceled). This is a higher-fidelity signal than Status and is primarily
// intended for streaming/UX consumers. Only the new phase is carried; the
// previous phase is not part of the event.
RunPhaseChangedEvent struct {
baseEvent
// Phase is the new lifecycle phase for the run.
Phase run.Phase
}
// PromptRenderedEvent fires when the runtime resolves and renders a prompt.
// It records which prompt, version, and scope were used; the rendered text
// itself is not carried on the event.
PromptRenderedEvent struct {
baseEvent
// PromptID identifies the rendered prompt specification.
PromptID prompt.Ident
// Version is the resolved prompt version used for rendering.
Version string
// Scope is the resolved override scope used during prompt resolution.
Scope prompt.Scope
}
// ToolCallScheduledEvent fires when the runtime schedules a tool activity
// for execution.
ToolCallScheduledEvent struct {
baseEvent
// ToolCallID uniquely identifies the scheduled tool invocation so progress
// updates can correlate with the original request.
ToolCallID string
// ToolName is the globally unique tool identifier (simple DSL name).
ToolName tools.Ident
// Payload contains the canonical JSON tool arguments for the scheduled tool.
// It is raw JSON (rawjson.RawJSON) representing the tool payload object as
// seen by the runtime and codecs.
Payload rawjson.RawJSON
// Queue is the activity queue name where the tool execution is scheduled.
Queue string
// ParentToolCallID optionally identifies the tool call that requested this tool.
// Empty for top-level planner-requested tools. Used to track parent-child chains.
ParentToolCallID string
// ExpectedChildrenTotal indicates how many child tools are expected from this batch.
// A value of 0 means no children expected or count not tracked by the planner.
ExpectedChildrenTotal int
// DisplayHint is a human-facing summary of the in-flight tool work derived
// from the tool's call hint template. It is computed once by the runtime
// so downstream subscribers (streaming, session persistence, memory) can
// surface consistent labels without re-rendering templates.
DisplayHint string
}
// ToolResultReceivedEvent fires when a tool activity completes and returns
// a result or error. On failure Error is set and Result is nil; RetryHint may
// then carry structured recovery guidance.
ToolResultReceivedEvent struct {
baseEvent
// ToolCallID uniquely identifies the tool invocation that produced this result.
ToolCallID string
// ParentToolCallID identifies the parent tool call if this tool was invoked by another tool.
// Empty when the tool call was scheduled directly by the planner.
ParentToolCallID string
// ToolName is the globally unique tool identifier that was executed.
ToolName tools.Ident
// Result contains the tool's output payload. Nil if Error is set.
Result any
// ResultJSON contains the canonical JSON encoding of Result as produced
// by the tool's generated result codec.
//
// This is used by stream sinks and persistence layers that must serialize
// tool results without relying on `encoding/json` and Go field names.
ResultJSON rawjson.RawJSON
// ServerData carries server-only data emitted by tool providers. This payload
// must not be serialized into model provider requests and is treated as opaque
// JSON bytes by the runtime.
ServerData rawjson.RawJSON
// ResultPreview is a concise, user-facing summary of the tool result rendered
// from the registered ResultHintTemplate for this tool. It is computed by
// the runtime while the result is still strongly typed so downstream
// subscribers do not need to re-render templates from JSON-decoded maps.
ResultPreview string
// Bounds, when non-nil, describes how the tool result has been bounded
// relative to the full underlying data set. It is supplied by tool
// implementations and surfaced for observability; the runtime does not
// modify it.
Bounds *agent.Bounds
// Duration is the wall-clock execution time for the tool activity.
Duration time.Duration
// Telemetry holds structured observability metadata (tokens, model, retries).
// Nil if no telemetry was collected.
Telemetry *telemetry.ToolTelemetry
// RetryHint carries structured guidance for recovering from tool failures.
// It is typically populated for validation/repair flows (missing fields,
// invalid arguments) and surfaced to clients so they can prompt the user
// and retry deterministically.
RetryHint *planner.RetryHint
// Error contains any error returned by the tool execution. Nil on success.
Error *toolerrors.ToolError
}
// ToolCallUpdatedEvent fires when a tool call's metadata is updated after
// initial scheduling. This typically occurs when a parent tool (agent-as-tool)
// dynamically discovers additional child tools across multiple planning iterations.
// UIs use this to update progress displays ("3 of 5 children complete").
//
// The event carries the new total, not a delta: ExpectedChildrenTotal replaces
// the value previously reported for the same ToolCallID.
ToolCallUpdatedEvent struct {
baseEvent
// ToolCallID identifies the tool call being updated (usually a parent call).
ToolCallID string
// ExpectedChildrenTotal is the new count of expected child tools. This value
// grows as child tools are discovered dynamically during execution.
ExpectedChildrenTotal int
}
// ToolCallArgsDeltaEvent fires when a provider streams an incremental tool-call
// argument fragment while constructing the final tool input JSON.
//
// Contract:
//   - This event is best-effort and may be ignored or dropped entirely.
//   - Delta is not guaranteed to be valid JSON on its own.
//   - The canonical tool payload is still emitted via ToolCallScheduledEvent
//     and ToolResultReceivedEvent (and, at the model boundary, the finalized
//     tool call chunk).
ToolCallArgsDeltaEvent struct {
baseEvent
// ToolCallID is the provider-issued identifier for the tool call.
ToolCallID string
// ToolName is the canonical tool identifier when known.
ToolName tools.Ident
// Delta is a raw JSON fragment emitted while streaming tool input JSON.
// It is a plain string, not rawjson.RawJSON, because a fragment need not
// be valid JSON by itself.
Delta string
}
// PlannerNoteEvent fires when the planner emits an annotation or
// intermediate thought during execution. Notes are free-form text; structured
// provider reasoning is reported separately via ThinkingBlockEvent.
PlannerNoteEvent struct {
baseEvent
// Note is the text content of the planner's annotation.
Note string
// Labels provide optional categorization metadata (e.g., "type": "reasoning").
Labels map[string]string
}
// ThinkingBlockEvent fires when the planner emits a structured reasoning block
// (either signed plaintext or redacted bytes). This preserves provider-accurate
// thinking suitable for exact replay and auditing.
//
// A block is expected to carry either Text (optionally with Signature) or
// Redacted, never both.
ThinkingBlockEvent struct {
baseEvent
// Text is the plaintext reasoning content when provided by the model.
Text string
// Signature is the provider signature for plaintext reasoning (when required).
Signature string
// Redacted contains provider-issued redacted reasoning bytes (mutually exclusive with Text).
Redacted []byte
// ContentIndex is the provider content block index.
ContentIndex int
// Final indicates that the reasoning block was finalized by the provider.
Final bool
}
// AssistantMessageEvent fires when a final assistant response is produced,
// indicating the workflow is completing with a user-facing message.
AssistantMessageEvent struct {
baseEvent
// Message is the textual content of the assistant's response.
Message string
// Structured contains optional typed output (e.g., schema-validated
// structured data) accompanying the text.
// Nil if only a text message is provided.
Structured any
}
// RetryHintIssuedEvent fires when the planner or runtime suggests a retry
// policy change, such as disabling a failing tool or adjusting caps. It is a
// flat summary (reason, tool, message); the structured hint for a specific
// tool result is carried on ToolResultReceivedEvent.RetryHint.
RetryHintIssuedEvent struct {
baseEvent
// Reason summarizes why the retry hint was issued (e.g., "invalid_arguments").
Reason string
// ToolName identifies the tool involved in the failure, if applicable.
ToolName tools.Ident
// Message provides human-readable guidance for the retry adjustment.
Message string
}
// MemoryAppendedEvent fires when new memory entries are successfully
// persisted to the memory store. Only the number of entries is reported; the
// entries themselves are not carried on the event.
MemoryAppendedEvent struct {
baseEvent
// EventCount indicates how many memory events were written in this operation.
EventCount int
}
// PolicyDecisionEvent captures the outcome of a policy evaluation so downstream
// systems can audit allowlists, cap adjustments, and metadata applied for a turn.
//
// NewPolicyDecisionEvent stores AllowedTools, Labels, and Metadata exactly as
// passed (no defensive copies), so the event shares them with the caller.
PolicyDecisionEvent struct {
baseEvent
// AllowedTools lists the globally unique tool identifiers that the policy engine
// permitted for this turn. The runtime enforces this allowlist: planners can only
// invoke tools in this list. An empty slice means no tools are allowed for this turn,
// forcing the planner to produce a final response. Subscribers use this for security
// auditing and debugging tool restrictions.
AllowedTools []tools.Ident
// Caps reflects the updated execution budgets after policy evaluation for this turn.
// This includes remaining tool call limits, consecutive failure thresholds, and time
// budgets. Policies may adjust these dynamically based on observed behavior (e.g.,
// reducing limits after repeated failures). Subscribers can monitor cap consumption
// to predict run termination or trigger alerts.
Caps policy.CapsState
// Labels carries policy-applied metadata merged into the run context and propagated
// to subsequent turns. Examples: {"circuit_breaker": "active", "policy_version": "v2"}.
// These labels appear in downstream telemetry, memory records, and hooks. Subscribers
// can use them to correlate policy decisions with run outcomes. Nil if the policy did
// not add labels.
Labels map[string]string
// Metadata holds policy-specific structured data for audit trails and compliance
// reporting (e.g., approval IDs, justification codes, external system responses).
// The runtime persists this alongside run records. Subscribers can extract this for
// governance dashboards or regulatory logging. Nil if the policy did not provide metadata.
Metadata map[string]any
}
// baseEvent holds common fields shared by all event types. It is embedded
// anonymously in each concrete event struct, providing implementations of
// the RunID, SessionID, AgentID, Timestamp, and TurnID methods required by
// Event, so concrete types only add Type.
//
// newBaseEvent takes only the run and agent identifiers; constructors assign
// sessionID on the returned value before embedding it.
baseEvent struct {
// runID identifies the run that produced the event.
runID string
// agentID identifies the agent that triggered the event.
agentID agent.Ident
// timestamp is the event time reported by Timestamp (Unix milliseconds per
// the Event interface documentation).
timestamp int64
// sessionID associates the event with the logical session that owns the
// run. All events emitted for a given run share the same session ID.
sessionID string
// turnID identifies the conversational turn this event belongs to (optional).
// When set, groups events for UI rendering and conversation tracking.
turnID string
}
// AwaitClarificationEvent indicates the planner requested a human-provided
// clarification before continuing execution.
//
// NewAwaitClarificationEvent copies MissingFields and makes a shallow copy of
// ExampleInput, so later changes to the caller's slice or map do not affect
// the event.
AwaitClarificationEvent struct {
baseEvent
// ID correlates this await with a subsequent ProvideClarification.
ID string
// Question is the prompt to present to the user.
Question string
// MissingFields optionally lists fields needed to proceed.
MissingFields []string
// RestrictToTool optionally narrows the next turn to a specific tool.
RestrictToTool tools.Ident
// ExampleInput optionally provides a schema-compliant example. The
// constructor leaves it nil when the supplied example is empty.
ExampleInput map[string]any
}
// AwaitConfirmationEvent indicates the runtime requested an explicit operator
// confirmation before executing a sensitive tool call. The operator's decision
// is reported separately by ToolAuthorizationEvent.
AwaitConfirmationEvent struct {
baseEvent
// ID correlates this await with a subsequent confirmation decision.
ID string
// Title is an optional display title for the confirmation UI.
Title string
// Prompt is the operator-facing confirmation prompt.
Prompt string
// ToolName identifies the tool that requires confirmation.
ToolName tools.Ident
// ToolCallID is the tool_call_id for the pending tool call.
ToolCallID string
// Payload is the canonical JSON arguments for the pending tool call.
Payload rawjson.RawJSON
}
// AwaitQuestionsEvent indicates the planner requested structured multiple-choice
// answers to be provided out-of-band (typically by a UI) before the run can resume.
//
// NewAwaitQuestionsEvent copies the Questions slice and each question's
// Options; Payload and Title are stored as passed.
AwaitQuestionsEvent struct {
baseEvent
// ID correlates this await with a subsequent ProvideToolResults.
ID string
// ToolName identifies the tool awaiting user answers.
ToolName tools.Ident
// ToolCallID correlates the provided result with this requested call.
ToolCallID string
// Payload is the canonical JSON arguments for the awaited tool call.
Payload rawjson.RawJSON
// Title is an optional display title for the questions UI. It is a pointer
// so that an absent title (nil) is distinguishable from an empty one.
Title *string
// Questions are the structured questions to present to the user.
Questions []AwaitQuestion
}
// AwaitQuestion describes a single multiple-choice question.
AwaitQuestion struct {
// ID identifies the question.
// NOTE(review): presumably used to match answers to questions — confirm.
ID string
// Prompt is the question text.
Prompt string
// Options lists the selectable answer options for this question.
Options []AwaitQuestionOption
// AllowMultiple indicates whether more than one option may be selected.
// NOTE(review): inferred from the field name — confirm against consumers.
AllowMultiple bool
}
// AwaitQuestionOption describes a selectable answer option.
AwaitQuestionOption struct {
// ID identifies the option.
ID string
// Label is the option's display text.
// NOTE(review): inferred from the field name — confirm against the UI contract.
Label string
}
// ToolAuthorizationEvent indicates an operator provided an explicit approval
// or denial decision for a pending tool call. This is emitted immediately when
// the decision is received so subscribers can record a durable audit trail and
// UIs can render an approval record independent of tool execution.
ToolAuthorizationEvent struct {
baseEvent
// ToolName identifies the tool the decision applies to.
ToolName tools.Ident
// ToolCallID is the tool_call_id for the pending tool call.
ToolCallID string
// Approved reports whether the operator approved execution; false records
// a denial.
Approved bool
// Summary is a deterministic, human-facing description of what the decision
// covered.
Summary string
// ApprovedBy identifies the actor that provided the decision (approval or
// denial), formatted as "<principal_type>:<principal_id>".
ApprovedBy string
}
// AwaitExternalToolsEvent indicates the planner requested external tool execution.
//
// NewAwaitExternalToolsEvent copies the Items slice, so Items is always
// non-nil on events built through it (possibly empty).
AwaitExternalToolsEvent struct {
baseEvent
// ID correlates this await with a subsequent ProvideToolResults.
ID string
// Items enumerate the external tool calls to be satisfied.
Items []AwaitToolItem
}
// AwaitToolItem describes a single external tool call to be executed out-of-band.
AwaitToolItem struct {
// ToolName identifies the external tool to execute.
ToolName tools.Ident
// ToolCallID identifies this tool call.
// NOTE(review): presumably echoed back with the provided result — confirm.
ToolCallID string
// Payload holds the raw JSON for the call.
// NOTE(review): presumably the canonical tool arguments, as on the other
// await events — confirm.
Payload rawjson.RawJSON
}
// UsageEvent reports token usage for a model invocation within a run.
// Emitted when the model stream reports usage deltas or a final summary.
UsageEvent struct {
baseEvent
// TokenUsage contains the attributed token counts reported by the model
// adapter. Model and ModelClass identify the specific model that produced
// this delta. The struct is embedded, so its fields are promoted and can be
// read directly on the event.
model.TokenUsage
}
// HardProtectionEvent signals that the runtime applied a hard protection to
// avoid a pathological loop or expensive no-op behavior. For example, when
// an agent-as-tool produced zero child tool calls, the runtime finalizes
// instead of resuming.
//
// The counters describe the turn in which the protection fired.
HardProtectionEvent struct {
baseEvent
// Reason is a fixed string describing the protection that was applied.
// Example: "agent_tool_no_children".
Reason string
// ExecutedAgentTools is the number of agent-as-tool executions in the turn.
ExecutedAgentTools int
// ChildrenTotal is the total number of child tool calls produced by those
// agent tools (typically zero when this event fires).
ChildrenTotal int
// ToolNames lists the agent-tool identifiers executed in the turn.
ToolNames []tools.Ident
}
)
// Error kinds for run failures that did not originate from a model provider.
// NOTE(review): presumably these are the values classifyNonProviderFailure
// places in RunCompletedEvent.ErrorKind — confirm against that helper.
const (
// ErrorKindTimeout indicates the run failed because a required operation timed out.
ErrorKindTimeout = "timeout"
// ErrorKindInternal indicates the run failed for an unclassified reason.
ErrorKindInternal = "internal"
)
// NewRunStartedEvent builds the event published when a run begins.
//
// The base event is created by newBaseEvent from runID and agentID, and the
// session ID is taken from runContext.SessionID. runContext and input are
// stored on the event as given.
func NewRunStartedEvent(runID string, agentID agent.Ident, runContext run.Context, input any) *RunStartedEvent {
	evt := &RunStartedEvent{
		baseEvent:  newBaseEvent(runID, agentID),
		RunContext: runContext,
		Input:      input,
	}
	evt.baseEvent.sessionID = runContext.SessionID
	return evt
}
// NewRunCompletedEvent builds the terminal event for a run.
//
// status should be "success", "failed", or "canceled", and phase must be the
// terminal lifecycle phase for the run. err may be nil on success.
//
// Error details are derived from err as follows:
//   - nil err: no error fields are populated.
//   - a *model.ProviderError in the chain: provider, operation, kind, code,
//     HTTP status, and retryability are copied from it; PublicError is set
//     only when status is "failed".
//   - any other error with status "failed": ErrorKind and PublicError come
//     from classifyNonProviderFailure and the event is marked retryable.
//   - any other error with a different status (for example a cancellation):
//     only Error is recorded.
func NewRunCompletedEvent(runID string, agentID agent.Ident, sessionID, status string, phase run.Phase, err error) *RunCompletedEvent {
	evt := &RunCompletedEvent{
		baseEvent: newBaseEvent(runID, agentID),
		Status:    status,
		Phase:     phase,
		Error:     err,
	}
	evt.baseEvent.sessionID = sessionID
	if err == nil {
		return evt
	}

	failed := status == "failed"
	var provErr *model.ProviderError
	switch {
	case errors.As(err, &provErr):
		evt.ErrorProvider = provErr.Provider()
		evt.ErrorOperation = provErr.Operation()
		evt.ErrorKind = string(provErr.Kind())
		evt.ErrorCode = provErr.Code()
		evt.HTTPStatus = provErr.HTTPStatus()
		evt.Retryable = provErr.Retryable()
		if failed {
			evt.PublicError = providerPublicError(provErr)
		}
	case failed:
		evt.ErrorKind, evt.PublicError = classifyNonProviderFailure(err)
		// Non-provider failures are always reported as retryable.
		// NOTE(review): this includes unclassified internal errors — confirm
		// this matches the documented meaning of Retryable.
		evt.Retryable = true
	}
	// Remaining case: a non-provider error on a non-failed run (cancellation),
	// which is terminal but not an error for UX purposes.
	return evt
}
// NewChildRunLinkedEvent builds the event that ties a parent tool call to the
// child agent run it spawned. runID, agentID, and sessionID describe the
// parent run; childRunID and childAgentID describe the nested execution.
func NewChildRunLinkedEvent(runID string, agentID agent.Ident, sessionID string, toolName tools.Ident, toolCallID, childRunID string, childAgentID agent.Ident) *ChildRunLinkedEvent {
	evt := &ChildRunLinkedEvent{
		baseEvent:    newBaseEvent(runID, agentID),
		ToolName:     toolName,
		ToolCallID:   toolCallID,
		ChildRunID:   childRunID,
		ChildAgentID: childAgentID,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewRunPhaseChangedEvent builds the event announcing that the given run has
// entered phase.
func NewRunPhaseChangedEvent(runID string, agentID agent.Ident, sessionID string, phase run.Phase) *RunPhaseChangedEvent {
	evt := &RunPhaseChangedEvent{
		baseEvent: newBaseEvent(runID, agentID),
		Phase:     phase,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewPromptRenderedEvent builds the event recording that one prompt was
// rendered, identified by promptID, with the resolved version and scope.
func NewPromptRenderedEvent(runID string, agentID agent.Ident, sessionID string, promptID prompt.Ident, version string, scope prompt.Scope) *PromptRenderedEvent {
	evt := &PromptRenderedEvent{
		baseEvent: newBaseEvent(runID, agentID),
		PromptID:  promptID,
		Version:   version,
		Scope:     scope,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewRunPausedEvent builds the event emitted when a run is paused. labels and
// metadata are stored as passed; the maps are not copied.
func NewRunPausedEvent(runID string, agentID agent.Ident, sessionID, reason, requestedBy string, labels map[string]string, metadata map[string]any) *RunPausedEvent {
	evt := &RunPausedEvent{
		baseEvent:   newBaseEvent(runID, agentID),
		Reason:      reason,
		RequestedBy: requestedBy,
		Labels:      labels,
		Metadata:    metadata,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewRunResumedEvent builds the event emitted when a paused run resumes.
// labels is stored as passed (not copied); messageCount is the number of
// messages injected with the resume request.
func NewRunResumedEvent(runID string, agentID agent.Ident, sessionID, notes, requestedBy string, labels map[string]string, messageCount int) *RunResumedEvent {
	evt := &RunResumedEvent{
		baseEvent:    newBaseEvent(runID, agentID),
		Notes:        notes,
		RequestedBy:  requestedBy,
		Labels:       labels,
		MessageCount: messageCount,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewAwaitClarificationEvent builds the event requesting a human clarification.
//
// missing is copied, and example is shallow-copied, so the event does not
// alias the caller's slice or map. An empty or nil example yields a nil
// ExampleInput; an empty or nil missing yields a nil MissingFields.
func NewAwaitClarificationEvent(runID string, agentID agent.Ident, sessionID, id, question string, missing []string, restrict tools.Ident, example map[string]any) *AwaitClarificationEvent {
	// Shallow copy: values are shared, only the map itself is duplicated.
	var exampleCopy map[string]any
	if n := len(example); n > 0 {
		exampleCopy = make(map[string]any, n)
		for key, val := range example {
			exampleCopy[key] = val
		}
	}

	base := newBaseEvent(runID, agentID)
	base.sessionID = sessionID

	missingCopy := append([]string(nil), missing...)
	return &AwaitClarificationEvent{
		baseEvent:      base,
		ID:             id,
		Question:       question,
		MissingFields:  missingCopy,
		RestrictToTool: restrict,
		ExampleInput:   exampleCopy,
	}
}
// NewAwaitConfirmationEvent builds the event requesting operator confirmation
// for the pending tool call identified by toolName and toolCallID. payload is
// stored as passed.
func NewAwaitConfirmationEvent(runID string, agentID agent.Ident, sessionID, id, title, prompt string, toolName tools.Ident, toolCallID string, payload rawjson.RawJSON) *AwaitConfirmationEvent {
	evt := &AwaitConfirmationEvent{
		baseEvent:  newBaseEvent(runID, agentID),
		ID:         id,
		Title:      title,
		Prompt:     prompt,
		ToolName:   toolName,
		ToolCallID: toolCallID,
		Payload:    payload,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewToolAuthorizationEvent builds the event recording an operator's decision
// on a pending tool call. approved is true for an approval and false for a
// denial; approvedBy names the actor who decided.
func NewToolAuthorizationEvent(runID string, agentID agent.Ident, sessionID string, toolName tools.Ident, toolCallID string, approved bool, summary, approvedBy string) *ToolAuthorizationEvent {
	evt := &ToolAuthorizationEvent{
		baseEvent:  newBaseEvent(runID, agentID),
		ToolName:   toolName,
		ToolCallID: toolCallID,
		Approved:   approved,
		Summary:    summary,
		ApprovedBy: approvedBy,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewAwaitExternalToolsEvent builds the event requesting out-of-band execution
// of the given tool calls. items is copied so the event does not alias the
// caller's slice; the resulting Items is non-nil even when items is nil.
func NewAwaitExternalToolsEvent(runID string, agentID agent.Ident, sessionID, id string, items []AwaitToolItem) *AwaitExternalToolsEvent {
	// Start from an allocated (non-nil) slice so an empty input stays non-nil.
	itemsCopy := append(make([]AwaitToolItem, 0, len(items)), items...)

	evt := &AwaitExternalToolsEvent{
		baseEvent: newBaseEvent(runID, agentID),
		ID:        id,
		Items:     itemsCopy,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewAwaitQuestionsEvent builds the event requesting structured answers for
// the awaited tool call. The questions slice and each question's Options are
// copied so the event does not alias the caller's slices; payload and title
// are stored as passed.
func NewAwaitQuestionsEvent(runID string, agentID agent.Ident, sessionID, id string, toolName tools.Ident, toolCallID string, payload rawjson.RawJSON, title *string, questions []AwaitQuestion) *AwaitQuestionsEvent {
	questionsCopy := make([]AwaitQuestion, len(questions))
	for i := range questions {
		// Copy the scalar fields by value, then give the copy its own
		// (always non-nil) Options slice.
		dup := questions[i]
		dup.Options = append(make([]AwaitQuestionOption, 0, len(questions[i].Options)), questions[i].Options...)
		questionsCopy[i] = dup
	}

	evt := &AwaitQuestionsEvent{
		baseEvent:  newBaseEvent(runID, agentID),
		ID:         id,
		ToolName:   toolName,
		ToolCallID: toolCallID,
		Payload:    payload,
		Title:      title,
		Questions:  questionsCopy,
	}
	evt.baseEvent.sessionID = sessionID
	return evt
}
// NewPolicyDecisionEvent constructs a PolicyDecisionEvent with the provided metadata.
//
// The allowed tool list, labels and metadata are shallow-copied so the event
// does not alias caller-owned storage; this matches the other constructors in
// this file that copy their slice and map arguments. Nil inputs stay nil and
// empty inputs stay empty (non-nil). Values stored inside metadata are not
// deep-copied. caps is stored as passed.
func NewPolicyDecisionEvent(runID string, agentID agent.Ident, sessionID string, allowed []tools.Ident, caps policy.CapsState, labels map[string]string, metadata map[string]any) *PolicyDecisionEvent {
	var allowedCopy []tools.Ident
	if allowed != nil {
		allowedCopy = make([]tools.Ident, len(allowed))
		copy(allowedCopy, allowed)
	}

	var labelsCopy map[string]string
	if labels != nil {
		labelsCopy = make(map[string]string, len(labels))
		for k, v := range labels {
			labelsCopy[k] = v
		}
	}

	var metadataCopy map[string]any
	if metadata != nil {
		metadataCopy = make(map[string]any, len(metadata))
		for k, v := range metadata {
			metadataCopy[k] = v
		}
	}

	be := newBaseEvent(runID, agentID)
	be.sessionID = sessionID
	return &PolicyDecisionEvent{
		baseEvent:    be,
		AllowedTools: allowedCopy,
		Caps:         caps,
		Labels:       labelsCopy,
		Metadata:     metadataCopy,
	}
}
// Type returns AwaitClarification, the event type of an AwaitClarificationEvent.
func (e *AwaitClarificationEvent) Type() EventType {
	return AwaitClarification
}

// Type returns AwaitConfirmation, the event type of an AwaitConfirmationEvent.
func (e *AwaitConfirmationEvent) Type() EventType {
	return AwaitConfirmation
}

// Type returns ToolAuthorization, the event type of a ToolAuthorizationEvent.
func (e *ToolAuthorizationEvent) Type() EventType {
	return ToolAuthorization
}

// Type returns AwaitQuestions, the event type of an AwaitQuestionsEvent.
func (e *AwaitQuestionsEvent) Type() EventType {
	return AwaitQuestions
}

// Type returns AwaitExternalTools, the event type of an AwaitExternalToolsEvent.
func (e *AwaitExternalToolsEvent) Type() EventType {
	return AwaitExternalTools
}
// NewToolCallScheduledEvent builds a ToolCallScheduledEvent. payload holds the
// canonical JSON arguments of the scheduled tool and queue names the activity
// queue. parentToolCallID and expectedChildren are optional: pass "" and 0 for
// a top-level call.
func NewToolCallScheduledEvent(runID string, agentID agent.Ident, sessionID string, toolName tools.Ident, toolCallID string, payload rawjson.RawJSON, queue string, parentToolCallID string, expectedChildren int) *ToolCallScheduledEvent {
	evt := &ToolCallScheduledEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.ToolCallID = toolCallID
	evt.ToolName = toolName
	evt.Payload = payload
	evt.Queue = queue
	evt.ParentToolCallID = parentToolCallID
	evt.ExpectedChildrenTotal = expectedChildren
	// The display hint is intentionally left empty here. The runtime computes
	// it at publish time from typed payloads and registered templates, which
	// keeps the contract strict: hints are never rendered against raw JSON
	// bytes.
	evt.DisplayHint = ""
	return evt
}
// NewToolResultReceivedEvent builds a ToolResultReceivedEvent. result and err
// describe the tool outcome, duration is the wall-clock execution time, and
// telemetry carries structured observability metadata (nil when none was
// collected). Every argument is stored on the event as given.
func NewToolResultReceivedEvent(runID string, agentID agent.Ident, sessionID string, toolName tools.Ident, toolCallID, parentToolCallID string, result any, resultJSON, serverData rawjson.RawJSON, resultPreview string, bounds *agent.Bounds, duration time.Duration, telemetry *telemetry.ToolTelemetry, retryHint *planner.RetryHint, err *toolerrors.ToolError) *ToolResultReceivedEvent {
	evt := &ToolResultReceivedEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID

	// Identity of the call this result belongs to.
	evt.ToolCallID = toolCallID
	evt.ParentToolCallID = parentToolCallID
	evt.ToolName = toolName

	// Outcome in its various representations.
	evt.Result = result
	evt.ResultJSON = resultJSON
	evt.ServerData = serverData
	evt.ResultPreview = resultPreview
	evt.Bounds = bounds

	// Execution metadata and failure details.
	evt.Duration = duration
	evt.Telemetry = telemetry
	evt.RetryHint = retryHint
	evt.Error = err
	return evt
}
// NewToolCallUpdatedEvent builds a ToolCallUpdatedEvent, used to signal that
// the child count of the parent tool call toolCallID has grown through dynamic
// discovery. expectedChildrenTotal is stored as the new total.
func NewToolCallUpdatedEvent(runID string, agentID agent.Ident, sessionID string, toolCallID string, expectedChildrenTotal int) *ToolCallUpdatedEvent {
	evt := &ToolCallUpdatedEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.ToolCallID = toolCallID
	evt.ExpectedChildrenTotal = expectedChildrenTotal
	return evt
}
// NewToolCallArgsDeltaEvent builds a ToolCallArgsDeltaEvent that records delta
// for the tool call identified by toolCallID and toolName.
func NewToolCallArgsDeltaEvent(runID string, agentID agent.Ident, sessionID string, toolCallID string, toolName tools.Ident, delta string) *ToolCallArgsDeltaEvent {
	evt := &ToolCallArgsDeltaEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.ToolCallID = toolCallID
	evt.ToolName = toolName
	evt.Delta = delta
	return evt
}
// NewUsageEvent builds a UsageEvent that carries the given attributed token
// usage snapshot for the run, agent and session.
func NewUsageEvent(runID string, agentID agent.Ident, sessionID string, usage model.TokenUsage) *UsageEvent {
	evt := &UsageEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.TokenUsage = usage
	return evt
}
// NewHardProtectionEvent builds a HardProtectionEvent recording reason, the
// executedAgentTools and childrenTotal counters, and a copy of toolNames. The
// copied slice is always non-nil and does not share storage with the caller's.
func NewHardProtectionEvent(runID string, agentID agent.Ident, sessionID string, reason string, executedAgentTools, childrenTotal int, toolNames []tools.Ident) *HardProtectionEvent {
	// Fresh backing array of exactly len(toolNames) elements.
	owned := append(make([]tools.Ident, 0, len(toolNames)), toolNames...)

	evt := &HardProtectionEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.Reason = reason
	evt.ExecutedAgentTools = executedAgentTools
	evt.ChildrenTotal = childrenTotal
	evt.ToolNames = owned
	return evt
}
// NewPlannerNoteEvent constructs a PlannerNoteEvent with the given note text
// and optional labels for categorization.
//
// labels is shallow-copied so the event does not alias the caller's map; this
// matches the other constructors in this file that copy their slice and map
// arguments. A nil map stays nil and an empty map stays empty (non-nil).
func NewPlannerNoteEvent(runID string, agentID agent.Ident, sessionID string, note string, labels map[string]string) *PlannerNoteEvent {
	var labelsCopy map[string]string
	if labels != nil {
		labelsCopy = make(map[string]string, len(labels))
		for k, v := range labels {
			labelsCopy[k] = v
		}
	}

	be := newBaseEvent(runID, agentID)
	be.sessionID = sessionID
	return &PlannerNoteEvent{
		baseEvent: be,
		Note:      note,
		Labels:    labelsCopy,
	}
}
// NewThinkingBlockEvent builds a ThinkingBlockEvent from structured reasoning
// fields. text, signature, contentIndex and final are stored as given. The
// redacted bytes are copied when non-empty; otherwise Redacted is left nil.
func NewThinkingBlockEvent(runID string, agentID agent.Ident, sessionID string, text, signature string, redacted []byte, contentIndex int, final bool) *ThinkingBlockEvent {
	var redactedCopy []byte
	if len(redacted) != 0 {
		// Detach from the caller's buffer.
		redactedCopy = append([]byte(nil), redacted...)
	}

	evt := &ThinkingBlockEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.Text = text
	evt.Signature = signature
	evt.Redacted = redactedCopy
	evt.ContentIndex = contentIndex
	evt.Final = final
	return evt
}
// NewAssistantMessageEvent builds an AssistantMessageEvent holding message and
// structured. structured may be nil when only a text message is available.
func NewAssistantMessageEvent(runID string, agentID agent.Ident, sessionID string, message string, structured any) *AssistantMessageEvent {
	evt := &AssistantMessageEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.Message = message
	evt.Structured = structured
	return evt
}
// NewRetryHintIssuedEvent builds a RetryHintIssuedEvent, which indicates a
// suggested retry policy adjustment. reason, toolName and message are stored
// on the event as given.
func NewRetryHintIssuedEvent(runID string, agentID agent.Ident, sessionID string, reason string, toolName tools.Ident, message string) *RetryHintIssuedEvent {
	evt := &RetryHintIssuedEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.Reason = reason
	evt.ToolName = toolName
	evt.Message = message
	return evt
}
// NewMemoryAppendedEvent builds a MemoryAppendedEvent, which indicates that
// memory entries were persisted successfully. eventCount is stored as given.
func NewMemoryAppendedEvent(runID string, agentID agent.Ident, sessionID string, eventCount int) *MemoryAppendedEvent {
	evt := &MemoryAppendedEvent{baseEvent: newBaseEvent(runID, agentID)}
	evt.sessionID = sessionID
	evt.EventCount = eventCount
	return evt
}
// classifyNonProviderFailure maps err to an error kind and a public error
// string. An error whose chain contains a *temporal.TimeoutError or
// context.DeadlineExceeded is reported as a timeout; anything else (including
// a nil err) is reported as internal.
func classifyNonProviderFailure(err error) (kind, publicError string) {
	var timeoutErr *temporal.TimeoutError
	switch {
	case errors.As(err, &timeoutErr), errors.Is(err, context.DeadlineExceeded):
		return ErrorKindTimeout, PublicErrorTimeout
	default:
		return ErrorKindInternal, PublicErrorInternal
	}
}
// providerPublicError translates the kind reported by pe.Kind() into the
// matching public error string. Kinds without a dedicated case fall back to
// PublicErrorProviderDefault. pe.Kind() is called unconditionally, so pe is
// expected to be usable by that method.
func providerPublicError(pe *model.ProviderError) string {
	var public string
	switch kind := pe.Kind(); kind {
	case model.ProviderErrorKindRateLimited:
		public = PublicErrorProviderRateLimited
	case model.ProviderErrorKindUnavailable:
		public = PublicErrorProviderUnavailable
	case model.ProviderErrorKindInvalidRequest:
		public = PublicErrorProviderInvalidRequest
	case model.ProviderErrorKindAuth:
		public = PublicErrorProviderAuth
	case model.ProviderErrorKindUnknown:
		public = PublicErrorProviderUnknown
	default:
		public = PublicErrorProviderDefault
	}
	return public
}
// RunID reports the identifier of the workflow run the event belongs to.
func (e baseEvent) RunID() string {
	return e.runID
}

// SessionID reports the logical session identifier associated with the run.
func (e baseEvent) SessionID() string {
	return e.sessionID
}

// AgentID reports the agent identifier, converted to a plain string.
func (e baseEvent) AgentID() string {
	return string(e.agentID)
}

// Timestamp reports when the event occurred, as Unix time in milliseconds.
func (e baseEvent) Timestamp() int64 {
	return e.timestamp
}

// TurnID reports the conversational turn identifier; it is empty when no turn
// has been set.
func (e baseEvent) TurnID() string {
	return e.turnID
}

// SetTurnID replaces the turn identifier. The runtime calls it after
// construction to stamp events with turn information.
func (e *baseEvent) SetTurnID(turnID string) {
	e.turnID = turnID
}

// SetSessionID replaces the session identifier carried by the event. The
// runtime calls it while constructing events so that downstream subscribers
// can rely on SessionID as a stable join key across processes.
func (e *baseEvent) SetSessionID(id string) {
	e.sessionID = id
}
// newBaseEvent returns a baseEvent for the given run and agent, timestamped
// with the current wall-clock time in Unix milliseconds. All other fields are
// left at their zero values.
func newBaseEvent(runID string, agentID agent.Ident) baseEvent {
	var be baseEvent
	be.runID = runID
	be.agentID = agentID
	be.timestamp = time.Now().UnixMilli()
	return be
}
// Type method implementations for the run, tool, planner and message events.

// Type returns RunStarted, the event type of a RunStartedEvent.
func (e *RunStartedEvent) Type() EventType {
	return RunStarted
}

// Type returns RunCompleted, the event type of a RunCompletedEvent.
func (e *RunCompletedEvent) Type() EventType {
	return RunCompleted
}

// Type returns RunPaused, the event type of a RunPausedEvent.
func (e *RunPausedEvent) Type() EventType {
	return RunPaused
}

// Type returns RunResumed, the event type of a RunResumedEvent.
func (e *RunResumedEvent) Type() EventType {
	return RunResumed
}

// Type returns ToolCallScheduled, the event type of a ToolCallScheduledEvent.
func (e *ToolCallScheduledEvent) Type() EventType {
	return ToolCallScheduled
}

// Type returns ToolResultReceived, the event type of a ToolResultReceivedEvent.
func (e *ToolResultReceivedEvent) Type() EventType {
	return ToolResultReceived
}

// Type returns ToolCallUpdated, the event type of a ToolCallUpdatedEvent.
func (e *ToolCallUpdatedEvent) Type() EventType {
	return ToolCallUpdated
}

// Type returns ToolCallArgsDelta, the event type of a ToolCallArgsDeltaEvent.
func (e *ToolCallArgsDeltaEvent) Type() EventType {
	return ToolCallArgsDelta
}

// Type returns PlannerNote, the event type of a PlannerNoteEvent.
func (e *PlannerNoteEvent) Type() EventType {
	return PlannerNote
}

// Type returns AssistantMessage, the event type of an AssistantMessageEvent.
func (e *AssistantMessageEvent) Type() EventType {
	return AssistantMessage
}

// Type returns ThinkingBlock, the event type of a ThinkingBlockEvent.
func (e *ThinkingBlockEvent) Type() EventType {
	return ThinkingBlock
}