Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions deployment-examples/persistent-workers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Persistent Worker Examples

These examples show the NativeLink-facing part of Bazel actions that opt in to
remote persistent worker execution.

Persistent workers are useful for JVM-heavy tools such as `javac`, `scalac`, and
`kotlinc`, and for worker wrappers around tools such as `tsc`. Compatible
actions declare worker support with execution requirements:

```starlark
execution_requirements = {
"supports-workers": "1",
"requires-worker-protocol": "proto", # or "json"
}
```

See:

- [Java](./java/README.md)
- [TypeScript](./typescript/README.md)
- [Kotlin](./kotlin/README.md)
28 changes: 28 additions & 0 deletions deployment-examples/persistent-workers/java/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Java Persistent Worker Example

This example shows the NativeLink-facing part of a Java compile action that can
run through the remote persistent worker path. The action must declare that the
tool supports Bazel's worker protocol and that it uses the proto wire format.

```starlark
def _javac_worker_impl(ctx):
args = ctx.actions.args()
args.add("@%s" % ctx.outputs.argfile.path)

ctx.actions.run(
executable = ctx.executable.javac_worker,
arguments = [args],
inputs = ctx.files.srcs + [ctx.outputs.argfile],
outputs = [ctx.outputs.jar],
mnemonic = "Javac",
execution_requirements = {
"supports-workers": "1",
"requires-worker-protocol": "proto",
},
)
```

NativeLink computes the `WorkerKey` from the executable, startup flags before the
first `@argfile`, and `requires-worker-protocol`. The first action starts
`javac_worker --persistent_worker`; later compatible actions send `WorkRequest`
messages to the same process.
25 changes: 25 additions & 0 deletions deployment-examples/persistent-workers/kotlin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Kotlin Persistent Worker Example

Kotlin compile actions have the same remote execution shape as Java: keep the JVM
compiler process warm and send per-action argument files through the worker protocol.

```starlark
def _kotlinc_worker_impl(ctx):
args = ctx.actions.args()
args.add("@%s" % ctx.outputs.argfile.path)

ctx.actions.run(
executable = ctx.executable.kotlinc_worker,
arguments = [args],
inputs = ctx.files.srcs + [ctx.outputs.argfile],
outputs = [ctx.outputs.jar],
mnemonic = "KotlinCompile",
execution_requirements = {
"supports-workers": "1",
"requires-worker-protocol": "proto",
},
)
```

The worker process is started once with `--persistent_worker`; compatible Kotlin
compile actions reuse it until the pool idle timeout or request cap retires it.
27 changes: 27 additions & 0 deletions deployment-examples/persistent-workers/typescript/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# TypeScript Persistent Worker Example

TypeScript worker wrappers commonly use JSON worker protocol framing. The
important NativeLink requirement is that the remote action advertises both
worker support and the JSON protocol.

```starlark
def _tsc_worker_impl(ctx):
args = ctx.actions.args()
args.add("@%s" % ctx.outputs.tsconfig.path)

ctx.actions.run(
executable = ctx.executable.tsc_worker,
arguments = [args],
inputs = ctx.files.srcs + [ctx.outputs.tsconfig],
outputs = ctx.outputs.js,
mnemonic = "TypeScriptCompile",
execution_requirements = {
"supports-workers": "1",
"requires-worker-protocol": "json",
},
)
```

The persistent worker should read one newline-delimited JSON `WorkRequest` from
`stdin` and emit one newline-delimited JSON `WorkResponse` to `stdout` for each
action.
46 changes: 45 additions & 1 deletion nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,20 @@ where

let now = (self.now_fn)().now();

// Honor the per-action `Action.timeout` from the RBE protocol as a
// backend wall-clock deadline. Without this, the only enforcement is
// the Bazel client's --test_timeout, which surfaces as TIMEOUT/NO
// STATUS instead of a backend signal pointing at the worker.
let action_timeout = awaited_action.action_info().timeout;
if action_timeout > Duration::ZERO {
let executing_started_at = awaited_action.state().last_transition_timestamp;
if let Ok(elapsed) = now.duration_since(executing_started_at)
&& elapsed > action_timeout
{
return true;
}
}

