Skip to content

Commit d48a097

Browse files
authored
Merge pull request #57 from atxp-dev/naveen/agent-push-notifications
feat(atxp): auto-discover channels and write HEARTBEAT.md for multi-c…
2 parents 3a39597 + 0e1120f commit d48a097

2 files changed

Lines changed: 171 additions & 58 deletions

File tree

packages/atxp/src/commands/notifications.ts

Lines changed: 170 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,76 +4,180 @@ import os from 'os';
44
import { execSync } from 'child_process';
55

66
const NOTIFICATIONS_BASE_URL = 'https://clowdbot-notifications.corp.circuitandchisel.com';
7+
const OPENCLAW_CONFIG_PATH = '/data/.openclaw/openclaw.json';
8+
const SESSIONS_PATH = '/data/.openclaw/agents/main/sessions/sessions.json';
9+
const WORKSPACE_DIR = '/data/.openclaw/workspace';
10+
const HEARTBEAT_SECTION_HEADER = '# ATXP Notification Relay';
11+
12+
// eslint-disable-next-line no-control-regex
13+
const sanitizeSessionValue = (s: string) => s.replace(/[\x00-\x1f"`[\]]/g, '');
714

815
interface EnableResponse {
916
instance?: { webhookUrl?: string; hooksToken?: string };
1017
webhook?: { id?: string; url?: string; eventTypes?: string[]; secret?: string; enabled?: boolean };
1118
error?: string;
1219
}
1320

21+
interface NotificationChannel {
22+
channel: string; // "telegram", "discord", "slack", etc.
23+
to: string; // peer ID (chat ID, channel ID, etc.)
24+
}
25+
26+
/**
27+
* Discover connected messaging channels by reading the local session store.
28+
* Parses session keys like "agent:main:telegram:direct:8204320066" to extract
29+
* the channel type and peer ID for each active DM session.
30+
*/
31+
async function discoverConnectedChannels(): Promise<NotificationChannel[]> {
32+
try {
33+
const raw = await fs.readFile(SESSIONS_PATH, 'utf-8');
34+
const sessions = JSON.parse(raw);
35+
const channels: NotificationChannel[] = [];
36+
const seen = new Set<string>();
37+
38+
for (const key of Object.keys(sessions)) {
39+
// Match DM session keys: agent:main:<channel>:direct:<peerId>
40+
const match = key.match(/^agent:main:([^:]+):direct:(.+)$/);
41+
if (!match) continue;
42+
const channel = sanitizeSessionValue(match[1]).slice(0, 64);
43+
const to = sanitizeSessionValue(match[2]).slice(0, 128);
44+
if (!channel || !to) continue;
45+
// Skip ephemeral channels (webchat has no persistent address)
46+
if (channel === 'webchat') continue;
47+
const dedupKey = `${channel}:${to}`;
48+
if (seen.has(dedupKey)) continue;
49+
seen.add(dedupKey);
50+
channels.push({ channel, to });
51+
}
52+
return channels;
53+
} catch {
54+
return []; // sessions file may not exist yet
55+
}
56+
}
57+
58+
/**
59+
* Build the HEARTBEAT.md content with notification relay instructions.
60+
* The default heartbeat prompt says "Read HEARTBEAT.md if it exists. Follow it strictly."
61+
* so writing these instructions directly to the file ensures the LLM relays
62+
* system events to all discovered channels.
63+
*/
64+
function buildHeartbeatMd(channels: NotificationChannel[]): string {
65+
const lines = [
66+
HEARTBEAT_SECTION_HEADER,
67+
'',
68+
'When you see System: event lines about new emails or SMS messages,',
69+
'you MUST immediately relay each one to the user.',
70+
'Do NOT reply HEARTBEAT_OK when System events are present.',
71+
'',
72+
];
73+
74+
if (channels.length > 0) {
75+
lines.push('Relay to ALL of these channels:');
76+
for (const c of channels) {
77+
lines.push(`- Use the message tool with channel=\`${c.channel}\` and target=\`${c.to}\``);
78+
}
79+
lines.push('');
80+
}
81+
82+
return lines.join('\n');
83+
}
84+
1485
/**
15-
* Configure hooks in openclaw.json on the running instance.
86+
* Configure hooks, heartbeat delivery target, and HEARTBEAT.md on the instance.
1687
* Only runs when inside a Fly instance (FLY_MACHINE_ID is set).
17-
* Updates openclaw.json with the hooks token and restarts the gateway.
88+
*
89+
* Discovers all connected messaging channels from the session store, writes
90+
* HEARTBEAT.md with relay instructions for each channel, and sets the primary
91+
* delivery target to the first discovered channel.
1892
*/
1993
async function configureHooksOnInstance(hooksToken: string): Promise<void> {
2094
if (!process.env.FLY_MACHINE_ID) return;
2195

22-
const configPath = '/data/.openclaw/openclaw.json';
96+
const configPath = OPENCLAW_CONFIG_PATH;
2397
try {
2498
const raw = await fs.readFile(configPath, 'utf-8');
2599
const config = JSON.parse(raw);
26100

101+
// Discover connected channels from session store
102+
const channels = await discoverConnectedChannels();
103+
104+
let changed = false;
105+
106+
// Configure hooks
27107
if (!config.hooks) config.hooks = {};
28-
// Already configured with this token — skip
29-
if (config.hooks.token === hooksToken && config.hooks.enabled === true) return;
108+
if (config.hooks.token !== hooksToken || config.hooks.enabled !== true) {
109+
config.hooks.enabled = true;
110+
config.hooks.token = hooksToken;
111+
changed = true;
112+
}
30113

31-
config.hooks.enabled = true;
32-
config.hooks.token = hooksToken;
33-
await fs.writeFile(configPath, JSON.stringify(config, null, 2));
34-
console.log(chalk.gray('Hooks configured in openclaw.json'));
114+
// Set primary delivery target to first discovered channel
115+
if (!config.agents) config.agents = {};
116+
if (!config.agents.defaults) config.agents.defaults = {};
117+
if (!config.agents.defaults.heartbeat) config.agents.defaults.heartbeat = {};
118+
const hb = config.agents.defaults.heartbeat;
35119

36-
// Restart gateway to pick up new config (watchdog auto-restarts it)
37-
try {
38-
execSync('pkill -f openclaw-gateway', { stdio: 'ignore' });
39-
console.log(chalk.gray('Gateway restarting to apply hooks config...'));
40-
} catch {
41-
// Gateway may not be running yet — config will be picked up on next start
120+
if (channels.length > 0) {
121+
const primary = channels[0];
122+
if (hb.target !== primary.channel || hb.to !== primary.to) {
123+
hb.target = primary.channel;
124+
hb.to = primary.to;
125+
changed = true;
126+
}
127+
} else if (hb.target !== 'last') {
128+
// No channels discovered — fall back to 'last' and clear stale target
129+
hb.target = 'last';
130+
delete hb.to;
131+
changed = true;
42132
}
43-
} catch {
44-
// Non-fatal — hooks will be configured on next reboot via entrypoint config sync
45-
}
46-
}
47133

48-
/**
49-
* Send a one-time instruction to the agent via /hooks/wake so it saves
50-
* notification handling instructions to its HEARTBEAT.md file.
51-
*
52-
* Trust boundary: This intentionally injects a system-level prompt into the
53-
* agent's memory. The webhook endpoint MUST validate the Authorization header
54-
* before acting on the message content. The hardcoded text is controlled by us
55-
* (not user input) and tells the agent how to broadcast notifications.
56-
*/
57-
async function sendHeartbeatInstruction(webhookUrl: string, hooksToken: string): Promise<void> {
58-
try {
59-
const res = await fetch(webhookUrl, {
60-
method: 'POST',
61-
headers: {
62-
'Content-Type': 'application/json',
63-
'Authorization': `Bearer ${hooksToken}`,
64-
},
65-
body: JSON.stringify({
66-
text: 'SYSTEM: Push notifications are now enabled. When you receive a system event about a new email, proactively use the message tool to broadcast the notification to every connected channel. Check your available tools, find all messaging channels, and send the notification to each one individually. The goal is that the owner sees the notification regardless of which channel they are watching.',
67-
mode: 'now',
68-
}),
69-
});
70-
if (!res.ok) {
71-
console.log(chalk.gray(`Note: Setup instruction returned HTTP ${res.status} — agent may not have received it.`));
134+
if (changed) {
135+
await fs.writeFile(configPath, JSON.stringify(config, null, 2));
136+
console.log(chalk.gray('Hooks and heartbeat configured in openclaw.json'));
137+
}
138+
139+
// Append notification relay instructions to HEARTBEAT.md.
140+
// The default heartbeat prompt reads this file and follows it strictly.
141+
await fs.mkdir(WORKSPACE_DIR, { recursive: true });
142+
const heartbeatPath = `${WORKSPACE_DIR}/HEARTBEAT.md`;
143+
const section = buildHeartbeatMd(channels);
144+
let existing = '';
145+
try { existing = await fs.readFile(heartbeatPath, 'utf-8'); } catch { /* file may not exist */ }
146+
// Replace existing notification section or append if not present.
147+
// Uses split-on-header to avoid regex edge cases with anchors/newlines.
148+
const idx = existing.indexOf(HEARTBEAT_SECTION_HEADER);
149+
if (idx !== -1) {
150+
let before = existing.slice(0, idx);
151+
// Ensure a newline separates preceding content from our section
152+
if (before.length > 0 && !before.endsWith('\n')) before += '\n';
153+
const afterHeader = existing.slice(idx + HEARTBEAT_SECTION_HEADER.length);
154+
// Find next top-level heading. Assumes a preceding newline (standard markdown).
155+
const nextHeading = afterHeader.search(/\n# /);
156+
const after = nextHeading !== -1 ? afterHeader.slice(nextHeading) : '';
157+
await fs.writeFile(heartbeatPath, before + section.trimEnd() + after);
72158
} else {
73-
console.log(chalk.gray('Notification instructions sent to your agent.'));
159+
const separator = existing.length > 0 && !existing.endsWith('\n') ? '\n\n' : existing.length > 0 ? '\n' : '';
160+
await fs.writeFile(heartbeatPath, existing + separator + section);
74161
}
75-
} catch {
76-
console.log(chalk.gray('Note: Could not send setup instruction to instance.'));
162+
console.log(chalk.gray('HEARTBEAT.md updated with notification relay instructions'));
163+
164+
if (channels.length > 0) {
165+
console.log(chalk.gray(`Notification channels: ${channels.map(c => `${c.channel}:${c.to}`).join(', ')}`));
166+
}
167+
168+
// Restart gateway to pick up new config (watchdog auto-restarts it)
169+
if (changed) {
170+
try {
171+
execSync('pkill -f openclaw-gateway', { stdio: 'ignore' });
172+
console.log(chalk.gray('Gateway restarting to apply config...'));
173+
} catch {
174+
// Gateway may not be running yet — config will be picked up on next start
175+
}
176+
}
177+
} catch (err) {
178+
console.log(chalk.yellow('Warning: Could not configure instance locally.'));
179+
console.log(chalk.gray(`${err instanceof Error ? err.message : err}`));
180+
console.log(chalk.gray('Hooks will be configured on next instance reboot.'));
77181
}
78182
}
79183

@@ -91,9 +195,13 @@ function getMachineId(): string | undefined {
91195
}
92196

93197
async function getAccountId(): Promise<string | undefined> {
94-
const { getAccountInfo } = await import('./whoami.js');
95-
const account = await getAccountInfo();
96-
return account?.accountId;
198+
try {
199+
const { getAccountInfo } = await import('./whoami.js');
200+
const account = await getAccountInfo();
201+
return account?.accountId;
202+
} catch {
203+
return undefined;
204+
}
97205
}
98206

99207
async function enableNotifications(): Promise<void> {
@@ -112,11 +220,18 @@ async function enableNotifications(): Promise<void> {
112220
const body: Record<string, string> = { machine_id: machineId };
113221
if (accountId) body.account_id = accountId;
114222

115-
const res = await fetch(`${NOTIFICATIONS_BASE_URL}/notifications/enable`, {
116-
method: 'POST',
117-
headers: { 'Content-Type': 'application/json' },
118-
body: JSON.stringify(body),
119-
});
223+
let res: Response;
224+
try {
225+
res = await fetch(`${NOTIFICATIONS_BASE_URL}/notifications/enable`, {
226+
method: 'POST',
227+
headers: { 'Content-Type': 'application/json' },
228+
body: JSON.stringify(body),
229+
});
230+
} catch (err) {
231+
console.error(chalk.red(`Error: Could not reach notifications service.`));
232+
console.error(chalk.gray(`${err instanceof Error ? err.message : err}`));
233+
process.exit(1);
234+
}
120235

121236
const data = await res.json().catch(() => ({})) as EnableResponse;
122237
if (!res.ok) {
@@ -144,9 +259,6 @@ async function enableNotifications(): Promise<void> {
144259
console.log(chalk.gray('Save the secret — it will not be shown again.'));
145260
console.log(chalk.gray('Use it to verify webhook signatures (HMAC-SHA256).'));
146261
}
147-
148-
// Send one-time HEARTBEAT.md instruction to the agent
149-
await sendHeartbeatInstruction(instance.webhookUrl, instance.hooksToken);
150262
}
151263

152264
function showNotificationsHelp(): void {
@@ -156,6 +268,7 @@ function showNotificationsHelp(): void {
156268
console.log();
157269
console.log(chalk.bold('Available Events:'));
158270
console.log(' ' + chalk.green('email.received') + ' ' + 'Triggered when an inbound email arrives');
271+
console.log(' ' + chalk.green('sms.received') + ' ' + 'Triggered when an inbound SMS arrives');
159272
console.log();
160273
console.log(chalk.bold('Examples:'));
161274
console.log(' npx atxp notifications enable');

skills/atxp/SKILL.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ Local contacts database for resolving names to phone numbers and emails. Stored
337337

338338
### Notifications
339339

340-
Enable push notifications so your agent receives a POST to its `/hooks/wake` endpoint when events happen (e.g., inbound email), instead of polling.
340+
Enable push notifications so your agent receives a POST to its `/hooks/wake` endpoint when events happen (e.g., inbound email or SMS), instead of polling.
341341

342342
| Command | Cost | Description |
343343
|---------|------|-------------|

0 commit comments

Comments
 (0)