//! Dynamic auto-scaling MCP connection pool
//!
//! Provides parallel MCP access by maintaining a pool of McpService instances
//! that scales up under load and scales down after idle timeout.
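//!
//! # Example
//!
//! A minimal usage sketch (the tool name is a placeholder, imports are omitted,
//! and error handling is elided):
//!
//! ```ignore
//! let pool = McpPool::with_config(McpPoolConfig {
//!     max_instances: 4,
//!     ..Default::default()
//! }).await?;
//!
//! // The guard returns the instance to the pool when dropped.
//! let guard = pool.acquire().await?;
//! let result = guard.call_tool("example_tool", None).await?;
//! drop(guard);
//!
//! println!("pool stats: {}", pool.stats().await);
//! ```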

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Result;
use serde_json::Value;
use tokio::sync::{Mutex, Semaphore};

use crate::services::mcp_service::McpService;

/// Configuration for the MCP pool
#[derive(Clone)]
pub struct McpPoolConfig {
    /// Minimum instances to keep alive (default: 1)
    pub min_instances: usize,
    /// Maximum instances allowed (default: 8)
    pub max_instances: usize,
    /// Idle timeout before scaling down (default: 30s)
    pub idle_timeout: Duration,
    /// Enable debug logging
    pub debug: bool,
}

impl Default for McpPoolConfig {
    fn default() -> Self {
        Self {
            min_instances: 1,
            max_instances: 8,
            idle_timeout: Duration::from_secs(30),
            debug: false,
        }
    }
}

/// A pooled MCP instance with usage tracking
struct PooledInstance {
    /// The actual MCP service
    service: Arc<Mutex<McpService>>,
    /// Whether this instance is currently in use
    busy: AtomicBool,
    /// Last time this instance was used
    last_used: Mutex<Instant>,
    /// Instance ID for debugging
    id: usize,
}

impl PooledInstance {
    async fn new(id: usize, debug: bool) -> Result<Self> {
        let mut service = McpService::new_with_debug(debug);
        service.load_config()?;

        // Initialize all configured servers; failures from individual servers are
        // ignored so one misconfigured server does not prevent instance creation.
        let servers: Vec<String> = service.list_servers()
            .iter()
            .map(|(id, _)| (*id).clone())
            .collect();

        for server_id in servers {
            let _ = service.initialize_server(&server_id).await;
        }

        Ok(Self {
            service: Arc::new(Mutex::new(service)),
            busy: AtomicBool::new(false),
            last_used: Mutex::new(Instant::now()),
            id,
        })
    }

    fn is_busy(&self) -> bool {
        self.busy.load(Ordering::SeqCst)
    }

    fn try_acquire(&self) -> bool {
        self.busy.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_ok()
    }

    fn release(&self) {
        self.busy.store(false, Ordering::SeqCst);
    }

    async fn update_last_used(&self) {
        *self.last_used.lock().await = Instant::now();
    }

    async fn idle_duration(&self) -> Duration {
        self.last_used.lock().await.elapsed()
    }
}

/// RAII guard that releases the instance back to the pool on drop
pub struct McpGuard {
    instance: Arc<PooledInstance>,
    pool: Arc<McpPoolInner>,
}

impl McpGuard {
    /// Call an MCP tool through this pooled instance
    pub async fn call_tool(&self, tool_name: &str, params: Option<Value>) -> Result<Value> {
        let mut service = self.instance.service.lock().await;

        // Find the server that has this tool
        let servers: Vec<String> = service.list_servers()
            .iter()
            .map(|(id, _)| (*id).clone())
            .collect();

        for server_id in servers {
            if let Ok(tools) = service.list_tools(&server_id).await {
                if tools.iter().any(|t| t.name == tool_name) {
                    let result = service.call_tool(&server_id, tool_name, params).await?;
                    self.instance.update_last_used().await;
                    return Ok(result);
                }
            }
        }

        Err(anyhow::anyhow!("Tool '{}' not found in any MCP server", tool_name))
    }

    /// Get the underlying service for direct access (use sparingly)
    pub async fn service(&self) -> tokio::sync::MutexGuard<'_, McpService> {
        self.instance.service.lock().await
    }
}

impl Drop for McpGuard {
    fn drop(&mut self) {
        self.instance.release();
        self.pool.active_count.fetch_sub(1, Ordering::SeqCst);

        if self.pool.config.debug {
            eprintln!("[McpPool] Released instance #{}", self.instance.id);
        }
    }
}

/// Inner pool state (shared via Arc)
struct McpPoolInner {
    instances: Mutex<Vec<Arc<PooledInstance>>>,
    config: McpPoolConfig,
    next_id: AtomicUsize,
    active_count: AtomicUsize,
    /// Semaphore to limit concurrent instance creation
    creation_semaphore: Semaphore,
}

/// Dynamic auto-scaling MCP connection pool
pub struct McpPool {
    inner: Arc<McpPoolInner>,
    /// Handle to the cleanup task
    _cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

impl McpPool {
    /// Create a new MCP pool with default configuration
    pub async fn new() -> Result<Self> {
        Self::with_config(McpPoolConfig::default()).await
    }

    /// Create a new MCP pool with custom configuration
    pub async fn with_config(config: McpPoolConfig) -> Result<Self> {
        let inner = Arc::new(McpPoolInner {
            instances: Mutex::new(Vec::new()),
            config: config.clone(),
            next_id: AtomicUsize::new(0),
            active_count: AtomicUsize::new(0),
            creation_semaphore: Semaphore::new(1), // Only one creation at a time
        });

        // Create minimum instances
        {
            let mut instances = inner.instances.lock().await;
            for _ in 0..config.min_instances {
                let id = inner.next_id.fetch_add(1, Ordering::SeqCst);
                if config.debug {
                    eprintln!("[McpPool] Creating initial instance #{}", id);
                }
                let instance = PooledInstance::new(id, config.debug).await?;
                instances.push(Arc::new(instance));
            }
        }

        // Start cleanup task
        let cleanup_inner = Arc::clone(&inner);
        let cleanup_handle = tokio::spawn(async move {
            Self::cleanup_loop(cleanup_inner).await;
        });

        Ok(Self {
            inner,
            _cleanup_handle: Some(cleanup_handle),
        })
    }

    /// Acquire an MCP instance from the pool
    ///
    /// If all instances are busy and the pool is below `max_instances`, a new one
    /// is spawned. Returns an RAII guard that releases the instance on drop.
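    ///
    /// A minimal sketch of concurrent use (error handling elided):
    ///
    /// ```ignore
    /// let (a, b) = tokio::join!(pool.acquire(), pool.acquire());
    /// // With a single idle instance and max_instances > 1, one call reuses the
    /// // existing instance and the other scales the pool up instead of waiting.
    /// let (guard_a, guard_b) = (a?, b?);
    /// ```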
    pub async fn acquire(&self) -> Result<McpGuard> {
        // First, try to acquire an existing idle instance
        {
            let instances = self.inner.instances.lock().await;
            for instance in instances.iter() {
                if instance.try_acquire() {
                    self.inner.active_count.fetch_add(1, Ordering::SeqCst);
                    if self.inner.config.debug {
                        eprintln!("[McpPool] Acquired existing instance #{}", instance.id);
                    }
                    return Ok(McpGuard {
                        instance: Arc::clone(instance),
                        pool: Arc::clone(&self.inner),
                    });
                }
            }
        }

        // All instances busy - try to create a new one
        let current_count = {
            let instances = self.inner.instances.lock().await;
            instances.len()
        };

        if current_count < self.inner.config.max_instances {
            // Acquire creation semaphore to prevent thundering herd
            let _permit = self.inner.creation_semaphore.acquire().await?;

            // Double-check after acquiring semaphore
            let mut instances = self.inner.instances.lock().await;

            // Maybe another task created one while we waited
            for instance in instances.iter() {
                if instance.try_acquire() {
                    self.inner.active_count.fetch_add(1, Ordering::SeqCst);
                    if self.inner.config.debug {
                        eprintln!("[McpPool] Acquired instance #{} (created by another task)", instance.id);
                    }
                    return Ok(McpGuard {
                        instance: Arc::clone(instance),
                        pool: Arc::clone(&self.inner),
                    });
                }
            }

            // Still need to create one
            if instances.len() < self.inner.config.max_instances {
                let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst);
                if self.inner.config.debug {
                    eprintln!("[McpPool] Scaling UP: creating instance #{} (total: {})",
                        id, instances.len() + 1);
                }

                let instance = Arc::new(PooledInstance::new(id, self.inner.config.debug).await?);
                instance.try_acquire(); // Mark as busy immediately
                instances.push(Arc::clone(&instance));

                self.inner.active_count.fetch_add(1, Ordering::SeqCst);

                return Ok(McpGuard {
                    instance,
                    pool: Arc::clone(&self.inner),
                });
            }
        }

        // At max capacity - wait for one to become available
        if self.inner.config.debug {
            eprintln!("[McpPool] At max capacity ({}), waiting for available instance...",
                self.inner.config.max_instances);
        }

        loop {
            tokio::time::sleep(Duration::from_millis(50)).await;

            let instances = self.inner.instances.lock().await;
            for instance in instances.iter() {
                if instance.try_acquire() {
                    self.inner.active_count.fetch_add(1, Ordering::SeqCst);
                    if self.inner.config.debug {
                        eprintln!("[McpPool] Acquired instance #{} after waiting", instance.id);
                    }
                    return Ok(McpGuard {
                        instance: Arc::clone(instance),
                        pool: Arc::clone(&self.inner),
                    });
                }
            }
        }
    }

    /// Get current pool statistics
    pub async fn stats(&self) -> PoolStats {
        let instances = self.inner.instances.lock().await;
        let total = instances.len();
        let busy = instances.iter().filter(|i| i.is_busy()).count();

        PoolStats {
            total_instances: total,
            busy_instances: busy,
            idle_instances: total - busy,
            max_instances: self.inner.config.max_instances,
        }
    }

    /// Background cleanup loop
    async fn cleanup_loop(inner: Arc<McpPoolInner>) {
        let check_interval = Duration::from_secs(5);

        loop {
            tokio::time::sleep(check_interval).await;

            let mut instances = inner.instances.lock().await;
            let min = inner.config.min_instances;
            let timeout = inner.config.idle_timeout;

            // Find idle instances to remove
            let mut to_remove = Vec::new();

            for (idx, instance) in instances.iter().enumerate() {
                // Don't remove if we'd go below minimum
                if instances.len() - to_remove.len() <= min {
                    break;
                }

                // Don't remove busy instances
                if instance.is_busy() {
                    continue;
                }

                // Check idle duration
                if instance.idle_duration().await > timeout {
                    to_remove.push(idx);
                    if inner.config.debug {
                        eprintln!("[McpPool] Scaling DOWN: removing idle instance #{}", instance.id);
                    }
                }
            }

            // Remove in reverse order to preserve indices
            for idx in to_remove.into_iter().rev() {
                instances.remove(idx);
            }
        }
    }
}
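
// A possible refinement (sketch): abort the background cleanup task when the pool
// is dropped, so the spawned loop does not keep the shared `McpPoolInner` alive
// indefinitely.
impl Drop for McpPool {
    fn drop(&mut self) {
        if let Some(handle) = self._cleanup_handle.take() {
            handle.abort();
        }
    }
}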

/// Pool statistics
#[derive(Debug, Clone)]
pub struct PoolStats {
    pub total_instances: usize,
    pub busy_instances: usize,
    pub idle_instances: usize,
    pub max_instances: usize,
}

impl std::fmt::Display for PoolStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{} busy, {}/{} max",
            self.busy_instances, self.total_instances,
            self.total_instances, self.max_instances)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_pool_config_default() {
        let config = McpPoolConfig::default();
        assert_eq!(config.min_instances, 1);
        assert_eq!(config.max_instances, 8);
        assert_eq!(config.idle_timeout, Duration::from_secs(30));
    }
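
    // Additional check of the PoolStats Display format above (a small sketch;
    // update the expected string if the format changes).
    #[test]
    fn test_pool_stats_display() {
        let stats = PoolStats {
            total_instances: 3,
            busy_instances: 2,
            idle_instances: 1,
            max_instances: 8,
        };
        assert_eq!(stats.to_string(), "2/3 busy, 3/8 max");
    }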
}