let registry_alive = if let Some(ref worker_registry) = self.worker_registry {
if let Some(worker_id) = awaited_action.worker_id() {
worker_registry
Expand Down Expand Up @@ -785,7 +799,37 @@ where
ActionStage::Queued
}
}
UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued,
UpdateOperationType::UpdateWithDisconnect => {
// A worker disconnect (e.g. OOMKill, pod eviction, network
// drop) used to requeue without counting as an attempt,
// which let an action that always crashes its worker loop
// forever until the Bazel client's --test_timeout fired.
// Count disconnects as attempts so max_job_retries caps the
// loop and the client sees a backend-attributable error.
awaited_action.attempts += 1;

if awaited_action.attempts > self.max_job_retries {
ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: maybe_worker_id
.map_or_else(String::default, ToString::to_string),
..ExecutionMetadata::default()
},
error: Some(make_err!(
Code::Internal,
"Worker disconnected repeatedly while executing this action ({} > {} attempts); the runner likely OOMKilled or the pod was evicted. {}",
awaited_action.attempts,
self.max_job_retries,
format!(
"for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"
),
)),
..ActionResult::default()
})
} else {
ActionStage::Queued
}
}
// We shouldn't get here, but we just ignore it if we do.
UpdateOperationType::ExecutionComplete => {
warn!("inner_update_operation got an ExecutionComplete, that's unexpected.");
Expand Down
176 changes: 176 additions & 0 deletions nativelink-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,182 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error>
Ok(())
}

/// Worker crash-loop regression: an action whose worker keeps disconnecting
/// (e.g. `OOMKill`) used to bypass `max_job_retries` because
/// `UpdateWithDisconnect` requeued without counting as an attempt. The build
/// would only terminate when the Bazel client's `--test_timeout` fired,
/// hiding the cluster-side root cause behind a TIMEOUT/NO STATUS surface.
/// After the fix, disconnects count as attempts and exceed the cap.
#[nativelink_test]
async fn worker_disconnect_loop_caps_at_max_job_retries_test() -> Result<(), Error> {
let worker_id = WorkerId("worker_id".to_string());

let task_change_notify = Arc::new(Notify::new());
let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback(
&SimpleSpec {
max_job_retries: 1,
..Default::default()
},
memory_awaited_action_db_factory(
0,
&task_change_notify.clone(),
MockInstantWrapped::default,
),
|| async move {},
task_change_notify,
MockInstantWrapped::default,
None,
);
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker =
setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?;
let insert_timestamp = make_system_time(1);
let mut action_listener =
setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?;

let operation_id = {
let operation_id = match rx_from_worker.recv().await.unwrap().update {
Some(update_for_worker::Update::StartAction(exec)) => exec.operation_id,
v => panic!("Expected StartAction, got : {v:?}"),
};
assert_eq!(
action_listener.changed().await.unwrap().0.stage,
ActionStage::Executing
);
OperationId::from(operation_id.as_str())
};

// First disconnect: should requeue (attempts=1, not yet > max_job_retries=1).
drop(
scheduler
.update_action(
&worker_id,
&operation_id,
UpdateOperationType::UpdateWithDisconnect,
)
.await,
);
{
let (action_state, _maybe_origin_metadata) = action_listener.changed().await.unwrap();
assert_eq!(
action_state.stage,
ActionStage::Queued,
"First disconnect should requeue, got: {:?}",
action_state.stage,
);
}

// Reattach worker so it picks up the requeued action.
let mut rx_from_worker =
setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?;
{
match rx_from_worker.recv().await.unwrap().update {
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
v => panic!("Expected StartAction, got : {v:?}"),
}
assert_eq!(
action_listener.changed().await.unwrap().0.stage,
ActionStage::Executing
);
}

// Second disconnect: now attempts=2 > max_job_retries=1, so the action
// must transition to Completed with an error mentioning the disconnect
// loop, not silently requeue.
drop(
scheduler
.update_action(
&worker_id,
&operation_id,
UpdateOperationType::UpdateWithDisconnect,
)
.await,
);
{
let (action_state, _maybe_origin_metadata) = action_listener.changed().await.unwrap();
let ActionStage::Completed(action_result) = &action_state.stage else {
panic!(
"Second disconnect should mark action Completed-with-error, got: {:?}",
action_state.stage
);
};
let err = action_result
.error
.as_ref()
.expect("Completed action from disconnect cap must carry an error");
assert!(
err.to_string()
.contains("Worker disconnected repeatedly while executing this action"),
"Error message did not mention disconnect loop: {err}",
);
}

Ok(())
}

