Skip to content

Commit 758a8f8

Browse files
authored
support reasoning agentid by accountId or session for cron (#847)
1 parent 54ec784 commit 758a8f8

File tree

9 files changed

+276
-389
lines changed

9 files changed

+276
-389
lines changed

electron/api/routes/channels.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { readFile, readdir } from 'node:fs/promises';
2+
import { extractSessionRecords } from '../../utils/session-util';
23
import type { IncomingMessage, ServerResponse } from 'http';
34
import { join } from 'node:path';
45
import {
@@ -679,16 +680,6 @@ function inferTargetKindFromValue(
679680
return 'user';
680681
}
681682

682-
function extractSessionRecords(store: JsonRecord): JsonRecord[] {
683-
const directEntries = Object.entries(store)
684-
.filter(([key, value]) => key !== 'sessions' && value && typeof value === 'object')
685-
.map(([, value]) => value as JsonRecord);
686-
const arrayEntries = Array.isArray(store.sessions)
687-
? store.sessions.filter((entry): entry is JsonRecord => Boolean(entry && typeof entry === 'object'))
688-
: [];
689-
return [...directEntries, ...arrayEntries];
690-
}
691-
692683
function buildChannelTargetCacheKey(params: {
693684
channelType: string;
694685
accountId?: string;

electron/api/routes/cron.ts

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,14 @@ import { join } from 'node:path';
44
import type { HostApiContext } from '../context';
55
import { parseJsonBody, sendJson } from '../route-utils';
66
import { getOpenClawConfigDir } from '../../utils/paths';
7+
import { resolveAccountIdFromSessionHistory } from '../../utils/session-util';
78
import { toOpenClawChannelType, toUiChannelType } from '../../utils/channel-alias';
9+
import { resolveAgentIdFromChannel } from '../../utils/agent-config';
810

11+
/**
12+
* Find agentId from session history by delivery "to" address.
13+
* Efficiently searches only agent session directories for matching deliveryContext.to.
14+
*/
915
interface GatewayCronJob {
1016
id: string;
1117
name: string;
@@ -461,6 +467,14 @@ export async function handleCronRoutes(
461467
const result = await ctx.gatewayManager.rpc('cron.list', { includeDisabled: true }, 8000);
462468
const data = result as { jobs?: GatewayCronJob[] };
463469
jobs = data?.jobs ?? (Array.isArray(result) ? result as GatewayCronJob[] : []);
470+
471+
// DEBUG: log name and agentId for each job
472+
console.debug('Fetched cron jobs from Gateway:');
473+
for (const job of jobs) {
474+
const jobAgentId = (job as unknown as { agentId?: string }).agentId;
475+
const deliveryInfo = job.delivery ? `delivery={mode:${job.delivery.mode}, channel:${job.delivery.channel || '(none)'}, accountId:${job.delivery.accountId || '(none)'}, to:${job.delivery.to || '(none)'}}` : 'delivery=(none)';
476+
console.debug(` - name: "${job.name}", agentId: "${jobAgentId || '(undefined)'}", ${deliveryInfo}, sessionTarget: "${job.sessionTarget || '(none)'}", payload.kind: "${job.payload?.kind || '(none)'}"`);
477+
}
464478
} catch {
465479
// Fallback: read cron.json directly when Gateway RPC fails/times out.
466480
try {
@@ -477,7 +491,8 @@ export async function handleCronRoutes(
477491

478492
// Run repair in background — don't block the response.
479493
if (!usedFallback && jobs.length > 0) {
480-
const jobsToRepair = jobs.filter((job) => {
494+
// Repair 1: delivery channel missing
495+
const jobsToRepairDelivery = jobs.filter((job) => {
481496
const isIsolatedAgent =
482497
(job.sessionTarget === 'isolated' || !job.sessionTarget) &&
483498
job.payload?.kind === 'agentTurn';
@@ -487,10 +502,10 @@ export async function handleCronRoutes(
487502
!job.delivery?.channel
488503
);
489504
});
490-
if (jobsToRepair.length > 0) {
505+
if (jobsToRepairDelivery.length > 0) {
491506
// Fire-and-forget: repair in background
492507
void (async () => {
493-
for (const job of jobsToRepair) {
508+
for (const job of jobsToRepairDelivery) {
494509
try {
495510
await ctx.gatewayManager.rpc('cron.update', {
496511
id: job.id,
@@ -502,14 +517,76 @@ export async function handleCronRoutes(
502517
}
503518
})();
504519
// Optimistically fix the response data
505-
for (const job of jobsToRepair) {
520+
for (const job of jobsToRepairDelivery) {
506521
job.delivery = { mode: 'none' };
507522
if (job.state?.lastError?.includes('Channel is required')) {
508523
job.state.lastError = undefined;
509524
job.state.lastStatus = 'ok';
510525
}
511526
}
512527
}
528+
529+
// Repair 2: agentId is undefined for jobs with announce delivery
530+
// Only repair undefined -> inferred agent, NOT main -> inferred agent
531+
const jobsToRepairAgent = jobs.filter((job) => {
532+
const jobAgentId = (job as unknown as { agentId?: string }).agentId;
533+
return (
534+
(job.sessionTarget === 'isolated' || !job.sessionTarget) &&
535+
job.payload?.kind === 'agentTurn' &&
536+
job.delivery?.mode === 'announce' &&
537+
job.delivery?.channel &&
538+
jobAgentId === undefined // Only repair when agentId is completely undefined
539+
);
540+
});
541+
if (jobsToRepairAgent.length > 0) {
542+
console.debug(`Found ${jobsToRepairAgent.length} jobs needing agent repair:`);
543+
for (const job of jobsToRepairAgent) {
544+
console.debug(` - Job "${job.name}" (id: ${job.id}): current agentId="${(job as unknown as { agentId?: string }).agentId || '(undefined)'}", channel="${job.delivery?.channel}", accountId="${job.delivery?.accountId || '(none)'}"`);
545+
}
546+
// Fire-and-forget: repair in background
547+
void (async () => {
548+
for (const job of jobsToRepairAgent) {
549+
try {
550+
const channel = toOpenClawChannelType(job.delivery!.channel!);
551+
const accountId = job.delivery!.accountId;
552+
const toAddress = job.delivery!.to;
553+
554+
// Try 1: resolve from channel + accountId binding
555+
let correctAgentId = await resolveAgentIdFromChannel(channel, accountId);
556+
557+
// If no accountId, try to resolve it from session history using "to" address, then get agentId
558+
let resolvedAccountId: string | null = null;
559+
if (!correctAgentId && !accountId && toAddress) {
560+
console.debug(`No binding found for channel="${channel}", accountId="${accountId || '(none)'}", trying session history for to="${toAddress}"`);
561+
resolvedAccountId = await resolveAccountIdFromSessionHistory(toAddress, channel);
562+
if (resolvedAccountId) {
563+
console.debug(`Resolved accountId="${resolvedAccountId}" from session history, now resolving agentId`);
564+
correctAgentId = await resolveAgentIdFromChannel(channel, resolvedAccountId);
565+
}
566+
}
567+
568+
if (correctAgentId) {
569+
console.debug(`Repairing job "${job.name}": agentId "${(job as unknown as { agentId?: string }).agentId || '(undefined)'}" -> "${correctAgentId}"`);
570+
// When accountId was resolved via to address, include it in the patch
571+
const patch: Record<string, unknown> = { agentId: correctAgentId };
572+
if (resolvedAccountId && !accountId) {
573+
patch.delivery = { accountId: resolvedAccountId };
574+
}
575+
await ctx.gatewayManager.rpc('cron.update', { id: job.id, patch });
576+
// Update the local job object so response reflects correct agentId
577+
(job as unknown as { agentId: string }).agentId = correctAgentId;
578+
if (resolvedAccountId && !accountId && job.delivery) {
579+
job.delivery.accountId = resolvedAccountId;
580+
}
581+
} else {
582+
console.warn(`Could not resolve agent for job "${job.name}": channel="${channel}", accountId="${accountId || '(none)'}", to="${toAddress || '(none)'}"`);
583+
}
584+
} catch (error) {
585+
console.error(`Failed to repair agent for job "${job.name}":`, error);
586+
}
587+
}
588+
})();
589+
}
513590
}
514591

515592
sendJson(res, 200, jobs.map((job) => ({ ...transformCronJob(job), ...(usedFallback ? { _fromFallback: true } : {}) })));
@@ -532,6 +609,8 @@ export async function handleCronRoutes(
532609
const agentId = typeof input.agentId === 'string' && input.agentId.trim()
533610
? input.agentId.trim()
534611
: 'main';
612+
// DEBUG: log the input and resolved agentId
613+
console.debug(`Creating cron job: name="${input.name}", input.agentId="${input.agentId || '(not provided)'}", resolved agentId="${agentId}"`);
535614
const delivery = normalizeCronDelivery(input.delivery);
536615
const unsupportedDeliveryError = getUnsupportedCronDeliveryError(delivery.channel);
537616
if (delivery.mode === 'announce' && unsupportedDeliveryError) {

electron/main/ipc-handlers.ts

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import {
2222
import { syncProxyConfigToOpenClaw } from '../utils/openclaw-proxy';
2323
import { buildOpenClawControlUiUrl } from '../utils/openclaw-control-ui';
2424
import { logger } from '../utils/logger';
25+
import { resolveAgentIdFromChannel } from '../utils/agent-config';
26+
import { resolveAccountIdFromSessionHistory } from '../utils/session-util';
2527
import {
2628
saveChannelConfig,
2729
getChannelConfig,
@@ -891,8 +893,7 @@ function registerCronHandlers(gatewayManager: GatewayManager): void {
891893
ipcMain.handle('cron:list', async () => {
892894
try {
893895
const result = await gatewayManager.rpc('cron.list', { includeDisabled: true });
894-
const data = result as { jobs?: GatewayCronJob[] };
895-
const jobs = data?.jobs ?? [];
896+
const jobs = Array.isArray(result) ? result : (result as { jobs?: GatewayCronJob[] })?.jobs ?? [];
896897

897898
// Auto-repair legacy UI-created jobs that were saved without
898899
// delivery: { mode: 'none' }. The Gateway auto-normalizes them
@@ -1031,6 +1032,65 @@ function registerCronHandlers(gatewayManager: GatewayManager): void {
10311032
throw error;
10321033
}
10331034
});
1035+
1036+
// Periodic cron job repair: checks for jobs with undefined agentId and repairs them
1037+
// This handles cases where cron jobs were created via openclaw CLI without specifying agent
1038+
const CRON_AGENT_REPAIR_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
1039+
let _lastRepairErrorLogAt = 0;
1040+
const REPAIR_ERROR_LOG_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
1041+
setInterval(async () => {
1042+
try {
1043+
const status = gatewayManager.getStatus();
1044+
if (status.state !== 'running') return;
1045+
1046+
const result = await gatewayManager.rpc('cron.list', { includeDisabled: true });
1047+
const jobs = Array.isArray(result)
1048+
? result
1049+
: (result as { jobs?: Array<{ id: string; name: string; sessionTarget?: string; payload?: { kind: string }; delivery?: { mode: string; channel?: string; to?: string; accountId?: string }; state?: Record<string, unknown> }> })?.jobs ?? [];
1050+
1051+
for (const job of jobs) {
1052+
const jobAgentId = (job as unknown as { agentId?: string }).agentId;
1053+
if (
1054+
(job.sessionTarget === 'isolated' || !job.sessionTarget) &&
1055+
job.payload?.kind === 'agentTurn' &&
1056+
job.delivery?.mode === 'announce' &&
1057+
job.delivery?.channel &&
1058+
jobAgentId === undefined
1059+
) {
1060+
const channel = job.delivery.channel;
1061+
const accountId = job.delivery.accountId;
1062+
const toAddress = job.delivery.to;
1063+
1064+
let correctAgentId = await resolveAgentIdFromChannel(channel, accountId);
1065+
1066+
// If no accountId, try to resolve it from session history
1067+
let resolvedAccountId: string | null = null;
1068+
if (!correctAgentId && !accountId && toAddress) {
1069+
resolvedAccountId = await resolveAccountIdFromSessionHistory(toAddress, channel);
1070+
if (resolvedAccountId) {
1071+
correctAgentId = await resolveAgentIdFromChannel(channel, resolvedAccountId);
1072+
}
1073+
}
1074+
1075+
if (correctAgentId) {
1076+
console.debug(`Periodic repair: job "${job.name}" agentId undefined -> "${correctAgentId}"`);
1077+
// When accountId was resolved via to address, include it in the patch
1078+
const patch: Record<string, unknown> = { agentId: correctAgentId };
1079+
if (resolvedAccountId && !accountId) {
1080+
patch.delivery = { accountId: resolvedAccountId };
1081+
}
1082+
await gatewayManager.rpc('cron.update', { id: job.id, patch });
1083+
}
1084+
}
1085+
}
1086+
} catch (error) {
1087+
const now = Date.now();
1088+
if (now - _lastRepairErrorLogAt >= REPAIR_ERROR_LOG_INTERVAL_MS) {
1089+
_lastRepairErrorLogAt = now;
1090+
console.debug('Periodic cron repair error:', error);
1091+
}
1092+
}
1093+
}, CRON_AGENT_REPAIR_INTERVAL_MS);
10341094
}
10351095

10361096
/**

electron/utils/agent-config.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,25 @@ export async function listConfiguredAgentIds(): Promise<string[]> {
555555
return ids.length > 0 ? ids : [MAIN_AGENT_ID];
556556
}
557557

558+
/**
559+
* Resolve agentId from channel and accountId using bindings.
560+
* Returns the agentId if found, or null if no binding exists.
561+
*/
562+
export async function resolveAgentIdFromChannel(channel: string, accountId?: string): Promise<string | null> {
563+
const config = await readOpenClawConfig() as AgentConfigDocument;
564+
const { channelToAgent, accountToAgent } = getChannelBindingMap(config.bindings);
565+
566+
// First try account-specific binding
567+
if (accountId) {
568+
const agentId = accountToAgent.get(`${channel}:${accountId}`);
569+
if (agentId) return agentId;
570+
}
571+
572+
// Fallback to channel-only binding
573+
const agentId = channelToAgent.get(channel);
574+
return agentId ?? null;
575+
}
576+
558577
export async function createAgent(
559578
name: string,
560579
options?: { inheritWorkspace?: boolean },

electron/utils/session-util.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Shared session utilities
3+
*/
4+
import { readFile, readdir } from 'node:fs/promises';
5+
import { join } from 'node:path';
6+
import { getOpenClawConfigDir } from './paths';
7+
8+
type JsonRecord = Record<string, unknown>;
9+
10+
/**
11+
* Parse sessions.json supporting both formats:
12+
* - Object-keyed: { "agent:xxx:yyy": { deliveryContext: {...} } }
13+
* - Array format: { sessions: [...] }
14+
*/
15+
export function extractSessionRecords(store: JsonRecord): JsonRecord[] {
16+
const directEntries = Object.entries(store)
17+
.filter(([key, value]) => key !== 'sessions' && value && typeof value === 'object')
18+
.map(([, value]) => value as JsonRecord);
19+
const arrayEntries = Array.isArray(store.sessions)
20+
? store.sessions.filter((entry): entry is JsonRecord => Boolean(entry && typeof entry === 'object'))
21+
: [];
22+
return [...directEntries, ...arrayEntries];
23+
}
24+
25+
/**
26+
* Find accountId from session history by "to" address and channel type.
27+
* Searches all agent session directories for a matching deliveryContext.
28+
*/
29+
export async function resolveAccountIdFromSessionHistory(
30+
toAddress: string,
31+
channelType: string,
32+
): Promise<string | null> {
33+
const agentsDir = join(getOpenClawConfigDir(), 'agents');
34+
35+
let agentDirs: Array<{ name: string; isDirectory: () => boolean }>;
36+
try {
37+
agentDirs = await readdir(agentsDir, { withFileTypes: true });
38+
} catch {
39+
return null;
40+
}
41+
42+
for (const entry of agentDirs) {
43+
if (!entry.isDirectory()) continue;
44+
45+
const sessionsPath = join(agentsDir, entry.name, 'sessions', 'sessions.json');
46+
let raw: string;
47+
try {
48+
raw = await readFile(sessionsPath, 'utf8');
49+
} catch {
50+
continue;
51+
}
52+
53+
if (!raw.trim()) continue;
54+
55+
let parsed: Record<string, unknown>;
56+
try {
57+
parsed = JSON.parse(raw);
58+
} catch {
59+
continue;
60+
}
61+
62+
for (const session of extractSessionRecords(parsed as JsonRecord)) {
63+
const deliveryContext = session.deliveryContext as Record<string, unknown> | undefined;
64+
if (
65+
deliveryContext &&
66+
typeof deliveryContext.to === 'string' &&
67+
deliveryContext.to === toAddress &&
68+
typeof deliveryContext.channel === 'string' &&
69+
deliveryContext.channel === channelType
70+
) {
71+
if (typeof deliveryContext.accountId === 'string') {
72+
return deliveryContext.accountId;
73+
}
74+
}
75+
}
76+
}
77+
78+
return null;
79+
}

src/pages/Cron/index.tsx

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ function TaskDialog({ job, configuredChannels, onClose, onSave }: TaskDialogProp
251251
const [name, setName] = useState(job?.name || '');
252252
const [message, setMessage] = useState(job?.message || '');
253253
const [selectedAgentId, setSelectedAgentId] = useState(job?.agentId || useChatStore.getState().currentAgentId);
254-
const [agentIdChanged, setAgentIdChanged] = useState(false);
255254
// Extract cron expression string from CronSchedule object or use as-is if string
256255
const initialSchedule = (() => {
257256
const s = job?.schedule;
@@ -411,7 +410,7 @@ function TaskDialog({ job, configuredChannels, onClose, onSave }: TaskDialogProp
411410
schedule: finalSchedule,
412411
delivery: finalDelivery,
413412
enabled,
414-
...(agentIdChanged ? { agentId: selectedAgentId } : {}),
413+
agentId: selectedAgentId,
415414
});
416415
onClose();
417416
toast.success(job ? t('toast.updated') : t('toast.created'));
@@ -468,7 +467,6 @@ function TaskDialog({ job, configuredChannels, onClose, onSave }: TaskDialogProp
468467
value={selectedAgentId}
469468
onChange={(e) => {
470469
setSelectedAgentId(e.target.value);
471-
setAgentIdChanged(true);
472470
}}
473471
className="h-[44px] rounded-xl border-black/10 dark:border-white/10 bg-[#eeece3] dark:bg-muted text-[13px]"
474472
>

0 commit comments

Comments
 (0)