Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions crates/calciforge-openclaw-channel-plugin/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
/**
* Calciforge OpenClaw channel plugin.
*
* This registers POST /calciforge/inbound as a native OpenClaw channel route.
* Calciforge sends inbound chat messages here, OpenClaw runs the selected
* agent lane, and this plugin posts the assistant reply back to Calciforge's
* /hooks/reply endpoint.
*/

async function getLegacyRegisterPluginHttpRoute() {
const mod = await import("/usr/lib/node_modules/openclaw/dist/plugin-sdk/plugin-runtime.js");
return mod.registerPluginHttpRoute;
}

async function registerHttpRoute(api, route, log) {
const registerLegacyRoute = await getLegacyRegisterPluginHttpRoute();
const unregister = registerLegacyRoute({
...route,
auth: "none",
pluginId: "calciforge-channel",
source: "calciforge-channel-plugin",
replaceExisting: true,
log: (msg) => log?.warn?.(msg),
});
return { unregister, source: "legacy route registry" };
}

export default function register(api) {
const pluginConfig = api.pluginConfig ?? {};
Comment thread
bglusman marked this conversation as resolved.
const { authToken, replyWebhook, replyAuthToken } = pluginConfig;

if (authToken && replyWebhook && replyAuthToken) {
api.logger.info(
`[calciforge-channel] plugin loaded - replyWebhook=${replyWebhook}`,
);

registerHttpRoute(api, {
path: "/calciforge/inbound",
match: "exact",
handler: async (req, res) =>
handleInboundRequest({
api,
req,
res,
authToken,
replyWebhook,
replyAuthToken,
log: api.logger,
}),
}, api.logger)
.then(({ source }) => {
api.logger.info(
`[calciforge-channel] registered POST /calciforge/inbound via ${source}`,
);
})
.catch((err) => {
api.logger.error(
`[calciforge-channel] failed to register HTTP route: ${err.message}`,
);
});
}

api.registerChannel({
plugin: {
id: "calciforge-channel",
name: "Calciforge",
description: "Calciforge inbound channel",
configSchema: { type: "object", properties: {}, additionalProperties: true },

listAccounts: async () => [{ accountId: "default", config: {} }],

resolveAccountSnapshot: ({ account }) => ({
accountId: account.accountId,
config: account.config,
status: { kind: "connected", label: "Calciforge channel active" },
}),

send: null,

gateway: {
startAccount: async (ctx) => {
const { abortSignal, log } = ctx;
log?.info?.("[calciforge-channel] channel account active");

await new Promise((resolve) => {
abortSignal?.addEventListener("abort", () => {
log?.info?.("[calciforge-channel] channel stopped");
resolve();
});
});
},
},
},
});
}

async function handleInboundRequest({
api,
req,
res,
authToken,
replyWebhook,
replyAuthToken,
log,
}) {
if (req.method !== "POST") {
json(res, 405, { error: "Method not allowed" });
return true;
}

if (!isAuthorized(req, authToken)) {
json(res, 401, { error: "Unauthorized" });
return true;
}

let body;
try {
body = await readJsonBody(req);
} catch {
json(res, 400, { error: "Invalid JSON body" });
return true;
}

const { message, sessionKey, channel, replyTo, agentId } = body;
if (!message || !sessionKey) {
json(res, 400, { error: "message and sessionKey are required" });
return true;
}

json(res, 200, { ok: true });

try {
const { runId } = await api.runtime.subagent.run({
sessionKey,
message,
idempotencyKey: `calciforge:${Date.now()}:${Math.random().toString(36).slice(2, 8)}`,
...(agentId ? { lane: agentId } : {}),
deliver: false,
});

const result = await api.runtime.subagent.waitForRun({
runId,
timeoutMs: 300000,
});

if (result.status !== "ok") {
log?.warn?.(
`[calciforge-channel] agent run ${result.status} - runId=${runId}`,
);
await deliverReply({
replyWebhook,
replyAuthToken,
sessionKey,
message: `OpenClaw run ${result.status}`,
channel,
replyTo,
log,
});
return true;
}

const replyText = await readLatestAssistantText(
api,
sessionKey,
);
if (isSilentReply(replyText)) {
log?.info?.("[calciforge-channel] silent reply - not forwarding");
return true;
}

await deliverReply({
replyWebhook,
replyAuthToken,
sessionKey,
message: replyText,
channel,
replyTo,
log,
});
} catch (err) {
log?.error?.(`[calciforge-channel] dispatch error - ${err.message}`);
await deliverReply({
replyWebhook,
replyAuthToken,
sessionKey,
message: `OpenClaw dispatch failed: ${err.message}`,
channel,
replyTo,
log,
});
}

return true;
}

