Skip to content

Commit a0fbbdd

Browse files
committed
Prefer local telegram forward after restore
1 parent bb0db57 commit a0fbbdd

File tree

2 files changed

+57
-15
lines changed

2 files changed

+57
-15
lines changed

src/server/workflows/channels/drain-channel-workflow.test.ts

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,29 @@ function createWorkflowDependencies(
6161
channels: { telegram: null, slack: null, discord: null, whatsapp: null },
6262
}),
6363
getSandboxDomain: async () => "https://sandbox.example.test",
64-
forwardToNativeHandler: async () => ({ ok: true, status: 200 }),
64+
forwardToNativeHandler: async () => ({
65+
ok: true,
66+
status: 200,
67+
durationMs: 0,
68+
bodyLength: 0,
69+
bodyHead: "",
70+
headers: null,
71+
}),
72+
forwardTelegramToNativeHandlerLocally: async () => ({
73+
ok: true,
74+
status: 200,
75+
durationMs: 0,
76+
bodyLength: 0,
77+
bodyHead: "",
78+
headers: null,
79+
error: null,
80+
}),
6581
forwardToNativeHandlerWithRetry: async (): Promise<RetryingForwardResult> => ({
6682
ok: true,
6783
status: 200,
6884
attempts: 1,
6985
totalMs: 50,
86+
transport: "public",
7087
retries: [],
7188
}),
7289
waitForTelegramNativeHandler: async (): Promise<TelegramProbeResult> => ({
@@ -108,7 +125,7 @@ test("processChannelStep skips ensureSandboxReady when boot returns running", as
108125
},
109126
forwardToNativeHandlerWithRetry: async (_channel: unknown, _payload: unknown, meta: SingleMeta): Promise<RetryingForwardResult> => {
110127
forwardedSandboxId = meta.sandboxId ?? null;
111-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
128+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
112129
},
113130
});
114131

@@ -139,7 +156,7 @@ test("processChannelStep falls back to ensureSandboxReady when boot returns non-
139156
},
140157
forwardToNativeHandlerWithRetry: async (_channel: unknown, _payload: unknown, meta: SingleMeta): Promise<RetryingForwardResult> => {
141158
forwardedSandboxId = meta.sandboxId ?? null;
142-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
159+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
143160
},
144161
});
145162

@@ -173,6 +190,7 @@ test("processChannelStep converts native forward 502 into RetryableError (Telegr
173190
status: 502,
174191
attempts: 6,
175192
totalMs: 6000,
193+
transport: "public",
176194
retries: [{ attempt: 1, reason: "proxy-error", status: 502 }],
177195
}),
178196
});
@@ -194,6 +212,7 @@ test("processChannelStep keeps native forward 404 fatal (Telegram retrying path)
194212
status: 404,
195213
attempts: 1,
196214
totalMs: 50,
215+
transport: "public",
197216
retries: [],
198217
}),
199218
});
@@ -214,11 +233,11 @@ test("processChannelStep uses retrying forward for Telegram, direct forward for
214233
const telegramDeps = createWorkflowDependencies({
215234
forwardToNativeHandlerWithRetry: async (): Promise<RetryingForwardResult> => {
216235
retryingCalled = true;
217-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
236+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
218237
},
219238
forwardToNativeHandler: async () => {
220239
directCalled = true;
221-
return { ok: true, status: 200 };
240+
return { ok: true, status: 200, durationMs: 0, bodyLength: 0, bodyHead: "", headers: null };
222241
},
223242
});
224243

@@ -232,11 +251,11 @@ test("processChannelStep uses retrying forward for Telegram, direct forward for
232251
const slackDeps = createWorkflowDependencies({
233252
forwardToNativeHandlerWithRetry: async (): Promise<RetryingForwardResult> => {
234253
retryingCalled = true;
235-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
254+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
236255
},
237256
forwardToNativeHandler: async () => {
238257
directCalled = true;
239-
return { ok: true, status: 200 };
258+
return { ok: true, status: 200, durationMs: 0, bodyLength: 0, bodyHead: "", headers: null };
240259
},
241260
});
242261

@@ -252,6 +271,7 @@ test("processChannelStep converts retrying forward 504 (exhausted) into Retryabl
252271
status: 504,
253272
attempts: 6,
254273
totalMs: 30000,
274+
transport: null,
255275
retries: [
256276
{ attempt: 1, reason: "proxy-error", status: 503 },
257277
{ attempt: 2, reason: "fetch-exception", error: "connect ECONNREFUSED" },
@@ -279,6 +299,7 @@ test("processChannelStep treats retrying forward 500 as retryable at workflow le
279299
status: 500,
280300
attempts: 1,
281301
totalMs: 100,
302+
transport: "public",
282303
retries: [],
283304
}),
284305
});
@@ -515,7 +536,7 @@ test("processChannelStep forward captures Telegram webhook secret and correct po
515536
capturedPayload = payload;
516537
capturedMeta = meta;
517538
capturedGetSandboxDomain = getSandboxDomain;
518-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
539+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
519540
},
520541
});
521542

