From 2e5bf76524cf9d2e7885c0cfd39dc0534bb67f93 Mon Sep 17 00:00:00 2001 From: Marcus Date: Tue, 12 May 2026 20:50:58 +0100 Subject: [PATCH 1/8] Implement Remote Persistent Workers --- Cargo.lock | 1 + .../persistent-workers/README.md | 21 + .../persistent-workers/java/README.md | 28 ++ .../persistent-workers/kotlin/README.md | 25 ++ .../persistent-workers/typescript/README.md | 27 ++ .../src/simple_scheduler_state_manager.rs | 46 +- .../tests/simple_scheduler_test.rs | 176 ++++++++ nativelink-worker/BUILD.bazel | 6 + nativelink-worker/Cargo.toml | 5 +- nativelink-worker/src/lib.rs | 1 + .../src/persistent_worker/live_worker.rs | 420 +++++++++++++++++ .../src/persistent_worker/mod.rs | 36 ++ .../src/persistent_worker/pool.rs | 425 ++++++++++++++++++ .../src/persistent_worker/protocol.rs | 325 ++++++++++++++ .../src/running_actions_manager.rs | 149 +++++- .../tests/running_actions_manager_test.rs | 187 +++++++- web/platform/BUILD.bazel | 23 + web/platform/astro.config.ts | 7 +- web/platform/check.sh | 11 + .../src/components/qwik/components/icons.tsx | 6 +- .../src/components/qwik/components/scroll.tsx | 2 +- .../src/components/qwik/sections/hero.tsx | 2 +- .../components/qwik/sections/testimonials.tsx | 2 +- .../persistent-workers.mdx | 131 ++++++ web/platform/starlight.conf.ts | 7 +- 25 files changed, 2058 insertions(+), 11 deletions(-) create mode 100644 deployment-examples/persistent-workers/README.md create mode 100644 deployment-examples/persistent-workers/java/README.md create mode 100644 deployment-examples/persistent-workers/kotlin/README.md create mode 100644 deployment-examples/persistent-workers/typescript/README.md create mode 100644 nativelink-worker/src/persistent_worker/live_worker.rs create mode 100644 nativelink-worker/src/persistent_worker/mod.rs create mode 100644 nativelink-worker/src/persistent_worker/pool.rs create mode 100644 nativelink-worker/src/persistent_worker/protocol.rs create mode 100644 web/platform/BUILD.bazel create mode 100755 web/platform/check.sh create mode 100644 web/platform/src/content/docs/docs/deployment-examples/persistent-workers.mdx diff --git a/Cargo.lock b/Cargo.lock index 1d0766e5e..1aa28ad37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3197,6 +3197,7 @@ dependencies = [ "relative-path", "scopeguard", "serde", + "serde_json", "serde_json5", "serial_test", "shlex", diff --git a/deployment-examples/persistent-workers/README.md b/deployment-examples/persistent-workers/README.md new file mode 100644 index 000000000..60d038051 --- /dev/null +++ b/deployment-examples/persistent-workers/README.md @@ -0,0 +1,21 @@ +# Persistent Worker Examples + +These examples show the NativeLink-facing part of Bazel actions that opt in to +remote persistent worker execution. + +Persistent workers are useful for JVM-heavy tools such as `javac`, `scalac`, and +`kotlinc`, and for worker wrappers around tools such as `tsc`. Compatible +actions declare worker support with execution requirements: + +```starlark +execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "proto", # or "json" +} +``` + +See: + +- [Java](./java/README.md) +- [TypeScript](./typescript/README.md) +- [Kotlin](./kotlin/README.md) diff --git a/deployment-examples/persistent-workers/java/README.md b/deployment-examples/persistent-workers/java/README.md new file mode 100644 index 000000000..0ea0fde20 --- /dev/null +++ b/deployment-examples/persistent-workers/java/README.md @@ -0,0 +1,28 @@ +# Java Persistent Worker Example + +This example shows the NativeLink-facing part of a Java compile action that can +run through the remote persistent worker path. The action must declare that the +tool supports Bazel's worker protocol and that it uses the proto wire format. + +```starlark +def _javac_worker_impl(ctx): + args = ctx.actions.args() + args.add("@%s" % ctx.outputs.argfile.path) + + ctx.actions.run( + executable = ctx.executable.javac_worker, + arguments = [args], + inputs = ctx.files.srcs + [ctx.outputs.argfile], + outputs = [ctx.outputs.jar], + mnemonic = "Javac", + execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "proto", + }, + ) +``` + +NativeLink computes the `WorkerKey` from the executable, startup flags before the +first `@argfile`, and `requires-worker-protocol`. The first action starts +`javac_worker --persistent_worker`; later compatible actions send `WorkRequest` +messages to the same process. diff --git a/deployment-examples/persistent-workers/kotlin/README.md b/deployment-examples/persistent-workers/kotlin/README.md new file mode 100644 index 000000000..90c2f8498 --- /dev/null +++ b/deployment-examples/persistent-workers/kotlin/README.md @@ -0,0 +1,25 @@ +# Kotlin Persistent Worker Example + +Kotlin compile actions have the same remote execution shape as Java: keep the JVM +compiler process warm and send per-action argument files through the worker protocol. + +```starlark +def _kotlinc_worker_impl(ctx): + args = ctx.actions.args() + args.add("@%s" % ctx.outputs.argfile.path) + + ctx.actions.run( + executable = ctx.executable.kotlinc_worker, + arguments = [args], + inputs = ctx.files.srcs + [ctx.outputs.argfile], + outputs = [ctx.outputs.jar], + mnemonic = "KotlinCompile", + execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "proto", + }, + ) +``` + +The worker process is started once with `--persistent_worker`; compatible Kotlin +compile actions reuse it until the pool idle timeout or request cap retires it. diff --git a/deployment-examples/persistent-workers/typescript/README.md b/deployment-examples/persistent-workers/typescript/README.md new file mode 100644 index 000000000..f2368a7d2 --- /dev/null +++ b/deployment-examples/persistent-workers/typescript/README.md @@ -0,0 +1,27 @@ +# TypeScript Persistent Worker Example + +TypeScript worker wrappers commonly use JSON worker protocol framing. The +important NativeLink requirement is that the remote action advertises both +worker support and the JSON protocol. + +```starlark +def _tsc_worker_impl(ctx): + args = ctx.actions.args() + args.add("@%s" % ctx.outputs.tsconfig.path) + + ctx.actions.run( + executable = ctx.executable.tsc_worker, + arguments = [args], + inputs = ctx.files.srcs + [ctx.outputs.tsconfig], + outputs = ctx.outputs.js, + mnemonic = "TypeScriptCompile", + execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "json", + }, + ) +``` + +The persistent worker should read one newline-delimited JSON `WorkRequest` from +`stdin` and emit one newline-delimited JSON `WorkResponse` to `stdout` for each +action. diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index c0f9a282e..0b06092ea 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -354,6 +354,20 @@ where let now = (self.now_fn)().now(); + // Honor the per-action `Action.timeout` from the RBE protocol as a + // backend wall-clock deadline. Without this, the only enforcement is + // the Bazel client's --test_timeout, which surfaces as TIMEOUT/NO + // STATUS instead of a backend signal pointing at the worker. + let action_timeout = awaited_action.action_info().timeout; + if action_timeout > Duration::ZERO { + let executing_started_at = awaited_action.state().last_transition_timestamp; + if let Ok(elapsed) = now.duration_since(executing_started_at) + && elapsed > action_timeout + { + return true; + } + } + let registry_alive = if let Some(ref worker_registry) = self.worker_registry { if let Some(worker_id) = awaited_action.worker_id() { worker_registry @@ -785,7 +799,37 @@ where ActionStage::Queued } } - UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued, + UpdateOperationType::UpdateWithDisconnect => { + // A worker disconnect (e.g. OOMKill, pod eviction, network + // drop) used to requeue without counting as an attempt, + // which let an action that always crashes its worker loop + // forever until the Bazel client's --test_timeout fired. + // Count disconnects as attempts so max_job_retries caps the + // loop and the client sees a backend-attributable error. + awaited_action.attempts += 1; + + if awaited_action.attempts > self.max_job_retries { + ActionStage::Completed(ActionResult { + execution_metadata: ExecutionMetadata { + worker: maybe_worker_id + .map_or_else(String::default, ToString::to_string), + ..ExecutionMetadata::default() + }, + error: Some(make_err!( + Code::Internal, + "Worker disconnected repeatedly while executing this action ({} > {} attempts); the runner likely OOMKilled or the pod was evicted. {}", + awaited_action.attempts, + self.max_job_retries, + format!( + "for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}" + ), + )), + ..ActionResult::default() + }) + } else { + ActionStage::Queued + } + } // We shouldn't get here, but we just ignore it if we do. UpdateOperationType::ExecutionComplete => { warn!("inner_update_operation got an ExecutionComplete, that's unexpected."); diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index 59364bf28..39c583d0a 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -2116,6 +2116,182 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> Ok(()) } +/// Worker crash-loop regression: an action whose worker keeps disconnecting +/// (e.g. `OOMKill`) used to bypass `max_job_retries` because +/// `UpdateWithDisconnect` requeued without counting as an attempt. The build +/// would only terminate when the Bazel client's `--test_timeout` fired, +/// hiding the cluster-side root cause behind a TIMEOUT/NO STATUS surface. +/// After the fix, disconnects count as attempts and exceed the cap. +#[nativelink_test] +async fn worker_disconnect_loop_caps_at_max_job_retries_test() -> Result<(), Error> { + let worker_id = WorkerId("worker_id".to_string()); + + let task_change_notify = Arc::new(Notify::new()); + let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( + &SimpleSpec { + max_job_retries: 1, + ..Default::default() + }, + memory_awaited_action_db_factory( + 0, + &task_change_notify.clone(), + MockInstantWrapped::default, + ), + || async move {}, + task_change_notify, + MockInstantWrapped::default, + None, + ); + let action_digest = DigestInfo::new([99u8; 32], 512); + + let mut rx_from_worker = + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; + let insert_timestamp = make_system_time(1); + let mut action_listener = + setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?; + + let operation_id = { + let operation_id = match rx_from_worker.recv().await.unwrap().update { + Some(update_for_worker::Update::StartAction(exec)) => exec.operation_id, + v => panic!("Expected StartAction, got : {v:?}"), + }; + assert_eq!( + action_listener.changed().await.unwrap().0.stage, + ActionStage::Executing + ); + OperationId::from(operation_id.as_str()) + }; + + // First disconnect: should requeue (attempts=1, not yet > max_job_retries=1). + drop( + scheduler + .update_action( + &worker_id, + &operation_id, + UpdateOperationType::UpdateWithDisconnect, + ) + .await, + ); + { + let (action_state, _maybe_origin_metadata) = action_listener.changed().await.unwrap(); + assert_eq!( + action_state.stage, + ActionStage::Queued, + "First disconnect should requeue, got: {:?}", + action_state.stage, + ); + } + + // Reattach worker so it picks up the requeued action. + let mut rx_from_worker = + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; + { + match rx_from_worker.recv().await.unwrap().update { + Some(update_for_worker::Update::StartAction(_)) => { /* Success */ } + v => panic!("Expected StartAction, got : {v:?}"), + } + assert_eq!( + action_listener.changed().await.unwrap().0.stage, + ActionStage::Executing + ); + } + + // Second disconnect: now attempts=2 > max_job_retries=1, so the action + // must transition to Completed with an error mentioning the disconnect + // loop, not silently requeue. + drop( + scheduler + .update_action( + &worker_id, + &operation_id, + UpdateOperationType::UpdateWithDisconnect, + ) + .await, + ); + { + let (action_state, _maybe_origin_metadata) = action_listener.changed().await.unwrap(); + let ActionStage::Completed(action_result) = &action_state.stage else { + panic!( + "Second disconnect should mark action Completed-with-error, got: {:?}", + action_state.stage + ); + }; + let err = action_result + .error + .as_ref() + .expect("Completed action from disconnect cap must carry an error"); + assert!( + err.to_string() + .contains("Worker disconnected repeatedly while executing this action"), + "Error message did not mention disconnect loop: {err}", + ); + } + + Ok(()) +} + +/// `Action.timeout` from the RBE protocol must be enforced backend-side. +/// Without this, an action that hangs forever only terminates when the +/// Bazel client's `--remote_timeout` (gRPC deadline) or `--test_timeout` +/// (client-side) fires; from the operator's perspective the cluster never +/// surfaces the slow action. +#[nativelink_test] +async fn action_timeout_is_enforced_backend_side_test() -> Result<(), Error> { + use nativelink_scheduler::awaited_action_db::AwaitedAction; + use nativelink_scheduler::simple_scheduler_state_manager::SimpleSchedulerStateManager; + + // Anchor MockClock so MockInstantWrapped::now() == make_system_time(0). + MockClock::set_time(Duration::from_secs(NOW_TIME)); + let executing_started_at = make_system_time(0); + + let action_digest = DigestInfo::new([7u8; 32], 1); + let mut action_info = make_base_action_info(executing_started_at, action_digest); + Arc::make_mut(&mut action_info).timeout = Duration::from_secs(2); + + let operation_id = OperationId::default(); + let mut awaited_action = + AwaitedAction::new(operation_id.clone(), action_info, executing_started_at); + awaited_action.worker_set_state( + Arc::new(ActionState { + stage: ActionStage::Executing, + client_operation_id: operation_id, + action_digest, + last_transition_timestamp: executing_started_at, + }), + executing_started_at, + ); + + let task_change_notify = Arc::new(Notify::new()); + let state_mgr = SimpleSchedulerStateManager::new( + /* max_job_retries */ 1, + /* no_event_action_timeout */ Duration::from_secs(60), + /* client_action_timeout */ Duration::from_secs(60), + /* max_executing_timeout */ Duration::ZERO, + memory_awaited_action_db_factory( + 0, + &task_change_notify.clone(), + MockInstantWrapped::default, + ), + MockInstantWrapped::default, + /* worker_registry */ None, + ); + + assert!( + !state_mgr.should_timeout_operation(&awaited_action).await, + "Should not time out before Action.timeout elapses", + ); + + // Advance past the 2s per-action deadline. + MockClock::advance(Duration::from_secs(5)); + + assert!( + state_mgr.should_timeout_operation(&awaited_action).await, + "Scheduler must mark Executing action timed out once Action.timeout has elapsed", + ); + + Ok(()) +} + #[nativelink_test] async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> { struct DropChecker { diff --git a/nativelink-worker/BUILD.bazel b/nativelink-worker/BUILD.bazel index 04d6c034a..915e184c7 100644 --- a/nativelink-worker/BUILD.bazel +++ b/nativelink-worker/BUILD.bazel @@ -13,6 +13,10 @@ rust_library( "src/directory_cache.rs", "src/lib.rs", "src/local_worker.rs", + "src/persistent_worker/live_worker.rs", + "src/persistent_worker/mod.rs", + "src/persistent_worker/pool.rs", + "src/persistent_worker/protocol.rs", "src/running_actions_manager.rs", "src/worker_api_client_wrapper.rs", "src/worker_utils.rs", @@ -42,6 +46,7 @@ rust_library( "@crates//:relative-path", "@crates//:scopeguard", "@crates//:serde", + "@crates//:serde_json", "@crates//:serde_json5", "@crates//:shlex", "@crates//:tokio", @@ -113,6 +118,7 @@ rust_test( "@crates//:pretty_assertions", "@crates//:prost-types", "@crates//:rand", + "@crates//:serde_json", "@crates//:serial_test", "@crates//:tempfile", "@crates//:tracing-test", diff --git a/nativelink-worker/Cargo.toml b/nativelink-worker/Cargo.toml index 9b551b971..7385b320f 100644 --- a/nativelink-worker/Cargo.toml +++ b/nativelink-worker/Cargo.toml @@ -31,7 +31,10 @@ relative-path = { version = "2.0.0", default-features = false, features = [ "std", ] } scopeguard = { version = "1.2.0", default-features = false } -serde = { version = "1.0.219", default-features = false } +serde = { version = "1.0.219", default-features = false, features = ["derive"] } +serde_json = { version = "1.0.140", default-features = false, features = [ + "std", +] } serde_json5 = { version = "0.2.1", default-features = false } shlex = { version = "1.3.0", default-features = false } tokio = { version = "1.52.2", features = [ diff --git a/nativelink-worker/src/lib.rs b/nativelink-worker/src/lib.rs index 22aaa5981..abc9692e0 100644 --- a/nativelink-worker/src/lib.rs +++ b/nativelink-worker/src/lib.rs @@ -16,6 +16,7 @@ pub mod directory_cache; pub mod local_worker; #[cfg(target_os = "linux")] pub mod namespace_utils; +pub mod persistent_worker; pub mod running_actions_manager; pub mod worker_api_client_wrapper; pub mod worker_utils; diff --git a/nativelink-worker/src/persistent_worker/live_worker.rs b/nativelink-worker/src/persistent_worker/live_worker.rs new file mode 100644 index 000000000..c2d2b51e8 --- /dev/null +++ b/nativelink-worker/src/persistent_worker/live_worker.rs @@ -0,0 +1,420 @@ +// Copyright 2024 Trace Machina, Inc. All rights reserved. +// +// Licensed under the Business Source License, Version 1.1 (the "License"); +// you may not use this file except in compliance with the License. +// You may requested a copy of the License by emailing contact@nativelink.com. +// +// Use of this module requires an enterprise license agreement, which can be +// attained by emailing contact@nativelink.com or signing up for Nativelink +// Cloud at app.nativelink.com. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! `LiveWorker`: one long-lived persistent-worker child process. +//! +//! A `LiveWorker` wraps a child process started with the action's tool + +//! startup-flag prefix + `--persistent_worker`. It owns the child's stdin/stdout +//! and exposes `dispatch(request) -> response` for the pool to invoke. +//! +//! Lifecycle: +//! - `spawn` → ready, never-used +//! - `dispatch` flips an in-flight bool, writes the request, reads the response, +//! updates `last_used` and `request_count` +//! - `drop` sends SIGTERM, waits a grace period, then SIGKILL +//! +//! v1 is single-request-per-worker (no multiplex). The pool serializes +//! concurrent acquires of the same worker via its data structure. + +use core::time::Duration; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use std::time::Instant; + +use bytes::BytesMut; +use nativelink_error::{Code, Error, ResultExt, make_err}; +use prost::Message as ProstMessage; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use tracing::{debug, warn}; + +use super::protocol::{WireFormat, WorkRequest, WorkResponse}; + +/// Default time we wait for a single `WorkResponse` after writing a request. +/// Beyond this we kill the worker and surface a `DeadlineExceeded` error. +/// Callers may override via `LiveWorker::dispatch_with_timeout`. +const DEFAULT_DISPATCH_TIMEOUT: Duration = Duration::from_secs(60 * 10); + +/// One persistent-worker child process. +#[derive(Debug)] +pub struct LiveWorker { + /// Spawned child process. Kept on the struct so its Drop kills the process + /// when the worker is dropped without `shutdown` being called. + child: Child, + stdin: ChildStdin, + stdout: BufReader, + /// Wire format we negotiated at spawn (immutable for this worker's lifetime). + wire_format: WireFormat, + /// Number of successful `dispatch` calls completed. + request_count: u64, + /// When the last `dispatch` finished. Used by the pool's idle eviction. + last_used: Instant, + /// Working directory the child runs in. Stored so the pool can reuse it + /// across requests for the same `WorkerKey` until per-request sandboxing + /// arrives in v2. + working_dir: PathBuf, +} + +impl LiveWorker { + /// Spawn a new persistent worker. + /// + /// `executable` is the resolved tool path (absolute, or relative to + /// `working_dir`). `startup_args` are the flags that the worker is launched + /// with once — these become part of the `WorkerKey`. The Bazel-conventional + /// `--persistent_worker` flag is appended automatically. + pub async fn spawn( + executable: &Path, + startup_args: &[String], + wire_format: WireFormat, + working_dir: &Path, + ) -> Result { + let mut cmd = Command::new(executable); + cmd.args(startup_args) + .arg("--persistent_worker") + .current_dir(working_dir) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + // Worker stderr is forwarded to ours; Bazel convention puts diagnostic + // logs there. We do not capture it on the rust side in v1. + .stderr(Stdio::inherit()) + .kill_on_drop(true); + + debug!( + ?executable, + ?startup_args, + ?wire_format, + "Spawning persistent worker" + ); + + let mut child = cmd.spawn().err_tip(|| { + format!( + "Spawning persistent worker {} with args {startup_args:?}", + executable.display() + ) + })?; + let stdin = child + .stdin + .take() + .ok_or_else(|| make_err!(Code::Internal, "Persistent worker child has no stdin"))?; + let stdout = child + .stdout + .take() + .ok_or_else(|| make_err!(Code::Internal, "Persistent worker child has no stdout"))?; + + Ok(Self { + child, + stdin, + stdout: BufReader::new(stdout), + wire_format, + request_count: 0, + last_used: Instant::now(), + working_dir: working_dir.to_path_buf(), + }) + } + + pub const fn wire_format(&self) -> WireFormat { + self.wire_format + } + + pub const fn request_count(&self) -> u64 { + self.request_count + } + + pub const fn last_used(&self) -> Instant { + self.last_used + } + + pub fn working_dir(&self) -> &Path { + &self.working_dir + } + + /// Returns true if the child process has already exited. + pub fn is_dead(&mut self) -> bool { + matches!(self.child.try_wait(), Ok(Some(_)) | Err(_)) + } + + /// Send a single `WorkRequest` and read the matching `WorkResponse`. On + /// any I/O error or response framing error, the worker is considered + /// dead — the caller (pool) must not return it to the idle set. + pub async fn dispatch(&mut self, request: &WorkRequest) -> Result { + self.dispatch_with_timeout(request, DEFAULT_DISPATCH_TIMEOUT) + .await + } + + pub async fn dispatch_with_timeout( + &mut self, + request: &WorkRequest, + timeout: Duration, + ) -> Result { + if request.request_id != 0 { + return Err(make_err!( + Code::InvalidArgument, + "v1 persistent workers do not support multiplex; request_id must be 0, got {}", + request.request_id + )); + } + + let bytes = request.encode_framed(self.wire_format)?; + self.stdin + .write_all(&bytes) + .await + .err_tip(|| "Writing WorkRequest to persistent worker stdin")?; + self.stdin + .flush() + .await + .err_tip(|| "Flushing persistent worker stdin")?; + + let read_fut = read_response(&mut self.stdout, self.wire_format); + let response = if let Ok(result) = tokio::time::timeout(timeout, read_fut).await { + result? + } else { + warn!( + ?timeout, + "Persistent worker did not respond before deadline; killing child" + ); + drop(self.child.kill().await); + return Err(make_err!( + Code::DeadlineExceeded, + "Persistent worker did not respond within {timeout:?}" + )); + }; + + if response.request_id != 0 { + // v1 contract: workers MUST echo request_id 0. If a tool ever + // produces a multiplex-style id we don't know what to do with the + // response — fail conservatively rather than misroute output. + return Err(make_err!( + Code::Internal, + "Persistent worker returned non-zero request_id={}; v1 does not support multiplex", + response.request_id + )); + } + + self.request_count += 1; + self.last_used = Instant::now(); + Ok(response) + } + + /// Gracefully drain: close stdin so the worker observes EOF and exits on + /// its own, then wait up to `grace`, then SIGKILL if still alive. Always + /// consumes the worker. + pub async fn shutdown(mut self, grace: Duration) { + // Dropping stdin closes the pipe, which most well-behaved workers + // interpret as "no more work, please exit". + drop(self.stdin); + + match tokio::time::timeout(grace, self.child.wait()).await { + Ok(Ok(status)) => debug!(?status, "Persistent worker exited cleanly"), + Ok(Err(err)) => warn!(?err, "Error waiting for persistent worker exit"), + Err(_) => { + warn!( + ?grace, + "Persistent worker did not exit within grace; SIGKILLing" + ); + drop(self.child.kill().await); + } + } + } +} + +/// Read one framed `WorkResponse` from the worker's stdout. +async fn read_response( + stdout: &mut BufReader, + format: WireFormat, +) -> Result { + match format { + WireFormat::Proto => { + // Length-delimited varint frame. Decode the varint manually because + // we want to read exactly the right number of bytes without buffering + // arbitrary data from the worker's pipe. + let len = read_varint(stdout).await?; + if len > 64 * 1024 * 1024 { + return Err(make_err!( + Code::OutOfRange, + "Persistent worker WorkResponse length {len} exceeds 64 MiB cap" + )); + } + let mut buf = BytesMut::with_capacity(len); + buf.resize(len, 0); + stdout + .read_exact(&mut buf) + .await + .err_tip(|| "Reading WorkResponse proto body from persistent worker")?; + ::decode(&buf[..]) + .map_err(|e| make_err!(Code::Internal, "Decoding WorkResponse proto body: {e}")) + } + WireFormat::Json => { + // Newline-delimited JSON. + let mut line = Vec::with_capacity(256); + loop { + let mut byte = [0u8; 1]; + let n = stdout + .read(&mut byte) + .await + .err_tip(|| "Reading WorkResponse JSON byte from persistent worker")?; + if n == 0 { + return Err(make_err!( + Code::Aborted, + "Persistent worker closed stdout before sending a full JSON response" + )); + } + if byte[0] == b'\n' { + if line.is_empty() { + continue; // tolerate blank lines between responses + } + break; + } + line.push(byte[0]); + if line.len() > 64 * 1024 * 1024 { + return Err(make_err!( + Code::OutOfRange, + "Persistent worker WorkResponse JSON line exceeds 64 MiB cap" + )); + } + } + WorkResponse::decode_framed(&line, WireFormat::Json) + } + } +} + +/// Read a protobuf varint from the reader. Reused for the length prefix of a +/// length-delimited frame. +async fn read_varint(r: &mut BufReader) -> Result { + let mut result: u64 = 0; + for shift in (0..64).step_by(7) { + let mut byte = [0u8; 1]; + let n = r + .read(&mut byte) + .await + .err_tip(|| "Reading varint byte from persistent worker stdout")?; + if n == 0 { + return Err(make_err!( + Code::Aborted, + "Persistent worker closed stdout while reading varint" + )); + } + result |= u64::from(byte[0] & 0x7f) << shift; + if byte[0] & 0x80 == 0 { + return usize::try_from(result).map_err(|_| { + make_err!( + Code::OutOfRange, + "Varint length {result} does not fit in usize" + ) + }); + } + } + Err(make_err!( + Code::OutOfRange, + "Varint did not terminate within 10 bytes" + )) +} + +#[cfg(test)] +mod tests { + use nativelink_macro::nativelink_test; + + use super::*; + + fn echo_script(working_dir: &Path, body: &str) -> PathBuf { + let path = working_dir.join("worker.sh"); + std::fs::write(&path, body).unwrap(); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)).unwrap(); + } + path + } + + #[nativelink_test] + async fn shutdown_kills_unresponsive_worker() { + // A worker that never reads/writes — shutdown grace expires, we SIGKILL. + let dir = tempfile::tempdir().unwrap(); + let script = echo_script(dir.path(), "#!/bin/sh\nexec sleep 30\n"); + let worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) + .await + .unwrap(); + let start = Instant::now(); + worker.shutdown(Duration::from_millis(100)).await; + // SIGKILL should arrive well within a second. + assert!(start.elapsed() < Duration::from_secs(2)); + } + + #[nativelink_test] + async fn dispatch_json_round_trip() { + // A worker scripted to read one JSON line and echo a canned response. + let dir = tempfile::tempdir().unwrap(); + let script = echo_script( + dir.path(), + // Read one line, ignore it, emit a fixed response. Newline-terminated. + "#!/bin/sh\nread line\necho '{\"exitCode\":0,\"output\":\"ok\"}'\n", + ); + let mut worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) + .await + .unwrap(); + + let req = WorkRequest { + arguments: vec!["compile".into()], + ..WorkRequest::default() + }; + let resp = worker.dispatch(&req).await.unwrap(); + assert_eq!(resp.exit_code, 0); + assert_eq!(resp.output, "ok"); + assert_eq!(worker.request_count(), 1); + + worker.shutdown(Duration::from_secs(1)).await; + } + + #[nativelink_test] + async fn dispatch_timeout_kills_worker() { + let dir = tempfile::tempdir().unwrap(); + // Worker reads, then sleeps forever instead of responding. + let script = echo_script(dir.path(), "#!/bin/sh\nread line\nexec sleep 60\n"); + let mut worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) + .await + .unwrap(); + + let req = WorkRequest { + arguments: vec!["compile".into()], + ..WorkRequest::default() + }; + let result = worker + .dispatch_with_timeout(&req, Duration::from_millis(100)) + .await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err().code, Code::DeadlineExceeded); + assert!(worker.is_dead()); + } + + #[nativelink_test] + async fn rejects_multiplex_request_id() { + let dir = tempfile::tempdir().unwrap(); + let script = echo_script( + dir.path(), + "#!/bin/sh\nread line\necho '{\"exitCode\":0}'\n", + ); + let mut worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) + .await + .unwrap(); + let req = WorkRequest { + request_id: 7, + ..WorkRequest::default() + }; + let err = worker.dispatch(&req).await.unwrap_err(); + assert_eq!(err.code, Code::InvalidArgument); + worker.shutdown(Duration::from_secs(1)).await; + } +} diff --git a/nativelink-worker/src/persistent_worker/mod.rs b/nativelink-worker/src/persistent_worker/mod.rs new file mode 100644 index 000000000..0e669bd84 --- /dev/null +++ b/nativelink-worker/src/persistent_worker/mod.rs @@ -0,0 +1,36 @@ +// Copyright 2024 Trace Machina, Inc. All rights reserved. +// +// Licensed under the Business Source License, Version 1.1 (the "License"); +// you may not use this file except in compliance with the License. +// You may requested a copy of the License by emailing contact@nativelink.com. +// +// Use of this module requires an enterprise license agreement, which can be +// attained by emailing contact@nativelink.com or signing up for Nativelink +// Cloud at app.nativelink.com. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Bazel persistent worker protocol support for `NativeLink` workers. +//! +//! Public surface is intentionally small: +//! - [`WireFormat`] and the [`WorkRequest`]/[`WorkResponse`] types are the wire. +//! - [`PersistentWorkerPool`] is what `running_actions_manager` interacts with. +//! - [`WorkerKey`] is the equivalence class for worker reuse. +//! +//! See `docs/persistent-workers.md` for the design and §10 decisions that +//! shape this implementation. + +pub mod live_worker; +pub mod pool; +pub mod protocol; + +pub use live_worker::LiveWorker; +pub use pool::{ + DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_REQUESTS_PER_WORKER, DEFAULT_MAX_WORKERS_PER_KEY, Lease, + PersistentWorkerPool, PoolConfig, WorkerKey, +}; +pub use protocol::{Input, WireFormat, WorkRequest, WorkResponse}; diff --git a/nativelink-worker/src/persistent_worker/pool.rs b/nativelink-worker/src/persistent_worker/pool.rs new file mode 100644 index 000000000..684c9b111 --- /dev/null +++ b/nativelink-worker/src/persistent_worker/pool.rs @@ -0,0 +1,425 @@ +// Copyright 2024 Trace Machina, Inc. All rights reserved. +// +// Licensed under the Business Source License, Version 1.1 (the "License"); +// you may not use this file except in compliance with the License. +// You may requested a copy of the License by emailing contact@nativelink.com. +// +// Use of this module requires an enterprise license agreement, which can be +// attained by emailing contact@nativelink.com or signing up for Nativelink +// Cloud at app.nativelink.com. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! `PersistentWorkerPool`: per-`LocalWorker` pool of `LiveWorker`s keyed by +//! `WorkerKey`. Mirrors Bazel's notion of `WorkerKey` — two actions whose +//! executable + startup-flag prefix + wire format are identical share a worker +//! process. + +use core::mem; +use core::time::Duration; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use nativelink_error::{Code, Error, ResultExt, make_err}; +use nativelink_util::background_spawn; +use parking_lot::Mutex; +use tracing::{debug, info, warn}; + +use super::live_worker::LiveWorker; +use super::protocol::WireFormat; + +/// Default per-key bound on live workers. Matches Bazel's `worker_max_instances` +/// default and keeps memory pressure bounded without operator intervention. +pub const DEFAULT_MAX_WORKERS_PER_KEY: usize = 4; + +/// Default idle timeout. A worker that has not handled a request for this +/// duration is shut down by the background sweeper. +pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60 * 5); + +/// Default per-worker request cap. After this many `dispatch` calls a worker is +/// dropped on `release` (helps recycle accumulated JVM state). +pub const DEFAULT_MAX_REQUESTS_PER_WORKER: u64 = 200; + +/// Identity by which two persistent-worker actions are considered compatible. +/// Same `WorkerKey` => same worker can serve both. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct WorkerKey { + /// Resolved tool executable. Mirrors `Action.arguments[0]`. + pub executable: PathBuf, + /// Leading arguments that become tool startup flags. Computed as the prefix + /// of `Action.arguments[1..]` up to (excluding) the first arg starting + /// with `@` (Bazel's response-file convention). + pub startup_args: Vec, + /// Wire format from the action's `requires-worker-protocol`. + pub wire_format: WireFormat, +} + +impl WorkerKey { + /// Derive a `WorkerKey` from a full action `argv` and the negotiated wire + /// format. + /// + /// `argv[0]` is the executable. Subsequent arguments are scanned + /// left-to-right and treated as startup flags until the first + /// `@`-prefixed argument (Bazel's response file), at which point the rest + /// are considered per-request inputs and excluded from the key. + pub fn from_argv(argv: &[String], wire_format: WireFormat) -> Result { + let (exe, rest) = argv.split_first().ok_or_else(|| { + make_err!( + Code::InvalidArgument, + "Cannot derive WorkerKey from empty argument list" + ) + })?; + let startup_args: Vec = rest + .iter() + .take_while(|a| !a.starts_with('@')) + .cloned() + .collect(); + Ok(Self { + executable: PathBuf::from(exe), + startup_args, + wire_format, + }) + } +} + +/// Configuration knobs for a `PersistentWorkerPool`. Defaults are sensible for +/// a JVM-heavy workload; operators can override in `nativelink-config`. +#[derive(Clone, Copy, Debug)] +pub struct PoolConfig { + pub max_workers_per_key: usize, + pub idle_timeout: Duration, + pub max_requests_per_worker: u64, + /// SIGKILL grace when shutting a worker down. + pub shutdown_grace: Duration, +} + +impl Default for PoolConfig { + fn default() -> Self { + Self { + max_workers_per_key: DEFAULT_MAX_WORKERS_PER_KEY, + idle_timeout: DEFAULT_IDLE_TIMEOUT, + max_requests_per_worker: DEFAULT_MAX_REQUESTS_PER_WORKER, + shutdown_grace: Duration::from_secs(5), + } + } +} + +/// Reservation handle returned by `acquire`. Holding it removes the worker +/// from the pool's idle set; `release` returns it (unless the request marked +/// it as dead). Drop-without-release is treated as "worker is dead": it is +/// shut down rather than re-pooled. +#[derive(Debug)] +pub struct Lease { + inner: Option, +} + +#[derive(Debug)] +struct LeaseInner { + worker: LiveWorker, + pool: Arc, + key: WorkerKey, +} + +impl Lease { + pub const fn worker(&mut self) -> &mut LiveWorker { + &mut self + .inner + .as_mut() + .expect("lease used after release") + .worker + } + + /// Return the worker to the pool. Caller signals whether the worker is + /// still healthy. Unhealthy workers are shut down asynchronously and not + /// returned to the idle set. + pub async fn release(mut self, healthy: bool) { + let Some(inner) = self.inner.take() else { + return; + }; + inner + .pool + .return_worker(inner.key, inner.worker, healthy) + .await; + } +} + +impl Drop for Lease { + fn drop(&mut self) { + // If the caller forgot to call `release`, treat the worker as + // unhealthy and shut it down on a detached task. + if let Some(inner) = self.inner.take() { + let pool = inner.pool.clone(); + let key = inner.key.clone(); + let worker = inner.worker; + let grace = pool.config.shutdown_grace; + background_spawn!("persistent_worker_lease_drop", async move { + warn!( + ?key, + "Lease dropped without release; treating worker as unhealthy" + ); + worker.shutdown(grace).await; + pool.decrement_count(&key); + }); + } + } +} + +/// Per-key pool state. Wrapped in a `Mutex` because `acquire` and `release` +/// run on different tokio tasks. +#[derive(Debug)] +struct KeyState { + /// Idle workers ready to serve. Always size <= `total_count`. + idle: Vec, + /// Total live workers for this key (idle + leased). Used to enforce + /// `max_workers_per_key`. + total_count: usize, +} + +impl KeyState { + const fn new() -> Self { + Self { + idle: Vec::new(), + total_count: 0, + } + } +} + +#[derive(Debug)] +struct PoolInner { + config: PoolConfig, + state: Mutex>, +} + +impl PoolInner { + /// Helper used by Drop on a forgotten Lease. + fn decrement_count(&self, key: &WorkerKey) { + let mut state = self.state.lock(); + if let Some(s) = state.get_mut(key) { + s.total_count = s.total_count.saturating_sub(1); + } + } + + async fn return_worker(self: Arc, key: WorkerKey, worker: LiveWorker, healthy: bool) { + let exceeded_cap = worker.request_count() >= self.config.max_requests_per_worker; + if !healthy || exceeded_cap { + debug!( + ?key, + request_count = worker.request_count(), + healthy, + "Retiring persistent worker" + ); + worker.shutdown(self.config.shutdown_grace).await; + self.decrement_count(&key); + return; + } + let mut state = self.state.lock(); + let entry = state.entry(key).or_insert_with(KeyState::new); + entry.idle.push(worker); + } +} + +/// The pool itself. +#[derive(Clone, Debug)] +pub struct PersistentWorkerPool { + inner: Arc, +} + +impl Default for PersistentWorkerPool { + fn default() -> Self { + Self::new(PoolConfig::default()) + } +} + +impl PersistentWorkerPool { + pub fn new(config: PoolConfig) -> Self { + Self { + inner: Arc::new(PoolInner { + config, + state: Mutex::new(HashMap::new()), + }), + } + } + + /// Acquire a worker for `key`. If an idle worker is available it is + /// returned. Otherwise a new one is spawned, capped at + /// `max_workers_per_key`. If the cap is reached and no idle worker exists, + /// returns `Err(Code::ResourceExhausted)` — the caller should fall back to + /// the one-shot subprocess path or wait and retry. + /// + /// `working_dir` is the action's execution directory. v1 reuses it across + /// requests for the same `WorkerKey`; v2 will introduce per-request + /// sandboxing. + pub async fn acquire( + &self, + key: WorkerKey, + executable_path: &Path, + working_dir: &Path, + ) -> Result { + // First, try to take an idle worker without blocking on spawn. + { + let mut state = self.inner.state.lock(); + let entry = state.entry(key.clone()).or_insert_with(KeyState::new); + while let Some(mut worker) = entry.idle.pop() { + if worker.is_dead() { + // Process died while idle (e.g. JVM OOM). Drop it. + entry.total_count = entry.total_count.saturating_sub(1); + continue; + } + return Ok(Lease { + inner: Some(LeaseInner { + worker, + pool: self.inner.clone(), + key, + }), + }); + } + if entry.total_count >= self.inner.config.max_workers_per_key { + return Err(make_err!( + Code::ResourceExhausted, + "Persistent worker pool for {key:?} is at capacity ({} workers)", + self.inner.config.max_workers_per_key + )); + } + // Reserve a slot before releasing the lock so a concurrent + // acquire can't over-commit. + entry.total_count += 1; + } + + // Lock released; spawn outside the critical section. + let spawn_result = LiveWorker::spawn( + executable_path, + &key.startup_args, + key.wire_format, + working_dir, + ) + .await; + let worker = match spawn_result { + Ok(w) => w, + Err(err) => { + // Roll back our reservation on spawn failure. + self.inner.decrement_count(&key); + return Err(err).err_tip(|| format!("Spawning persistent worker for {key:?}")); + } + }; + info!(?key, "Spawned new persistent worker"); + Ok(Lease { + inner: Some(LeaseInner { + worker, + pool: self.inner.clone(), + key, + }), + }) + } + + /// Drop all idle workers whose `last_used` is older than `idle_timeout`. + /// Intended to be called periodically from a background task. + pub async fn sweep_idle(&self) { + let threshold = std::time::Instant::now() + .checked_sub(self.inner.config.idle_timeout) + .unwrap_or_else(std::time::Instant::now); + let evicted: Vec = { + let mut state = self.inner.state.lock(); + let mut evicted = Vec::new(); + for entry in state.values_mut() { + let (keep, drop_): (Vec<_>, Vec<_>) = mem::take(&mut entry.idle) + .into_iter() + .partition(|w| w.last_used() >= threshold); + entry.total_count = entry.total_count.saturating_sub(drop_.len()); + entry.idle = keep; + evicted.extend(drop_); + } + evicted + }; + for w in evicted { + w.shutdown(self.inner.config.shutdown_grace).await; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn worker_key_excludes_argfile_and_later_args() { + let argv: Vec = ["javac", "-source", "21", "@args.txt", "Foo.java"] + .iter() + .map(|s| (*s).to_string()) + .collect(); + let key = WorkerKey::from_argv(&argv, WireFormat::Proto).unwrap(); + assert_eq!(key.executable, PathBuf::from("javac")); + assert_eq!(key.startup_args, vec!["-source".to_string(), "21".into()]); + } + + #[test] + fn worker_key_handles_empty_startup_args() { + let argv: Vec = ["javac", "@args.txt"] + .iter() + .map(|s| (*s).to_string()) + .collect(); + let key = WorkerKey::from_argv(&argv, WireFormat::Json).unwrap(); + assert!(key.startup_args.is_empty()); + assert_eq!(key.wire_format, WireFormat::Json); + } + + #[test] + fn worker_key_handles_no_argfile() { + // No @argfile means *all* trailing args are startup flags. Unusual but + // legal — the action just doesn't use a response file. + let argv: Vec = ["scalac", "-deprecation", "-Xfatal-warnings"] + .iter() + .map(|s| (*s).to_string()) + .collect(); + let key = WorkerKey::from_argv(&argv, WireFormat::Proto).unwrap(); + assert_eq!(key.startup_args.len(), 2); + } + + #[test] + fn worker_key_rejects_empty_argv() { + let argv: Vec = Vec::new(); + assert!(WorkerKey::from_argv(&argv, WireFormat::Proto).is_err()); + } + + #[test] + fn worker_key_equality_collapses_compatible_actions() { + let a = WorkerKey::from_argv( + &["javac", "-source", "21", "@a"].map(String::from), + WireFormat::Proto, + ) + .unwrap(); + let b = WorkerKey::from_argv( + &["javac", "-source", "21", "@b"].map(String::from), + WireFormat::Proto, + ) + .unwrap(); + assert_eq!(a, b); + } + + #[test] + fn worker_key_distinguishes_different_startup_args() { + let a = WorkerKey::from_argv( + &["javac", "-source", "21", "@a"].map(String::from), + WireFormat::Proto, + ) + .unwrap(); + let b = WorkerKey::from_argv( + &["javac", "-source", "17", "@a"].map(String::from), + WireFormat::Proto, + ) + .unwrap(); + assert_ne!(a, b); + } + + #[test] + fn worker_key_distinguishes_wire_formats() { + let a = + WorkerKey::from_argv(&["javac", "@a"].map(String::from), WireFormat::Proto).unwrap(); + let b = WorkerKey::from_argv(&["javac", "@a"].map(String::from), WireFormat::Json).unwrap(); + assert_ne!(a, b); + } +} diff --git a/nativelink-worker/src/persistent_worker/protocol.rs b/nativelink-worker/src/persistent_worker/protocol.rs new file mode 100644 index 000000000..a284784f5 --- /dev/null +++ b/nativelink-worker/src/persistent_worker/protocol.rs @@ -0,0 +1,325 @@ +// Copyright 2024 Trace Machina, Inc. All rights reserved. +// +// Licensed under the Business Source License, Version 1.1 (the "License"); +// you may not use this file except in compliance with the License. +// You may requested a copy of the License by emailing contact@nativelink.com. +// +// Use of this module requires an enterprise license agreement, which can be +// attained by emailing contact@nativelink.com or signing up for Nativelink +// Cloud at app.nativelink.com. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Wire-format types for Bazel's persistent worker protocol. +//! +//! Reference: +//! and . +//! +//! Both JSON (newline-delimited, camelCase field names) and proto (length-delimited +//! varint prefix) wire formats are supported. The choice is per-request, driven by +//! the action's `requires-worker-protocol` execution requirement. + +use bytes::{Bytes, BytesMut}; +use nativelink_error::{Code, Error, make_err}; +use prost::Message as ProstMessage; +use serde::{Deserialize, Serialize}; + +/// One input file declared in a `WorkRequest`. The tool may use the digest to +/// verify cached state matches. +#[derive(Clone, PartialEq, Eq, ProstMessage, Serialize, Deserialize)] +pub struct Input { + #[prost(string, tag = "1")] + #[serde(default, skip_serializing_if = "String::is_empty")] + pub path: String, + + #[prost(bytes = "vec", tag = "2")] + #[serde( + default, + skip_serializing_if = "Vec::is_empty", + with = "serde_bytes_b64" + )] + pub digest: Vec, +} + +/// A single unit of work dispatched to a persistent worker subprocess. +#[derive(Clone, PartialEq, Eq, ProstMessage, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkRequest { + #[prost(string, repeated, tag = "1")] + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub arguments: Vec, + + #[prost(message, repeated, tag = "2")] + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub inputs: Vec, + + /// Multiplex worker request id. v1 always sends 0 and rejects responses + /// with non-zero ids. + #[prost(int32, tag = "3")] + #[serde(default)] + pub request_id: i32, + + #[prost(bool, tag = "4")] + #[serde(default)] + pub cancel: bool, + + #[prost(int32, tag = "5")] + #[serde(default)] + pub verbosity: i32, + + #[prost(string, tag = "6")] + #[serde(default, skip_serializing_if = "String::is_empty")] + pub sandbox_dir: String, +} + +/// A single response emitted by a persistent worker subprocess. +#[derive(Clone, PartialEq, Eq, ProstMessage, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkResponse { + #[prost(int32, tag = "1")] + #[serde(default)] + pub exit_code: i32, + + #[prost(string, tag = "2")] + #[serde(default, skip_serializing_if = "String::is_empty")] + pub output: String, + + /// Echoed from `WorkRequest.request_id`. v1 expects 0. + #[prost(int32, tag = "3")] + #[serde(default)] + pub request_id: i32, + + #[prost(bool, tag = "4")] + #[serde(default)] + pub was_cancelled: bool, +} + +/// Wire format negotiated per request from the action's +/// `requires-worker-protocol` execution requirement. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum WireFormat { + /// Length-delimited proto (Bazel default). + Proto, + /// Newline-delimited JSON, camelCase fields. + Json, +} + +impl WireFormat { + /// Parse the execution-requirement value. Bazel accepts only "proto" and + /// "json"; anything else is an error. + pub fn parse(s: &str) -> Result { + match s { + "proto" => Ok(Self::Proto), + "json" => Ok(Self::Json), + other => Err(make_err!( + Code::InvalidArgument, + "Unsupported requires-worker-protocol value: {other:?}; expected 'proto' or 'json'" + )), + } + } +} + +impl WorkRequest { + /// Serialize this request in the given wire format. The returned bytes + /// include the framing prefix/suffix appropriate for the format + /// (length-delimited varint for proto, trailing newline for JSON). + pub fn encode_framed(&self, format: WireFormat) -> Result { + match format { + WireFormat::Proto => { + let mut buf = + BytesMut::with_capacity(::encoded_len(self) + 10); + ::encode_length_delimited(self, &mut buf) + .map_err(|e| make_err!(Code::Internal, "WorkRequest proto encode: {e}"))?; + Ok(buf.freeze()) + } + WireFormat::Json => { + let mut s = serde_json::to_string(self) + .map_err(|e| make_err!(Code::Internal, "WorkRequest JSON encode: {e}"))?; + s.push('\n'); + Ok(Bytes::from(s)) + } + } + } +} + +impl WorkResponse { + /// Decode a single response from a byte buffer. For proto: expects a + /// length-delimited varint frame at the buffer start. For JSON: expects a + /// single complete JSON object, optionally with a trailing newline. + pub fn decode_framed(buf: &[u8], format: WireFormat) -> Result { + match format { + WireFormat::Proto => ::decode_length_delimited(buf) + .map_err(|e| make_err!(Code::Internal, "WorkResponse proto decode: {e}")), + WireFormat::Json => serde_json::from_slice(buf) + .map_err(|e| make_err!(Code::Internal, "WorkResponse JSON decode: {e}")), + } + } +} + +/// Serde adapter for `Input.digest`: Bazel's JSON serialization base64-encodes +/// the digest bytes (as is standard for proto-JSON byte fields). +mod serde_bytes_b64 { + use serde::{Deserialize, Deserializer, Serializer}; + + // Minimal RFC 4648 base64 encoder/decoder so we don't pull in a `base64` + // crate dep just for this adapter. v1 expects digests under ~64 bytes. + const ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + fn encode(input: &[u8]) -> String { + let mut out = String::with_capacity(input.len().div_ceil(3) * 4); + for chunk in input.chunks(3) { + let b0 = chunk[0]; + let b1 = chunk.get(1).copied().unwrap_or(0); + let b2 = chunk.get(2).copied().unwrap_or(0); + out.push(ALPHA[(b0 >> 2) as usize] as char); + out.push(ALPHA[(((b0 & 0x03) << 4) | (b1 >> 4)) as usize] as char); + if chunk.len() > 1 { + out.push(ALPHA[(((b1 & 0x0f) << 2) | (b2 >> 6)) as usize] as char); + } else { + out.push('='); + } + if chunk.len() > 2 { + out.push(ALPHA[(b2 & 0x3f) as usize] as char); + } else { + out.push('='); + } + } + out + } + + fn decode(s: &str) -> Result, &'static str> { + let mut lookup = [255u8; 256]; + for (i, b) in ALPHA.iter().enumerate() { + lookup[*b as usize] = u8::try_from(i).expect("base64 alphabet index fits in u8"); + } + let bytes = s.as_bytes(); + if !bytes.len().is_multiple_of(4) { + return Err("base64 input length not multiple of 4"); + } + let mut out = Vec::with_capacity(bytes.len() / 4 * 3); + for chunk in bytes.chunks(4) { + let v0 = lookup[chunk[0] as usize]; + let v1 = lookup[chunk[1] as usize]; + let v2 = lookup[chunk[2] as usize]; + let v3 = lookup[chunk[3] as usize]; + if v0 == 255 || v1 == 255 { + return Err("invalid base64 char"); + } + out.push((v0 << 2) | (v1 >> 4)); + if chunk[2] != b'=' { + if v2 == 255 { + return Err("invalid base64 char"); + } + out.push((v1 << 4) | (v2 >> 2)); + } + if chunk[3] != b'=' { + if v3 == 255 { + return Err("invalid base64 char"); + } + out.push((v2 << 6) | v3); + } + } + Ok(out) + } + + pub(super) fn serialize(bytes: &[u8], s: S) -> Result { + s.serialize_str(&encode(bytes)) + } + + pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result, D::Error> { + let s = String::deserialize(d)?; + decode(&s).map_err(serde::de::Error::custom) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_wire_format() { + assert_eq!(WireFormat::parse("proto").unwrap(), WireFormat::Proto); + assert_eq!(WireFormat::parse("json").unwrap(), WireFormat::Json); + assert!(WireFormat::parse("xml").is_err()); + } + + #[test] + fn proto_round_trip_minimal() { + let req = WorkRequest { + arguments: vec!["-source".into(), "21".into(), "Foo.java".into()], + ..WorkRequest::default() + }; + let bytes = req.encode_framed(WireFormat::Proto).unwrap(); + // Encoding is length-delimited; round-trip via prost's decoder. + let mut cursor: &[u8] = &bytes; + let decoded = ::decode_length_delimited(&mut cursor).unwrap(); + assert_eq!(decoded, req); + } + + #[test] + fn json_round_trip_uses_camel_case() { + let resp = WorkResponse { + exit_code: 0, + output: "compiled Foo.java".into(), + request_id: 0, + was_cancelled: false, + }; + let bytes = serde_json::to_vec(&resp).unwrap(); + let s = core::str::from_utf8(&bytes).unwrap(); + // Bazel's JSON convention is camelCase; assert the wire form. + assert!(s.contains(r#""exitCode":0"#), "got: {s}"); + assert!(s.contains(r#""requestId":0"#), "got: {s}"); + assert!(s.contains(r#""wasCancelled":false"#), "got: {s}"); + + let parsed: WorkResponse = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(parsed, resp); + } + + #[test] + fn json_decodes_minimum_response() { + // A real worker may omit fields whose values are proto defaults. + let bytes = br#"{"exitCode":1,"output":"oh no"}"#; + let resp = WorkResponse::decode_framed(bytes, WireFormat::Json).unwrap(); + assert_eq!(resp.exit_code, 1); + assert_eq!(resp.output, "oh no"); + assert_eq!(resp.request_id, 0); + assert!(!resp.was_cancelled); + } + + #[test] + fn proto_request_includes_inputs_with_digest() { + let req = WorkRequest { + arguments: vec!["@argfile".into()], + inputs: vec![Input { + path: "Foo.java".into(), + digest: vec![0xde, 0xad, 0xbe, 0xef], + }], + ..WorkRequest::default() + }; + let bytes = req.encode_framed(WireFormat::Proto).unwrap(); + let mut cursor: &[u8] = &bytes; + let decoded = ::decode_length_delimited(&mut cursor).unwrap(); + assert_eq!(decoded, req); + } + + #[test] + fn json_input_digest_uses_base64() { + let req = WorkRequest { + inputs: vec![Input { + path: "Foo.java".into(), + digest: vec![0xde, 0xad, 0xbe, 0xef], + }], + ..WorkRequest::default() + }; + let s = serde_json::to_string(&req).unwrap(); + // 0xdeadbeef base64-encodes to "3q2+7w==". + assert!(s.contains(r#""digest":"3q2+7w==""#), "got: {s}"); + + let parsed: WorkRequest = serde_json::from_str(&s).unwrap(); + assert_eq!(parsed, req); + } +} diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 4e9ea04a5..6c5390068 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -82,10 +82,17 @@ use tonic::Request; use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; +use crate::persistent_worker::{ + Input as PersistentWorkerInput, PersistentWorkerPool, WireFormat, WorkRequest, WorkerKey, +}; + /// For simplicity we use a fixed exit code for cases when our program is terminated /// due to a signal. const EXIT_CODE_FOR_SIGNAL: i32 = 9; +const SUPPORTS_WORKERS_PROPERTY: &str = "supports-workers"; +const REQUIRES_WORKER_PROTOCOL_PROPERTY: &str = "requires-worker-protocol"; + /// Default strategy for uploading historical results. /// Note: If this value changes the config documentation /// should reflect it. @@ -111,6 +118,45 @@ struct SideChannelInfo { failure: Option, } +fn action_supports_persistent_workers( + action_info: &ActionInfo, +) -> Option> { + if action_info + .platform_properties + .get(SUPPORTS_WORKERS_PROPERTY) + .is_none_or(|value| value != "1") + { + return None; + } + + let protocol = action_info + .platform_properties + .get(REQUIRES_WORKER_PROTOCOL_PROPERTY) + .map_or("proto", String::as_str); + Some(WireFormat::parse(protocol)) +} + +fn os_args_to_strings(args: &[&OsStr]) -> Result, Error> { + args.iter() + .map(|arg| { + arg.to_str().map(str::to_owned).ok_or_else(|| { + make_err!( + Code::InvalidArgument, + "Persistent worker command arguments must be valid UTF-8: {arg:?}" + ) + }) + }) + .collect() +} + +fn persistent_worker_request_arguments(argv: &[String]) -> Vec { + argv.iter() + .skip(1) + .skip_while(|arg| !arg.starts_with('@')) + .cloned() + .collect() +} + /// Aggressively download the digests of files and make a local folder from it. This function /// will spawn unbounded number of futures to try and get these downloaded. The store itself /// should be rate limited if spawning too many requests at once is an issue. @@ -960,6 +1006,90 @@ impl RunningActionImpl { info!(?args, "Executing command"); let program = self.canonicalise_path(args[0], &command_proto.working_directory)?; + if let Some(wire_format_result) = action_supports_persistent_workers(&self.action_info) { + match wire_format_result { + Ok(wire_format) => { + let command_argv = os_args_to_strings(&args)?; + let key = WorkerKey::from_argv(&command_argv, wire_format)?; + let request = WorkRequest { + arguments: persistent_worker_request_arguments(&command_argv), + inputs: Vec::::new(), + request_id: 0, + cancel: false, + verbosity: 0, + sandbox_dir: if command_proto.working_directory.is_empty() { + self.work_directory.clone() + } else { + format!( + "{}/{}", + self.work_directory, command_proto.working_directory + ) + }, + }; + let worker_cwd = PathBuf::from(&request.sandbox_dir); + + match self + .running_actions_manager + .persistent_worker_pool + .acquire(key.clone(), &program, &worker_cwd) + .await + { + Ok(mut lease) => { + let timer = self.metrics().child_process.begin_timer(); + let response = match lease + .worker() + .dispatch_with_timeout(&request, self.timeout) + .await + { + Ok(response) => { + lease.release(true).await; + response + } + Err(err) => { + lease.release(false).await; + return Err(err).err_tip(|| { + format!("Dispatching action to persistent worker {key:?}") + }); + } + }; + timer.measure(); + + if response.exit_code == 0 { + self.metrics().child_process_success_error_code.inc(); + } else { + self.metrics().child_process_failure_error_code.inc(); + } + info!(?args, ?key, "Persistent worker command complete"); + { + let mut state = self.state.lock(); + state.command_proto = Some(command_proto); + state.execution_result = Some(RunningActionImplExecutionResult { + stdout: Bytes::from(response.output), + stderr: Bytes::new(), + exit_code: response.exit_code, + }); + state.execution_metadata.execution_completed_timestamp = + (self.running_actions_manager.callbacks.now_fn)(); + } + return Ok(self); + } + Err(err) => { + info!( + ?err, + ?key, + "Falling back to one-shot execution; persistent worker unavailable" + ); + } + } + } + Err(err) => { + info!( + ?err, + "Falling back to one-shot execution; unsupported persistent worker protocol" + ); + } + } + } let mut command_builder = process::Command::new(program); #[cfg(target_family = "unix")] @@ -1192,7 +1322,22 @@ impl RunningActionImpl { ) }; - let exit_code = exit_status.code().map_or(EXIT_CODE_FOR_SIGNAL, |exit_code| { + let exit_code = exit_status.code().map_or_else(|| { + // No exit code means the runner was terminated by a + // signal. SIGKILL on Linux is the kernel OOM killer's + // weapon of choice, so flag this for operators trying + // to correlate action failures with kubectl-top + // memory pressure. + warn!( + ?args, + "Runner subprocess terminated by signal (no exit code); likely OOMKilled \ + or externally killed. If this repeats for the same action, raise \ + `workers.specs[*].resources.limits.memory` or shrink the action's \ + concurrency." + ); + self.metrics().child_process_failure_error_code.inc(); + EXIT_CODE_FOR_SIGNAL + }, |exit_code| { if exit_code == 0 { self.metrics().child_process_success_error_code.inc(); } else { @@ -2063,6 +2208,7 @@ pub struct RunningActionsManagerImpl { /// Optional directory cache for improving performance by caching reconstructed /// input directories and using hardlinks. directory_cache: Option>, + persistent_worker_pool: PersistentWorkerPool, } impl RunningActionsManagerImpl { @@ -2107,6 +2253,7 @@ impl RunningActionsManagerImpl { cleaning_up_operations: Mutex::new(HashSet::new()), cleanup_complete_notify: Arc::new(Notify::new()), directory_cache: args.directory_cache, + persistent_worker_pool: PersistentWorkerPool::default(), #[cfg(target_os = "linux")] use_namespaces: args.use_namespaces, }) diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index ab6a91f9c..19e29ec26 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -52,7 +52,9 @@ mod tests { HistoricalExecuteResponse, StartExecute, }; use nativelink_proto::google::rpc::Status; - use nativelink_store::ac_utils::{get_and_decode_digest, serialize_and_upload_message}; + use nativelink_store::ac_utils::{ + compute_buf_digest, get_and_decode_digest, serialize_and_upload_message, + }; use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_store::filesystem_store::FilesystemStore; use nativelink_store::memory_store::MemoryStore; @@ -3322,6 +3324,189 @@ exit 1 Ok(()) } + #[nativelink_test] + #[cfg(target_family = "unix")] + async fn persistent_worker_reuses_json_worker_between_actions() + -> Result<(), Box> { + const WORKER_ID: &str = "foo_worker_id"; + const WORKER_SCRIPT_NAME: &str = "worker.sh"; + + fn test_monotonic_clock() -> SystemTime { + static CLOCK: AtomicU64 = AtomicU64::new(0); + monotonic_clock(&CLOCK) + } + + async fn create_action( + cas_store: &Arc, + worker_script_digest: DigestInfo, + ) -> Result<(Action, DigestInfo), Error> { + let command = Command { + arguments: vec![format!("./{WORKER_SCRIPT_NAME}"), "@args.txt".to_string()], + output_paths: vec!["count.txt".to_string()], + platform: Some(Platform { + properties: vec![ + Property { + name: "supports-workers".to_string(), + value: "1".to_string(), + }, + Property { + name: "requires-worker-protocol".to_string(), + value: "json".to_string(), + }, + ], + }), + ..Default::default() + }; + let command_digest = serialize_and_upload_message( + &command, + cas_store.as_pin(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let args_digest = DigestInfo::new([9u8; 32], 0); + cas_store.update_oneshot(args_digest, Bytes::new()).await?; + let input_root_digest = serialize_and_upload_message( + &Directory { + files: vec![ + FileNode { + name: WORKER_SCRIPT_NAME.to_string(), + digest: Some(worker_script_digest.into()), + is_executable: true, + node_properties: None, + }, + FileNode { + name: "args.txt".to_string(), + digest: Some(args_digest.into()), + is_executable: false, + node_properties: None, + }, + ], + ..Default::default() + }, + cas_store.as_pin(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + platform: command.platform.clone(), + ..Default::default() + }; + let action_digest = serialize_and_upload_message( + &action, + cas_store.as_pin(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + Ok((action, action_digest)) + } + + async fn run_persistent_action( + running_actions_manager: &Arc, + slow_store: &Arc, + action: &Action, + action_digest: DigestInfo, + operation_id: &str, + ) -> Result { + let running_action_impl = running_actions_manager + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + operation_id: operation_id.to_string(), + queued_timestamp: None, + platform: action.platform.clone(), + worker_id: WORKER_ID.to_string(), + }, + ) + .await?; + let action_result = run_action(running_action_impl).await?; + slow_store + .as_ref() + .get_part_unchunked(action_result.output_files[0].digest, 0, None) + .await + .and_then(|content| { + String::from_utf8(content.to_vec()).map_err(|err| { + Error::from_std_err(Code::Internal, &err) + .append("Decoding persistent worker count.txt") + }) + }) + } + + let (_, slow_store, cas_store, ac_store) = setup_stores().await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; + + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( + RunningActionsManagerArgs { + root_action_directory, + execution_configuration: ExecutionConfiguration::default(), + cas_store: cas_store.clone(), + ac_store: Some(Store::new(ac_store.clone())), + historical_store: Store::new(cas_store.clone()), + upload_action_result_config: &UploadActionResultConfig { + upload_ac_results_strategy: UploadCacheResultsStrategy::Never, + ..Default::default() + }, + max_action_timeout: Duration::from_secs(30), + max_upload_timeout: Duration::from_secs(DEFAULT_MAX_UPLOAD_TIMEOUT), + timeout_handled_externally: false, + directory_cache: None, + #[cfg(target_os = "linux")] + use_namespaces: use_namespaces(), + }, + Callbacks { + now_fn: test_monotonic_clock, + sleep_fn: |_duration| Box::pin(future::pending()), + }, + )?); + + let worker_script = r#"#!/bin/sh +count=0 +while IFS= read -r request; do + count=$((count + 1)) + sandbox_dir=$(printf '%s' "$request" | sed -n 's/.*"sandboxDir":"\([^"]*\)".*/\1/p') + printf '%s' "$count" > "$sandbox_dir/count.txt" + printf '{"exitCode":0,"output":"count=%s"}\n' "$count" +done +"#; + let worker_script_bytes = Bytes::from(worker_script); + let worker_script_digest = + compute_buf_digest(&worker_script_bytes, &mut DigestHasherFunc::Sha256.hasher()); + cas_store + .update_oneshot(worker_script_digest, worker_script_bytes) + .await?; + let (action, action_digest) = create_action(&cas_store, worker_script_digest).await?; + + let first = run_persistent_action( + &running_actions_manager, + &slow_store, + &action, + action_digest, + "persistent_worker_first", + ) + .await?; + let second = run_persistent_action( + &running_actions_manager, + &slow_store, + &action, + action_digest, + "persistent_worker_second", + ) + .await?; + + assert_eq!(first, "1"); + assert_eq!( + second, "2", + "second action must reuse the already-running persistent worker" + ); + Ok(()) + } + // We've experienced deadlocks when uploading, so make only a single permit available and // check it's able to handle uploading some directories with some files in. diff --git a/web/platform/BUILD.bazel b/web/platform/BUILD.bazel new file mode 100644 index 000000000..fd40a1710 --- /dev/null +++ b/web/platform/BUILD.bazel @@ -0,0 +1,23 @@ +load("@rules_shell//shell:sh_binary.bzl", "sh_binary") +load("@rules_shell//shell:sh_test.bzl", "sh_test") + +sh_binary( + name = "check", + srcs = ["check.sh"], + visibility = ["//visibility:public"], +) + +sh_test( + name = "check_test", + timeout = "moderate", + srcs = ["check.sh"], + env_inherit = [ + "BUN_INSTALL", + "HOME", + "PATH", + ], + tags = [ + "local", + "no-sandbox", + ], +) diff --git a/web/platform/astro.config.ts b/web/platform/astro.config.ts index 98cfba3a2..cd60d4d59 100644 --- a/web/platform/astro.config.ts +++ b/web/platform/astro.config.ts @@ -1,4 +1,5 @@ import { defineConfig, passthroughImageService } from "astro/config"; +import type { AstroUserConfig } from "astro"; import { rehypeHeadingIds } from "@astrojs/markdown-remark"; import react from "@astrojs/react"; @@ -15,6 +16,10 @@ import rehypeAutolinkHeadings from "rehype-autolink-headings"; import { starlightConfig } from "./starlight.conf"; +type AstroVitePlugins = NonNullable< + NonNullable["plugins"] +>; + // https://astro.build/config export default defineConfig({ site: "https://nativelink.com", @@ -74,6 +79,6 @@ export default defineConfig({ ], }, vite: { - plugins: [tailwindcss()], + plugins: tailwindcss() as unknown as AstroVitePlugins, }, }); diff --git a/web/platform/check.sh b/web/platform/check.sh new file mode 100755 index 000000000..dfb66d486 --- /dev/null +++ b/web/platform/check.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ -n ${BUILD_WORKSPACE_DIRECTORY:-} ]]; then + repo_root="${BUILD_WORKSPACE_DIRECTORY}" +else + repo_root="$(git rev-parse --show-toplevel)" +fi + +cd "${repo_root}/web/platform" +exec bun run astro check diff --git a/web/platform/src/components/qwik/components/icons.tsx b/web/platform/src/components/qwik/components/icons.tsx index f7d39658a..9a9492ea9 100644 --- a/web/platform/src/components/qwik/components/icons.tsx +++ b/web/platform/src/components/qwik/components/icons.tsx @@ -1,6 +1,8 @@ import { _jsxQ, _jsxS, component$ } from "@builder.io/qwik"; -const HiCheckSolid = (props) => +type IconProps = Record; + +const HiCheckSolid = (props: IconProps) => /* @__PURE__ */ _jsxS( "svg", { @@ -39,7 +41,7 @@ export const Checkmark = component$(() => { ); }); -const HiXMarkSolid = (props) => +const HiXMarkSolid = (props: IconProps) => /* @__PURE__ */ _jsxS( "svg", { diff --git a/web/platform/src/components/qwik/components/scroll.tsx b/web/platform/src/components/qwik/components/scroll.tsx index 6aa4de409..f5ed4533a 100644 --- a/web/platform/src/components/qwik/components/scroll.tsx +++ b/web/platform/src/components/qwik/components/scroll.tsx @@ -5,7 +5,7 @@ export const ScrollTracker = component$((props: { scrolled: boolean }) => { const scrolled = useSignal(props.scrolled); useVisibleTask$(() => { - const cleanup = scroll((progress) => { + const cleanup = scroll((progress: number) => { // Progress is a value between 0 and 1 // Consider scrolled if we're even slightly scrolled down const isScrolled = progress > 0; diff --git a/web/platform/src/components/qwik/sections/hero.tsx b/web/platform/src/components/qwik/sections/hero.tsx index 30f8b9bb5..c0c07af5c 100644 --- a/web/platform/src/components/qwik/sections/hero.tsx +++ b/web/platform/src/components/qwik/sections/hero.tsx @@ -1,5 +1,5 @@ import { component$, useSignal, useVisibleTask$ } from "@builder.io/qwik"; -import { Background, Cloud } from "../../media/icons/icons.tsx"; +import { Background } from "../../media/icons/icons.tsx"; import { BackgroundVideo } from "../components/video.tsx"; const _MockUp = diff --git a/web/platform/src/components/qwik/sections/testimonials.tsx b/web/platform/src/components/qwik/sections/testimonials.tsx index 92d857cc8..ad0f2fe01 100644 --- a/web/platform/src/components/qwik/sections/testimonials.tsx +++ b/web/platform/src/components/qwik/sections/testimonials.tsx @@ -1,6 +1,6 @@ import { _jsxQ, _jsxS, component$ } from "@builder.io/qwik"; -const SiRockylinux = (props) => +const SiRockylinux = (props: Record) => /* @__PURE__ */ _jsxS( "svg", { diff --git a/web/platform/src/content/docs/docs/deployment-examples/persistent-workers.mdx b/web/platform/src/content/docs/docs/deployment-examples/persistent-workers.mdx new file mode 100644 index 000000000..4d6a7aa71 --- /dev/null +++ b/web/platform/src/content/docs/docs/deployment-examples/persistent-workers.mdx @@ -0,0 +1,131 @@ +--- +title: Persistent Workers +description: Configure Bazel worker actions for NativeLink remote persistent worker execution. +--- + +import { Tabs, TabItem } from '@astrojs/starlight/components'; + +Persistent workers keep compatible compiler and tool processes alive across +remote actions. This reduces repeated tool startup cost for JVM-heavy tools such +as `javac`, `scalac`, and `kotlinc`, and for worker wrappers around tools such +as `tsc`. + +The repository examples live in +[`deployment-examples/persistent-workers`](https://github.com/TraceMachina/nativelink/tree/main/deployment-examples/persistent-workers). + +:::caution +Persistent worker actions must use tools that implement Bazel's persistent +worker protocol. Adding these execution requirements to a normal one-shot tool +won't make that tool persistent-worker compatible. +::: + +## Execution Requirements + +NativeLink uses Bazel execution requirements to choose the persistent worker +path: + +```starlark +execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "proto", # or "json" +} +``` + +If `requires-worker-protocol` is omitted, NativeLink treats the action as +`proto`. + +## Examples + + + + +```starlark +def _javac_worker_impl(ctx): + args = ctx.actions.args() + args.add("@%s" % ctx.outputs.argfile.path) + + ctx.actions.run( + executable = ctx.executable.javac_worker, + arguments = [args], + inputs = ctx.files.srcs + [ctx.outputs.argfile], + outputs = [ctx.outputs.jar], + mnemonic = "Javac", + execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "proto", + }, + ) +``` + + + + +```starlark +def _tsc_worker_impl(ctx): + args = ctx.actions.args() + args.add("@%s" % ctx.outputs.tsconfig.path) + + ctx.actions.run( + executable = ctx.executable.tsc_worker, + arguments = [args], + inputs = ctx.files.srcs + [ctx.outputs.tsconfig], + outputs = ctx.outputs.js, + mnemonic = "TypeScriptCompile", + execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "json", + }, + ) +``` + + + + +```starlark +def _kotlinc_worker_impl(ctx): + args = ctx.actions.args() + args.add("@%s" % ctx.outputs.argfile.path) + + ctx.actions.run( + executable = ctx.executable.kotlinc_worker, + arguments = [args], + inputs = ctx.files.srcs + [ctx.outputs.argfile], + outputs = [ctx.outputs.jar], + mnemonic = "KotlinCompile", + execution_requirements = { + "supports-workers": "1", + "requires-worker-protocol": "proto", + }, + ) +``` + + + + +## How NativeLink Reuses Workers + +NativeLink derives a `WorkerKey` from: + +- the executable path +- startup arguments before the first `@argfile` +- `requires-worker-protocol` + +Actions with the same key can reuse the same warm process. NativeLink starts the +tool with `--persistent_worker`, sends one `WorkRequest` per action, and reads one +`WorkResponse` per action. + +## Performance Check + +The implementation includes an integration test that sends two remote actions to +one JSON worker. The first action writes `1`, and the second writes `2`, proving +that the second action reused the already-running worker process. + +Run the focused test gates with: + +```bash +bazel test \ + //nativelink-worker:unit_test \ + //nativelink-worker:integration \ + //nativelink-scheduler:integration \ + --test_output=errors +``` diff --git a/web/platform/starlight.conf.ts b/web/platform/starlight.conf.ts index d01c8e42b..18ea4ff13 100644 --- a/web/platform/starlight.conf.ts +++ b/web/platform/starlight.conf.ts @@ -1,3 +1,4 @@ +import type { StarlightUserConfig } from "@astrojs/starlight/types"; import starlightUtils from "@lorenzo_lewis/starlight-utils"; const docsRoot = "/docs"; @@ -117,6 +118,10 @@ export const starlightConfig = { label: "Metrics and Observability", link: `${docsRoot}/deployment-examples/metrics`, }, + { + label: "Persistent Workers", + link: `${docsRoot}/deployment-examples/persistent-workers`, + }, ], }, { @@ -243,4 +248,4 @@ export const starlightConfig = { ], }, ], -}; +} satisfies StarlightUserConfig; From 2971f7a89daadfe2b53a12acc900cc5b6a4ad37e Mon Sep 17 00:00:00 2001 From: Marcus Date: Wed, 13 May 2026 13:41:10 -0400 Subject: [PATCH 2/8] remove web due to the rewrite coming --- web/platform/BUILD.bazel | 23 ----------------------- web/platform/check.sh | 11 ----------- 2 files changed, 34 deletions(-) delete mode 100644 web/platform/BUILD.bazel delete mode 100755 web/platform/check.sh diff --git a/web/platform/BUILD.bazel b/web/platform/BUILD.bazel deleted file mode 100644 index fd40a1710..000000000 --- a/web/platform/BUILD.bazel +++ /dev/null @@ -1,23 +0,0 @@ -load("@rules_shell//shell:sh_binary.bzl", "sh_binary") -load("@rules_shell//shell:sh_test.bzl", "sh_test") - -sh_binary( - name = "check", - srcs = ["check.sh"], - visibility = ["//visibility:public"], -) - -sh_test( - name = "check_test", - timeout = "moderate", - srcs = ["check.sh"], - env_inherit = [ - "BUN_INSTALL", - "HOME", - "PATH", - ], - tags = [ - "local", - "no-sandbox", - ], -) diff --git a/web/platform/check.sh b/web/platform/check.sh deleted file mode 100755 index dfb66d486..000000000 --- a/web/platform/check.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -if [[ -n ${BUILD_WORKSPACE_DIRECTORY:-} ]]; then - repo_root="${BUILD_WORKSPACE_DIRECTORY}" -else - repo_root="$(git rev-parse --show-toplevel)" -fi - -cd "${repo_root}/web/platform" -exec bun run astro check From 06359e48891b687130e4a84d13efc1f68a328c14 Mon Sep 17 00:00:00 2001 From: Marcus Date: Wed, 13 May 2026 17:44:29 -0400 Subject: [PATCH 3/8] remove unused import --- nativelink-worker/tests/running_actions_manager_test.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index 19e29ec26..4bd244272 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -52,9 +52,7 @@ mod tests { HistoricalExecuteResponse, StartExecute, }; use nativelink_proto::google::rpc::Status; - use nativelink_store::ac_utils::{ - compute_buf_digest, get_and_decode_digest, serialize_and_upload_message, - }; + use nativelink_store::ac_utils::{get_and_decode_digest, serialize_and_upload_message}; use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_store::filesystem_store::FilesystemStore; use nativelink_store::memory_store::MemoryStore; From 0d8c2e04745beea4b21fefc03432e69957cb10eb Mon Sep 17 00:00:00 2001 From: Marcus Date: Wed, 13 May 2026 18:25:50 -0400 Subject: [PATCH 4/8] fix errors --- nativelink-worker/tests/running_actions_manager_test.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index 4bd244272..16a3d4dd3 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -52,6 +52,8 @@ mod tests { HistoricalExecuteResponse, StartExecute, }; use nativelink_proto::google::rpc::Status; + #[cfg(target_family = "unix")] + use nativelink_store::ac_utils::compute_buf_digest; use nativelink_store::ac_utils::{get_and_decode_digest, serialize_and_upload_message}; use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_store::filesystem_store::FilesystemStore; From e384c1e2d8307848eab072640f214019fcce09cf Mon Sep 17 00:00:00 2001 From: Marcus Date: Wed, 13 May 2026 19:53:07 -0400 Subject: [PATCH 5/8] fix cross-platform issues --- .../src/persistent_worker/live_worker.rs | 108 ++++++++++++++---- 1 file changed, 85 insertions(+), 23 deletions(-) diff --git a/nativelink-worker/src/persistent_worker/live_worker.rs b/nativelink-worker/src/persistent_worker/live_worker.rs index c2d2b51e8..eb95f0da9 100644 --- a/nativelink-worker/src/persistent_worker/live_worker.rs +++ b/nativelink-worker/src/persistent_worker/live_worker.rs @@ -324,29 +324,70 @@ async fn read_varint(r: &mut BufReader) -> Result { #[cfg(test)] mod tests { + use std::io::Write as _; + use nativelink_macro::nativelink_test; use super::*; - fn echo_script(working_dir: &Path, body: &str) -> PathBuf { + struct TestWorkerProgram { + executable: PathBuf, + startup_args: Vec, + } + + impl TestWorkerProgram { + fn startup_args(&self) -> &[String] { + &self.startup_args + } + } + + #[cfg(unix)] + fn echo_script(working_dir: &Path, unix_body: &str, _windows_body: &str) -> TestWorkerProgram { let path = working_dir.join("worker.sh"); - std::fs::write(&path, body).unwrap(); - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)).unwrap(); + let mut file = std::fs::File::create(&path).unwrap(); + file.write_all(unix_body.as_bytes()).unwrap(); + file.sync_all().unwrap(); + drop(file); + + TestWorkerProgram { + executable: PathBuf::from("/bin/sh"), + startup_args: vec![path.display().to_string()], + } + } + + #[cfg(windows)] + fn echo_script(working_dir: &Path, _unix_body: &str, windows_body: &str) -> TestWorkerProgram { + let path = working_dir.join("worker.ps1"); + let mut file = std::fs::File::create(&path).unwrap(); + file.write_all(windows_body.as_bytes()).unwrap(); + file.sync_all().unwrap(); + drop(file); + + TestWorkerProgram { + executable: PathBuf::from("powershell.exe"), + startup_args: vec![ + "-NoProfile".to_owned(), + "-ExecutionPolicy".to_owned(), + "Bypass".to_owned(), + "-File".to_owned(), + path.display().to_string(), + ], } - path } #[nativelink_test] async fn shutdown_kills_unresponsive_worker() { // A worker that never reads/writes — shutdown grace expires, we SIGKILL. let dir = tempfile::tempdir().unwrap(); - let script = echo_script(dir.path(), "#!/bin/sh\nexec sleep 30\n"); - let worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) - .await - .unwrap(); + let script = echo_script(dir.path(), "exec sleep 30\n", "Start-Sleep -Seconds 30\n"); + let worker = LiveWorker::spawn( + &script.executable, + script.startup_args(), + WireFormat::Json, + dir.path(), + ) + .await + .unwrap(); let start = Instant::now(); worker.shutdown(Duration::from_millis(100)).await; // SIGKILL should arrive well within a second. @@ -360,11 +401,17 @@ mod tests { let script = echo_script( dir.path(), // Read one line, ignore it, emit a fixed response. Newline-terminated. - "#!/bin/sh\nread line\necho '{\"exitCode\":0,\"output\":\"ok\"}'\n", + "read line\necho '{\"exitCode\":0,\"output\":\"ok\"}'\n", + "$line = [Console]::In.ReadLine()\n[Console]::Out.WriteLine('{\"exitCode\":0,\"output\":\"ok\"}')\n", ); - let mut worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) - .await - .unwrap(); + let mut worker = LiveWorker::spawn( + &script.executable, + script.startup_args(), + WireFormat::Json, + dir.path(), + ) + .await + .unwrap(); let req = WorkRequest { arguments: vec!["compile".into()], @@ -382,10 +429,19 @@ mod tests { async fn dispatch_timeout_kills_worker() { let dir = tempfile::tempdir().unwrap(); // Worker reads, then sleeps forever instead of responding. - let script = echo_script(dir.path(), "#!/bin/sh\nread line\nexec sleep 60\n"); - let mut worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) - .await - .unwrap(); + let script = echo_script( + dir.path(), + "read line\nexec sleep 60\n", + "$line = [Console]::In.ReadLine()\nStart-Sleep -Seconds 60\n", + ); + let mut worker = LiveWorker::spawn( + &script.executable, + script.startup_args(), + WireFormat::Json, + dir.path(), + ) + .await + .unwrap(); let req = WorkRequest { arguments: vec!["compile".into()], @@ -404,11 +460,17 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let script = echo_script( dir.path(), - "#!/bin/sh\nread line\necho '{\"exitCode\":0}'\n", + "read line\necho '{\"exitCode\":0}'\n", + "$line = [Console]::In.ReadLine()\n[Console]::Out.WriteLine('{\"exitCode\":0}')\n", ); - let mut worker = LiveWorker::spawn(&script, &[], WireFormat::Json, dir.path()) - .await - .unwrap(); + let mut worker = LiveWorker::spawn( + &script.executable, + script.startup_args(), + WireFormat::Json, + dir.path(), + ) + .await + .unwrap(); let req = WorkRequest { request_id: 7, ..WorkRequest::default() From ad8000646bb1de3a95d255f1022546e55fd86080 Mon Sep 17 00:00:00 2001 From: Marcus Date: Sun, 17 May 2026 19:33:51 -0700 Subject: [PATCH 6/8] address comments --- .../src/persistent_worker/live_worker.rs | 7 +- .../src/persistent_worker/pool.rs | 131 +++++++++++++++++- .../src/running_actions_manager.rs | 53 +++++-- .../tests/running_actions_manager_test.rs | 4 + 4 files changed, 175 insertions(+), 20 deletions(-) diff --git a/nativelink-worker/src/persistent_worker/live_worker.rs b/nativelink-worker/src/persistent_worker/live_worker.rs index eb95f0da9..95b51d2c4 100644 --- a/nativelink-worker/src/persistent_worker/live_worker.rs +++ b/nativelink-worker/src/persistent_worker/live_worker.rs @@ -87,9 +87,10 @@ impl LiveWorker { .current_dir(working_dir) .stdin(Stdio::piped()) .stdout(Stdio::piped()) - // Worker stderr is forwarded to ours; Bazel convention puts diagnostic - // logs there. We do not capture it on the rust side in v1. - .stderr(Stdio::inherit()) + // WorkResponse.output carries per-action diagnostics. The child + // stderr stream is process-lifetime data and cannot be attributed + // safely to a single request. + .stderr(Stdio::null()) .kill_on_drop(true); debug!( diff --git a/nativelink-worker/src/persistent_worker/pool.rs b/nativelink-worker/src/persistent_worker/pool.rs index 684c9b111..b3ec5725f 100644 --- a/nativelink-worker/src/persistent_worker/pool.rs +++ b/nativelink-worker/src/persistent_worker/pool.rs @@ -125,6 +125,42 @@ struct LeaseInner { key: WorkerKey, } +#[derive(Debug)] +struct CountSlotGuard { + pool: Arc, + key: WorkerKey, + active: bool, +} + +impl CountSlotGuard { + fn new(pool: Arc, key: WorkerKey) -> Self { + Self { + pool, + key, + active: true, + } + } + + fn disarm(mut self) { + self.active = false; + } + + fn decrement_now(mut self) { + if self.active { + self.pool.decrement_count(&self.key); + self.active = false; + } + } +} + +impl Drop for CountSlotGuard { + fn drop(&mut self) { + if self.active { + self.pool.decrement_count(&self.key); + } + } +} + impl Lease { pub const fn worker(&mut self) -> &mut LiveWorker { &mut self @@ -158,12 +194,13 @@ impl Drop for Lease { let worker = inner.worker; let grace = pool.config.shutdown_grace; background_spawn!("persistent_worker_lease_drop", async move { + let count_slot = CountSlotGuard::new(pool.clone(), key.clone()); warn!( ?key, "Lease dropped without release; treating worker as unhealthy" ); worker.shutdown(grace).await; - pool.decrement_count(&key); + count_slot.decrement_now(); }); } } @@ -207,6 +244,7 @@ impl PoolInner { async fn return_worker(self: Arc, key: WorkerKey, worker: LiveWorker, healthy: bool) { let exceeded_cap = worker.request_count() >= self.config.max_requests_per_worker; if !healthy || exceeded_cap { + let count_slot = CountSlotGuard::new(self.clone(), key.clone()); debug!( ?key, request_count = worker.request_count(), @@ -214,7 +252,7 @@ impl PoolInner { "Retiring persistent worker" ); worker.shutdown(self.config.shutdown_grace).await; - self.decrement_count(&key); + count_slot.decrement_now(); return; } let mut state = self.state.lock(); @@ -251,9 +289,9 @@ impl PersistentWorkerPool { /// returns `Err(Code::ResourceExhausted)` — the caller should fall back to /// the one-shot subprocess path or wait and retry. /// - /// `working_dir` is the action's execution directory. v1 reuses it across - /// requests for the same `WorkerKey`; v2 will introduce per-request - /// sandboxing. + /// `working_dir` is a stable worker process directory. Per-request action + /// directories are passed through `WorkRequest::sandbox_dir`, because the + /// action directory is deleted after each action completes. pub async fn acquire( &self, key: WorkerKey, @@ -291,6 +329,7 @@ impl PersistentWorkerPool { } // Lock released; spawn outside the critical section. + let count_slot = CountSlotGuard::new(self.inner.clone(), key.clone()); let spawn_result = LiveWorker::spawn( executable_path, &key.startup_args, @@ -301,11 +340,10 @@ impl PersistentWorkerPool { let worker = match spawn_result { Ok(w) => w, Err(err) => { - // Roll back our reservation on spawn failure. - self.inner.decrement_count(&key); return Err(err).err_tip(|| format!("Spawning persistent worker for {key:?}")); } }; + count_slot.disarm(); info!(?key, "Spawned new persistent worker"); Ok(Lease { inner: Some(LeaseInner { @@ -343,8 +381,32 @@ impl PersistentWorkerPool { #[cfg(test)] mod tests { + #[cfg(unix)] + use std::io::Write as _; + + #[cfg(unix)] + use nativelink_macro::nativelink_test; + use super::*; + #[cfg(unix)] + fn shell_script(working_dir: &Path, body: &str) -> PathBuf { + let path = working_dir.join("worker.sh"); + let mut file = std::fs::File::create(&path).unwrap(); + file.write_all(body.as_bytes()).unwrap(); + file.sync_all().unwrap(); + path + } + + #[cfg(unix)] + fn shell_worker_key(script: &Path) -> WorkerKey { + WorkerKey { + executable: PathBuf::from("/bin/sh"), + startup_args: vec![script.display().to_string()], + wire_format: WireFormat::Json, + } + } + #[test] fn worker_key_excludes_argfile_and_later_args() { let argv: Vec = ["javac", "-source", "21", "@args.txt", "Foo.java"] @@ -422,4 +484,59 @@ mod tests { let b = WorkerKey::from_argv(&["javac", "@a"].map(String::from), WireFormat::Json).unwrap(); assert_ne!(a, b); } + + #[nativelink_test] + #[cfg(unix)] + async fn dropping_unhealthy_release_frees_pool_slot() { + let dir = tempfile::tempdir().unwrap(); + let script = shell_script(dir.path(), "exec sleep 60\n"); + let key = shell_worker_key(&script); + let pool = PersistentWorkerPool::new(PoolConfig { + max_workers_per_key: 1, + shutdown_grace: Duration::from_secs(1), + ..PoolConfig::default() + }); + + let lease = pool + .acquire(key.clone(), Path::new("/bin/sh"), dir.path()) + .await + .unwrap(); + let release_handle = tokio::spawn(lease.release(false)); + tokio::time::sleep(Duration::from_millis(50)).await; + release_handle.abort(); + assert!(release_handle.await.unwrap_err().is_cancelled()); + + let next_lease = tokio::time::timeout( + Duration::from_secs(2), + pool.acquire(key, Path::new("/bin/sh"), dir.path()), + ) + .await + .unwrap() + .unwrap(); + next_lease.release(false).await; + } + + #[nativelink_test] + #[cfg(unix)] + async fn concurrent_acquire_respects_per_key_cap() { + let dir = tempfile::tempdir().unwrap(); + let script = shell_script(dir.path(), "exec sleep 60\n"); + let key = shell_worker_key(&script); + let pool = PersistentWorkerPool::new(PoolConfig { + max_workers_per_key: 1, + shutdown_grace: Duration::from_millis(100), + ..PoolConfig::default() + }); + + let lease = pool + .acquire(key.clone(), Path::new("/bin/sh"), dir.path()) + .await + .unwrap(); + let err = pool + .acquire(key, Path::new("/bin/sh"), dir.path()) + .await + .unwrap_err(); + assert_eq!(err.code, Code::ResourceExhausted); + lease.release(false).await; + } } diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 6c5390068..77b8af46d 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -1026,7 +1026,8 @@ impl RunningActionImpl { ) }, }; - let worker_cwd = PathBuf::from(&request.sandbox_dir); + let worker_cwd = + PathBuf::from(&self.running_actions_manager.root_action_directory); match self .running_actions_manager @@ -1036,21 +1037,53 @@ impl RunningActionImpl { { Ok(mut lease) => { let timer = self.metrics().child_process.begin_timer(); - let response = match lease - .worker() - .dispatch_with_timeout(&request, self.timeout) - .await - { - Ok(response) => { + let dispatch_result = { + let dispatch_fut = + lease.worker().dispatch_with_timeout(&request, self.timeout); + tokio::pin!(dispatch_fut); + tokio::select! { + result = &mut dispatch_fut => Some(result), + _ = &mut kill_channel_rx => None, + } + }; + let response = match dispatch_result { + Some(Ok(response)) => { lease.release(true).await; response } - Err(err) => { + Some(Err(err)) => { lease.release(false).await; return Err(err).err_tip(|| { format!("Dispatching action to persistent worker {key:?}") }); } + None => { + drop(timer); + lease.release(false).await; + { + let mut state = self.state.lock(); + state.error = Error::merge_option( + state.error.take(), + Some(Error::new( + Code::Cancelled, + format!( + "Persistent worker command '{}' was killed by scheduler", + args.join(OsStr::new(" ")).to_string_lossy() + ), + )), + ); + state.command_proto = Some(command_proto); + state.execution_result = + Some(RunningActionImplExecutionResult { + stdout: Bytes::new(), + stderr: Bytes::new(), + exit_code: EXIT_CODE_FOR_SIGNAL, + }); + state.execution_metadata.execution_completed_timestamp = + (self.running_actions_manager.callbacks.now_fn)(); + } + return Ok(self); + } }; timer.measure(); @@ -1064,8 +1097,8 @@ impl RunningActionImpl { let mut state = self.state.lock(); state.command_proto = Some(command_proto); state.execution_result = Some(RunningActionImplExecutionResult { - stdout: Bytes::from(response.output), - stderr: Bytes::new(), + stdout: Bytes::new(), + stderr: Bytes::from(response.output), exit_code: response.exit_code, }); state.execution_metadata.execution_completed_timestamp = diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index 7bee1b200..79907fcf3 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -3446,6 +3446,10 @@ exit 1 let worker_script = r#"#!/bin/sh count=0 while IFS= read -r request; do + if ! pwd -P >/dev/null 2>&1; then + printf '{"exitCode":1,"output":"worker cwd was removed"}\n' + continue + fi count=$((count + 1)) sandbox_dir=$(printf '%s' "$request" | sed -n 's/.*"sandboxDir":"\([^"]*\)".*/\1/p') printf '%s' "$count" > "$sandbox_dir/count.txt" From e2b1271886be00b220e17c6024a2bb70242ce93a Mon Sep 17 00:00:00 2001 From: Marcus Date: Sun, 17 May 2026 19:48:02 -0700 Subject: [PATCH 7/8] fix clippy --- nativelink-worker/src/persistent_worker/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nativelink-worker/src/persistent_worker/pool.rs b/nativelink-worker/src/persistent_worker/pool.rs index b3ec5725f..cbbf4e17d 100644 --- a/nativelink-worker/src/persistent_worker/pool.rs +++ b/nativelink-worker/src/persistent_worker/pool.rs @@ -133,7 +133,7 @@ struct CountSlotGuard { } impl CountSlotGuard { - fn new(pool: Arc, key: WorkerKey) -> Self { + const fn new(pool: Arc, key: WorkerKey) -> Self { Self { pool, key, From eb83072758f97717b5579161467fd9bfb2aa59e6 Mon Sep 17 00:00:00 2001 From: Marcus Date: Wed, 20 May 2026 16:05:20 -0700 Subject: [PATCH 8/8] address regression from merge commit #93cb622 --- nativelink-worker/src/running_actions_manager.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index aee3584a4..ed0984b03 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -1032,7 +1032,9 @@ impl RunningActionImpl { // level more effectively and adjust this. info!(?args, "Executing command"); - let program = self.canonicalise_path(args[0], &command_proto.working_directory)?; + let program = self + .canonicalise_path(args[0], &command_proto.working_directory) + .err_tip(|| format!("Canonicalisation failure. Command={args:#?}"))?; if let Some(wire_format_result) = action_supports_persistent_workers(&self.action_info) { match wire_format_result { Ok(wire_format) => {