Skip to content

Commit 78d1232

Browse files
Implement Remote Persistent Workers (#2323)
* Implement Remote Persistent Workers * remove web due to the rewrite coming * remove unused import * fix errors * fix cross-platform issues * address comments * fix clippy * address regression from merge commit #93cb622
1 parent f25e7ac commit 78d1232

23 files changed

Lines changed: 2240 additions & 10 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Persistent Worker Examples
2+
3+
These examples show the NativeLink-facing part of Bazel actions that opt in to
4+
remote persistent worker execution.
5+
6+
Persistent workers are useful for JVM-heavy tools such as `javac`, `scalac`, and
7+
`kotlinc`, and for worker wrappers around tools such as `tsc`. Compatible
8+
actions declare worker support with execution requirements:
9+
10+
```starlark
11+
execution_requirements = {
12+
"supports-workers": "1",
13+
"requires-worker-protocol": "proto", # or "json"
14+
}
15+
```
16+
17+
See:
18+
19+
- [Java](./java/README.md)
20+
- [TypeScript](./typescript/README.md)
21+
- [Kotlin](./kotlin/README.md)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Java Persistent Worker Example
2+
3+
This example shows the NativeLink-facing part of a Java compile action that can
4+
run through the remote persistent worker path. The action must declare that the
5+
tool supports Bazel's worker protocol and that it uses the proto wire format.
6+
7+
```starlark
8+
def _javac_worker_impl(ctx):
9+
args = ctx.actions.args()
10+
args.add("@%s" % ctx.outputs.argfile.path)
11+
12+
ctx.actions.run(
13+
executable = ctx.executable.javac_worker,
14+
arguments = [args],
15+
inputs = ctx.files.srcs + [ctx.outputs.argfile],
16+
outputs = [ctx.outputs.jar],
17+
mnemonic = "Javac",
18+
execution_requirements = {
19+
"supports-workers": "1",
20+
"requires-worker-protocol": "proto",
21+
},
22+
)
23+
```
24+
25+
NativeLink computes the `WorkerKey` from the executable, startup flags before the
26+
first `@argfile`, and `requires-worker-protocol`. The first action starts
27+
`javac_worker --persistent_worker`; later compatible actions send `WorkRequest`
28+
messages to the same process.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Kotlin Persistent Worker Example
2+
3+
Kotlin compile actions have the same remote execution shape as Java: keep the JVM
4+
compiler process warm and send per-action argument files through the worker protocol.
5+
6+
```starlark
7+
def _kotlinc_worker_impl(ctx):
8+
args = ctx.actions.args()
9+
args.add("@%s" % ctx.outputs.argfile.path)
10+
11+
ctx.actions.run(
12+
executable = ctx.executable.kotlinc_worker,
13+
arguments = [args],
14+
inputs = ctx.files.srcs + [ctx.outputs.argfile],
15+
outputs = [ctx.outputs.jar],
16+
mnemonic = "KotlinCompile",
17+
execution_requirements = {
18+
"supports-workers": "1",
19+
"requires-worker-protocol": "proto",
20+
},
21+
)
22+
```
23+
24+
The worker process is started once with `--persistent_worker`; compatible Kotlin
25+
compile actions reuse it until the pool idle timeout or request cap retires it.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# TypeScript Persistent Worker Example
2+
3+
TypeScript worker wrappers commonly use JSON worker protocol framing. The
4+
important NativeLink requirement is that the remote action advertises both
5+
worker support and the JSON protocol.
6+
7+
```starlark
8+
def _tsc_worker_impl(ctx):
9+
args = ctx.actions.args()
10+
args.add("@%s" % ctx.outputs.tsconfig.path)
11+
12+
ctx.actions.run(
13+
executable = ctx.executable.tsc_worker,
14+
arguments = [args],
15+
inputs = ctx.files.srcs + [ctx.outputs.tsconfig],
16+
outputs = ctx.outputs.js,
17+
mnemonic = "TypeScriptCompile",
18+
execution_requirements = {
19+
"supports-workers": "1",
20+
"requires-worker-protocol": "json",
21+
},
22+
)
23+
```
24+
25+
The persistent worker should read one newline-delimited JSON `WorkRequest` from
26+
`stdin` and emit one newline-delimited JSON `WorkResponse` to `stdout` for each
27+
action.

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,20 @@ where
354354

355355
let now = (self.now_fn)().now();
356356

357+
// Honor the per-action `Action.timeout` from the RBE protocol as a
358+
// backend wall-clock deadline. Without this, the only enforcement is
359+
// the Bazel client's --test_timeout, which surfaces as TIMEOUT/NO
360+
// STATUS instead of a backend signal pointing at the worker.
361+
let action_timeout = awaited_action.action_info().timeout;
362+
if action_timeout > Duration::ZERO {
363+
let executing_started_at = awaited_action.state().last_transition_timestamp;
364+
if let Ok(elapsed) = now.duration_since(executing_started_at)
365+
&& elapsed > action_timeout
366+
{
367+
return true;
368+
}
369+
}
370+
357371
let registry_alive = if let Some(ref worker_registry) = self.worker_registry {
358372
if let Some(worker_id) = awaited_action.worker_id() {
359373
worker_registry
@@ -785,7 +799,37 @@ where
785799
ActionStage::Queued
786800
}
787801
}
788-
UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued,
802+
UpdateOperationType::UpdateWithDisconnect => {
803+
// A worker disconnect (e.g. OOMKill, pod eviction, network
804+
// drop) used to requeue without counting as an attempt,
805+
// which let an action that always crashes its worker loop
806+
// forever until the Bazel client's --test_timeout fired.
807+
// Count disconnects as attempts so max_job_retries caps the
808+
// loop and the client sees a backend-attributable error.
809+
awaited_action.attempts += 1;
810+
811+
if awaited_action.attempts > self.max_job_retries {
812+
ActionStage::Completed(ActionResult {
813+
execution_metadata: ExecutionMetadata {
814+
worker: maybe_worker_id
815+
.map_or_else(String::default, ToString::to_string),
816+
..ExecutionMetadata::default()
817+
},
818+
error: Some(make_err!(
819+
Code::Internal,
820+
"Worker disconnected repeatedly while executing this action ({} > {} attempts); the runner likely OOMKilled or the pod was evicted. {}",
821+
awaited_action.attempts,
822+
self.max_job_retries,
823+
format!(
824+
"for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"
825+
),
826+
)),
827+
..ActionResult::default()
828+
})
829+
} else {
830+
ActionStage::Queued
831+
}
832+
}
789833
// We shouldn't get here, but we just ignore it if we do.
790834
UpdateOperationType::ExecutionComplete => {
791835
warn!("inner_update_operation got an ExecutionComplete, that's unexpected.");

nativelink-scheduler/tests/simple_scheduler_test.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,6 +2116,182 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error>
21162116
Ok(())
21172117
}
21182118

2119+
/// Worker crash-loop regression: an action whose worker keeps disconnecting
2120+
/// (e.g. `OOMKill`) used to bypass `max_job_retries` because
2121+
/// `UpdateWithDisconnect` requeued without counting as an attempt. The build
2122+
/// would only terminate when the Bazel client's `--test_timeout` fired,
2123+
/// hiding the cluster-side root cause behind a TIMEOUT/NO STATUS surface.
2124+
/// After the fix, disconnects count as attempts and exceed the cap.
2125+
#[nativelink_test]
2126+
async fn worker_disconnect_loop_caps_at_max_job_retries_test() -> Result<(), Error> {
2127+
let worker_id = WorkerId("worker_id".to_string());
2128+
2129+
let task_change_notify = Arc::new(Notify::new());
2130+
let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback(
2131+
&SimpleSpec {
2132+
max_job_retries: 1,
2133+
..Default::default()
2134+
},
2135+
memory_awaited_action_db_factory(
2136+
0,
2137+
&task_change_notify.clone(),
2138+
MockInstantWrapped::default,
2139+
),
2140+
|| async move {},
2141+
task_change_notify,
2142+
MockInstantWrapped::default,
2143+
None,
2144+
);
2145+
let action_digest = DigestInfo::new([99u8; 32], 512);
2146+
2147+
let mut rx_from_worker =
2148+
setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?;
2149+
let insert_timestamp = make_system_time(1);
2150+
let mut action_listener =
2151+
setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?;
2152+
2153+
let operation_id = {
2154+
let operation_id = match rx_from_worker.recv().await.unwrap().update {
2155+
Some(update_for_worker::Update::StartAction(exec)) => exec.operation_id,
2156+
v => panic!("Expected StartAction, got : {v:?}"),
2157+
};
2158+
assert_eq!(
2159+
action_listener.changed().await.unwrap().0.stage,
2160+
ActionStage::Executing
2161+
);
2162+
OperationId::from(operation_id.as_str())
2163+
};
2164+
2165+
// First disconnect: should requeue (attempts=1, not yet > max_job_retries=1).
2166+
drop(
2167+
scheduler
2168+
.update_action(
2169+
&worker_id,
2170+
&operation_id,
2171+
UpdateOperationType::UpdateWithDisconnect,
2172+
)
2173+
.await,
2174+
);
2175+
{
2176+
let (action_state, _maybe_origin_metadata) = action_listener.changed().await.unwrap();
2177+
assert_eq!(
2178+
action_state.stage,
2179+
ActionStage::Queued,
2180+
"First disconnect should requeue, got: {:?}",
2181+
action_state.stage,
2182+
);
2183+
}
2184+
2185+
// Reattach worker so it picks up the requeued action.
2186+
let mut rx_from_worker =
2187+
setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?;
2188+
{
2189+
match rx_from_worker.recv().await.unwrap().update {
2190+
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
2191+
v => panic!("Expected StartAction, got : {v:?}"),
2192+
}
2193+
assert_eq!(
2194+
action_listener.changed().await.unwrap().0.stage,
2195+
ActionStage::Executing
2196+
);
2197+
}
2198+
2199+
// Second disconnect: now attempts=2 > max_job_retries=1, so the action
2200+
// must transition to Completed with an error mentioning the disconnect
2201+
// loop, not silently requeue.
2202+
drop(
2203+
scheduler
2204+
.update_action(
2205+
&worker_id,
2206+
&operation_id,
2207+
UpdateOperationType::UpdateWithDisconnect,
2208+
)
2209+
.await,
2210+
);
2211+
{
2212+
let (action_state, _maybe_origin_metadata) = action_listener.changed().await.unwrap();
2213+
let ActionStage::Completed(action_result) = &action_state.stage else {
2214+
panic!(
2215+
"Second disconnect should mark action Completed-with-error, got: {:?}",
2216+
action_state.stage
2217+
);
2218+
};
2219+
let err = action_result
2220+
.error
2221+
.as_ref()
2222+
.expect("Completed action from disconnect cap must carry an error");
2223+
assert!(
2224+
err.to_string()
2225+
.contains("Worker disconnected repeatedly while executing this action"),
2226+
"Error message did not mention disconnect loop: {err}",
2227+
);
2228+
}
2229+
2230+
Ok(())
2231+
}
2232+
2233+
/// `Action.timeout` from the RBE protocol must be enforced backend-side.
2234+
/// Without this, an action that hangs forever only terminates when the
2235+
/// Bazel client's `--remote_timeout` (gRPC deadline) or `--test_timeout`
2236+
/// (client-side) fires; from the operator's perspective the cluster never
2237+
/// surfaces the slow action.
2238+
#[nativelink_test]
2239+
async fn action_timeout_is_enforced_backend_side_test() -> Result<(), Error> {
2240+
use nativelink_scheduler::awaited_action_db::AwaitedAction;
2241+
use nativelink_scheduler::simple_scheduler_state_manager::SimpleSchedulerStateManager;
2242+
2243+
// Anchor MockClock so MockInstantWrapped::now() == make_system_time(0).
2244+
MockClock::set_time(Duration::from_secs(NOW_TIME));
2245+
let executing_started_at = make_system_time(0);
2246+
2247+
let action_digest = DigestInfo::new([7u8; 32], 1);
2248+
let mut action_info = make_base_action_info(executing_started_at, action_digest);
2249+
Arc::make_mut(&mut action_info).timeout = Duration::from_secs(2);
2250+
2251+
let operation_id = OperationId::default();
2252+
let mut awaited_action =
2253+
AwaitedAction::new(operation_id.clone(), action_info, executing_started_at);
2254+
awaited_action.worker_set_state(
2255+
Arc::new(ActionState {
2256+
stage: ActionStage::Executing,
2257+
client_operation_id: operation_id,
2258+
action_digest,
2259+
last_transition_timestamp: executing_started_at,
2260+
}),
2261+
executing_started_at,
2262+
);
2263+
2264+
let task_change_notify = Arc::new(Notify::new());
2265+
let state_mgr = SimpleSchedulerStateManager::new(
2266+
/* max_job_retries */ 1,
2267+
/* no_event_action_timeout */ Duration::from_secs(60),
2268+
/* client_action_timeout */ Duration::from_secs(60),
2269+
/* max_executing_timeout */ Duration::ZERO,
2270+
memory_awaited_action_db_factory(
2271+
0,
2272+
&task_change_notify.clone(),
2273+
MockInstantWrapped::default,
2274+
),
2275+
MockInstantWrapped::default,
2276+
/* worker_registry */ None,
2277+
);
2278+
2279+
assert!(
2280+
!state_mgr.should_timeout_operation(&awaited_action).await,
2281+
"Should not time out before Action.timeout elapses",
2282+
);
2283+
2284+
// Advance past the 2s per-action deadline.
2285+
MockClock::advance(Duration::from_secs(5));
2286+
2287+
assert!(
2288+
state_mgr.should_timeout_operation(&awaited_action).await,
2289+
"Scheduler must mark Executing action timed out once Action.timeout has elapsed",
2290+
);
2291+
2292+
Ok(())
2293+
}
2294+
21192295
#[nativelink_test]
21202296
async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> {
21212297
struct DropChecker {

nativelink-worker/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ rust_library(
1313
"src/directory_cache.rs",
1414
"src/lib.rs",
1515
"src/local_worker.rs",
16+
"src/persistent_worker/live_worker.rs",
17+
"src/persistent_worker/mod.rs",
18+
"src/persistent_worker/pool.rs",
19+
"src/persistent_worker/protocol.rs",
1620
"src/qos.rs",
1721
"src/running_actions_manager.rs",
1822
"src/worker_api_client_wrapper.rs",
@@ -43,6 +47,7 @@ rust_library(
4347
"@crates//:relative-path",
4448
"@crates//:scopeguard",
4549
"@crates//:serde",
50+
"@crates//:serde_json",
4651
"@crates//:serde_json5",
4752
"@crates//:shlex",
4853
"@crates//:tokio",
@@ -116,6 +121,8 @@ rust_test(
116121
"@crates//:hyper",
117122
"@crates//:pretty_assertions",
118123
"@crates//:prost-types",
124+
"@crates//:rand",
125+
"@crates//:serde_json",
119126
"@crates//:serial_test",
120127
"@crates//:tempfile",
121128
"@crates//:tracing-test",

nativelink-worker/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ relative-path = { version = "2.0.0", default-features = false, features = [
3131
"std",
3232
] }
3333
scopeguard = { version = "1.2.0", default-features = false }
34-
serde = { version = "1.0.219", default-features = false }
34+
serde = { version = "1.0.219", default-features = false, features = ["derive"] }
35+
serde_json = { version = "1.0.140", default-features = false, features = [
36+
"std",
37+
] }
3538
serde_json5 = { version = "0.2.1", default-features = false }
3639
shlex = { version = "2.0.0", default-features = false }
3740
tokio = { version = "1.52.2", features = [

0 commit comments

Comments
 (0)