Skip to content

Commit df48a53

Browse files
aherrmannfacebook-github-bot
authored andcommitted
Add remote persistent worker support (#787)
Summary: Closes #776. Implements support for persistent workers in remote builds using the Bazel remote execution protocol and the approach documented in the Bazel remote persistent workers proposal: https://github.com/bazelbuild/proposals/blob/main/designs/2021-03-06-remote-persistent-workers.md The Bazel remote persistent worker protocol includes an automatic fallback in cases where the remote execution system does not yet support persistent workers. To that end actions take the shape ``` WORKER WORKER_ARGS... REQUEST_ARGS_FILE ``` The remote execution system separates worker arguments on the command-line from request arguments in the response file and adds the `--persistent_worker` flag. Pull Request resolved: #787 Test Plan: Test added in D68157749. Reviewed By: IanChilds Differential Revision: D65213349 Pulled By: KapJI fbshipit-source-id: 79d219a06c08fad7d16976c6c9e5a6815dd073df
1 parent aed7312 commit df48a53

File tree

12 files changed

+88
-5
lines changed

12 files changed

+88
-5
lines changed

app/buck2_action_impl/src/actions/impls/run.rs

+28
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use buck2_core::execution_types::executor_config::RemoteExecutorDependency;
4040
use buck2_core::fs::artifact_path_resolver::ArtifactFs;
4141
use buck2_core::fs::buck_out_path::BuildArtifactPath;
4242
use buck2_core::fs::paths::forward_rel_path::ForwardRelativePathBuf;
43+
use buck2_error::buck2_error;
4344
use buck2_error::BuckErrorContext;
4445
use buck2_events::dispatch::span_async_simple;
4546
use buck2_execute::artifact::fs::ExecutorFs;
@@ -263,6 +264,7 @@ struct UnpackedWorkerValues<'v> {
263264
id: WorkerId,
264265
concurrency: Option<usize>,
265266
streaming: bool,
267+
supports_bazel_remote_persistent_worker_protocol: bool,
266268
}
267269

268270
struct UnpackedRunActionValues<'v> {
@@ -320,6 +322,8 @@ impl RunAction {
320322
id: WorkerId(worker.id),
321323
concurrency: worker.concurrency(),
322324
streaming: worker.streaming(),
325+
supports_bazel_remote_persistent_worker_protocol: worker
326+
.supports_bazel_remote_persistent_worker_protocol(),
323327
});
324328

