Skip to content

Commit 1310d9a

Browse files
tanviet12 and claude committed
fix: AI API resilience — retry, timeout, stuck job recovery (#7)
- Add HTTP timeout (2 min) to Claude and Gemini SDK clients
- Add retry with exponential backoff (5s/15s/45s) for rate limit and network errors
- Check DB update errors in analyzer with logging
- Retry final status update 3 times to prevent stuck "running" state
- Safety net: mark all "running" jobs as failed on app startup
- Adaptive batch sleep: 2s normal, 10s after error, 30s after 3 consecutive errors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0cd0a90 commit 1310d9a

5 files changed

Lines changed: 212 additions & 70 deletions

File tree

backend/ai/claude.go

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"github.com/anthropics/anthropic-sdk-go/option"
99
)
1010

11+
var claudeHTTPClient = NewHTTPClientWithTimeout()
12+
1113
type ClaudeProvider struct {
1214
apiKey string
1315
model string
@@ -29,41 +31,46 @@ func NewClaudeProvider(apiKey, model string, maxTokens int) *ClaudeProvider {
2931
}
3032

3133
func (c *ClaudeProvider) AnalyzeChat(ctx context.Context, systemPrompt string, chatTranscript string) (AIResponse, error) {
32-
client := anthropic.NewClient(option.WithAPIKey(c.apiKey))
34+
return withRetry(ctx, "claude", func() (AIResponse, error) {
35+
client := anthropic.NewClient(
36+
option.WithAPIKey(c.apiKey),
37+
option.WithHTTPClient(claudeHTTPClient),
38+
)
3339

34-
message, err := client.Messages.New(ctx, anthropic.MessageNewParams{
35-
Model: anthropic.Model(c.model),
36-
MaxTokens: int64(c.maxTokens),
37-
System: []anthropic.TextBlockParam{
38-
{Text: systemPrompt},
39-
},
40-
Messages: []anthropic.MessageParam{
41-
anthropic.NewUserMessage(anthropic.NewTextBlock(chatTranscript)),
42-
},
43-
})
44-
if err != nil {
45-
return AIResponse{}, fmt.Errorf("claude api error: %w", err)
46-
}
40+
message, err := client.Messages.New(ctx, anthropic.MessageNewParams{
41+
Model: anthropic.Model(c.model),
42+
MaxTokens: int64(c.maxTokens),
43+
System: []anthropic.TextBlockParam{
44+
{Text: systemPrompt},
45+
},
46+
Messages: []anthropic.MessageParam{
47+
anthropic.NewUserMessage(anthropic.NewTextBlock(chatTranscript)),
48+
},
49+
})
50+
if err != nil {
51+
return AIResponse{}, fmt.Errorf("claude api error: %w", err)
52+
}
4753

48-
// Extract text from response content blocks
49-
var text string
50-
for _, block := range message.Content {
51-
if block.Type == "text" {
52-
text = block.Text
53-
break
54+
// Extract text from response content blocks
55+
var text string
56+
for _, block := range message.Content {
57+
if block.Type == "text" {
58+
text = block.Text
59+
break
60+
}
61+
}
62+
if text == "" {
63+
return AIResponse{}, fmt.Errorf("claude api returned empty content")
5464
}
55-
}
56-
if text == "" {
57-
return AIResponse{}, fmt.Errorf("claude api returned empty content")
58-
}
5965

60-
return AIResponse{
61-
Content: text,
62-
InputTokens: int(message.Usage.InputTokens),
63-
OutputTokens: int(message.Usage.OutputTokens),
64-
Model: string(message.Model),
65-
Provider: "claude",
66-
}, nil
66+
return AIResponse{
67+
Content: text,
68+
InputTokens: int(message.Usage.InputTokens),
69+
OutputTokens: int(message.Usage.OutputTokens),
70+
Model: string(message.Model),
71+
Provider: "claude",
72+
}, nil
73+
})
6774
}
6875

6976
func (c *ClaudeProvider) AnalyzeChatBatch(ctx context.Context, systemPrompt string, items []BatchItem) (AIResponse, error) {

backend/ai/gemini.go

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,37 +23,40 @@ func NewGeminiProvider(apiKey, model string) *GeminiProvider {
2323
}
2424

2525
func (g *GeminiProvider) AnalyzeChat(ctx context.Context, systemPrompt string, chatTranscript string) (AIResponse, error) {
26-
client, err := genai.NewClient(ctx, &genai.ClientConfig{
27-
APIKey: g.apiKey,
28-
Backend: genai.BackendGeminiAPI,
29-
})
30-
if err != nil {
31-
return AIResponse{}, fmt.Errorf("gemini client error: %w", err)
32-
}
26+
return withRetry(ctx, "gemini", func() (AIResponse, error) {
27+
client, err := genai.NewClient(ctx, &genai.ClientConfig{
28+
APIKey: g.apiKey,
29+
Backend: genai.BackendGeminiAPI,
30+
HTTPClient: NewHTTPClientWithTimeout(),
31+
})
32+
if err != nil {
33+
return AIResponse{}, fmt.Errorf("gemini client error: %w", err)
34+
}
3335

34-
result, err := client.Models.GenerateContent(ctx, g.model, genai.Text(chatTranscript), &genai.GenerateContentConfig{
35-
SystemInstruction: genai.NewContentFromText(systemPrompt, "user"),
36-
})
37-
if err != nil {
38-
return AIResponse{}, fmt.Errorf("gemini api error: %w", err)
39-
}
36+
result, err := client.Models.GenerateContent(ctx, g.model, genai.Text(chatTranscript), &genai.GenerateContentConfig{
37+
SystemInstruction: genai.NewContentFromText(systemPrompt, "user"),
38+
})
39+
if err != nil {
40+
return AIResponse{}, fmt.Errorf("gemini api error: %w", err)
41+
}
4042

41-
text := result.Text()
42-
if text == "" {
43-
return AIResponse{}, fmt.Errorf("gemini api returned empty content")
44-
}
43+
text := result.Text()
44+
if text == "" {
45+
return AIResponse{}, fmt.Errorf("gemini api returned empty content")
46+
}
4547

46-
aiResp := AIResponse{
47-
Content: text,
48-
Model: g.model,
49-
Provider: "gemini",
50-
}
51-
if result.UsageMetadata != nil {
52-
aiResp.InputTokens = int(result.UsageMetadata.PromptTokenCount)
53-
aiResp.OutputTokens = int(result.UsageMetadata.CandidatesTokenCount)
54-
}
48+
aiResp := AIResponse{
49+
Content: text,
50+
Model: g.model,
51+
Provider: "gemini",
52+
}
53+
if result.UsageMetadata != nil {
54+
aiResp.InputTokens = int(result.UsageMetadata.PromptTokenCount)
55+
aiResp.OutputTokens = int(result.UsageMetadata.CandidatesTokenCount)
56+
}
5557

56-
return aiResp, nil
58+
return aiResp, nil
59+
})
5760
}
5861

5962
func (g *GeminiProvider) AnalyzeChatBatch(ctx context.Context, systemPrompt string, items []BatchItem) (AIResponse, error) {

backend/ai/retry.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package ai
2+
3+
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"regexp"
	"strings"
	"time"
)
11+
12+
const (
13+
maxRetries = 3
14+
initialBackoff = 5 * time.Second
15+
)
16+
17+
// retryableError checks if an error should be retried (rate limit, server error, network).
18+
func retryableError(err error) bool {
19+
if err == nil {
20+
return false
21+
}
22+
msg := err.Error()
23+
// Rate limit
24+
if strings.Contains(msg, "429") || strings.Contains(msg, "rate") || strings.Contains(msg, "Rate") ||
25+
strings.Contains(msg, "RESOURCE_EXHAUSTED") || strings.Contains(msg, "quota") {
26+
return true
27+
}
28+
// Server errors
29+
if strings.Contains(msg, "500") || strings.Contains(msg, "502") ||
30+
strings.Contains(msg, "503") || strings.Contains(msg, "529") {
31+
return true
32+
}
33+
// Network errors
34+
if strings.Contains(msg, "timeout") || strings.Contains(msg, "connection") ||
35+
strings.Contains(msg, "EOF") || strings.Contains(msg, "reset") {
36+
return true
37+
}
38+
return false
39+
}
40+
41+
// withRetry wraps an AI call with exponential backoff retry for transient errors.
42+
func withRetry(ctx context.Context, provider string, fn func() (AIResponse, error)) (AIResponse, error) {
43+
var lastErr error
44+
backoff := initialBackoff
45+
46+
for attempt := 0; attempt <= maxRetries; attempt++ {
47+
if attempt > 0 {
48+
log.Printf("[%s] retry attempt %d/%d after error: %v (backoff: %v)", provider, attempt, maxRetries, lastErr, backoff)
49+
select {
50+
case <-ctx.Done():
51+
return AIResponse{}, fmt.Errorf("%s retry cancelled: %w", provider, ctx.Err())
52+
case <-time.After(backoff):
53+
}
54+
backoff *= 3 // exponential: 5s → 15s → 45s
55+
}
56+
57+
resp, err := fn()
58+
if err == nil {
59+
return resp, nil
60+
}
61+
lastErr = err
62+
63+
if !retryableError(err) {
64+
return AIResponse{}, err // non-retryable, fail immediately
65+
}
66+
}
67+
68+
return AIResponse{}, fmt.Errorf("%s failed after %d retries: %w", provider, maxRetries, lastErr)
69+
}
70+
71+
// NewHTTPClientWithTimeout returns a freshly allocated *http.Client whose
// Timeout is set to two minutes, so requests made through it cannot hang
// indefinitely. All other client fields keep their zero values.
func NewHTTPClientWithTimeout() *http.Client {
	client := new(http.Client)
	client.Timeout = 2 * time.Minute
	return client
}

backend/engine/analyzer.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ func (a *Analyzer) runJobInternalExt(ctx context.Context, job models.Job, maxCon
180180
initialSummary, _ := json.Marshal(map[string]interface{}{
181181
"conversations_found": len(conversations),
182182
})
183-
db.DB.Model(&run).Update("summary", string(initialSummary))
183+
if err := db.DB.Model(&run).Update("summary", string(initialSummary)).Error; err != nil {
184+
log.Printf("[analyzer] DB update error (initial summary): %v", err)
185+
}
184186

185187
// Check batch mode setting (default: enabled with batch size 5)
186188
batchMode := true
@@ -270,7 +272,9 @@ func (a *Analyzer) runJobInternalExt(ctx context.Context, job models.Job, maxCon
270272
"conversations_errors": errorCount,
271273
"issues_found": issuesFound,
272274
})
273-
db.DB.Model(&run).Update("summary", string(errProgressJSON))
275+
if err := db.DB.Model(&run).Update("summary", string(errProgressJSON)).Error; err != nil {
276+
log.Printf("[analyzer] DB update error (error progress): %v", err)
277+
}
274278
continue
275279
}
276280
analyzedCount++
@@ -309,7 +313,9 @@ func (a *Analyzer) runJobInternalExt(ctx context.Context, job models.Job, maxCon
309313
"conversations_errors": errorCount,
310314
"issues_found": issuesFound,
311315
})
312-
db.DB.Model(&run).Update("summary", string(progressJSON))
316+
if err := db.DB.Model(&run).Update("summary", string(progressJSON)).Error; err != nil {
317+
log.Printf("[analyzer] DB update error (progress): %v", err)
318+
}
313319
}
314320

