Skip to content

Commit 8635994

Browse files
authored
feat: Implement real-time command output streaming (Phase 1 of #68) (#69)
This commit implements the core streaming infrastructure for real-time SSH command output, enabling future interactive UI features while maintaining full backward compatibility. ## Key Changes ### Core Streaming Infrastructure - Add `CommandOutput` enum for stdout/stderr streaming events - Add `CommandOutputBuffer` for internal output collection - Implement `execute_streaming()` method with tokio channel-based architecture - Refactor existing `execute()` to use streaming internally (zero breaking changes) ### Public API Addition - New `connect_and_execute_with_output_streaming()` method in SshClient - Streaming API respects timeout configuration - Channel capacity: 100 events with efficient memory usage (~16KB per command) ### Architecture Documentation - Document streaming design patterns in ARCHITECTURE.md - Include performance characteristics and memory overhead analysis - Document backward compatibility guarantees - Add implementation details for future phases ### Error Handling - Add `JoinError` variant to tokio_client::Error - Graceful handling of task join failures - Silent handling of send errors when receiver is dropped ### Testing - Add comprehensive streaming integration tests (tests/streaming_test.rs) - Test stdout/stderr separation and ordering - Test backward compatibility of refactored execute() - All existing tests pass without modification (100% compatibility) ## Implementation Details The streaming implementation uses a producer-consumer pattern: - Background tokio task collects output chunks via bounded channel - Zero-copy data transfer using russh's CryptoVec - Graceful degradation if receiver drops early - No performance impact on non-streaming usage ## Related Issues - Implements Phase 1 of #68 (Core Streaming Infrastructure) - Foundation for Phase 2 (Multi-node executor integration) - Foundation for Phase 3 (Interactive TUI) - Based on design concepts from PR #37
1 parent 460d468 commit 8635994

6 files changed

Lines changed: 524 additions & 23 deletions

File tree

ARCHITECTURE.md

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,91 @@ let tasks: Vec<JoinHandle<Result<ExecutionResult>>> = nodes
337337
- Uses system known_hosts file (~/.ssh/known_hosts)
338338
- SSH agent authentication with auto-detection
339339

340+
### 4.0.1 Command Output Streaming Infrastructure
341+
342+
**Status:** Implemented (2025-10-29) as part of Phase 1 of Issue #68
343+
344+
**Design Motivation:**
345+
Real-time command output streaming enables future UI features such as live progress bars, per-node output display, and streaming aggregation. The infrastructure provides the foundation for responsive UIs while maintaining full backward compatibility with existing synchronous APIs.
346+
347+
**Architecture:**
348+
349+
The streaming infrastructure consists of three key components:
350+
351+
1. **CommandOutput Enum** (`tokio_client/channel_manager.rs`)
352+
```rust
353+
pub enum CommandOutput {
354+
StdOut(CryptoVec),
355+
StdErr(CryptoVec),
356+
}
357+
```
358+
- Represents streaming output events
359+
- Separates stdout and stderr streams
360+
- Uses russh's `CryptoVec` for zero-copy efficiency
361+
362+
2. **CommandOutputBuffer** (`tokio_client/channel_manager.rs`)
363+
```rust
364+
pub(crate) struct CommandOutputBuffer {
365+
sender: Sender<CommandOutput>,
366+
receiver_task: JoinHandle<(Vec<u8>, Vec<u8>)>,
367+
}
368+
```
369+
- Internal buffer for collecting streaming output
370+
- Background task aggregates stdout and stderr
371+
- Channel capacity: 100 events (tunable)
372+
- Used by synchronous `execute()` for backward compatibility
373+
374+
3. **Streaming API Methods**
375+
- `Client::execute_streaming(command, sender)` - Low-level streaming API
376+
- `SshClient::connect_and_execute_with_output_streaming()` - High-level streaming API
377+
- Both respect timeout settings and handle errors consistently
378+
379+
**Implementation Pattern:**
380+
381+
```rust
382+
// Streaming execution (new in Phase 1)
383+
let (sender, receiver_task) = build_output_buffer();
384+
let exit_status = client.execute_streaming("command", sender).await?;
385+
let (stdout, stderr) = receiver_task.await?;
386+
387+
// Backward-compatible execution (refactored to use streaming)
388+
let result = client.execute("command").await?;
389+
// Internally uses execute_streaming() + CommandOutputBuffer
390+
```
391+
392+
**Backward Compatibility:**
393+
394+
The existing `execute()` method was refactored to use `execute_streaming()` internally:
395+
- Same function signature
396+
- Same return type (`CommandExecutedResult`)
397+
- Same error handling behavior
398+
- Same timeout behavior
399+
- Zero breaking changes to existing code
400+
401+
**Performance Characteristics:**
402+
- Channel-based architecture with bounded buffer (100 events)
403+
- Zero-copy transfer of SSH channel data via `CryptoVec`
404+
- Background task for output aggregation (non-blocking)
405+
- Memory overhead: ~16KB per streaming command (8KB stdout + 1KB stderr + buffer)
406+
- Latency: Real-time streaming with minimal buffering delay
407+
408+
**Error Handling:**
409+
- New `JoinError` variant in `tokio_client::Error`
410+
- Handles task join failures gracefully
411+
- Timeout handling preserved from original implementation
412+
- Channel send errors handled silently (receiver may be dropped)
413+
414+
**Testing:**
415+
- Integration tests cover streaming with stdout/stderr separation
416+
- Backward compatibility test ensures no behavioral changes
417+
- Tests use localhost SSH for reproducible validation
418+
- All existing tests pass with zero modifications
419+
420+
**Future Phases (Issue #68):**
421+
- Phase 2: Executor integration for parallel streaming
422+
- Phase 3: UI components (progress bars, live updates)
423+
- Phase 4: Advanced features (filtering, aggregation)
424+
340425
### 4.1 Authentication Module (`ssh/auth.rs`)
341426

342427
**Status:** Implemented (2025-10-17) as part of code deduplication refactoring (Issue #34)

src/ssh/client/command.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ use super::config::ConnectionConfig;
1616
use super::core::SshClient;
1717
use super::result::CommandResult;
1818
use crate::ssh::known_hosts::StrictHostKeyChecking;
19+
use crate::ssh::tokio_client::CommandOutput;
1920
use anyhow::{Context, Result};
2021
use std::path::Path;
2122
use std::time::Duration;
23+
use tokio::sync::mpsc::Sender;
2224

2325
// SSH command execution timeout design:
2426
// - 5 minutes (300s) handles long-running commands
@@ -160,4 +162,108 @@ impl SshClient {
160162
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
161163
}
162164
}
165+
166+
/// Execute a command with streaming output support
167+
///
168+
/// This method provides real-time command output streaming through the provided sender channel.
169+
/// Output is sent as `CommandOutput::StdOut` or `CommandOutput::StdErr` variants.
170+
///
171+
/// # Arguments
172+
/// * `command` - The command to execute
173+
/// * `config` - Connection configuration
174+
/// * `output_sender` - Channel sender for streaming output
175+
///
176+
/// # Returns
177+
/// The exit status of the command
178+
pub async fn connect_and_execute_with_output_streaming(
179+
&mut self,
180+
command: &str,
181+
config: &ConnectionConfig<'_>,
182+
output_sender: Sender<CommandOutput>,
183+
) -> Result<u32> {
184+
tracing::debug!("Connecting to {}:{}", self.host, self.port);
185+
186+
// Determine authentication method based on parameters
187+
let auth_method = self
188+
.determine_auth_method(
189+
config.key_path,
190+
config.use_agent,
191+
config.use_password,
192+
#[cfg(target_os = "macos")]
193+
config.use_keychain,
194+
)
195+
.await?;
196+
197+
let strict_mode = config
198+
.strict_mode
199+
.unwrap_or(StrictHostKeyChecking::AcceptNew);
200+
201+
// Create client connection - either direct or through jump hosts
202+
let client = self
203+
.establish_connection(
204+
&auth_method,
205+
strict_mode,
206+
config.jump_hosts_spec,
207+
config.key_path,
208+
config.use_agent,
209+
config.use_password,
210+
)
211+
.await?;
212+
213+
tracing::debug!("Connected and authenticated successfully");
214+
tracing::debug!("Executing command with streaming: {}", command);
215+
216+
// Execute command with streaming and timeout
217+
let exit_status = self
218+
.execute_streaming_with_timeout(&client, command, config.timeout_seconds, output_sender)
219+
.await?;
220+
221+
tracing::debug!("Command execution completed with status: {}", exit_status);
222+
223+
Ok(exit_status)
224+
}
225+
226+
/// Execute a command with streaming output and the specified timeout
227+
async fn execute_streaming_with_timeout(
228+
&self,
229+
client: &crate::ssh::tokio_client::Client,
230+
command: &str,
231+
timeout_seconds: Option<u64>,
232+
output_sender: Sender<CommandOutput>,
233+
) -> Result<u32> {
234+
if let Some(timeout_secs) = timeout_seconds {
235+
if timeout_secs == 0 {
236+
// No timeout (unlimited)
237+
tracing::debug!("Executing command with streaming, no timeout (unlimited)");
238+
client.execute_streaming(command, output_sender)
239+
.await
240+
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
241+
} else {
242+
// With timeout
243+
let command_timeout = Duration::from_secs(timeout_secs);
244+
tracing::debug!(
245+
"Executing command with streaming, timeout of {} seconds",
246+
timeout_secs
247+
);
248+
tokio::time::timeout(
249+
command_timeout,
250+
client.execute_streaming(command, output_sender)
251+
)
252+
.await
253+
.with_context(|| format!("Command execution timeout: The command '{}' did not complete within {} seconds on {}:{}", command, timeout_secs, self.host, self.port))?
254+
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
255+
}
256+
} else {
257+
// Default timeout if not specified
258+
let command_timeout = Duration::from_secs(DEFAULT_COMMAND_TIMEOUT_SECS);
259+
tracing::debug!("Executing command with streaming, default timeout of 300 seconds");
260+
tokio::time::timeout(
261+
command_timeout,
262+
client.execute_streaming(command, output_sender)
263+
)
264+
.await
265+
.with_context(|| format!("Command execution timeout: The command '{}' did not complete within 5 minutes on {}:{}", command, self.host, self.port))?
266+
.with_context(|| format!("Failed to execute command '{}' on {}:{}. The SSH connection was successful but the command could not be executed.", command, self.host, self.port))
267+
}
268+
}
163269
}

0 commit comments

Comments
 (0)