diff --git a/Cargo.toml b/Cargo.toml index 8e95105..d646868 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ desktop = { description = "Enable desktop GUI client (requires GTK system librar # Internal crates rustyclaw-core = { path = "crates/rustyclaw-core", version = "0.3.0" } rustyclaw-tui = { path = "crates/rustyclaw-tui", version = "0.3.0" } -chat-system = { version = "0.1.2", default-features = false } +chat-system = { version = "0.1.3", default-features = false } # Configuration and serialization serde = { version = "1.0", features = ["derive"] } diff --git a/crates/rustyclaw-core/Cargo.toml b/crates/rustyclaw-core/Cargo.toml index 6857e6d..9bbac0d 100644 --- a/crates/rustyclaw-core/Cargo.toml +++ b/crates/rustyclaw-core/Cargo.toml @@ -27,11 +27,8 @@ ssh = ["dep:russh", "dep:russh-keys", "dep:rand_core", "dep:sha2"] qr = ["dep:image"] # CLI-based messengers (tier 1) - no heavy deps, just HTTP signal-cli = ["chat-system/signal-cli"] -matrix-cli = [] # Matrix CLI messenger using HTTP API (no external deps) -telegram-cli = [] -discord-cli = [] -slack-cli = [] -all-messengers = ["whatsapp", "signal-cli", "matrix-cli", "telegram-cli", "discord-cli", "slack-cli"] +# matrix-cli removed - use matrix feature instead (chat-system 0.1.3 has all capabilities) +all-messengers = ["whatsapp", "signal-cli", "matrix"] full = ["web-tools", "browser", "mcp", "all-messengers", "ssh", "steel-memory"] [dependencies] diff --git a/crates/rustyclaw-core/src/gateway/messenger_handler.rs b/crates/rustyclaw-core/src/gateway/messenger_handler.rs index baed5a5..f1121a7 100644 --- a/crates/rustyclaw-core/src/gateway/messenger_handler.rs +++ b/crates/rustyclaw-core/src/gateway/messenger_handler.rs @@ -27,8 +27,8 @@ use super::{ ToolCallResult, }; -#[cfg(feature = "matrix-cli")] -use crate::messengers::MatrixCliMessenger; +#[cfg(feature = "matrix")] +use crate::messengers::MatrixMessenger; /// Shared messenger manager for the gateway. pub type SharedMessengerManager = Arc>; @@ -89,57 +89,10 @@ async fn create_messenger(config: &MessengerConfig) -> Result let name = config.name.clone(); let mut messenger: Box = match config.messenger_type.as_str() { - #[cfg(feature = "matrix-cli")] - "matrix-cli" => { - let homeserver = config - .homeserver - .clone() - .context("Matrix-CLI requires 'homeserver'")?; - let user_id = config - .user_id - .clone() - .context("Matrix-CLI requires 'user_id'")?; - let access_token = config - .access_token - .clone() - .context("Matrix-CLI requires 'access_token'")?; - - let mut messenger = MatrixCliMessenger::with_token( - name.clone(), - homeserver, - user_id, - access_token, - None, // device_id - ); - - // Set state directory for sync token persistence - if let Some(dirs) = directories::ProjectDirs::from("", "", "rustyclaw") { - let state_dir = dirs.data_dir().join("matrix").join(&name); - messenger = messenger.with_state_dir(state_dir); - } - - // Set allowed chats if configured - if !config.allowed_chats.is_empty() { - messenger = messenger.with_allowed_chats(config.allowed_chats.clone()); - } - - // Set DM config if present - if let Some(ref dm) = config.dm { - use crate::messengers::MatrixDmConfig; - let dm_config = MatrixDmConfig { - enabled: dm.enabled, - policy: dm.policy.clone().unwrap_or_else(|| "allowlist".to_string()), - allow_from: dm.allow_from.clone(), - }; - messenger = messenger.with_dm_config(dm_config); - } - - Box::new(messenger) - } - #[cfg(not(feature = "matrix-cli"))] + // matrix-cli type removed - use "matrix" type instead (chat-system 0.1.3) "matrix-cli" => { anyhow::bail!( - "Matrix-CLI messenger not compiled in. Rebuild with --features matrix-cli" + "matrix-cli messenger type is deprecated. Use 'matrix' type instead." ); } "irc" => build_irc_messenger(config, name)?, @@ -404,17 +357,17 @@ fn build_matrix_messenger(config: &MessengerConfig, name: String) -> Result Result Self { - Self { - name, - connected: false, - } - } -} - -#[async_trait] -impl Messenger for ConsoleMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "console" - } - - async fn initialize(&mut self) -> Result<()> { - self.connected = true; - debug!(name = %self.name, "ConsoleMessenger initialized"); - Ok(()) - } - - async fn send_message(&self, recipient: &str, content: &str) -> Result { - let id = format!("console-{}", chrono::Utc::now().timestamp_millis()); - println!("[{}] To {}: {}", self.name, recipient, content); - Ok(id) - } - - async fn receive_messages(&self) -> Result> { - Ok(Vec::new()) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - debug!(name = %self.name, "ConsoleMessenger disconnected"); - Ok(()) - } -} diff --git a/crates/rustyclaw-core/src/messengers/discord.rs b/crates/rustyclaw-core/src/messengers/discord.rs deleted file mode 100644 index 3042e14..0000000 --- a/crates/rustyclaw-core/src/messengers/discord.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! Discord messenger using bot token and REST API. - -use super::{Message, Messenger}; -use anyhow::Result; -use async_trait::async_trait; - -/// Discord messenger using bot token -pub struct DiscordMessenger { - name: String, - bot_token: String, - connected: bool, - http: reqwest::Client, -} - -impl DiscordMessenger { - pub fn new(name: String, bot_token: String) -> Self { - Self { - name, - bot_token, - connected: false, - http: reqwest::Client::new(), - } - } -} - -#[async_trait] -impl Messenger for DiscordMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "discord" - } - - async fn initialize(&mut self) -> Result<()> { - // Verify bot token by fetching current user - let resp = self - .http - .get("https://discord.com/api/v10/users/@me") - .header("Authorization", format!("Bot {}", self.bot_token)) - .send() - .await?; - - if resp.status().is_success() { - self.connected = true; - Ok(()) - } else { - anyhow::bail!("Discord auth failed: {}", resp.status()) - } - } - - async fn send_message(&self, channel_id: &str, content: &str) -> Result { - let url = format!( - "https://discord.com/api/v10/channels/{}/messages", - channel_id - ); - - let resp = self - .http - .post(&url) - .header("Authorization", format!("Bot {}", self.bot_token)) - .json(&serde_json::json!({ "content": content })) - .send() - .await?; - - if resp.status().is_success() { - let data: serde_json::Value = resp.json().await?; - Ok(data["id"].as_str().unwrap_or("unknown").to_string()) - } else { - anyhow::bail!("Discord send failed: {}", resp.status()) - } - } - - async fn receive_messages(&self) -> Result> { - // Real implementation would use Discord gateway WebSocket - Ok(Vec::new()) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - Ok(()) - } -} diff --git a/crates/rustyclaw-core/src/messengers/discord_cli.rs b/crates/rustyclaw-core/src/messengers/discord_cli.rs deleted file mode 100644 index 57dd0d3..0000000 --- a/crates/rustyclaw-core/src/messengers/discord_cli.rs +++ /dev/null @@ -1,268 +0,0 @@ -//! Discord messenger using REST API. -//! -//! Uses Discord's REST API at https://discord.com/api/v10/ -//! For simplicity, this uses polling for receiving messages rather than -//! WebSocket Gateway. Good for low-volume bot use cases. -//! -//! This requires the `discord-cli` feature to be enabled. - -use super::{Message, Messenger}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use reqwest::Client; -use serde::Deserialize; -use std::sync::Arc; -use tokio::sync::Mutex; - -const DISCORD_API_BASE: &str = "https://discord.com/api/v10"; - -/// Discord User object -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct DiscordUser { - id: String, - username: String, - discriminator: String, -} - -/// Discord Message object -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct DiscordMessage { - id: String, - channel_id: String, - author: DiscordUser, - content: String, - timestamp: String, -} - -/// Discord messenger implementation -pub struct DiscordCliMessenger { - name: String, - token: String, - client: Client, - connected: Arc>, - last_message_ids: Arc>>, - watch_channels: Vec, -} - -impl DiscordCliMessenger { - /// Create a new Discord messenger with bot token - pub fn new(name: String, token: String) -> Self { - Self { - name, - token, - client: Client::new(), - connected: Arc::new(Mutex::new(false)), - last_message_ids: Arc::new(Mutex::new(std::collections::HashMap::new())), - watch_channels: Vec::new(), - } - } - - /// Add a channel to watch for incoming messages - pub fn watch_channel(mut self, channel_id: String) -> Self { - self.watch_channels.push(channel_id); - self - } - - /// Get authorization header - fn auth_header(&self) -> String { - format!("Bot {}", self.token) - } - - /// Get current user info to verify token - async fn get_me(&self) -> Result { - let url = format!("{}/users/@me", DISCORD_API_BASE); - let response = self - .client - .get(&url) - .header("Authorization", self.auth_header()) - .send() - .await?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Discord API error: {} - {}", status, error_text); - } - - response.json().await.context("Failed to parse user info") - } - - /// Send a message to a channel - async fn send_message_internal(&self, channel_id: &str, content: &str) -> Result { - let url = format!("{}/channels/{}/messages", DISCORD_API_BASE, channel_id); - - let body = serde_json::json!({ - "content": content - }); - - let response = self - .client - .post(&url) - .header("Authorization", self.auth_header()) - .json(&body) - .send() - .await - .context("Failed to send Discord message")?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to send message: {} - {}", status, error_text); - } - - let msg: DiscordMessage = response.json().await?; - Ok(msg.id) - } - - /// Get messages from a channel - async fn get_channel_messages( - &self, - channel_id: &str, - limit: u32, - ) -> Result> { - let last_ids = self.last_message_ids.lock().await; - let after_id = last_ids.get(channel_id).cloned(); - drop(last_ids); - - let mut url = format!( - "{}/channels/{}/messages?limit={}", - DISCORD_API_BASE, channel_id, limit - ); - - if let Some(after) = after_id { - url.push_str(&format!("&after={}", after)); - } - - let response = self - .client - .get(&url) - .header("Authorization", self.auth_header()) - .send() - .await?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to get messages: {} - {}", status, error_text); - } - - let messages: Vec = response.json().await?; - - // Update last seen message ID - if let Some(latest) = messages.first() { - self.last_message_ids - .lock() - .await - .insert(channel_id.to_string(), latest.id.clone()); - } - - Ok(messages) - } -} - -impl std::fmt::Debug for DiscordCliMessenger { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DiscordCliMessenger") - .field("name", &self.name) - .field("watch_channels", &self.watch_channels) - .finish() - } -} - -#[async_trait] -impl Messenger for DiscordCliMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "discord" - } - - async fn initialize(&mut self) -> Result<()> { - let user = self - .get_me() - .await - .context("Failed to verify Discord bot token")?; - *self.connected.lock().await = true; - tracing::info!( - "Discord bot connected as {}#{}", - user.username, - user.discriminator - ); - Ok(()) - } - - async fn send_message(&self, recipient: &str, content: &str) -> Result { - self.send_message_internal(recipient, content).await - } - - async fn receive_messages(&self) -> Result> { - let mut all_messages = Vec::new(); - - for channel in &self.watch_channels { - match self.get_channel_messages(channel, 100).await { - Ok(messages) => { - for msg in messages { - all_messages.push(Message { - id: msg.id, - sender: msg.author.username, - content: msg.content, - timestamp: 0, // Would need to parse ISO timestamp - channel: Some(msg.channel_id), - reply_to: None, - thread_id: None, - media: None, - is_direct: false, // Discord DM detection would need channel type check - message_type: Default::default(), - edited_timestamp: None, - reactions: None, - }); - } - } - Err(e) => { - tracing::warn!("Failed to get messages for channel {}: {}", channel, e); - } - } - } - - Ok(all_messages) - } - - fn is_connected(&self) -> bool { - self.connected.try_lock().map(|g| *g).unwrap_or(false) - } - - async fn disconnect(&mut self) -> Result<()> { - *self.connected.lock().await = false; - Ok(()) - } - - async fn set_typing(&self, channel: &str, typing: bool) -> Result<()> { - if !typing { - return Ok(()); // Discord typing auto-expires after ~10 seconds - } - - let url = format!("{}/channels/{}/typing", DISCORD_API_BASE, channel); - - let response = self - .client - .post(&url) - .header("Authorization", format!("Bot {}", self.token)) - .send() - .await - .context("Failed to send typing indicator")?; - - if !response.status().is_success() { - // Non-fatal, just log - eprintln!( - "Failed to send Discord typing indicator: {}", - response.status() - ); - } - - Ok(()) - } -} diff --git a/crates/rustyclaw-core/src/messengers/google_chat.rs b/crates/rustyclaw-core/src/messengers/google_chat.rs deleted file mode 100644 index be98af4..0000000 --- a/crates/rustyclaw-core/src/messengers/google_chat.rs +++ /dev/null @@ -1,167 +0,0 @@ -//! Google Chat messenger using webhook URLs. -//! -//! Uses Google Chat incoming webhooks for sending messages. -//! For receiving messages, requires a Google Cloud Pub/Sub subscription -//! or the Google Chat API with a service account. - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; - -/// Google Chat messenger using webhooks and/or Chat API. -pub struct GoogleChatMessenger { - name: String, - /// Incoming webhook URL for sending messages. - webhook_url: Option, - /// Service account credentials JSON path (for Chat API). - credentials_path: Option, - /// Space name(s) to listen on (e.g. "spaces/AAAA"). - spaces: Vec, - connected: bool, - http: reqwest::Client, -} - -impl GoogleChatMessenger { - pub fn new(name: String) -> Self { - Self { - name, - webhook_url: None, - credentials_path: None, - spaces: Vec::new(), - connected: false, - http: reqwest::Client::new(), - } - } - - /// Set the incoming webhook URL. - pub fn with_webhook_url(mut self, url: String) -> Self { - self.webhook_url = Some(url); - self - } - - /// Set the service account credentials path. - pub fn with_credentials(mut self, path: String) -> Self { - self.credentials_path = Some(path); - self - } - - /// Set spaces to listen on. - pub fn with_spaces(mut self, spaces: Vec) -> Self { - self.spaces = spaces; - self - } -} - -#[async_trait] -impl Messenger for GoogleChatMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "google_chat" - } - - async fn initialize(&mut self) -> Result<()> { - // Validate that we have at least one way to send messages - if self.webhook_url.is_none() && self.credentials_path.is_none() { - anyhow::bail!( - "Google Chat requires either 'webhook_url' or 'credentials_path' \ - (service account JSON)" - ); - } - - self.connected = true; - tracing::info!( - has_webhook = self.webhook_url.is_some(), - has_credentials = self.credentials_path.is_some(), - spaces = ?self.spaces, - "Google Chat initialized" - ); - - Ok(()) - } - - async fn send_message(&self, space: &str, content: &str) -> Result { - // Prefer webhook for simplicity - if let Some(ref webhook_url) = self.webhook_url { - let payload = serde_json::json!({ - "text": content - }); - - let resp = self - .http - .post(webhook_url) - .json(&payload) - .send() - .await - .context("Google Chat webhook POST failed")?; - - if resp.status().is_success() { - let data: serde_json::Value = resp.json().await?; - return Ok(data["name"].as_str().unwrap_or("sent").to_string()); - } - anyhow::bail!("Google Chat webhook returned {}", resp.status()); - } - - // Fall back to Chat API with service account - if self.credentials_path.is_some() { - // Chat API requires OAuth2 — for now, return an error directing - // the user to use webhook mode or the claw-me-maybe skill. - anyhow::bail!( - "Google Chat API (service account) send not yet implemented for space '{}'. \ - Use webhook_url or the claw-me-maybe skill.", - space - ); - } - - anyhow::bail!("No Google Chat send method configured") - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - // Google Chat webhooks don't support threading via webhook URL, - // so we just send the message. - self.send_message(opts.recipient, opts.content).await - } - - async fn receive_messages(&self) -> Result> { - // Google Chat requires either: - // 1. A Pub/Sub subscription (push or pull) - // 2. The Chat API with events - // For now, return empty — receiving requires more complex setup. - Ok(Vec::new()) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_google_chat_creation() { - let m = GoogleChatMessenger::new("test".to_string()) - .with_webhook_url("https://chat.googleapis.com/v1/spaces/XXX/messages?key=YYY".to_string()); - assert_eq!(m.name(), "test"); - assert_eq!(m.messenger_type(), "google_chat"); - assert!(!m.is_connected()); - assert!(m.webhook_url.is_some()); - } - - #[test] - fn test_with_credentials() { - let m = GoogleChatMessenger::new("test".to_string()) - .with_credentials("/path/to/sa.json".to_string()) - .with_spaces(vec!["spaces/AAAA".to_string()]); - assert_eq!(m.credentials_path, Some("/path/to/sa.json".to_string())); - assert_eq!(m.spaces, vec!["spaces/AAAA"]); - } -} diff --git a/crates/rustyclaw-core/src/messengers/imessage.rs b/crates/rustyclaw-core/src/messengers/imessage.rs deleted file mode 100644 index ed1c911..0000000 --- a/crates/rustyclaw-core/src/messengers/imessage.rs +++ /dev/null @@ -1,253 +0,0 @@ -//! iMessage messenger via BlueBubbles server API. -//! -//! BlueBubbles is a self-hosted iMessage bridge that exposes a REST API. -//! This messenger connects to a BlueBubbles server to send and receive -//! iMessage conversations. -//! -//! Requirements: -//! - A running BlueBubbles server (macOS with iMessage) -//! - Server URL and password configured - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use std::sync::atomic::{AtomicI64, Ordering}; - -/// iMessage messenger via BlueBubbles REST API. -pub struct IMessageMessenger { - name: String, - /// BlueBubbles server URL (e.g. "http://localhost:1234"). - server_url: String, - /// BlueBubbles server password. - password: String, - connected: bool, - http: reqwest::Client, - /// Last message timestamp for polling (atomic for interior mutability). - last_poll_ts: AtomicI64, -} - -impl IMessageMessenger { - pub fn new(name: String, server_url: String, password: String) -> Self { - Self { - name, - server_url: server_url.trim_end_matches('/').to_string(), - password, - connected: false, - http: reqwest::Client::new(), - last_poll_ts: AtomicI64::new(chrono::Utc::now().timestamp_millis()), - } - } - - /// Build a BlueBubbles API URL with password query param. - fn api_url(&self, path: &str) -> String { - // URL-encode the password to handle special characters (&, =, #, etc.) - let encoded_password: String = self.password.bytes().map(|b| { - match b { - b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { - (b as char).to_string() - } - _ => format!("%{:02X}", b), - } - }).collect(); - format!( - "{}/api/v1/{}?password={}", - self.server_url, path, encoded_password - ) - } -} - -#[async_trait] -impl Messenger for IMessageMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "imessage" - } - - async fn initialize(&mut self) -> Result<()> { - // Check server connectivity - let url = self.api_url("server/info"); - let resp = self - .http - .get(&url) - .send() - .await - .context("Failed to connect to BlueBubbles server")?; - - if !resp.status().is_success() { - anyhow::bail!( - "BlueBubbles server returned {} — check URL and password", - resp.status() - ); - } - - let data: serde_json::Value = resp.json().await?; - if data["status"].as_i64() != Some(200) { - anyhow::bail!( - "BlueBubbles server error: {}", - data["message"].as_str().unwrap_or("unknown") - ); - } - - self.connected = true; - tracing::info!( - server = %self.server_url, - os_version = ?data["data"]["os_version"].as_str(), - "BlueBubbles/iMessage connected" - ); - - Ok(()) - } - - async fn send_message(&self, chat_guid: &str, content: &str) -> Result { - let url = self.api_url("message/text"); - let payload = serde_json::json!({ - "chatGuid": chat_guid, - "message": content, - "method": "apple-script" - }); - - let resp = self - .http - .post(&url) - .json(&payload) - .send() - .await - .context("BlueBubbles send failed")?; - - if resp.status().is_success() { - let data: serde_json::Value = resp.json().await?; - return Ok( - data["data"]["guid"] - .as_str() - .unwrap_or("sent") - .to_string(), - ); - } - - anyhow::bail!("BlueBubbles send returned {}", resp.status()) - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - self.send_message(opts.recipient, opts.content).await - } - - async fn receive_messages(&self) -> Result> { - let poll_ts = self.last_poll_ts.load(Ordering::Relaxed); - let url = format!( - "{}&after={}&limit=50&sort=asc", - self.api_url("message"), poll_ts - ); - - let resp = self.http.get(&url).send().await?; - if !resp.status().is_success() { - return Ok(Vec::new()); - } - - let data: serde_json::Value = resp.json().await?; - let messages = data["data"].as_array().cloned().unwrap_or_default(); - - let mut result = Vec::new(); - let mut max_ts = poll_ts; - for msg in &messages { - // Skip messages we sent - if msg["isFromMe"].as_bool() == Some(true) { - continue; - } - - let text = match msg["text"].as_str() { - Some(t) if !t.is_empty() => t, - _ => continue, - }; - - let guid = msg["guid"].as_str().unwrap_or("").to_string(); - let sender = msg["handle"]["address"] - .as_str() - .or(msg["handle"]["id"].as_str()) - .unwrap_or("unknown") - .to_string(); - let date_created = msg["dateCreated"].as_i64().unwrap_or(0); - let chat_guid = msg["chats"] - .as_array() - .and_then(|c| c.first()) - .and_then(|c| c["guid"].as_str()) - .map(|s| s.to_string()); - - if date_created > max_ts { - max_ts = date_created; - } - - result.push(Message { - id: guid, - sender, - content: text.to_string(), - timestamp: date_created / 1000, // ms to seconds - channel: chat_guid, - reply_to: None, - media: None, - is_direct: false, // TODO: implement DM detection - }); - } - - // Advance the poll timestamp so we don't re-fetch the same messages. - if max_ts > poll_ts { - self.last_poll_ts.store(max_ts, Ordering::Relaxed); - } - - Ok(result) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_imessage_creation() { - let m = IMessageMessenger::new( - "test".to_string(), - "http://localhost:1234".to_string(), - "password123".to_string(), - ); - assert_eq!(m.name(), "test"); - assert_eq!(m.messenger_type(), "imessage"); - assert!(!m.is_connected()); - assert_eq!(m.server_url, "http://localhost:1234"); - } - - #[test] - fn test_api_url() { - let m = IMessageMessenger::new( - "test".to_string(), - "http://localhost:1234/".to_string(), // trailing slash - "pass".to_string(), - ); - let url = m.api_url("server/info"); - assert!(url.starts_with("http://localhost:1234/api/v1/server/info")); - assert!(url.contains("password=pass")); - } - - #[test] - fn test_api_url_encodes_special_chars() { - let m = IMessageMessenger::new( - "test".to_string(), - "http://localhost:1234".to_string(), - "p@ss&word".to_string(), - ); - let url = m.api_url("server/info"); - // Special chars should be percent-encoded - assert!(url.contains("password=p%40ss%26word")); - assert!(!url.contains("p@ss&word")); - } -} diff --git a/crates/rustyclaw-core/src/messengers/irc.rs b/crates/rustyclaw-core/src/messengers/irc.rs deleted file mode 100644 index cfd5acb..0000000 --- a/crates/rustyclaw-core/src/messengers/irc.rs +++ /dev/null @@ -1,368 +0,0 @@ -//! IRC messenger using raw TCP/TLS connections. -//! -//! Implements basic IRC protocol (RFC 2812) for connecting to IRC servers, -//! joining channels, sending/receiving messages. Supports TLS. - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::TcpStream; -use tokio::sync::Mutex; - -/// IRC messenger using raw TCP/TLS. -pub struct IrcMessenger { - name: String, - server: String, - port: u16, - nick: String, - channels: Vec, - use_tls: bool, - password: Option, - connected: bool, - /// Shared writer half of the TCP stream. - writer: Option>>>, - /// Pending incoming messages collected by the reader task. - pending_messages: Arc>>, - /// Background reader task handle. - _reader_handle: Option>, -} - -impl IrcMessenger { - pub fn new(name: String, server: String, port: u16, nick: String) -> Self { - Self { - name, - server, - port, - nick, - channels: Vec::new(), - use_tls: port == 6697, - password: None, - connected: false, - writer: None, - pending_messages: Arc::new(Mutex::new(Vec::new())), - _reader_handle: None, - } - } - - /// Set channels to join on connect. - pub fn with_channels(mut self, channels: Vec) -> Self { - self.channels = channels; - self - } - - /// Set whether to use TLS. - pub fn with_tls(mut self, use_tls: bool) -> Self { - self.use_tls = use_tls; - self - } - - /// Set server password. - pub fn with_password(mut self, password: String) -> Self { - self.password = Some(password); - self - } - - /// Send a raw IRC command. - async fn send_raw(&self, line: &str) -> Result<()> { - if let Some(writer) = &self.writer { - let mut w = writer.lock().await; - w.write_all(format!("{}\r\n", line).as_bytes()).await?; - w.flush().await?; - } - Ok(()) - } -} - -/// Parse an IRC PRIVMSG line into sender, target, and message text. -fn parse_privmsg(line: &str) -> Option<(&str, &str, &str)> { - // Format: :nick!user@host PRIVMSG #channel :message text - if !line.starts_with(':') { - return None; - } - let rest = &line[1..]; - let parts: Vec<&str> = rest.splitn(4, ' ').collect(); - if parts.len() < 4 || parts[1] != "PRIVMSG" { - return None; - } - let sender = parts[0].split('!').next()?; - let target = parts[2]; - let msg = parts[3].strip_prefix(':')?; - Some((sender, target, msg)) -} - -/// Check if a line is a PING and return the token. -fn parse_ping(line: &str) -> Option<&str> { - line.strip_prefix("PING ") -} - -/// Split a UTF-8 string into chunks of at most `max_bytes` bytes each, -/// never splitting in the middle of a multi-byte character. -fn split_utf8(s: &str, max_bytes: usize) -> Vec<&str> { - let mut chunks = Vec::new(); - let mut start = 0; - while start < s.len() { - let mut end = (start + max_bytes).min(s.len()); - // Back up to a char boundary if we landed mid-codepoint - while end > start && !s.is_char_boundary(end) { - end -= 1; - } - if end == start { - // Shouldn't happen with valid UTF-8, but advance at least one char - end = start + s[start..].chars().next().map_or(1, |c| c.len_utf8()); - } - chunks.push(&s[start..end]); - start = end; - } - chunks -} - -#[async_trait] -impl Messenger for IrcMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "irc" - } - - async fn initialize(&mut self) -> Result<()> { - let addr = format!("{}:{}", self.server, self.port); - let stream = TcpStream::connect(&addr) - .await - .with_context(|| format!("Failed to connect to IRC server {}", addr))?; - - // TLS support requires the `rustls-platform-verifier` crate which is - // already pulled in transitively. For simplicity and to avoid adding - // direct deps, we only support plaintext for now and log a warning - // if TLS was requested. - if self.use_tls { - tracing::warn!( - "IRC TLS requested but not yet supported natively. \ - Connect to a plaintext port or use a TLS-terminating proxy (e.g. stunnel). \ - Falling back to plaintext." - ); - } - - let (reader, writer): ( - Box, - Box, - ) = { - let (r, w) = tokio::io::split(stream); - (Box::new(r), Box::new(w)) - }; - - let writer = Arc::new(Mutex::new(writer)); - self.writer = Some(writer.clone()); - - // Send registration - if let Some(ref pass) = self.password { - let mut w = writer.lock().await; - w.write_all(format!("PASS {}\r\n", pass).as_bytes()) - .await?; - } - { - let mut w = writer.lock().await; - w.write_all(format!("NICK {}\r\n", self.nick).as_bytes()) - .await?; - w.write_all( - format!("USER {} 0 * :RustyClaw Bot\r\n", self.nick).as_bytes(), - ) - .await?; - w.flush().await?; - } - - // Spawn reader task - let pending = self.pending_messages.clone(); - let channels = self.channels.clone(); - let nick = self.nick.clone(); - let writer_clone = writer.clone(); - - let handle = tokio::spawn(async move { - let mut buf_reader = BufReader::new(reader); - let mut line_buf = String::new(); - let mut joined = false; - - loop { - line_buf.clear(); - match buf_reader.read_line(&mut line_buf).await { - Ok(0) => break, // Connection closed - Ok(_) => { - let line = line_buf.trim_end(); - - // Handle PING/PONG - if let Some(token) = parse_ping(line) { - let mut w = writer_clone.lock().await; - let _ = w - .write_all(format!("PONG {}\r\n", token).as_bytes()) - .await; - let _ = w.flush().await; - continue; - } - - // Join channels after RPL_WELCOME (001) - if !joined && line.contains(" 001 ") { - let mut w = writer_clone.lock().await; - for ch in &channels { - let _ = w - .write_all(format!("JOIN {}\r\n", ch).as_bytes()) - .await; - } - let _ = w.flush().await; - joined = true; - } - - // Parse PRIVMSG - if let Some((sender, target, text)) = parse_privmsg(line) { - // Skip our own messages - if sender == nick { - continue; - } - - let channel = if target.starts_with('#') || target.starts_with('&') { - target.to_string() - } else { - sender.to_string() - }; - - let msg = Message { - id: format!( - "irc-{}", - chrono::Utc::now().timestamp_millis() - ), - sender: sender.to_string(), - content: text.to_string(), - timestamp: chrono::Utc::now().timestamp(), - channel: Some(channel), - reply_to: None, - media: None, - is_direct: false, // TODO: implement DM detection - }; - - let mut pending = pending.lock().await; - pending.push(msg); - } - } - Err(_) => break, - } - } - }); - - self._reader_handle = Some(handle); - self.connected = true; - - tracing::info!( - server = %self.server, - nick = %self.nick, - channels = ?self.channels, - tls = self.use_tls, - "IRC connected" - ); - - Ok(()) - } - - async fn send_message(&self, target: &str, content: &str) -> Result { - // IRC messages have a max length of ~512 bytes including the command. - // Split long messages. - let max_len = 400; // Leave room for PRIVMSG header - for chunk in split_utf8(content, max_len) { - self.send_raw(&format!("PRIVMSG {} :{}", target, chunk)) - .await?; - } - Ok(format!("irc-{}", chrono::Utc::now().timestamp_millis())) - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - // IRC doesn't have native reply support — prefix with context - let content = if let Some(reply_to) = opts.reply_to { - format!("[re: {}] {}", reply_to, opts.content) - } else { - opts.content.to_string() - }; - self.send_message(opts.recipient, &content).await - } - - async fn receive_messages(&self) -> Result> { - let mut pending = self.pending_messages.lock().await; - Ok(pending.drain(..).collect()) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - if self.connected { - let _ = self.send_raw("QUIT :RustyClaw shutting down").await; - } - self.connected = false; - self.writer = None; - if let Some(handle) = self._reader_handle.take() { - handle.abort(); - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_irc_messenger_creation() { - let m = IrcMessenger::new( - "test".to_string(), - "irc.libera.chat".to_string(), - 6697, - "rustyclaw".to_string(), - ); - assert_eq!(m.name(), "test"); - assert_eq!(m.messenger_type(), "irc"); - assert!(!m.is_connected()); - assert!(m.use_tls); - } - - #[test] - fn test_parse_privmsg() { - let line = ":nick!user@host PRIVMSG #channel :hello world"; - let (sender, target, msg) = parse_privmsg(line).unwrap(); - assert_eq!(sender, "nick"); - assert_eq!(target, "#channel"); - assert_eq!(msg, "hello world"); - } - - #[test] - fn test_parse_privmsg_dm() { - let line = ":alice!user@host PRIVMSG bot :direct message"; - let (sender, target, msg) = parse_privmsg(line).unwrap(); - assert_eq!(sender, "alice"); - assert_eq!(target, "bot"); - assert_eq!(msg, "direct message"); - } - - #[test] - fn test_parse_ping() { - assert_eq!(parse_ping("PING :server"), Some(":server")); - assert_eq!(parse_ping("PRIVMSG #ch :hello"), None); - } - - #[test] - fn test_with_options() { - let m = IrcMessenger::new( - "test".to_string(), - "irc.libera.chat".to_string(), - 6667, - "bot".to_string(), - ) - .with_channels(vec!["#test".to_string()]) - .with_tls(false) - .with_password("secret".to_string()); - - assert_eq!(m.channels, vec!["#test"]); - assert!(!m.use_tls); - assert_eq!(m.password, Some("secret".to_string())); - } -} diff --git a/crates/rustyclaw-core/src/messengers/matrix.rs b/crates/rustyclaw-core/src/messengers/matrix.rs deleted file mode 100644 index 67f4d0f..0000000 --- a/crates/rustyclaw-core/src/messengers/matrix.rs +++ /dev/null @@ -1,274 +0,0 @@ -//! Matrix messenger using matrix-sdk with E2EE support. -//! -//! This requires the `matrix` feature to be enabled. - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use matrix_sdk::{ - Client, Room, SessionTokens, - config::SyncSettings, - ruma::{ - OwnedUserId, RoomId, UserId, - events::room::message::{ - MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, - }, - }, -}; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::sync::Mutex; - -/// Matrix messenger with E2EE support -pub struct MatrixMessenger { - name: String, - homeserver_url: String, - user_id: String, - password: Option, - access_token: Option, - device_id: Option, - store_path: PathBuf, - client: Option, - connected: bool, - /// Pending incoming messages (populated by sync) - pending_messages: Arc>>, -} - -impl MatrixMessenger { - /// Create a new Matrix messenger with password authentication - pub fn with_password( - name: String, - homeserver_url: String, - user_id: String, - password: String, - store_path: PathBuf, - ) -> Self { - Self { - name, - homeserver_url, - user_id, - password: Some(password), - access_token: None, - device_id: None, - store_path, - client: None, - connected: false, - pending_messages: Arc::new(Mutex::new(Vec::new())), - } - } - - /// Create a new Matrix messenger with access token authentication - pub fn with_token( - name: String, - homeserver_url: String, - user_id: String, - access_token: String, - device_id: Option, - store_path: PathBuf, - ) -> Self { - Self { - name, - homeserver_url, - user_id, - password: None, - access_token: Some(access_token), - device_id, - store_path, - client: None, - connected: false, - pending_messages: Arc::new(Mutex::new(Vec::new())), - } - } - - /// Get the Matrix client (must be initialized first) - fn client(&self) -> Result<&Client> { - self.client - .as_ref() - .context("Matrix client not initialized") - } - - /// Resolve a room by ID or alias - async fn resolve_room(&self, room_id_or_alias: &str) -> Result { - let client = self.client()?; - - // Try as room ID first - if let Ok(room_id) = <&RoomId>::try_from(room_id_or_alias) { - if let Some(room) = client.get_room(room_id) { - return Ok(room); - } - } - - // Try as room alias - if room_id_or_alias.starts_with('#') { - let alias = matrix_sdk::ruma::OwnedRoomAliasId::try_from(room_id_or_alias) - .context("Invalid room alias")?; - let response = client.resolve_room_alias(&alias).await?; - if let Some(room) = client.get_room(&response.room_id) { - return Ok(room); - } - } - - anyhow::bail!("Room not found: {}", room_id_or_alias) - } -} - -#[async_trait] -impl Messenger for MatrixMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "matrix" - } - - async fn initialize(&mut self) -> Result<()> { - // Build the client with SQLite store for E2EE state - let client = Client::builder() - .homeserver_url(&self.homeserver_url) - .sqlite_store(&self.store_path, None) - .build() - .await - .context("Failed to build Matrix client")?; - - // Authenticate - if let Some(ref password) = self.password { - // Password login - let user_id = <&UserId>::try_from(self.user_id.as_str()).context("Invalid user ID")?; - - client - .matrix_auth() - .login_username(user_id, password) - .initial_device_display_name("RustyClaw") - .send() - .await - .context("Matrix login failed")?; - } else if let Some(ref token) = self.access_token { - // Token-based session restore - let session = matrix_sdk::authentication::AuthSession::Matrix( - matrix_sdk::authentication::matrix::MatrixSession { - meta: matrix_sdk::SessionMeta { - user_id: OwnedUserId::try_from(self.user_id.as_str())?, - device_id: self - .device_id - .as_ref() - .map(|d| matrix_sdk::ruma::OwnedDeviceId::try_from(d.as_str())) - .transpose()? - .unwrap_or_else(|| "RUSTYCLAW".into()), - }, - tokens: SessionTokens { - access_token: token.clone(), - refresh_token: None, - }, - }, - ); - client.restore_session(session).await?; - } else { - anyhow::bail!("No authentication method provided"); - } - - // Set up message handler - let pending = self.pending_messages.clone(); - client.add_event_handler(move |ev: OriginalSyncRoomMessageEvent, room: Room| { - let pending = pending.clone(); - async move { - let content = match ev.content.msgtype { - MessageType::Text(text) => text.body, - MessageType::Notice(notice) => notice.body, - _ => return, - }; - - let message = Message { - id: ev.event_id.to_string(), - sender: ev.sender.to_string(), - content, - timestamp: ev.origin_server_ts.as_secs().into(), - channel: Some(room.room_id().to_string()), - reply_to: None, - media: None, - }; - - pending.lock().await.push(message); - } - }); - - // Initial sync to catch up - client.sync_once(SyncSettings::default()).await?; - - self.client = Some(client); - self.connected = true; - Ok(()) - } - - async fn send_message(&self, room_id: &str, content: &str) -> Result { - let room = self.resolve_room(room_id).await?; - - let content = RoomMessageEventContent::text_plain(content); - let response = room.send(content).await?; - - Ok(response.event_id.to_string()) - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - let room = self.resolve_room(opts.recipient).await?; - - let mut content = RoomMessageEventContent::text_plain(opts.content); - - // Handle reply - if let Some(reply_to) = opts.reply_to { - if let Ok(_event_id) = matrix_sdk::ruma::OwnedEventId::try_from(reply_to) { - // For proper threading, we'd need to fetch the original event - // For now, just reference it in the body - let reply_body = format!("> Replying to {}\n\n{}", reply_to, opts.content); - content = RoomMessageEventContent::text_plain(reply_body); - } - } - - let response = room.send(content).await?; - Ok(response.event_id.to_string()) - } - - async fn receive_messages(&self) -> Result> { - let client = self.client()?; - - // Do a quick sync to get new messages - let settings = SyncSettings::default().timeout(std::time::Duration::from_secs(0)); - let _ = client.sync_once(settings).await; - - // Drain pending messages - let mut pending = self.pending_messages.lock().await; - Ok(std::mem::take(&mut *pending)) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - if let Some(client) = self.client.take() { - // Logout invalidates the access token - let _ = client.matrix_auth().logout().await; - } - self.connected = false; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_matrix_messenger_creation() { - let messenger = MatrixMessenger::with_password( - "test".to_string(), - "https://matrix.org".to_string(), - "@test:matrix.org".to_string(), - "password".to_string(), - PathBuf::from("/tmp/matrix-test"), - ); - assert_eq!(messenger.name(), "test"); - assert_eq!(messenger.messenger_type(), "matrix"); - assert!(!messenger.is_connected()); - } -} diff --git a/crates/rustyclaw-core/src/messengers/matrix_cli.rs b/crates/rustyclaw-core/src/messengers/matrix_cli.rs deleted file mode 100644 index a6a5282..0000000 --- a/crates/rustyclaw-core/src/messengers/matrix_cli.rs +++ /dev/null @@ -1,779 +0,0 @@ -//! Matrix messenger using direct HTTP API calls to homeserver. -//! -//! This implementation uses the Matrix Client-Server API directly via HTTP, -//! avoiding external dependencies like matrix-sdk. It provides basic messaging -//! functionality without E2EE support. -//! -//! This requires the `matrix-cli` feature to be enabled. - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use pulldown_cmark::{Parser, html}; -use reqwest::Client; -use serde::Deserialize; -use serde_json::{Value, json}; -use std::collections::HashSet; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::sync::Mutex; - -/// Matrix API response for login -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct LoginResponse { - access_token: String, - device_id: String, - user_id: String, -} - -/// Matrix API response for room events -#[derive(Debug, Deserialize)] -struct SyncResponse { - rooms: Option, - next_batch: String, -} - -#[derive(Debug, Deserialize)] -struct RoomsResponse { - join: Option>, - invite: Option>, -} - -/// Matrix API response for sending messages -#[derive(Debug, Deserialize)] -struct SendResponse { - event_id: String, -} - -/// Matrix room event -#[derive(Debug, Deserialize)] -struct RoomEvent { - #[serde(rename = "type")] - event_type: String, - sender: String, - content: Value, - event_id: String, - origin_server_ts: u64, -} - -/// DM configuration for Matrix messenger -#[derive(Debug, Clone, Default)] -pub struct MatrixDmConfig { - /// Whether DMs are enabled - pub enabled: bool, - /// DM policy: "allowlist", "open", or "pairing" - pub policy: String, - /// List of user IDs allowed to send DMs (for allowlist policy) - pub allow_from: Vec, -} - -/// Matrix messenger implementation using HTTP API -pub struct MatrixCliMessenger { - name: String, - homeserver_url: String, - user_id: String, - password: Option, - access_token: Option, - device_id: Option, - client: Client, - connected: bool, - sync_token: Arc>>, - /// Configured allowed chat room IDs (explicit allowlist) - allowed_chats: HashSet, - /// DM configuration - dm_config: MatrixDmConfig, - /// Dynamically accepted DM room IDs (from auto-accept) - dm_rooms: Arc>>, - /// Directory for persisting state (sync token, etc.) - state_dir: Option, - /// Messages from initial sync, waiting to be returned - pending_messages: Arc>>, -} - -impl MatrixCliMessenger { - /// Create a new Matrix CLI messenger with password authentication - pub fn with_password( - name: String, - homeserver_url: String, - user_id: String, - password: String, - ) -> Self { - Self { - name, - homeserver_url: homeserver_url.trim_end_matches('/').to_string(), - user_id, - password: Some(password), - access_token: None, - device_id: None, - client: Client::new(), - connected: false, - sync_token: Arc::new(Mutex::new(None)), - allowed_chats: HashSet::new(), - dm_config: MatrixDmConfig::default(), - dm_rooms: Arc::new(Mutex::new(HashSet::new())), - state_dir: None, - pending_messages: Arc::new(Mutex::new(Vec::new())), - } - } - - /// Create a new Matrix CLI messenger with access token authentication - pub fn with_token( - name: String, - homeserver_url: String, - user_id: String, - access_token: String, - device_id: Option, - ) -> Self { - Self { - name, - homeserver_url: homeserver_url.trim_end_matches('/').to_string(), - user_id, - password: None, - access_token: Some(access_token), - device_id, - client: Client::new(), - connected: false, - sync_token: Arc::new(Mutex::new(None)), - allowed_chats: HashSet::new(), - dm_config: MatrixDmConfig::default(), - dm_rooms: Arc::new(Mutex::new(HashSet::new())), - state_dir: None, - pending_messages: Arc::new(Mutex::new(Vec::new())), - } - } - - /// Set state directory for persisting sync token - pub fn with_state_dir(mut self, dir: std::path::PathBuf) -> Self { - self.state_dir = Some(dir); - self - } - - /// Set allowed chat room IDs - pub fn with_allowed_chats(mut self, chats: Vec) -> Self { - self.allowed_chats = chats.into_iter().collect(); - self - } - - /// Set DM configuration - pub fn with_dm_config(mut self, config: MatrixDmConfig) -> Self { - self.dm_config = config; - self - } - - /// Build authorization header - fn auth_header(&self) -> Result { - self.access_token - .as_ref() - .map(|token| format!("Bearer {}", token)) - .ok_or_else(|| anyhow::anyhow!("No access token available")) - } - - /// Load sync token from disk if state_dir is configured - fn load_sync_token(&self) -> Option { - let state_dir = self.state_dir.as_ref()?; - let token_path = state_dir.join("matrix_sync_token"); - std::fs::read_to_string(&token_path).ok() - } - - /// Save sync token to disk if state_dir is configured - fn save_sync_token(&self, token: &str) { - if let Some(ref state_dir) = self.state_dir { - let token_path = state_dir.join("matrix_sync_token"); - if let Err(e) = std::fs::create_dir_all(state_dir) { - eprintln!("Failed to create state dir: {}", e); - return; - } - if let Err(e) = std::fs::write(&token_path, token) { - eprintln!("Failed to save sync token: {}", e); - } - } - } - - /// Login with password and get access token - async fn login(&mut self) -> Result<()> { - let password = self - .password - .as_ref() - .context("No password provided for login")?; - - let login_request = json!({ - "type": "m.login.password", - "user": self.user_id, - "password": password, - "initial_device_display_name": "RustyClaw Matrix CLI" - }); - - let url = format!("{}/_matrix/client/v3/login", self.homeserver_url); - - let response = self - .client - .post(&url) - .json(&login_request) - .send() - .await - .context("Failed to send login request")?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Login failed: {} - {}", status, error_text); - } - - let login_response: LoginResponse = response - .json() - .await - .context("Failed to parse login response")?; - - self.access_token = Some(login_response.access_token); - self.device_id = Some(login_response.device_id); - - Ok(()) - } - - /// Join a room by ID - async fn join_room(&self, room_id: &str) -> Result<()> { - let url = format!( - "{}/_matrix/client/v3/rooms/{}/join", - self.homeserver_url, - urlencoding::encode(room_id) - ); - - let response = self - .client - .post(&url) - .header("Authorization", self.auth_header()?) - .header("Content-Type", "application/json") - .body("{}") - .send() - .await - .context("Failed to join room")?; - - if !response.status().is_success() { - let error = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to join room: {}", error); - } - - Ok(()) - } - - /// Get list of joined rooms - async fn get_joined_rooms(&self) -> Result> { - let url = format!("{}/_matrix/client/v3/joined_rooms", self.homeserver_url); - - let response = self - .client - .get(&url) - .header("Authorization", self.auth_header()?) - .send() - .await - .context("Failed to get joined rooms")?; - - if !response.status().is_success() { - anyhow::bail!("Failed to get joined rooms: {}", response.status()); - } - - #[derive(Deserialize)] - struct JoinedRoomsResponse { - joined_rooms: Vec, - } - - let resp: JoinedRoomsResponse = response.json().await?; - Ok(resp.joined_rooms) - } - - /// Resolve room ID from alias or return as-is if already an ID - async fn resolve_room_id(&self, room_id_or_alias: &str) -> Result { - // If it looks like a room ID, return as-is - if room_id_or_alias.starts_with('!') { - return Ok(room_id_or_alias.to_string()); - } - - // If it's an alias, resolve it - if room_id_or_alias.starts_with('#') { - let encoded_alias = urlencoding::encode(room_id_or_alias); - let url = format!( - "{}/_matrix/client/v3/directory/room/{}", - self.homeserver_url, encoded_alias - ); - - let response = self - .client - .get(&url) - .header("Authorization", self.auth_header()?) - .send() - .await - .context("Failed to resolve room alias")?; - - let status = response.status(); - if !status.is_success() { - anyhow::bail!("Failed to resolve room alias: {}", status); - } - - let room_info: Value = response.json().await?; - return room_info["room_id"] - .as_str() - .map(|s| s.to_string()) - .ok_or_else(|| anyhow::anyhow!("Room ID not found in response")); - } - - // Assume it's a room ID if it doesn't match alias pattern - Ok(room_id_or_alias.to_string()) - } - - /// Perform a sync to get new messages - async fn sync(&self, timeout_ms: Option) -> Result> { - let mut url = format!("{}/_matrix/client/v3/sync", self.homeserver_url); - - let mut params = Vec::new(); - { - let token = self.sync_token.lock().await; - if let Some(ref t) = *token { - params.push(format!("since={}", t)); - } - } - if let Some(timeout) = timeout_ms { - params.push(format!("timeout={}", timeout)); - } - - if !params.is_empty() { - url.push('?'); - url.push_str(¶ms.join("&")); - } - - let response = self - .client - .get(&url) - .header("Authorization", self.auth_header()?) - .send() - .await - .context("Failed to sync")?; - - if !response.status().is_success() { - anyhow::bail!("Sync failed: {}", response.status()); - } - - let sync_response: SyncResponse = response.json().await?; - - // Store next_batch for later - we'll only save it after successful message extraction - let next_batch = sync_response.next_batch.clone(); - - // Process invites if DM is enabled - if self.dm_config.enabled { - if let Some(ref rooms) = sync_response.rooms { - if let Some(ref invites) = rooms.invite { - for (room_id, invite_data) in invites { - // Find who sent the invite - if let Some(invite_state) = invite_data.get("invite_state") { - if let Some(events) = invite_state.get("events") { - if let Some(events_array) = events.as_array() { - for event in events_array { - if event.get("type").and_then(|t| t.as_str()) - == Some("m.room.member") - { - if let Some(sender) = - event.get("sender").and_then(|s| s.as_str()) - { - // Check if we should auto-accept - let should_accept = - match self.dm_config.policy.as_str() { - "open" => true, - "allowlist" => self - .dm_config - .allow_from - .iter() - .any(|u| u == sender), - _ => false, - }; - - if should_accept { - // Accept the invite - if let Err(e) = self.join_room(room_id).await { - eprintln!( - "Failed to auto-accept invite from {}: {}", - sender, e - ); - } else { - // Track as DM room - let mut dm_rooms = - self.dm_rooms.lock().await; - dm_rooms.insert(room_id.clone()); - eprintln!( - "Auto-accepted DM invite from {} to room {}", - sender, room_id - ); - } - } - } - } - } - } - } - } - } - } - } - } - - let mut messages = Vec::new(); - - // Track whether any allowed rooms appeared in this sync. - // We only advance the sync token if allowed rooms were present, - // to avoid skipping messages when sync returns only non-allowed room events. - let mut allowed_rooms_in_sync = false; - - // Get current DM rooms for filtering - let dm_rooms = self.dm_rooms.lock().await.clone(); - let has_room_filters = !self.allowed_chats.is_empty() || !dm_rooms.is_empty(); - eprintln!( - "DEBUG: sync - allowed_chats: {:?}, dm_rooms: {:?}", - self.allowed_chats, dm_rooms - ); - - if let Some(rooms) = sync_response.rooms { - if let Some(joined_rooms) = rooms.join { - eprintln!("DEBUG: sync - checking {} joined rooms", joined_rooms.len()); - for (room_id, room_data) in joined_rooms { - // Check if this room is allowed - let in_allowed_chats = self.allowed_chats.contains(&room_id); - let in_dm_rooms = dm_rooms.contains(&room_id); - let is_allowed_room = in_allowed_chats || in_dm_rooms; - - // If we have an allowlist OR dm_rooms, only process rooms in one of them - if has_room_filters { - if !is_allowed_room { - eprintln!("DEBUG: skipping room {} (not in allowed lists)", room_id); - continue; - } - // An allowed room appeared in this sync - allowed_rooms_in_sync = true; - } - eprintln!("DEBUG: processing room {}", room_id); - - if let Some(timeline) = room_data.get("timeline") { - if let Some(events) = timeline.get("events") { - if let Some(events_array) = events.as_array() { - for event_value in events_array { - if let Ok(event) = - serde_json::from_value::(event_value.clone()) - { - // Skip our own messages - if event.sender == self.user_id { - continue; - } - if event.event_type == "m.room.message" { - if let Some(body) = event.content.get("body") { - if let Some(body_str) = body.as_str() { - // Check if this is a DM room - let is_dm = in_dm_rooms; - messages.push(Message { - id: event.event_id, - sender: event.sender, - content: body_str.to_string(), - timestamp: (event.origin_server_ts / 1000) - as i64, - channel: Some(room_id.clone()), - reply_to: None, - thread_id: None, - media: None, - is_direct: is_dm, - message_type: Default::default(), - edited_timestamp: None, - reactions: None, - }); - } - } - } - } - } - } - } - } - } - } - } - - // Only advance sync token if: - // 1. We extracted messages, OR - // 2. Allowed rooms appeared in sync (even with no messages = caught up), OR - // 3. No room filters configured (process everything) - // - // This prevents the token from advancing when sync only contains - // events for non-allowed rooms, which would cause us to miss messages. - let should_advance_token = - !messages.is_empty() || allowed_rooms_in_sync || !has_room_filters; - - if should_advance_token { - let mut token = self.sync_token.lock().await; - *token = Some(next_batch.clone()); - self.save_sync_token(&next_batch); - } else { - eprintln!("DEBUG: sync - NOT advancing token (no allowed rooms in response)"); - } - - Ok(messages) - } - - /// Set typing indicator for a room - async fn set_typing(&self, room_id: &str, typing: bool) -> Result<()> { - let resolved_room_id = self.resolve_room_id(room_id).await?; - let url = format!( - "{}/_matrix/client/v3/rooms/{}/typing/{}", - self.homeserver_url, - urlencoding::encode(&resolved_room_id), - urlencoding::encode(&self.user_id) - ); - - let body = if typing { - json!({ "typing": true, "timeout": 30000 }) - } else { - json!({ "typing": false }) - }; - - let response = self - .client - .put(&url) - .header("Authorization", self.auth_header()?) - .json(&body) - .send() - .await - .context("Failed to set typing indicator")?; - - if !response.status().is_success() { - // Non-fatal - just log and continue - eprintln!("Failed to set typing indicator: {}", response.status()); - } - - Ok(()) - } - - /// Send a plain text message to a room - async fn send_text_message( - &self, - room_id: &str, - content: &str, - reply_to: Option<&str>, - ) -> Result { - let resolved_room_id = self.resolve_room_id(room_id).await?; - - let txn_id = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis(); - - // Convert markdown to HTML for formatted display - let parser = Parser::new(content); - let mut html_output = String::new(); - html::push_html(&mut html_output, parser); - - let mut message_content = json!({ - "msgtype": "m.text", - "body": content, - "format": "org.matrix.custom.html", - "formatted_body": html_output - }); - - // Handle reply-to if provided - if let Some(reply_event_id) = reply_to { - // Note: Modern Matrix clients use m.relates_to for threading and ignore the - // fallback body. We don't include the legacy "> <@user> text" fallback since - // we don't have the original message content readily available, and the - // malformed fallback causes visual garbage in some clients. - message_content["m.relates_to"] = json!({ - "m.in_reply_to": { - "event_id": reply_event_id - } - }); - } - - let url = format!( - "{}/_matrix/client/v3/rooms/{}/send/m.room.message/{}", - self.homeserver_url, - urlencoding::encode(&resolved_room_id), - txn_id - ); - - let response = self - .client - .put(&url) - .header("Authorization", self.auth_header()?) - .json(&message_content) - .send() - .await - .context("Failed to send message")?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to send message: {} - {}", status, error_text); - } - - let send_response: SendResponse = response.json().await?; - Ok(send_response.event_id) - } -} - -#[async_trait] -impl Messenger for MatrixCliMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "matrix-cli" - } - - async fn initialize(&mut self) -> Result<()> { - if self.access_token.is_none() && self.password.is_some() { - self.login().await?; - } - - if self.access_token.is_none() { - anyhow::bail!("No access token available and no password provided"); - } - - // If DM is enabled, load existing joined rooms that aren't in allowed_chats - // These are likely DM rooms from previous sessions - if self.dm_config.enabled { - if let Ok(joined) = self.get_joined_rooms().await { - let mut dm_rooms = self.dm_rooms.lock().await; - for room_id in joined { - if !self.allowed_chats.contains(&room_id) { - dm_rooms.insert(room_id); - } - } - if !dm_rooms.is_empty() { - eprintln!("Loaded {} existing DM rooms", dm_rooms.len()); - } - } - } - - // Load persisted sync token if available. - // This ensures we only process NEW messages after restart, not re-process old ones. - // The sync token represents our last known position in the event stream. - if let Some(saved_token) = self.load_sync_token() { - eprintln!("Loaded persisted sync token: {}", saved_token); - let mut token = self.sync_token.lock().await; - *token = Some(saved_token); - } - - // Do initial sync to catch up on any messages since last run. - // If we have a persisted token, this returns only NEW messages. - // If no token (fresh start), this returns recent messages (~10 per room). - let initial_messages = self.sync(Some(0)).await?; - if !initial_messages.is_empty() { - eprintln!( - "Initial sync returned {} new messages", - initial_messages.len() - ); - let mut pending = self.pending_messages.lock().await; - pending.extend(initial_messages); - } else { - eprintln!("Initial sync: no new messages (caught up)"); - } - - self.connected = true; - Ok(()) - } - - async fn send_message(&self, recipient: &str, content: &str) -> Result { - self.send_text_message(recipient, content, None).await - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - self.send_text_message(opts.recipient, opts.content, opts.reply_to) - .await - } - - async fn receive_messages(&self) -> Result> { - // First, return any pending messages from initial sync - { - let mut pending = self.pending_messages.lock().await; - if !pending.is_empty() { - let messages = std::mem::take(&mut *pending); - return Ok(messages); - } - } - - // Then do normal sync for new messages - self.sync(Some(1000)).await - } - - fn is_connected(&self) -> bool { - self.connected && self.access_token.is_some() - } - - async fn disconnect(&mut self) -> Result<()> { - if let Some(access_token) = &self.access_token { - let url = format!("{}/_matrix/client/v3/logout", self.homeserver_url); - - let _ = self - .client - .post(&url) - .header("Authorization", format!("Bearer {}", access_token)) - .send() - .await; - } - - self.access_token = None; - self.device_id = None; - self.connected = false; - { - let mut token = self.sync_token.lock().await; - *token = None; - } - - Ok(()) - } - - async fn set_typing(&self, channel: &str, typing: bool) -> Result<()> { - MatrixCliMessenger::set_typing(self, channel, typing).await - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_matrix_cli_messenger_creation() { - let messenger = MatrixCliMessenger::with_password( - "test".to_string(), - "https://matrix.org".to_string(), - "@test:matrix.org".to_string(), - "password".to_string(), - ); - assert_eq!(messenger.name(), "test"); - assert_eq!(messenger.messenger_type(), "matrix-cli"); - assert!(!messenger.is_connected()); - } - - #[test] - fn test_matrix_cli_messenger_with_token() { - let messenger = MatrixCliMessenger::with_token( - "test".to_string(), - "https://matrix.org".to_string(), - "@test:matrix.org".to_string(), - "syt_token".to_string(), - Some("DEVICEID".to_string()), - ); - assert_eq!(messenger.name(), "test"); - assert_eq!(messenger.messenger_type(), "matrix-cli"); - assert!(!messenger.is_connected()); - } - - #[test] - fn test_homeserver_url_trimming() { - let messenger = MatrixCliMessenger::with_password( - "test".to_string(), - "https://matrix.org/".to_string(), - "@test:matrix.org".to_string(), - "password".to_string(), - ); - assert_eq!(messenger.homeserver_url, "https://matrix.org"); - } - - // Note: Own-message filtering (sender == user_id) is tested implicitly - // via integration tests. The sync() function skips messages where - // event.sender == self.user_id to prevent the bot from replying to itself. -} diff --git a/crates/rustyclaw-core/src/messengers/mod.rs b/crates/rustyclaw-core/src/messengers/mod.rs index a17d01c..e5226e6 100644 --- a/crates/rustyclaw-core/src/messengers/mod.rs +++ b/crates/rustyclaw-core/src/messengers/mod.rs @@ -17,30 +17,10 @@ pub use media::{MediaConfig, MediaType}; pub use streaming::{StreamBuffer, StreamConfig, StreamStrategy}; #[cfg(feature = "matrix")] -pub use chat_system::messengers::MatrixMessenger; +pub use chat_system::messengers::{MatrixMessenger, MatrixDmConfig}; #[cfg(feature = "whatsapp")] pub use chat_system::messengers::WhatsAppMessenger; #[cfg(feature = "signal-cli")] pub use chat_system::messengers::SignalCliMessenger; - -#[cfg(feature = "matrix-cli")] -mod matrix_cli; -#[cfg(feature = "matrix-cli")] -pub use matrix_cli::{MatrixCliMessenger, MatrixDmConfig}; - -#[cfg(feature = "telegram-cli")] -mod telegram_cli; -#[cfg(feature = "telegram-cli")] -pub use telegram_cli::TelegramCliMessenger; - -#[cfg(feature = "discord-cli")] -mod discord_cli; -#[cfg(feature = "discord-cli")] -pub use discord_cli::DiscordCliMessenger; - -#[cfg(feature = "slack-cli")] -mod slack_cli; -#[cfg(feature = "slack-cli")] -pub use slack_cli::SlackCliMessenger; diff --git a/crates/rustyclaw-core/src/messengers/slack.rs b/crates/rustyclaw-core/src/messengers/slack.rs deleted file mode 100644 index d4b8b5d..0000000 --- a/crates/rustyclaw-core/src/messengers/slack.rs +++ /dev/null @@ -1,268 +0,0 @@ -//! Slack messenger using Bot Token and Web API. -//! -//! Uses the Slack Web API (chat.postMessage, conversations.history) with a bot token. -//! Supports channels, DMs, and threaded replies. - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use std::sync::Mutex; - -/// Slack messenger using bot token and Web API. -pub struct SlackMessenger { - name: String, - bot_token: String, - /// App-level token for Socket Mode (optional, for real-time events). - app_token: Option, - /// Default channel to listen on (if not specified per-message). - default_channel: Option, - connected: bool, - http: reqwest::Client, - /// Bot user ID (resolved on initialize). - bot_user_id: Option, - /// Track last read timestamp per channel for polling. - /// Wrapped in a Mutex so receive_messages(&self) can update it. - last_ts: Mutex>, -} - -impl SlackMessenger { - pub fn new(name: String, bot_token: String) -> Self { - Self { - name, - bot_token, - app_token: None, - default_channel: None, - connected: false, - http: reqwest::Client::new(), - bot_user_id: None, - last_ts: Mutex::new(std::collections::HashMap::new()), - } - } - - /// Set the app-level token for Socket Mode. - pub fn with_app_token(mut self, token: String) -> Self { - self.app_token = Some(token); - self - } - - /// Set the default channel to listen on. - pub fn with_default_channel(mut self, channel: String) -> Self { - self.default_channel = Some(channel); - self - } - - fn api_url(method: &str) -> String { - format!("https://slack.com/api/{}", method) - } - - /// Make an authenticated API call. - async fn api_call(&self, method: &str, body: &serde_json::Value) -> Result { - let resp = self - .http - .post(Self::api_url(method)) - .header("Authorization", format!("Bearer {}", self.bot_token)) - .header("Content-Type", "application/json; charset=utf-8") - .json(body) - .send() - .await - .with_context(|| format!("Slack API call to {} failed", method))?; - - let status = resp.status(); - let data: serde_json::Value = resp - .json() - .await - .with_context(|| format!("Failed to parse Slack {} response", method))?; - - if !status.is_success() { - anyhow::bail!("Slack {} returned HTTP {}", method, status); - } - - if data["ok"].as_bool() != Some(true) { - let error = data["error"].as_str().unwrap_or("unknown_error"); - anyhow::bail!("Slack {} error: {}", method, error); - } - - Ok(data) - } - - /// Fetch conversation history for a channel since last known timestamp. - async fn fetch_history(&self, channel: &str) -> Result> { - let mut params = serde_json::json!({ - "channel": channel, - "limit": 20 - }); - - { - let last_ts = self.last_ts.lock().unwrap(); - if let Some(ts) = last_ts.get(channel) { - params["oldest"] = serde_json::json!(ts); - } - } - - let data = self.api_call("conversations.history", ¶ms).await?; - - let messages_arr = data["messages"] - .as_array() - .cloned() - .unwrap_or_default(); - - let mut result = Vec::new(); - - for msg in &messages_arr { - // Skip bot's own messages - if let Some(user) = msg["user"].as_str() { - if Some(user) == self.bot_user_id.as_deref() { - continue; - } - } - - // Skip messages without text - let text = match msg["text"].as_str() { - Some(t) if !t.is_empty() => t, - _ => continue, - }; - - let ts = msg["ts"].as_str().unwrap_or("0").to_string(); - let sender = msg["user"].as_str().unwrap_or("unknown").to_string(); - - result.push(Message { - id: ts.clone(), - sender, - content: text.to_string(), - timestamp: parse_slack_ts(&ts), - channel: Some(channel.to_string()), - reply_to: msg["thread_ts"].as_str().map(|s| s.to_string()), - media: None, - is_direct: false, // TODO: implement DM detection - }); - } - - // Update last seen timestamp - if let Some(newest) = messages_arr - .first() - .and_then(|m| m["ts"].as_str()) - { - let mut last_ts = self.last_ts.lock().unwrap(); - last_ts.insert(channel.to_string(), newest.to_string()); - } - - // Slack returns newest first, reverse for chronological order - result.reverse(); - Ok(result) - } -} - -/// Parse a Slack timestamp (e.g. "1234567890.123456") into epoch seconds. -fn parse_slack_ts(ts: &str) -> i64 { - ts.split('.') - .next() - .and_then(|s| s.parse::().ok()) - .unwrap_or(0) -} - -#[async_trait] -impl Messenger for SlackMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "slack" - } - - async fn initialize(&mut self) -> Result<()> { - // Verify bot token with auth.test - let data = self - .api_call("auth.test", &serde_json::json!({})) - .await - .context("Slack auth.test failed — check your bot token")?; - - self.bot_user_id = data["user_id"].as_str().map(|s| s.to_string()); - self.connected = true; - - tracing::info!( - bot_user = ?self.bot_user_id, - team = ?data["team"].as_str(), - "Slack connected" - ); - - Ok(()) - } - - async fn send_message(&self, channel: &str, content: &str) -> Result { - let data = self - .api_call( - "chat.postMessage", - &serde_json::json!({ - "channel": channel, - "text": content, - "unfurl_links": false, - }), - ) - .await?; - - Ok(data["ts"].as_str().unwrap_or("unknown").to_string()) - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - let mut payload = serde_json::json!({ - "channel": opts.recipient, - "text": opts.content, - "unfurl_links": false, - }); - - // Thread reply - if let Some(thread_ts) = opts.reply_to { - payload["thread_ts"] = serde_json::json!(thread_ts); - } - - let data = self.api_call("chat.postMessage", &payload).await?; - Ok(data["ts"].as_str().unwrap_or("unknown").to_string()) - } - - async fn receive_messages(&self) -> Result> { - let channel = match &self.default_channel { - Some(ch) => ch.clone(), - None => return Ok(Vec::new()), - }; - self.fetch_history(&channel).await - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_slack_messenger_creation() { - let messenger = SlackMessenger::new("test".to_string(), "xoxb-fake".to_string()); - assert_eq!(messenger.name(), "test"); - assert_eq!(messenger.messenger_type(), "slack"); - assert!(!messenger.is_connected()); - } - - #[test] - fn test_parse_slack_ts() { - assert_eq!(parse_slack_ts("1234567890.123456"), 1234567890); - assert_eq!(parse_slack_ts("0"), 0); - assert_eq!(parse_slack_ts(""), 0); - } - - #[test] - fn test_with_options() { - let messenger = SlackMessenger::new("test".to_string(), "xoxb-fake".to_string()) - .with_app_token("xapp-fake".to_string()) - .with_default_channel("C12345".to_string()); - assert_eq!(messenger.app_token, Some("xapp-fake".to_string())); - assert_eq!(messenger.default_channel, Some("C12345".to_string())); - } -} diff --git a/crates/rustyclaw-core/src/messengers/slack_cli.rs b/crates/rustyclaw-core/src/messengers/slack_cli.rs deleted file mode 100644 index fef5739..0000000 --- a/crates/rustyclaw-core/src/messengers/slack_cli.rs +++ /dev/null @@ -1,293 +0,0 @@ -//! Slack messenger using Web API. -//! -//! Uses Slack's Web API at https://slack.com/api/ -//! Simple REST-based implementation for sending and receiving messages. -//! -//! This requires the `slack-cli` feature to be enabled. - -use super::{Message, Messenger}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use reqwest::Client; -use serde::Deserialize; -use std::sync::Arc; -use tokio::sync::Mutex; - -const SLACK_API_BASE: &str = "https://slack.com/api"; - -/// Slack API response wrapper -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct SlackResponse { - ok: bool, - #[serde(flatten)] - data: Option, - error: Option, -} - -/// Slack auth.test response -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct AuthTestResponse { - user_id: String, - user: String, - team_id: String, - team: String, -} - -/// Slack message object -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct SlackMessage { - ts: String, - user: Option, - text: String, - channel: Option, -} - -/// Slack conversations.history response -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct HistoryResponse { - messages: Vec, - has_more: bool, -} - -/// Slack chat.postMessage response -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct PostMessageResponse { - ts: String, - channel: String, -} - -/// Slack messenger implementation -pub struct SlackCliMessenger { - name: String, - token: String, - client: Client, - connected: Arc>, - /// Map of channel ID -> last seen message timestamp - last_timestamps: Arc>>, - /// Channels to watch for incoming messages - watch_channels: Vec, -} - -impl SlackCliMessenger { - /// Create a new Slack messenger with bot token - pub fn new(name: String, token: String) -> Self { - Self { - name, - token, - client: Client::new(), - connected: Arc::new(Mutex::new(false)), - last_timestamps: Arc::new(Mutex::new(std::collections::HashMap::new())), - watch_channels: Vec::new(), - } - } - - /// Add a channel to watch for incoming messages - pub fn watch_channel(mut self, channel_id: String) -> Self { - self.watch_channels.push(channel_id); - self - } - - /// Get authorization header - fn auth_header(&self) -> String { - format!("Bearer {}", self.token) - } - - /// Test authentication - async fn auth_test(&self) -> Result { - let url = format!("{}/auth.test", SLACK_API_BASE); - let response = self - .client - .post(&url) - .header("Authorization", self.auth_header()) - .send() - .await?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Slack API error: {} - {}", status, error_text); - } - - let data: SlackResponse = response.json().await?; - if !data.ok { - anyhow::bail!("Slack auth failed: {}", data.error.unwrap_or_default()); - } - - data.data - .ok_or_else(|| anyhow::anyhow!("No auth data in response")) - } - - /// Send a message to a channel - async fn post_message(&self, channel: &str, text: &str) -> Result { - let url = format!("{}/chat.postMessage", SLACK_API_BASE); - - let body = serde_json::json!({ - "channel": channel, - "text": text - }); - - let response = self - .client - .post(&url) - .header("Authorization", self.auth_header()) - .header("Content-Type", "application/json") - .json(&body) - .send() - .await - .context("Failed to send Slack message")?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to send message: {} - {}", status, error_text); - } - - let data: SlackResponse = response.json().await?; - if !data.ok { - anyhow::bail!("Slack post failed: {}", data.error.unwrap_or_default()); - } - - Ok(data.data.map(|d| d.ts).unwrap_or_default()) - } - - /// Get conversation history - async fn get_history(&self, channel: &str, limit: u32) -> Result> { - let last_ts = self.last_timestamps.lock().await; - let oldest = last_ts.get(channel).cloned(); - drop(last_ts); - - let mut url = format!( - "{}/conversations.history?channel={}&limit={}", - SLACK_API_BASE, channel, limit - ); - - if let Some(ts) = oldest { - url.push_str(&format!("&oldest={}", ts)); - } - - let response = self - .client - .get(&url) - .header("Authorization", self.auth_header()) - .send() - .await?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to get history: {} - {}", status, error_text); - } - - let data: SlackResponse = response.json().await?; - if !data.ok { - anyhow::bail!("Slack history failed: {}", data.error.unwrap_or_default()); - } - - let messages = data.data.map(|h| h.messages).unwrap_or_default(); - - // Update last seen timestamp - if let Some(latest) = messages.first() { - self.last_timestamps - .lock() - .await - .insert(channel.to_string(), latest.ts.clone()); - } - - Ok(messages) - } -} - -impl std::fmt::Debug for SlackCliMessenger { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SlackCliMessenger") - .field("name", &self.name) - .field("watch_channels", &self.watch_channels) - .finish() - } -} - -#[async_trait] -impl Messenger for SlackCliMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "slack" - } - - async fn initialize(&mut self) -> Result<()> { - let auth = self - .auth_test() - .await - .context("Failed to verify Slack token")?; - *self.connected.lock().await = true; - tracing::info!("Slack connected as {} in team {}", auth.user, auth.team); - Ok(()) - } - - async fn send_message(&self, recipient: &str, content: &str) -> Result { - self.post_message(recipient, content).await - } - - async fn receive_messages(&self) -> Result> { - let mut all_messages = Vec::new(); - - for channel in &self.watch_channels.clone() { - match self.get_history(channel, 100).await { - Ok(messages) => { - for msg in messages { - all_messages.push(Message { - id: msg.ts.clone(), - sender: msg.user.unwrap_or_else(|| "unknown".to_string()), - content: msg.text, - timestamp: parse_slack_ts(&msg.ts), - channel: Some(channel.clone()), - reply_to: None, - thread_id: None, - media: None, - is_direct: false, // Slack DM detection would need channel type check - message_type: Default::default(), - edited_timestamp: None, - reactions: None, - }); - } - } - Err(e) => { - tracing::warn!("Failed to get history for {}: {}", channel, e); - } - } - } - - Ok(all_messages) - } - - fn is_connected(&self) -> bool { - self.connected.try_lock().map(|g| *g).unwrap_or(false) - } - - async fn disconnect(&mut self) -> Result<()> { - *self.connected.lock().await = false; - Ok(()) - } - - async fn set_typing(&self, _channel: &str, _typing: bool) -> Result<()> { - // Slack doesn't have a typing indicator API for bots - // The users.setPresence endpoint only sets away/auto status - // Just no-op for now - Ok(()) - } -} - -/// Parse Slack timestamp (e.g., "1234567890.123456") to Unix timestamp -fn parse_slack_ts(ts: &str) -> i64 { - ts.split('.') - .next() - .and_then(|s| s.parse().ok()) - .unwrap_or(0) -} diff --git a/crates/rustyclaw-core/src/messengers/teams.rs b/crates/rustyclaw-core/src/messengers/teams.rs deleted file mode 100644 index 8a43ab4..0000000 --- a/crates/rustyclaw-core/src/messengers/teams.rs +++ /dev/null @@ -1,144 +0,0 @@ -//! Microsoft Teams messenger using incoming webhooks. -//! -//! Uses Teams incoming webhook connectors for sending messages. -//! For bidirectional messaging, a Teams Bot Framework registration is required. - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; - -/// Microsoft Teams messenger using webhooks / Bot Framework. -pub struct TeamsMessenger { - name: String, - /// Incoming webhook URL for sending messages. - webhook_url: Option, - /// Bot Framework app ID (for bidirectional messaging). - app_id: Option, - /// Bot Framework app password. - app_password: Option, - connected: bool, - http: reqwest::Client, -} - -impl TeamsMessenger { - pub fn new(name: String) -> Self { - Self { - name, - webhook_url: None, - app_id: None, - app_password: None, - connected: false, - http: reqwest::Client::new(), - } - } - - /// Set the incoming webhook URL. - pub fn with_webhook_url(mut self, url: String) -> Self { - self.webhook_url = Some(url); - self - } - - /// Set Bot Framework credentials. - pub fn with_bot_framework(mut self, app_id: String, app_password: String) -> Self { - self.app_id = Some(app_id); - self.app_password = Some(app_password); - self - } -} - -#[async_trait] -impl Messenger for TeamsMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "teams" - } - - async fn initialize(&mut self) -> Result<()> { - if self.webhook_url.is_none() && self.app_id.is_none() { - anyhow::bail!( - "Teams requires either 'webhook_url' (incoming webhook) or \ - 'app_id'+'app_password' (Bot Framework)" - ); - } - self.connected = true; - tracing::info!( - has_webhook = self.webhook_url.is_some(), - has_bot_framework = self.app_id.is_some(), - "Teams initialized" - ); - Ok(()) - } - - async fn send_message(&self, _channel: &str, content: &str) -> Result { - if let Some(ref webhook_url) = self.webhook_url { - // Teams Adaptive Card or simple text - let payload = serde_json::json!({ - "@type": "MessageCard", - "@context": "http://schema.org/extensions", - "text": content - }); - - let resp = self - .http - .post(webhook_url) - .json(&payload) - .send() - .await - .context("Teams webhook POST failed")?; - - if resp.status().is_success() { - return Ok(format!("teams-{}", chrono::Utc::now().timestamp_millis())); - } - anyhow::bail!("Teams webhook returned {}", resp.status()); - } - - // Bot Framework send requires conversation reference — not yet implemented - anyhow::bail!( - "Teams Bot Framework send not yet implemented. Use webhook_url or the claw-me-maybe skill." - ) - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - self.send_message(opts.recipient, opts.content).await - } - - async fn receive_messages(&self) -> Result> { - // Receiving requires Bot Framework with an HTTP endpoint - // or Graph API subscription. Return empty for now. - Ok(Vec::new()) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_teams_creation() { - let m = TeamsMessenger::new("test".to_string()) - .with_webhook_url("https://outlook.office.com/webhook/xxx".to_string()); - assert_eq!(m.name(), "test"); - assert_eq!(m.messenger_type(), "teams"); - assert!(!m.is_connected()); - } - - #[test] - fn test_with_bot_framework() { - let m = TeamsMessenger::new("test".to_string()) - .with_bot_framework("app-id".to_string(), "app-pass".to_string()); - assert_eq!(m.app_id, Some("app-id".to_string())); - assert_eq!(m.app_password, Some("app-pass".to_string())); - } -} diff --git a/crates/rustyclaw-core/src/messengers/telegram.rs b/crates/rustyclaw-core/src/messengers/telegram.rs deleted file mode 100644 index 98882df..0000000 --- a/crates/rustyclaw-core/src/messengers/telegram.rs +++ /dev/null @@ -1,173 +0,0 @@ -//! Telegram messenger using Bot API. - -use super::{Message, Messenger, SendOptions}; -use anyhow::Result; -use async_trait::async_trait; - -/// Telegram messenger using bot API -pub struct TelegramMessenger { - name: String, - bot_token: String, - connected: bool, - http: reqwest::Client, - last_update_id: i64, -} - -impl TelegramMessenger { - pub fn new(name: String, bot_token: String) -> Self { - Self { - name, - bot_token, - connected: false, - http: reqwest::Client::new(), - last_update_id: 0, - } - } - - fn api_url(&self, method: &str) -> String { - format!("https://api.telegram.org/bot{}/{}", self.bot_token, method) - } -} - -#[async_trait] -impl Messenger for TelegramMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "telegram" - } - - async fn initialize(&mut self) -> Result<()> { - // Verify bot token with getMe - let resp = self.http.get(self.api_url("getMe")).send().await?; - - if resp.status().is_success() { - let data: serde_json::Value = resp.json().await?; - if data["ok"].as_bool() == Some(true) { - self.connected = true; - return Ok(()); - } - } - anyhow::bail!("Telegram auth failed") - } - - async fn send_message(&self, chat_id: &str, content: &str) -> Result { - let resp = self - .http - .post(self.api_url("sendMessage")) - .json(&serde_json::json!({ - "chat_id": chat_id, - "text": content, - "parse_mode": "Markdown" - })) - .send() - .await?; - - if resp.status().is_success() { - let data: serde_json::Value = resp.json().await?; - if data["ok"].as_bool() == Some(true) { - return Ok(data["result"]["message_id"].to_string()); - } - } - anyhow::bail!("Telegram send failed") - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - let mut payload = serde_json::json!({ - "chat_id": opts.recipient, - "text": opts.content, - "parse_mode": "Markdown" - }); - - if opts.silent { - payload["disable_notification"] = serde_json::json!(true); - } - - if let Some(reply_to) = opts.reply_to { - if let Ok(msg_id) = reply_to.parse::() { - payload["reply_to_message_id"] = serde_json::json!(msg_id); - } - } - - let resp = self - .http - .post(self.api_url("sendMessage")) - .json(&payload) - .send() - .await?; - - if resp.status().is_success() { - let data: serde_json::Value = resp.json().await?; - if data["ok"].as_bool() == Some(true) { - return Ok(data["result"]["message_id"].to_string()); - } - } - anyhow::bail!("Telegram send failed") - } - - async fn receive_messages(&self) -> Result> { - let resp = self - .http - .post(self.api_url("getUpdates")) - .json(&serde_json::json!({ - "offset": self.last_update_id + 1, - "timeout": 0, - "allowed_updates": ["message"] - })) - .send() - .await?; - - if !resp.status().is_success() { - return Ok(Vec::new()); - } - - let data: serde_json::Value = resp.json().await?; - if data["ok"].as_bool() != Some(true) { - return Ok(Vec::new()); - } - - let updates = data["result"].as_array(); - let Some(updates) = updates else { - return Ok(Vec::new()); - }; - - let mut messages = Vec::new(); - for update in updates { - let _update_id = update["update_id"].as_i64().unwrap_or(0); - - if let Some(msg) = update.get("message") { - let id = msg["message_id"].to_string(); - let sender = msg["from"]["id"].to_string(); - let content = msg["text"].as_str().unwrap_or("").to_string(); - let timestamp = msg["date"].as_i64().unwrap_or(0); - let channel = msg["chat"]["id"].to_string(); - - messages.push(Message { - id, - sender, - content, - timestamp, - channel: Some(channel), - reply_to: msg["reply_to_message"]["message_id"] - .as_i64() - .map(|id| id.to_string()), - media: None, - is_direct: false, // TODO: implement DM detection - }); - } - } - - Ok(messages) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - Ok(()) - } -} diff --git a/crates/rustyclaw-core/src/messengers/telegram_cli.rs b/crates/rustyclaw-core/src/messengers/telegram_cli.rs deleted file mode 100644 index 07fbba5..0000000 --- a/crates/rustyclaw-core/src/messengers/telegram_cli.rs +++ /dev/null @@ -1,271 +0,0 @@ -//! Telegram messenger using Bot HTTP API. -//! -//! Uses the Telegram Bot API at https://api.telegram.org/bot/ -//! Simple REST-based implementation with no external dependencies. -//! -//! This requires the `telegram-cli` feature to be enabled. - -use super::{Message, Messenger}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use reqwest::Client; -use serde::Deserialize; -use serde_json::Value; - -use std::sync::Arc; -use tokio::sync::Mutex; - -/// Telegram API response wrapper -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct TelegramResponse { - ok: bool, - result: Option, - description: Option, -} - -/// Telegram Update object -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct Update { - update_id: i64, - message: Option, -} - -/// Telegram Message object -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct TelegramMessage { - message_id: i64, - from: Option, - chat: TelegramChat, - date: i64, - text: Option, -} - -/// Telegram User object -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct TelegramUser { - id: i64, - first_name: String, - last_name: Option, - username: Option, -} - -/// Telegram Chat object -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct TelegramChat { - id: i64, - #[serde(rename = "type")] - chat_type: String, - title: Option, - username: Option, -} - -/// Telegram messenger implementation -pub struct TelegramCliMessenger { - name: String, - _token: String, - client: Client, - connected: Arc>, - last_update_id: Arc>>, - base_url: String, -} - -impl TelegramCliMessenger { - /// Create a new Telegram messenger with bot token - pub fn new(name: String, token: String) -> Self { - let base_url = format!("https://api.telegram.org/bot{}", token); - Self { - name, - _token: token, - client: Client::new(), - connected: Arc::new(Mutex::new(false)), - last_update_id: Arc::new(Mutex::new(None)), - base_url, - } - } - - /// Get bot info to verify token - async fn get_me(&self) -> Result { - let url = format!("{}/getMe", self.base_url); - let response = self.client.get(&url).send().await?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Telegram API error: {} - {}", status, error_text); - } - - let data: TelegramResponse = response.json().await?; - data.result - .ok_or_else(|| anyhow::anyhow!("No result in response")) - } - - /// Send a message to a chat - async fn send_message_internal(&self, chat_id: &str, text: &str) -> Result { - let url = format!("{}/sendMessage", self.base_url); - - let body = serde_json::json!({ - "chat_id": chat_id, - "text": text, - "parse_mode": "HTML" - }); - - let response = self - .client - .post(&url) - .json(&body) - .send() - .await - .context("Failed to send Telegram message")?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to send message: {} - {}", status, error_text); - } - - let data: TelegramResponse = response.json().await?; - Ok(data.result.map(|m| m.message_id).unwrap_or(0)) - } - - /// Get updates (new messages) - async fn get_updates(&self, timeout: u64) -> Result> { - let last_id = *self.last_update_id.lock().await; - let mut url = format!("{}/getUpdates?timeout={}", self.base_url, timeout); - - if let Some(offset) = last_id { - url.push_str(&format!("&offset={}", offset + 1)); - } - - let response = self.client.get(&url).send().await?; - - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - anyhow::bail!("Failed to get updates: {} - {}", status, error_text); - } - - let data: TelegramResponse> = response.json().await?; - let updates = data.result.unwrap_or_default(); - - // Update offset - if let Some(last) = updates.last() { - *self.last_update_id.lock().await = Some(last.update_id); - } - - Ok(updates) - } -} - -impl std::fmt::Debug for TelegramCliMessenger { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TelegramCliMessenger") - .field("name", &self.name) - .field("connected", &self.connected) - .finish() - } -} - -#[async_trait] -impl Messenger for TelegramCliMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "telegram" - } - - async fn initialize(&mut self) -> Result<()> { - // Verify token by calling getMe - let _bot_info = self - .get_me() - .await - .context("Failed to verify Telegram bot token")?; - *self.connected.lock().await = true; - tracing::info!("Telegram bot connected successfully"); - Ok(()) - } - - async fn send_message(&self, recipient: &str, content: &str) -> Result { - let message_id = self.send_message_internal(recipient, content).await?; - Ok(message_id.to_string()) - } - - async fn receive_messages(&self) -> Result> { - let updates = self.get_updates(30).await?; - - let messages: Vec = updates - .into_iter() - .filter_map(|update| { - update.message.map(|msg| { - let sender = msg - .from - .map(|u| u.username.unwrap_or(u.first_name)) - .unwrap_or_else(|| "unknown".to_string()); - - Message { - id: msg.message_id.to_string(), - sender, - content: msg.text.unwrap_or_default(), - timestamp: msg.date, - channel: Some(msg.chat.id.to_string()), - reply_to: None, - thread_id: None, - media: None, - is_direct: false, // TODO: implement DM detection - message_type: Default::default(), - edited_timestamp: None, - reactions: None, - } - }) - }) - .collect(); - - Ok(messages) - } - - fn is_connected(&self) -> bool { - // Use try_lock to avoid blocking in sync context - self.connected.try_lock().map(|g| *g).unwrap_or(false) - } - - async fn disconnect(&mut self) -> Result<()> { - *self.connected.lock().await = false; - Ok(()) - } - - async fn set_typing(&self, channel: &str, typing: bool) -> Result<()> { - if !typing { - return Ok(()); // Telegram typing auto-expires, no need to clear - } - - let url = format!("{}/sendChatAction", self.base_url); - let body = serde_json::json!({ - "chat_id": channel, - "action": "typing" - }); - - let response = self - .client - .post(&url) - .json(&body) - .send() - .await - .context("Failed to send typing indicator")?; - - if !response.status().is_success() { - // Non-fatal, just log - eprintln!( - "Failed to send Telegram typing indicator: {}", - response.status() - ); - } - - Ok(()) - } -} diff --git a/crates/rustyclaw-core/src/messengers/webhook.rs b/crates/rustyclaw-core/src/messengers/webhook.rs deleted file mode 100644 index 724ab39..0000000 --- a/crates/rustyclaw-core/src/messengers/webhook.rs +++ /dev/null @@ -1,105 +0,0 @@ -//! Webhook messenger - POST messages to a URL. - -use super::{Message, Messenger, SendOptions}; -use anyhow::Result; -use async_trait::async_trait; -use serde::Serialize; - -/// Simple webhook messenger that POSTs messages to a URL -pub struct WebhookMessenger { - name: String, - webhook_url: String, - connected: bool, - http: reqwest::Client, -} - -impl WebhookMessenger { - pub fn new(name: String, webhook_url: String) -> Self { - Self { - name, - webhook_url, - connected: false, - http: reqwest::Client::new(), - } - } -} - -#[derive(Serialize)] -struct WebhookPayload<'a> { - content: &'a str, - recipient: &'a str, - #[serde(skip_serializing_if = "Option::is_none")] - reply_to: Option<&'a str>, -} - -#[async_trait] -impl Messenger for WebhookMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "webhook" - } - - async fn initialize(&mut self) -> Result<()> { - self.connected = true; - Ok(()) - } - - async fn send_message(&self, recipient: &str, content: &str) -> Result { - let payload = WebhookPayload { - content, - recipient, - reply_to: None, - }; - - let resp = self - .http - .post(&self.webhook_url) - .json(&payload) - .send() - .await?; - - if resp.status().is_success() { - Ok(format!("webhook-{}", chrono::Utc::now().timestamp_millis())) - } else { - anyhow::bail!("Webhook returned {}", resp.status()) - } - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - let payload = WebhookPayload { - content: opts.content, - recipient: opts.recipient, - reply_to: opts.reply_to, - }; - - let resp = self - .http - .post(&self.webhook_url) - .json(&payload) - .send() - .await?; - - if resp.status().is_success() { - Ok(format!("webhook-{}", chrono::Utc::now().timestamp_millis())) - } else { - anyhow::bail!("Webhook returned {}", resp.status()) - } - } - - async fn receive_messages(&self) -> Result> { - // Webhooks are typically outbound-only - Ok(Vec::new()) - } - - fn is_connected(&self) -> bool { - self.connected - } - - async fn disconnect(&mut self) -> Result<()> { - self.connected = false; - Ok(()) - } -} diff --git a/crates/rustyclaw-core/src/messengers/whatsapp.rs b/crates/rustyclaw-core/src/messengers/whatsapp.rs deleted file mode 100644 index 75e6315..0000000 --- a/crates/rustyclaw-core/src/messengers/whatsapp.rs +++ /dev/null @@ -1,304 +0,0 @@ -//! WhatsApp messenger using wa-rs library. -//! -//! This requires the `whatsapp` feature to be enabled. -//! -//! WhatsApp requires QR code linking before use. -//! Use `WhatsAppMessenger::link_device()` to generate a QR code for linking. - -use super::{Message, Messenger, SendOptions}; -use anyhow::{Context, Result}; -use async_trait::async_trait; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; -use wa_rs::Jid; -use wa_rs::bot::Bot; -use wa_rs::client::Client; -use wa_rs::traits::DeviceStore; -use wa_rs::types::events::Event; -use wa_rs::wa_rs_proto::whatsapp as wa; -use wa_rs_sqlite_storage::SqliteStore; -use wa_rs_tokio_transport::TokioWebSocketTransportFactory; -use wa_rs_ureq_http::UreqHttpClient; - -/// WhatsApp messenger using wa-rs (Baileys-compatible) -pub struct WhatsAppMessenger { - name: String, - db_path: PathBuf, - client: Option>, - /// Handle for the background bot task - _bot_handle: Option>, - connected: bool, - /// Pending incoming messages - pending_messages: Arc>>, -} - -impl WhatsAppMessenger { - /// Create a new WhatsApp messenger - /// - /// The db_path should be a file path for the SQLite database. - /// If the database already contains session data, it will be used. - /// Otherwise, you must call `link_device()` before use. - pub fn new(name: String, db_path: PathBuf) -> Self { - Self { - name, - db_path, - client: None, - _bot_handle: None, - connected: false, - pending_messages: Arc::new(Mutex::new(Vec::new())), - } - } - - /// Create SQLite backend - async fn create_backend(db_url: &str) -> Result> { - SqliteStore::new(db_url) - .await - .map(Arc::new) - .map_err(|e| anyhow::anyhow!("Failed to create WhatsApp backend: {}", e)) - } - - /// Build a bot with our standard configuration - async fn build_bot( - backend: Arc, - pending: Arc>>, - qr_tx: Option>, - ) -> Result { - let builder = Bot::builder() - .with_backend(backend) - .with_transport_factory(TokioWebSocketTransportFactory::new()) - .with_http_client(UreqHttpClient::new()) - .on_event(move |event, _client| { - let pending = pending.clone(); - let qr_tx = qr_tx.clone(); - async move { - match &event { - Event::PairingQrCode { code, .. } => { - if let Some(tx) = qr_tx { - let _ = tx.send(code.clone()); - } - } - Event::Message(msg, info) => { - let content = extract_text_content(msg); - if content.is_empty() { - return; - } - - let message = Message { - id: info.id.clone(), - sender: info.source.sender.to_string(), - content, - timestamp: info.timestamp.timestamp(), - channel: Some(info.source.chat.to_string()), - reply_to: None, - media: None, - is_direct: false, // TODO: implement DM detection - }; - - pending.lock().await.push(message); - } - Event::Connected(_) => { - tracing::info!("WhatsApp connected"); - } - Event::Disconnected(reason) => { - tracing::warn!("WhatsApp disconnected: {:?}", reason); - } - _ => {} - } - } - }); - - builder - .build() - .await - .context("Failed to build WhatsApp bot") - } - - /// Link device by scanning a QR code from WhatsApp mobile. - /// - /// Returns a receiver that will receive QR code strings to display. - /// Once scanning is complete, the session will be stored. - pub async fn link_device(&mut self) -> Result> { - let pending = self.pending_messages.clone(); - let db_url = self.db_path.to_string_lossy().to_string(); - let backend = Self::create_backend(&db_url).await?; - - let (qr_tx, qr_rx) = tokio::sync::mpsc::unbounded_channel(); - let mut bot = Self::build_bot(backend, pending, Some(qr_tx)).await?; - - let client = bot.client(); - let handle = bot.run().await.context("Failed to start WhatsApp")?; - - self.client = Some(client); - self._bot_handle = Some(handle); - self.connected = true; - - Ok(qr_rx) - } - - /// Check if the device is linked - pub fn is_linked(&self) -> bool { - self.client.is_some() - } - - /// Get the client (must be linked first) - fn client(&self) -> Result<&Arc> { - self.client - .as_ref() - .context("WhatsApp not linked - call link_device() first") - } - - /// Parse a recipient JID from string - /// - /// Formats: - /// - Phone number: "15551234567" or "+15551234567" - /// - User JID: "15551234567@s.whatsapp.net" - /// - Group JID: "123456789@g.us" - fn parse_jid(recipient: &str) -> Result { - use std::str::FromStr; - - // If already looks like a JID - if recipient.contains('@') { - return Jid::from_str(recipient) - .map_err(|e| anyhow::anyhow!("Invalid JID '{}': {}", recipient, e)); - } - - // Strip + prefix if present - let number = recipient.trim_start_matches('+'); - - // Create user JID (pn = phone number) - Ok(Jid::pn(number)) - } -} - -/// Extract text content from a WhatsApp protobuf Message -fn extract_text_content(msg: &wa::Message) -> String { - // Check conversation field (simple text message) - if let Some(ref text) = msg.conversation { - return text.clone(); - } - // Check extendedTextMessage field - if let Some(ref ext) = msg.extended_text_message { - if let Some(ref text) = ext.text { - return text.clone(); - } - } - String::new() -} - -/// Build a simple text message -fn build_text_message(text: &str) -> wa::Message { - wa::Message { - conversation: Some(text.to_string()), - ..Default::default() - } -} - -#[async_trait] -impl Messenger for WhatsAppMessenger { - fn name(&self) -> &str { - &self.name - } - - fn messenger_type(&self) -> &str { - "whatsapp" - } - - async fn initialize(&mut self) -> Result<()> { - // Check if we have an existing session - if !self.db_path.exists() { - self.connected = false; - return Ok(()); - } - - let pending = self.pending_messages.clone(); - let db_url = self.db_path.to_string_lossy().to_string(); - let backend = Self::create_backend(&db_url).await?; - - // Check if we have valid session data - if !backend.exists().await.unwrap_or(false) { - self.connected = false; - return Ok(()); - } - - let mut bot = Self::build_bot(backend, pending, None).await?; - - let client = bot.client(); - let handle = bot.run().await.context("Failed to start WhatsApp")?; - - self.client = Some(client); - self._bot_handle = Some(handle); - self.connected = true; - - Ok(()) - } - - async fn send_message(&self, recipient: &str, content: &str) -> Result { - let client = self.client()?; - let jid = Self::parse_jid(recipient)?; - - let message = build_text_message(content); - let message_id = client - .send_message(jid, message) - .await - .context("Failed to send WhatsApp message")?; - - Ok(message_id) - } - - async fn send_message_with_options(&self, opts: SendOptions<'_>) -> Result { - // For now, just send plain text - // TODO: Add reply_to and media support - self.send_message(opts.recipient, opts.content).await - } - - async fn receive_messages(&self) -> Result> { - let mut pending = self.pending_messages.lock().await; - let messages = std::mem::take(&mut *pending); - Ok(messages) - } - - fn is_connected(&self) -> bool { - self.connected && self.client.is_some() - } - - async fn disconnect(&mut self) -> Result<()> { - if let Some(client) = self.client.take() { - client.disconnect().await; - } - self._bot_handle = None; - self.connected = false; - Ok(()) - } - - async fn set_typing(&self, _channel: &str, _typing: bool) -> Result<()> { - // wa-rs doesn't expose presence/typing APIs yet - // TODO: Add when wa-rs supports it - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_whatsapp_messenger_creation() { - let messenger = - WhatsAppMessenger::new("test".to_string(), PathBuf::from("/tmp/whatsapp-test.db")); - assert_eq!(messenger.name(), "test"); - assert_eq!(messenger.messenger_type(), "whatsapp"); - assert!(!messenger.is_connected()); - assert!(!messenger.is_linked()); - } - - #[test] - fn test_parse_jid() { - let jid = WhatsAppMessenger::parse_jid("15551234567").unwrap(); - assert!(jid.to_string().contains("15551234567")); - - let jid = WhatsAppMessenger::parse_jid("+15551234567").unwrap(); - assert!(jid.to_string().contains("15551234567")); - } -}