From ce150874049a158664d1c2d9dbd65f3700a73c77 Mon Sep 17 00:00:00 2001 From: nglmercer Date: Sun, 31 May 2026 15:42:38 -0500 Subject: [PATCH 1/2] feat: remove Docker/Podman dependency with native runtime support Add a native process-based runtime that runs HelixDB without containers, enabling bare-metal execution. This is the foundation for future in-memory database features that need direct process control. Changes: - Add `Native` variant to `ContainerRuntime` enum in config.rs - Create `Runtime` enum dispatching between container and native runtimes - Implement `NativeManager` with PID-based process lifecycle management - Add `volumes_dir`/`instance_volume` helpers to `ProjectContext` - Update all commands (run, stop, restart, status, logs, prune, delete) to use `Runtime::for_project()` instead of `LocalRuntime::new()` - Fix dashboard.rs match exhaustiveness for new Native variant - Fix all clippy warnings in sdks (collapsible_if, derivable_impls, should_implement_trait) --- helix-cli/src/commands/dashboard.rs | 3 + helix-cli/src/commands/delete.rs | 4 +- helix-cli/src/commands/logs/mod.rs | 4 +- helix-cli/src/commands/prune.rs | 4 +- helix-cli/src/commands/restart.rs | 4 +- helix-cli/src/commands/run.rs | 10 +- helix-cli/src/commands/status.rs | 10 +- helix-cli/src/commands/stop.rs | 4 +- helix-cli/src/config.rs | 7 + helix-cli/src/local_runtime.rs | 744 +++++++++++++++++++------- helix-cli/src/project.rs | 8 + sdks/rust/helix-dsl-macros/src/lib.rs | 13 +- sdks/rust/src/dsl.rs | 24 +- 13 files changed, 618 insertions(+), 221 deletions(-) diff --git a/helix-cli/src/commands/dashboard.rs b/helix-cli/src/commands/dashboard.rs index dec45134..96728678 100644 --- a/helix-cli/src/commands/dashboard.rs +++ b/helix-cli/src/commands/dashboard.rs @@ -7,6 +7,8 @@ use crate::utils::command_exists; use eyre::{Result, eyre}; use std::process::{Command, Stdio}; +// The dashboard is always a container (Docker/Podman), never native. + const DASHBOARD_IMAGE: &str = "public.ecr.aws/p8l2s5f1/helix-dashboard:latest"; const DASHBOARD_CONTAINER_NAME: &str = "helix-dashboard"; @@ -43,6 +45,7 @@ async fn start( match runtime { ContainerRuntime::Docker => "host.docker.internal".to_string(), ContainerRuntime::Podman => "host.containers.internal".to_string(), + ContainerRuntime::Native => "localhost".to_string(), } } else { host diff --git a/helix-cli/src/commands/delete.rs b/helix-cli/src/commands/delete.rs index 2156b829..6cf91d25 100644 --- a/helix-cli/src/commands/delete.rs +++ b/helix-cli/src/commands/delete.rs @@ -1,5 +1,5 @@ use crate::config::InstanceInfo; -use crate::local_runtime::LocalRuntime; +use crate::local_runtime::Runtime; use crate::output::Operation; use crate::project::ProjectContext; use crate::utils::{print_confirm, print_warning}; @@ -24,7 +24,7 @@ pub async fn run(instance: String, yes: bool) -> Result<()> { let op = Operation::new("Deleting", &instance); if matches!(info, InstanceInfo::Local(_)) { - let _ = LocalRuntime::new(&project).prune_instance(&instance); + let _ = Runtime::for_project(&project).prune(&instance); } project.config.local.remove(&instance); diff --git a/helix-cli/src/commands/logs/mod.rs b/helix-cli/src/commands/logs/mod.rs index 80c2fe71..f051773e 100644 --- a/helix-cli/src/commands/logs/mod.rs +++ b/helix-cli/src/commands/logs/mod.rs @@ -1,7 +1,7 @@ use crate::commands::auth::require_auth; use crate::config::InstanceInfo; use crate::enterprise_cloud::cloud_base_url; -use crate::local_runtime::LocalRuntime; +use crate::local_runtime::Runtime; use crate::project::ProjectContext; use crate::prompts; use chrono::{DateTime, Duration, Utc}; @@ -34,7 +34,7 @@ pub async fn run( "--range, --start, and --end are only supported for Enterprise logs; local logs use docker/podman logs" )); } - LocalRuntime::new(&project).logs(&instance, follow)?; + Runtime::for_project(&project).logs(&instance, follow)?; } InstanceInfo::Enterprise(config) => { if follow { diff --git a/helix-cli/src/commands/prune.rs b/helix-cli/src/commands/prune.rs index b67e6b04..51f03539 100644 --- a/helix-cli/src/commands/prune.rs +++ b/helix-cli/src/commands/prune.rs @@ -1,4 +1,4 @@ -use crate::local_runtime::LocalRuntime; +use crate::local_runtime::Runtime; use crate::output::Operation; use crate::project::ProjectContext; use crate::prompts::{self, PruneSelection}; @@ -26,7 +26,7 @@ pub async fn run(instance: Option, all: bool, yes: bool) -> Result<()> { async fn prune_one(project: &ProjectContext, instance: &str) -> Result<()> { let op = Operation::new("Pruning", instance); - let removed_container = LocalRuntime::new(project).prune_instance(instance)?; + let removed_container = Runtime::for_project(project).prune(instance)?; let workspace = project.instance_workspace(instance); let removed_workspace = workspace.exists(); if workspace.exists() { diff --git a/helix-cli/src/commands/restart.rs b/helix-cli/src/commands/restart.rs index 6becd223..ae018a5d 100644 --- a/helix-cli/src/commands/restart.rs +++ b/helix-cli/src/commands/restart.rs @@ -1,5 +1,5 @@ use crate::config::InstanceInfo; -use crate::local_runtime::LocalRuntime; +use crate::local_runtime::Runtime; use crate::output::Operation; use crate::project::ProjectContext; use crate::prompts; @@ -12,7 +12,7 @@ pub async fn run(instance: Option) -> Result<()> { return Err(eyre!("'{instance}' is not a local v2 instance")); }; let op = Operation::new("Restarting", &instance); - LocalRuntime::new(&project).restart(&instance, config)?; + Runtime::for_project(&project).restart(&instance, config)?; op.success(); Ok(()) } diff --git a/helix-cli/src/commands/run.rs b/helix-cli/src/commands/run.rs index 885b7e12..2eb9c80a 100644 --- a/helix-cli/src/commands/run.rs +++ b/helix-cli/src/commands/run.rs @@ -1,5 +1,5 @@ use crate::config::{InstanceInfo, LocalStorageMode}; -use crate::local_runtime::LocalRuntime; +use crate::local_runtime::Runtime; use crate::output::{Operation, Verbosity}; use crate::project::ProjectContext; use crate::prompts; @@ -36,18 +36,18 @@ pub async fn run( } project.ensure_instance_dir(&instance)?; - let runtime = LocalRuntime::new(&project); + let runtime = Runtime::for_project(&project); if foreground { crate::output::info("Running in foreground. Press Ctrl-C to stop."); - runtime.run_foreground(&instance, &config).await?; + runtime.start_foreground(&instance, &config).await?; op.success(); } else { - runtime.run_detached(&instance, &config)?; + runtime.start(&instance, &config)?; op.success(); if Verbosity::current().show_normal() { Operation::print_details(&[ ("URL", &format!("http://localhost:{}", config.port)), - ("Container", &runtime.container_name(&instance)), + ("Container", &runtime.display_name(&instance)), ]); } } diff --git a/helix-cli/src/commands/status.rs b/helix-cli/src/commands/status.rs index f34baa86..0be63921 100644 --- a/helix-cli/src/commands/status.rs +++ b/helix-cli/src/commands/status.rs @@ -1,5 +1,5 @@ use crate::config::InstanceInfo; -use crate::local_runtime::LocalRuntime; +use crate::local_runtime::Runtime; use crate::project::ProjectContext; use crate::prompts::{self, StatusSelection}; use crate::utils::{print_field, print_header, print_newline}; @@ -13,7 +13,7 @@ pub async fn run(instance: Option) -> Result<()> { print_field("Root", &project.root.display().to_string()); print_newline(); - let runtime = LocalRuntime::new(&project); + let runtime = Runtime::for_project(&project); print_header("Instances"); match resolve_status_selection(&project, instance)? { StatusSelection::All => { @@ -21,7 +21,9 @@ pub async fn run(instance: Option) -> Result<()> { print_instance(&project, &runtime, name)?; } } - StatusSelection::Instance(instance) => print_instance(&project, &runtime, &instance)?, + StatusSelection::Instance(instance) => { + print_instance(&project, &runtime, &instance)? + } } Ok(()) @@ -41,7 +43,7 @@ fn resolve_status_selection( Ok(StatusSelection::All) } -fn print_instance(project: &ProjectContext, runtime: &LocalRuntime, name: &str) -> Result<()> { +fn print_instance(project: &ProjectContext, runtime: &Runtime, name: &str) -> Result<()> { match project.config.get_instance(name)? { InstanceInfo::Local(config) => { let status = runtime.status(name)?; diff --git a/helix-cli/src/commands/stop.rs b/helix-cli/src/commands/stop.rs index 9a5a1f09..5386a3d4 100644 --- a/helix-cli/src/commands/stop.rs +++ b/helix-cli/src/commands/stop.rs @@ -1,5 +1,5 @@ use crate::config::InstanceInfo; -use crate::local_runtime::LocalRuntime; +use crate::local_runtime::Runtime; use crate::output::Operation; use crate::project::ProjectContext; use crate::prompts; @@ -15,7 +15,7 @@ pub async fn run(instance: Option) -> Result<()> { return Err(eyre!("'{instance}' is not a local v2 instance")); } let op = Operation::new("Stopping", &instance); - if LocalRuntime::new(&project).stop(&instance)? { + if Runtime::for_project(&project).stop(&instance)? { op.success(); } else { crate::output::info(&format!("Instance '{instance}' was not running")); diff --git a/helix-cli/src/config.rs b/helix-cli/src/config.rs index a07a07e4..91c601e5 100644 --- a/helix-cli/src/config.rs +++ b/helix-cli/src/config.rs @@ -86,6 +86,7 @@ pub enum ContainerRuntime { #[default] Docker, Podman, + Native, } impl ContainerRuntime { @@ -93,6 +94,7 @@ impl ContainerRuntime { match self { Self::Docker => "docker", Self::Podman => "podman", + Self::Native => "native", } } @@ -100,8 +102,13 @@ impl ContainerRuntime { match self { Self::Docker => "Docker", Self::Podman => "Podman", + Self::Native => "Native", } } + + pub const fn is_native(&self) -> bool { + matches!(self, Self::Native) + } } fn default_container_runtime() -> ContainerRuntime { diff --git a/helix-cli/src/local_runtime.rs b/helix-cli/src/local_runtime.rs index 07b187d4..4b02b6bb 100644 --- a/helix-cli/src/local_runtime.rs +++ b/helix-cli/src/local_runtime.rs @@ -5,8 +5,9 @@ use crate::project::ProjectContext; use eyre::{Result, eyre}; use std::io::{Read, Write}; use std::net::TcpStream; -use std::process::{Command, Output, Stdio}; -use std::thread; +use std::path::PathBuf; +use std::process::{Command, Stdio}; +use std::{fs, thread}; use std::time::{Duration, Instant}; use tokio::process::Command as TokioCommand; @@ -19,12 +20,6 @@ const LOCAL_S3_BUCKET: &str = "helix-db"; const LOCAL_S3_REGION: &str = "us-east-1"; const LOCAL_DB_PATH: &str = "db/"; -#[derive(Debug, Clone)] -pub struct LocalRuntime { - runtime: ContainerRuntime, - project_name: String, -} - #[derive(Debug, Clone)] pub struct LocalStatus { pub instance_name: String, @@ -33,6 +28,95 @@ pub struct LocalStatus { pub ports: String, } +// --------------------------------------------------------------------------- +// Enum-based runtime dispatch +// --------------------------------------------------------------------------- + +pub enum Runtime<'a> { + Container(LocalRuntime), + Native(NativeManager<'a>), +} + +impl<'a> Runtime<'a> { + pub fn for_project(project: &'a ProjectContext) -> Self { + if project.config.project.container_runtime.is_native() { + Self::Native(NativeManager::new(project)) + } else { + Self::Container(LocalRuntime::new(project)) + } + } + + pub async fn start_foreground( + &self, + instance_name: &str, + config: &LocalInstanceConfig, + ) -> Result<()> { + match self { + Self::Container(r) => r.start_foreground(instance_name, config).await, + Self::Native(r) => r.start_foreground(instance_name, config).await, + } + } + + pub fn start(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { + match self { + Self::Container(r) => r.start(instance_name, config), + Self::Native(r) => r.start(instance_name, config), + } + } + + pub fn stop(&self, instance_name: &str) -> Result { + match self { + Self::Container(r) => r.stop(instance_name), + Self::Native(r) => r.stop(instance_name), + } + } + + pub fn restart(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { + match self { + Self::Container(r) => r.restart(instance_name, config), + Self::Native(r) => r.restart(instance_name, config), + } + } + + pub fn logs(&self, instance_name: &str, follow: bool) -> Result<()> { + match self { + Self::Container(r) => r.logs(instance_name, follow), + Self::Native(r) => r.logs(instance_name, follow), + } + } + + pub fn status(&self, instance_name: &str) -> Result> { + match self { + Self::Container(r) => r.status(instance_name), + Self::Native(r) => r.status(instance_name), + } + } + + pub fn prune(&self, instance_name: &str) -> Result { + match self { + Self::Container(r) => r.prune(instance_name), + Self::Native(r) => r.prune(instance_name), + } + } + + pub fn display_name(&self, instance_name: &str) -> String { + match self { + Self::Container(r) => r.display_name(instance_name), + Self::Native(r) => r.display_name(instance_name), + } + } +} + +// --------------------------------------------------------------------------- +// Container-based runtime (Docker / Podman) +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone)] +pub struct LocalRuntime { + runtime: ContainerRuntime, + project_name: String, +} + #[derive(Debug, Clone)] struct DiskRuntimeResources { minio_container: String, @@ -76,7 +160,7 @@ impl LocalRuntime { format!("helix-{}-{}", self.project_name, instance_name) } - pub fn pull_image(&self, config: &LocalInstanceConfig) -> Result<()> { + fn pull_image(&self, config: &LocalInstanceConfig) -> Result<()> { self.pull_image_ref(&config.image_ref()) } @@ -107,7 +191,176 @@ impl LocalRuntime { .unwrap_or(false) } - pub fn run_detached(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { + fn disk_resources(&self, instance_name: &str) -> DiskRuntimeResources { + let base = self.container_name(instance_name); + DiskRuntimeResources { + minio_container: format!("{base}-minio"), + network: format!("{base}-net"), + volume: format!("{base}-minio-data"), + } + } + + fn start_disk_dependencies(&self, instance_name: &str) -> Result { + let resources = self.disk_resources(instance_name); + self.pull_image_ref(MINIO_IMAGE)?; + self.pull_image_ref(MINIO_MC_IMAGE)?; + self.ensure_network(&resources.network)?; + self.ensure_volume(&resources.volume)?; + let _ = self.remove_container(&resources.minio_container); + + let args = minio_run_args(&resources); + let output = Command::new(self.runtime.binary()) + .args(&args) + .output() + .map_err(|e| eyre!("Failed to start {}: {e}", resources.minio_container))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(eyre!( + "Failed to start {}:\n{stderr}", + resources.minio_container + )); + } + + self.ensure_minio_bucket(&resources)?; + Ok(resources) + } + + fn ensure_network(&self, network: &str) -> Result<()> { + if self.resource_exists(&["network", "inspect", network]) { + return Ok(()); + } + + let output = Command::new(self.runtime.binary()) + .args(["network", "create", network]) + .output() + .map_err(|e| eyre!("Failed to create network {network}: {e}"))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + if !stderr.to_ascii_lowercase().contains("already exists") { + return Err(eyre!("Failed to create network {network}:\n{stderr}")); + } + } + + Ok(()) + } + + fn ensure_volume(&self, volume: &str) -> Result<()> { + let output = Command::new(self.runtime.binary()) + .args(["volume", "create", volume]) + .output() + .map_err(|e| eyre!("Failed to create volume {volume}: {e}"))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(eyre!("Failed to create volume {volume}:\n{stderr}")); + } + + Ok(()) + } + + fn ensure_minio_bucket(&self, resources: &DiskRuntimeResources) -> Result<()> { + let deadline = Instant::now() + Duration::from_secs(30); + let args = minio_bucket_init_args(resources); + let mut last_stderr = String::new(); + + while Instant::now() < deadline { + let output = Command::new(self.runtime.binary()) + .args(&args) + .output() + .map_err(|e| eyre!("Failed to initialize local MinIO bucket: {e}"))?; + + if output.status.success() { + return Ok(()); + } + + last_stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + thread::sleep(Duration::from_millis(500)); + } + + Err(eyre!( + "Timed out initializing local MinIO bucket {LOCAL_S3_BUCKET}:\n{last_stderr}" + )) + } + + fn remove_disk_resources(&self, instance_name: &str, include_volume: bool) -> Result { + let resources = self.disk_resources(instance_name); + let removed_minio = self.remove_container(&resources.minio_container)?; + let removed_network = self.remove_network(&resources.network)?; + let removed_volume = if include_volume { + self.remove_volume(&resources.volume)? + } else { + false + }; + + Ok(removed_minio || removed_network || removed_volume) + } + + fn remove_network(&self, network: &str) -> Result { + let output = Command::new(self.runtime.binary()) + .args(["network", "rm", network]) + .output() + .map_err(|e| eyre!("Failed to remove network {network}: {e}"))?; + + let stderr = String::from_utf8_lossy(&output.stderr); + if missing_resource(&stderr) { + return Ok(false); + } + + if !output.status.success() { + return Err(eyre!("Failed to remove network {network}:\n{stderr}")); + } + Ok(true) + } + + fn remove_volume(&self, volume: &str) -> Result { + let output = Command::new(self.runtime.binary()) + .args(["volume", "rm", volume]) + .output() + .map_err(|e| eyre!("Failed to remove volume {volume}: {e}"))?; + + let stderr = String::from_utf8_lossy(&output.stderr); + if missing_resource(&stderr) { + return Ok(false); + } + + if !output.status.success() { + return Err(eyre!("Failed to remove volume {volume}:\n{stderr}")); + } + Ok(true) + } + + fn resource_exists(&self, args: &[&str]) -> bool { + Command::new(self.runtime.binary()) + .args(args) + .output() + .map(|output| output.status.success()) + .unwrap_or(false) + } + + fn remove_container(&self, name: &str) -> Result { + let output = Command::new(self.runtime.binary()) + .args(["rm", "-f", name]) + .output() + .map_err(|e| eyre!("Failed to remove {name}: {e}"))?; + + let stderr = String::from_utf8_lossy(&output.stderr); + if missing_resource(&stderr) { + return Ok(false); + } + + if !output.status.success() { + return Err(eyre!("Failed to remove {name}:\n{stderr}")); + } + Ok(true) + } + + fn wait_ready(&self, port: u16) -> Result<()> { + wait_ready(port) + } + + fn start(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { Self::check_available(self.runtime)?; self.pull_image(config)?; @@ -136,7 +389,7 @@ impl LocalRuntime { Ok(()) } - pub async fn run_foreground( + async fn start_foreground( &self, instance_name: &str, config: &LocalInstanceConfig, @@ -196,16 +449,16 @@ impl LocalRuntime { Ok(()) } - pub fn stop(&self, instance_name: &str) -> Result { + fn stop(&self, instance_name: &str) -> Result { let name = self.container_name(instance_name); let removed_helix = self.remove_container(&name)?; let removed_disk_resources = self.remove_disk_resources(instance_name, false)?; Ok(removed_helix || removed_disk_resources) } - pub fn restart(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { + fn restart(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { if config.storage.is_disk() { - return self.run_detached(instance_name, config); + return self.start(instance_name, config); } let name = self.container_name(instance_name); @@ -219,10 +472,10 @@ impl LocalRuntime { return Ok(()); } - self.run_detached(instance_name, config) + self.start(instance_name, config) } - pub fn logs(&self, instance_name: &str, follow: bool) -> Result<()> { + fn logs(&self, instance_name: &str, follow: bool) -> Result<()> { let name = self.container_name(instance_name); let mut command = Command::new(self.runtime.binary()); command.arg("logs"); @@ -246,7 +499,7 @@ impl LocalRuntime { Ok(()) } - pub fn status(&self, instance_name: &str) -> Result> { + fn status(&self, instance_name: &str) -> Result> { let name = self.container_name(instance_name); let output = Command::new(self.runtime.binary()) .args([ @@ -282,234 +535,363 @@ impl LocalRuntime { })) } - pub fn prune_instance(&self, instance_name: &str) -> Result { + fn prune(&self, instance_name: &str) -> Result { let name = self.container_name(instance_name); let removed_helix = self.remove_container(&name)?; let removed_disk_resources = self.remove_disk_resources(instance_name, true)?; Ok(removed_helix || removed_disk_resources) } - pub fn run_command(&self, args: &[&str]) -> Result { - Command::new(self.runtime.binary()) - .args(args) - .output() - .map_err(|e| { - eyre!( - "Failed to run {} {}: {e}", - self.runtime.binary(), - args.join(" ") - ) - }) - } - - fn disk_resources(&self, instance_name: &str) -> DiskRuntimeResources { - let base = self.container_name(instance_name); - DiskRuntimeResources { - minio_container: format!("{base}-minio"), - network: format!("{base}-net"), - volume: format!("{base}-minio-data"), - } + fn display_name(&self, instance_name: &str) -> String { + self.container_name(instance_name) } +} - fn start_disk_dependencies(&self, instance_name: &str) -> Result { - let resources = self.disk_resources(instance_name); - self.pull_image_ref(MINIO_IMAGE)?; - self.pull_image_ref(MINIO_MC_IMAGE)?; - self.ensure_network(&resources.network)?; - self.ensure_volume(&resources.volume)?; - let _ = self.remove_container(&resources.minio_container); +// --------------------------------------------------------------------------- +// Native process-based runtime +// --------------------------------------------------------------------------- - let args = minio_run_args(&resources); - let output = Command::new(self.runtime.binary()) - .args(&args) - .output() - .map_err(|e| eyre!("Failed to start {}: {e}", resources.minio_container))?; +pub struct NativeManager<'a> { + project: &'a ProjectContext, +} - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(eyre!( - "Failed to start {}:\n{stderr}", - resources.minio_container - )); - } +impl<'a> NativeManager<'a> { + pub fn new(project: &'a ProjectContext) -> Self { + Self { project } + } - self.ensure_minio_bucket(&resources)?; - Ok(resources) + fn pid_file(&self, instance_name: &str) -> PathBuf { + self.project + .instance_workspace(instance_name) + .join("helix.pid") } - fn ensure_network(&self, network: &str) -> Result<()> { - if self.resource_exists(&["network", "inspect", network]) { - return Ok(()); - } + fn log_file(&self, instance_name: &str) -> PathBuf { + self.project + .instance_workspace(instance_name) + .join("helix.log") + } - let output = Command::new(self.runtime.binary()) - .args(["network", "create", network]) - .output() - .map_err(|e| eyre!("Failed to create network {network}: {e}"))?; + fn binary_path(&self, instance_name: &str) -> PathBuf { + self.project + .instance_workspace(instance_name) + .join("helix-container") + .join("target") + .join("release") + .join("helix-container") + } - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - if !stderr.to_ascii_lowercase().contains("already exists") { - return Err(eyre!("Failed to create network {network}:\n{stderr}")); - } + fn save_pid(&self, instance_name: &str, pid: u32) -> Result<()> { + let pid_file = self.pid_file(instance_name); + if let Some(parent) = pid_file.parent() { + fs::create_dir_all(parent)?; } - - Ok(()) + fs::write(&pid_file, pid.to_string()) + .map_err(|e| eyre!("Failed to write PID file: {e}")) } - fn ensure_volume(&self, volume: &str) -> Result<()> { - let output = Command::new(self.runtime.binary()) - .args(["volume", "create", volume]) - .output() - .map_err(|e| eyre!("Failed to create volume {volume}: {e}"))?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(eyre!("Failed to create volume {volume}:\n{stderr}")); + fn read_pid(&self, instance_name: &str) -> Result> { + let pid_file = self.pid_file(instance_name); + if !pid_file.exists() { + return Ok(None); } + let content = fs::read_to_string(&pid_file) + .map_err(|e| eyre!("Failed to read PID file: {e}"))?; + let pid = content + .trim() + .parse::() + .map_err(|e| eyre!("Invalid PID in file: {e}"))?; + Ok(Some(pid)) + } - Ok(()) + fn remove_pid_file(&self, instance_name: &str) { + let _ = fs::remove_file(self.pid_file(instance_name)); } - fn ensure_minio_bucket(&self, resources: &DiskRuntimeResources) -> Result<()> { - let deadline = Instant::now() + Duration::from_secs(30); - let args = minio_bucket_init_args(resources); - let mut last_stderr = String::new(); + fn remove_log_file(&self, instance_name: &str) { + let _ = fs::remove_file(self.log_file(instance_name)); + } - while Instant::now() < deadline { - let output = Command::new(self.runtime.binary()) - .args(&args) + fn is_process_running(pid: u32) -> bool { + #[cfg(target_os = "linux")] + { + PathBuf::from(format!("/proc/{pid}")).exists() + } + #[cfg(not(target_os = "linux"))] + { + Command::new("kill") + .args(["-0", &pid.to_string()]) .output() - .map_err(|e| eyre!("Failed to initialize local MinIO bucket: {e}"))?; - - if output.status.success() { - return Ok(()); - } - - last_stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); - thread::sleep(Duration::from_millis(500)); + .map(|o| o.status.success()) + .unwrap_or(false) } - - Err(eyre!( - "Timed out initializing local MinIO bucket {LOCAL_S3_BUCKET}:\n{last_stderr}" - )) } - fn remove_disk_resources(&self, instance_name: &str, include_volume: bool) -> Result { - let resources = self.disk_resources(instance_name); - let removed_minio = self.remove_container(&resources.minio_container)?; - let removed_network = self.remove_network(&resources.network)?; - let removed_volume = if include_volume { - self.remove_volume(&resources.volume)? - } else { - false + fn stop_process(&self, instance_name: &str) -> Result { + let Some(pid) = self.read_pid(instance_name)? else { + return Ok(false); }; - Ok(removed_minio || removed_network || removed_volume) - } + if !Self::is_process_running(pid) { + self.remove_pid_file(instance_name); + return Ok(false); + } - fn remove_network(&self, network: &str) -> Result { - let output = Command::new(self.runtime.binary()) - .args(["network", "rm", network]) + let output = Command::new("kill") + .args(["-TERM", &pid.to_string()]) .output() - .map_err(|e| eyre!("Failed to remove network {network}: {e}"))?; + .map_err(|e| eyre!("Failed to send SIGTERM to process {pid}: {e}"))?; - let stderr = String::from_utf8_lossy(&output.stderr); - if missing_resource(&stderr) { + if !output.status.success() { + self.remove_pid_file(instance_name); return Ok(false); } - if !output.status.success() { - return Err(eyre!("Failed to remove network {network}:\n{stderr}")); + let deadline = Instant::now() + Duration::from_secs(10); + while Instant::now() < deadline { + if !Self::is_process_running(pid) { + self.remove_pid_file(instance_name); + return Ok(true); + } + thread::sleep(Duration::from_millis(200)); } + + let _ = Command::new("kill") + .args(["-KILL", &pid.to_string()]) + .output(); + self.remove_pid_file(instance_name); Ok(true) } - fn remove_volume(&self, volume: &str) -> Result { - let output = Command::new(self.runtime.binary()) - .args(["volume", "rm", volume]) - .output() - .map_err(|e| eyre!("Failed to remove volume {volume}: {e}"))?; + fn start(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { + let binary = self.binary_path(instance_name); + if !binary.exists() { + return Err(CliError::new(format!( + "native binary not found at {}", + binary.display() + )) + .with_hint(format!( + "run 'helix build {instance_name}' first to compile the instance" + )) + .into()); + } - let stderr = String::from_utf8_lossy(&output.stderr); - if missing_resource(&stderr) { - return Ok(false); + let data_dir = self.project.instance_volume(instance_name); + fs::create_dir_all(&data_dir)?; + let port = config.port; + + let log_file = self.log_file(instance_name); + if let Some(parent) = log_file.parent() { + fs::create_dir_all(parent)?; } + let log = fs::File::create(&log_file) + .map_err(|e| eyre!("Failed to create log file: {e}"))?; - if !output.status.success() { - return Err(eyre!("Failed to remove volume {volume}:\n{stderr}")); + let mut cmd = Command::new(&binary); + cmd.env("HELIX_PORT", port.to_string()) + .env("HELIX_DATA_DIR", &data_dir) + .current_dir(&data_dir) + .stdout(Stdio::from(log.try_clone()?)) + .stderr(Stdio::from(log)); + + if config.storage.is_disk() { + cmd.env("S3_BUCKET", LOCAL_S3_BUCKET) + .env("S3_REGION", LOCAL_S3_REGION) + .env("DB_PATH", LOCAL_DB_PATH); } - Ok(true) - } - fn resource_exists(&self, args: &[&str]) -> bool { - Command::new(self.runtime.binary()) - .args(args) - .output() - .map(|output| output.status.success()) - .unwrap_or(false) + let child = cmd + .spawn() + .map_err(|e| eyre!("Failed to start native process: {e}"))?; + + self.save_pid(instance_name, child.id())?; + + wait_ready(port)?; + Ok(()) } - fn remove_container(&self, name: &str) -> Result { - let output = Command::new(self.runtime.binary()) - .args(["rm", "-f", name]) - .output() - .map_err(|e| eyre!("Failed to remove {name}: {e}"))?; + async fn start_foreground( + &self, + instance_name: &str, + config: &LocalInstanceConfig, + ) -> Result<()> { + let binary = self.binary_path(instance_name); + if !binary.exists() { + return Err(CliError::new(format!( + "native binary not found at {}", + binary.display() + )) + .with_hint(format!( + "run 'helix build {instance_name}' first to compile the instance" + )) + .into()); + } - let stderr = String::from_utf8_lossy(&output.stderr); - if missing_resource(&stderr) { - return Ok(false); + let data_dir = self.project.instance_volume(instance_name); + fs::create_dir_all(&data_dir)?; + let port = config.port; + + let mut cmd = TokioCommand::new(&binary); + cmd.env("HELIX_PORT", port.to_string()) + .env("HELIX_DATA_DIR", &data_dir) + .current_dir(&data_dir) + .stdin(Stdio::inherit()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()); + + if config.storage.is_disk() { + cmd.env("S3_BUCKET", LOCAL_S3_BUCKET) + .env("S3_REGION", LOCAL_S3_REGION) + .env("DB_PATH", LOCAL_DB_PATH); } - if !output.status.success() { - return Err(eyre!("Failed to remove {name}:\n{stderr}")); + let mut child = cmd + .spawn() + .map_err(|e| eyre!("Failed to start native process: {e}"))?; + + let mut wait = Box::pin(child.wait()); + tokio::select! { + status = &mut wait => { + let status = status?; + if !status.success() { + return Err(eyre!("Native process exited with status {status}")); + } + } + signal = tokio::signal::ctrl_c() => { + signal?; + crate::output::info("Stopping foreground local Helix instance"); + match tokio::time::timeout(Duration::from_secs(10), &mut wait).await { + Ok(Ok(_)) => {} + Ok(Err(e)) => return Err(eyre!("Failed to wait for process to stop: {e}")), + Err(_) => return Err(eyre!("Timed out waiting for process to stop")), + } + } } - Ok(true) + + Ok(()) } - fn wait_ready(&self, port: u16) -> Result<()> { - let deadline = Instant::now() + Duration::from_secs(30); - while Instant::now() < deadline { - if self.query_endpoint_ready(port) { - return Ok(()); - } - thread::sleep(Duration::from_millis(250)); + fn stop(&self, instance_name: &str) -> Result { + self.stop_process(instance_name) + } + + fn restart(&self, instance_name: &str, config: &LocalInstanceConfig) -> Result<()> { + let _ = self.stop_process(instance_name); + self.start(instance_name, config) + } + + fn logs(&self, instance_name: &str, follow: bool) -> Result<()> { + let log_file = self.log_file(instance_name); + if !log_file.exists() { + return Err(eyre!("No log file found for instance '{instance_name}'")); } - Err(CliError::new("local Helix did not become ready in time") - .with_hint(format!( - "check logs with 'helix logs' or verify port {port} is reachable" - )) - .into()) + if follow { + let status = Command::new("tail") + .args(["-f", &log_file.to_string_lossy()]) + .stdin(Stdio::inherit()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .status() + .map_err(|e| eyre!("Failed to follow logs: {e}"))?; + + if !status.success() { + return Err(eyre!("tail exited with status {status}")); + } + } else { + let content = fs::read_to_string(&log_file) + .map_err(|e| eyre!("Failed to read log file: {e}"))?; + print!("{content}"); + } + Ok(()) } - fn query_endpoint_ready(&self, port: u16) -> bool { - let Ok(mut stream) = TcpStream::connect_timeout( - &(std::net::Ipv4Addr::LOCALHOST, port).into(), - Duration::from_millis(500), - ) else { - return false; + fn status(&self, instance_name: &str) -> Result> { + let Some(pid) = self.read_pid(instance_name)? else { + return Ok(None); }; - let _ = stream.set_read_timeout(Some(Duration::from_millis(750))); - let _ = stream.set_write_timeout(Some(Duration::from_millis(750))); - let body = r#"{"request_type":"read","query":{"queries":[{"Query":{"name":"readiness","steps":[{"NWhere":{"Eq":["$label",{"String":"__HelixReadiness__"}]}},"Count"],"condition":null}}],"returns":["readiness"]},"parameters":{}}"#; - let request = format!( - "POST /v1/query HTTP/1.1\r\nHost: localhost:{port}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", - body.len() - ); + if Self::is_process_running(pid) { + let port = self + .project + .config + .local + .get(instance_name) + .map(|c| c.port) + .unwrap_or(0); + Ok(Some(LocalStatus { + instance_name: instance_name.to_string(), + container_name: format!("native-{pid}"), + status: "Up (native)".to_string(), + ports: format!("0.0.0.0:{port}->8080/tcp"), + })) + } else { + self.remove_pid_file(instance_name); + Ok(None) + } + } + + fn prune(&self, instance_name: &str) -> Result { + let removed = self.stop_process(instance_name)?; + self.remove_pid_file(instance_name); + self.remove_log_file(instance_name); + Ok(removed) + } - if stream.write_all(request.as_bytes()).is_err() { - return false; + fn display_name(&self, instance_name: &str) -> String { + match self.read_pid(instance_name) { + Ok(Some(pid)) => format!("native process {pid}"), + _ => format!("native:{instance_name}"), } + } +} - let mut response = String::new(); - if stream.read_to_string(&mut response).is_err() { - return false; +// --------------------------------------------------------------------------- +// Shared helpers +// --------------------------------------------------------------------------- + +fn wait_ready(port: u16) -> Result<()> { + let deadline = Instant::now() + Duration::from_secs(30); + while Instant::now() < deadline { + if query_endpoint_ready(port) { + return Ok(()); } + thread::sleep(Duration::from_millis(250)); + } - response.starts_with("HTTP/1.1 2") || response.starts_with("HTTP/1.0 2") + Err(CliError::new("local Helix did not become ready in time") + .with_hint(format!( + "check logs with 'helix logs' or verify port {port} is reachable" + )) + .into()) +} + +fn query_endpoint_ready(port: u16) -> bool { + let Ok(mut stream) = TcpStream::connect_timeout( + &(std::net::Ipv4Addr::LOCALHOST, port).into(), + Duration::from_millis(500), + ) else { + return false; + }; + let _ = stream.set_read_timeout(Some(Duration::from_millis(750))); + let _ = stream.set_write_timeout(Some(Duration::from_millis(750))); + + let body = r#"{"request_type":"read","query":{"queries":[{"Query":{"name":"readiness","steps":[{"NWhere":{"Eq":["$label",{"String":"__HelixReadiness__"}]}},"Count"],"condition":null}}],"returns":["readiness"]},"parameters":{}}"#; + let request = format!( + "POST /v1/query HTTP/1.1\r\nHost: localhost:{port}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", + body.len() + ); + + if stream.write_all(request.as_bytes()).is_err() { + return false; } + + let mut response = String::new(); + if stream.read_to_string(&mut response).is_err() { + return false; + } + + response.starts_with("HTTP/1.1 2") || response.starts_with("HTTP/1.0 2") } fn helix_run_args( diff --git a/helix-cli/src/project.rs b/helix-cli/src/project.rs index 5e864835..73f99673 100644 --- a/helix-cli/src/project.rs +++ b/helix-cli/src/project.rs @@ -33,6 +33,14 @@ impl ProjectContext { self.helix_dir.join(instance_name) } + pub fn volumes_dir(&self) -> PathBuf { + self.helix_dir.join(".volumes") + } + + pub fn instance_volume(&self, instance_name: &str) -> PathBuf { + self.volumes_dir().join(instance_name) + } + pub fn ensure_instance_dir(&self, instance_name: &str) -> Result<(), ProjectError> { let workspace = self.instance_workspace(instance_name); std::fs::create_dir_all(&workspace).map_err(|source| ProjectError::CreateDir { diff --git a/sdks/rust/helix-dsl-macros/src/lib.rs b/sdks/rust/helix-dsl-macros/src/lib.rs index feaa2aec..67f95e97 100644 --- a/sdks/rust/helix-dsl-macros/src/lib.rs +++ b/sdks/rust/helix-dsl-macros/src/lib.rs @@ -294,13 +294,12 @@ fn parse_param_type(ty: &Type) -> syn::Result { } "Vec" => { let inner = single_type_arg(segment, ty)?; - if let Type::Path(inner_path) = inner { - if let Some(inner_seg) = inner_path.path.segments.last() { - if inner_seg.ident == "u8" && matches!(inner_seg.arguments, PathArguments::None) - { - return Ok(ParamTypeSpec::Bytes); - } - } + if let Type::Path(inner_path) = inner + && let Some(inner_seg) = inner_path.path.segments.last() + && inner_seg.ident == "u8" + && matches!(inner_seg.arguments, PathArguments::None) + { + return Ok(ParamTypeSpec::Bytes); } Ok(ParamTypeSpec::Array(Box::new(parse_param_type(inner)?))) } diff --git a/sdks/rust/src/dsl.rs b/sdks/rust/src/dsl.rs index 143f7c90..0bdf3b20 100644 --- a/sdks/rust/src/dsl.rs +++ b/sdks/rust/src/dsl.rs @@ -1431,21 +1431,25 @@ impl Expr { } /// Addition: self + other + #[allow(clippy::should_implement_trait)] pub fn add(self, other: Expr) -> Self { Expr::Add(Box::new(self), Box::new(other)) } /// Subtraction: self - other + #[allow(clippy::should_implement_trait)] pub fn sub(self, other: Expr) -> Self { Expr::Sub(Box::new(self), Box::new(other)) } /// Multiplication: self * other + #[allow(clippy::should_implement_trait)] pub fn mul(self, other: Expr) -> Self { Expr::Mul(Box::new(self), Box::new(other)) } /// Division: self / other + #[allow(clippy::should_implement_trait)] pub fn div(self, other: Expr) -> Self { Expr::Div(Box::new(self), Box::new(other)) } @@ -1456,6 +1460,7 @@ impl Expr { } /// Negation: -self + #[allow(clippy::should_implement_trait)] pub fn neg(self) -> Self { Expr::Neg(Box::new(self)) } @@ -1914,6 +1919,7 @@ impl Predicate { } /// Negate a predicate + #[allow(clippy::should_implement_trait)] pub fn not(predicate: Predicate) -> Self { Predicate::Not(Box::new(predicate)) } @@ -2046,24 +2052,20 @@ impl From for Projection { } /// Sort order for ordering steps -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] pub enum Order { /// Ascending order (smallest first) + #[default] Asc, /// Descending order (largest first) Desc, } -impl Default for Order { - fn default() -> Self { - Order::Asc - } -} - /// Emit behavior for repeat steps -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] pub enum EmitBehavior { /// Don't emit intermediate results. + #[default] None, /// Emit the current node stream before each repeat iteration. Before, @@ -2073,12 +2075,6 @@ pub enum EmitBehavior { All, } -impl Default for EmitBehavior { - fn default() -> Self { - EmitBehavior::None - } -} - /// Aggregation function for reduce operations #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum AggregateFunction { From 5d90cad5f1e5e5d71f93cd5e6c0089fa83efe7dd Mon Sep 17 00:00:00 2001 From: nglmercer Date: Sun, 31 May 2026 15:52:49 -0500 Subject: [PATCH 2/2] fix: address Greptile review findings on native runtime - Kill child process if save_pid fails after spawn (prevents orphan) - Send SIGKILL via PID on foreground Ctrl-C timeout (prevents runaway process) - Store process start time in PID file and verify before sending signals (prevents killing a recycled PID belonging to an unrelated process) - Add comment clarifying dashboard Native arm is dead code --- helix-cli/src/commands/dashboard.rs | 3 +- helix-cli/src/local_runtime.rs | 81 +++++++++++++++++++++++------ 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/helix-cli/src/commands/dashboard.rs b/helix-cli/src/commands/dashboard.rs index 96728678..ba0e7075 100644 --- a/helix-cli/src/commands/dashboard.rs +++ b/helix-cli/src/commands/dashboard.rs @@ -7,8 +7,6 @@ use crate::utils::command_exists; use eyre::{Result, eyre}; use std::process::{Command, Stdio}; -// The dashboard is always a container (Docker/Podman), never native. - const DASHBOARD_IMAGE: &str = "public.ecr.aws/p8l2s5f1/helix-dashboard:latest"; const DASHBOARD_CONTAINER_NAME: &str = "helix-dashboard"; @@ -45,6 +43,7 @@ async fn start( match runtime { ContainerRuntime::Docker => "host.docker.internal".to_string(), ContainerRuntime::Podman => "host.containers.internal".to_string(), + // detect_runtime never returns Native; fallback for completeness ContainerRuntime::Native => "localhost".to_string(), } } else { diff --git a/helix-cli/src/local_runtime.rs b/helix-cli/src/local_runtime.rs index 4b02b6bb..21d7b752 100644 --- a/helix-cli/src/local_runtime.rs +++ b/helix-cli/src/local_runtime.rs @@ -586,22 +586,32 @@ impl<'a> NativeManager<'a> { if let Some(parent) = pid_file.parent() { fs::create_dir_all(parent)?; } - fs::write(&pid_file, pid.to_string()) + let start_time = Self::process_start_time(pid).unwrap_or(0); + fs::write(&pid_file, format!("{pid}\n{start_time}\n")) .map_err(|e| eyre!("Failed to write PID file: {e}")) } - fn read_pid(&self, instance_name: &str) -> Result> { + fn read_pid(&self, instance_name: &str) -> Result> { let pid_file = self.pid_file(instance_name); if !pid_file.exists() { return Ok(None); } let content = fs::read_to_string(&pid_file) .map_err(|e| eyre!("Failed to read PID file: {e}"))?; - let pid = content + let mut lines = content.lines(); + let pid = lines + .next() + .unwrap_or("") .trim() .parse::() .map_err(|e| eyre!("Invalid PID in file: {e}"))?; - Ok(Some(pid)) + let start_time = lines + .next() + .unwrap_or("0") + .trim() + .parse::() + .unwrap_or(0); + Ok(Some((pid, start_time))) } fn remove_pid_file(&self, instance_name: &str) { @@ -627,12 +637,41 @@ impl<'a> NativeManager<'a> { } } + /// Returns the process start time as jiffies since boot (Linux) or 0 on other platforms. + fn process_start_time(pid: u32) -> Option { + #[cfg(target_os = "linux")] + { + let stat = fs::read_to_string(format!("/proc/{pid}/stat")).ok()?; + // Field 22 (0-indexed: 21) is starttime in jiffies since boot + let fields: Vec<&str> = stat.splitn(22, ' ').collect(); + let start_str = fields.get(21)?; + start_str.trim().parse::().ok() + } + #[cfg(not(target_os = "linux"))] + { + let _ = pid; + None + } + } + + /// Verify that the PID still refers to the same process we started. + fn verify_pid(&self, _instance_name: &str, pid: u32, saved_start_time: u64) -> bool { + if !Self::is_process_running(pid) { + return false; + } + if saved_start_time == 0 { + // Can't verify on this platform; assume it's the same process + return true; + } + Self::process_start_time(pid) == Some(saved_start_time) + } + fn stop_process(&self, instance_name: &str) -> Result { - let Some(pid) = self.read_pid(instance_name)? else { + let Some((pid, start_time)) = self.read_pid(instance_name)? else { return Ok(false); }; - if !Self::is_process_running(pid) { + if !self.verify_pid(instance_name, pid, start_time) { self.remove_pid_file(instance_name); return Ok(false); } @@ -700,11 +739,15 @@ impl<'a> NativeManager<'a> { .env("DB_PATH", LOCAL_DB_PATH); } - let child = cmd + let mut child = cmd .spawn() .map_err(|e| eyre!("Failed to start native process: {e}"))?; - self.save_pid(instance_name, child.id())?; + if let Err(e) = self.save_pid(instance_name, child.id()) { + let _ = child.kill(); + let _ = child.wait(); + return Err(e); + } wait_ready(port)?; Ok(()) @@ -749,9 +792,9 @@ impl<'a> NativeManager<'a> { .spawn() .map_err(|e| eyre!("Failed to start native process: {e}"))?; - let mut wait = Box::pin(child.wait()); + let pid = child.id(); tokio::select! { - status = &mut wait => { + status = child.wait() => { let status = status?; if !status.success() { return Err(eyre!("Native process exited with status {status}")); @@ -760,10 +803,18 @@ impl<'a> NativeManager<'a> { signal = tokio::signal::ctrl_c() => { signal?; crate::output::info("Stopping foreground local Helix instance"); - match tokio::time::timeout(Duration::from_secs(10), &mut wait).await { + match tokio::time::timeout(Duration::from_secs(10), child.wait()).await { Ok(Ok(_)) => {} Ok(Err(e)) => return Err(eyre!("Failed to wait for process to stop: {e}")), - Err(_) => return Err(eyre!("Timed out waiting for process to stop")), + Err(_) => { + // Timeout expired — force-kill by PID + if let Some(pid) = pid { + let _ = Command::new("kill") + .args(["-KILL", &pid.to_string()]) + .output(); + } + return Err(eyre!("Timed out waiting for process to stop")); + } } } } @@ -807,11 +858,11 @@ impl<'a> NativeManager<'a> { } fn status(&self, instance_name: &str) -> Result> { - let Some(pid) = self.read_pid(instance_name)? else { + let Some((pid, start_time)) = self.read_pid(instance_name)? else { return Ok(None); }; - if Self::is_process_running(pid) { + if self.verify_pid(instance_name, pid, start_time) { let port = self .project .config @@ -840,7 +891,7 @@ impl<'a> NativeManager<'a> { fn display_name(&self, instance_name: &str) -> String { match self.read_pid(instance_name) { - Ok(Some(pid)) => format!("native process {pid}"), + Ok(Some((pid, _))) => format!("native process {pid}"), _ => format!("native:{instance_name}"), } }