325329
Ok(UnpackedRunActionValues {
@@ -352,11 +356,35 @@ impl RunAction {
352356
.exe
353357
.add_to_command_line(&mut worker_rendered, &mut cli_ctx)?;
354358
worker.exe.visit_artifacts(artifact_visitor)?;
359+
let worker_key = if worker.supports_bazel_remote_persistent_worker_protocol {
360+
let mut worker_visitor = SimpleCommandLineArtifactVisitor::new();
361+
worker.exe.visit_artifacts(&mut worker_visitor)?;
362+
if !worker_visitor.outputs.is_empty() {
363+
// TODO[AH] create appropriate error enum value.
364+
return Err(buck2_error!(
365+
buck2_error::ErrorTag::ActionMismatchedOutputs,
366+
"Remote persistent worker command should not produce outputs."
367+
));
368+
}
369+
let worker_inputs: Vec<&ArtifactGroupValues> = worker_visitor
370+
.inputs()
371+
.map(|group| action_execution_ctx.artifact_values(group))
372+
.collect();
373+
let (_, worker_digest) = metadata_content(
374+
fs.fs(),
375+
&worker_inputs,
376+
action_execution_ctx.digest_config(),
377+
)?;
378+
Some(worker_digest)
379+
} else {
380+
None
381+
};
355382
Some(WorkerSpec {
356383
exe: worker_rendered,
357384
id: worker.id,
358385
concurrency: worker.concurrency,
359386
streaming: worker.streaming,
387+
remote_key: worker_key,
360388
})
361389
} else {
362390
None

app/buck2_build_api/src/actions/execute/action_executor.rs

+1
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,7 @@ mod tests {
761761
CommandGenerationOptions {
762762
path_separator: PathSeparatorKind::Unix,
763763
output_paths_behavior: Default::default(),
764+
use_bazel_protocol_remote_persistent_workers: false,
764765
},
765766
Default::default(),
766767
),

app/buck2_build_api/src/interpreter/rule_defs/command_executor_config.rs

+3
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ pub fn register_command_executor_config(builder: &mut GlobalsBuilder) {
8686
/// * `allow_hybrid_fallbacks_on_failure`: Whether to allow fallbacks when the result is failure (i.e. the command failed on the primary, but the infra worked)
8787
/// * `use_windows_path_separators`: Whether to use Windows path separators in command line arguments
8888
/// * `use_persistent workers`: Whether to use persistent workers for local execution if they are available
89+
/// * `use_bazel_protocol_remote_persistent_workers`: Whether to use persistent workers for remote execution via the Bazel remote persistent worker protocol if they are available
8990
/// * `allow_cache_uploads`: Whether to upload local actions to the RE cache
9091
/// * `max_cache_upload_mebibytes`: Maximum size to upload in cache uploads
9192
/// * `experimental_low_pass_filter`: Whether to use the experimental low pass filter
@@ -111,6 +112,7 @@ pub fn register_command_executor_config(builder: &mut GlobalsBuilder) {
111112
#[starlark(default = false, require = named)] allow_hybrid_fallbacks_on_failure: bool,
112113
#[starlark(default = false, require = named)] use_windows_path_separators: bool,
113114
#[starlark(default = false, require = named)] use_persistent_workers: bool,
115+
#[starlark(default = false, require = named)] use_bazel_protocol_remote_persistent_workers: bool,
114116
#[starlark(default = false, require = named)] allow_cache_uploads: bool,
115117
#[starlark(default = NoneOr::None, require = named)] max_cache_upload_mebibytes: NoneOr<
116118
i32,
@@ -323,6 +325,7 @@ pub fn register_command_executor_config(builder: &mut GlobalsBuilder) {
323325
PathSeparatorKind::Unix
324326
},
325327
output_paths_behavior,
328+
use_bazel_protocol_remote_persistent_workers,
326329
},
327330
}
328331
};

app/buck2_build_api/src/interpreter/rule_defs/provider/builtin/worker_info.rs

+14
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub struct WorkerInfoGen<V: ValueLifetimeless> {
5050
pub concurrency: ValueOfUncheckedGeneric<V, NoneOr<usize>>,
5151
// Whether to always run actions using this worker via the streaming API
5252
pub streaming: ValueOfUncheckedGeneric<V, bool>,
53+
// Bazel remote persistent worker protocol capable worker
54+
pub supports_bazel_remote_persistent_worker_protocol: ValueOfUncheckedGeneric<V, bool>,
5355

5456
pub id: u64,
5557
}
@@ -68,6 +70,8 @@ fn worker_info_creator(globals: &mut GlobalsBuilder) {
6870
ValueOf<'v, usize>,
6971
>,
7072
#[starlark(require = named, default = NoneType)] streaming: Value<'v>,
73+
#[starlark(require = named, default = false)]
74+
supports_bazel_remote_persistent_worker_protocol: bool,
7175
eval: &mut Evaluator<'v, '_, '_>,
7276
) -> starlark::Result<WorkerInfo<'v>> {
7377
let heap = eval.heap();
@@ -79,6 +83,9 @@ fn worker_info_creator(globals: &mut GlobalsBuilder) {
7983
id,
8084
concurrency: heap.alloc_typed_unchecked(concurrency).cast(),
8185
streaming: ValueOfUnchecked::new(streaming),
86+
supports_bazel_remote_persistent_worker_protocol: heap
87+
.alloc_typed_unchecked(supports_bazel_remote_persistent_worker_protocol)
88+
.cast(),
8289
})
8390
}
8491
}
@@ -105,6 +112,13 @@ impl<'v, V: ValueLike<'v>> WorkerInfoGen<V> {
105112
.into_option()
106113
.unwrap_or(false)
107114
}
115+
116+
pub fn supports_bazel_remote_persistent_worker_protocol(&self) -> bool {
117+
self.supports_bazel_remote_persistent_worker_protocol
118+
.to_value()
119+
.unpack()
120+
.expect("validated at construction")
121+
}
108122
}
109123

110124
fn validate_worker_info<'v, V>(info: &WorkerInfoGen<V>) -> buck2_error::Result<()>

app/buck2_build_api_tests/src/interpreter/rule_defs/provider/builtin/worker_info.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn run_display() {
2424
.run_starlark_bzl_test(
2525
r#"
2626
def test():
27-
assert_eq('WorkerInfo(exe=cmd_args("x"), concurrency=None, streaming=None)', str(WorkerInfo(exe="x")))
27+
assert_eq('WorkerInfo(exe=cmd_args("x"), concurrency=None, streaming=None, supports_bazel_remote_persistent_worker_protocol=False)', str(WorkerInfo(exe="x")))
2828
"#,
2929
)
3030
.unwrap();

app/buck2_core/src/execution_types/executor_config.rs

+2
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ impl Default for CacheUploadBehavior {
283283
pub struct CommandGenerationOptions {
284284
pub path_separator: PathSeparatorKind,
285285
pub output_paths_behavior: OutputPathsBehavior,
286+
pub use_bazel_protocol_remote_persistent_workers: bool,
286287
}
287288

288289
#[derive(Debug, Eq, PartialEq, Hash, Allocative, Clone)]
@@ -314,6 +315,7 @@ impl CommandExecutorConfig {
314315
options: CommandGenerationOptions {
315316
path_separator: PathSeparatorKind::system_default(),
316317
output_paths_behavior: Default::default(),
318+
use_bazel_protocol_remote_persistent_workers: false,
317319
},
318320
})
319321
}

