Skip to content

Commit 7afe286

Browse files
ClientKeepAlive update action ClientKeepAlive (#1580)
When the scheduler was updated to add the keep alive to the AwaitedAction the MemoryAwaitedActionDb was not updated to set this when a ClientKeepAlive was received. Fix the test client_reconnect_keeps_action_alive which was not performing the eviction due to optimisations in the filter_operations function which then detected the issue. Then update the ActionEvent::ClientKeepAlive event handler to update the client keep alive timestamp in the AwaitedAction. Fixes #1579.
1 parent e753b8d commit 7afe286

File tree

6 files changed

+36
-24
lines changed

6 files changed

+36
-24
lines changed

nativelink-scheduler/src/awaited_action_db/awaited_action.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ impl AwaitedAction {
171171
pub(crate) fn last_client_keepalive_timestamp(&self) -> SystemTime {
172172
self.last_client_keepalive_timestamp
173173
}
174+
174175
pub(crate) fn update_client_keep_alive(&mut self, now: SystemTime) {
175176
self.last_client_keepalive_timestamp = now;
176177
}

nativelink-scheduler/src/awaited_action_db/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::cmp;
1616
use std::ops::Bound;
1717
use std::sync::Arc;
18+
use std::time::Duration;
1819

1920
pub use awaited_action::{AwaitedAction, AwaitedActionSortKey};
2021
use futures::{Future, Stream};
@@ -25,6 +26,9 @@ use serde::{Deserialize, Serialize};
2526

2627
mod awaited_action;
2728

29+
/// Duration to wait before sending client keep alive messages.
30+
pub const CLIENT_KEEPALIVE_DURATION: Duration = Duration::from_secs(10);
31+
2832
/// A simple enum to represent the state of an `AwaitedAction`.
2933
#[derive(Debug, Clone, Copy)]
3034
pub enum SortedAwaitedActionState {

nativelink-scheduler/src/memory_awaited_action_db.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::collections::{BTreeMap, BTreeSet, HashMap};
1616
use std::ops::{Bound, RangeBounds};
1717
use std::sync::Arc;
18-
use std::time::Duration;
1918

2019
use async_lock::Mutex;
2120
use futures::{FutureExt, Stream};
@@ -35,15 +34,12 @@ use tracing::{event, Level};
3534

3635
use crate::awaited_action_db::{
3736
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedAction,
38-
SortedAwaitedActionState,
37+
SortedAwaitedActionState, CLIENT_KEEPALIVE_DURATION,
3938
};
4039

4140
/// Number of events to process per cycle.
4241
const MAX_ACTION_EVENTS_RX_PER_CYCLE: usize = 1024;
4342

44-
/// Duration to wait before sending client keep alive messages.
45-
const CLIENT_KEEPALIVE_DURATION: Duration = Duration::from_secs(10);
46-
4743
/// Represents a client that is currently listening to an action.
4844
/// When the client is dropped, it will send the `AwaitedAction` to the
4945
/// `event_tx` if there are other cleanups needed.
@@ -452,11 +448,21 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
452448
}
453449
}
454450
ActionEvent::ClientKeepAlive(client_id) => {
455-
let maybe_size = self
451+
if let Some(client_awaited_action) = self
456452
.client_operation_to_awaited_action
457-
.size_for_key(&client_id)
458-
.await;
459-
if maybe_size.is_none() {
453+
.get(&client_id)
454+
.await
455+
{
456+
if let Some(awaited_action_sender) = self
457+
.operation_id_to_awaited_action
458+
.get(&client_awaited_action.operation_id)
459+
{
460+
awaited_action_sender.send_if_modified(|awaited_action| {
461+
awaited_action.update_client_keep_alive((self.now_fn)().now());
462+
false
463+
});
464+
}
465+
} else {
460466
event!(
461467
Level::ERROR,
462468
?client_id,

nativelink-scheduler/src/simple_scheduler.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use tokio_stream::StreamExt;
3838
use tracing::{event, info_span, Level};
3939

4040
use crate::api_worker_scheduler::ApiWorkerScheduler;
41-
use crate::awaited_action_db::AwaitedActionDb;
41+
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
4242
use crate::platform_property_manager::PlatformPropertyManager;
4343
use crate::simple_scheduler_state_manager::SimpleSchedulerStateManager;
4444
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp};
@@ -368,6 +368,12 @@ impl SimpleScheduler {
368368
if client_action_timeout_s == 0 {
369369
client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S;
370370
}
371+
// This matches the value of CLIENT_KEEPALIVE_DURATION which means that
372+
// tasks are going to be dropped all over the place, this isn't a good
373+
// setting.
374+
if client_action_timeout_s <= CLIENT_KEEPALIVE_DURATION.as_secs() {
375+
event!(Level::ERROR, client_action_timeout_s, "Setting client_action_timeout_s to less than the client keep alive interval is going to cause issues, please set above {}.", CLIENT_KEEPALIVE_DURATION.as_secs());
376+
}
371377

372378
let mut max_job_retries = spec.max_job_retries;
373379
if max_job_retries == 0 {

nativelink-scheduler/src/store_awaited_action_db.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,11 @@ use tracing::{event, Level};
3838

3939
use crate::awaited_action_db::{
4040
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, SortedAwaitedAction,
41-
SortedAwaitedActionState,
41+
SortedAwaitedActionState, CLIENT_KEEPALIVE_DURATION,
4242
};
4343

4444
type ClientOperationId = OperationId;
4545

46-
/// Duration to wait before sending client keep alive messages.
47-
const CLIENT_KEEPALIVE_DURATION: Duration = Duration::from_secs(10);
48-
4946
/// Maximum number of retries to update client keep alive.
5047
const MAX_RETRIES_FOR_CLIENT_KEEPALIVE: u32 = 8;
5148

nativelink-scheduler/tests/simple_scheduler_test.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ use nativelink_util::action_messages::{
4949
use nativelink_util::common::DigestInfo;
5050
use nativelink_util::instant_wrapper::MockInstantWrapped;
5151
use nativelink_util::operation_state_manager::{
52-
ActionStateResult, ClientStateManager, OperationFilter, UpdateOperationType,
52+
ActionStateResult, ClientStateManager, OperationFilter, OperationStageFlags,
53+
UpdateOperationType,
5354
};
5455
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
5556
use pretty_assertions::assert_eq;
@@ -2292,18 +2293,15 @@ async fn client_reconnect_keeps_action_alive() -> Result<(), Error> {
22922293
assert_eq!(poll!(&mut changed_fut), Poll::Pending);
22932294
tokio::task::yield_now().await;
22942295
// Eviction happens when someone touches the internal
2295-
// evicting map. So we constantly ask for some other client
2296-
// to trigger eviction logic.
2297-
assert!(scheduler
2296+
// evicting map. So we constantly ask for all queued actions.
2297+
// Regression: https://github.com/TraceMachina/nativelink/issues/1579
2298+
let mut stream = scheduler
22982299
.filter_operations(OperationFilter {
2299-
client_operation_id: Some(OperationId::from("dummy_client_id")),
2300+
stages: OperationStageFlags::Queued,
23002301
..Default::default()
23012302
})
2302-
.await
2303-
.unwrap()
2304-
.next()
2305-
.await
2306-
.is_none());
2303+
.await?;
2304+
while stream.next().await.is_some() {}
23072305
}
23082306

23092307
Ok(())

0 commit comments

Comments
 (0)