Skip to content

Commit 2176f9d

Browse files
committed
Fix telegram workflow config handoff
1 parent 9b93fe9 commit 2176f9d

File tree

3 files changed

+220
-26
lines changed

3 files changed

+220
-26
lines changed

src/app/api/channels/telegram/webhook/route.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,15 @@ export async function POST(request: Request): Promise<Response> {
323323
bootMessageId,
324324
handoffDelayMs: Date.now() - receivedAtMs,
325325
}));
326-
await telegramWebhookWorkflowRuntime.start(drainChannelWorkflow, ["telegram", payload, origin, requestId ?? null, bootMessageId, receivedAtMs]);
326+
await telegramWebhookWorkflowRuntime.start(drainChannelWorkflow, [
327+
"telegram",
328+
payload,
329+
origin,
330+
requestId ?? null,
331+
bootMessageId,
332+
receivedAtMs,
333+
{ fallbackTelegramConfig: config },
334+
]);
327335
logInfo("channels.telegram_workflow_started", withOperationContext(op, {
328336
effectiveStatus: effectiveMeta.status,
329337
effectiveSandboxId: effectiveMeta.sandboxId,

src/server/workflows/channels/drain-channel-workflow.test.ts

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ import test from "node:test";
33

44
import type { SingleMeta, RestorePhaseMetrics } from "@/shared/types";
55
import { getServerLogs, _resetLogBuffer } from "@/server/log";
6+
import { _resetStoreForTesting, getInitializedMeta } from "@/server/store/store";
7+
import { setTelegramChannelConfig } from "@/server/channels/state";
68
import {
79
processChannelStep,
810
toWorkflowProcessingError,
911
type DrainChannelWorkflowDependencies,
12+
type ChannelWorkflowHandoff,
1013
type RetryingForwardResult,
1114
type TelegramProbeResult,
1215
} from "@/server/workflows/channels/drain-channel-workflow";
@@ -40,6 +43,16 @@ function asMeta(meta: Partial<SingleMeta>): SingleMeta {
4043
return meta as SingleMeta;
4144
}
4245

46+
function createFallbackTelegramConfig() {
47+
return {
48+
botToken: "123456:handoff-token",
49+
webhookSecret: "handoff-secret",
50+
webhookUrl: "https://example.test/api/channels/telegram/webhook",
51+
botUsername: "handoff_bot",
52+
configuredAt: 1_777_000_000_000,
53+
};
54+
}
55+
4356
function createWorkflowDependencies(
4457
overrides: Partial<DrainChannelWorkflowDependencies> = {},
4558
): DrainChannelWorkflowDependencies {
@@ -106,6 +119,10 @@ function createWorkflowDependencies(
106119
};
107120
}
108121

122+
test.beforeEach(async () => {
123+
_resetStoreForTesting();
124+
});
125+
109126
test("processChannelStep skips ensureSandboxReady when boot returns running", async () => {
110127
let ensureCalls = 0;
111128
let forwardedSandboxId: string | null = null;
@@ -166,6 +183,97 @@ test("processChannelStep falls back to ensureSandboxReady when boot returns non-
166183
assert.equal(forwardedSandboxId, "sbx-restored");
167184
});
168185

186+
test("processChannelStep restores Telegram config from workflow handoff when store is empty", async () => {
187+
const fallbackTelegramConfig = createFallbackTelegramConfig();
188+
let bootHandleSawConfig = false;
189+
let forwardedWebhookSecret: string | null = null;
190+
191+
const dependencies = createWorkflowDependencies({
192+
buildExistingBootHandle: async () => {
193+
const meta = await getInitializedMeta();
194+
bootHandleSawConfig = meta.channels.telegram?.webhookSecret === fallbackTelegramConfig.webhookSecret;
195+
return undefined;
196+
},
197+
runWithBootMessages: async () => ({
198+
meta: asMeta({ status: "running", sandboxId: "sbx-handoff" }),
199+
bootMessageSent: false,
200+
}),
201+
forwardToNativeHandlerWithRetry: async (_channel: unknown, _payload: unknown, meta: SingleMeta): Promise<RetryingForwardResult> => {
202+
forwardedWebhookSecret = meta.channels.telegram?.webhookSecret ?? null;
203+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
204+
},
205+
});
206+
207+
await processChannelStep(
208+
"telegram",
209+
{ update_id: 1, message: { chat: { id: 123 } } },
210+
"test",
211+
"req-handoff",
212+
null,
213+
{
214+
dependencies,
215+
workflowHandoff: {
216+
fallbackTelegramConfig,
217+
} satisfies ChannelWorkflowHandoff,
218+
},
219+
);
220+
221+
const meta = await getInitializedMeta();
222+
assert.ok(bootHandleSawConfig, "boot handle should see restored Telegram config");
223+
assert.equal(meta.channels.telegram?.webhookSecret, fallbackTelegramConfig.webhookSecret);
224+
assert.equal(forwardedWebhookSecret, fallbackTelegramConfig.webhookSecret);
225+
});
226+
227+
test("processChannelStep preserves existing Telegram config over workflow handoff fallback", async () => {
228+
const existingConfig = {
229+
...createFallbackTelegramConfig(),
230+
webhookSecret: "existing-secret",
231+
botUsername: "existing_bot",
232+
};
233+
const fallbackTelegramConfig = createFallbackTelegramConfig();
234+
let forwardedWebhookSecret: string | null = null;
235+
236+
await setTelegramChannelConfig(existingConfig);
237+
238+
const dependencies = createWorkflowDependencies({
239+
runWithBootMessages: async () => ({
240+
meta: asMeta({
241+
status: "running",
242+
sandboxId: "sbx-existing",
243+
channels: {
244+
telegram: existingConfig as never,
245+
slack: null,
246+
discord: null,
247+
whatsapp: null,
248+
},
249+
}),
250+
bootMessageSent: false,
251+
}),
252+
forwardToNativeHandlerWithRetry: async (_channel: unknown, _payload: unknown, meta: SingleMeta): Promise<RetryingForwardResult> => {
253+
forwardedWebhookSecret = meta.channels.telegram?.webhookSecret ?? null;
254+
return { ok: true, status: 200, attempts: 1, totalMs: 50, transport: "public", retries: [] };
255+
},
256+
});
257+
258+
await processChannelStep(
259+
"telegram",
260+
{ update_id: 2, message: { chat: { id: 456 } } },
261+
"test",
262+
"req-existing",
263+
null,
264+
{
265+
dependencies,
266+
workflowHandoff: {
267+
fallbackTelegramConfig,
268+
} satisfies ChannelWorkflowHandoff,
269+
},
270+
);
271+
272+
const meta = await getInitializedMeta();
273+
assert.equal(meta.channels.telegram?.webhookSecret, existingConfig.webhookSecret);
274+
assert.equal(forwardedWebhookSecret, existingConfig.webhookSecret);
275+
});
276+
169277
test("processChannelStep converts retrying forward fetch exception into RetryableError", async () => {
170278
const dependencies = createWorkflowDependencies({
171279
forwardToNativeHandlerWithRetry: async () => {
@@ -842,8 +950,11 @@ test("processChannelStep accepts local Telegram empty 200 without retrying", asy
842950
assert.ok(summary, "telegram wake summary should be emitted");
843951
assert.equal(summary.data?.retryingForwardAttempts, 1);
844952
assert.equal(summary.data?.retryingForwardTransport, "local");
953+
const attemptTimeline = summary.data?.retryingForwardAttemptTimeline as
954+
| Array<{ classification?: string }>
955+
| undefined;
845956
assert.equal(
846-
summary.data?.retryingForwardAttemptTimeline?.[0]?.classification,
957+
attemptTimeline?.[0]?.classification,
847958
"accepted",
848959
);
849960
});

0 commit comments

Comments
 (0)