app/buck2_execute/src/execute/command_executor.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use buck2_core::fs::artifact_path_resolver::ArtifactFs;
2020
use buck2_core::fs::project_rel_path::ProjectRelativePath;
2121
use buck2_core::fs::project_rel_path::ProjectRelativePathBuf;
2222
use buck2_directory::directory::fingerprinted_directory::FingerprintedDirectory;
23-
#[cfg(fbcode_build)]
2423
use buck2_error::buck2_error;
2524
use buck2_error::BuckErrorContext;
2625
use buck2_futures::cancellation::CancellationContext;
@@ -187,15 +186,45 @@ impl CommandExecutor {
187186
}
188187
CommandExecutionInput::ScratchPath(_) => None,
189188
});
189+
let mut platform = self.0.re_platform.clone();
190+
let args = if self.0.options.use_bazel_protocol_remote_persistent_workers
191+
&& let Some(worker) = request.worker()
192+
&& let Some(key) = worker.remote_key.as_ref()
193+
{
194+
platform.properties.push(RE::Property {
195+
name: "persistentWorkerKey".to_owned(),
196+
value: key.to_string(),
197+
});
198+
// TODO[AH] Ideally, Buck2 could generate an argfile on the fly.
199+
for arg in request.args() {
200+
if !(arg.starts_with("@")
201+
|| arg.starts_with("-flagfile")
202+
|| arg.starts_with("--flagfile"))
203+
{
204+
return Err(buck2_error!(
205+
buck2_error::ErrorTag::Input,
206+
"Remote persistent worker arguments must be passed as `@argfile`, `-flagfile=argfile`, or `--flagfile=argfile`."
207+
));
208+
}
209+
}
210+
worker
211+
.exe
212+
.iter()
213+
.chain(request.args().iter())
214+
.cloned()
215+
.collect()
216+
} else {
217+
request.all_args_vec()
218+
};
190219
let action = re_create_action(
191-
request.all_args_vec(),
220+
args,
192221
request.paths().output_paths(),
193222
request.working_directory(),
194223
request.env(),
195224
input_digest,
196225
action_metadata_blobs,
197226
request.timeout(),
198-
self.0.re_platform.clone(),
227+
platform,
199228
false,
200229
digest_config,
201230
self.0.options.output_paths_behavior,

app/buck2_execute/src/execute/request.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,13 @@ impl CommandExecutionPaths {
278278
#[derive(Copy, Clone, Dupe, Debug, Display, Allocative, Hash, PartialEq, Eq)]
279279
pub struct WorkerId(pub u64);
280280

281-
#[derive(Clone)]
281+
#[derive(Clone, Debug)]
282282
pub struct WorkerSpec {
283283
pub id: WorkerId,
284284
pub exe: Vec<String>,
285285
pub concurrency: Option<usize>,
286286
pub streaming: bool,
287+
pub remote_key: Option<TrackedFileDigest>,
287288
}
288289

289290
/// The data contains the information about the command to be executed.

app/buck2_execute/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#![feature(try_trait_v2)]
1515
#![feature(used_with_arg)]
1616
#![feature(trait_upcasting)]
17+
#![feature(let_chains)]
1718

1819
pub mod artifact;
1920
pub mod artifact_utils;

app/buck2_server/src/daemon/common.rs

+1
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ pub fn get_default_executor_config(host_platform: HostPlatformOverride) -> Comma
500500
options: CommandGenerationOptions {
501501
path_separator: get_default_path_separator(host_platform),
502502
output_paths_behavior: Default::default(),
503+
use_bazel_protocol_remote_persistent_workers: false,
503504
},
504505
}
505506
}

app/buck2_test/src/orchestrator.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,7 @@ impl<'b> BuckTestOrchestrator<'b> {
11911191
options: CommandGenerationOptions {
11921192
path_separator: PathSeparatorKind::system_default(),
11931193
output_paths_behavior: Default::default(),
1194+
use_bazel_protocol_remote_persistent_workers: false,
11941195
},
11951196
};
11961197
let CommandExecutorResponse {
@@ -1733,6 +1734,7 @@ impl<'a> Execute2RequestExpander<'a> {
17331734
id: WorkerId(worker.id),
17341735
concurrency: worker.concurrency(),
17351736
streaming: worker.streaming(),
1737+
remote_key: None,
17361738
})
17371739
}
17381740
_ => None,

tests/core/docs/test_builtin_docs_data/buck2-golden-docs/build/WorkerInfo.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
## WorkerInfo.concurrency
55
## WorkerInfo.exe
66
## WorkerInfo.streaming
7+
## WorkerInfo.supports\_bazel\_remote\_persistent\_worker\_protocol

0 commit comments

Comments
 (0)