-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathexecutor.go
More file actions
1412 lines (1257 loc) · 53.7 KB
/
executor.go
File metadata and controls
1412 lines (1257 loc) · 53.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package executor
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/steveyegge/vc/internal/ai"
"github.com/steveyegge/vc/internal/config"
"github.com/steveyegge/vc/internal/control"
"github.com/steveyegge/vc/internal/cost"
"github.com/steveyegge/vc/internal/deduplication"
"github.com/steveyegge/vc/internal/events"
"github.com/steveyegge/vc/internal/gates"
"github.com/steveyegge/vc/internal/git"
"github.com/steveyegge/vc/internal/health"
"github.com/steveyegge/vc/internal/sandbox"
"github.com/steveyegge/vc/internal/storage"
"github.com/steveyegge/vc/internal/storage/beads"
"github.com/steveyegge/vc/internal/types"
"github.com/steveyegge/vc/internal/watchdog"
)
// SelfHealingMode identifies the current state of the executor's
// self-healing state machine.
type SelfHealingMode int

// States of the self-healing state machine, numbered from zero in
// declaration order.
const (
	// ModeHealthy is normal operation: baseline quality gates are passing.
	ModeHealthy SelfHealingMode = iota
	// ModeSelfHealing means the baseline failed and the executor is actively
	// trying to fix it.
	ModeSelfHealing
	// ModeEscalated means the baseline is broken and needs human intervention.
	ModeEscalated
)

// String returns the upper-case name of the mode ("HEALTHY", "SELF_HEALING",
// "ESCALATED"). Any value outside the declared states is rendered as
// "UNKNOWN(<n>)" with its numeric value.
func (m SelfHealingMode) String() string {
	// Name table indexed by mode value; the keys keep it aligned with the
	// const block even if the declaration order changes.
	names := [...]string{
		ModeHealthy:     "HEALTHY",
		ModeSelfHealing: "SELF_HEALING",
		ModeEscalated:   "ESCALATED",
	}
	if m < 0 || int(m) >= len(names) {
		return fmt.Sprintf("UNKNOWN(%d)", m)
	}
	return names[m]
}
// Executor manages the issue processing event loop.
//
// An Executor is built by New from a Config. It bundles the storage handle,
// the optional subsystems (AI supervisor, watchdog/monitor, sandbox manager,
// health registry, git operations, cost tracker, ...), the shutdown channels
// for its background goroutines, and several independently locked pieces of
// mutable state (self-healing mode, escalation trackers, self-healing
// progress, steady-state polling). Optional subsystems are left nil when they
// are disabled or fail to initialize in New.
type Executor struct {
// store is the backing storage, taken from Config.Store.
store storage.Storage
// supervisor is set by New only when AI supervision is enabled and the
// supervisor initializes successfully; otherwise it stays nil.
supervisor *ai.Supervisor
watchdog *watchdog.Watchdog // Unified watchdog instance (vc-mq3c)
monitor *watchdog.Monitor // Standalone monitor when watchdog disabled (vc-mq3c)
watchdogConfig *watchdog.WatchdogConfig // Watchdog config (needed by result processor)
// sandboxMgr is set by New only when sandboxes are enabled and the manager
// initializes successfully.
sandboxMgr sandbox.Manager
// healthRegistry is set by New when health monitoring is enabled and the
// registry can be created.
healthRegistry *health.MonitorRegistry
preFlightChecker *PreFlightChecker // Preflight quality gates checker (vc-196)
deduplicator deduplication.Deduplicator // Shared deduplicator for sandbox manager and results processor (vc-137)
gitOps git.GitOperations // Git operations for auto-commit (vc-136)
messageGen *git.MessageGenerator // Commit message generator (vc-136)
qaWorker *QualityGateWorker // QA worker for quality gate execution (vc-254)
costTracker *cost.Tracker // Cost budget tracker (vc-e3s7)
loopDetector *LoopDetector // Loop detector for unproductive patterns (vc-0vfg)
controlServer *control.Server // Control server for pause/resume commands (vc-00cu)
interruptMgr *InterruptManager // Interrupt manager for task pause/resume (vc-00cu)
// config is the Config pointer passed to New (kept as-is, not copied).
config *Config
// Identity of this executor instance: instanceID is a fresh UUID generated in
// New, hostname comes from os.Hostname, pid from os.Getpid, and version from
// Config.Version.
instanceID string
hostname string
pid int
version string
// Control channels
// Each background goroutine gets its own stop/done channel pair; all of them
// are created unbuffered in New.
stopCh chan struct{}
doneCh chan struct{}
heartbeatStopCh chan struct{} // Separate channel for heartbeat shutdown (vc-m4od)
heartbeatDoneCh chan struct{} // Signals when heartbeat goroutine finished (vc-m4od)
cleanupStopCh chan struct{} // Separate channel for cleanup goroutine shutdown
cleanupDoneCh chan struct{} // Signals when cleanup goroutine finished
eventCleanupStopCh chan struct{} // Separate channel for event cleanup shutdown
eventCleanupDoneCh chan struct{} // Signals when event cleanup goroutine finished
// Configuration
// Effective values resolved in New. For heartbeatPeriod, cleanupInterval,
// staleThreshold, instanceCleanupAge and instanceCleanupKeep a zero Config
// value is replaced with a built-in default; pollInterval is copied verbatim.
pollInterval time.Duration
heartbeatPeriod time.Duration // vc-m4od: Period for heartbeat updates
cleanupInterval time.Duration
staleThreshold time.Duration
instanceCleanupAge time.Duration
instanceCleanupKeep int
// The enable* flags start from the Config values; New clears
// enableAISupervision and enableSandboxes when the corresponding subsystem
// fails to initialize.
enableAISupervision bool
enableQualityGates bool
enableSandboxes bool
enableHealthMonitoring bool
enableIterativeRefinement bool // Enable iterative refinement for assessment and analysis (vc-43kd, vc-t9ls)
enableQualityGateWorker bool
workingDir string
// State
mu sync.RWMutex
running bool
selfHealingMsgLast time.Time // Last time we printed the self-healing mode message (for throttling)
qaWorkersWg sync.WaitGroup // Tracks active QA worker goroutines for graceful shutdown (vc-0d58)
// Self-healing state machine (vc-23t0)
selfHealingMode SelfHealingMode // Current state in the self-healing state machine
modeMutex sync.RWMutex // Protects selfHealingMode and modeChangedAt
modeChangedAt time.Time // When the mode last changed (for escalation thresholds)
// Escalation tracking (vc-h8b8)
escalationTrackers map[string]*escalationTracker // Maps baseline issue ID to escalation state
escalationMutex sync.RWMutex // Protects escalationTrackers map
// Self-healing progress tracking (vc-ipoj)
// transitionToSelfHealing resets these three fields while holding
// selfHealingProgressMutex.
selfHealingLastProgress time.Time // Last time we made progress (claimed or completed baseline work)
selfHealingProgressMutex sync.RWMutex
selfHealingNoWorkCount int // Consecutive iterations with no work found
selfHealingDeadlockIssue string // ID of escalation issue created for deadlock (empty if none)
// Steady state polling (vc-onch)
// New seeds both poll intervals from Config.PollInterval.
basePollInterval time.Duration // Original poll interval (5s)
currentPollInterval time.Duration // Dynamic poll interval (increases in steady state)
steadyStateCount int // Consecutive polls in steady state
lastGitCommit string // Last seen git commit hash
steadyStateMutex sync.RWMutex // Protects steady state fields
}
// getSelfHealingMode reports the executor's current self-healing state.
// The value is read under the modeMutex read lock, so it is safe to call
// from multiple goroutines.
func (e *Executor) getSelfHealingMode() SelfHealingMode {
	e.modeMutex.RLock()
	current := e.selfHealingMode
	e.modeMutex.RUnlock()
	return current
}
// restoreSelfHealingMode re-enters SELF_HEALING mode when the store still
// holds open issues labelled "baseline-failure" (vc-556f), so a restarted
// executor keeps working on a baseline that is still broken.
//
// When no such issue exists the mode is left untouched and nil is returned.
// Otherwise the in-memory mode is switched first, then persisted through the
// store, then announced on stdout and in the event log. An error is returned
// if the issue query or the persistence call fails; in the latter case the
// in-memory mode has already been changed.
//
// NOTE(review): unlike transitionToSelfHealing, this path does not reset the
// self-healing progress-tracking fields — confirm that is intended.
func (e *Executor) restoreSelfHealingMode(ctx context.Context) error {
	openStatus := types.StatusOpen
	pending, err := e.store.SearchIssues(ctx, "", types.IssueFilter{
		Labels: []string{"baseline-failure"},
		Status: &openStatus,
	})
	if err != nil {
		return fmt.Errorf("failed to query baseline issues: %w", err)
	}

	pendingCount := len(pending)
	if pendingCount == 0 {
		// Nothing outstanding: stay in whatever mode we are already in.
		return nil
	}

	// Flip the in-memory state under the write lock.
	e.modeMutex.Lock()
	previous := e.selfHealingMode
	e.selfHealingMode = ModeSelfHealing
	e.modeChangedAt = time.Now()
	e.modeMutex.Unlock()

	// Persist to the database so the restored mode is recorded for this instance.
	if persistErr := e.store.UpdateSelfHealingMode(ctx, e.instanceID, "SELF_HEALING"); persistErr != nil {
		return fmt.Errorf("failed to persist restored mode: %w", persistErr)
	}

	fmt.Printf("ℹ️ Restored self-healing mode from previous state (%d open baseline issue(s))\n", pendingCount)

	// Record the restore in the activity feed.
	message := fmt.Sprintf("Restored SELF_HEALING mode on startup (%d baseline issues)", pendingCount)
	details := map[string]interface{}{
		"from_mode":       previous.String(),
		"restored_mode":   "SELF_HEALING",
		"baseline_issues": pendingCount,
		"timestamp":       time.Now().Format(time.RFC3339),
	}
	e.logEvent(ctx, events.EventTypeExecutorSelfHealingMode, events.SeverityInfo, "SYSTEM", message, details)
	return nil
}
// getModeChangedAt reports the time of the most recent self-healing mode
// change. The value is read under the modeMutex read lock, so it is safe to
// call from multiple goroutines.
func (e *Executor) getModeChangedAt() time.Time {
	e.modeMutex.RLock()
	changedAt := e.modeChangedAt
	e.modeMutex.RUnlock()
	return changedAt
}
// transitionToHealthy moves the state machine to HEALTHY (baseline passing).
//
// It is a no-op when the executor is already healthy. Otherwise it updates the
// in-memory mode and change timestamp under modeMutex, clears every escalation
// tracker (vc-h8b8), persists the new mode (vc-556f; a persistence failure is
// only reported on stderr), prints the transition, and emits an activity feed
// event.
func (e *Executor) transitionToHealthy(ctx context.Context) {
	e.modeMutex.Lock()
	previous := e.selfHealingMode
	changed := previous != ModeHealthy
	if changed {
		e.selfHealingMode = ModeHealthy
		e.modeChangedAt = time.Now()
	}
	e.modeMutex.Unlock()

	if !changed {
		// Already healthy, no transition needed.
		return
	}

	// The baseline is healthy again, so escalation bookkeeping is obsolete.
	e.clearAllTrackers()

	// Best-effort persistence: warn, but do not abort the transition.
	if persistErr := e.store.UpdateSelfHealingMode(ctx, e.instanceID, "HEALTHY"); persistErr != nil {
		fmt.Fprintf(os.Stderr, "Warning: failed to persist self-healing mode transition: %v\n", persistErr)
	}

	from := previous.String()
	fmt.Printf("✓ State transition: %s → HEALTHY (baseline quality gates passing)\n", from)

	// Emit activity feed event.
	message := fmt.Sprintf("Executor transitioned from %s to HEALTHY", from)
	details := map[string]interface{}{
		"from_mode": from,
		"to_mode":   "HEALTHY",
		"timestamp": time.Now().Format(time.RFC3339),
	}
	e.logEvent(ctx, events.EventTypeExecutorSelfHealingMode, events.SeverityInfo, "SYSTEM", message, details)
}
// transitionToSelfHealing moves the state machine to SELF_HEALING (baseline
// failed).
//
// It is a no-op when the executor is already self-healing. Otherwise it
// updates the in-memory mode and change timestamp under modeMutex, resets the
// self-healing progress tracking (vc-ipoj), persists the new mode (vc-556f; a
// persistence failure is only reported on stderr), prints the transition, and
// emits a warning-level activity feed event.
func (e *Executor) transitionToSelfHealing(ctx context.Context) {
	e.modeMutex.Lock()
	previous := e.selfHealingMode
	changed := previous != ModeSelfHealing
	if changed {
		e.selfHealingMode = ModeSelfHealing
		e.modeChangedAt = time.Now()
	}
	e.modeMutex.Unlock()

	if !changed {
		// Already in self-healing, no transition needed.
		return
	}

	// Start progress tracking from a clean slate: progress clock at "now",
	// no idle iterations counted, no deadlock escalation issue recorded.
	e.selfHealingProgressMutex.Lock()
	e.selfHealingLastProgress = time.Now()
	e.selfHealingNoWorkCount = 0
	e.selfHealingDeadlockIssue = ""
	e.selfHealingProgressMutex.Unlock()

	// Best-effort persistence: warn, but do not abort the transition.
	if persistErr := e.store.UpdateSelfHealingMode(ctx, e.instanceID, "SELF_HEALING"); persistErr != nil {
		fmt.Fprintf(os.Stderr, "Warning: failed to persist self-healing mode transition: %v\n", persistErr)
	}

	from := previous.String()
	fmt.Printf("⚠️ State transition: %s → SELF_HEALING (baseline failed, attempting fix)\n", from)

	// Emit activity feed event.
	message := fmt.Sprintf("Executor transitioned from %s to SELF_HEALING", from)
	details := map[string]interface{}{
		"from_mode": from,
		"to_mode":   "SELF_HEALING",
		"timestamp": time.Now().Format(time.RFC3339),
	}
	e.logEvent(ctx, events.EventTypeExecutorSelfHealingMode, events.SeverityWarning, "SYSTEM", message, details)
}
// transitionToEscalated moves the state machine to ESCALATED (human
// intervention needed). reason is included in the console message and in the
// emitted event.
//
// It is a no-op when the executor is already escalated. Otherwise it updates
// the in-memory mode and change timestamp under modeMutex, persists the new
// mode (vc-556f; a persistence failure is only reported on stderr), prints the
// transition, and emits a critical-severity activity feed event.
func (e *Executor) transitionToEscalated(ctx context.Context, reason string) {
	e.modeMutex.Lock()
	previous := e.selfHealingMode
	changed := previous != ModeEscalated
	if changed {
		e.selfHealingMode = ModeEscalated
		e.modeChangedAt = time.Now()
	}
	e.modeMutex.Unlock()

	if !changed {
		// Already escalated, no transition needed.
		return
	}

	// Best-effort persistence: warn, but do not abort the transition.
	if persistErr := e.store.UpdateSelfHealingMode(ctx, e.instanceID, "ESCALATED"); persistErr != nil {
		fmt.Fprintf(os.Stderr, "Warning: failed to persist self-healing mode transition: %v\n", persistErr)
	}

	from := previous.String()
	fmt.Printf("🚨 State transition: %s → ESCALATED (reason: %s)\n", from, reason)

	// Emit activity feed event.
	message := fmt.Sprintf("Executor transitioned from %s to ESCALATED: %s", from, reason)
	details := map[string]interface{}{
		"from_mode": from,
		"to_mode":   "ESCALATED",
		"reason":    reason,
		"timestamp": time.Now().Format(time.RFC3339),
	}
	e.logEvent(ctx, events.EventTypeExecutorSelfHealingMode, events.SeverityCritical, "SYSTEM", message, details)
}
// Config holds executor configuration.
//
// Use DefaultConfig for a populated starting point and Validate to reject
// invalid combinations; New calls Validate itself before building the
// executor.
//
// Execution Modes (vc-m5qr):
// The executor supports two execution modes:
//
// 1. Executor Mode (default): Full autonomous loop
// - Polls beads for ready issues
// - Creates sandbox worktrees for isolation
// - Tracks executor instances
// - Continuous operation until stopped
//
// 2. Polecat Mode: Single-task execution for Gastown integration
// - Accepts task from CLI args or stdin
// - Uses polecat's existing clone/branch (no sandboxes)
// - Single execution, then exits
// - JSON output to stdout
//
// Supported Degraded Modes (vc-q5ve):
// The executor supports several degraded operating modes when optional components fail to initialize:
//
// 1. No AI Supervision (EnableAISupervision=false or init failure):
// - Issues are claimed and executed without assessment/analysis
// - No loop detection
// - No health monitoring
// - No auto-commit message generation
// - No deduplication
//
// 2. No Quality Gates (EnableQualityGates=false):
// - No preflight checks
// - No quality gate enforcement
// - No QA worker
//
// 3. No Sandboxes (EnableSandboxes=false or init failure):
// - Work executes directly in parent repo (less isolation)
// - No sandbox cleanup
// - Higher risk of repo contamination
//
// 4. No Git Operations (git init failure):
// - No auto-commit
// - No auto-PR
// - Test coverage analysis disabled
// - Code quality analysis disabled
//
// 5. No Cost Tracking (cost config disabled or init failure):
// - Budget enforcement disabled
// - AI calls proceed without cost checks
//
// Minimum Viable Configuration:
// - Store must be non-nil
// - All timing values (PollInterval, HeartbeatPeriod, etc.) must be non-negative
// - Dependent features must have their requirements enabled (see Validate())
//
// The executor will log warnings when optional components fail but will continue
// with reduced functionality. Use Validate() to check for configuration errors
// before calling New().
//
// Defaults quoted on individual fields are the values produced by
// DefaultConfig unless noted otherwise. For some fields New additionally
// substitutes a built-in default when the field is left at its zero value
// (HeartbeatPeriod, CleanupInterval, StaleThreshold, InstanceCleanupAge,
// InstanceCleanupKeep, WorkingDir, SandboxRoot, ParentRepo).
type Config struct {
// Execution mode (vc-m5qr: Gastown integration)
Mode types.ExecutionMode
Store storage.Storage // Required: Validate rejects a nil Store
Version string
PollInterval time.Duration
HeartbeatPeriod time.Duration
CleanupInterval time.Duration // How often to check for stale instances (default: 5 minutes)
StaleThreshold time.Duration // How long before an instance is considered stale (default: 5 minutes)
EnableAISupervision bool // Enable AI assessment and analysis (default: true)
EnableQualityGates bool // Enable quality gates enforcement (default: true)
GatesTimeout time.Duration // Quality gates timeout (default: 5 minutes, env: VC_QUALITY_GATES_TIMEOUT, vc-xcfw)
EnableAutoCommit bool // Enable automatic git commits after successful execution (default: false, vc-142)
EnableAutoPR bool // Enable automatic PR creation after successful commit (default: false, requires EnableAutoCommit, vc-389e)
EnableSandboxes bool // Enable sandbox isolation (default: true, vc-144)
KeepSandboxOnFailure bool // Keep failed sandboxes for debugging (default: false)
KeepBranches bool // Keep mission branches after cleanup (default: false)
SandboxRetentionCount int // Number of failed sandboxes to keep (default: 3, 0 = keep all)
EnableBlockerPriority bool // Enable blocker-first prioritization (default: true, vc-161)
EnableHealthMonitoring bool // Enable health monitoring (default: false, opt-in; requires EnableAISupervision)
EnableQualityGateWorker bool // Enable QA worker for quality gate execution (default: true, requires EnableQualityGates, vc-254)
HealthConfigPath string // Path to health_monitors.yaml (default: ".beads/health_monitors.yaml")
HealthStatePath string // Path to health_state.json (default: ".beads/health_state.json")
WorkingDir string // Working directory for quality gates (default: ".")
SandboxRoot string // Root directory for sandboxes (default: ".sandboxes")
ParentRepo string // Parent repository path (default: ".")
DefaultBranch string // Default git branch for sandboxes (default: "main")
WatchdogConfig *watchdog.WatchdogConfig // Watchdog configuration (default: conservative defaults)
DeduplicationConfig *deduplication.Config // Deduplication configuration (default: sensible defaults, nil = use defaults)
EventRetentionConfig *config.EventRetentionConfig // Event retention and cleanup configuration (default: sensible defaults, nil = use defaults)
InstanceCleanupAge time.Duration // How old stopped instances must be before deletion (default: 24h)
// NOTE(review): New replaces an InstanceCleanupKeep of 0 with 10, so
// "0 = keep none" is not reachable through New — confirm intended semantics.
InstanceCleanupKeep int // Minimum number of stopped instances to keep (default: 10, 0 = keep none)
// The MaxEscalation* fields are legacy; DefaultConfig fills them from the same
// environment variables as the SelfHealingMax* fields below.
MaxEscalationAttempts int // Maximum attempts before escalating baseline issues (default: 5, vc-h8b8)
MaxEscalationDuration time.Duration // Maximum duration in self-healing mode before escalating (default: 24h, vc-h8b8)
MaxIncompleteRetries int // Maximum retries for incomplete work before escalation (default: 1, env: VC_MAX_INCOMPLETE_RETRIES, vc-hsfz)
// Self-healing configuration (vc-tn9c)
SelfHealingMaxAttempts int // Maximum attempts before escalating (same as MaxEscalationAttempts, default: 5, env: VC_SELF_HEALING_MAX_ATTEMPTS)
SelfHealingMaxDuration time.Duration // Maximum duration before escalating (same as MaxEscalationDuration, default: 24h, env: VC_SELF_HEALING_MAX_DURATION)
SelfHealingRecheckInterval time.Duration // How often to recheck in self-healing mode (default: 5m, env: VC_SELF_HEALING_RECHECK_INTERVAL)
SelfHealingVerboseLogging bool // Enable verbose logging for self-healing decisions (default: true, env: VC_SELF_HEALING_VERBOSE_LOGGING)
SelfHealingDeadlockTimeout time.Duration // Timeout for detecting deadlocked baselines (default: 30m, env: VC_SELF_HEALING_DEADLOCK_TIMEOUT, vc-ipoj)
// Loop detector configuration (vc-0vfg)
LoopDetectorConfig *LoopDetectorConfig // Loop detector configuration (default: sensible defaults, nil = use defaults)
// Bootstrap mode configuration (vc-b027)
EnableBootstrapMode bool // Enable bootstrap mode during quota crisis (default: false, opt-in, env: VC_ENABLE_BOOTSTRAP_MODE)
BootstrapModeLabels []string // Labels that trigger bootstrap mode (default: ["quota-crisis"], env: VC_BOOTSTRAP_MODE_LABELS)
BootstrapModeTitleKeywords []string // Title keywords that trigger bootstrap mode (default: ["quota", "budget", "cost", "API limit"], env: VC_BOOTSTRAP_MODE_TITLE_KEYWORDS)
// Iterative refinement configuration (vc-43kd, vc-t9ls)
EnableIterativeRefinement bool // Enable iterative refinement for assessment and analysis phases (default: true, env: VC_ENABLE_ITERATIVE_REFINEMENT)
// Control server configuration (vc-00cu)
// NOTE(review): DefaultConfig does not set either control server field, so a
// config obtained from DefaultConfig has EnableControlServer=false and an
// empty ControlSocketPath unless the documented defaults are applied
// elsewhere — confirm.
EnableControlServer bool // Enable control server for pause/resume commands (default: true)
ControlSocketPath string // Path to control socket (default: ".vc/executor.sock")
}
// Validate checks the configuration for invalid combinations (vc-q5ve).
//
// It returns nil when the configuration is usable and otherwise an error
// describing the first violation found. Checks run in this order:
//
//  1. the receiver and Store must be non-nil;
//  2. dependent features must have their prerequisites enabled
//     (EnableAutoPR needs EnableAutoCommit, EnableQualityGateWorker needs
//     EnableQualityGates, EnableHealthMonitoring needs EnableAISupervision);
//  3. timing values, self-healing limits and SandboxRetentionCount must be
//     non-negative.
//
// Zero is always accepted for timing values.
func (c *Config) Validate() error {
	// A nil *Config would otherwise panic on the first field access; New
	// hands its argument straight to Validate.
	if c == nil {
		return fmt.Errorf("config is required")
	}

	// Minimum required configuration: Store must be present
	if c.Store == nil {
		return fmt.Errorf("storage is required")
	}

	// Auto-PR requires auto-commit (vc-389e)
	if c.EnableAutoPR && !c.EnableAutoCommit {
		return fmt.Errorf("EnableAutoPR requires EnableAutoCommit to be enabled")
	}

	// Quality gate worker requires quality gates
	if c.EnableQualityGateWorker && !c.EnableQualityGates {
		return fmt.Errorf("EnableQualityGateWorker requires EnableQualityGates to be enabled")
	}

	// Health monitoring requires AI supervision (monitors use AI for analysis)
	if c.EnableHealthMonitoring && !c.EnableAISupervision {
		return fmt.Errorf("EnableHealthMonitoring requires EnableAISupervision to be enabled")
	}

	// Auto-commit requires git operations. That is a soft requirement which is
	// not validated here; a git initialization failure is only logged as a
	// warning during New.

	// Validate timing configurations
	if c.PollInterval < 0 {
		return fmt.Errorf("PollInterval must be non-negative, got %v", c.PollInterval)
	}
	if c.HeartbeatPeriod < 0 {
		return fmt.Errorf("HeartbeatPeriod must be non-negative, got %v", c.HeartbeatPeriod)
	}
	if c.CleanupInterval < 0 {
		return fmt.Errorf("CleanupInterval must be non-negative, got %v", c.CleanupInterval)
	}
	if c.StaleThreshold < 0 {
		return fmt.Errorf("StaleThreshold must be non-negative, got %v", c.StaleThreshold)
	}

	// Validate self-healing configuration
	if c.SelfHealingMaxAttempts < 0 {
		return fmt.Errorf("SelfHealingMaxAttempts must be non-negative, got %d", c.SelfHealingMaxAttempts)
	}
	if c.SelfHealingMaxDuration < 0 {
		return fmt.Errorf("SelfHealingMaxDuration must be non-negative, got %v", c.SelfHealingMaxDuration)
	}
	if c.SelfHealingRecheckInterval < 0 {
		return fmt.Errorf("SelfHealingRecheckInterval must be non-negative, got %v", c.SelfHealingRecheckInterval)
	}

	// Sandbox retention count must be non-negative
	if c.SandboxRetentionCount < 0 {
		return fmt.Errorf("SandboxRetentionCount must be non-negative, got %d", c.SandboxRetentionCount)
	}

	// Remaining timing values. They are checked last so that configurations
	// which were already rejected keep reporting the same first error.
	remainingDurations := []struct {
		name  string
		value time.Duration
	}{
		{"GatesTimeout", c.GatesTimeout},
		{"InstanceCleanupAge", c.InstanceCleanupAge},
		{"MaxEscalationDuration", c.MaxEscalationDuration},
		{"SelfHealingDeadlockTimeout", c.SelfHealingDeadlockTimeout},
	}
	for _, d := range remainingDurations {
		if d.value < 0 {
			return fmt.Errorf("%s must be non-negative, got %v", d.name, d.value)
		}
	}

	return nil
}
// DefaultConfig builds a fresh Config populated with the executor defaults.
//
// Several values can be overridden through VC_* environment variables, which
// are read each time this function is called. Fields that are not assigned
// here (for example Store and Mode) keep their zero value and must be filled
// in by the caller.
func DefaultConfig() *Config {
	var cfg Config

	// Identity and core timing.
	cfg.Version = "0.1.0"
	cfg.PollInterval = 5 * time.Second
	cfg.HeartbeatPeriod = 30 * time.Second
	cfg.CleanupInterval = 5 * time.Minute
	cfg.StaleThreshold = 5 * time.Minute
	cfg.InstanceCleanupAge = 24 * time.Hour
	cfg.InstanceCleanupKeep = 10

	// Supervision and quality gates.
	cfg.EnableAISupervision = true
	cfg.EnableQualityGates = true
	cfg.GatesTimeout = getEnvDuration("VC_QUALITY_GATES_TIMEOUT", 5*time.Minute) // Configurable timeout (vc-xcfw)

	// Sandboxes: on by default for safety (vc-144).
	cfg.EnableSandboxes = true
	cfg.KeepSandboxOnFailure = false
	cfg.KeepBranches = false
	cfg.SandboxRetentionCount = 3

	// Scheduling and optional workers.
	cfg.EnableBlockerPriority = true   // Blocker-first prioritization by default (vc-161)
	cfg.EnableHealthMonitoring = false // Opt-in for now
	cfg.EnableQualityGateWorker = true // QA worker on by default (vc-254)

	// Paths.
	cfg.HealthConfigPath = ".beads/health_monitors.yaml"
	cfg.HealthStatePath = ".beads/health_state.json"
	cfg.WorkingDir = "."
	cfg.SandboxRoot = ".sandboxes"
	cfg.ParentRepo = "."
	cfg.DefaultBranch = "main"

	// Self-healing / escalation (vc-h8b8, vc-tn9c). The MaxEscalation* fields
	// are legacy and read the same environment variables as their
	// SelfHealing* counterparts.
	cfg.MaxEscalationAttempts = getEnvInt("VC_SELF_HEALING_MAX_ATTEMPTS", 5)
	cfg.MaxEscalationDuration = getEnvDuration("VC_SELF_HEALING_MAX_DURATION", 24*time.Hour)
	cfg.MaxIncompleteRetries = getEnvInt("VC_MAX_INCOMPLETE_RETRIES", 1) // vc-hsfz
	cfg.SelfHealingMaxAttempts = getEnvInt("VC_SELF_HEALING_MAX_ATTEMPTS", 5)
	cfg.SelfHealingMaxDuration = getEnvDuration("VC_SELF_HEALING_MAX_DURATION", 24*time.Hour)
	cfg.SelfHealingRecheckInterval = getEnvDuration("VC_SELF_HEALING_RECHECK_INTERVAL", 5*time.Minute)
	cfg.SelfHealingVerboseLogging = getEnvBool("VC_SELF_HEALING_VERBOSE_LOGGING", true)
	cfg.SelfHealingDeadlockTimeout = getEnvDuration("VC_SELF_HEALING_DEADLOCK_TIMEOUT", 30*time.Minute)

	// Bootstrap mode (vc-b027): disabled unless opted in.
	cfg.EnableBootstrapMode = getEnvBool("VC_ENABLE_BOOTSTRAP_MODE", false)
	cfg.BootstrapModeLabels = getEnvStringSlice("VC_BOOTSTRAP_MODE_LABELS", []string{"quota-crisis"})
	cfg.BootstrapModeTitleKeywords = getEnvStringSlice("VC_BOOTSTRAP_MODE_TITLE_KEYWORDS", []string{"quota", "budget", "cost", "API limit"})

	// Iterative refinement (vc-43kd, vc-t9ls): enabled by default.
	cfg.EnableIterativeRefinement = getEnvBool("VC_ENABLE_ITERATIVE_REFINEMENT", true)

	return &cfg
}
// New creates a new executor instance
func New(cfg *Config) (*Executor, error) {
// Validate configuration before initialization (vc-q5ve)
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid configuration: %w", err)
}
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("failed to get hostname: %w", err)
}
// Set default working directory if not specified
workingDir := cfg.WorkingDir
if workingDir == "" {
workingDir = "."
}
// Set default sandbox root if not specified
sandboxRoot := cfg.SandboxRoot
if sandboxRoot == "" {
sandboxRoot = ".sandboxes"
}
// Set default parent repo if not specified
parentRepo := cfg.ParentRepo
if parentRepo == "" {
parentRepo = "."
}
// Set default heartbeat period if not specified (vc-m4od)
heartbeatPeriod := cfg.HeartbeatPeriod
if heartbeatPeriod == 0 {
heartbeatPeriod = 30 * time.Second
}
// Set default cleanup interval if not specified
cleanupInterval := cfg.CleanupInterval
if cleanupInterval == 0 {
cleanupInterval = 5 * time.Minute
}
// Set default stale threshold if not specified
staleThreshold := cfg.StaleThreshold
if staleThreshold == 0 {
staleThreshold = 5 * time.Minute
}
// Set default instance cleanup age if not specified
instanceCleanupAge := cfg.InstanceCleanupAge
if instanceCleanupAge == 0 {
instanceCleanupAge = 24 * time.Hour
}
// Set default instance cleanup keep count if not specified
instanceCleanupKeep := cfg.InstanceCleanupKeep
if instanceCleanupKeep == 0 {
instanceCleanupKeep = 10
}
e := &Executor{
store: cfg.Store,
config: cfg,
instanceID: uuid.New().String(),
hostname: hostname,
pid: os.Getpid(),
version: cfg.Version,
pollInterval: cfg.PollInterval,
heartbeatPeriod: heartbeatPeriod,
cleanupInterval: cleanupInterval,
staleThreshold: staleThreshold,
instanceCleanupAge: instanceCleanupAge,
instanceCleanupKeep: instanceCleanupKeep,
enableAISupervision: cfg.EnableAISupervision,
enableQualityGates: cfg.EnableQualityGates,
enableSandboxes: cfg.EnableSandboxes,
enableQualityGateWorker: cfg.EnableQualityGateWorker,
enableIterativeRefinement: cfg.EnableIterativeRefinement,
workingDir: workingDir,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
heartbeatStopCh: make(chan struct{}),
heartbeatDoneCh: make(chan struct{}),
cleanupStopCh: make(chan struct{}),
cleanupDoneCh: make(chan struct{}),
eventCleanupStopCh: make(chan struct{}),
eventCleanupDoneCh: make(chan struct{}),
// Initialize self-healing state machine (vc-23t0)
selfHealingMode: ModeHealthy,
modeChangedAt: time.Now(),
// Initialize escalation tracking (vc-h8b8)
escalationTrackers: make(map[string]*escalationTracker),
// Initialize steady state polling (vc-onch)
basePollInterval: cfg.PollInterval,
currentPollInterval: cfg.PollInterval,
steadyStateCount: 0,
}
// Initialize cost tracker first (vc-e3s7)
// This is initialized even if AI supervision is disabled, for budget monitoring
var costTracker *cost.Tracker
costConfig := cost.LoadFromEnv()
if costConfig.Enabled {
tracker, err := cost.NewTracker(costConfig, cfg.Store)
if err != nil {
// Log warning but continue without cost tracking
fmt.Fprintf(os.Stderr, "Warning: failed to initialize cost tracker: %v (continuing without cost budgeting)\n", err)
} else {
costTracker = tracker
fmt.Printf("✓ Cost budget tracking enabled (limit: %d tokens/hour, $%.2f/hour)\n",
costConfig.MaxTokensPerHour, costConfig.MaxCostPerHour)
}
}
e.costTracker = costTracker
// Initialize AI supervisor if enabled (do this after cost tracker)
if cfg.EnableAISupervision {
supervisor, err := ai.NewSupervisor(&ai.Config{
Store: cfg.Store,
CostTracker: costTracker, // Pass cost tracker to supervisor (vc-e3s7)
})
if err != nil {
// Don't fail - just disable AI supervision
fmt.Fprintf(os.Stderr, "Warning: failed to initialize AI supervisor: %v (continuing without AI supervision)\n", err)
e.enableAISupervision = false
} else {
e.supervisor = supervisor
}
}
// Initialize git operations for auto-commit (vc-136)
// This is required for auto-commit, test coverage analysis, and code quality analysis
gitOps, err := git.NewGit(context.Background())
if err != nil {
// Don't fail - just log warning and continue without git operations
fmt.Fprintf(os.Stderr, "Warning: failed to initialize git operations: %v (auto-commit disabled)\n", err)
} else {
e.gitOps = gitOps
}
// Initialize message generator for auto-commit (vc-136)
// Only if we have AI supervisor (need API client)
if e.supervisor != nil {
// Get Anthropic API key
apiKey := os.Getenv("ANTHROPIC_API_KEY")
if apiKey != "" {
// Create Anthropic client for message generation (vc-35: using Haiku for cost efficiency)
client := ai.NewAnthropicClient(apiKey)
e.messageGen = git.NewMessageGenerator(&client, ai.GetSimpleTaskModel())
} else {
fmt.Fprintf(os.Stderr, "Warning: ANTHROPIC_API_KEY not set (auto-commit message generation disabled)\n")
}
}
// Create deduplicator if we have a supervisor (vc-137, vc-148)
// Shared by both sandbox manager and results processor
if e.supervisor != nil {
// Get deduplication config from executor config or use defaults
dedupConfig := deduplication.DefaultConfig()
if cfg.DeduplicationConfig != nil {
dedupConfig = *cfg.DeduplicationConfig
}
var err error
e.deduplicator, err = deduplication.NewAIDeduplicator(e.supervisor, cfg.Store, dedupConfig)
if err != nil {
// Don't fail - just continue without deduplication
fmt.Fprintf(os.Stderr, "Warning: failed to create deduplicator: %v (continuing without deduplication)\n", err)
e.deduplicator = nil
}
}
// Initialize sandbox manager if enabled
if cfg.EnableSandboxes {
sandboxMgr, err := sandbox.NewManager(sandbox.Config{
SandboxRoot: sandboxRoot,
ParentRepo: parentRepo,
MainDB: cfg.Store,
Deduplicator: e.deduplicator, // Use shared deduplicator (vc-137)
DeduplicationConfig: cfg.DeduplicationConfig,
PreserveOnFailure: cfg.KeepSandboxOnFailure, // Preserve failed sandboxes for debugging (vc-134)
KeepBranches: cfg.KeepBranches, // Keep mission branches after cleanup (vc-134)
})
if err != nil {
// Don't fail - just disable sandboxes
fmt.Fprintf(os.Stderr, "Warning: failed to initialize sandbox manager: %v (continuing without sandboxes)\n", err)
e.enableSandboxes = false
} else {
e.sandboxMgr = sandboxMgr
// Prune orphaned worktrees on startup (vc-194)
// This cleans up worktrees left behind by previous crashes
ctx := context.Background()
if err := sandbox.PruneWorktrees(ctx, parentRepo); err != nil {
// Log warning but don't fail - prune is best-effort
fmt.Fprintf(os.Stderr, "Warning: failed to prune worktrees on startup: %v\n", err)
}
}
}
// Initialize unified watchdog system (vc-mq3c)
// Set up watchdog config first
e.watchdogConfig = cfg.WatchdogConfig
if e.watchdogConfig == nil {
e.watchdogConfig = watchdog.DefaultWatchdogConfig()
}
// Only create full watchdog if AI supervision is enabled (since it requires supervisor)
if e.enableAISupervision && e.supervisor != nil {
wd, err := watchdog.NewWatchdog(&watchdog.WatchdogDeps{
Store: cfg.Store,
Supervisor: e.supervisor,
ExecutorInstanceID: e.instanceID,
Config: e.watchdogConfig,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to initialize watchdog: %v (watchdog disabled)\n", err)
// Fall back to standalone monitor
e.monitor = watchdog.NewMonitor(watchdog.DefaultConfig())
} else {
e.watchdog = wd
}
} else {
// AI supervision disabled - create standalone monitor for telemetry
e.monitor = watchdog.NewMonitor(watchdog.DefaultConfig())
}
// Initialize health monitoring if enabled
if cfg.EnableHealthMonitoring {
// Set default paths if not specified
healthStatePath := cfg.HealthStatePath
if healthStatePath == "" {
healthStatePath = ".beads/health_state.json"
}
// Resolve relative to workingDir
if !filepath.IsAbs(healthStatePath) {
healthStatePath = filepath.Join(workingDir, healthStatePath)
}
// Create health registry
registry, err := health.NewMonitorRegistry(healthStatePath)
if err != nil {
// Don't fail - just disable health monitoring
fmt.Fprintf(os.Stderr, "Warning: failed to initialize health registry: %v (health monitoring disabled)\n", err)
e.enableHealthMonitoring = false
} else {
e.healthRegistry = registry
// Register monitors (requires supervisor for AI calls)
if e.supervisor != nil {
// Get project root
projectRoot, err := getProjectRootFromStore(cfg.Store)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to get project root: %v (health monitoring disabled)\n", err)
e.enableHealthMonitoring = false
} else {
// Register file size monitor
fileSizeMonitor, err := health.NewFileSizeMonitor(projectRoot, e.supervisor)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create file size monitor: %v\n", err)
} else {
if err := registry.Register(fileSizeMonitor); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to register file size monitor: %v\n", err)
}
}
// Register cruft detector
cruftDetector, err := health.NewCruftDetector(projectRoot, e.supervisor)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create cruft detector: %v\n", err)
} else {
if err := registry.Register(cruftDetector); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to register cruft detector: %v\n", err)
}
}
// Register build modernizer
buildModernizer, err := health.NewBuildModernizer(projectRoot, e.supervisor)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create build modernizer: %v\n", err)
} else {
if err := registry.Register(buildModernizer); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to register build modernizer: %v\n", err)
}
}
// Register CI/CD reviewer
cicdReviewer, err := health.NewCICDReviewer(projectRoot, e.supervisor)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create CI/CD reviewer: %v\n", err)
} else {
if err := registry.Register(cicdReviewer); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to register CI/CD reviewer: %v\n", err)
}
}
// Register dependency auditor
dependencyAuditor, err := health.NewDependencyAuditor(projectRoot, e.supervisor)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create dependency auditor: %v\n", err)
} else {
if err := registry.Register(dependencyAuditor); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to register dependency auditor: %v\n", err)
}
}
}
} else {
fmt.Fprintf(os.Stderr, "Warning: health monitoring requires AI supervision (health monitoring disabled)\n")
e.enableHealthMonitoring = false
}
}
}
// Initialize preflight quality gates checker (vc-196)
if cfg.EnableQualityGates {
// Load preflight configuration from environment
preFlightConfig, err := PreFlightConfigFromEnv()
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: invalid preflight configuration: %v (using defaults)\n", err)
preFlightConfig = DefaultPreFlightConfig()
}
preFlightConfig.WorkingDir = workingDir
// Get VCStorage from storage interface
vcStorage, ok := cfg.Store.(*beads.VCStorage)
if !ok {
fmt.Fprintf(os.Stderr, "Warning: storage is not VCStorage (preflight disabled)\n")
} else {
// Create gates runner for preflight checker
gatesRunner, err := gates.NewRunner(&gates.Config{
Store: cfg.Store,
Supervisor: e.supervisor, // Optional: for AI-driven recovery
WorkingDir: workingDir,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create gates runner: %v (preflight disabled)\n", err)
} else {
// Create preflight checker
preFlightChecker, err := NewPreFlightChecker(vcStorage, gatesRunner, preFlightConfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create preflight checker: %v (preflight disabled)\n", err)
} else {
e.preFlightChecker = preFlightChecker
if preFlightConfig.Enabled {
fmt.Printf("✓ Preflight quality gates enabled (TTL: %v, mode: %s)\n",
preFlightConfig.CacheTTL, preFlightConfig.FailureMode)
}
}
}
}
}
// Initialize QA worker if enabled (vc-254)
if cfg.EnableQualityGateWorker && cfg.EnableQualityGates {
// Create gates runner for QA worker (separate from preflight runner)
gatesRunner, err := gates.NewRunner(&gates.Config{
Store: cfg.Store,
Supervisor: e.supervisor, // Optional: for AI-driven recovery
WorkingDir: workingDir, // Default working dir (will be overridden per-mission)
})
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create gates runner for QA worker: %v (QA worker disabled)\n", err)
e.enableQualityGateWorker = false
} else {
qaWorker, err := NewQualityGateWorker(&QualityGateWorkerConfig{
Store: cfg.Store,
Supervisor: e.supervisor,
WorkingDir: workingDir,
InstanceID: e.instanceID,
GatesRunner: gatesRunner,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create QA worker: %v (QA worker disabled)\n", err)
e.enableQualityGateWorker = false
} else {
e.qaWorker = qaWorker
fmt.Printf("✓ Quality gate worker enabled (parallel execution)\n")
}
}
}
// Initialize loop detector if AI supervision is enabled (vc-0vfg)
// Loop detector requires AI supervisor to analyze activity patterns
if e.enableAISupervision && e.supervisor != nil {
loopDetectorConfig := cfg.LoopDetectorConfig
if loopDetectorConfig == nil {
loopDetectorConfig = DefaultLoopDetectorConfig()
}
loopDetector := NewLoopDetector(loopDetectorConfig, cfg.Store, e.supervisor, e.instanceID)
e.loopDetector = loopDetector
if loopDetectorConfig.Enabled {
fmt.Printf("✓ Loop detector enabled (check_interval=%v, lookback=%v, min_confidence=%.2f)\n",
loopDetectorConfig.CheckInterval, loopDetectorConfig.LookbackWindow, loopDetectorConfig.MinConfidenceThreshold)
}
}
// Initialize interrupt manager (vc-00cu)
// Always enabled - manages task pause/resume
e.interruptMgr = NewInterruptManager(e)
// Initialize control server for pause/resume commands (vc-00cu)
// Only create server if enabled in config (defaults to enabled)
enableControlServer := cfg.EnableControlServer
if enableControlServer {
// Determine socket path (.vc/executor.sock by default)
socketPath := cfg.ControlSocketPath
if socketPath == "" {
// Default to .vc/executor.sock in working directory
socketPath = filepath.Join(workingDir, ".vc", "executor.sock")
}
// Create control server with command handler
controlServer, err := control.NewServer(socketPath, func(cmd control.Command) (map[string]interface{}, error) {
// Dispatch commands to appropriate handlers
ctx := context.Background()
switch cmd.Type {
case "pause":
return e.interruptMgr.HandlePauseCommand(ctx, cmd.IssueID, cmd.Reason)
case "status":
return e.getExecutorStatus(), nil
default:
return nil, fmt.Errorf("unknown command type: %s", cmd.Type)
}
})
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create control server: %v (pause/resume disabled)\n", err)
} else {
e.controlServer = controlServer
fmt.Printf("✓ Control server initialized (socket: %s)\n", socketPath)
}
}
return e, nil
}
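// Start registers this executor instance with the store and begins the executor event loop.
// It returns an error if the executor is already running or if instance registration fails.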
func (e *Executor) Start(ctx context.Context) error {
e.mu.Lock()
if e.running {
e.mu.Unlock()
return fmt.Errorf("executor is already running")
}
e.running = true
e.mu.Unlock()
// Register this executor instance (vc-556f: includes self-healing mode)
instance := &types.ExecutorInstance{
InstanceID: e.instanceID,
Hostname: e.hostname,
PID: e.pid,
Status: types.ExecutorStatusRunning,
StartedAt: time.Now(),
LastHeartbeat: time.Now(),
Version: e.version,
Metadata: "{}",
SelfHealingMode: e.getSelfHealingMode().String(), // Persist initial mode (usually HEALTHY)
}
if err := e.store.RegisterInstance(ctx, instance); err != nil {
e.mu.Lock()
e.running = false
e.mu.Unlock()
return fmt.Errorf("failed to register executor instance: %w", err)
}
// Clean up orphaned claims and stale instances on startup (vc-109)
// This runs synchronously before event loop starts to prevent claiming already-claimed issues
staleThresholdSecs := int(e.staleThreshold.Seconds())
cleaned, err := e.store.CleanupStaleInstances(ctx, staleThresholdSecs)
if err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to cleanup stale instances on startup: %v\n", err)
// Don't fail startup - log warning and continue
} else if cleaned > 0 {
fmt.Printf("Cleanup: Cleaned up %d stale/orphaned instance(s) on startup\n", cleaned)
}
// Clean up orphaned mission branches on startup (vc-135)
// This runs synchronously to ensure branches are cleaned before claiming work
if e.enableSandboxes && !e.config.KeepBranches {
if err := e.cleanupOrphanedBranches(ctx); err != nil {
// Log warning but don't fail startup
fmt.Fprintf(os.Stderr, "Warning: failed to cleanup orphaned branches: %v\n", err)
}
}
// Restore self-healing mode from previous instances if baseline is still broken (vc-556f)