Skip to content

Commit e7b2b0d

Browse files
committed
refactor: make a separate ReportedConsumer whcih represents a snapshot
1 parent 18cc9b1 commit e7b2b0d

1 file changed

Lines changed: 40 additions & 19 deletions

File tree

  • datafusion/execution/src/memory_pool

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,47 @@ fn insufficient_capacity_err(
268268
struct TrackedConsumer {
269269
name: String,
270270
can_spill: bool,
271+
parent_id: Option<usize>,
271272
reserved: AtomicUsize,
272273
peak: AtomicUsize,
273274
}
274275

276+
/// A snapshot of a TrackedConsumer with static values and consumer ID
277+
#[derive(Debug, Clone)]
278+
struct ReportedConsumer {
279+
consumer_id: usize,
280+
name: String,
281+
can_spill: bool,
282+
reserved: usize,
283+
peak: usize,
284+
}
285+
286+
impl std::fmt::Display for ReportedConsumer {
287+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288+
write!(
289+
f,
290+
"{}#{}(can spill: {}) consumed {}, peak {}",
291+
self.name,
292+
self.consumer_id,
293+
self.can_spill,
294+
human_readable_size(self.reserved),
295+
human_readable_size(self.peak)
296+
)
297+
}
298+
}
299+
300+
impl From<(usize, &TrackedConsumer)> for ReportedConsumer {
301+
fn from((consumer_id, tracked): (usize, &TrackedConsumer)) -> Self {
302+
Self {
303+
consumer_id,
304+
name: tracked.name.clone(),
305+
can_spill: tracked.can_spill,
306+
reserved: tracked.reserved(),
307+
peak: tracked.peak(),
308+
}
309+
}
310+
}
311+
275312
impl TrackedConsumer {
276313
/// Shorthand to return the currently reserved value
277314
fn reserved(&self) -> usize {
@@ -380,29 +417,13 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
380417
.tracked_consumers
381418
.lock()
382419
.iter()
383-
.map(|(consumer_id, tracked_consumer)| {
384-
(
385-
(
386-
*consumer_id,
387-
tracked_consumer.name.to_owned(),
388-
tracked_consumer.can_spill,
389-
tracked_consumer.peak(),
390-
),
391-
tracked_consumer.reserved(),
392-
)
393-
})
420+
.map(|(id, consumer_stats)| ReportedConsumer::from((*id, consumer_stats)))
394421
.collect::<Vec<_>>();
395-
consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
422+
consumers.sort_by(|a, b| b.reserved.cmp(&a.reserved)); // inverse ordering
396423

397424
consumers[0..std::cmp::min(top, consumers.len())]
398425
.iter()
399-
.map(|((id, name, can_spill, peak), size)| {
400-
format!(
401-
" {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
402-
human_readable_size(*size),
403-
human_readable_size(*peak),
404-
)
405-
})
426+
.map(|reported_consumer| format!(" {reported_consumer}"))
406427
.collect::<Vec<_>>()
407428
.join(",\n")
408429
+ "."

0 commit comments

Comments
 (0)