Commit 21bc549

[rust/rqd] Add OOM prevention logic with kill frame selection (#2064)
Implement a new OOM (Out-Of-Memory) prevention module that proactively kills frames when memory usage exceeds configured thresholds, preventing the system-wide OOM killer from terminating the RQD process. When memory usage exceeds `memory_oom_margin_percentage` (default 96%), the system calculates a target memory level (5% below the threshold) and selects frames to kill to reach that target safely.

Frame Selection Algorithm
-------------------------

Frames are sorted using a multi-criteria scoring system that balances three factors:

1. **Memory Impact (weight: 10x)**
   - Measures absolute memory savings: consumed_memory / total_memory_consumed
   - Prioritizes frames that will free the most system memory
   - Normalized to [0,1] range by dividing by the maximum memory impact

2. **Overboard Rate (weight: 7x)**
   - Measures relative excess: (consumed - limit) / limit
   - Targets frames most aggressively exceeding their soft limits
   - A frame using 10GB with a 1GB limit (900% over) has a higher overboard rate than one using 60GB with a 50GB limit (20% over)
   - Normalized to [0,1] range by dividing by the maximum overboard rate

3. **Duration Rate (weight: 12x - highest)**
   - Prefers killing more recent frames: (max_duration - frame_duration) / max_duration
   - Minimizes wasted compute by preserving long-running frames
   - Inverted scale: newer frames score higher
   - Normalized to [0,1] range

Each frame receives a composite score calculated as:

    score = (memory_impact × 10) + (overboard_rate × 7) + (duration_rate × 12)

All metrics are normalized before weighting to ensure each factor contributes meaningfully regardless of absolute values. This prevents a frame with very high memory usage from completely dominating the score, allowing the algorithm to balance all three criteria.

The algorithm sorts frames by descending score and kills them iteratively until enough memory is freed to reach the target level. This conservative approach avoids unnecessary termination while still effectively preventing OOM.

The weighting scheme reflects production priorities:

- Duration (12x): Preserve investment in long-running frames
- Memory Impact (10x): Maximize immediate memory relief
- Overboard Rate (7x): Discourage limit violations

Implementation includes comprehensive test coverage validating:

- Threshold triggering behavior
- Target memory calculation
- Single and multi-frame selection
- Each scoring criterion independently
- Normalized scoring with edge cases
- Stop-when-sufficient-memory-freed logic
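For readers skimming the diff, here is a minimal, self-contained sketch of the selection logic described above. The type and function names (`FrameCandidate`, `choose_kill_order`) and the byte-level bookkeeping are illustrative assumptions for this sketch only; the commit's real API lives in the new `oom` module and is invoked as `oom::choose_frames_to_kill` in machine.rs below.

```rust
// Sketch only: mirrors the scoring rules in the commit message, not the oom module's API.
struct FrameCandidate {
    name: &'static str,
    consumed: u64,      // bytes currently consumed by the frame (e.g. max RSS)
    soft_limit: u64,    // frame's soft memory limit in bytes
    duration_secs: u64, // how long the frame has been running
}

/// Sorts candidates by descending composite score and returns the names to kill,
/// stopping once killing them would free at least `to_free` bytes.
fn choose_kill_order(mut candidates: Vec<FrameCandidate>, to_free: u64) -> Vec<&'static str> {
    let total_consumed: u64 = candidates.iter().map(|c| c.consumed).sum::<u64>().max(1);
    let max_duration = candidates.iter().map(|c| c.duration_secs).max().unwrap_or(1).max(1);

    // Raw metrics: memory impact, overboard rate, duration rate (newer frames score higher)
    let raw: Vec<(f64, f64, f64)> = candidates
        .iter()
        .map(|c| {
            let memory_impact = c.consumed as f64 / total_consumed as f64;
            let overboard =
                c.consumed.saturating_sub(c.soft_limit) as f64 / c.soft_limit.max(1) as f64;
            let duration_rate = (max_duration - c.duration_secs) as f64 / max_duration as f64;
            (memory_impact, overboard, duration_rate)
        })
        .collect();

    // Normalize each metric to [0,1] by its maximum, then apply the 10x/7x/12x weights
    let max_mem = raw.iter().map(|r| r.0).fold(f64::EPSILON, f64::max);
    let max_over = raw.iter().map(|r| r.1).fold(f64::EPSILON, f64::max);
    let max_dur = raw.iter().map(|r| r.2).fold(f64::EPSILON, f64::max);

    let mut scored: Vec<(f64, u64, &'static str)> = candidates
        .drain(..)
        .zip(raw)
        .map(|(c, (m, o, d))| {
            let score = (m / max_mem) * 10.0 + (o / max_over) * 7.0 + (d / max_dur) * 12.0;
            (score, c.consumed, c.name)
        })
        .collect();
    scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap());

    // Kill highest-scoring frames until enough memory would be freed
    let mut freed = 0u64;
    let mut to_kill = Vec::new();
    for (_, consumed, name) in scored {
        if freed >= to_free {
            break;
        }
        freed += consumed;
        to_kill.push(name);
    }
    to_kill
}

fn main() {
    // The commit message's example: a 10GB frame with a 1GB limit vs a 60GB frame with a 50GB limit.
    let candidates = vec![
        FrameCandidate { name: "new_hog", consumed: 10 << 30, soft_limit: 1 << 30, duration_secs: 600 },
        FrameCandidate { name: "old_big", consumed: 60 << 30, soft_limit: 50 << 30, duration_secs: 72_000 },
    ];
    // Needing ~8 GiB back, the newer, more overboard frame is selected first and alone.
    assert_eq!(choose_kill_order(candidates, 8 << 30), vec!["new_hog"]);
}
```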
1 parent 0bf7e2c commit 21bc549

File tree

5 files changed: +829 -44 lines changed

rust/crates/rqd/src/config/mod.rs

Lines changed: 2 additions & 0 deletions

@@ -89,6 +89,7 @@ pub struct MachineConfig {
     #[serde(with = "humantime_serde")]
     pub nimby_start_retry_interval: Duration,
     pub nimby_display_xauthority_path: String,
+    pub memory_oom_margin_percentage: u32,
 }

 impl Default for MachineConfig {
@@ -111,6 +112,7 @@ impl Default for MachineConfig {
             nimby_display_file_path: None,
             nimby_start_retry_interval: Duration::from_secs(60 * 5), // 5 min
             nimby_display_xauthority_path: "/home/{username}/Xauthority".to_string(),
+            memory_oom_margin_percentage: 96,
         }
     }
 }
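The new margin works in whole percentages of host memory. Below is a small sketch of how the commit message's "target memory level (5% below the threshold)" can translate into bytes that need to be freed; the helper name and exact formula are assumptions for illustration, not the new `oom` module's actual code.

```rust
// Illustrative only: when usage exceeds the margin, aim 5 percentage points below it.
fn bytes_to_free(used_percentage: u32, margin_percentage: u32, total_mem_bytes: u64) -> u64 {
    if used_percentage <= margin_percentage {
        return 0; // below the margin, nothing to do
    }
    let target_percentage = margin_percentage.saturating_sub(5); // 5% below the threshold
    let excess_points = used_percentage - target_percentage;
    total_mem_bytes * excess_points as u64 / 100
}

fn main() {
    // A 64 GiB host at 98% usage with the default 96% margin:
    let total = 64u64 * 1024 * 1024 * 1024;
    let need = bytes_to_free(98, 96, total);
    // Must free ~7% of 64 GiB (about 4.5 GiB) to get back to the 91% target.
    println!("need to free {} bytes", need);
}
```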

rust/crates/rqd/src/frame/running_frame.rs

Lines changed: 68 additions & 1 deletion

@@ -24,6 +24,7 @@ use tokio::io::AsyncReadExt;
 use tokio::{io::AsyncBufReadExt, task::JoinHandle};
 use tracing::{error, info, trace, warn};

+use crate::system::OOM_REASON_MSG;
 use crate::{frame::frame_cmd::FrameCmdBuilder, system::manager::ProcessStats};

 use serde::{Deserialize, Serialize};
@@ -179,13 +180,69 @@ impl RunningFrame {
         }
     }

+    #[cfg(test)]
+    pub fn init_started_for_test(
+        request: RunFrame,
+        uid: u32,
+        config: RunnerConfig,
+        cpu_list: Option<Vec<u32>>,
+        gpu_list: Option<Vec<u32>>,
+        hostname: String,
+        duration: Duration,
+    ) -> Self {
+        let instance = Self::init(request, uid, config, cpu_list, gpu_list, hostname);
+
+        {
+            let mut state = instance
+                .state
+                .write()
+                .unwrap_or_else(|err| err.into_inner());
+
+            match &mut *state {
+                FrameState::Created(created_state) => {
+                    *state = FrameState::Running(RunningState {
+                        pid: 999, // Dummy pid
+                        start_time: SystemTime::now()
+                            .checked_sub(duration)
+                            .unwrap_or(SystemTime::now()),
+                        launch_thread_handle: created_state.launch_thread_handle.take(),
+                        kill_reason: None,
+                    });
+                }
+                FrameState::Running(running_state) => warn!(
+                    "Invalid State. Frame {} has already started {:?}",
+                    instance, running_state
+                ),
+                FrameState::Finished(_) => {
+                    warn!("Invalid States. Frame {} has already finished", instance)
+                }
+                FrameState::FailedBeforeStart => {
+                    warn!("Invalid States. Frame {} failed before starting", instance)
+                }
+            }
+        } // state is dropped here
+
+        instance
+    }
+
     pub fn update_frame_stats(&self, proc_stats: ProcessStats) {
         self.frame_stats
             .write()
             .unwrap_or_else(|poisoned| poisoned.into_inner())
             .update(proc_stats);
     }

+    pub fn get_duration(&self) -> Duration {
+        let state = self.state.read().unwrap_or_else(|err| err.into_inner());
+
+        match *state {
+            FrameState::Created(_) => Duration::ZERO,
+            FrameState::Running(ref r) => r.start_time.elapsed().unwrap_or(Duration::ZERO),
+            FrameState::Finished(ref r) => r.start_time.elapsed().unwrap_or(Duration::ZERO),
+            FrameState::FailedBeforeStart => Duration::ZERO,
+        }
+    }
+
     pub fn get_state_copy(&self) -> FrameState {
         let state = self.state.read().unwrap_or_else(|err| err.into_inner());

@@ -250,13 +307,23 @@ impl RunningFrame {
         match &mut *state {
            FrameState::Created(_) => Err(miette!("Invalid State. Frame {} hasn't started", self)),
            FrameState::Running(running_state) => {
+                // Replace exit_signal to memory signal if kill_reason matches the memory check message
+                let modified_exit_signal = match &running_state.kill_reason {
+                    Some(reason) if reason.contains(OOM_REASON_MSG) => {
+                        // 33 is the error signal hardcoded on Cuebot for memory issues
+                        // (See Dispatcher.java:EXIT_STATUS_MEMORY_FAILURE)
+                        Some(33)
+                    }
+                    _ => exit_signal,
+                };
+
                // Create a new FinishedState with the current running state values
                let finished_state = FinishedState {
                    pid: running_state.pid,
                    start_time: running_state.start_time,
                    end_time: SystemTime::now(),
                    exit_code,
-                   exit_signal,
+                   exit_signal: modified_exit_signal,
                    kill_reason: running_state.kill_reason.clone(),
                };


rust/crates/rqd/src/system/machine.rs

Lines changed: 104 additions & 43 deletions

@@ -1,6 +1,11 @@
 use std::sync::Arc;

-use crate::{config::CONFIG, frame::manager, report::report_client};
+use crate::{
+    config::CONFIG,
+    frame::manager,
+    report::report_client,
+    system::oom::{self, OOM_REASON_MSG},
+};
 use async_trait::async_trait;
 use bytesize::KIB;
 use itertools::Either;
@@ -61,7 +66,7 @@ pub struct MachineMonitor {
     pub system_manager: Mutex<SystemManagerType>,
     pub core_manager: Arc<RwLock<CoreStateManager>>,
     pub running_frames_cache: Arc<RunningFrameCache>,
-    last_host_state: Arc<Mutex<Option<RenderHost>>>,
+    last_host_state: Arc<RwLock<Option<RenderHost>>>,
     interrupt: Mutex<Option<broadcast::Sender<()>>>,
     reboot_when_idle: Mutex<bool>,
     #[cfg(feature = "nimby")]
@@ -136,7 +141,7 @@ impl MachineMonitor {
             report_client,
             system_manager: Mutex::new(system_manager),
             running_frames_cache: RunningFrameCache::init(),
-            last_host_state: Arc::new(Mutex::new(None)),
+            last_host_state: Arc::new(RwLock::new(None)),
             interrupt: Mutex::new(None),
             reboot_when_idle: Mutex::new(false),
             #[cfg(feature = "nimby")]
@@ -162,7 +167,7 @@ impl MachineMonitor {
         };

         self.last_host_state
-            .lock()
+            .write()
             .await
             .replace(host_state.clone());

@@ -203,40 +208,7 @@ impl MachineMonitor {

             #[cfg(feature = "nimby")]
             if let Some(nimby) = &*self.nimby {
-                match (nimby.is_user_active(), last_lock_state) {
-                    // Became locked
-                    (true, LockState::Open) => {
-                        last_lock_state = LockState::NimbyLocked;
-
-                        // Update registered state
-                        let mut nimby_state = self.nimby_state.write().await;
-                        *nimby_state = last_lock_state;
-                        drop(nimby_state);
-
-                        info!("Host became nimby locked");
-                        self.lock_all_cores().await;
-                        let count = manager::instance().await?.kill_all_running_frames("Host has been Nimby-Locked").await?;
-                        info!("{count} frames killed after the machine became locked")
-                    }
-                    // Continues locked
-                    (true, LockState::NimbyLocked) => {}
-                    // Continues open
-                    (false, LockState::Open) => {}
-                    // Became unlocked
-                    (false, LockState::NimbyLocked) => {
-                        last_lock_state = LockState::Open;
-
-                        // Update registered state
-                        let mut nimby_state = self.nimby_state.write().await;
-                        *nimby_state = last_lock_state;
-                        drop(nimby_state);
-
-                        info!("Host became nimby unlocked");
-                        self.unlock_all_cores().await;
-                    }
-                    // NoOp
-                    _ => ()
-                }
+                last_lock_state = self.handle_nimby_state_change(nimby, last_lock_state).await?;
             }
         }

@@ -245,6 +217,57 @@ impl MachineMonitor {
         Ok(())
     }

+    /// Handles NIMBY state changes and performs necessary actions when the host
+    /// becomes locked or unlocked based on user activity.
+    ///
+    /// Returns the new lock state after handling any transitions.
+    #[cfg(feature = "nimby")]
+    async fn handle_nimby_state_change(
+        &self,
+        nimby: &Nimby,
+        current_state: LockState,
+    ) -> Result<LockState> {
+        match (nimby.is_user_active(), current_state) {
+            // Became locked
+            (true, LockState::Open) => {
+                let new_state = LockState::NimbyLocked;
+
+                // Update registered state
+                let mut nimby_state = self.nimby_state.write().await;
+                *nimby_state = new_state;
+                drop(nimby_state);
+
+                info!("Host became nimby locked");
+                self.lock_all_cores().await;
+                let count = manager::instance()
+                    .await?
+                    .kill_all_running_frames("Host has been Nimby-Locked")
+                    .await?;
+                info!("{count} frames killed after the machine became locked");
+                Ok(new_state)
+            }
+            // Continues locked
+            (true, LockState::NimbyLocked) => Ok(current_state),
+            // Continues open
+            (false, LockState::Open) => Ok(current_state),
+            // Became unlocked
+            (false, LockState::NimbyLocked) => {
+                let new_state = LockState::Open;
+
+                // Update registered state
+                let mut nimby_state = self.nimby_state.write().await;
+                *nimby_state = new_state;
+                drop(nimby_state);
+
+                info!("Host became nimby unlocked");
+                self.unlock_all_cores().await;
+                Ok(new_state)
+            }
+            // NoOp
+            _ => Ok(current_state),
+        }
+    }
+
     #[cfg(feature = "nimby")]
     async fn start_nimby(&self, term_receiver: Receiver<()>) {
         // Start nimby monitor
@@ -306,6 +329,7 @@ impl MachineMonitor {
     async fn monitor_running_frames(&self) -> Result<()> {
         let mut finished_frames: Vec<Arc<RunningFrame>> = Vec::new();
         let mut running_frames: Vec<(Arc<RunningFrame>, RunningState)> = Vec::new();
+        let mut memory_aggressors: Vec<(Arc<RunningFrame>, u64)> = Vec::new();

         // Only keep running frames on the cache and store a copy of their state
         // to avoid having to deal with the state lock
@@ -340,6 +364,13 @@ impl MachineMonitor {
             };

             if let Some(proc_stats) = proc_stats_opt {
+                // Mark memory aggressor frames
+                if running_frame.request.soft_memory_limit > 0
+                    && proc_stats.rss as i64 > running_frame.request.soft_memory_limit
+                {
+                    memory_aggressors.push((Arc::clone(running_frame), proc_stats.max_rss));
+                }
+
                 // Update stats for running frames
                 running_frame.update_frame_stats(proc_stats);
             } else if running_frame.is_marked_for_cache_removal() {
@@ -361,6 +392,26 @@ impl MachineMonitor {
         }

         self.handle_finished_frames(finished_frames).await;
+        if let Some((memory_usage, total_memory)) = self.memory_usage().await {
+            let frames_to_kill =
+                oom::choose_frames_to_kill(memory_usage, total_memory, memory_aggressors);
+
+            // Attempt to kill selected frames.
+            // Logic will ignore kill errors and try again on the next iteration
+            for frame in frames_to_kill {
+                if let Ok(manager) = manager::instance().await {
+                    let kill_result = manager
+                        .kill_running_frame(&frame.frame_id, OOM_REASON_MSG.to_string())
+                        .await;
+                    if let Err(err) = kill_result {
+                        warn!(
+                            "Failed to kill frame {} when under OOM pressure. {}",
+                            frame, err
+                        )
+                    }
+                }
+            }
+        }

         // Sanitize dangling reservations
         // This mechanism is redundant as handle_finished_frames releases resources reserved to
@@ -387,7 +438,7 @@ impl MachineMonitor {

         // Avoid holding a lock while reporting back to cuebot
         let host_state_opt = {
-            let host_state_lock = self.last_host_state.lock().await;
+            let host_state_lock = self.last_host_state.read().await;
             host_state_lock.clone()
         };

@@ -487,6 +538,7 @@ impl MachineMonitor {
 #[async_trait]
 pub trait Machine {
     async fn hardware_state(&self) -> Option<HardwareState>;
+    async fn memory_usage(&self) -> Option<(u32, u64)>;
     async fn nimby_locked(&self) -> bool;

     /// Reserve CPU cores for a resource
@@ -591,15 +643,23 @@ pub trait Machine {
 impl Machine for MachineMonitor {
     async fn hardware_state(&self) -> Option<HardwareState> {
         self.last_host_state
-            .lock()
+            .read()
             .await
             .as_ref()
             .map(|hs| hs.state())
     }

+    async fn memory_usage(&self) -> Option<(u32, u64)> {
+        self.last_host_state.read().await.as_ref().map(|hs| {
+            let memory_percentage =
+                (((hs.total_mem - hs.free_mem) as f64 / hs.total_mem as f64) * 100.0) as u32;
+            (memory_percentage, hs.total_mem as u64)
+        })
+    }
+
     async fn nimby_locked(&self) -> bool {
         self.last_host_state
-            .lock()
+            .read()
             .await
             .as_ref()
             .map(|hs| hs.nimby_locked)
@@ -634,7 +694,7 @@ impl Machine for MachineMonitor {
     }

     async fn get_host_name(&self) -> String {
-        let lock = self.last_host_state.lock().await;
+        let lock = self.last_host_state.read().await;

         lock.as_ref()
             .map(|h| h.name.clone())
@@ -725,10 +785,11 @@ impl Machine for MachineMonitor {
         };

         // Store the last host_state on self
-        let mut self_host_state_lock = self.last_host_state.lock().await;
+        let mut self_host_state_lock = self.last_host_state.write().await;
         self_host_state_lock.replace(render_host.clone());
         drop(self_host_state_lock);

+        // Refresh list of running frames
         self.monitor_running_frames().await?;

         Ok(HostReport {
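As a quick sanity check on the new `memory_usage` implementation above, the same percentage arithmetic as a standalone sketch; the sample values are illustrative and unit-agnostic, since only the used/total ratio matters.

```rust
fn main() {
    // Mirrors MachineMonitor::memory_usage above: used % = (total - free) / total * 100, truncated.
    let total_mem: u64 = 64_000_000;
    let free_mem: u64 = 2_000_000;
    let memory_percentage = (((total_mem - free_mem) as f64 / total_mem as f64) * 100.0) as u32;
    assert_eq!(memory_percentage, 96); // 62/64 = 96.875%, truncated to 96
    // This host sits right at the default memory_oom_margin_percentage of 96; anything above it
    // would have choose_frames_to_kill start picking kill candidates.
}
```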

rust/crates/rqd/src/system/mod.rs

Lines changed: 3 additions & 0 deletions

@@ -4,6 +4,7 @@ pub mod linux;
 pub mod machine;
 #[cfg(feature = "nimby")]
 pub mod nimby;
+mod oom;
 mod reservation;

 #[cfg(target_os = "macos")]
@@ -14,3 +15,5 @@ pub type ResourceId = Uuid;
 pub type CoreId = u32;
 pub type PhysId = u32;
 pub type ThreadId = u32;
+
+pub use oom::OOM_REASON_MSG;
