Skip to content

Commit 814d344

Browse files
author
Onur Solmaz
committed
refactor(slack-gateway): recover stale runtime sessions
1 parent 188fedf commit 814d344

9 files changed

Lines changed: 537 additions & 48 deletions

File tree

helm/spritz/templates/operator-deployment.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@ spec:
5353
- name: SPRITZ_HOME_SIZE_LIMIT
5454
value: {{ .Values.operator.homeSizeLimit | quote }}
5555
{{- end }}
56+
{{- if .Values.operator.lifecycleNotifications.url }}
57+
- name: SPRITZ_LIFECYCLE_NOTIFY_URL
58+
value: {{ .Values.operator.lifecycleNotifications.url | quote }}
59+
{{- end }}
60+
{{- if .Values.operator.lifecycleNotifications.authToken }}
61+
- name: SPRITZ_LIFECYCLE_NOTIFY_AUTH_TOKEN
62+
value: {{ .Values.operator.lifecycleNotifications.authToken | quote }}
63+
{{- end }}
64+
{{- if .Values.operator.lifecycleNotifications.timeout }}
65+
- name: SPRITZ_LIFECYCLE_NOTIFY_TIMEOUT
66+
value: {{ .Values.operator.lifecycleNotifications.timeout | quote }}
67+
{{- end }}
5668
- name: SPRITZ_ROUTE_MODEL_TYPE
5769
value: {{ include "spritz.routeModel.type" . | quote }}
5870
- name: SPRITZ_ROUTE_HOST

helm/spritz/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ operator:
105105
workspaceSizeLimit: 10Gi
106106
homeSizeLimit: 5Gi
107107
podNodeSelector: ""
108+
lifecycleNotifications:
109+
url: ""
110+
authToken: ""
111+
timeout: 3s
108112
sharedMounts:
109113
enabled: false
110114
mounts: []

integrations/slack-gateway/backend_client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,17 @@ func channelSessionUnavailableProviderAuth(err error) (slackInstallation, bool)
126126
return unavailableErr.providerAuth, true
127127
}
128128