315321
} // end else (non-batch mode)
@@ -329,12 +335,20 @@ complete:
329335
runStatus = "error"
330336
run.ErrorMessage = fmt.Sprintf("AI errors: %d/%d conversations failed", errorCount, len(conversations))
331337
}
332-
db.DB.Model(&run).Updates(map[string]interface{}{
333-
"status": runStatus,
334-
"finished_at": &finishedAt,
335-
"summary": string(summaryJSON),
336-
"error_message": run.ErrorMessage,
337-
})
338+
// Critical: final status update — retry on failure to prevent stuck "running" state
339+
for retry := 0; retry < 3; retry++ {
340+
if err := db.DB.Model(&run).Updates(map[string]interface{}{
341+
"status": runStatus,
342+
"finished_at": &finishedAt,
343+
"summary": string(summaryJSON),
344+
"error_message": run.ErrorMessage,
345+
}).Error; err != nil {
346+
log.Printf("[analyzer] DB update error (final status, attempt %d): %v", retry+1, err)
347+
time.Sleep(2 * time.Second)
348+
continue
349+
}
350+
break
351+
}
338352

339353
// Update job last_run (skip for test runs to avoid affecting future normal runs)
340354
if !isTestRun {
@@ -627,7 +641,9 @@ func (a *Analyzer) runBatchMode(ctx context.Context, provider ai.AIProvider, job
627641
}
628642

629643
// Process in batches
644+
consecutiveErrors := 0
630645
for i := 0; i < len(prepared); i += batchSize {
646+
batchHadError := false
631647
// Check if context cancelled
632648
select {
633649
case <-ctx.Done():
@@ -656,6 +672,7 @@ func (a *Analyzer) runBatchMode(ctx context.Context, provider ai.AIProvider, job
656672
if err != nil {
657673
log.Printf("[analyzer-batch] AI error for batch starting at %d: %v", i, err)
658674
errorCount += len(batch)
675+
batchHadError = true
659676
continue
660677
}
661678

@@ -741,11 +758,23 @@ func (a *Analyzer) runBatchMode(ctx context.Context, provider ai.AIProvider, job
741758
"conversations_errors": errorCount,
742759
"issues_found": issuesFound,
743760
})
744-
db.DB.Model(&run).Update("summary", string(progressJSON))
761+
if err := db.DB.Model(&run).Update("summary", string(progressJSON)).Error; err != nil {
762+
log.Printf("[analyzer-batch] DB update error (progress): %v", err)
763+
}
745764

746-
// Rate limit between batches
765+
// Adaptive rate limit between batches
747766
if end < len(prepared) {
748-
time.Sleep(1 * time.Second)
767+
if batchHadError {
768+
consecutiveErrors++
769+
if consecutiveErrors >= 3 {
770+
time.Sleep(30 * time.Second)
771+
} else {
772+
time.Sleep(10 * time.Second)
773+
}
774+
} else {
775+
consecutiveErrors = 0
776+
time.Sleep(2 * time.Second)
777+
}
749778
}
750779
}
751780

backend/engine/scheduler.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,39 @@ func (s *Scheduler) Start() {
4747
// Load and schedule cron-based analysis jobs
4848
s.loadCronJobs()
4949

50+
// Safety net: mark any stuck "running" jobs as failed on startup
51+
cleanupStuckRuns()
52+
5053
s.scheduler.Start()
5154
log.Println("[scheduler] started")
5255
}
5356

57+
// cleanupStuckRuns marks any job_runs stuck in "running" status as failed.
58+
// This happens when the app crashes or restarts while a job is processing.
59+
func cleanupStuckRuns() {
60+
// On startup, any "running" job is stuck because the goroutine died with the previous process
61+
var stuckRuns []models.JobRun
62+
if err := db.DB.Where("status = ?", "running").Find(&stuckRuns).Error; err != nil {
63+
log.Printf("[scheduler] error querying stuck runs: %v", err)
64+
return
65+
}
66+
for _, run := range stuckRuns {
67+
now := time.Now()
68+
if err := db.DB.Model(&run).Updates(map[string]interface{}{
69+
"status": "failed",
70+
"finished_at": &now,
71+
"error_message": "Job bị gián đoạn do hệ thống khởi động lại. Vui lòng chạy lại.",
72+
}).Error; err != nil {
73+
log.Printf("[scheduler] error marking stuck run %s as failed: %v", run.ID, err)
74+
} else {
75+
log.Printf("[scheduler] marked stuck run %s as failed (started: %v)", run.ID, run.StartedAt)
76+
}
77+
}
78+
if len(stuckRuns) > 0 {
79+
log.Printf("[scheduler] cleaned up %d stuck job runs", len(stuckRuns))
80+
}
81+
}
82+
5483
// Stop gracefully shuts down the scheduler.
5584
func (s *Scheduler) Stop() {
5685
if err := s.scheduler.Shutdown(); err != nil {

0 commit comments

Comments
 (0)