Skip to content

Commit 97bc655

Browse files
committed
Add worker_find_logging
1 parent 7cedb68 commit 97bc655

9 files changed

Lines changed: 75 additions & 10 deletions

File tree

.github/workflows/native-cargo.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,7 @@ jobs:
4646

4747
- name: Test on ${{ runner.os }}
4848
run: cargo test --all --profile=smol
49+
50+
# Not a default target, but need to make sure we don't actually break it
51+
- name: Test worker_find_logging
52+
run: cargo build --features worker_find_logging --all-targets

BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ rust_binary(
1414
srcs = [
1515
"src/bin/nativelink.rs",
1616
],
17+
# Enable this to get extra debug about workers that are not being used by the CAS
18+
# crate_features = ["worker_find_logging"],
1719
deps = [
1820
"//nativelink-config",
1921
"//nativelink-error",

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ name = "nativelink"
2525
[features]
2626
nix = ["nativelink-worker/nix"]
2727

28+
# Enable this to get extra debug about workers that are not being used by the CAS
29+
# for some reason. We don't enable this by default, as it's part of a hot path in
30+
# the scheduling system, and also that a worker not matching isn't necessarily bad.
31+
worker_find_logging = [
32+
"nativelink-scheduler/worker_find_logging",
33+
"nativelink-util/worker_find_logging",
34+
]
35+
2836
[dependencies]
2937
nativelink-config = { path = "nativelink-config" }
3038
nativelink-error = { path = "nativelink-error" }

flake.nix

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@
161161
(craneLibFor p).buildPackage ((commonArgsFor p)
162162
// {
163163
cargoArtifacts = cargoArtifactsFor p;
164+
# Enable this for debugging worker scheduler issues
165+
# cargoExtraArgs = "--features worker_find_logging";
164166
});
165167

166168
nativeTargetPkgs =

nativelink-scheduler/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ edition = "2024"
55
name = "nativelink-scheduler"
66
version = "0.7.0"
77

8+
[features]
9+
worker_find_logging = ["nativelink-util/worker_find_logging"]
10+
811
[dependencies]
912
nativelink-config = { path = "../nativelink-config" }
1013
nativelink-error = { path = "../nativelink-error" }

nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use nativelink_util::task::JoinHandleDropGuard;
3333
use tokio::sync::Notify;
3434
use tokio::sync::mpsc::{self, UnboundedSender};
3535
use tonic::async_trait;
36+
#[cfg(feature = "worker_find_logging")]
37+
use tracing::info;
3638
use tracing::{error, warn};
3739

3840
use crate::platform_property_manager::PlatformPropertyManager;
@@ -191,21 +193,46 @@ impl ApiWorkerSchedulerImpl {
191193
Ok(())
192194
}
193195

196+
#[cfg_attr(not(feature = "worker_find_logging"), allow(unused_variables))]
197+
fn inner_worker_checker(
198+
(worker_id, w): &(&WorkerId, &Worker),
199+
platform_properties: &PlatformProperties,
200+
) -> bool {
201+
#[cfg(feature = "worker_find_logging")]
202+
{
203+
if !w.can_accept_work() {
204+
info!(
205+
"Worker {worker_id} cannot accept work because is_paused: {}, is_draining: {}",
206+
w.is_paused, w.is_draining
207+
);
208+
return false;
209+
}
210+
if !platform_properties.is_satisfied_by(&w.platform_properties) {
211+
info!("Worker {worker_id} properties are insufficient");
212+
return false;
213+
}
214+
return true;
215+
}
216+
#[cfg(not(feature = "worker_find_logging"))]
217+
{
218+
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
219+
}
220+
}
221+
194222
fn inner_find_worker_for_action(
195223
&self,
196224
platform_properties: &PlatformProperties,
197225
) -> Option<WorkerId> {
198226
let mut workers_iter = self.workers.iter();
199-
let workers_iter = match self.allocation_strategy {
200-
// Use rfind to get the least recently used that satisfies the properties.
201-
WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter.rfind(|(_, w)| {
202-
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
203-
}),
204-
// Use find to get the most recently used that satisfies the properties.
205-
WorkerAllocationStrategy::MostRecentlyUsed => workers_iter.find(|(_, w)| {
206-
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
207-
}),
208-
};
227+
let workers_iter =
228+
match self.allocation_strategy {
229+
// Use rfind to get the least recently used that satisfies the properties.
230+
WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
231+
.rfind(|worker| Self::inner_worker_checker(worker, platform_properties)),
232+
// Use find to get the most recently used that satisfies the properties.
233+
WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
234+
.find(|worker| Self::inner_worker_checker(worker, platform_properties)),
235+
};
209236
workers_iter.map(|(_, w)| w.id.clone())
210237
}
211238

nativelink-util/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ edition = "2024"
55
name = "nativelink-util"
66
version = "0.7.0"
77

8+
[features]
9+
worker_find_logging = []
10+
811
[dependencies]
912
nativelink-config = { path = "../nativelink-config" }
1013
nativelink-error = { path = "../nativelink-error" }

nativelink-util/src/platform_properties.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use nativelink_metric::{
2121
use nativelink_proto::build::bazel::remote::execution::v2::Platform as ProtoPlatform;
2222
use nativelink_proto::build::bazel::remote::execution::v2::platform::Property as ProtoProperty;
2323
use serde::{Deserialize, Serialize};
24+
#[cfg(feature = "worker_find_logging")]
25+
use tracing::info;
2426

2527
/// `PlatformProperties` helps manage the configuration of platform properties to
2628
/// keys and types. The scheduler uses these properties to decide what jobs
@@ -47,9 +49,19 @@ impl PlatformProperties {
4749
for (property, check_value) in &self.properties {
4850
if let Some(worker_value) = worker_properties.properties.get(property) {
4951
if !check_value.is_satisfied_by(worker_value) {
52+
#[cfg(feature = "worker_find_logging")]
53+
{
54+
info!(
55+
"Property mismatch on worker property {property}. {worker_value:?} != {check_value:?}"
56+
);
57+
}
5058
return false;
5159
}
5260
} else {
61+
#[cfg(feature = "worker_find_logging")]
62+
{
63+
info!("Property missing on worker property {property}");
64+
}
5365
return false;
5466
}
5567
}

src/bin/nativelink.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,10 @@ fn get_config() -> Result<CasConfig, Error> {
838838
}
839839

840840
fn main() -> Result<(), Box<dyn core::error::Error>> {
841+
if cfg!(feature = "worker_find_logging") {
842+
info!("worker_find_logging enabled");
843+
}
844+
841845
let mut cfg = get_config()?;
842846

843847
let global_cfg = if let Some(global_cfg) = &mut cfg.global {

0 commit comments

Comments
 (0)