/// `Action.timeout` from the RBE protocol must be enforced backend-side.
/// Without this, an action that hangs forever only terminates when the
/// Bazel client's `--remote_timeout` (gRPC deadline) or `--test_timeout`
/// (client-side) fires; from the operator's perspective the cluster never
/// surfaces the slow action.
#[nativelink_test]
async fn action_timeout_is_enforced_backend_side_test() -> Result<(), Error> {
use nativelink_scheduler::awaited_action_db::AwaitedAction;
use nativelink_scheduler::simple_scheduler_state_manager::SimpleSchedulerStateManager;

// Anchor MockClock so MockInstantWrapped::now() == make_system_time(0).
MockClock::set_time(Duration::from_secs(NOW_TIME));
let executing_started_at = make_system_time(0);

let action_digest = DigestInfo::new([7u8; 32], 1);
let mut action_info = make_base_action_info(executing_started_at, action_digest);
Arc::make_mut(&mut action_info).timeout = Duration::from_secs(2);

let operation_id = OperationId::default();
let mut awaited_action =
AwaitedAction::new(operation_id.clone(), action_info, executing_started_at);
awaited_action.worker_set_state(
Arc::new(ActionState {
stage: ActionStage::Executing,
client_operation_id: operation_id,
action_digest,
last_transition_timestamp: executing_started_at,
}),
executing_started_at,
);

let task_change_notify = Arc::new(Notify::new());
let state_mgr = SimpleSchedulerStateManager::new(
/* max_job_retries */ 1,
/* no_event_action_timeout */ Duration::from_secs(60),
/* client_action_timeout */ Duration::from_secs(60),
/* max_executing_timeout */ Duration::ZERO,
memory_awaited_action_db_factory(
0,
&task_change_notify.clone(),
MockInstantWrapped::default,
),
MockInstantWrapped::default,
/* worker_registry */ None,
);

assert!(
!state_mgr.should_timeout_operation(&awaited_action).await,
"Should not time out before Action.timeout elapses",
);

// Advance past the 2s per-action deadline.
MockClock::advance(Duration::from_secs(5));

assert!(
state_mgr.should_timeout_operation(&awaited_action).await,
"Scheduler must mark Executing action timed out once Action.timeout has elapsed",
);

Ok(())
}

#[nativelink_test]
async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> {
struct DropChecker {
Expand Down
7 changes: 7 additions & 0 deletions nativelink-worker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ rust_library(
"src/directory_cache.rs",
"src/lib.rs",
"src/local_worker.rs",
"src/persistent_worker/live_worker.rs",
"src/persistent_worker/mod.rs",
"src/persistent_worker/pool.rs",
"src/persistent_worker/protocol.rs",
"src/qos.rs",
"src/running_actions_manager.rs",
"src/worker_api_client_wrapper.rs",
Expand Down Expand Up @@ -43,6 +47,7 @@ rust_library(
"@crates//:relative-path",
"@crates//:scopeguard",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:serde_json5",
"@crates//:shlex",
"@crates//:tokio",
Expand Down Expand Up @@ -116,6 +121,8 @@ rust_test(
"@crates//:hyper",
"@crates//:pretty_assertions",
"@crates//:prost-types",
"@crates//:rand",
"@crates//:serde_json",
"@crates//:serial_test",
"@crates//:tempfile",
"@crates//:tracing-test",
Expand Down
5 changes: 4 additions & 1 deletion nativelink-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ relative-path = { version = "2.0.0", default-features = false, features = [
"std",
] }
scopeguard = { version = "1.2.0", default-features = false }
serde = { version = "1.0.219", default-features = false }
serde = { version = "1.0.219", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.140", default-features = false, features = [
"std",
] }
serde_json5 = { version = "0.2.1", default-features = false }
shlex = { version = "2.0.0", default-features = false }
tokio = { version = "1.52.2", features = [
Expand Down
Loading
Loading