Skip to content

Commit e583e3b

Browse files
committed
Merge branch 'main' into mongo-username-password
2 parents 45da505 + 32ef435 commit e583e3b

26 files changed

Lines changed: 307 additions & 139 deletions

.github/workflows/native-cargo.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,7 @@ jobs:
4949
run: cargo test --all --profile=smol
5050
env:
5151
NATIVELINK_TEST_MONGO_URL: ${{ secrets.NATIVELINK_TEST_MONGO_URL}}
52+
53+
# Not a default target, but need to make sure we don't actually break it
54+
- name: Test worker_find_logging
55+
run: cargo build --features worker_find_logging --all-targets

BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ rust_binary(
1414
srcs = [
1515
"src/bin/nativelink.rs",
1616
],
17+
# Enable this to get extra debug about workers that are not being used by the CAS
18+
# crate_features = ["worker_find_logging"],
1719
deps = [
1820
"//nativelink-config",
1921
"//nativelink-error",

Cargo.lock

Lines changed: 12 additions & 56 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ name = "nativelink"
2525
[features]
2626
nix = ["nativelink-worker/nix"]
2727

28+
# Enable this to get extra debug about workers that are not being used by the CAS
29+
# for some reason. We don't enable this by default, as it's part of a hot path in
30+
# the scheduling system, and also that a worker not matching isn't necessarily bad.
31+
worker_find_logging = [
32+
"nativelink-scheduler/worker_find_logging",
33+
"nativelink-util/worker_find_logging",
34+
]
35+
2836
[dependencies]
2937
nativelink-config = { path = "nativelink-config" }
3038
nativelink-error = { path = "nativelink-error" }

flake.nix

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
src = pkgs.lib.cleanSourceWith {
8989
src = (craneLibFor pkgs).path ./.;
9090
filter = path: type:
91-
(builtins.match "^.*(examples/.+\.json5|data/SekienAkashita\.jpg|nativelink-config/README\.md)" path != null)
91+
(builtins.match "^.*(examples/.+\.json5|data/.+|nativelink-config/README\.md)" path != null)
9292
|| ((craneLibFor pkgs).filterCargoSources path type);
9393
};
9494

@@ -161,6 +161,8 @@
161161
(craneLibFor p).buildPackage ((commonArgsFor p)
162162
// {
163163
cargoArtifacts = cargoArtifactsFor p;
164+
# Enable this for debugging worker scheduler issues
165+
# cargoExtraArgs = "--features worker_find_logging";
164166
});
165167

166168
nativeTargetPkgs =
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"ABI_LIBC_VERSION": "glibc_2.35",
3-
"BAZEL_TARGET_LIBC": "glibc_2.35",
4-
"BAZEL_HOST_SYSTEM": "x86_64-unknown-linux-gnu"
2+
"ABI_LIBC_VERSION": "glibc_2.35",
3+
"BAZEL_HOST_SYSTEM": "x86_64-unknown-linux-gnu",
4+
"BAZEL_TARGET_LIBC": "glibc_2.35"
55
}

nativelink-scheduler/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ edition = "2024"
55
name = "nativelink-scheduler"
66
version = "0.7.0"
77

8+
[features]
9+
worker_find_logging = ["nativelink-util/worker_find_logging"]
10+
811
[dependencies]
912
nativelink-config = { path = "../nativelink-config" }
1013
nativelink-error = { path = "../nativelink-error" }

nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use nativelink_util::task::JoinHandleDropGuard;
3333
use tokio::sync::Notify;
3434
use tokio::sync::mpsc::{self, UnboundedSender};
3535
use tonic::async_trait;
36+
#[cfg(feature = "worker_find_logging")]
37+
use tracing::info;
3638
use tracing::{error, warn};
3739

3840
use crate::platform_property_manager::PlatformPropertyManager;
@@ -191,21 +193,46 @@ impl ApiWorkerSchedulerImpl {
191193
Ok(())
192194
}
193195

196+
#[cfg_attr(not(feature = "worker_find_logging"), allow(unused_variables))]
197+
fn inner_worker_checker(
198+
(worker_id, w): &(&WorkerId, &Worker),
199+
platform_properties: &PlatformProperties,
200+
) -> bool {
201+
#[cfg(feature = "worker_find_logging")]
202+
{
203+
if !w.can_accept_work() {
204+
info!(
205+
"Worker {worker_id} cannot accept work because is_paused: {}, is_draining: {}",
206+
w.is_paused, w.is_draining
207+
);
208+
return false;
209+
}
210+
if !platform_properties.is_satisfied_by(&w.platform_properties) {
211+
info!("Worker {worker_id} properties are insufficient");
212+
return false;
213+
}
214+
return true;
215+
}
216+
#[cfg(not(feature = "worker_find_logging"))]
217+
{
218+
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
219+
}
220+
}
221+
194222
fn inner_find_worker_for_action(
195223
&self,
196224
platform_properties: &PlatformProperties,
197225
) -> Option<WorkerId> {
198226
let mut workers_iter = self.workers.iter();
199-
let workers_iter = match self.allocation_strategy {
200-
// Use rfind to get the least recently used that satisfies the properties.
201-
WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter.rfind(|(_, w)| {
202-
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
203-
}),
204-
// Use find to get the most recently used that satisfies the properties.
205-
WorkerAllocationStrategy::MostRecentlyUsed => workers_iter.find(|(_, w)| {
206-
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
207-
}),
208-
};
227+
let workers_iter =
228+
match self.allocation_strategy {
229+
// Use rfind to get the least recently used that satisfies the properties.
230+
WorkerAllocationStrategy::LeastRecentlyUsed => workers_iter
231+
.rfind(|worker| Self::inner_worker_checker(worker, platform_properties)),
232+
// Use find to get the most recently used that satisfies the properties.
233+
WorkerAllocationStrategy::MostRecentlyUsed => workers_iter
234+
.find(|worker| Self::inner_worker_checker(worker, platform_properties)),
235+
};
209236
workers_iter.map(|(_, w)| w.id.clone())
210237
}
211238

nativelink-util/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ rust_test_suite(
8989
name = "integration",
9090
timeout = "short",
9191
srcs = [
92+
"tests/action_messages_test.rs",
9293
"tests/buf_channel_test.rs",
9394
"tests/channel_body_for_tests_test.rs",
9495
"tests/common_test.rs",
@@ -104,6 +105,8 @@ rust_test_suite(
104105
],
105106
compile_data = [
106107
"tests/data/SekienAkashita.jpg",
108+
"tests/data/action_message_cachable_060.json",
109+
"tests/data/action_message_uncachable_060.json",
107110
],
108111
proc_macro_deps = [
109112
"//nativelink-macro",

nativelink-util/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ edition = "2024"
55
name = "nativelink-util"
66
version = "0.7.0"
77

8+
[features]
9+
worker_find_logging = []
10+
811
[dependencies]
912
nativelink-config = { path = "../nativelink-config" }
1013
nativelink-error = { path = "../nativelink-error" }

0 commit comments

Comments
 (0)