Skip to content

Commit e2a057d

Browse files
authored
[rust/rqd] Add more protections for accountability issues (#1768)
Fix core accountability issues caused by frames that failed before they had a chance to start and therefore never had their cores released.
1 parent 21fa269 commit e2a057d

File tree

3 files changed

+84
-80
lines changed

3 files changed

+84
-80
lines changed

rust/crates/opencue-proto/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,10 @@ impl CoreDetail {
139139
///
140140
/// * `Ok(())` if cores were reserved successfully
141141
/// * `Err(String)` if trying to reserve more cores than are available
142-
pub fn reserve(&mut self, core_count_with_multiplier: usize) -> Result<(), String> {
142+
pub fn register_reservation(
143+
&mut self,
144+
core_count_with_multiplier: usize,
145+
) -> Result<(), String> {
143146
if self.idle_cores - (core_count_with_multiplier as i32) < 0 {
144147
Err(format!(
145148
"Tried to reserve {} out of {} cores available",
@@ -162,7 +165,7 @@ impl CoreDetail {
162165
///
163166
/// * `Ok(())` if cores were released successfully
164167
/// * `Err(String)` if trying to release more cores than are currently reserved
165-
pub fn release(&mut self, core_count_with_multiplier: u32) -> Result<(), String> {
168+
pub fn register_release(&mut self, core_count_with_multiplier: u32) -> Result<(), String> {
166169
if self.booked_cores < core_count_with_multiplier as i32 {
167170
Err(format!(
168171
"Tried to release {} out of {} cores reserved",

rust/crates/rqd/src/frame/manager.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ impl FrameManager {
7171
.environment
7272
.get("CUE_THREADABLE")
7373
.is_some_and(|v| v == "1");
74-
let cpu_request = run_frame.num_cores as u32 / self.config.machine.core_multiplier;
74+
let num_cores = (run_frame.num_cores as u32).div_ceil(self.config.machine.core_multiplier);
7575
let thread_ids = self
7676
.machine
77-
.reserve_cores(cpu_request as usize, run_frame.resource_id(), hyperthreaded)
77+
.reserve_cores(num_cores as usize, run_frame.resource_id(), hyperthreaded)
7878
.await
7979
.map_err(|err| {
8080
FrameManagerError::Aborted(format!(
@@ -90,11 +90,7 @@ impl FrameManager {
9090
let reserved_res = self.machine.reserve_gpus(run_frame.num_gpus as u32).await;
9191
if reserved_res.is_err() {
9292
// Release cores reserved on the last step
93-
if let Some(thread_ids) = &thread_ids {
94-
self.machine.release_threads(thread_ids).await;
95-
} else {
96-
self.machine.release_cores(cpu_request).await;
97-
}
93+
self.machine.release_cores(num_cores, &thread_ids).await;
9894
}
9995
Some(reserved_res.map_err(|err| {
10096
FrameManagerError::Aborted(format!(
@@ -109,16 +105,18 @@ impl FrameManager {
109105
run_frame,
110106
uid,
111107
self.config.runner.clone(),
112-
thread_ids,
108+
thread_ids.clone(),
113109
gpu_list,
114110
self.machine.get_host_name().await,
115111
));
116112

117113
if self.config.runner.run_on_docker {
118114
self.spawn_docker_frame(running_frame, false);
119-
} else {
120-
self.spawn_running_frame(running_frame, false);
115+
} else if self.spawn_running_frame(running_frame, false).is_err() {
116+
// Release cores reserved on the last step
117+
self.machine.release_cores(num_cores, &thread_ids).await;
121118
}
119+
122120
Ok(())
123121
}
124122

@@ -169,8 +167,8 @@ impl FrameManager {
169167
.await
170168
.map(Some),
171169
None => {
172-
let num_cores = running_frame.request.num_cores as u32
173-
/ self.config.machine.core_multiplier;
170+
let num_cores = (running_frame.request.num_cores as u32)
171+
.div_ceil(self.config.machine.core_multiplier);
174172
self.machine
175173
.reserve_cores(
176174
num_cores as usize,
@@ -182,10 +180,14 @@ impl FrameManager {
182180
} {
183181
errors.push(err.to_string());
184182
}
183+
184+
let num_cores = (running_frame.request.num_cores as u32)
185+
.div_ceil(self.config.machine.core_multiplier);
186+
let thread_ids = &running_frame.thread_ids.clone();
185187
if self.config.runner.run_on_docker {
186188
todo!("Recovering frames when running on docker is not yet supported")
187-
} else {
188-
self.spawn_running_frame(running_frame, true)
189+
} else if self.spawn_running_frame(running_frame, true).is_err() {
190+
self.machine.release_cores(num_cores, thread_ids).await;
189191
}
190192
}
191193
Err(err) => {
@@ -204,16 +206,24 @@ impl FrameManager {
204206
}
205207
}
206208

207-
fn spawn_running_frame(&self, running_frame: Arc<RunningFrame>, recover_mode: bool) {
209+
fn spawn_running_frame(
210+
&self,
211+
running_frame: Arc<RunningFrame>,
212+
recover_mode: bool,
213+
) -> Result<()> {
208214
self.machine.add_running_frame(Arc::clone(&running_frame));
209215
let running_frame_ref: Arc<RunningFrame> = Arc::clone(&running_frame);
210216
let thread_handle = tokio::spawn(async move { running_frame.run(recover_mode).await });
211-
if let Err(err) = running_frame_ref.update_launch_thread_handle(thread_handle) {
212-
warn!(
213-
"Failed to update thread handle for frame {}. {}",
214-
running_frame_ref, err
215-
);
216-
}
217+
running_frame_ref
218+
.update_launch_thread_handle(thread_handle)
219+
.map_err(|err| {
220+
warn!(
221+
"Failed to update thread handle for frame {}. {}",
222+
running_frame_ref, err
223+
);
224+
err
225+
})?;
226+
Ok(())
217227
}
218228

219229
#[cfg(feature = "containerized_frames")]

rust/crates/rqd/src/system/machine.rs

Lines changed: 48 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -370,15 +370,13 @@ impl MachineMonitor {
370370
info!("Sending frame complete report: {}", frame);
371371

372372
// Release resources
373-
if let Some(threads) = &frame.thread_ids {
374-
self.release_threads(threads).await;
375-
} else {
376-
// Ensure the division rounds up if num_cores is not a multiple of
377-
// core_multiplier
378-
let num_cores_to_release = (frame.request.num_cores as u32)
379-
.div_ceil(self.maching_config.core_multiplier);
380-
self.release_cores(num_cores_to_release).await;
381-
}
373+
// Ensure the division rounds up if num_cores is not a multiple of
374+
// core_multiplier
375+
let num_cores_to_release =
376+
(frame.request.num_cores as u32).div_ceil(self.maching_config.core_multiplier);
377+
378+
self.release_cores(num_cores_to_release, &frame.thread_ids)
379+
.await;
382380

383381
// Send complete report
384382
if let Err(err) = self
@@ -471,19 +469,14 @@ pub trait Machine {
471469
/// Vector of successfully reserved CPU core IDs
472470
async fn reserve_cores_by_id(&self, thread_ids: &[u32], resource_id: Uuid) -> Result<Vec<u32>>;
473471

474-
/// Release specific threads
475-
///
476-
/// # Arguments
477-
///
478-
/// * `threads` - Vector of thread IDs to release
479-
async fn release_threads(&self, thread_ids: &[u32]);
480-
481472
/// Releases a specified number of CPU cores
482473
///
483474
/// # Arguments
484475
///
485476
/// * `num_cores` - The number of cores to release
486-
async fn release_cores(&self, num_cores: u32);
477+
/// * `thread_ids` - List of threads to be released, None means the reservation didn't
478+
/// allocate specific threads
479+
async fn release_cores(&self, num_cores: u32, thread_ids: &Option<Vec<u32>>);
487480

488481
/// Reserve GPU units
489482
///
@@ -585,11 +578,15 @@ impl Machine for MachineMonitor {
585578
// Record reservation to be reported to cuebot
586579
if cores_result.is_ok() {
587580
let mut core_state = self.core_state.lock().await;
588-
debug!("Before: {:?}", *core_state);
589-
core_state
590-
.reserve(num_cores * self.maching_config.core_multiplier as usize)
591-
.map_err(|err| miette!(err))?;
592-
debug!("After: {:?}", *core_state);
581+
debug!("Reservation: Before: {:?}", *core_state);
582+
if let Err(err) = core_state
583+
.register_reservation(num_cores * self.maching_config.core_multiplier as usize)
584+
.map_err(|err| miette!(err))
585+
{
586+
error!("Accountability Error: {err}");
587+
Err(err)?;
588+
}
589+
debug!("Reservation: After: {:?}", *core_state);
593590
}
594591
cores_result
595592
}
@@ -605,52 +602,46 @@ impl Machine for MachineMonitor {
605602

606603
// Record reservation to be reported to cuebot
607604
let mut core_state = self.core_state.lock().await;
608-
core_state
609-
.reserve(thread_ids.len() * self.maching_config.core_multiplier as usize)
610-
.map_err(|err| miette!(err))?;
605+
debug!("Reservation: Before: {:?}", *core_state);
606+
if let Err(err) = core_state
607+
.register_reservation(thread_ids.len() * self.maching_config.core_multiplier as usize)
608+
.map_err(|err| miette!(err))
609+
{
610+
error!("Accountability Error: {err}");
611+
Err(err)?;
612+
}
613+
debug!("Reservation: After: {:?}", *core_state);
611614

612615
Ok(thread_ids)
613616
}
614617

615-
async fn release_threads(&self, thread_ids: &[u32]) {
616-
let mut released_cores = HashSet::new();
617-
{
618-
let mut system = self.system_manager.lock().await;
619-
for thread_id in thread_ids {
620-
match system.release_core_by_thread(thread_id) {
621-
Ok((phys_id, core_id)) => {
622-
released_cores.insert((phys_id, core_id));
623-
}
624-
Err(err) => match err {
625-
ReservationError::ReservationNotFound(_) => {
626-
// NoOp. When releasing a thread, the entire core might be released,
627-
// threfore misses are expected
628-
}
629-
_ => {
630-
error!("Failed to release proc {thread_id}. Unexpected error")
618+
async fn release_cores(&self, num_cores: u32, thread_ids: &Option<Vec<u32>>) {
619+
if let Some(thread_ids) = thread_ids {
620+
let mut released_cores = HashSet::new();
621+
{
622+
let mut system = self.system_manager.lock().await;
623+
for thread_id in thread_ids {
624+
match system.release_core_by_thread(thread_id) {
625+
Ok((phys_id, core_id)) => {
626+
released_cores.insert((phys_id, core_id));
631627
}
632-
},
628+
Err(err) => match err {
629+
ReservationError::ReservationNotFound(_) => {
630+
// NoOp. When releasing a thread, the entire core might be released,
631+
// therefore misses are expected
632+
}
633+
_ => {
634+
error!("Failed to release proc {thread_id}. Unexpected error")
635+
}
636+
},
637+
}
633638
}
634639
}
635640
}
636641
// Record the release to be reported to cuebot
637642
let mut core_state = self.core_state.lock().await;
638643
if let Err(err) = core_state
639-
.release(released_cores.len() as u32 * self.maching_config.core_multiplier)
640-
.map_err(|err| miette!(err))
641-
{
642-
error!(
643-
"Accountability error. Failed to release the requested number of cores. {}",
644-
err
645-
)
646-
};
647-
}
648-
649-
async fn release_cores(&self, num_cores: u32) {
650-
// Record reservation to be reported to cuebot
651-
let mut core_state = self.core_state.lock().await;
652-
if let Err(err) = core_state
653-
.release(num_cores * self.maching_config.core_multiplier)
644+
.register_release(num_cores * self.maching_config.core_multiplier)
654645
.map_err(|err| miette!(err))
655646
{
656647
error!(

0 commit comments

Comments
 (0)