Skip to content

Commit 10f1ffc

Browse files
Support CPU count
Currently one queue item is one CPU count. This change ensures that the cpu_count in the properties is used.
1 parent 140b0f4 commit 10f1ffc

2 files changed

Lines changed: 61 additions & 40 deletions

File tree

src/action_info.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tokio::sync::mpsc::Receiver;
2222
pub(crate) type PropertyName = String;
2323
pub(crate) type PropertyValue = String;
2424
pub(crate) type PropertySet = BTreeMap<PropertyName, PropertyValue>;
25-
pub(crate) type OperationCount = usize;
25+
pub(crate) type OperationCount = u64;
2626
type OperationId = String;
2727

2828
/// The name of the field in the Redis hash that stores the data.
@@ -144,7 +144,8 @@ fn monitor_changes(
144144
rx
145145
}
146146

147-
type QueuedOperations = HashSet<OperationId>;
147+
type CpuCount = u64; // This is scaled by 1000.
148+
type QueuedOperations = HashMap<OperationId, CpuCount>;
148149
type ActiveOperations = HashMap<PropertySet, QueuedOperations>;
149150

150151
#[derive(Deserialize)]
@@ -193,6 +194,7 @@ struct Operation {
193194
operation_id: OperationId,
194195
state: OperationState,
195196
properties: PropertySet,
197+
cpu_count: CpuCount,
196198
}
197199

198200
impl RedisOperationId {
@@ -254,6 +256,16 @@ impl FromValue for RedisCursorData {
254256
}
255257
}
256258

259+
fn get_cpu_count(properties: &HashMap<PropertyName, PropertyValue>) -> CpuCount {
260+
properties
261+
.get("cpu_count")
262+
.and_then(|count| {
263+
// Round up to the nearest 1000.
264+
count.parse::<u64>().ok().map(|v| v.div_ceil(1000))
265+
})
266+
.unwrap_or(1)
267+
}
268+
257269
async fn list_operations(
258270
redis_client: &Pool,
259271
group_by: &HashSet<PropertyName>,
@@ -306,11 +318,12 @@ async fn list_operations(
306318
continue;
307319
}
308320
};
321+
let cpu_count = get_cpu_count(&redis_operation.action_info.platform_properties);
309322
let properties =
310323
filter_properties(group_by, redis_operation.action_info.platform_properties);
311324

