Skip to content

Commit 2ea5cce

Browse files
committed
Add cross-host recovery and origin routing
1 parent 3736c3c commit 2ea5cce

24 files changed

Lines changed: 1775 additions & 137 deletions

AGENTS.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,16 @@ bin/shuttle dispatch <fiber-id> # one-shot dispatch
5656
shuttle-ctl status # all fibers with shuttle: blocks
5757
shuttle-ctl status --all # local + every configured remote
5858
shuttle-ctl status --remote <name> # single remote
59+
shuttle-ctl status --origin candide # direct origin spelling for remote daemon routing
5960
shuttle-ctl ps # live tmux workers only
60-
shuttle-ctl install <fiber> [-m <agent-id>] [--disabled]
61+
shuttle-ctl ps --origin candide # live workers on a selected daemon
62+
shuttle-ctl install <fiber> [-m <agent-id>] [--disabled] [--origin candide]
6163
shuttle-ctl repeat <fiber> --schedule "0 9 * * 1-5" --tz Europe/Paris
6264
shuttle-ctl pause <fiber> # disable + kill live worker; --no-kill preserves it
6365
shuttle-ctl resume / accept <fiber>
6466
shuttle-ctl set-model <fiber> <agent-id>
67+
shuttle-ctl dispatch <fiber> --origin candide
68+
shuttle-ctl snapshot --origin candide
6569
shuttle-ctl abort / attach <fiber>
6670
shuttle-ctl migrate --dry-run # preview eligibility migration
6771
```
@@ -81,8 +85,10 @@ shuttle-ctl migrate --dry-run # preview eligibility migration
8185
- **`shuttle.agent` field drives agent selection.** The `shuttle:` block's
8286
`agent:` field resolves against the registry. Default agent is
8387
`claude-sonnet`.
84-
- **shuttle-ctl is the agent-facing CLI.** Write verbs validate before
85-
write; works offline. `bin/shuttle` handles daemon lifecycle and dispatch.
88+
- **shuttle-ctl is the agent-facing CLI.** Local write verbs validate before
89+
write and work offline. With `--origin <name>`, lifecycle verbs are sent to
90+
that daemon over the configured tunnel so the selected host edits its own
91+
local fiber store. `bin/shuttle` handles daemon lifecycle and dispatch.
8692
- **No tag predicate for dispatch.** The `shuttle:` block's `enabled: true`
8793
field is the dispatch signal. Tags are free-form qualitative noticings;
8894
only `idea` is load-bearing for Portolan's kanban column placement.

cmd/shuttle/daemon.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"strings"
10+
"time"
11+
12+
"github.com/spf13/cobra"
13+
)
14+
15+
var snapshotCmd = &cobra.Command{
16+
Use: "snapshot",
17+
Short: "Print the selected daemon's state snapshot",
18+
Args: cobra.NoArgs,
19+
RunE: func(cmd *cobra.Command, args []string) error {
20+
baseURL, err := resolveOriginURL(normalizedOrigin())
21+
if err != nil {
22+
return err
23+
}
24+
body, err := getDaemon(baseURL + "/api/v1/state")
25+
if err != nil {
26+
return err
27+
}
28+
fmt.Print(string(body))
29+
if !bytes.HasSuffix(body, []byte("\n")) {
30+
fmt.Println()
31+
}
32+
return nil
33+
},
34+
}
35+
36+
var dispatchCmd = &cobra.Command{
37+
Use: "dispatch <fiber>",
38+
Short: "Ask the selected daemon to dispatch a fiber now",
39+
Args: cobra.ExactArgs(1),
40+
RunE: func(cmd *cobra.Command, args []string) error {
41+
baseURL, err := resolveOriginURL(normalizedOrigin())
42+
if err != nil {
43+
return err
44+
}
45+
payload, _ := json.Marshal(map[string]string{"fiber_id": args[0]})
46+
body, err := postDaemon(baseURL+"/api/v1/dispatch", payload)
47+
if err != nil {
48+
return err
49+
}
50+
fmt.Print(string(body))
51+
if !bytes.HasSuffix(body, []byte("\n")) {
52+
fmt.Println()
53+
}
54+
return nil
55+
},
56+
}
57+
58+
func getDaemon(url string) ([]byte, error) {
59+
client := &http.Client{Timeout: 15 * time.Second}
60+
resp, err := client.Get(url)
61+
if err != nil {
62+
return nil, fmt.Errorf("reaching daemon at %s: %w", url, err)
63+
}
64+
defer resp.Body.Close()
65+
return readDaemonResponse(url, resp)
66+
}
67+
68+
func postDaemon(url string, payload []byte) ([]byte, error) {
69+
client := &http.Client{Timeout: 10 * time.Second}
70+
resp, err := client.Post(url, "application/json", bytes.NewReader(payload))
71+
if err != nil {
72+
return nil, fmt.Errorf("reaching daemon at %s: %w", url, err)
73+
}
74+
defer resp.Body.Close()
75+
return readDaemonResponse(url, resp)
76+
}
77+
78+
func readDaemonResponse(url string, resp *http.Response) ([]byte, error) {
79+
body, err := io.ReadAll(resp.Body)
80+
if err != nil {
81+
return nil, fmt.Errorf("reading daemon response from %s: %w", url, err)
82+
}
83+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
84+
return nil, fmt.Errorf("daemon at %s returned %d: %s", url, resp.StatusCode, strings.TrimSpace(string(body)))
85+
}
86+
return body, nil
87+
}
88+
89+
func init() {
90+
rootCmd.AddCommand(snapshotCmd)
91+
rootCmd.AddCommand(dispatchCmd)
92+
}

cmd/shuttle/install.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ in the markdown before installing.
3333
Use 'shuttle repeat' for standing (recurring) roles.`,
3434
Args: cobra.ExactArgs(1),
3535
RunE: func(cmd *cobra.Command, args []string) error {
36+
if !usingLocalOrigin() {
37+
return postRemoteLifecycle("install", map[string]any{
38+
"fiber": args[0],
39+
"model": installModel,
40+
"disabled": installDisabled,
41+
})
42+
}
43+
3644
agents := loadAgents()
3745
path, _, _ := resolveFiber(args[0])
3846
f := readFiber(path)

cmd/shuttle/lifecycle.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ This makes pause the single-writer transition for the Kanban's Drafts target.`,
3131
if err != nil {
3232
return err
3333
}
34+
if !usingLocalOrigin() {
35+
return postRemoteLifecycle("pause", map[string]any{
36+
"fiber": args[0],
37+
"no_kill": noKill,
38+
})
39+
}
3440
path, fiberID, _ := resolveFiber(args[0])
3541
f := readFiber(path)
3642
if f.Block == nil {
@@ -96,6 +102,9 @@ Refuses if status is currently "closed" — use 'shuttle reopen' to requeue a
96102
closed fiber back into active work.`,
97103
Args: cobra.ExactArgs(1),
98104
RunE: func(cmd *cobra.Command, args []string) error {
105+
if !usingLocalOrigin() {
106+
return postRemoteLifecycle("resume", map[string]any{"fiber": args[0]})
107+
}
99108
path, fiberID, host := resolveFiber(args[0])
100109
f := readFiber(path)
101110
if f.Block == nil {
@@ -334,6 +343,12 @@ a run whose digest the next worker should still see).
334343
Appends a felt history event recording the acceptance.`,
335344
Args: cobra.ExactArgs(1),
336345
RunE: func(cmd *cobra.Command, args []string) error {
346+
if !usingLocalOrigin() {
347+
return postRemoteLifecycle("accept", map[string]any{
348+
"fiber": args[0],
349+
"keep_outcome": keepOutcome,
350+
})
351+
}
337352
path, fiberID, host := resolveFiber(args[0])
338353
f := readFiber(path)
339354
if f.Block == nil {
@@ -410,6 +425,12 @@ agent registry before writing. Removes any existing agent:* felt tag
410425
(the shuttle: block is now the authoritative source).`,
411426
Args: cobra.ExactArgs(2),
412427
RunE: func(cmd *cobra.Command, args []string) error {
428+
if !usingLocalOrigin() {
429+
return postRemoteLifecycle("set-model", map[string]any{
430+
"fiber": args[0],
431+
"agent": args[1],
432+
})
433+
}
413434
agents := loadAgents()
414435
path, _, _ := resolveFiber(args[0])
415436
f := readFiber(path)
@@ -439,6 +460,9 @@ var uninstallCmd = &cobra.Command{
439460
daemon will no longer dispatch it. The felt tags and status are not changed.`,
440461
Args: cobra.ExactArgs(1),
441462
RunE: func(cmd *cobra.Command, args []string) error {
463+
if !usingLocalOrigin() {
464+
return postRemoteLifecycle("uninstall", map[string]any{"fiber": args[0]})
465+
}
442466
path, _, _ := resolveFiber(args[0])
443467
f := readFiber(path)
444468
if f.Block == nil {

cmd/shuttle/origin.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"sort"
10+
"strings"
11+
"time"
12+
)
13+
14+
type Origin struct {
15+
Name string `json:"name"`
16+
URL string `json:"url"`
17+
}
18+
19+
type originList struct {
20+
Origins []Origin `json:"origins"`
21+
}
22+
23+
func normalizedOrigin() string {
24+
if originFlag == "" {
25+
return "local"
26+
}
27+
return originFlag
28+
}
29+
30+
func usingLocalOrigin() bool {
31+
return normalizedOrigin() == "local"
32+
}
33+
34+
func resolveOriginURL(name string) (string, error) {
35+
if name == "" || name == "local" {
36+
return daemonURL(), nil
37+
}
38+
39+
origins, err := fetchOrigins()
40+
if err != nil {
41+
return "", err
42+
}
43+
for _, origin := range origins {
44+
if origin.Name == name {
45+
return strings.TrimRight(origin.URL, "/"), nil
46+
}
47+
}
48+
return "", fmt.Errorf("unknown origin %q; configured: %s", name, joinOriginNames(origins))
49+
}
50+
51+
func fetchOrigins() ([]Origin, error) {
52+
url := daemonURL() + "/api/v1/origins"
53+
client := &http.Client{Timeout: 5 * time.Second}
54+
resp, err := client.Get(url)
55+
if err != nil {
56+
return nil, fmt.Errorf("reaching local daemon for origins at %s: %w", url, err)
57+
}
58+
defer resp.Body.Close()
59+
60+
body, err := io.ReadAll(resp.Body)
61+
if err != nil {
62+
return nil, fmt.Errorf("reading origins response: %w", err)
63+
}
64+
if resp.StatusCode != http.StatusOK {
65+
return nil, fmt.Errorf("origins endpoint returned %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
66+
}
67+
68+
var out originList
69+
if err := json.Unmarshal(body, &out); err != nil {
70+
return nil, fmt.Errorf("parsing origins response: %w", err)
71+
}
72+
return out.Origins, nil
73+
}
74+
75+
func postRemoteLifecycle(action string, payload map[string]any) error {
76+
origin := normalizedOrigin()
77+
baseURL, err := resolveOriginURL(origin)
78+
if err != nil {
79+
return err
80+
}
81+
82+
payload["action"] = action
83+
body, err := json.Marshal(payload)
84+
if err != nil {
85+
return fmt.Errorf("encoding lifecycle request: %w", err)
86+
}
87+
88+
url := baseURL + "/api/v1/lifecycle"
89+
client := &http.Client{Timeout: 15 * time.Second}
90+
resp, err := client.Post(url, "application/json", bytes.NewReader(body))
91+
if err != nil {
92+
return fmt.Errorf("origin %q unreachable at %s: %w", origin, url, err)
93+
}
94+
defer resp.Body.Close()
95+
96+
respBody, err := io.ReadAll(resp.Body)
97+
if err != nil {
98+
return fmt.Errorf("reading lifecycle response from origin %q: %w", origin, err)
99+
}
100+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
101+
return fmt.Errorf("origin %q returned %d: %s", origin, resp.StatusCode, strings.TrimSpace(string(respBody)))
102+
}
103+
fmt.Print(string(respBody))
104+
return nil
105+
}
106+
107+
func joinOriginNames(origins []Origin) string {
108+
names := make([]string, 0, len(origins))
109+
for _, origin := range origins {
110+
names = append(names, origin.Name)
111+
}
112+
sort.Strings(names)
113+
if len(names) == 0 {
114+
return "<none>"
115+
}
116+
return strings.Join(names, ", ")
117+
}

cmd/shuttle/origin_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package main
2+
3+
import (
4+
"net/http"
5+
"net/http/httptest"
6+
"os"
7+
"strings"
8+
"testing"
9+
)
10+
11+
func TestResolveOriginURLFromDaemonOrigins(t *testing.T) {
12+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
13+
if r.URL.Path != "/api/v1/origins" {
14+
t.Fatalf("unexpected path %s", r.URL.Path)
15+
}
16+
w.Write([]byte(`{"origins":[{"name":"local","url":"http://127.0.0.1:4000"},{"name":"candide","url":"http://127.0.0.1:4001"}]}`))
17+
}))
18+
defer srv.Close()
19+
20+
prev := os.Getenv("SHUTTLE_DAEMON_URL")
21+
os.Setenv("SHUTTLE_DAEMON_URL", srv.URL)
22+
defer os.Setenv("SHUTTLE_DAEMON_URL", prev)
23+
24+
got, err := resolveOriginURL("candide")
25+
if err != nil {
26+
t.Fatalf("resolveOriginURL: %v", err)
27+
}
28+
if got != "http://127.0.0.1:4001" {
29+
t.Fatalf("got %q", got)
30+
}
31+
}
32+
33+
func TestResolveOriginURLUnknownListsConfiguredOrigins(t *testing.T) {
34+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
35+
w.Write([]byte(`{"origins":[{"name":"local","url":"http://127.0.0.1:4000"},{"name":"candide","url":"http://127.0.0.1:4001"}]}`))
36+
}))
37+
defer srv.Close()
38+
39+
prev := os.Getenv("SHUTTLE_DAEMON_URL")
40+
os.Setenv("SHUTTLE_DAEMON_URL", srv.URL)
41+
defer os.Setenv("SHUTTLE_DAEMON_URL", prev)
42+
43+
_, err := resolveOriginURL("nope")
44+
if err == nil {
45+
t.Fatal("expected error")
46+
}
47+
if !strings.Contains(err.Error(), `unknown origin "nope"; configured: candide, local`) {
48+
t.Fatalf("unexpected error: %v", err)
49+
}
50+
}

cmd/shuttle/repeat.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ The shuttle: block is validated before any file is touched.
3030
The running daemon picks it up on its next poll.`,
3131
Args: cobra.ExactArgs(1),
3232
RunE: func(cmd *cobra.Command, args []string) error {
33+
if !usingLocalOrigin() {
34+
return postRemoteLifecycle("repeat", map[string]any{
35+
"fiber": args[0],
36+
"schedule": repeatSchedule,
37+
"tz": repeatTZ,
38+
"model": repeatModel,
39+
})
40+
}
41+
3342
agents := loadAgents()
3443
path, _, _ := resolveFiber(args[0])
3544
f := readFiber(path)

0 commit comments

Comments
 (0)