diff --git a/.claude/agents/integration-fix.md b/.claude/agents/integration-fix.md new file mode 100644 index 0000000000..b0c6e1e3aa --- /dev/null +++ b/.claude/agents/integration-fix.md @@ -0,0 +1,70 @@ +--- +name: integration-fix +description: Fix integration test failures in a Redpanda Connect worktree +model: opus +allowed-tools: + - Agent + - Bash(git:*) + - Bash(go:*) + - Bash(golangci-lint:*) + - Bash(task:*) + - Edit + - Glob + - Grep + - Read + - Search + - TaskCreate + - TaskList + - TaskUpdate + - Write + - mcp__jira__jira_read +--- + +# Fix Agent + +You are fixing integration test failures in a Redpanda Connect git worktree. You receive a list of classified issues and the full failure logs. + +You are running autonomously, when facing ambiguity or tradeoffs: +- Make a decision and proceed. +- Document your reasoning in the commit message body or as a comment in the code (only if non-obvious). +- If multiple valid approaches exist, prefer the safer, more conservative option. +- Never stop to ask. Either fix it or skip it with a written explanation. + +## Issue Resolution + +Start by creating a task list (TaskCreate) with one task per issue from the "Issues to Fix" list. Update task status as you progress through each step. This gives visibility into the progress and ensures nothing is missed. + +For each issue: + +Loop (max 3 iterations — if validation doesn't pass, loop back; after 3 failures skip the issue): + +1. **Learn.** Read the triage classification, failure logs, the failing test, and the code under test. +2. **Fix.** Fix the root cause of the failure. Do not modify files outside the failing package unless the fix genuinely requires it. The fix should be targeted at the root cause: + - `test_infra`: fix the test infrastructure (e.g. container setup, test helper code), avoid modifying the production code unless the test is incorrect or can be significantly simplified by a minor change. 
+ - `code_bug`: fix the production code bug, avoid modifying the test unless the test is incorrect or can be significantly simplified by a minor change. +3. **Validate.** + - Run `golangci-lint run --new-from-rev=HEAD <package>` and fix any lint errors. + - Run `go test -v -count=1 -timeout 5m -run <TestName> -tags integration <package>` to validate the fix. +4. **Simplify.** If the patch is bigger than 20 lines (`git diff --stat HEAD`), run the `simplify` skill. After simplification, repeat step 3 to validate that the simplified patch still fixes the issue and passes lint. + +Then: + +5. Commit with a message following the project commit policy: + ``` + <scope>: <summary> + + <body> + + Fixes CON-XXX + ``` + - `<scope>` is the component area in lowercase (e.g., `kafka`, `aws`, `sql`). + - `<summary>` starts lowercase, uses imperative mood (e.g., "fix flaky consumer test", not "fixed" or "fixes"). + - `Fixes CON-XXX` uses the `jira_key` from the triage entry. Omit this line if no `jira_key` is present. + +6. Mark task completed, or note why it was skipped. Move to the next issue. + +## Rules + +- One commit per issue. Do not combine fixes across issues. +- Never push. Only commit locally. +- diff --git a/.claude/agents/integration-triage.md b/.claude/agents/integration-triage.md new file mode 100644 index 0000000000..12e77a2a05 --- /dev/null +++ b/.claude/agents/integration-triage.md @@ -0,0 +1,45 @@ +--- +name: integration-triage +description: Classify integration test failures and track them in Jira +model: sonnet +allowed-tools: + - Glob + - Grep + - Read + - Search + - mcp__jira__jira_read + - mcp__jira__jira_schema + - mcp__jira__jira_write +--- + +# Triage Agent + +You are a triage agent for Redpanda Connect integration test failures. Your job is to classify each failure and ensure it is tracked in Jira. + +## Tools + +### Jira MCP + +You have access to Jira MCP tools for querying and creating issues. Use them to check existing subtasks under CON-381 and to create or comment on issues. 
+ +- Project key: CON +- Parent issue: CON-381 +- When creating issues, include: test name, package path, full failure output, and your classification reasoning. +- When searching for duplicates, match on test name and failure pattern, not exact log output. +- Issue summary format: `: ` + +## Classification + +You receive `go test` failure outputs. For each failure: + +1. Read the failure output carefully. +2. **Read the code.** Before classifying, read the failing test and the production code it exercises. Use the package path and test name from the logs to locate the relevant files. This is essential for accurate classification. +3. Classify the failure: + - `test_infra`: The test infrastructure is broken (container setup, port mapping, wait strategy, test helper code, flaky timing). The production code is not at fault. + - `code_bug`: The production code has a bug that causes the test to fail. The test itself is correct. +4. Write a `description` that explains what went wrong and why. When multiple failures share the same underlying cause (e.g., Docker daemon not running, shared container startup failure), use the same description text so they can be grouped. +5. For each classified failure, check Jira: + - Search subtasks of CON-381 for an existing issue matching this failure. + - If a matching issue exists: add a comment with the failure logs and timestamp. Set `jira_key` to the existing issue key and `is_new` to false. + - If no matching issue exists: create a new subtask under CON-381 with the full failure logs, test name, package, and a clear description. Set `jira_key` to the new issue key and `is_new` to true. + - For failures sharing a root cause, a single Jira issue may cover the group. Reference the same `jira_key` for all entries in the group. 
diff --git a/.gitignore b/.gitignore index 5e5434de75..4dae8e22d9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ release_notes.md .codemogger .idea .integration +.integration-worktree .task .vscode .op diff --git a/cmd/tools/integration/cache.go b/cmd/tools/integration/cache.go index c799a23d39..fba6f35c0b 100644 --- a/cmd/tools/integration/cache.go +++ b/cmd/tools/integration/cache.go @@ -134,6 +134,9 @@ func checkCache(outFile string) PackageCache { lastAction := parseEvents(f, 0, EventCallbacks{ OnEvent: func(pe ParsedEvent) { + if pc.Package == "" && pe.Package != "" { + pc.Package = pe.Package + } switch pe.Action { case ActionRun: hasRun = true diff --git a/cmd/tools/integration/fix.go b/cmd/tools/integration/fix.go new file mode 100644 index 0000000000..60ca7ef1ae --- /dev/null +++ b/cmd/tools/integration/fix.go @@ -0,0 +1,128 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "bytes" + "errors" + "flag" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/redpanda-data/connect/v4/cmd/tools/integration/llmfix" +) + +func cmdFix(args []string) error { + fset := flag.NewFlagSet("fix", flag.ExitOnError) + fixTimeout := fset.Duration("fix-timeout", 30*time.Minute, "timeout per fix agent run") + + flags, positional := splitFlagsAndArgs(fset, args) + if err := fset.Parse(flags); err != nil { + return err + } + positional = append(positional, fset.Args()...) + + if len(positional) != 1 { + return errors.New("usage: integration fix ") + } + + filePath, err := filepath.Abs(positional[0]) + if err != nil { + return fmt.Errorf("resolving path: %w", err) + } + + cached := checkCache(filePath) + if cached.Package == "" { + return fmt.Errorf("no package found in %s", filePath) + } + if cached.Overall() != ResultFail { + log.Printf("no failures in %s", filePath) + return nil + } + + baseSHA, err := resolveHEAD() + if err != nil { + return err + } + + outputDir := filepath.Dir(filePath) + slug := pkgSlug(cached.Package) + tag := llmfix.NewTag(slug) + + dir, err := llmfix.CreateWorktree(tag, baseSHA) + if err != nil { + return fmt.Errorf("creating worktree: %w", err) + } + defer llmfix.CleanupWorktree(dir, tag) + + logPath := filepath.Join(outputDir, tag+".log") + logFile, err := os.Create(logPath) + if err != nil { + return fmt.Errorf("creating log file: %w", err) + } + defer logFile.Close() + + logger := log.New(io.MultiWriter(logFile, os.Stdout), "", log.LstdFlags) + + op := llmfix.NewOperator(llmfix.FixRequest{ + Tag: tag, + PkgPath: cached.Package, + TestOutput: dumpTestOutput(filePath, cached.Tests), + OutputDir: outputDir, + WorktreeDir: dir, + Timeout: *fixTimeout, + }, logger) + + if err := op.Run(); err != nil { + return err + } + + commits, err := llmfix.CherryPickCommits(dir, baseSHA) + if err != nil { + return fmt.Errorf("cherry-pick: %w", err) + } + for _, c := range commits { + 
logger.Printf("cherry-picked: %s", c) + } + return nil +} + +func dumpTestOutput(outFile string, tests []CacheEntry) string { + var buf strings.Builder + for _, t := range tests { + if t.Result != ResultFail { + continue + } + buf.WriteString("---\n") + fmt.Fprintf(&buf, "Test: %s\n", t.TestName) + fmt.Fprintf(&buf, "Location: %s:%d\n\n", outFile, t.FailLine) + + if t.FailLine > 0 { + var output bytes.Buffer + if err := showTestOutput(&output, outFile, t.FailLine); err != nil { + fmt.Fprintf(&buf, "(failed to extract output: %v)\n", err) + } else { + buf.WriteString(output.String()) + } + } + buf.WriteString("\n") + } + return buf.String() +} diff --git a/cmd/tools/integration/flag.go b/cmd/tools/integration/flag.go new file mode 100644 index 0000000000..f1f6e7ef5b --- /dev/null +++ b/cmd/tools/integration/flag.go @@ -0,0 +1,56 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "strings" +) + +// splitFlagsAndArgs separates flag-like tokens from positional arguments so +// Go's flag package — which stops at the first non-flag token — can parse +// interspersed usage like: +// +// run --fix amqp1 --debug +// run --output-dir /tmp kafka +// +// It consults fset to tell bool flags (which never consume the next token) +// from value-taking flags (which do, unless already written as --flag=value). 
// splitFlagsAndArgs separates flag tokens from positional arguments so that
// Go's flag package — which stops at the first non-flag token — can parse
// interspersed usage such as:
//
//	run --fix amqp1 --debug
//	run --output-dir /tmp kafka
//
// fset is consulted to tell bool flags (which never consume the next token)
// from value-taking flags (which do, unless written as --flag=value). Flags
// that fset does not define are passed through without consuming a value, so
// fset.Parse is the one that reports them.
//
// Two conventions of the flag package are honoured: a lone "-" is a
// positional argument, and a lone "--" ends flag scanning — it is dropped and
// every token after it is positional, in its original order.
func splitFlagsAndArgs(fset *flag.FlagSet, args []string) (flags, positional []string) {
	for i := 0; i < len(args); i++ {
		a := args[i]
		switch {
		case a == "--":
			// Terminator: nothing after it may be interpreted as a flag.
			positional = append(positional, args[i+1:]...)
			return flags, positional
		case a == "-" || !strings.HasPrefix(a, "-"):
			// flag.Parse stops at "-", so handing it over as a flag would
			// turn every later flag into a positional argument.
			positional = append(positional, a)
			continue
		}

		flags = append(flags, a)
		if strings.Contains(a, "=") {
			// --flag=value already carries its value.
			continue
		}
		f := fset.Lookup(strings.TrimLeft(a, "-"))
		if f == nil {
			continue
		}
		if bf, ok := f.Value.(interface{ IsBoolFlag() bool }); ok && bf.IsBoolFlag() {
			continue
		}
		// Value-taking flag: the next token is its value, whatever it looks like.
		if i+1 < len(args) {
			i++
			flags = append(flags, args[i])
		}
	}
	return flags, positional
}
+ +package main + +import ( + "flag" + "testing" + + "github.com/stretchr/testify/assert" +) + +func newTestFlagSet() *flag.FlagSet { + fset := flag.NewFlagSet("test", flag.ContinueOnError) + fset.Bool("fix", false, "") + fset.Bool("clean", false, "") + fset.Bool("debug", false, "") + fset.Bool("race", false, "") + fset.String("output-dir", "", "") + fset.Int("loop", 0, "") + return fset +} + +func TestSplitFlagsAndArgs(t *testing.T) { + tests := []struct { + name string + args []string + wantFlags []string + wantPosn []string + }{ + { + name: "flags before positional", + args: []string{"--fix", "--clean", "amqp1"}, + wantFlags: []string{"--fix", "--clean"}, + wantPosn: []string{"amqp1"}, + }, + { + name: "interspersed flags and positional", + args: []string{"--fix", "amqp1", "--debug", "--race"}, + wantFlags: []string{"--fix", "--debug", "--race"}, + wantPosn: []string{"amqp1"}, + }, + { + name: "value flag with equals", + args: []string{"--output-dir=/tmp/out", "kafka"}, + wantFlags: []string{"--output-dir=/tmp/out"}, + wantPosn: []string{"kafka"}, + }, + { + name: "value flag with space", + args: []string{"--output-dir", "/tmp/out", "kafka"}, + wantFlags: []string{"--output-dir", "/tmp/out"}, + wantPosn: []string{"kafka"}, + }, + { + name: "int value flag with space interspersed", + args: []string{"--fix", "kafka", "--loop", "3", "--debug"}, + wantFlags: []string{"--fix", "--loop", "3", "--debug"}, + wantPosn: []string{"kafka"}, + }, + { + name: "multiple positional args", + args: []string{"--fix", "kafka", "redis", "--debug"}, + wantFlags: []string{"--fix", "--debug"}, + wantPosn: []string{"kafka", "redis"}, + }, + { + name: "all positional", + args: []string{"kafka", "redis"}, + wantPosn: []string{"kafka", "redis"}, + }, + { + name: "all flags", + args: []string{"--fix", "--debug", "--race"}, + wantFlags: []string{"--fix", "--debug", "--race"}, + }, + { + name: "empty", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + flags, posn 
:= splitFlagsAndArgs(newTestFlagSet(), tt.args) + assert.Equal(t, tt.wantFlags, flags) + assert.Equal(t, tt.wantPosn, posn) + }) + } +} diff --git a/cmd/tools/integration/llmfix/README.md b/cmd/tools/integration/llmfix/README.md new file mode 100644 index 0000000000..55b3caa994 --- /dev/null +++ b/cmd/tools/integration/llmfix/README.md @@ -0,0 +1,57 @@ +# llmfix + +Self-healing for integration test failures. Runs Claude agents in parallel against failed Go packages, commits fixes in isolated git worktrees, and cherry-picks them back onto the caller's branch. + +## Components + +| File | Purpose | +|---|---| +| `manager.go` | Dispatches and supervises concurrent fix pipelines. Serializes cherry-picks. Persists agent status to `agents-status.json`. Recovers leftover worktrees. | +| `operator.go` | Runs the triage → fix pipeline for one package. | +| `claude.go` | Embeds prompts (`triage.md`, `fix.md`) and types the `claude -p` JSON envelope. | +| `git.go` | Worktree creation/cleanup and cherry-pick helpers. | +| `triage.md` | Prompt for the triage agent (model: `sonnet`). Classifies failures and syncs Jira CON-381 subtasks. | +| `fix.md` | Prompt for the fix agent (model: `opus`). Edits code, validates with `go test` + `golangci-lint`, and commits one issue per commit. | + +## Flow + +```mermaid +sequenceDiagram + participant Caller + participant Mgr as Manager + participant Op as Operator + participant Claude + participant Jira + + Caller->>Mgr: Dispatch(failed package) + Mgr->>Op: run in worktree (pinned to baseSHA) + + Op->>Claude: triage (sonnet) + Claude->>Jira: search / create CON-381 subtasks + Jira-->>Claude: issue keys + Claude-->>Op: classified issues + + Op->>Claude: fix (opus, in worktree) + Claude-->>Op: one commit per issue + + Op-->>Mgr: done + Mgr->>Caller: cherry-pick commits onto HEAD
(mutex-serialized) +``` + +## Key invariants + +- **Worktrees are pinned to `baseSHA`** so concurrent cherry-picks advancing `HEAD` don't pollute new worktrees. +- **Cherry-picks are mutex-serialized** across all goroutines — only one touches the caller's `HEAD` at a time. +- **One commit per issue.** The fix agent is instructed never to combine fixes. +- **Status is persisted** to `agents-status.json` so a re-run can retry agents that didn't reach `completed`. +- **Worktrees live under `.integration/worktree/`** and are recovered on startup if left over from a crashed run. + +## Outputs + +Under `/fix/`: + +- `agents.log` — interleaved log of all agents, each line prefixed by slug. +- `agents-status.json` — persisted per-slug status (`dispatched` / `completed` / `failed`). +- `-triage.json` — triage agent's classified issues. +- `.jsonl` — fix agent's stream-json transcript. +- `.stderr` — fix agent's stderr. diff --git a/cmd/tools/integration/llmfix/claude.go b/cmd/tools/integration/llmfix/claude.go new file mode 100644 index 0000000000..f635ce4505 --- /dev/null +++ b/cmd/tools/integration/llmfix/claude.go @@ -0,0 +1,76 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package llmfix + +import ( + _ "embed" + "encoding/json" +) + +var ( + //go:embed triage.md + triagePrompt string + + //go:embed fix.md + fixPrompt string +) + +// claudeEnvelope is the JSON envelope returned by `claude -p --output-format json`. 
+type claudeEnvelope struct { + IsError bool `json:"is_error"` + DurationMS int `json:"duration_ms"` + NumTurns int `json:"num_turns"` + Result string `json:"result"` + StopReason string `json:"stop_reason"` + SessionID string `json:"session_id"` + TotalCostUSD float64 `json:"total_cost_usd"` + StructuredOutput json.RawMessage `json:"structured_output"` + TerminalReason string `json:"terminal_reason"` +} + +type triageResult struct { + Issues []issue `json:"issues"` +} + +type issue struct { + Test string `json:"test"` + Package string `json:"package"` + Type string `json:"type"` + Description string `json:"description"` + JiraKey string `json:"jira_key,omitempty"` + IsNew bool `json:"is_new"` +} + +const triageResultSchema = `{ + "type": "object", + "required": ["issues"], + "properties": { + "issues": { + "type": "array", + "items": { + "type": "object", + "required": ["test", "package", "type", "description"], + "properties": { + "test": {"type": "string"}, + "package": {"type": "string"}, + "type": {"enum": ["test_infra", "code_bug"]}, + "description": {"type": "string"}, + "jira_key": {"type": "string"}, + "is_new": {"type": "boolean"} + } + } + } + } +}` diff --git a/cmd/tools/integration/llmfix/fix.md b/cmd/tools/integration/llmfix/fix.md new file mode 100644 index 0000000000..6f3f5d8f61 --- /dev/null +++ b/cmd/tools/integration/llmfix/fix.md @@ -0,0 +1,47 @@ +# Fix Agent + +You are fixing integration test failures in a Redpanda Connect git worktree. You receive a list of classified issues and the full failure logs. + +You are running autonomously, when facing ambiguity or tradeoffs: +- Make a decision and proceed. +- Document your reasoning in the commit message body or as a comment in the code (only if non-obvious). +- If multiple valid approaches exist, prefer the safer, more conservative option. +- Never stop to ask. Either fix it or skip it with a written explanation. 
+ +## Issue Resolution + +Start by creating a task list (TaskCreate) with one task per issue from the "Issues to Fix" list. Update task status as you progress through each step. This gives visibility into the progress and ensures nothing is missed. + +For each issue: + +Loop (max 3 iterations — if validation doesn't pass, loop back; after 3 failures skip the issue): + +1. **Learn.** Read the triage classification, failure logs, the failing test, and the code under test. +2. **Fix.** Fix the root cause of the failure. Do not modify files outside the failing package unless the fix genuinely requires it. The fix should be targeted at the root cause: + - `test_infra`: fix the test infrastructure (e.g. container setup, test helper code), avoid modifying the production code unless the test is incorrect or can be significantly simplified by a minor change. + - `code_bug`: fix the production code bug, avoid modifying the test unless the test is incorrect or can be significantly simplified by a minor change. +3. **Validate.** + - Run `golangci-lint run --new-from-rev=HEAD <package>` and fix any lint errors. + - Run `go test -v -count=1 -timeout 5m -run <TestName> -tags integration <package>` to validate the fix. +4. **Simplify.** If the patch is bigger than 20 lines (`git diff --stat HEAD`), run the `simplify` skill. After simplification, repeat step 3 to validate that the simplified patch still fixes the issue and passes lint. + +Then: + +5. Commit with a message following the project commit policy: + ``` + <scope>: <summary> + + <body> + + Fixes CON-XXX + ``` + - `<scope>` is the component area in lowercase (e.g., `kafka`, `aws`, `sql`). + - `<summary>` starts lowercase, uses imperative mood (e.g., "fix flaky consumer test", not "fixed" or "fixes"). + - `Fixes CON-XXX` uses the `jira_key` from the triage entry. Omit this line if no `jira_key` is present. + +6. Mark task completed, or note why it was skipped. Move to the next issue. + +## Rules + +- One commit per issue. Do not combine fixes across issues. +- Never push. 
// worktreeBase is the directory, relative to the working directory, under
// which fix worktrees are created.
const worktreeBase = ".integration/worktree"

// CreateWorktree adds a git worktree at worktreeBase/tag on a new branch named
// tag that starts at baseSHA, and returns the worktree path. Pinning to
// baseSHA keeps new worktrees unaffected by concurrent cherry-picks that
// advance HEAD.
func CreateWorktree(tag, baseSHA string) (string, error) {
	dir := filepath.Join(worktreeBase, tag)

	parent := filepath.Dir(dir)
	if mkErr := os.MkdirAll(parent, 0o755); mkErr != nil {
		return "", fmt.Errorf("creating worktree parent dir: %w", mkErr)
	}

	add := exec.Command("git", "worktree", "add", "-b", tag, dir, baseSHA)
	if output, addErr := add.CombinedOutput(); addErr != nil {
		return "", fmt.Errorf("git worktree add: %w\n%s", addErr, output)
	}
	return dir, nil
}

// CleanupWorktree force-removes the worktree at dir and then deletes branch.
// It is best-effort: failures of either git command are ignored.
func CleanupWorktree(dir, branch string) {
	steps := [][]string{
		{"worktree", "remove", "--force", dir},
		{"branch", "-D", branch},
	}
	for _, gitArgs := range steps {
		_ = exec.Command("git", gitArgs...).Run()
	}
}
// CherryPickCommits cherry-picks every commit that the worktree at dir has on
// top of baseSHA onto the branch checked out in the current working
// directory, oldest first.
//
// It returns the "<sha> <subject>" lines of the commits that were applied. On
// the first failing cherry-pick the in-progress pick is aborted and the lines
// applied so far are returned together with the error, so callers can still
// report partial progress.
func CherryPickCommits(dir, baseSHA string) ([]string, error) {
	// Capture stderr explicitly: Output alone would reduce a git failure to
	// a bare "exit status N".
	var stderr strings.Builder
	list := exec.Command("git", "-C", dir, "log", "--format=%H %s", "--reverse", baseSHA+"..HEAD")
	list.Stderr = &stderr
	out, err := list.Output()
	if err != nil {
		return nil, fmt.Errorf("listing worktree commits: %w\n%s", err, stderr.String())
	}

	raw := strings.TrimSpace(string(out))
	if raw == "" {
		return nil, nil
	}

	var picked []string
	for _, line := range strings.Split(raw, "\n") {
		sha, _, _ := strings.Cut(line, " ")
		if sha == "" {
			continue
		}
		cpOut, err := exec.Command("git", "cherry-pick", sha).CombinedOutput()
		if err != nil {
			// Leave the caller's branch clean rather than mid-cherry-pick.
			_ = exec.Command("git", "cherry-pick", "--abort").Run()
			short := sha
			if len(short) > 8 {
				short = short[:8]
			}
			return picked, fmt.Errorf("cherry-pick %s: %w\n%s", short, err, cpOut)
		}
		picked = append(picked, line)
	}

	return picked, nil
}
+type agentStatus struct { + PkgPath string `json:"pkg_path"` + Status string `json:"status"` + Error string `json:"error,omitempty"` + Tag string `json:"tag"` + Updated time.Time `json:"updated"` +} + +const ( + statusDispatched = "dispatched" + statusCompleted = "completed" + statusFailed = "failed" +) + +// NewTag generates a unique tag for a fix attempt. The tag is used as the +// worktree directory name, git branch name, and log file prefix. +func NewTag(slug string) string { + return fmt.Sprintf("fix-%s-%s", slug, time.Now().Format("20060102150405")) +} + +// Manager manages concurrent fix agents and serializes cherry-picks. +// All agent progress is logged to a shared agents.log file with per-agent prefixes. +type Manager struct { + outputDir string + baseSHA string + logFile *os.File + log *log.Logger + + mu sync.Mutex + eg *errgroup.Group + statuses map[string]agentStatus +} + +// NewManager creates a Manager that dispatches fix agents into outputDir. +// baseSHA pins worktrees so concurrent cherry-picks don't pollute them. +func NewManager(outputDir, baseSHA string, maxParallel int) (*Manager, error) { + fixDir := filepath.Join(outputDir, "fix") + if err := os.MkdirAll(fixDir, 0o755); err != nil { + return nil, fmt.Errorf("creating fix dir: %w", err) + } + + logFile, err := os.OpenFile(filepath.Join(fixDir, "agents.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("creating agents log: %w", err) + } + + eg := new(errgroup.Group) + eg.SetLimit(maxParallel) + + l := log.New(logFile, "[manager] ", log.LstdFlags) + l.Printf("initialized: baseSHA=%s, maxParallel=%d, outputDir=%s", baseSHA[:8], maxParallel, outputDir) + + m := &Manager{ + outputDir: outputDir, + baseSHA: baseSHA, + logFile: logFile, + log: l, + eg: eg, + statuses: make(map[string]agentStatus), + } + m.loadStatuses() + + return m, nil +} + +// Close closes the shared agents log file. 
+func (m *Manager) Close() error { + return m.logFile.Close() +} + +func (m *Manager) statusPath() string { + return filepath.Join(m.outputDir, "fix", "agents-status.json") +} + +func (m *Manager) loadStatuses() { + data, err := os.ReadFile(m.statusPath()) + if err != nil { + return + } + if err := json.Unmarshal(data, &m.statuses); err != nil { + m.log.Printf("warning: ignoring corrupt status file: %v", err) + } +} + +func (m *Manager) flushStatuses() { + data, err := json.MarshalIndent(m.statuses, "", " ") + if err != nil { + m.log.Printf("warning: failed to marshal status: %v", err) + return + } + if err := os.WriteFile(m.statusPath(), data, 0o644); err != nil { + m.log.Printf("warning: failed to write status file: %v", err) + } +} + +func (m *Manager) setAgentStatus(slug string, s agentStatus) { + m.mu.Lock() + defer m.mu.Unlock() + s.Updated = time.Now() + m.statuses[slug] = s + m.flushStatuses() +} + +// PendingSlugs returns slug to pkgPath for agents that didn't complete successfully. +func (m *Manager) PendingSlugs() map[string]string { + m.mu.Lock() + defer m.mu.Unlock() + pending := make(map[string]string) + for slug, s := range m.statuses { + if s.Status != statusCompleted { + pending[slug] = s.PkgPath + } + } + return pending +} + +// Dispatch starts a fix pipeline for the given request in the background. +// The slug is used for tag/branch/file naming. +func (m *Manager) Dispatch(slug string, req FixRequest) { + tag := NewTag(slug) + + // log.Logger is goroutine-safe; each Write call is a single syscall. + l := log.New(m.logFile, "["+slug+"] ", log.LstdFlags) + + // Mark dispatched synchronously so IsFixing(slug) is true before Dispatch returns. 
+ m.setAgentStatus(slug, agentStatus{ + PkgPath: req.PkgPath, + Status: statusDispatched, + Tag: tag, + }) + + m.eg.Go(func() error { + start := time.Now() + l.Printf("starting fix pipeline") + + err := m.runOperator(tag, req, l) + + status := statusCompleted + var errMsg string + if err != nil { + status = statusFailed + errMsg = err.Error() + l.Printf("pipeline failed after %s: %v", time.Since(start).Truncate(time.Second), err) + } else { + l.Printf("pipeline completed in %s", time.Since(start).Truncate(time.Second)) + } + + m.setAgentStatus(slug, agentStatus{ + PkgPath: req.PkgPath, + Status: status, + Error: errMsg, + Tag: tag, + }) + return nil + }) +} + +func (m *Manager) runOperator(tag string, req FixRequest, l *log.Logger) error { + dir, err := CreateWorktree(tag, m.baseSHA) + if err != nil { + return fmt.Errorf("creating worktree: %w", err) + } + l.Printf("worktree created: %s", dir) + defer CleanupWorktree(dir, tag) + + req.Tag = tag + req.OutputDir = m.outputDir + req.WorktreeDir = dir + + op := NewOperator(req, l) + if err := op.Run(); err != nil { + return err + } + + commits, err := m.cherryPickCommits(dir, m.baseSHA) + if err != nil { + return fmt.Errorf("cherry-pick: %w", err) + } + for _, c := range commits { + l.Printf("cherry-picked: %s", c) + } + l.Printf("%d commit(s) applied", len(commits)) + + return nil +} + +// Recovery holds the result of recovering a single worktree. +type Recovery struct { + Name string + Commits []string + Err error +} + +// RecoverWorktrees cherry-picks commits from leftover worktrees onto HEAD. 
+func (m *Manager) RecoverWorktrees() []Recovery { + entries, err := os.ReadDir(worktreeBase) + if err != nil { + return nil + } + + var dirs []os.DirEntry + for _, e := range entries { + if e.IsDir() { + dirs = append(dirs, e) + } + } + if len(dirs) == 0 { + return nil + } + + m.log.Printf("recovering %d leftover worktree(s)", len(dirs)) + var results []Recovery + for _, e := range dirs { + r := m.recoverWorktreeDir(e.Name()) + if r.Err != nil { + m.log.Printf("recover %s: %v", r.Name, r.Err) + } else if len(r.Commits) > 0 { + m.log.Printf("recover %s: %d commit(s) applied", r.Name, len(r.Commits)) + } else { + m.log.Printf("recover %s: no commits to apply", r.Name) + } + results = append(results, r) + } + return results +} + +// IsFixing reports whether a fix agent is currently running for the given slug. +func (m *Manager) IsFixing(slug string) bool { + m.mu.Lock() + s, ok := m.statuses[slug] + m.mu.Unlock() + return ok && s.Status == statusDispatched +} + +// Wait blocks until all dispatched fix agents complete. 
+func (m *Manager) Wait() {
+	m.log.Printf("waiting for fix agents to finish")
+	// The group's error is deliberately dropped here.
+	_ = m.eg.Wait()
+	m.log.Printf("all fix agents finished")
+}
+
+// recoverWorktreeDir recovers the leftover worktree called name under
+// worktreeBase: it resolves the branch checked out there, finds that branch's
+// merge base with HEAD, and hands both to cherryPickCommits. If the branch
+// cannot be resolved the worktree is force-removed; otherwise it is cleaned
+// up on return.
+func (m *Manager) recoverWorktreeDir(name string) Recovery {
+	worktree := filepath.Join(worktreeBase, name)
+	res := Recovery{Name: name}
+
+	out, err := exec.Command("git", "-C", worktree, "rev-parse", "--abbrev-ref", "HEAD").Output()
+	if err != nil {
+		// No branch name to clean up with, so remove the worktree directly
+		// (best effort).
+		_ = exec.Command("git", "worktree", "remove", "--force", worktree).Run()
+		res.Err = fmt.Errorf("resolving branch: %w", err)
+		return res
+	}
+	branch := strings.TrimSpace(string(out))
+	defer CleanupWorktree(worktree, branch)
+
+	out, err = exec.Command("git", "merge-base", "HEAD", branch).Output()
+	if err != nil {
+		res.Err = fmt.Errorf("finding merge base: %w", err)
+		return res
+	}
+
+	commits, pickErr := m.cherryPickCommits(worktree, strings.TrimSpace(string(out)))
+	res.Commits, res.Err = commits, pickErr
+	return res
+}
+
+// cherryPickCommits calls CherryPickCommits while holding m.mu, so that
+// concurrent callers run one at a time.
+func (m *Manager) cherryPickCommits(dir, baseSHA string) ([]string, error) {
+	m.mu.Lock()
+	picked, err := CherryPickCommits(dir, baseSHA)
+	m.mu.Unlock()
+	return picked, err
+}
diff --git a/cmd/tools/integration/llmfix/operator.go b/cmd/tools/integration/llmfix/operator.go
new file mode 100644
index 0000000000..142c9d6f06
--- /dev/null
+++ b/cmd/tools/integration/llmfix/operator.go
@@ -0,0 +1,219 @@
+// Copyright 2026 Redpanda Data, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package llmfix
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"time"
+)
+
+// FixRequest describes a single package fix job dispatched to a triage+fix agent pair.
+type FixRequest struct {
+	// Tag is a unique identifier for this fix attempt, used as branch name and log file prefix.
+	Tag string
+	// PkgPath is the Go package path being fixed.
+	PkgPath string
+	// TestOutput is pre-rendered test failure output passed to the triage agent.
+	// It is also appended to the fix agent's prompt. Run rejects an empty value.
+	TestOutput string
+	// OutputDir is the directory for triage JSON and agent logs; the files
+	// themselves are written to its "fix" subdirectory.
+	OutputDir string
+	// WorktreeDir is the git worktree where the fix agent runs.
+	WorktreeDir string
+	// Timeout is the maximum duration for the whole Run: the triage agent and
+	// the fix agent share a single deadline. Zero means no timeout.
+	Timeout time.Duration
+}
+
+// Operator runs the triage+fix pipeline for a single package.
+// Worktree lifecycle and cherry-picking are the caller's responsibility.
+type Operator struct {
+	// req is the job description; Run reads it but never modifies it.
+	req FixRequest
+	// log receives progress messages for this job.
+	log *log.Logger
+}
+
+// NewOperator creates an Operator for the given request.
+// The request is stored by value and l is used for all progress logging.
+func NewOperator(req FixRequest, l *log.Logger) *Operator {
+	return &Operator{
+		req: req,
+		log: l,
+	}
+}
+
+// Run executes the triage agent followed by the fix agent.
+// It fails fast when the request carries no test output, and returns nil
+// without starting the fix agent when triage reports no issues.
+func (op *Operator) Run() error {
+	if op.req.TestOutput == "" {
+		return errors.New("no failure data")
+	}
+
+	if err := os.MkdirAll(filepath.Join(op.req.OutputDir, "fix"), 0o755); err != nil {
+		return fmt.Errorf("creating fix dir: %w", err)
+	}
+
+	// One deadline covers both agents: time spent in triage is time the fix
+	// agent no longer has.
+	ctx := context.Background()
+	if op.req.Timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, op.req.Timeout)
+		defer cancel()
+	}
+
+	op.log.Printf("triaging %s", op.req.PkgPath)
+
+	triageStart := time.Now()
+	triage, err := op.runTriageAgent(ctx, op.req.TestOutput)
+	if err != nil {
+		return fmt.Errorf("triage: %w", err)
+	}
+	op.log.Printf("triage completed in %s, found %d issue(s)", time.Since(triageStart).Truncate(time.Second), len(triage.Issues))
+
+	for _, failure := range triage.Issues {
+		label := failure.Type
+		if failure.JiraKey != "" {
+			label += " " + failure.JiraKey
+		}
+		op.log.Printf(" [%s] %s: %s", label, failure.Test, failure.Description)
+	}
+
+	// Nothing classified means nothing to fix; skip the fix agent.
+	if len(triage.Issues) == 0 {
+		return nil
+	}
+
+	op.log.Printf("running fix agent for %d issue(s) in %s", len(triage.Issues), op.req.WorktreeDir)
+
+	fixStart := time.Now()
+	if err := op.runFixAgent(ctx, buildFixPrompt(op.req.PkgPath, triage.Issues, op.req.TestOutput)); err != nil {
+		return fmt.Errorf("fix: %w", err)
+	}
+	op.log.Printf("fix agent completed in %s", time.Since(fixStart).Truncate(time.Second))
+
+	return nil
+}
+
+// runTriageAgent runs the claude CLI with the triage prompt followed by
+// testOutputDump on stdin, decodes the structured result from the JSON
+// envelope it prints, and saves that result as "<tag>-triage.json" in the
+// fix output directory.
+//
+// It returns an error when the CLI fails, the envelope cannot be parsed, the
+// envelope is flagged as an error, no structured output is present, or the
+// result file cannot be written and closed.
+func (op *Operator) runTriageAgent(ctx context.Context, testOutputDump string) (triageResult, error) {
+	args := []string{
+		"-p",
+		"--model", "sonnet",
+		"--output-format", "json",
+		"--json-schema", triageResultSchema,
+	}
+	cmd := exec.CommandContext(ctx, "claude", args...)
+	cmd.Stdin = strings.NewReader(triagePrompt + "\n\n" + testOutputDump)
+
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
+
+	var res triageResult
+
+	if err := cmd.Run(); err != nil {
+		return res, fmt.Errorf("claude triage: %w\nstderr: %s", err, stderr.String())
+	}
+
+	var env claudeEnvelope
+	if err := json.Unmarshal(stdout.Bytes(), &env); err != nil {
+		return res, fmt.Errorf("parsing claude response envelope: %w\nraw: %s", err, stdout.String())
+	}
+	if env.IsError {
+		return res, fmt.Errorf("claude returned error: %s", env.Result)
+	}
+	// A missing field and an explicit JSON null both mean "no result".
+	if len(env.StructuredOutput) == 0 || bytes.Equal(env.StructuredOutput, []byte("null")) {
+		return res, fmt.Errorf("claude returned no structured output (result: %q, stop_reason: %s, terminal_reason: %s)",
+			env.Result, env.StopReason, env.TerminalReason)
+	}
+	op.log.Printf("triage agent: cost=$%.4f, turns=%d, duration=%dms, session=%s",
+		env.TotalCostUSD, env.NumTurns, env.DurationMS, env.SessionID)
+
+	if err := json.Unmarshal(env.StructuredOutput, &res); err != nil {
+		return res, fmt.Errorf("parsing triage output: %w\nraw: %s", err, string(env.StructuredOutput))
+	}
+
+	triagePath := op.outPath("-triage.json")
+	triageFile, err := os.Create(triagePath)
+	if err != nil {
+		return res, fmt.Errorf("creating triage output file: %w", err)
+	}
+	enc := json.NewEncoder(triageFile)
+	enc.SetIndent("", " ")
+	if err := enc.Encode(res); err != nil {
+		// The encode failure is the error worth reporting; close best-effort.
+		_ = triageFile.Close()
+		return res, fmt.Errorf("writing triage output: %w", err)
+	}
+	// Close explicitly rather than via defer: a failed close can mean the
+	// data never reached disk.
+	if err := triageFile.Close(); err != nil {
+		return res, fmt.Errorf("closing triage output file: %w", err)
+	}
+	op.log.Printf("triage: %s", triagePath)
+
+	return res, nil
+}
+
+// runFixAgent runs the claude CLI inside the request's worktree with the fix
+// prompt followed by prompt on stdin. The agent's stdout is written to
+// "<tag>.jsonl" and its stderr to "<tag>.stderr" in the fix output directory.
+// When the CLI fails, the contents of the stderr file are appended to the
+// returned error.
+func (op *Operator) runFixAgent(ctx context.Context, prompt string) error {
+	args := []string{
+		"-p",
+		"--model", "opus",
+		"--output-format", "stream-json",
+		"--effort", "high",
+		"--verbose",
+		"--dangerously-skip-permissions",
+	}
+	cmd := exec.CommandContext(ctx, "claude", args...)
+	cmd.Stdin = strings.NewReader(fixPrompt + "\n\n" + prompt)
+	cmd.Dir = op.req.WorktreeDir
+
+	stdoutFile, err := os.Create(op.outPath(".jsonl"))
+	if err != nil {
+		return fmt.Errorf("creating stdout log: %w", err)
+	}
+	defer stdoutFile.Close()
+
+	stderrFile, err := os.Create(op.outPath(".stderr"))
+	if err != nil {
+		return fmt.Errorf("creating stderr log: %w", err)
+	}
+	defer stderrFile.Close()
+
+	cmd.Stdout = stdoutFile
+	cmd.Stderr = stderrFile
+
+	if err := cmd.Run(); err != nil {
+		// Best effort: if the log cannot be read back the error simply
+		// carries an empty stderr section.
+		stderrContent, _ := os.ReadFile(stderrFile.Name())
+		return fmt.Errorf("claude fix: %w\nstderr: %s", err, string(stderrContent))
+	}
+
+	return nil
+}
+
+// outPath returns the path of the output file "<tag><suffix>" inside the
+// "fix" subdirectory of the request's output directory. The path is made
+// absolute when possible; if the absolute form cannot be computed the
+// relative path is returned instead of an empty string.
+func (op *Operator) outPath(suffix string) string {
+	rel := filepath.Join(op.req.OutputDir, "fix", op.req.Tag+suffix)
+	abs, err := filepath.Abs(rel)
+	if err != nil {
+		return rel
+	}
+	return abs
+}
+
+// buildFixPrompt renders the input for the fix agent: a package heading, the
+// triaged issues as indented JSON, and the raw test failure output.
+func buildFixPrompt(pkgPath string, issues []issue, testOutputDump string) string {
+	var buf strings.Builder
+	buf.WriteString("# Package ")
+	buf.WriteString(pkgPath)
+	buf.WriteString("\n\n")
+
+	enc := json.NewEncoder(&buf)
+	enc.SetIndent("", " ")
+	// The prompt is read by a model, not embedded in HTML: keep '<', '>' and
+	// '&' literal (e.g. "<nil>" in a test failure) instead of \u003c escapes.
+	enc.SetEscapeHTML(false)
+	// Best effort: if encoding fails the issue list is left incomplete, but
+	// the raw failure output below is still included.
+	_ = enc.Encode(issues)
+	buf.WriteString("\n\n")
+
+	buf.WriteString("## Test Failure Output\n\n")
+	buf.WriteString(testOutputDump)
+	return buf.String()
+}
diff --git a/cmd/tools/integration/llmfix/triage.md b/cmd/tools/integration/llmfix/triage.md
new file mode 100644
index 0000000000..08ed270c77
--- /dev/null
+++ b/cmd/tools/integration/llmfix/triage.md
@@ -0,0 +1,31 @@
+# Triage Agent
+
+You are a triage agent for Redpanda Connect integration test failures. Your job is to classify each failure and ensure it is tracked in Jira.
+
+## Tools
+
+### Jira MCP
+
+You have access to Jira MCP tools for querying and creating issues. Use them to check existing subtasks under CON-381 and to create or comment on issues.
+
+- Project key: CON
+- Parent issue: CON-381
+- When creating issues, include: test name, package path, full failure output, and your classification reasoning.
+- When searching for duplicates, match on test name and failure pattern, not exact log output. +- Issue summary format: `: ` + +## Classification + +You receive `go test` failure outputs. For each failure: + +1. Read the failure output carefully. +2. **Read the code.** Before classifying, read the failing test and the production code it exercises. Use the package path and test name from the logs to locate the relevant files. This is essential for accurate classification. +3. Classify the failure: + - `test_infra`: The test infrastructure is broken (container setup, port mapping, wait strategy, test helper code, flaky timing). The production code is not at fault. + - `code_bug`: The production code has a bug that causes the test to fail. The test itself is correct. +4. Write a `description` that explains what went wrong and why. When multiple failures share the same underlying cause (e.g., Docker daemon not running, shared container startup failure), use the same description text so they can be grouped. +5. For each classified failure, check Jira: + - Search subtasks of CON-381 for an existing issue matching this failure. + - If a matching issue exists: add a comment with the failure logs and timestamp. Set `jira_key` to the existing issue key and `is_new` to false. + - If no matching issue exists: create a new subtask under CON-381 with the full failure logs, test name, package, and a clear description. Set `jira_key` to the new issue key and `is_new` to true. + - For failures sharing a root cause, a single Jira issue may cover the group. Reference the same `jira_key` for all entries in the group. diff --git a/cmd/tools/integration/main.go b/cmd/tools/integration/main.go index 189f549f82..2baf3f6819 100644 --- a/cmd/tools/integration/main.go +++ b/cmd/tools/integration/main.go @@ -16,7 +16,7 @@ // // Usage: // -// go run ./cmd/tools/integration run [--output-dir dir] [--clean] [--debug] [--race] [--unit] [--cover-profile] [filter...] 
+// go run ./cmd/tools/integration run [--output-dir dir] [--clean] [--debug] [--race] [--unit] [--cover-profile] [--fix] [--fix-max-parallel N] [--fix-timeout D] [--loop N] [filter...] package main import ( @@ -37,6 +37,10 @@ func main() { if err := cmdRun(os.Args[2:]); err != nil { log.Fatal(err) } + case "fix": + if err := cmdFix(os.Args[2:]); err != nil { + log.Fatal(err) + } case "show": if err := cmdShow(os.Args[2:]); err != nil { log.Fatal(err) @@ -51,16 +55,24 @@ func printUsage() { log.Println("") log.Println("Commands:") log.Println(" run Run integration tests package by package") + log.Println(" fix Triage and fix failures from a test output file") log.Println(" show Show test output for a failure reference (file:line)") log.Println("") log.Println("Flags for run:") - log.Println(" --output-dir Directory for test output (default: .integration/)") - log.Println(" --clean Ignore cache, start a fresh run") - log.Println(" --debug Enable debug logging to stderr") - log.Println(" --race Enable race detector (requires CGO_ENABLED=1)") - log.Println(" --unit Run all tests, not just integration tests") - log.Println(" --cover-profile Generate coverage profile per package (.cov files)") + log.Println(" --output-dir Directory for test output (default: .integration/)") + log.Println(" --clean Ignore cache, start a fresh run") + log.Println(" --debug Enable debug logging to stderr") + log.Println(" --race Enable race detector (requires CGO_ENABLED=1)") + log.Println(" --unit Run all tests, not just integration tests") + log.Println(" --cover-profile Generate coverage profile per package (.cov files)") + log.Println(" --fix Triage and fix failed packages using Claude agents") + log.Println(" --fix-max-parallel Max parallel Claude agents (default: 4, requires --fix)") + log.Println(" --fix-timeout Timeout per fix agent run (default: 30m, requires --fix)") + log.Println(" --loop Number of successful clean iterations required (default: 0 = single run)") log.Println("") log.Println("Positional arguments are package filters (substring match).") - log.Println("Example: integration run kafka redis") + log.Println("") + log.Println("Examples:") + log.Println(" integration run kafka
redis") + log.Println(" integration fix .integration/20260414205744/amqp1.txt") } diff --git a/cmd/tools/integration/packages.go b/cmd/tools/integration/packages.go index 1ffaaa8ada..e34fe7b907 100644 --- a/cmd/tools/integration/packages.go +++ b/cmd/tools/integration/packages.go @@ -26,6 +26,7 @@ const defaultTimeout = "5m" type TestPackage struct { Path string `json:"path"` Timeout string `json:"timeout"` + Skip string `json:"skip"` } // TimeoutStr returns the timeout for go test -timeout, defaulting to 5m. @@ -40,10 +41,17 @@ func (tp TestPackage) TimeoutStr() string { var packagesJSON []byte // allPackages is the CI matrix package list, loaded from packages.json. +// Entries with a "skip" field are excluded. var allPackages = func() []TestPackage { - var pkgs []TestPackage - if err := json.Unmarshal(packagesJSON, &pkgs); err != nil { + var raw []TestPackage + if err := json.Unmarshal(packagesJSON, &raw); err != nil { log.Fatalf("failed to parse packages.json: %v", err) } + var pkgs []TestPackage + for _, p := range raw { + if p.Skip == "" { + pkgs = append(pkgs, p) + } + } return pkgs }() diff --git a/cmd/tools/integration/packages.json b/cmd/tools/integration/packages.json index ad3af530dc..6e579e20b1 100644 --- a/cmd/tools/integration/packages.json +++ b/cmd/tools/integration/packages.json @@ -10,7 +10,9 @@ {"path": "./internal/impl/beanstalkd"}, {"path": "./internal/impl/cassandra"}, {"path": "./internal/impl/cockroachdb"}, + {"path": "./internal/impl/confluent"}, {"path": "./internal/impl/couchbase"}, + {"path": "./internal/impl/cypher"}, {"path": "./internal/impl/elasticsearch/v8"}, {"path": "./internal/impl/elasticsearch/v9"}, {"path": "./internal/impl/gcp"}, @@ -18,29 +20,42 @@ {"path": "./internal/impl/gcp/enterprise/changestreams"}, {"path": "./internal/impl/gcp/enterprise/changestreams/metadata"}, {"path": "./internal/impl/hdfs"}, + {"path": "./internal/impl/iceberg/integration", "timeout": "20m"}, {"path": "./internal/impl/influxdb"}, {"path": 
"./internal/impl/kafka"}, {"path": "./internal/impl/kafka/enterprise"}, {"path": "./internal/impl/memcached"}, - {"path": "./internal/impl/mssqlserver", "timeout": "10m"}, {"path": "./internal/impl/mongodb"}, {"path": "./internal/impl/mongodb/cdc"}, {"path": "./internal/impl/mqtt"}, + {"path": "./internal/impl/mssqlserver", "timeout": "10m"}, + {"path": "./internal/impl/mssqlserver/replication", "timeout": "10m"}, {"path": "./internal/impl/mysql"}, {"path": "./internal/impl/nanomsg"}, {"path": "./internal/impl/nats"}, {"path": "./internal/impl/nsq"}, + {"path": "./internal/impl/ollama", "timeout": "15m"}, {"path": "./internal/impl/opensearch"}, {"path": "./internal/impl/oracledb", "timeout": "10m"}, + {"path": "./internal/impl/oracledb/replication", "timeout": "30m"}, + {"path": "./internal/impl/otlp", "timeout": "10m"}, {"path": "./internal/impl/postgresql"}, + {"path": "./internal/impl/postgresql/pglogicalstream"}, {"path": "./internal/impl/pulsar", "timeout": "10m"}, {"path": "./internal/impl/qdrant"}, {"path": "./internal/impl/questdb"}, {"path": "./internal/impl/redis"}, + {"path": "./internal/impl/redpanda", "timeout": "10m"}, {"path": "./internal/impl/redpanda/migrator"}, {"path": "./internal/impl/sftp"}, {"path": "./internal/impl/snowflake"}, {"path": "./internal/impl/snowflake/streaming", "timeout": "20m"}, + {"path": "./internal/impl/spicedb"}, {"path": "./internal/impl/splunk"}, - {"path": "./internal/impl/sql"} + {"path": "./internal/impl/sql"}, + + {"path": "./internal/impl/cohere", "skip": "requires COHERE_API_KEY"}, + {"path": "./internal/impl/cyborgdb", "skip": "requires CYBORGDB_API_KEY"}, + {"path": "./internal/impl/tigerbeetle", "skip": "requires cgo build tag"}, + {"path": "./internal/impl/zeromq", "skip": "requires x_benthos_extra build tag and libzmq"} ] diff --git a/cmd/tools/integration/run.go b/cmd/tools/integration/run.go index d6ce6fd332..4d7d59ff87 100644 --- a/cmd/tools/integration/run.go +++ b/cmd/tools/integration/run.go @@ -25,15 
+25,23 @@ import ( "path/filepath" "strings" "time" + + "github.com/redpanda-data/connect/v4/cmd/tools/integration/llmfix" ) +const fixPollInterval = 15 * time.Second + var runArgs struct { - outputDir string - clean bool - debug bool - race bool - unit bool - coverProfile bool + outputDir string + clean bool + debug bool + race bool + unit bool + coverProfile bool + fix bool + fixMaxParallel int + fixTimeout time.Duration + loop int } func debugf(format string, args ...any) { @@ -50,10 +58,19 @@ func cmdRun(args []string) error { fset.BoolVar(&runArgs.race, "race", false, "enable race detector (sets CGO_ENABLED=1)") fset.BoolVar(&runArgs.unit, "unit", false, "run all tests, not just integration tests") fset.BoolVar(&runArgs.coverProfile, "cover-profile", false, "generate coverage profile per package") - - if err := fset.Parse(args); err != nil { + fset.BoolVar(&runArgs.fix, "fix", false, "triage and fix failed packages using Claude agents") + fset.IntVar(&runArgs.fixMaxParallel, "fix-max-parallel", 4, "max parallel fix agents (requires --fix)") + fset.DurationVar(&runArgs.fixTimeout, "fix-timeout", 30*time.Minute, "timeout per fix agent run (requires --fix)") + fset.IntVar(&runArgs.loop, "loop", 0, "number of successful clean iterations required (0 = single run)") + + // Go's flag package stops parsing at the first non-flag argument. + // Separate flags from positional filters so interspersed usage works: + // run --fix --race amqp1 --debug + flags, filters := splitFlagsAndArgs(fset, args) + if err := fset.Parse(flags); err != nil { return err } + filters = append(filters, fset.Args()...) 
if runArgs.outputDir != "" && runArgs.clean { return errors.New("--output-dir and --clean are mutually exclusive") @@ -63,35 +80,129 @@ func cmdRun(args []string) error { log.SetOutput(os.Stderr) } - filters := fset.Args() - packages := filterPackages(allPackages, filters) if len(packages) == 0 { return fmt.Errorf("no packages match filter: %v", filters) } - // Determine output directory. - outputDir := "" - if runArgs.outputDir != "" { - outputDir = runArgs.outputDir - debugf("using provided output dir: %s", outputDir) - } else { - if !runArgs.clean { - outputDir = findLatestRunDir(".integration") - debugf("resuming run dir: %s", outputDir) + loopCount := runArgs.loop + if loopCount <= 0 { + loopCount = 1 + } + + outputDir := resolveOutputDir(runArgs.outputDir, runArgs.clean) + clean := runArgs.clean + for success := 0; success < loopCount; { + if runArgs.loop > 0 { + log.Printf("loop: starting iteration (success %d/%d)", success, runArgs.loop) } - if outputDir == "" { - outputDir = resolveRunDir() - debugf("new run dir: %s", outputDir) + + failed, err := runIteration(packages, outputDir, clean) + if err != nil { + return err + } + + if failed > 0 { + if runArgs.loop <= 0 { + return errors.New("integration tests failed") + } + // Retry in same dir — agents may have applied fixes. + clean = false + continue + } + + success++ + if runArgs.loop > 0 { + log.Printf("loop: iteration succeeded (success %d/%d)", success, runArgs.loop) + } + + if runArgs.outputDir != "" { + // Fixed output dir — can't iterate further, cache would make it a no-op. + break + } + // Fresh dir for next iteration. 
+ outputDir = resolveRunDir() + clean = false + } + return nil +} + +func resolveOutputDir(explicit string, clean bool) string { + if explicit != "" { + debugf("using provided output dir: %s", explicit) + return explicit + } + if !clean { + if dir := findLatestRunDir(".integration"); dir != "" { + debugf("resuming run dir: %s", dir) + return dir } } + dir := resolveRunDir() + debugf("new run dir: %s", dir) + return dir +} + +// runIteration executes a single test run in the given output directory. +// Returns the number of failed packages and any non-recoverable error. +// When --fix is enabled, it always waits for agents to finish before returning. +func runIteration(packages []TestPackage, outputDir string, clean bool) (int, error) { if err := os.MkdirAll(outputDir, 0o755); err != nil { - return fmt.Errorf("creating run directory: %w", err) + return 0, fmt.Errorf("creating run directory: %w", err) } debugf("resolved run dir: %s", outputDir) out := NewOutput(os.Stdout, filepath.Join(outputDir, "index.md")) + + var mgr *llmfix.Manager + if runArgs.fix { + baseSHA, err := resolveHEAD() + if err != nil { + return 0, fmt.Errorf("initializing fix manager: %w", err) + } + var mgrErr error + mgr, mgrErr = llmfix.NewManager(outputDir, baseSHA, runArgs.fixMaxParallel) + if mgrErr != nil { + return 0, fmt.Errorf("initializing fix manager: %w", mgrErr) + } + defer mgr.Close() + for _, r := range mgr.RecoverWorktrees() { + if r.Err != nil { + out.Error(fmt.Sprintf(" recover %s: %v", r.Name, r.Err)) + } + for _, c := range r.Commits { + out.Info(" recovered: " + c) + } + } + + // Retry fix agents that failed in the previous run. 
+ if !clean { + pending := mgr.PendingSlugs() + if len(pending) > 0 { + out.Info(fmt.Sprintf("Retrying %d failed fix agent(s)...", len(pending))) + for slug, pkgPath := range pending { + outFile := filepath.Join(outputDir, slug+".txt") + cached := checkCache(outFile) + if cached.Overall() != ResultFail { + continue + } + testOutput := dumpTestOutput(outFile, cached.Tests) + if testOutput == "" { + continue + } + mgr.Dispatch(slug, llmfix.FixRequest{ + PkgPath: pkgPath, + TestOutput: testOutput, + Timeout: runArgs.fixTimeout, + }) + out.Info(" " + pkgShort(pkgPath)) + } + out.Blank() + } + } + } + total := len(packages) headerLabel := "Integration Tests" @@ -106,57 +217,80 @@ func cmdRun(args []string) error { passed int failed int skipped int + run int ) - for i, pkg := range packages { - short := pkgShort(pkg.Path) - fname := pkgFilename(pkg.Path) - pkgOutFile := filepath.Join(outputDir, fname) + for len(packages) > 0 { + var pending []TestPackage + for _, pkg := range packages { + slug := pkgSlug(pkg.Path) + short := pkgShort(pkg.Path) + fname := pkgFilename(pkg.Path) + pkgOutFile := filepath.Join(outputDir, fname) + tag := strings.TrimSuffix(fname, ".txt") + + if mgr != nil && mgr.IsFixing(slug) { + pending = append(pending, pkg) + continue + } - tag := strings.TrimSuffix(fname, ".txt") - out.PackageHeader(tag, i+1, total, short) + run++ + out.PackageHeader(tag, run, total, short) - // Check cache from current run directory. 
- var cached PackageCache - if !runArgs.clean { - debugf("checking cache for %s at %s", short, pkgOutFile) - cached = checkCache(pkgOutFile) - } + var cached PackageCache + if !clean { + debugf("checking cache for %s at %s", short, pkgOutFile) + cached = checkCache(pkgOutFile) + } + + for _, t := range compactCacheEntries(cached.Tests) { + switch t.Result { + case ResultPass: + out.TestPass(t.TestName, "", true) + case ResultSkip: + out.TestSkip(t.TestName, "", t.SkipReason, true) + case ResultFail: + out.TestFail(t.TestName, "", pkgOutFile, t.FailLine, true) + } + } - // Print cached items (subtests compacted into parent summaries). - for _, t := range compactCacheEntries(cached.Tests) { - switch t.Result { + if cached.Complete && cached.Overall() == ResultPass { + debugf("found cached result for %s: complete", short) + out.Info("● " + short + " (cached)") + out.PackageDone() + passed++ + continue + } + + res := runPackageTest(pkg, short, pkgOutFile, cached, out) + + switch res.status { case ResultPass: - out.TestPass(t.TestName, "", true) - case ResultSkip: - out.TestSkip(t.TestName, "", t.SkipReason, true) + passed++ case ResultFail: - out.TestFail(t.TestName, "", pkgOutFile, t.FailLine, true) + failed++ + case ResultSkip: + skipped++ } - } - - // If complete and all passed, close and move on. 
- if cached.Complete && cached.Overall() == ResultPass { - debugf("found cached result for %s: complete", short) - out.Info(fmt.Sprintf("● %s (cached)", short)) - out.PackageDone() - passed++ - continue - } + out.PackageResult(short, res.status, res.duration) - res := runPackageTest(pkg, short, pkgOutFile, cached, out) + if runArgs.fix && res.status == ResultFail { + mgr.Dispatch(slug, llmfix.FixRequest{ + PkgPath: pkg.Path, + TestOutput: dumpTestOutput(res.outFile, res.tests), + Timeout: runArgs.fixTimeout, + }) + out.Info(fmt.Sprintf(" fixing %s → %s", short, filepath.Join(outputDir, "fix", "agents.log"))) + } - switch res.status { - case ResultPass: - passed++ - case ResultFail: - failed++ - case ResultSkip: - skipped++ + dockerCleanup() } - out.PackageResult(short, res.status, res.duration) - - dockerCleanup() + if len(pending) == 0 { + break + } + out.Info(fmt.Sprintf("Waiting for %d package(s) being fixed...", len(pending))) + time.Sleep(fixPollInterval) + packages = pending } // Final report. @@ -165,10 +299,10 @@ func cmdRun(args []string) error { out.Summary(passed, failed, skipped) out.Info(sep) - if failed > 0 { - return errors.New("integration tests failed") + if mgr != nil { + mgr.Wait() } - return nil + return failed, nil } type packageResult struct { @@ -212,7 +346,7 @@ func runPackageTest(pkg TestPackage, short, outFile string, cached PackageCache, skipRegex := completedTestSkipRegex(cached.Tests) runRegex := "^Test.*Integration" if runArgs.unit { - runRegex = "" + runRegex = "^Test" } timeout := pkg.TimeoutStr() diff --git a/cmd/tools/integration/utils.go b/cmd/tools/integration/utils.go index 8bc052a539..d108c0eb6a 100644 --- a/cmd/tools/integration/utils.go +++ b/cmd/tools/integration/utils.go @@ -22,10 +22,20 @@ import ( "io" "io/fs" "os" + "os/exec" "strings" "time" ) +// resolveHEAD returns the SHA of the current HEAD commit. 
+func resolveHEAD() (string, error) { + out, err := exec.Command("git", "rev-parse", "HEAD").Output() + if err != nil { + return "", fmt.Errorf("resolving HEAD: %w", err) + } + return strings.TrimSpace(string(out)), nil +} + // Action represents a test event action from `go test -json` output. type Action string @@ -120,8 +130,12 @@ func pkgShort(pkg string) string { return strings.TrimPrefix(pkg, "./internal/impl/") } +func pkgSlug(pkg string) string { + return strings.ReplaceAll(pkgShort(pkg), "/", "-") +} + func pkgFilename(pkg string) string { - return strings.ReplaceAll(pkgShort(pkg), "/", "-") + ".txt" + return pkgSlug(pkg) + ".txt" } // countFileLines counts newlines in a file without loading it all into memory. diff --git a/taskfiles/test.yml b/taskfiles/test.yml index 3ff826023c..5df3933edb 100644 --- a/taskfiles/test.yml +++ b/taskfiles/test.yml @@ -24,6 +24,13 @@ tasks: cmds: - go run ./cmd/tools/integration run {{.CLI_ARGS}} + integration:fix-loop: + desc: "Self-healing full test run: --fix --unit --race --clean --loop=10" + aliases: + - swiss-cheese + cmds: + - go run ./cmd/tools/integration run --fix --unit --race --clean --loop=10 {{.CLI_ARGS}} + show: desc: "Show human-readable output for a test failure (e.g. task test:show -- .integration/20260331/mssql.txt:870)" cmds: