Skip to content

Commit 03657c6

Browse files
Merge pull request #410 from gastownhall/fix/session-submit-intents-complete
feat: add semantic session submit intents
2 parents e3ce669 + c3d5329 commit 03657c6

28 files changed

Lines changed: 1700 additions & 176 deletions

cmd/gc/cmd_citystatus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func doCityStatus(
213213

214214
// Chat sessions count (best-effort — skip if store unavailable).
215215
if store, err := openCityStoreAt(cityPath); err == nil {
216-
mgr := newSessionManagerWithConfig(store, sp, cfg)
216+
mgr := newSessionManagerWithConfig(cityPath, store, sp, cfg)
217217
if sessions, err := mgr.List("", ""); err == nil && len(sessions) > 0 {
218218
var active, suspended int
219219
for _, s := range sessions {
@@ -321,7 +321,7 @@ func doCityStatusJSON(
321321

322322
// Chat sessions count (best-effort).
323323
if store, err := openCityStoreAt(cityPath); err == nil {
324-
mgr := newSessionManagerWithConfig(store, sp, cfg)
324+
mgr := newSessionManagerWithConfig(cityPath, store, sp, cfg)
325325
if sessions, err := mgr.List("", ""); err == nil {
326326
for _, s := range sessions {
327327
switch s.State {

cmd/gc/cmd_nudge.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
defaultQueuedNudgeMaxAttempts = 5
3434
defaultNudgePollInterval = 2 * time.Second
3535
defaultNudgePollQuiescence = 3 * time.Second
36+
defaultNudgePollStartGrace = 15 * time.Second
3637
defaultNudgeWaitIdleTimeout = 30 * time.Second
3738
)
3839

@@ -402,10 +403,20 @@ func cmdNudgePoll(args []string, sessionName string, interval, quiescence time.D
402403
defer release()
403404

404405
sp := newSessionProvider()
406+
var missingSince time.Time
405407
for {
406408
if !sp.IsRunning(target.sessionName) {
409+
now := time.Now()
410+
if shouldKeepNudgePollerAlive(target, missingSince, now) {
411+
if missingSince.IsZero() {
412+
missingSince = now
413+
}
414+
time.Sleep(interval)
415+
continue
416+
}
407417
return 0
408418
}
419+
missingSince = time.Time{}
409420
delivered, pollErr := tryDeliverQueuedNudgesByPoller(target, sp, quiescence)
410421
if pollErr != nil {
411422
fmt.Fprintf(stderr, "gc nudge poll: %v\n", pollErr) //nolint:errcheck
@@ -417,6 +428,17 @@ func cmdNudgePoll(args []string, sessionName string, interval, quiescence time.D
417428
}
418429
}
419430

431+
func shouldKeepNudgePollerAlive(target nudgeTarget, missingSince, now time.Time) bool {
432+
pending, inFlight, _, err := listQueuedNudgesForTarget(target.cityPath, target, now)
433+
if err != nil || (len(pending) == 0 && len(inFlight) == 0) {
434+
return false
435+
}
436+
if missingSince.IsZero() {
437+
return true
438+
}
439+
return now.Sub(missingSince) < defaultNudgePollStartGrace
440+
}
441+
420442
func deliverSessionNudge(target nudgeTarget, message string, mode nudgeDeliveryMode, stdout, stderr io.Writer) int {
421443
return deliverSessionNudgeWithProvider(target, newSessionProvider(), message, mode, stdout, stderr)
422444
}
@@ -689,9 +711,6 @@ func tryDeliverQueuedNudgesByPoller(target nudgeTarget, sp runtime.Provider, qui
689711
}
690712

691713
func pollerSessionIdleEnough(sp runtime.Provider, sessionName string, quiescence time.Duration) bool {
692-
if !sp.Capabilities().CanReportActivity {
693-
return false
694-
}
695714
last, err := sp.GetLastActivity(sessionName)
696715
if err != nil || last.IsZero() {
697716
return false

cmd/gc/cmd_nudge_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ import (
1717
"github.com/gastownhall/gascity/internal/session"
1818
)
1919

20+
type noActivityCapabilityProvider struct {
21+
*runtime.Fake
22+
}
23+
24+
func (p *noActivityCapabilityProvider) Capabilities() runtime.ProviderCapabilities {
25+
return runtime.ProviderCapabilities{}
26+
}
27+
2028
func TestDeliverSessionNudgeWithProviderWaitIdleQueuesForCodex(t *testing.T) {
2129
t.Setenv("GC_BEADS", "file")
2230
dir := t.TempDir()
@@ -100,6 +108,47 @@ func TestDeliverSessionNudgeWithProviderWaitIdleStartsCodexPollerWhenQueued(t *t
100108
}
101109
}
102110

111+
func TestPollerSessionIdleEnoughUsesLastActivityWithoutCapabilityFlag(t *testing.T) {
112+
fake := runtime.NewFake()
113+
if err := fake.Start(context.Background(), "sess-worker", runtime.Config{}); err != nil {
114+
t.Fatalf("Start: %v", err)
115+
}
116+
fake.SetActivity("sess-worker", time.Now().Add(-5*time.Second))
117+
118+
if !pollerSessionIdleEnough(&noActivityCapabilityProvider{Fake: fake}, "sess-worker", 3*time.Second) {
119+
t.Fatal("pollerSessionIdleEnough = false, want true when last activity is old enough")
120+
}
121+
}
122+
123+
func TestShouldKeepNudgePollerAliveDuringStartupGrace(t *testing.T) {
124+
t.Setenv("GC_BEADS", "file")
125+
dir := t.TempDir()
126+
now := time.Now()
127+
item := newQueuedNudgeWithOptions("worker", "queued follow-up", "session", now.Add(-time.Minute), queuedNudgeOptions{
128+
ID: "n-grace",
129+
SessionID: "gc-1",
130+
})
131+
if err := enqueueQueuedNudge(dir, item); err != nil {
132+
t.Fatalf("enqueueQueuedNudge: %v", err)
133+
}
134+
135+
target := nudgeTarget{
136+
cityPath: dir,
137+
agent: config.Agent{Name: "worker"},
138+
sessionID: "gc-1",
139+
}
140+
141+
if !shouldKeepNudgePollerAlive(target, time.Time{}, now) {
142+
t.Fatal("shouldKeepNudgePollerAlive = false, want true on first missing-session check with queued items")
143+
}
144+
if !shouldKeepNudgePollerAlive(target, now.Add(-defaultNudgePollStartGrace/2), now) {
145+
t.Fatal("shouldKeepNudgePollerAlive = false, want true within startup grace")
146+
}
147+
if shouldKeepNudgePollerAlive(target, now.Add(-defaultNudgePollStartGrace-time.Second), now) {
148+
t.Fatal("shouldKeepNudgePollerAlive = true, want false after startup grace expires")
149+
}
150+
}
151+
103152
func TestDeliverSessionNudgeWithProviderImmediateUsesImmediateNudge(t *testing.T) {
104153
t.Setenv("GC_BEADS", "file")
105154
dir := t.TempDir()

cmd/gc/cmd_session.go

Lines changed: 159 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"text/tabwriter"
1313
"time"
1414

15+
"github.com/gastownhall/gascity/internal/api"
1516
"github.com/gastownhall/gascity/internal/beads"
1617
"github.com/gastownhall/gascity/internal/clock"
1718
"github.com/gastownhall/gascity/internal/config"
@@ -39,7 +40,7 @@ continuity.`,
3940
Args: cobra.ArbitraryArgs,
4041
RunE: func(_ *cobra.Command, args []string) error {
4142
if len(args) == 0 {
42-
fmt.Fprintln(stderr, "gc session: missing subcommand (new, list, attach, suspend, close, rename, prune, peek, kill, nudge, logs, wake, wait)") //nolint:errcheck // best-effort stderr
43+
fmt.Fprintln(stderr, "gc session: missing subcommand (new, list, attach, submit, suspend, close, rename, prune, peek, kill, nudge, logs, wake, wait)") //nolint:errcheck // best-effort stderr
4344
} else {
4445
fmt.Fprintf(stderr, "gc session: unknown subcommand %q\n", args[0]) //nolint:errcheck // best-effort stderr
4546
}
@@ -50,6 +51,7 @@ continuity.`,
5051
newSessionNewCmd(stdout, stderr),
5152
newSessionListCmd(stdout, stderr),
5253
newSessionAttachCmd(stdout, stderr),
54+
newSessionSubmitCmd(stdout, stderr),
5355
newSessionSuspendCmd(stdout, stderr),
5456
newSessionCloseCmd(stdout, stderr),
5557
newSessionRenameCmd(stdout, stderr),
@@ -64,6 +66,35 @@ continuity.`,
6466
return cmd
6567
}
6668

69+
func newSessionSubmitCmd(stdout, stderr io.Writer) *cobra.Command {
70+
var intent string
71+
cmd := &cobra.Command{
72+
Use: "submit <id-or-alias> <message...>",
73+
Short: "Submit a message with semantic delivery intent",
74+
Long: `Submit a user message to a session without choosing provider transport details.
75+
76+
The runtime decides whether to wake, inject immediately, or queue the message
77+
according to the selected semantic intent.`,
78+
Example: ` gc session submit mayor "status update"
79+
gc session submit mayor "after this run, handle docs" --intent follow_up
80+
gc session submit mayor "stop and do this instead" --intent interrupt_now`,
81+
Args: cobra.MinimumNArgs(2),
82+
RunE: func(_ *cobra.Command, args []string) error {
83+
parsedIntent, err := parseSessionSubmitIntent(intent)
84+
if err != nil {
85+
fmt.Fprintf(stderr, "gc session submit: %v\n", err) //nolint:errcheck // best-effort stderr
86+
return errExit
87+
}
88+
if cmdSessionSubmit(args, parsedIntent, stdout, stderr) != 0 {
89+
return errExit
90+
}
91+
return nil
92+
},
93+
}
94+
cmd.Flags().StringVar(&intent, "intent", string(session.SubmitIntentDefault), "submit intent: default, follow_up, or interrupt_now")
95+
return cmd
96+
}
97+
6798
// newSessionNewCmd creates the "gc session new <template>" command.
6899
func newSessionNewCmd(stdout, stderr io.Writer) *cobra.Command {
69100
var title string
@@ -152,52 +183,56 @@ func cmdSessionNew(args []string, alias, title string, noAttach bool, stdout, st
152183
canonicalTemplate := found.QualifiedName()
153184
singletonOwner := sessionNewAliasOwner(cfg, &found)
154185

155-
// Try reconciler-first path: create bead, poke controller.
156-
if pokeErr := pokeController(cityPath); pokeErr == nil {
157-
// Controller is running — create bead only, let reconciler start it.
158-
var info session.Info
159-
err := session.WithCitySessionAliasLock(cityPath, alias, func() error {
160-
if err := session.EnsureAliasAvailableWithConfigForOwner(store, cfg, alias, "", singletonOwner); err != nil {
161-
return err
162-
}
163-
var createErr error
164-
info, createErr = mgr.CreateAliasedBeadOnlyNamed(alias, "", canonicalTemplate, title, resolved.CommandString(), workDir, resolved.Name, found.Session, resolved.Env, session.ProviderResume{
165-
ResumeFlag: resolved.ResumeFlag,
166-
ResumeStyle: resolved.ResumeStyle,
167-
ResumeCommand: resolved.ResumeCommand,
168-
SessionIDFlag: resolved.SessionIDFlag,
186+
// Try reconciler-first path only when this specific city is managed by a
187+
// standalone controller or the machine-wide supervisor. A reachable
188+
// supervisor socket alone is not enough for unmanaged ad-hoc cities.
189+
if cityUsesManagedReconciler(cityPath) {
190+
if pokeErr := pokeController(cityPath); pokeErr == nil {
191+
// Controller is running — create bead only, let reconciler start it.
192+
var info session.Info
193+
err := session.WithCitySessionAliasLock(cityPath, alias, func() error {
194+
if err := session.EnsureAliasAvailableWithConfigForOwner(store, cfg, alias, "", singletonOwner); err != nil {
195+
return err
196+
}
197+
var createErr error
198+
info, createErr = mgr.CreateAliasedBeadOnlyNamed(alias, "", canonicalTemplate, title, resolved.CommandString(), workDir, resolved.Name, found.Session, resolved.Env, session.ProviderResume{
199+
ResumeFlag: resolved.ResumeFlag,
200+
ResumeStyle: resolved.ResumeStyle,
201+
ResumeCommand: resolved.ResumeCommand,
202+
SessionIDFlag: resolved.SessionIDFlag,
203+
})
204+
return createErr
169205
})
170-
return createErr
171-
})
172-
if err != nil {
173-
fmt.Fprintf(stderr, "gc session new: %v\n", err) //nolint:errcheck // best-effort stderr
174-
return 1
175-
}
206+
if err != nil {
207+
fmt.Fprintf(stderr, "gc session new: %v\n", err) //nolint:errcheck // best-effort stderr
208+
return 1
209+
}
176210

177-
// Poke again after bead creation to trigger immediate reconciler tick.
178-
_ = pokeController(cityPath)
211+
// Poke again after bead creation to trigger immediate reconciler tick.
212+
_ = pokeController(cityPath)
179213

180-
fmt.Fprintf(stdout, "Session %s created from template %q (reconciler will start it).\n", info.ID, canonicalTemplate) //nolint:errcheck // best-effort stdout
214+
fmt.Fprintf(stdout, "Session %s created from template %q (reconciler will start it).\n", info.ID, canonicalTemplate) //nolint:errcheck // best-effort stdout
181215

182-
if !shouldAttachNewSession(noAttach, found.Session) {
183-
if found.Session == "acp" && !noAttach {
184-
fmt.Fprintln(stdout, "Session uses ACP transport; not attaching.") //nolint:errcheck // best-effort stdout
216+
if !shouldAttachNewSession(noAttach, found.Session) {
217+
if found.Session == "acp" && !noAttach {
218+
fmt.Fprintln(stdout, "Session uses ACP transport; not attaching.") //nolint:errcheck // best-effort stdout
219+
}
220+
return 0
185221
}
186-
return 0
187-
}
188222

189-
// Wait for the reconciler to start the session before attaching.
190-
fmt.Fprintln(stdout, "Waiting for session to start...") //nolint:errcheck // best-effort stdout
191-
if waitErr := waitForSession(sp, info.SessionName, 30*time.Second, store, info.ID, stderr); waitErr != nil {
192-
fmt.Fprintf(stderr, "gc session new: %v\n", waitErr) //nolint:errcheck // best-effort stderr
193-
return 1
194-
}
195-
fmt.Fprintln(stdout, "Attaching...") //nolint:errcheck // best-effort stdout
196-
if err := sp.Attach(info.SessionName); err != nil {
197-
fmt.Fprintf(stderr, "gc session new: attaching: %v\n", err) //nolint:errcheck // best-effort stderr
198-
return 1
223+
// Wait for the reconciler to start the session before attaching.
224+
fmt.Fprintln(stdout, "Waiting for session to start...") //nolint:errcheck // best-effort stdout
225+
if waitErr := waitForSession(sp, info.SessionName, 30*time.Second, store, info.ID, stderr); waitErr != nil {
226+
fmt.Fprintf(stderr, "gc session new: %v\n", waitErr) //nolint:errcheck // best-effort stderr
227+
return 1
228+
}
229+
fmt.Fprintln(stdout, "Attaching...") //nolint:errcheck // best-effort stdout
230+
if err := sp.Attach(info.SessionName); err != nil {
231+
fmt.Fprintf(stderr, "gc session new: attaching: %v\n", err) //nolint:errcheck // best-effort stderr
232+
return 1
233+
}
234+
return 0
199235
}
200-
return 0
201236
}
202237

203238
// Fallback: controller not running — direct start via session manager.
@@ -695,7 +730,9 @@ func cmdSessionSuspend(args []string, stdout, stderr io.Writer) int {
695730
}
696731

697732
// Try reconciler-first path: set held_until metadata, poke controller.
698-
if cityErr == nil {
733+
// Only use this path when the city is managed by a standalone controller
734+
// or the machine-wide supervisor — not for unmanaged ad-hoc cities.
735+
if cityErr == nil && cityUsesManagedReconciler(cityPath) {
699736
if pokeErr := pokeController(cityPath); pokeErr == nil {
700737
// Controller is running — metadata-only suspend.
701738
// Set held_until far in the future so the reconciler drains/stops the session.
@@ -1068,6 +1105,87 @@ joined automatically.`,
10681105
return cmd
10691106
}
10701107

1108+
func parseSessionSubmitIntent(raw string) (session.SubmitIntent, error) {
1109+
switch strings.TrimSpace(raw) {
1110+
case "", string(session.SubmitIntentDefault):
1111+
return session.SubmitIntentDefault, nil
1112+
case "follow-up", string(session.SubmitIntentFollowUp):
1113+
return session.SubmitIntentFollowUp, nil
1114+
case "interrupt-now", string(session.SubmitIntentInterruptNow):
1115+
return session.SubmitIntentInterruptNow, nil
1116+
default:
1117+
return "", fmt.Errorf("unknown submit intent %q (want default, follow_up, or interrupt_now)", raw)
1118+
}
1119+
}
1120+
1121+
func cmdSessionSubmit(args []string, intent session.SubmitIntent, stdout, stderr io.Writer) int {
1122+
target := args[0]
1123+
message := strings.Join(args[1:], " ")
1124+
1125+
cityPath, err := resolveCity()
1126+
if err != nil {
1127+
fmt.Fprintf(stderr, "gc session submit: %v\n", err) //nolint:errcheck // best-effort stderr
1128+
return 1
1129+
}
1130+
1131+
if c := apiClient(cityPath); c != nil {
1132+
resp, err := c.SubmitSession(target, message, intent)
1133+
if err == nil {
1134+
emitSessionSubmitResult(stdout, target, intent, resp.Queued)
1135+
return 0
1136+
}
1137+
if !api.ShouldFallback(err) {
1138+
fmt.Fprintf(stderr, "gc session submit: %v\n", err) //nolint:errcheck // best-effort stderr
1139+
return 1
1140+
}
1141+
}
1142+
1143+
cfg, err := loadCityConfig(cityPath)
1144+
if err != nil {
1145+
fmt.Fprintf(stderr, "gc session submit: %v\n", err) //nolint:errcheck // best-effort stderr
1146+
return 1
1147+
}
1148+
store, code := openCityStore(stderr, "gc session submit")
1149+
if store == nil {
1150+
return code
1151+
}
1152+
1153+
sessionID, err := resolveSessionIDMaterializingNamed(cityPath, cfg, store, target)
1154+
if err != nil {
1155+
fmt.Fprintf(stderr, "gc session submit: %v\n", err) //nolint:errcheck // best-effort stderr
1156+
return 1
1157+
}
1158+
1159+
sp := newSessionProvider()
1160+
mgr := newSessionManagerWithConfig(cityPath, store, sp, cfg)
1161+
info, err := mgr.Get(sessionID)
1162+
if err != nil {
1163+
fmt.Fprintf(stderr, "gc session submit: %v\n", err) //nolint:errcheck // best-effort stderr
1164+
return 1
1165+
}
1166+
resumeCmd, hints := buildResumeCommand(cfg, info)
1167+
outcome, err := mgr.Submit(context.Background(), sessionID, message, resumeCmd, hints, intent)
1168+
if err != nil {
1169+
fmt.Fprintf(stderr, "gc session submit: %v\n", err) //nolint:errcheck // best-effort stderr
1170+
return 1
1171+
}
1172+
emitSessionSubmitResult(stdout, target, intent, outcome.Queued)
1173+
return 0
1174+
}
1175+
1176+
func emitSessionSubmitResult(stdout io.Writer, target string, intent session.SubmitIntent, queued bool) {
1177+
switch {
1178+
case queued:
1179+
fmt.Fprintf(stdout, "Queued follow-up for %s\n", target) //nolint:errcheck // best-effort stdout
1180+
case intent == session.SubmitIntentFollowUp:
1181+
fmt.Fprintf(stdout, "Submitted follow-up to %s\n", target) //nolint:errcheck // best-effort stdout
1182+
case intent == session.SubmitIntentInterruptNow:
1183+
fmt.Fprintf(stdout, "Interrupted and submitted to %s\n", target) //nolint:errcheck // best-effort stdout
1184+
default:
1185+
fmt.Fprintf(stdout, "Submitted to %s\n", target) //nolint:errcheck // best-effort stdout
1186+
}
1187+
}
1188+
10711189
// cmdSessionNudge is the CLI entry point for "gc session nudge".
10721190
func cmdSessionNudge(args []string, delivery nudgeDeliveryMode, stdout, stderr io.Writer) int {
10731191
target := args[0]

0 commit comments

Comments
 (0)