129+
func isSpritzRuntimeMissingError(err error) bool {
130+
var statusErr *httpStatusError
131+
if !errors.As(err, &statusErr) {
132+
return false
133+
}
134+
if statusErr.statusCode != http.StatusNotFound {
135+
return false
136+
}
137+
return strings.Contains(strings.ToLower(statusErr.body), "spritz not found")
138+
}
139+
129140
func (g *slackGateway) exchangeChannelSession(ctx context.Context, teamID string) (channelSession, error) {
130141
body := map[string]any{
131142
"principalId": g.cfg.PrincipalID,

integrations/slack-gateway/gateway_test.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2176,6 +2176,219 @@ func TestProcessMessageEventPostsStatusMessageWhileSessionRecoveryIsInFlight(t *
21762176
}
21772177
}
21782178

2179+
func TestProcessMessageEventRecoversAfterRuntimeDisappearsMidFlight(t *testing.T) {
2180+
var slackPayloads struct {
2181+
sync.Mutex
2182+
items []map[string]any
2183+
}
2184+
slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2185+
if r.URL.Path != "/chat.postMessage" {
2186+
t.Fatalf("unexpected slack path %s", r.URL.Path)
2187+
}
2188+
var payload map[string]any
2189+
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
2190+
t.Fatalf("decode slack post body: %v", err)
2191+
}
2192+
slackPayloads.Lock()
2193+
slackPayloads.items = append(slackPayloads.items, payload)
2194+
slackPayloads.Unlock()
2195+
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))})
2196+
}))
2197+
defer slackAPI.Close()
2198+
2199+
var sessionExchangeCalls atomic.Int32
2200+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2201+
if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" {
2202+
t.Fatalf("unexpected backend path %s", r.URL.Path)
2203+
}
2204+
call := sessionExchangeCalls.Add(1)
2205+
switch call {
2206+
case 1, 2:
2207+
writeJSON(w, http.StatusOK, map[string]any{
2208+
"status": "resolved",
2209+
"session": map[string]any{
2210+
"accessToken": "owner-token-old",
2211+
"ownerAuthId": "owner-123",
2212+
"namespace": "spritz-staging",
2213+
"instanceId": "zeno-old",
2214+
"providerAuth": map[string]any{
2215+
"providerInstallRef": "cred_slack_workspace_1",
2216+
"apiAppId": "A_app_1",
2217+
"teamId": "T_workspace_1",
2218+
"botUserId": "U_bot",
2219+
"botAccessToken": "xoxb-installed",
2220+
},
2221+
},
2222+
})
2223+
case 3:
2224+
writeJSON(w, http.StatusServiceUnavailable, map[string]any{
2225+
"status": "unavailable",
2226+
"providerAuth": map[string]any{
2227+
"providerInstallRef": "cred_slack_workspace_1",
2228+
"apiAppId": "A_app_1",
2229+
"teamId": "T_workspace_1",
2230+
"botUserId": "U_bot",
2231+
"botAccessToken": "xoxb-installed",
2232+
},
2233+
})
2234+
default:
2235+
writeJSON(w, http.StatusOK, map[string]any{
2236+
"status": "resolved",
2237+
"session": map[string]any{
2238+
"accessToken": "owner-token-new",
2239+
"ownerAuthId": "owner-123",
2240+
"namespace": "spritz-staging",
2241+
"instanceId": "zeno-new",
2242+
"providerAuth": map[string]any{
2243+
"providerInstallRef": "cred_slack_workspace_1",
2244+
"apiAppId": "A_app_1",
2245+
"teamId": "T_workspace_1",
2246+
"botUserId": "U_bot",
2247+
"botAccessToken": "xoxb-installed",
2248+
},
2249+
},
2250+
})
2251+
}
2252+
}))
2253+
defer backend.Close()
2254+
2255+
var upsertCalls atomic.Int32
2256+
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
2257+
spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2258+
switch r.URL.Path {
2259+
case "/api/channel-conversations/upsert":
2260+
call := upsertCalls.Add(1)
2261+
if call == 1 {
2262+
http.Error(w, `{"status":"error","message":"spritz not found"}`, http.StatusNotFound)
2263+
return
2264+
}
2265+
writeJSON(w, http.StatusCreated, map[string]any{
2266+
"status": "success",
2267+
"data": map[string]any{
2268+
"created": true,
2269+
"conversation": map[string]any{
2270+
"metadata": map[string]any{"name": "conv-1"},
2271+
"spec": map[string]any{"cwd": "/home/dev"},
2272+
},
2273+
},
2274+
})
2275+
case "/api/acp/conversations/conv-1/bootstrap":
2276+
writeJSON(w, http.StatusOK, map[string]any{
2277+
"status": "success",
2278+
"data": map[string]any{
2279+
"effectiveSessionId": "session-1",
2280+
"conversation": map[string]any{
2281+
"metadata": map[string]any{"name": "conv-1"},
2282+
"spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"},
2283+
},
2284+
},
2285+
})
2286+
case "/api/acp/conversations/conv-1/connect":
2287+
conn, err := upgrader.Upgrade(w, r, nil)
2288+
if err != nil {
2289+
t.Fatalf("upgrade failed: %v", err)
2290+
}
2291+
defer conn.Close()
2292+
for {
2293+
_, payload, err := conn.ReadMessage()
2294+
if err != nil {
2295+
return
2296+
}
2297+
var message map[string]any
2298+
if err := json.Unmarshal(payload, &message); err != nil {
2299+
t.Fatalf("decode ws payload: %v", err)
2300+
}
2301+
switch message["method"] {
2302+
case "initialize":
2303+
_ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}})
2304+
case "session/load":
2305+
_ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}})
2306+
case "session/prompt":
2307+
_ = conn.WriteJSON(map[string]any{
2308+
"jsonrpc": "2.0",
2309+
"method": "session/update",
2310+
"params": map[string]any{
2311+
"update": map[string]any{
2312+
"sessionUpdate": "agent_message_chunk",
2313+
"content": []map[string]any{{
2314+
"type": "text",
2315+
"text": "Hello from recovered concierge",
2316+
}},
2317+
},
2318+
},
2319+
})
2320+
_ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}})
2321+
return
2322+
default:
2323+
t.Fatalf("unexpected ACP method %#v", message["method"])
2324+
}
2325+
}
2326+
default:
2327+
t.Fatalf("unexpected spritz path %s", r.URL.Path)
2328+
}
2329+
}))
2330+
defer spritz.Close()
2331+
2332+
cfg := config{
2333+
SlackAPIBaseURL: slackAPI.URL,
2334+
BackendBaseURL: backend.URL,
2335+
BackendInternalToken: "backend-internal-token",
2336+
SpritzBaseURL: spritz.URL,
2337+
SpritzServiceToken: "spritz-service-token",
2338+
PrincipalID: "shared-slack-gateway",
2339+
HTTPTimeout: 200 * time.Millisecond,
2340+
DedupeTTL: time.Minute,
2341+
StatusMessageDelay: 5 * time.Millisecond,
2342+
SessionRetryInterval: 10 * time.Millisecond,
2343+
ProcessingTimeout: 200 * time.Millisecond,
2344+
}
2345+
gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)))
2346+
2347+
envelope := slackEnvelope{
2348+
APIAppID: "A_app_1",
2349+
TeamID: "T_workspace_1",
2350+
Event: slackEventInner{
2351+
Type: "app_mention",
2352+
User: "U_user",
2353+
Text: "<@U_bot> hello",
2354+
Channel: "C_channel_1",
2355+
ChannelType: "channel",
2356+
TS: "1711387375.000100",
2357+
},
2358+
}
2359+
delivery, process, err := gateway.beginMessageEventDelivery(envelope)
2360+
if err != nil {
2361+
t.Fatalf("beginMessageEventDelivery returned error: %v", err)
2362+
}
2363+
if !process {
2364+
t.Fatal("expected app mention to be processed")
2365+
}
2366+
2367+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
2368+
defer cancel()
2369+
if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil {
2370+
t.Fatalf("expected missing-runtime recovery flow to succeed, got %v", err)
2371+
}
2372+
2373+
slackPayloads.Lock()
2374+
defer slackPayloads.Unlock()
2375+
if len(slackPayloads.items) != 2 {
2376+
t.Fatalf("expected wake-up message and final reply, got %#v", slackPayloads.items)
2377+
}
2378+
if got := slackPayloads.items[0]["text"]; got != "Still waking up. I will continue here shortly." {
2379+
t.Fatalf("expected wake-up status text, got %#v", got)
2380+
}
2381+
if got := slackPayloads.items[1]["text"]; got != "Hello from recovered concierge" {
2382+
t.Fatalf("expected final reply text, got %#v", got)
2383+
}
2384+
if sessionExchangeCalls.Load() != 4 {
2385+
t.Fatalf("expected 4 session exchange attempts, got %d", sessionExchangeCalls.Load())
2386+
}
2387+
if upsertCalls.Load() != 3 {
2388+
t.Fatalf("expected recovery retry plus alias persistence, got %d", upsertCalls.Load())
2389+
}
2390+
}
2391+
21792392
func TestProcessMessageEventPostsTerminalErrorAfterRecoveryTimeout(t *testing.T) {
21802393
var slackPayloads struct {
21812394
sync.Mutex

0 commit comments

Comments
 (0)