Skip to content

Commit e44484d

Browse files
authored
Merge pull request #97 from XpressAI/fix/agent-apps-tools-session
fix: agent apps, MCP tool routing, and session continuity
2 parents 5b678bb + 15c341f commit e44484d

18 files changed

Lines changed: 2858 additions & 237 deletions

File tree

Cargo.lock

Lines changed: 271 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/xpressclaw-core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ croner = "2"
3737
bytes = "1"
3838
sysinfo = "0.34"
3939
notify = "7"
40+
lettre = { version = "0.11", features = ["tokio1-native-tls", "builder"] }
41+
async-imap = "0.10"
42+
async-native-tls = "0.5"
43+
async-std = { version = "1", features = ["default"] }
44+
base64 = "0.22"
4045

4146
# Local LLM (optional, enabled by default via "local-llm" feature)
4247
llama-cpp-2 = { workspace = true, optional = true }

crates/xpressclaw-core/src/agents/harness.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,18 @@ impl HarnessClient {
132132

133133
/// Send a message to the agent's persistent session.
134134
/// Returns a stream of SSE chunks (may be buffered by the SDK).
135+
///
136+
/// `history` is the full conversation so far (excluding the current message).
137+
/// The harness injects it when starting a fresh session so the agent has context
138+
/// even after a container restart.
135139
pub async fn send_session_message(
136140
&self,
137141
message: &str,
138142
conversation_id: &str,
139143
sender_name: &str,
140144
sender_type: &str,
141145
system_prompt: &str,
146+
history: &serde_json::Value,
142147
) -> Result<ChatStream> {
143148
let url = format!("{}/v1/session/send", self.base_url);
144149

@@ -148,6 +153,7 @@ impl HarnessClient {
148153
"sender_name": sender_name,
149154
"sender_type": sender_type,
150155
"system_prompt": system_prompt,
156+
"history": history,
151157
"stream": true,
152158
});
153159

crates/xpressclaw-core/src/connectors/deliver.rs

Lines changed: 280 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,17 @@ pub fn deliver(db: &Arc<Database>, connector_name: &str, channel_name: &str, mes
5252
"telegram" => deliver_telegram(&connector.config, &channel.config, message),
5353
"webhook" => deliver_webhook(&connector.config, &channel.config, message),
5454
"file_watcher" => deliver_file(&channel.config, message),
55+
"slack" => deliver_slack(&connector.config, &channel.config, message),
56+
"github" => deliver_github(&connector.config, &channel.config, message),
57+
"jira" => deliver_jira(&connector.config, &channel.config, message),
58+
"email" => deliver_email(&connector.config, &channel.config, message),
5559
other => {
5660
info!(
5761
connector_type = other,
5862
connector = connector_name,
5963
channel = channel_name,
6064
message,
61-
"sink delivery (stub connector — message logged only)"
65+
"sink delivery (unknown connector — message logged only)"
6266
);
6367
Ok(())
6468
}
@@ -163,3 +167,278 @@ fn deliver_file(channel_config: &Value, message: &str) -> Result<()> {
163167

164168
Ok(())
165169
}
170+
171+
/// Send via Slack chat.postMessage API.
172+
fn deliver_slack(connector_config: &Value, channel_config: &Value, message: &str) -> Result<()> {
173+
let bot_token = connector_config
174+
.get("bot_token")
175+
.and_then(|v| v.as_str())
176+
.ok_or_else(|| Error::Connector("slack: missing bot_token".into()))?
177+
.to_string();
178+
179+
let channel_id = channel_config
180+
.get("channel_id")
181+
.and_then(|v| v.as_str())
182+
.ok_or_else(|| Error::Connector("slack: missing channel_id in channel config".into()))?
183+
.to_string();
184+
185+
let message = message.to_string();
186+
tokio::spawn(async move {
187+
let body = json!({
188+
"channel": channel_id,
189+
"text": message,
190+
});
191+
let client = reqwest::Client::new();
192+
if let Err(e) = client
193+
.post("https://slack.com/api/chat.postMessage")
194+
.header("Authorization", format!("Bearer {}", bot_token))
195+
.header("Content-Type", "application/json; charset=utf-8")
196+
.json(&body)
197+
.send()
198+
.await
199+
{
200+
error!(error = %e, "slack chat.postMessage failed");
201+
}
202+
});
203+
204+
Ok(())
205+
}
206+
207+
/// Send via GitHub API (comment on issue or create issue).
208+
fn deliver_github(connector_config: &Value, _channel_config: &Value, message: &str) -> Result<()> {
209+
let token = connector_config
210+
.get("token")
211+
.and_then(|v| v.as_str())
212+
.ok_or_else(|| Error::Connector("github: missing token".into()))?
213+
.to_string();
214+
215+
let owner = connector_config
216+
.get("owner")
217+
.and_then(|v| v.as_str())
218+
.ok_or_else(|| Error::Connector("github: missing owner".into()))?
219+
.to_string();
220+
221+
let repo = connector_config
222+
.get("repo")
223+
.and_then(|v| v.as_str())
224+
.ok_or_else(|| Error::Connector("github: missing repo".into()))?
225+
.to_string();
226+
227+
// Try to parse message as JSON to extract issue_number for comments;
228+
// otherwise create a new issue with the message as body.
229+
let message = message.to_string();
230+
tokio::spawn(async move {
231+
let client = reqwest::Client::builder()
232+
.user_agent("xpressclaw/0.1")
233+
.build()
234+
.unwrap_or_else(|_| reqwest::Client::new());
235+
236+
// Attempt to parse JSON to check for issue_number
237+
let parsed: Option<Value> = serde_json::from_str(&message).ok();
238+
let issue_number = parsed
239+
.as_ref()
240+
.and_then(|v| v.get("issue_number"))
241+
.and_then(|v| v.as_u64());
242+
243+
if let Some(num) = issue_number {
244+
let url = format!(
245+
"https://api.github.com/repos/{}/{}/issues/{}/comments",
246+
owner, repo, num
247+
);
248+
let body_text = parsed
249+
.as_ref()
250+
.and_then(|v| v.get("body"))
251+
.and_then(|v| v.as_str())
252+
.unwrap_or(&message);
253+
let body = json!({ "body": body_text });
254+
if let Err(e) = client
255+
.post(&url)
256+
.header("Authorization", format!("token {}", token))
257+
.header("Accept", "application/vnd.github.v3+json")
258+
.json(&body)
259+
.send()
260+
.await
261+
{
262+
error!(error = %e, "github comment POST failed");
263+
}
264+
} else {
265+
let url = format!("https://api.github.com/repos/{}/{}/issues", owner, repo);
266+
let body = json!({
267+
"title": "New issue from xpressclaw",
268+
"body": message,
269+
});
270+
if let Err(e) = client
271+
.post(&url)
272+
.header("Authorization", format!("token {}", token))
273+
.header("Accept", "application/vnd.github.v3+json")
274+
.json(&body)
275+
.send()
276+
.await
277+
{
278+
error!(error = %e, "github create issue failed");
279+
}
280+
}
281+
});
282+
283+
Ok(())
284+
}
285+
286+
/// Send via Jira API (add comment to issue).
287+
fn deliver_jira(connector_config: &Value, _channel_config: &Value, message: &str) -> Result<()> {
288+
use base64::Engine as _;
289+
290+
let base_url = connector_config
291+
.get("base_url")
292+
.and_then(|v| v.as_str())
293+
.ok_or_else(|| Error::Connector("jira: missing base_url".into()))?
294+
.trim_end_matches('/')
295+
.to_string();
296+
297+
let email = connector_config
298+
.get("email")
299+
.and_then(|v| v.as_str())
300+
.ok_or_else(|| Error::Connector("jira: missing email".into()))?
301+
.to_string();
302+
303+
let api_token = connector_config
304+
.get("api_token")
305+
.and_then(|v| v.as_str())
306+
.ok_or_else(|| Error::Connector("jira: missing api_token".into()))?
307+
.to_string();
308+
309+
// Try to extract issue key from JSON message
310+
let parsed: Option<Value> = serde_json::from_str(message).ok();
311+
let issue_key = parsed
312+
.as_ref()
313+
.and_then(|v| v.get("issue_key").or(v.get("key")))
314+
.and_then(|v| v.as_str())
315+
.ok_or_else(|| {
316+
Error::Connector("jira: message must contain issue_key or key field".into())
317+
})?
318+
.to_string();
319+
320+
let body_text = parsed
321+
.as_ref()
322+
.and_then(|v| v.get("body"))
323+
.and_then(|v| v.as_str())
324+
.unwrap_or(message)
325+
.to_string();
326+
327+
tokio::spawn(async move {
328+
let credentials = format!("{}:{}", email, api_token);
329+
let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes());
330+
let auth = format!("Basic {}", encoded);
331+
332+
let url = format!("{}/rest/api/3/issue/{}/comment", base_url, issue_key);
333+
let body = json!({
334+
"body": {
335+
"type": "doc",
336+
"version": 1,
337+
"content": [{
338+
"type": "paragraph",
339+
"content": [{
340+
"type": "text",
341+
"text": body_text
342+
}]
343+
}]
344+
}
345+
});
346+
347+
let client = reqwest::Client::new();
348+
if let Err(e) = client
349+
.post(&url)
350+
.header("Authorization", &auth)
351+
.header("Content-Type", "application/json")
352+
.json(&body)
353+
.send()
354+
.await
355+
{
356+
error!(error = %e, "jira comment POST failed");
357+
}
358+
});
359+
360+
Ok(())
361+
}
362+
363+
/// Send via SMTP email.
364+
fn deliver_email(connector_config: &Value, channel_config: &Value, message: &str) -> Result<()> {
365+
let smtp_host = connector_config
366+
.get("smtp_host")
367+
.and_then(|v| v.as_str())
368+
.ok_or_else(|| Error::Connector("email: missing smtp_host".into()))?
369+
.to_string();
370+
371+
let smtp_port = connector_config
372+
.get("smtp_port")
373+
.and_then(|v| v.as_u64())
374+
.unwrap_or(587) as u16;
375+
376+
let username = connector_config
377+
.get("username")
378+
.and_then(|v| v.as_str())
379+
.ok_or_else(|| Error::Connector("email: missing username".into()))?
380+
.to_string();
381+
382+
let password = connector_config
383+
.get("password")
384+
.and_then(|v| v.as_str())
385+
.ok_or_else(|| Error::Connector("email: missing password".into()))?
386+
.to_string();
387+
388+
let to_addr = channel_config
389+
.get("to")
390+
.and_then(|v| v.as_str())
391+
.ok_or_else(|| Error::Connector("email: missing 'to' in channel config".into()))?
392+
.to_string();
393+
394+
let message = message.to_string();
395+
let from = username.clone();
396+
397+
tokio::spawn(async move {
398+
use lettre::message::header::ContentType;
399+
use lettre::transport::smtp::authentication::Credentials;
400+
use lettre::{AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor};
401+
402+
let email = match Message::builder()
403+
.from(match from.parse() {
404+
Ok(addr) => addr,
405+
Err(e) => {
406+
error!(error = %e, "invalid from address for email delivery");
407+
return;
408+
}
409+
})
410+
.to(match to_addr.parse() {
411+
Ok(addr) => addr,
412+
Err(e) => {
413+
error!(error = %e, "invalid to address for email delivery");
414+
return;
415+
}
416+
})
417+
.subject("Message from xpressclaw")
418+
.header(ContentType::TEXT_PLAIN)
419+
.body(message)
420+
{
421+
Ok(e) => e,
422+
Err(e) => {
423+
error!(error = %e, "failed to build email");
424+
return;
425+
}
426+
};
427+
428+
let creds = Credentials::new(username, password);
429+
430+
let mailer = match AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&smtp_host) {
431+
Ok(builder) => builder.port(smtp_port).credentials(creds).build(),
432+
Err(e) => {
433+
error!(error = %e, "failed to create SMTP transport");
434+
return;
435+
}
436+
};
437+
438+
if let Err(e) = mailer.send(email).await {
439+
error!(error = %e, "SMTP send failed");
440+
}
441+
});
442+
443+
Ok(())
444+
}

0 commit comments

Comments
 (0)