@@ -554,7 +575,7 @@ test("processChannelStep forward passes meta with portUrls from boot result", as
554575
meta: SingleMeta,
555576
): Promise<RetryingForwardResult> => {
556577
forwardedPortUrls = (meta.portUrls as Record<string, string>) ?? null;
557-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
578+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
558579
},
559580
});
560581

@@ -610,7 +631,7 @@ test("processChannelStep uses local Telegram native handler readiness before for
610631
},
611632
forwardToNativeHandlerWithRetry: async (): Promise<RetryingForwardResult> => {
612633
forwardCalledAfterLocalProbe = localProbeCallCount > 0;
613-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
634+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "local", retries: [] };
614635
},
615636
});
616637

@@ -680,7 +701,7 @@ test("processChannelStep falls back to public Telegram probe when local handler
680701
},
681702
forwardToNativeHandlerWithRetry: async (): Promise<RetryingForwardResult> => {
682703
forwardCalled = true;
683-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
704+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
684705
},
685706
});
686707

@@ -726,7 +747,7 @@ test("processChannelStep still forwards when both Telegram probes time out", asy
726747
},
727748
forwardToNativeHandlerWithRetry: async (): Promise<RetryingForwardResult> => {
728749
forwardCalled = true;
729-
return { ok: true, status: 200, attempts: 1, totalMs: 50, retries: [] };
750+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
730751
},
731752
});
732753

