Skip to content

Commit 25caf53

Browse files
committed
Expose OriginContextCollector to scheduler's matching engine
Ensures OriginContext data is available to the matching engine. When dealing with an action, the first user to invoke the action is associated with the creation of the operation.
1 parent 1837f6e commit 25caf53

15 files changed

+375
-178
lines changed

nativelink-scheduler/src/awaited_action_db/awaited_action.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use nativelink_metric::{
2222
use nativelink_util::action_messages::{
2323
ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
2424
};
25+
use nativelink_util::origin_context::ActiveOriginContext;
26+
use nativelink_util::origin_event::{OriginMetadata, ORIGIN_EVENT_COLLECTOR};
2527
use serde::{Deserialize, Serialize};
2628
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};
2729

@@ -78,6 +80,9 @@ pub struct AwaitedAction {
7880
#[metric(help = "The state of the AwaitedAction")]
7981
state: Arc<ActionState>,
8082

83+
/// The origin metadata of the action.
84+
maybe_origin_metadata: Option<OriginMetadata>,
85+
8186
/// Number of attempts the job has been tried.
8287
#[metric(help = "The number of attempts the AwaitedAction has been tried")]
8388
pub attempts: usize,
@@ -100,6 +105,11 @@ impl AwaitedAction {
100105
client_operation_id: operation_id.clone(),
101106
action_digest: action_info.unique_qualifier.digest(),
102107
});
108+
let maybe_origin_metadata = ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
109+
.ok()
110+
.flatten()
111+
.map(|v| v.metadata.clone());
112+
103113
Self {
104114
version: AwaitedActionVersion(0),
105115
action_info,
@@ -108,6 +118,7 @@ impl AwaitedAction {
108118
attempts: 0,
109119
last_worker_updated_timestamp: now,
110120
last_client_keepalive_timestamp: now,
121+
maybe_origin_metadata,
111122
worker_id: None,
112123
state,
113124
}
@@ -141,6 +152,10 @@ impl AwaitedAction {
141152
&self.state
142153
}
143154

155+
pub(crate) fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> {
156+
self.maybe_origin_metadata.as_ref()
157+
}
158+
144159
pub(crate) fn worker_id(&self) -> Option<WorkerId> {
145160
self.worker_id
146161
}

nativelink-scheduler/src/cache_lookup_scheduler.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProv
3333
use nativelink_util::operation_state_manager::{
3434
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
3535
};
36+
use nativelink_util::origin_context::ActiveOriginContext;
37+
use nativelink_util::origin_event::{OriginMetadata, ORIGIN_EVENT_COLLECTOR};
3638
use nativelink_util::store_trait::Store;
3739
use parking_lot::{Mutex, MutexGuard};
3840
use scopeguard::guard;
@@ -109,27 +111,34 @@ fn subscribe_to_existing_action(
109111

110112
struct CacheLookupActionStateResult {
111113
action_state: Arc<ActionState>,
114+
maybe_origin_metadata: Option<OriginMetadata>,
112115
change_called: bool,
113116
}
114117

115118
#[async_trait]
116119
impl ActionStateResult for CacheLookupActionStateResult {
117-
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
118-
Ok(self.action_state.clone())
120+
async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
121+
Ok((
122+
self.action_state.clone(),
123+
self.maybe_origin_metadata.clone(),
124+
))
119125
}
120126

121-
async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
127+
async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
122128
if self.change_called {
123129
return Err(make_err!(
124130
Code::Internal,
125131
"CacheLookupActionStateResult::changed called twice"
126132
));
127133
}
128134
self.change_called = true;
129-
Ok(self.action_state.clone())
135+
Ok((
136+
self.action_state.clone(),
137+
self.maybe_origin_metadata.clone(),
138+
))
130139
}
131140

132-
async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
141+
async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
133142
// TODO(allada) We should probably remove as_action_info()
134143
// or implement it properly.
135144
return Err(make_err!(
@@ -251,11 +260,17 @@ impl CacheLookupScheduler {
251260
action_digest: action_info.unique_qualifier.digest(),
252261
};
253262

263+
let maybe_origin_metadata =
264+
ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
265+
.ok()
266+
.flatten()
267+
.map(|v| v.metadata.clone());
254268
for (client_operation_id, pending_tx) in pending_txs {
255269
action_state.client_operation_id = client_operation_id;
256270
// Ignore errors here, as the other end may have hung up.
257271
let _ = pending_tx.send(Ok(Box::new(CacheLookupActionStateResult {
258272
action_state: Arc::new(action_state.clone()),
273+
maybe_origin_metadata: maybe_origin_metadata.clone(),
259274
change_called: false,
260275
})));
261276
}

nativelink-scheduler/src/default_scheduler_factory.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ use nativelink_config::schedulers::{
2020
};
2121
use nativelink_config::stores::EvictionPolicy;
2222
use nativelink_error::{make_input_err, Error, ResultExt};
23+
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
2324
use nativelink_store::redis_store::RedisStore;
2425
use nativelink_store::store_manager::StoreManager;
2526
use nativelink_util::instant_wrapper::InstantWrapper;
2627
use nativelink_util::operation_state_manager::ClientStateManager;
27-
use tokio::sync::Notify;
28+
use tokio::sync::{mpsc, Notify};
2829

2930
use crate::cache_lookup_scheduler::CacheLookupScheduler;
3031
use crate::grpc_scheduler::GrpcScheduler;
@@ -46,25 +47,27 @@ pub type SchedulerFactoryResults = (
4647
pub fn scheduler_factory(
4748
spec: &SchedulerSpec,
4849
store_manager: &StoreManager,
50+
maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
4951
) -> Result<SchedulerFactoryResults, Error> {
50-
inner_scheduler_factory(spec, store_manager)
52+
inner_scheduler_factory(spec, store_manager, maybe_origin_event_tx)
5153
}
5254

5355
fn inner_scheduler_factory(
5456
spec: &SchedulerSpec,
5557
store_manager: &StoreManager,
58+
maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
5659
) -> Result<SchedulerFactoryResults, Error> {
5760
let scheduler: SchedulerFactoryResults = match spec {
5861
SchedulerSpec::simple(spec) => {
59-
simple_scheduler_factory(spec, store_manager, SystemTime::now)?
62+
simple_scheduler_factory(spec, store_manager, SystemTime::now, maybe_origin_event_tx)?
6063
}
6164
SchedulerSpec::grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None),
6265
SchedulerSpec::cache_lookup(spec) => {
6366
let ac_store = store_manager
6467
.get_store(&spec.ac_store)
6568
.err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?;
6669
let (action_scheduler, worker_scheduler) =
67-
inner_scheduler_factory(&spec.scheduler, store_manager)
70+
inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
6871
.err_tip(|| "In nested CacheLookupScheduler construction")?;
6972
let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
7073
ac_store,
@@ -74,7 +77,7 @@ fn inner_scheduler_factory(
7477
}
7578
SchedulerSpec::property_modifier(spec) => {
7679
let (action_scheduler, worker_scheduler) =
77-
inner_scheduler_factory(&spec.scheduler, store_manager)
80+
inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
7881
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
7982
let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
8083
spec,
@@ -91,6 +94,7 @@ fn simple_scheduler_factory(
9194
spec: &SimpleSpec,
9295
store_manager: &StoreManager,
9396
now_fn: fn() -> SystemTime,
97+
maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
9498
) -> Result<SchedulerFactoryResults, Error> {
9599
match spec
96100
.experimental_backend
@@ -104,8 +108,12 @@ fn simple_scheduler_factory(
104108
&task_change_notify.clone(),
105109
SystemTime::now,
106110
);
107-
let (action_scheduler, worker_scheduler) =
108-
SimpleScheduler::new(spec, awaited_action_db, task_change_notify);
111+
let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
112+
spec,
113+
awaited_action_db,
114+
task_change_notify,
115+
maybe_origin_event_tx.cloned(),
116+
);
109117
Ok((Some(action_scheduler), Some(worker_scheduler)))
110118
}
111119
ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
@@ -134,8 +142,12 @@ fn simple_scheduler_factory(
134142
Default::default,
135143
)
136144
.err_tip(|| "In state_manager_factory::redis_state_manager")?;
137-
let (action_scheduler, worker_scheduler) =
138-
SimpleScheduler::new(spec, awaited_action_db, task_change_notify);
145+
let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
146+
spec,
147+
awaited_action_db,
148+
task_change_notify,
149+
maybe_origin_event_tx.cloned(),
150+
);
139151
Ok((Some(action_scheduler), Some(worker_scheduler)))
140152
}
141153
}

nativelink-scheduler/src/grpc_scheduler.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProv
3737
use nativelink_util::operation_state_manager::{
3838
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
3939
};
40+
use nativelink_util::origin_event::OriginMetadata;
4041
use nativelink_util::retry::{Retrier, RetryResult};
4142
use nativelink_util::{background_spawn, tls_utils};
4243
use parking_lot::Mutex;
@@ -55,13 +56,15 @@ struct GrpcActionStateResult {
5556

5657
#[async_trait]
5758
impl ActionStateResult for GrpcActionStateResult {
58-
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
59+
async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
5960
let mut action_state = self.rx.borrow().clone();
6061
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
61-
Ok(action_state)
62+
// TODO(allada) We currently don't support OriginMetadata in this implementation, but
63+
// we should.
64+
Ok((action_state, None))
6265
}
6366

64-
async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
67+
async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
6568
self.rx.changed().await.map_err(|_| {
6669
make_err!(
6770
Code::Internal,
@@ -70,10 +73,12 @@ impl ActionStateResult for GrpcActionStateResult {
7073
})?;
7174
let mut action_state = self.rx.borrow().clone();
7275
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
73-
Ok(action_state)
76+
// TODO(allada) We currently don't support OriginMetadata in this implementation, but
77+
// we should.
78+
Ok((action_state, None))
7479
}
7580

76-
async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
81+
async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
7782
// TODO(allada) We should probably remove as_action_info()
7883
// or implement it properly.
7984
return Err(make_err!(

0 commit comments

Comments
 (0)