-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecovery_orchestrator.go
More file actions
235 lines (195 loc) · 5.32 KB
/
recovery_orchestrator.go
File metadata and controls
235 lines (195 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package claudesdk
import (
"context"
"fmt"
"iter"
"time"
)
// RecoveryOrchestratorOptions configure host-side recovery planning.
// The values are copied into a RecoveryOrchestrator by NewRecoveryOrchestrator.
type RecoveryOrchestratorOptions struct {
// RetryPolicy is passed through normalizeRetryPolicy by
// NewRecoveryOrchestrator and later handed to EvaluateResultRetry when
// PlanResult falls back to policy-based retries.
RetryPolicy RetryPolicy
// MaxAttempts bounds the number of attempts. Values <= 0 are replaced
// with 3 by NewRecoveryOrchestrator.
MaxAttempts int
// EnableMediaStrip lets PlanResult plan a retry with media blocks removed
// when the error classification recommends RecoveryActionStripMedia.
EnableMediaStrip bool
// EnableFallbackSwitch lets PlanResult plan a retry on the configured
// fallback model when the classification recommends RecoveryActionSwitchModel.
EnableFallbackSwitch bool
}
// RecoveryPlan summarizes the next recovery step after a failed result.
// It is produced by RecoveryOrchestrator.PlanResult.
type RecoveryPlan struct {
// Attempt is the attempt number that was passed to PlanResult.
Attempt int
// MaxAttempts is the orchestrator's attempt limit, or 1 when PlanResult is
// called on a nil orchestrator.
MaxAttempts int
// Classification is the output of ClassifyResultError; it stays at its
// zero value when the result is nil or not an error.
Classification ErrorClassification
// Retry reports whether the caller should run another attempt.
Retry bool
// Delay is the recommended wait before a policy-based retry. PlanResult
// leaves it at zero for media-strip and model-switch retries.
Delay time.Duration
// Action is the recovery action recommended by the classification.
Action RecoveryAction
// Reason is the classification class by default, and is overwritten with
// "strip_media", "switch_model", or the retry decision's reason when the
// corresponding retry is planned.
Reason string
// Content is the content to send on the next attempt: the original content
// unless ContentChanged is true.
Content UserMessageContent
// ContentChanged is true when Content had media blocks stripped.
ContentChanged bool
// OverrideModel is non-nil when the retry should switch to the fallback model.
OverrideModel *string
}
// RecoveryOrchestrator plans host-side retries and safe content/model adjustments.
// NewRecoveryOrchestrator populates the fields from RecoveryOrchestratorOptions.
type RecoveryOrchestrator struct {
// retryPolicy is the normalized policy given to EvaluateResultRetry.
retryPolicy RetryPolicy
// maxAttempts is the attempt limit; NewRecoveryOrchestrator guarantees it
// is positive, but it is 0 on a zero-value RecoveryOrchestrator.
maxAttempts int
// enableMediaStrip gates the media-stripping retry in PlanResult.
enableMediaStrip bool
// enableFallbackSwitch gates the fallback-model retry in PlanResult.
enableFallbackSwitch bool
}
// NewRecoveryOrchestrator builds a host-side recovery planner from options.
//
// The retry policy is normalized via normalizeRetryPolicy, and a non-positive
// MaxAttempts falls back to a default of 3 attempts.
func NewRecoveryOrchestrator(options RecoveryOrchestratorOptions) *RecoveryOrchestrator {
	const defaultMaxAttempts = 3

	orchestrator := &RecoveryOrchestrator{
		retryPolicy:          normalizeRetryPolicy(options.RetryPolicy),
		maxAttempts:          defaultMaxAttempts,
		enableMediaStrip:     options.EnableMediaStrip,
		enableFallbackSwitch: options.EnableFallbackSwitch,
	}
	// Only a strictly positive caller-supplied limit overrides the default.
	if options.MaxAttempts > 0 {
		orchestrator.maxAttempts = options.MaxAttempts
	}
	return orchestrator
}
// PlanResult decides what to do after a terminal result for the given attempt.
//
// The returned plan always carries attempt and content. On a nil receiver the
// plan reports MaxAttempts of 1 and no retry. When result is nil or not an
// error, no retry is planned. Otherwise the error is classified and, provided
// attempt is still below the orchestrator's limit, the first applicable option
// is chosen:
//
//  1. strip media from content (media stripping enabled, the classification
//     recommends it, and stripping actually changed the content);
//  2. switch to options.FallbackModel (fallback switching enabled, the
//     classification recommends it, and the fallback is non-empty and differs
//     from options.Model);
//  3. a plain retry with the delay recommended by EvaluateResultRetry.
func (o *RecoveryOrchestrator) PlanResult(
	result *ResultMessage,
	content UserMessageContent,
	options *ClaudeAgentOptions,
	attempt int,
) RecoveryPlan {
	if o == nil {
		return RecoveryPlan{Attempt: attempt, MaxAttempts: 1, Content: content}
	}

	plan := RecoveryPlan{Attempt: attempt, MaxAttempts: o.maxAttempts, Content: content}
	if result == nil || !result.IsError {
		return plan
	}

	classification := ClassifyResultError(result)
	plan.Classification = classification
	plan.Action = classification.RecoveryAction
	plan.Reason = string(classification.Class)

	// The attempt budget is spent: report the classification but do not retry.
	if attempt >= o.maxAttempts {
		return plan
	}

	recommended := classification.RecoveryAction

	if recommended == RecoveryActionStripMedia && o.enableMediaStrip {
		if stripped, changed := StripMediaContent(content); changed {
			plan.Retry = true
			plan.Content = stripped
			plan.ContentChanged = true
			plan.Reason = "strip_media"
			return plan
		}
	}

	if recommended == RecoveryActionSwitchModel && o.enableFallbackSwitch && options != nil {
		if fallback := options.FallbackModel; fallback != "" && fallback != options.Model {
			plan.Retry = true
			plan.OverrideModel = &fallback
			plan.Reason = "switch_model"
			return plan
		}
	}

	// No targeted recovery applied; defer to the generic retry policy.
	decision := EvaluateResultRetry(result, attempt, o.retryPolicy)
	if decision.Retryable && decision.Action == RetryDecisionActionRetry {
		plan.Retry = true
		plan.Delay = decision.RecommendedDelay
		plan.Reason = decision.Reason
	}
	return plan
}
// ResilientQuery runs a one-shot query and reruns it whenever the recovery
// orchestrator plans a retry for the attempt's terminal result.
//
// Every message of every attempt, including the failed attempt's
// ResultMessage, is yielded to the consumer as it arrives; retries therefore
// appear as additional messages in the same sequence. An error from the
// underlying Query is yielded as (nil, err) and ends the sequence without a
// retry. If ctx is cancelled while waiting out a retry delay, ctx.Err() is
// yielded and the sequence ends.
//
// With a nil orchestrator this is a plain pass-through to Query. A non-nil
// orchestrator always gets at least one attempt, even when it is a zero value
// whose attempt limit is 0.
func ResilientQuery(
	ctx context.Context,
	content UserMessageContent,
	orchestrator *RecoveryOrchestrator,
	opts ...Option,
) iter.Seq2[Message, error] {
	return func(yield func(Message, error) bool) {
		if orchestrator == nil {
			for msg, err := range Query(ctx, content, opts...) {
				if !yield(msg, err) {
					return
				}
			}
			return
		}

		// A zero-value RecoveryOrchestrator has maxAttempts == 0. Without this
		// clamp the loop body would never run and the query would be silently
		// dropped. PlanResult still refuses to retry in that case.
		maxAttempts := orchestrator.maxAttempts
		if maxAttempts < 1 {
			maxAttempts = 1
		}

		currentContent := content
		// Copy so appending a model override never writes into the caller's slice.
		baseOptions := append([]Option(nil), opts...)

		for attempt := 1; attempt <= maxAttempts; attempt++ {
			var terminal *ResultMessage
			for msg, err := range Query(ctx, currentContent, baseOptions...) {
				if err != nil {
					yield(nil, err)
					return
				}
				if result, ok := msg.(*ResultMessage); ok {
					terminal = result
				}
				if !yield(msg, nil) {
					return
				}
			}

			plan := orchestrator.PlanResult(terminal, currentContent, applyAgentOptions(baseOptions), attempt)
			if !plan.Retry {
				return
			}

			currentContent = plan.Content
			if plan.OverrideModel != nil {
				// Appended last so it takes precedence over any earlier model option.
				baseOptions = append(baseOptions, WithModel(*plan.OverrideModel))
			}

			if plan.Delay > 0 {
				timer := time.NewTimer(plan.Delay)
				select {
				case <-ctx.Done():
					timer.Stop()
					yield(nil, ctx.Err())
					return
				case <-timer.C:
				}
			}
		}
	}
}
// StripMediaContent returns content without its *InputImageBlock and
// *InputDocumentBlock entries so a retry can go out without attachments.
//
// The boolean reports whether anything was removed. String content, content
// with no blocks, and content with no media blocks are returned unchanged
// with false.
func StripMediaContent(content UserMessageContent) (UserMessageContent, bool) {
	if content.IsString() {
		return content, false
	}

	original := content.Blocks()
	if len(original) == 0 {
		return content, false
	}

	kept := make([]ContentBlock, 0, len(original))
	for _, candidate := range original {
		if _, isImage := candidate.(*InputImageBlock); isImage {
			continue
		}
		if _, isDocument := candidate.(*InputDocumentBlock); isDocument {
			continue
		}
		kept = append(kept, candidate)
	}

	// Each block is either kept or dropped, so equal lengths mean no media
	// block was encountered.
	if len(kept) == len(original) {
		return content, false
	}
	return Blocks(kept...), true
}
// MustNewRecoveryOrchestrator builds a recovery planner for examples and
// tests. It forwards options to NewRecoveryOrchestrator unchanged and returns
// that constructor's result.
func MustNewRecoveryOrchestrator(options RecoveryOrchestratorOptions) *RecoveryOrchestrator {
	orchestrator := NewRecoveryOrchestrator(options)
	return orchestrator
}
// Validate reports an error when the plan asks for a retry although the
// attempt budget is already used up (Attempt >= MaxAttempts). Every other
// plan is considered consistent and yields nil.
func (p RecoveryPlan) Validate() error {
	if !p.Retry {
		return nil
	}
	if p.Attempt < p.MaxAttempts {
		return nil
	}
	return fmt.Errorf("retry plan exceeds max attempts")
}