Skip to content

Commit 9190cf0

Browse files
Refactor agent runtime into reconciler-based architecture
Extract task processing logic from runtime into dedicated TaskReconciler component following controller pattern. This separation improves code organization and enables better task lifecycle management. Changes: Architecture: • Extract TaskReconciler from Runtime to handle all task processing • Move task state machine logic into dedicated component • Introduce TaskPhase enum (AwaitInput, InvokeModel, ExecuteTools, Suspended) • Create ModelProviderFactory for centralizing provider client creation • Extract SyncMap into separate file for reusability Task Reconciliation: • Implement computeStatus to analyze message history and determine next action • Split task processing into phase-specific reconcilers • Handle model invocation cancellation gracefully with proper state updates • Mark assistant messages as processed immediately when no tool calls present • Add structured error handling for each reconciliation phase Runtime Simplification: • Remove 700+ lines of task processing logic from Runtime • Eliminate CancelTask method in favor of reconciler-managed cancellation • Update AgentRuntime interface to remove CancelTask dependency • Replace direct queue management with TaskReconciler delegation • Increase default concurrency from 5 to 50 workers API Updates: • Implement SuspendTask by updating task phase in database • Remove runtime cancellation calls in favor of database-driven suspension • Publish task events after phase transitions This refactoring improves separation of concerns, makes task processing more testable, and provides clearer boundaries between runtime management and task execution logic. Co-authored-by: construct-agent <noreply@construct.sh>
1 parent 29f2b94 commit 9190cf0

8 files changed

Lines changed: 1113 additions & 731 deletions

File tree

backend/agent/client.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"github.com/furisto/construct/backend/memory"
9+
"github.com/furisto/construct/backend/memory/schema/types"
10+
"github.com/furisto/construct/backend/model"
11+
"github.com/furisto/construct/backend/secret"
12+
"github.com/google/uuid"
13+
)
14+
15+
type ModelProviderFactory struct {
16+
encryption *secret.Client
17+
memory *memory.Client
18+
}
19+
20+
func NewModelProviderFactory(encryption *secret.Client, memory *memory.Client) *ModelProviderFactory {
21+
return &ModelProviderFactory{
22+
encryption: encryption,
23+
memory: memory,
24+
}
25+
}
26+
27+
func (f *ModelProviderFactory) CreateClient(
28+
ctx context.Context,
29+
modelProviderID uuid.UUID,
30+
) (model.ModelProvider, error) {
31+
provider, err := f.memory.ModelProvider.Get(ctx, modelProviderID)
32+
if err != nil {
33+
return nil, fmt.Errorf("failed to fetch model provider: %w", err)
34+
}
35+
36+
providerAuth, err := f.encryption.Decrypt(provider.Secret, []byte(secret.ModelProviderSecret(provider.ID)))
37+
if err != nil {
38+
return nil, fmt.Errorf("failed to decrypt model provider secret: %w", err)
39+
}
40+
41+
var auth struct {
42+
APIKey string `json:"apiKey"`
43+
}
44+
err = json.Unmarshal(providerAuth, &auth)
45+
if err != nil {
46+
return nil, fmt.Errorf("failed to unmarshal model provider auth: %w", err)
47+
}
48+
49+
switch provider.ProviderType {
50+
case types.ModelProviderTypeAnthropic:
51+
apiProvider, err := model.NewAnthropicProvider(auth.APIKey)
52+
if err != nil {
53+
return nil, fmt.Errorf("failed to create Anthropic provider: %w", err)
54+
}
55+
return apiProvider, nil
56+
57+
case types.ModelProviderTypeOpenAI:
58+
apiProvider, err := model.NewOpenAICompletionProvider(auth.APIKey)
59+
if err != nil {
60+
return nil, fmt.Errorf("failed to create OpenAI provider: %w", err)
61+
}
62+
return apiProvider, nil
63+
64+
case types.ModelProviderTypeGemini:
65+
apiProvider, err := model.NewGeminiProvider(auth.APIKey)
66+
if err != nil {
67+
return nil, fmt.Errorf("failed to create Gemini provider: %w", err)
68+
}
69+
return apiProvider, nil
70+
71+
case types.ModelProviderTypeXAI:
72+
apiProvider, err := model.NewOpenAICompletionProvider(auth.APIKey, model.WithURL("https://api.xai.com/v1"))
73+
if err != nil {
74+
return nil, fmt.Errorf("failed to create XAI provider: %w", err)
75+
}
76+
return apiProvider, nil
77+
78+
default:
79+
return nil, fmt.Errorf("unknown model provider type: %s", provider.ProviderType)
80+
}
81+
}

backend/agent/conv.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"encoding/json"
55
"fmt"
66
"log/slog"
7-
"strings"
87

98
v1 "github.com/furisto/construct/api/go/v1"
109
"github.com/furisto/construct/backend/memory"
@@ -84,11 +83,8 @@ func ConvertMemoryMessageBlocksToModel(blocks []types.MessageBlock) ([]model.Con
8483
}
8584

8685
result := interpreterResult.Output
87-
if interpreterResult.Error != ""{
86+
if interpreterResult.Error != "" {
8887
result = interpreterResult.Output + "\n\n" + interpreterResult.Error
89-
if strings.Contains(interpreterResult.Error, "ReferenceError") {
90-
result = result + "\n" + "You likely tried to reference a variable from a previous code interpreter call. All code interpreter calls are isolated and share no state with each other."
91-
}
9288
}
9389
contentBlocks = append(contentBlocks, &model.ToolResultBlock{
9490
ID: interpreterResult.ID,
@@ -136,7 +132,7 @@ func ConvertMemoryMessageToProto(m *memory.Message) (*v1.Message, error) {
136132
return nil, fmt.Errorf("failed to unmarshal code interpreter call block: %w", err)
137133
}
138134

139-
var interpreterArgs codeact.InterpreterArgs
135+
var interpreterArgs codeact.InterpreterInput
140136
err = json.Unmarshal(toolCall.Args, &interpreterArgs)
141137
if err != nil {
142138
return nil, fmt.Errorf("failed to unmarshal code interpreter args: %w", err)
@@ -696,3 +692,16 @@ func ConvertModelUsageToMemory(usage *model.Usage) *types.MessageUsage {
696692
CacheWriteTokens: usage.CacheWriteTokens,
697693
}
698694
}
695+
696+
func convertTaskPhaseToMemory(phase TaskPhase) types.TaskPhase {
697+
switch phase {
698+
case TaskPhaseAwaitInput:
699+
return types.TaskPhaseAwaiting
700+
case TaskPhaseExecuteTools, TaskPhaseInvokeModel:
701+
return types.TaskPhaseRunning
702+
case TaskPhaseSuspended:
703+
return types.TaskPhaseSuspended
704+
}
705+
706+
return types.TaskPhaseUnspecified
707+
}

backend/agent/queue_metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type workqueueMetricsProvider struct {
1616
retries *prometheus.CounterVec
1717
}
1818

19-
func newWorkqueueMetricsProvider(registry *prometheus.Registry) *workqueueMetricsProvider {
19+
func newWorkqueueMetricsProvider(registry prometheus.Registerer) *workqueueMetricsProvider {
2020
p := &workqueueMetricsProvider{
2121
depth: prometheus.NewGaugeVec(prometheus.GaugeOpts{
2222
Subsystem: "workqueue",

0 commit comments

Comments
 (0)