
Commit f677cb1

[rust/rqd] Refactor core reservation logic (#1769)
Use a single source of truth for core reservations. The previous logic spread reservation bookkeeping across multiple call sites, which created potential synchronization issues that could leak resources.
1 parent e2a057d commit f677cb1
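The refactor's central idea can be pictured as follows: instead of mirrored counters that every call site must update consistently, all reservation state lives behind one lock, keyed by the frame's resource id, so a release can never disagree with the reservation it undoes. A minimal sketch of that shape; `ReservationLedger` and its fields are hypothetical stand-ins, not the commit's actual types (the `uuid` crate, which the repo already uses, is assumed):

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use uuid::Uuid;

// Hypothetical ledger: the only place reservation state is recorded.
struct ReservationLedger {
    inner: Mutex<LedgerState>,
}

struct LedgerState {
    idle_threads: Vec<u32>,                // thread ids currently free
    reservations: HashMap<Uuid, Vec<u32>>, // resource_id -> reserved thread ids
}

impl ReservationLedger {
    // Reserve `count` threads atomically under the single lock.
    fn reserve(&self, resource_id: Uuid, count: usize) -> Result<Vec<u32>, String> {
        let mut state = self.inner.lock().unwrap();
        if state.idle_threads.len() < count {
            return Err(format!(
                "Tried to reserve {} out of {} threads available",
                count,
                state.idle_threads.len()
            ));
        }
        let reserved: Vec<u32> = state.idle_threads.drain(..count).collect();
        state.reservations.insert(resource_id, reserved.clone());
        Ok(reserved)
    }

    // Release looks the reservation up by resource_id, so a caller can
    // never free a different amount than was actually booked.
    fn release(&self, resource_id: &Uuid) -> Result<(), String> {
        let mut state = self.inner.lock().unwrap();
        let threads = state
            .reservations
            .remove(resource_id)
            .ok_or_else(|| format!("No reservation found for {}", resource_id))?;
        state.idle_threads.extend(threads);
        Ok(())
    }
}
```

Under this shape, every failure branch in the manager diff below reduces to a single `release_cores(&resource_id)` call.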

File tree: 8 files changed (+1039, -1539 lines)


rust/crates/opencue-proto/src/lib.rs

Lines changed: 0 additions & 111 deletions

@@ -2,7 +2,6 @@ use core::fmt;
 
 use host::Host;
 use job::{Frame, Job};
-use report::CoreDetail;
 use rqd::RunFrame;
 use uuid::Uuid;
 
@@ -127,113 +126,3 @@ impl fmt::Display for Host {
         write!(f, "{}/({})", self.name, self.id)
     }
 }
-
-impl CoreDetail {
-    /// Update CoreDetail by reserving a number of cores
-    ///
-    /// # Arguments
-    ///
-    /// * `core_count_with_multiplier` - Number of cores to reserve multiplied by core_multiplier
-    ///
-    /// # Returns
-    ///
-    /// * `Ok(())` if cores were reserved successfully
-    /// * `Err(String)` if trying to reserve more cores than are available
-    pub fn register_reservation(
-        &mut self,
-        core_count_with_multiplier: usize,
-    ) -> Result<(), String> {
-        if self.idle_cores - (core_count_with_multiplier as i32) < 0 {
-            Err(format!(
-                "Tried to reserve {} out of {} cores available",
-                core_count_with_multiplier, self.idle_cores,
-            ))
-        } else {
-            self.idle_cores -= core_count_with_multiplier as i32;
-            self.booked_cores += core_count_with_multiplier as i32;
-            Ok(())
-        }
-    }
-
-    /// Update CoreDetail by releasing a number of previously reserved cores
-    ///
-    /// # Arguments
-    ///
-    /// * `core_count_with_multiplier` - The number of cores to release multiplied by core_multiplier
-    ///
-    /// # Returns
-    ///
-    /// * `Ok(())` if cores were released successfully
-    /// * `Err(String)` if trying to release more cores than are currently reserved
-    pub fn register_release(&mut self, core_count_with_multiplier: u32) -> Result<(), String> {
-        if self.booked_cores < core_count_with_multiplier as i32 {
-            Err(format!(
-                "Tried to release {} out of {} cores reserved",
-                core_count_with_multiplier, self.booked_cores,
-            ))
-        } else {
-            self.idle_cores += core_count_with_multiplier as i32;
-            self.booked_cores -= core_count_with_multiplier as i32;
-            Ok(())
-        }
-    }
-
-    /// Update CoreDetail by locking a specified number of cores. If the amount requested is
-    /// not available, the maximum available will be reserved.
-    ///
-    /// # Arguments
-    ///
-    /// * `count_with_multiplier` - Number of cores to lock multiplied by core_multiplier
-    ///
-    /// # Returns
-    ///
-    /// * `u32` - The actual number of cores that were locked (may be less than requested if not enough are available)
-    pub fn lock_cores(&mut self, count_with_multiplier: u32) -> u32 {
-        let amount_not_locked = self.total_cores - self.locked_cores;
-        let amount_to_lock = std::cmp::min(amount_not_locked, count_with_multiplier as i32);
-
-        if amount_to_lock > 0 {
-            self.locked_cores += amount_to_lock;
-            self.idle_cores -= std::cmp::min(amount_to_lock, self.idle_cores)
-        }
-
-        amount_to_lock as u32
-    }
-
-    /// Update CoreDetail by locking all available cores
-    ///
-    /// This will set idle_cores to 0 and locked_cores to total_cores
-    pub fn lock_all_cores(&mut self) {
-        self.idle_cores = 0;
-        self.locked_cores = self.total_cores;
-    }
-
-    /// Update CoreDetail by unlocking a specified number of cores that were previously locked.
-    ///
-    /// # Arguments
-    ///
-    /// * `count_with_multiplier` - Number of cores to unlock multiplied by core_multiplier
-    ///
-    /// # Returns
-    ///
-    /// * `u32` - The actual number of cores that were unlocked (may be less than requested if fewer cores are locked)
-    pub fn unlock_cores(&mut self, count_with_multiplier: u32) -> u32 {
-        let amount_to_unlock = std::cmp::min(count_with_multiplier as i32, self.locked_cores);
-
-        if amount_to_unlock > 0 {
-            self.locked_cores -= amount_to_unlock;
-            self.idle_cores += amount_to_unlock;
-        }
-        amount_to_unlock as u32
-    }
-
-    /// Update CoreDetail by unlocking all locked cores
-    ///
-    /// This will unlock all locked cores and add them to idle_cores
-    pub fn unlock_all_cores(&mut self) {
-        if self.locked_cores > 0 {
-            self.idle_cores += self.locked_cores;
-            self.locked_cores = 0;
-        }
-    }
-}
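The removed helpers kept `idle_cores` and `booked_cores` as independently adjusted counters and trusted every caller to pass matching amounts to `register_reservation` and `register_release`. A toy illustration of the leak that permits, with a plain struct standing in for the real `CoreDetail` proto type:

```rust
// Plain counters standing in for the removed CoreDetail fields.
struct Counters {
    idle_cores: i32,
    booked_cores: i32,
}

fn main() {
    let mut c = Counters { idle_cores: 800, booked_cores: 0 };

    // One call site reserves with one computed amount...
    let reserved = 150;
    c.idle_cores -= reserved;
    c.booked_cores += reserved;

    // ...and an independent call site releases a differently computed amount.
    let released = 100;
    c.idle_cores += released;
    c.booked_cores -= released;

    // 50 cores now stay booked forever with no owner to free them: a leak.
    assert_eq!(c.booked_cores, 50);
}
```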

rust/crates/rqd/src/frame/manager.rs

Lines changed: 47 additions & 23 deletions

@@ -1,4 +1,5 @@
 use chrono::{DateTime, Local};
+use itertools::Either;
 use miette::{Diagnostic, Result, miette};
 use opencue_proto::{
     host::HardwareState,
@@ -64,17 +65,14 @@ impl FrameManager {
 
         // **Attention**: If an error happens between here and spawning a frame, the resources
         // reserved need to be released.
-        //
-        // Cuebot unfortunatelly uses a hardcoded frame environment variable to signal if
-        // a frame is hyperthreaded. Rqd should only reserve cores if a frame is hyperthreaded.
-        let hyperthreaded = run_frame
-            .environment
-            .get("CUE_THREADABLE")
-            .is_some_and(|v| v == "1");
+
         let num_cores = (run_frame.num_cores as u32).div_ceil(self.config.machine.core_multiplier);
+
+        // Reserving cores will always yield a list of reserved thread_ids. If hyperthreading is off,
+        // the list should be ignored
         let thread_ids = self
             .machine
-            .reserve_cores(num_cores as usize, run_frame.resource_id(), hyperthreaded)
+            .reserve_cores(Either::Left(num_cores as usize), run_frame.resource_id())
             .await
             .map_err(|err| {
                 FrameManagerError::Aborted(format!(
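`reserve_cores` now takes an `itertools::Either`, folding the old count-based call and the old `reserve_cores_by_id` affinity call (see the recovery hunk below) into one entry point. A toy, synchronous version of that dispatch under assumed semantics; the real method is async, lives on the machine handle, and also records the reservation under `resource_id`:

```rust
use itertools::Either;

// Left(count): reserve any `count` idle threads.
// Right(ids): claim exactly these threads (affinity booking).
fn reserve(request: Either<usize, Vec<u32>>, idle: &mut Vec<u32>) -> Result<Vec<u32>, String> {
    match request {
        Either::Left(count) => {
            if idle.len() < count {
                return Err(format!("need {count}, only {} idle", idle.len()));
            }
            Ok(idle.drain(..count).collect())
        }
        Either::Right(ids) => {
            if ids.iter().all(|id| idle.contains(id)) {
                idle.retain(|id| !ids.contains(id));
                Ok(ids)
            } else {
                Err("requested thread not idle".to_string())
            }
        }
    }
}

fn main() {
    let mut idle = vec![0u32, 1, 2, 3, 4, 5];
    let by_count = reserve(Either::Left(2), &mut idle).unwrap();
    let by_affinity = reserve(Either::Right(vec![4, 5]), &mut idle).unwrap();
    println!("{by_count:?} {by_affinity:?} idle={idle:?}");
}
```

Unifying both paths behind one function means there is a single place where reservations get recorded, which is exactly the single-source-of-truth goal the commit message states.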
@@ -90,7 +88,13 @@
                 let reserved_res = self.machine.reserve_gpus(run_frame.num_gpus as u32).await;
                 if reserved_res.is_err() {
                     // Release cores reserved on the last step
-                    self.machine.release_cores(num_cores, &thread_ids).await;
+                    if let Err(err) = self.machine.release_cores(&run_frame.resource_id()).await {
+                        warn!(
+                            "Failed to release cores reserved for {} during gpu reservation failure. {}",
+                            &run_frame.resource_id(),
+                            err
+                        )
+                    };
                 }
                 Some(reserved_res.map_err(|err| {
                     FrameManagerError::Aborted(format!(
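Note the pattern this hunk establishes: once cores are reserved, every later failure path must release them, and a failed release is only logged. A common alternative for making such cleanup unskippable is a drop guard that runs unless explicitly disarmed; this generic sketch is not what the commit does (it releases explicitly at each error site), just the idiom for comparison:

```rust
// Minimal drop guard: runs its cleanup closure unless disarmed.
struct Cleanup<F: FnMut()> {
    on_drop: F,
    armed: bool,
}

impl<F: FnMut()> Drop for Cleanup<F> {
    fn drop(&mut self) {
        if self.armed {
            (self.on_drop)();
        }
    }
}

fn main() {
    let mut guard = Cleanup {
        on_drop: || eprintln!("releasing cores reserved for frame"),
        armed: true,
    };

    // ... reserve gpus, spawn the frame ...
    let spawn_ok = false; // pretend the spawn failed

    if spawn_ok {
        guard.armed = false; // success: the running frame now owns the cores
    }
    // guard drops here; the cleanup runs exactly when something failed
}
```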
@@ -101,20 +105,35 @@
             }
         };
 
+        // Cuebot unfortunately uses a hardcoded frame environment variable to signal if
+        // a frame is hyperthreaded. Rqd should only reserve cores if a frame is hyperthreaded.
+        let hyperthreaded = run_frame
+            .environment
+            .get("CUE_THREADABLE")
+            .is_some_and(|v| v == "1");
+        // Ignore the list of allocated threads if hyperthreading is off
+        let thread_ids = hyperthreaded.then_some(thread_ids);
+
+        let resource_id = run_frame.resource_id();
         let running_frame = Arc::new(RunningFrame::init(
             run_frame,
             uid,
             self.config.runner.clone(),
-            thread_ids.clone(),
+            thread_ids,
             gpu_list,
             self.machine.get_host_name().await,
         ));
 
         if self.config.runner.run_on_docker {
             self.spawn_docker_frame(running_frame, false);
         } else if self.spawn_running_frame(running_frame, false).is_err() {
-            // Release cores reserved on the last step
-            self.machine.release_cores(num_cores, &thread_ids).await;
+            // Release cores reserved if spawning the frame failed
+            if let Err(err) = self.machine.release_cores(&resource_id).await {
+                warn!(
+                    "Failed to release cores reserved for {} during spawn failure. {}",
+                    &resource_id, err
+                );
+            }
         }
 
         Ok(())
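The frame now always reserves through the same path and only afterwards decides whether to keep the returned thread list as affinity information, using `bool::then_some`. For reference, its semantics:

```rust
fn main() {
    let thread_ids = vec![4u32, 5, 6, 7];

    // true.then_some(v) -> Some(v); false.then_some(v) -> None (v is still moved).
    let pinned: Option<Vec<u32>> = true.then_some(thread_ids.clone());
    let unpinned: Option<Vec<u32>> = false.then_some(thread_ids);

    assert_eq!(pinned, Some(vec![4, 5, 6, 7]));
    assert_eq!(unpinned, None);
}
```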
@@ -161,33 +180,38 @@
             Ok(running_frame) => {
                 // Update reservations. If a thread_ids list exists, the frame was booked using affinity
                 if let Err(err) = match &running_frame.thread_ids {
-                    Some(thread_ids) => self
-                        .machine
-                        .reserve_cores_by_id(thread_ids, running_frame.request.resource_id())
-                        .await
-                        .map(Some),
+                    Some(thread_ids) => {
+                        self.machine
+                            .reserve_cores(
+                                Either::Right(thread_ids.clone()),
+                                running_frame.request.resource_id(),
+                            )
+                            .await
+                    }
                     None => {
                         let num_cores = (running_frame.request.num_cores as u32)
                             .div_ceil(self.config.machine.core_multiplier);
                         self.machine
                             .reserve_cores(
-                                num_cores as usize,
+                                Either::Left(num_cores as usize),
                                 running_frame.request.resource_id(),
-                                false,
                             )
                             .await
                     }
                 } {
                     errors.push(err.to_string());
                 }
 
-                let num_cores = (running_frame.request.num_cores as u32)
-                    .div_ceil(self.config.machine.core_multiplier);
-                let thread_ids = &running_frame.thread_ids.clone();
+                let resource_id = running_frame.request.resource_id();
                 if self.config.runner.run_on_docker {
                     todo!("Recovering frames when running on docker is not yet supported")
                 } else if self.spawn_running_frame(running_frame, true).is_err() {
-                    self.machine.release_cores(num_cores, thread_ids).await;
+                    if let Err(err) = self.machine.release_cores(&resource_id).await {
+                        warn!(
+                            "Failed to release cores reserved for {} during recover spawn error. {}",
+                            &resource_id, err
+                        );
+                    }
                 }
             }
             Err(err) => {
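Logging and continuing on a failed release is reasonable under the keyed scheme, because releasing an id with no entry reports nothing to free instead of corrupting shared counters. A toy demonstration, with a plain map standing in for the hypothetical ledger sketched earlier:

```rust
use std::collections::HashMap;

fn main() {
    // resource_id -> reserved thread ids (a plain map standing in for the ledger)
    let mut reservations: HashMap<&str, Vec<u32>> = HashMap::new();
    reservations.insert("frame-1", vec![2, 3]);

    // The first release finds the entry and frees exactly what was booked...
    assert_eq!(reservations.remove("frame-1"), Some(vec![2, 3]));

    // ...a second release finds nothing: an error to log, never a double-free.
    assert_eq!(reservations.remove("frame-1"), None);
}
```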