src/server/workflows/channels/drain-channel-workflow.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export type RetryingForwardResult = {
1717
status: number;
1818
attempts: number;
1919
totalMs: number;
20+
transport?: "public" | "local" | null;
2021
retries: Array<{ attempt: number; reason: string; status?: number; error?: string }>;
2122
attemptsDetail?: ForwardAttemptDetail[];
2223
};
@@ -287,7 +288,8 @@ export async function processChannelStep(
287288
payload,
288289
readyMeta,
289290
getSandboxDomain,
290-
localProbeResult?.ready === true ? null : forwardTelegramToNativeHandlerLocally,
291+
forwardTelegramToNativeHandlerLocally,
292+
localProbeResult?.ready === true,
291293
);
292294
forwardResult = { ok: retryingResult.ok, status: retryingResult.status };
293295
} else {
@@ -306,6 +308,8 @@ export async function processChannelStep(
306308
diag.forwardAttempts = retryingResult?.attempts ?? null;
307309
diag.forwardRetries = retryingResult?.retries ?? null;
308310
diag.forwardTotalMs = retryingResult?.totalMs ?? null;
311+
diag.forwardTransport =
312+
retryingResult?.transport ?? (channel === "telegram" ? "public" : null);
309313
diag.forwardAttemptTimeline = retryingResult?.attemptsDetail ?? null;
310314
console.log(`[DIAG] Phase 2 DONE: ok=${forwardResult.ok} status=${forwardResult.status} attempts=${retryingResult?.attempts ?? 1} retries=${JSON.stringify(retryingResult?.retries ?? [])} durationMs=${diag.forwardDurationMs}`);
311315
await persistDiagSnapshot("native-forward-complete", {
@@ -315,6 +319,7 @@ export async function processChannelStep(
315319
forwardAttempts: diag.forwardAttempts,
316320
forwardRetries: diag.forwardRetries,
317321
forwardTotalMs: diag.forwardTotalMs,
322+
forwardTransport: diag.forwardTransport,
318323
forwardAttemptTimeline: diag.forwardAttemptTimeline,
319324
});
320325

@@ -324,6 +329,7 @@ export async function processChannelStep(
324329
sandboxId: readyMeta.sandboxId,
325330
ok: forwardResult.ok,
326331
status: forwardResult.status,
332+
transport: retryingResult?.transport ?? (channel === "telegram" ? "public" : null),
327333
retryingForwardAttempts: retryingResult?.attempts ?? null,
328334
retryingForwardTotalMs: retryingResult?.totalMs ?? null,
329335
retryingForwardRetries: retryingResult?.retries?.length ?? null,
@@ -368,6 +374,7 @@ export async function processChannelStep(
368374
telegramLocalProbeHeaders: diag.telegramLocalProbeHeaders ?? null,
369375
retryingForwardAttempts: retryingResult?.attempts ?? null,
370376
retryingForwardTotalMs: retryingResult?.totalMs ?? null,
377+
retryingForwardTransport: retryingResult?.transport ?? null,
371378
retryingForwardAttemptTimeline: retryingResult?.attemptsDetail ?? null,
372379
telegramReconcileBlocking: restore?.telegramReconcileBlocking ?? null,
373380
telegramReconcileMs: restore?.telegramReconcileMs ?? null,
@@ -497,6 +504,7 @@ export type ForwardAttemptDetail = {
497504
bodyLength: number | null;
498505
bodyHead: string | null;
499506
headers: DiagnosticHeaders | null;
507+
transport: "public" | "local";
500508
classification: string;
501509
error?: string | null;
502510
};
@@ -928,6 +936,7 @@ async function forwardToNativeHandlerWithRetry(
928936
headers: DiagnosticHeaders | null;
929937
error?: string | null;
930938
}>) | null,
939+
preferLocalTelegramForward = false,
931940
): Promise<RetryingForwardResult> {
932941
const startedAt = Date.now();
933942
const deadline = startedAt + RETRYING_FORWARD_TIMEOUT_MS;
@@ -937,9 +946,15 @@ async function forwardToNativeHandlerWithRetry(
937946
for (let attempt = 1; attempt <= RETRYING_FORWARD_MAX_ATTEMPTS && Date.now() < deadline; attempt++) {
938947
const attemptStartedAt = Date.now();
939948
try {
940-
const result = channel === "telegram" && meta.sandboxId && forwardTelegramToNativeHandlerLocally
949+
const useLocalForward =
950+
channel === "telegram"
951+
&& meta.sandboxId != null
952+
&& forwardTelegramToNativeHandlerLocally != null
953+
&& preferLocalTelegramForward;
954+
const transport: "public" | "local" = useLocalForward ? "local" : "public";
955+
const result = useLocalForward
941956
? await forwardTelegramToNativeHandlerLocally(
942-
meta.sandboxId,
957+
meta.sandboxId as string,
943958
payload,
944959
meta.channels.telegram?.webhookSecret ?? null,
945960
)
@@ -982,6 +997,7 @@ async function forwardToNativeHandlerWithRetry(
982997
bodyLength: result.bodyLength,
983998
bodyHead: result.bodyHead,
984999
headers: result.headers,
1000+
transport,
9851001
classification,
9861002
});
9871003
if (result.status >= 502 || result.status === 401 || result.status === 404 || swallowed) {
@@ -996,6 +1012,7 @@ async function forwardToNativeHandlerWithRetry(
9961012
durationMs: result.durationMs,
9971013
bodyLength: result.bodyLength,
9981014
bodyHead: result.bodyHead,
1015+
transport,
9991016
responseHeaders: result.headers,
10001017
retryElapsedMs: Date.now() - startedAt,
10011018
});
@@ -1014,6 +1031,7 @@ async function forwardToNativeHandlerWithRetry(
10141031
status: result.status,
10151032
attempts: attempt,
10161033
totalMs,
1034+
transport,
10171035
retryCount: retries.length,
10181036
attemptsDetail,
10191037
});
@@ -1022,6 +1040,7 @@ async function forwardToNativeHandlerWithRetry(
10221040
status: result.status,
10231041
attempts: attempt,
10241042
totalMs,
1043+
transport,
10251044
retries,
10261045
attemptsDetail,
10271046
};
@@ -1040,6 +1059,7 @@ async function forwardToNativeHandlerWithRetry(
10401059
bodyLength: null,
10411060
bodyHead: null,
10421061
headers: null,
1062+
transport: "public",
10431063
classification: "fetch-exception",
10441064
error: errorMsg,
10451065
});
@@ -1070,6 +1090,7 @@ async function forwardToNativeHandlerWithRetry(
10701090
status: 504,
10711091
attempts: RETRYING_FORWARD_MAX_ATTEMPTS,
10721092
totalMs,
1093+
transport: null,
10731094
retries,
10741095
attemptsDetail,
10751096
};

0 commit comments

Comments
 (0)