function isAuthorized(req, expectedToken) {
if (!expectedToken) return false;
const authHeader = req.headers["authorization"] ?? "";
const token = authHeader.startsWith("Bearer ")
? authHeader.slice("Bearer ".length)
: authHeader;
return token === expectedToken;
}

async function readJsonBody(req) {
const chunks = [];
await new Promise((resolve, reject) => {
req.on("data", (chunk) => chunks.push(chunk));
req.on("end", resolve);
req.on("error", reject);
});
return JSON.parse(Buffer.concat(chunks).toString("utf8"));
Comment on lines +205 to +212
}

async function readLatestAssistantText(api, sessionKey) {
const { messages } = await api.runtime.subagent.getSessionMessages({
sessionKey,
limit: 10,
});
const lastMsg = [...messages]
.reverse()
.find((msg) => msg?.role === "assistant");
if (!lastMsg) return "";

const content = lastMsg.content;
if (typeof content === "string") return content;
if (Array.isArray(content)) {
return content
.filter((part) => part.type === "text")
.map((part) => part.text ?? "")
.join("\n");
}
return "";
}

function isSilentReply(replyText) {
const trimmed = (replyText ?? "").trim();
return !trimmed || trimmed === "NO_REPLY" || trimmed === "HEARTBEAT_OK";
}

async function deliverReply({
replyWebhook,
replyAuthToken,
sessionKey,
message,
channel,
replyTo,
log,
}) {
if (!replyWebhook || !replyAuthToken) {
log?.warn?.("[calciforge-channel] reply webhook/auth not configured");
return;
}

try {
const resp = await fetch(replyWebhook, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${replyAuthToken}`,
},
body: JSON.stringify({ sessionKey, message, channel, to: replyTo }),
signal: AbortSignal.timeout(30000),
});

if (!resp.ok) {
log?.error?.(`[calciforge-channel] reply webhook failed - status=${resp.status}`);
} else {
log?.info?.("[calciforge-channel] reply delivered");
}
} catch (err) {
log?.error?.(`[calciforge-channel] reply webhook error - ${err.message}`);
}
}

function json(res, status, body) {
res.writeHead(status, { "Content-Type": "application/json" });
res.end(JSON.stringify(body));
}
25 changes: 25 additions & 0 deletions crates/calciforge-openclaw-channel-plugin/openclaw.plugin.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"id": "calciforge-channel",
"name": "Calciforge Channel",
"description": "Registers Calciforge as a native inbound channel so messages flow through OpenClaw's full agent runtime and replies are delivered back to Calciforge.",
"version": "0.1.0",
"channels": ["calciforge"],
"configSchema": {
"type": "object",
"additionalProperties": false,
"properties": {
"authToken": {
"type": "string",
"description": "Bearer token Calciforge sends to /calciforge/inbound"
},
"replyWebhook": {
"type": "string",
"description": "URL to POST agent responses back to Calciforge, e.g. http://calciforge.lan:18797/hooks/reply"
},
"replyAuthToken": {
"type": "string",
"description": "Bearer token for authenticating callbacks to Calciforge's reply webhook"
}
}
}
}
10 changes: 10 additions & 0 deletions crates/calciforge-openclaw-channel-plugin/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"name": "@openclaw/calciforge-channel",
"version": "0.1.0",
"description": "Calciforge native channel plugin for OpenClaw",
"type": "module",
"main": "index.js",
"peerDependencies": {
"openclaw": "*"
}
}
22 changes: 7 additions & 15 deletions crates/calciforge/src/adapters/dirac_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,28 +290,20 @@ mod tests {

#[tokio::test]
async fn dispatch_parses_json_stream() {
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::fs::PermissionsExt;

let dir = tempfile::tempdir().unwrap();
let script_path = dir.path().join("fake-dirac");
let mut script = std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.mode(0o755)
.open(&script_path)
.unwrap();
writeln!(
script,
std::fs::write(
&script_path,
r#"#!/bin/sh
cat >/dev/null
printf '%s\n' '{{"type":"say","text":"step","partial":true}}'
printf '%s\n' '{{"type":"say","say":"completion_result","text":"done","partial":false}}'
"#
printf '%s\n' '{"type":"say","text":"step","partial":true}'
printf '%s\n' '{"type":"say","say":"completion_result","text":"done","partial":false}'
"#,
)
.unwrap();
script.sync_all().unwrap();
drop(script);
std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)).unwrap();

let adapter = DiracCliAdapter::new(
Some(script_path.to_string_lossy().to_string()),
Expand Down
Loading
Loading