312325
let queued_operations = active_operations.entry(properties).or_default();
313-
queued_operations.insert(redis_operation.operation_id.into_string());
326+
queued_operations.insert(redis_operation.operation_id.into_string(), cpu_count);
314327
}
315328
}
316329
if data.cursor == 0 {
@@ -359,15 +372,21 @@ async fn get_operation(
359372
.await
360373
.ok()??;
361374
let redis_operation: RedisOperation = serde_json::from_slice(&data).ok()?;
375+
let cpu_count = get_cpu_count(&redis_operation.action_info.platform_properties);
362376
let properties = filter_properties(group_by, redis_operation.action_info.platform_properties);
363377
let state = redis_operation.state.stage.into();
364378
Some(Operation {
365379
operation_id: redis_operation.operation_id.into_string(),
366380
state,
367381
properties,
382+
cpu_count,
368383
})
369384
}
370385

386+
fn count_queue(operations: &QueuedOperations) -> u64 {
387+
operations.values().sum()
388+
}
389+
371390
fn operation_manager(
372391
redis_client: Pool,
373392
mut operation_channel: Receiver<OperationId>,
@@ -388,8 +407,9 @@ fn operation_manager(
388407
// If there were no notifications, then just re-populate.
389408
let new_active_operations = list_operations(&redis_client, &group_by, &index_name).await;
390409
for (properties, queued) in &new_active_operations {
391-
tracing::info!(queue=queued.len(), properties=?properties, "Refreshed queue");
392-
if active_operations.get(properties).is_none_or(|previously_queued: &QueuedOperations| previously_queued.len() != queued.len()) && tx.send((properties.clone(), queued.len())).await.is_err() {
410+
let queue_length = count_queue(queued);
411+
tracing::info!(queue=queue_length, properties=?properties, "Refreshed queue");
412+
if active_operations.get(properties).is_none_or(|previously_queued: &QueuedOperations| count_queue(previously_queued) != queue_length) && tx.send((properties.clone(), queue_length)).await.is_err() {
393413
return;
394414
}
395415
}
@@ -412,8 +432,9 @@ fn operation_manager(
412432
// Re-populate with all operations.
413433
let new_active_operations = list_operations(&redis_client, &group_by, &index_name).await;
414434
for (properties, queued) in &new_active_operations {
415-
tracing::info!(queue=queued.len(), properties=?properties, "Refreshed queue");
416-
if active_operations.get(properties).is_none_or(|previously_queued: &QueuedOperations| previously_queued.len() != queued.len()) && tx.send((properties.clone(), queued.len())).await.is_err() {
435+
let queue_length = count_queue(queued);
436+
tracing::info!(queue=queue_length, properties=?properties, "Refreshed queue");
437+
if active_operations.get(properties).is_none_or(|previously_queued: &QueuedOperations| count_queue(previously_queued) != queue_length) && tx.send((properties.clone(), queue_length)).await.is_err() {
417438
return;
418439
}
419440
}
@@ -434,19 +455,19 @@ fn operation_manager(
434455
};
435456
let mut entry = match active_operations.entry(operation.properties) {
436457
std::collections::hash_map::Entry::Occupied(entry) => entry,
437-
std::collections::hash_map::Entry::Vacant(entry) => entry.insert_entry(HashSet::new()),
458+
std::collections::hash_map::Entry::Vacant(entry) => entry.insert_entry(QueuedOperations::new()),
438459
};
439460
let queued_operations = entry.get_mut();
440-
let original_size = queued_operations.len();
461+
let original_size = count_queue(queued_operations);
441462
match operation.state {
442463
OperationState::Queued => {
443-
queued_operations.insert(operation.operation_id);
464+
queued_operations.insert(operation.operation_id, operation.cpu_count);
444465
}
445466
_ => {
446467
queued_operations.remove(&operation.operation_id);
447468
}
448469
}
449-
let new_size = queued_operations.len();
470+
let new_size = count_queue(queued_operations);
450471
if new_size != original_size && tx.send((entry.key().clone(), new_size)).await.is_err() {
451472
return;
452473
}

src/main.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::HashMap;
22
use std::env;
33
use std::fs::File;
4-
use std::num::NonZeroUsize;
4+
use std::num::NonZeroU64;
55
use std::path::Path;
66
use std::pin::Pin;
77
use std::time::{Duration, Instant};
@@ -29,16 +29,16 @@ struct Config {
2929
namespace: Option<String>,
3030
/// The maximum number of instances per container image, this does not
3131
/// include the base worker.
32-
max_pods: NonZeroUsize,
32+
max_pods: NonZeroU64,
3333
/// The number of tasks in the queue per allocated CPU.
34-
queue_per_cpu: NonZeroUsize,
34+
queue_per_cpu: NonZeroU64,
3535
/// The number of CPUs to allocate on the base worker, this is a worker
3636
/// that remains for two days after a property set is used.
37-
base_worker_cpu: usize,
37+
base_worker_cpu: action_info::OperationCount,
3838
/// The number of GB of RAM to allocate per CPU allocated to a worker.
39-
memory_to_cpu: NonZeroUsize,
39+
memory_to_cpu: NonZeroU64,
4040
/// The number of CPUs to allocate to a standard worker.
41-
worker_cpu: NonZeroUsize,
41+
worker_cpu: NonZeroU64,
4242
}
4343

4444
const DOCKER_IMAGE_PREFIX: &str = "docker://";
@@ -60,7 +60,7 @@ struct WorkerInfo {
6060

6161
#[derive(Debug)]
6262
struct PropertyWorkers {
63-
queue_size: usize,
63+
queue_size: action_info::OperationCount,
6464
workers: HashMap<WorkerName, WorkerInfo>,
6565
base_worker: Option<WorkerName>,
6666
last_queue_update: Instant,
@@ -71,7 +71,7 @@ impl PropertyWorkers {
7171
Self::new_with_workers(0, HashMap::new())
7272
}
7373

74-
fn new_with_workers(queue_size: usize, workers: HashMap<WorkerName, WorkerInfo>) -> Self {
74+
fn new_with_workers(queue_size: action_info::OperationCount, workers: HashMap<WorkerName, WorkerInfo>) -> Self {
7575
Self {
7676
queue_size,
7777
workers,
@@ -104,7 +104,7 @@ impl PropertyWorkers {
104104
.map(|(name, info)| (name, info.start_time))
105105
}
106106

107-
fn update_queue(&mut self, queue_size: usize) {
107+
fn update_queue(&mut self, queue_size: action_info::OperationCount) {
108108
if self.queue_size != 0 || queue_size != 0 {
109109
self.last_queue_update = Instant::now();
110110
}
@@ -119,8 +119,8 @@ impl PropertyWorkers {
119119
self.base_worker.is_none() && self.base_worker_required()
120120
}
121121

122-
fn workers_count(&self) -> usize {
123-
self.workers.len()
122+
fn workers_count(&self) -> action_info::OperationCount {
123+
self.workers.len() as u64
124124
}
125125

126126
fn set_base_worker(&mut self, name: WorkerName) -> bool {
@@ -150,19 +150,19 @@ impl PropertyWorkers {
150150
}
151151
}
152152

153-
fn scale_down_to(&mut self, workers: usize) -> Vec<(WorkerName, WorkerInfo)> {
154-
if self.workers.len() <= workers {
153+
fn scale_down_to(&mut self, workers: action_info::OperationCount) -> Vec<(WorkerName, WorkerInfo)> {
154+
if self.workers_count() <= workers {
155155
return Vec::new();
156156
}
157-
let to_remove = self.workers.len() - workers;
157+
let to_remove = self.workers_count() - workers;
158158
let mut all_workers: Vec<_> = self.workers.iter().collect();
159159
all_workers.sort_by(|a, b| b.1.start_time.cmp(&a.1.start_time));
160160
let keep_names: Vec<_> = all_workers
161161
.iter()
162-
.skip(to_remove)
162+
.skip(to_remove as usize)
163163
.filter_map(|(name, info)| info.death_time.is_some().then_some((*name).clone()))
164164
.collect();
165-
all_workers.truncate(to_remove);
165+
all_workers.truncate(to_remove as usize);
166166
let now: Instant = Instant::now();
167167
let (to_add_death_time, workers_to_remove): (Vec<_>, Vec<_>) = all_workers
168168
.into_iter()
@@ -194,22 +194,22 @@ impl PropertyWorkers {
194194
removed_workers
195195
}
196196

197-
fn get_queue_size(&self) -> usize {
197+
fn get_queue_size(&self) -> action_info::OperationCount {
198198
self.queue_size
199199
}
200200
}
201201

202202
type WorkerMap = HashMap<action_info::PropertySet, PropertyWorkers>;
203203

204204
struct ScaleInfo {
205-
max_pods: usize,
206-
queue_per_cpu: usize,
207-
base_worker_cpu: usize,
208-
worker_cpu: usize,
205+
max_pods: action_info::OperationCount,
206+
queue_per_cpu: action_info::OperationCount,
207+
base_worker_cpu: action_info::OperationCount,
208+
worker_cpu: action_info::OperationCount,
209209
}
210210

211211
impl ScaleInfo {
212-
fn required_workers(&self, queue_size: usize) -> usize {
212+
fn required_workers(&self, queue_size: action_info::OperationCount) -> action_info::OperationCount {
213213
let base_queue = self.queue_per_cpu * self.base_worker_cpu;
214214
let required_workers = if queue_size > base_queue {
215215
let queue_per_pod = self.queue_per_cpu * self.worker_cpu;
@@ -265,7 +265,7 @@ async fn get_worker_pods(
265265
.collect()
266266
}
267267

268-
async fn enumerate_existing_workers(pods: &Api<Pod>, base_cpu: usize) -> WorkerMap {
268+
async fn enumerate_existing_workers(pods: &Api<Pod>, base_cpu: action_info::OperationCount) -> WorkerMap {
269269
let pod_list = match pods.list(&ListParams::default()).await {
270270
Ok(list) => list,
271271
Err(e) => {
@@ -287,7 +287,7 @@ async fn enumerate_existing_workers(pods: &Api<Pod>, base_cpu: usize) -> WorkerM
287287
.as_ref()
288288
.and_then(|resources| resources.requests.as_ref())
289289
.and_then(|requests| requests.get("cpu"))
290-
.and_then(|cpu| cpu.0.parse::<usize>().ok())
290+
.and_then(|cpu| cpu.0.parse::<action_info::OperationCount>().ok())
291291
.is_some_and(|request_cpu| request_cpu == base_cpu);
292292
Some((image_name, name, is_base))
293293
});
@@ -321,9 +321,9 @@ struct WorkerManager {
321321
workers: WorkerMap,
322322
operations_change: Receiver<(action_info::PropertySet, action_info::OperationCount)>,
323323
watcher: Pin<Box<dyn Stream<Item = kube::runtime::watcher::Result<Event<Pod>>>>>,
324-
memory_to_cpu: usize,
325-
worker_cpu: usize,
326-
base_worker_cpu: usize,
324+
memory_to_cpu: action_info::OperationCount,
325+
worker_cpu: action_info::OperationCount,
326+
base_worker_cpu: action_info::OperationCount,
327327
}
328328

329329
impl WorkerManager {
@@ -409,7 +409,7 @@ impl WorkerManager {
409409
None
410410
}
411411

412-
fn configure_pod(pod: &Pod, cpu: usize, memory: usize) -> Pod {
412+
fn configure_pod(pod: &Pod, cpu: action_info::OperationCount, memory: action_info::OperationCount) -> Pod {
413413
let mut this_pod = pod.clone();
414414
// Create a unique name for this pod.
415415
this_pod.metadata.name = Some(format!(
@@ -453,7 +453,7 @@ impl WorkerManager {
453453
this_pod
454454
}
455455

456-
async fn maybe_scale_up(&mut self, properties: action_info::PropertySet, queue_size: usize) {
456+
async fn maybe_scale_up(&mut self, properties: action_info::PropertySet, queue_size: action_info::OperationCount) {
457457
let mut workers_entry = match self.workers.entry(properties.clone()) {
458458
std::collections::hash_map::Entry::Occupied(mut occupied_entry) => {
459459
occupied_entry.get_mut().update_queue(queue_size);

0 commit comments

Comments
 (0)