Skip to content

Commit 7494aee

Browse files
committed
feat: add consumer stacktrace
1 parent e7b2b0d commit 7494aee

1 file changed

Lines changed: 129 additions & 0 deletions

File tree

  • datafusion/execution/src/memory_pool

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ struct TrackedConsumer {
277277
#[derive(Debug, Clone)]
278278
struct ReportedConsumer {
279279
consumer_id: usize,
280+
parent_id: Option<usize>,
280281
name: String,
281282
can_spill: bool,
282283
reserved: usize,
@@ -301,6 +302,7 @@ impl From<(usize, &TrackedConsumer)> for ReportedConsumer {
301302
fn from((consumer_id, tracked): (usize, &TrackedConsumer)) -> Self {
302303
Self {
303304
consumer_id,
305+
parent_id: tracked.parent_id,
304306
name: tracked.name.clone(),
305307
can_spill: tracked.can_spill,
306308
reserved: tracked.reserved(),
@@ -309,6 +311,62 @@ impl From<(usize, &TrackedConsumer)> for ReportedConsumer {
309311
}
310312
}
311313

314+
/// A stack trace representation of a memory consumer's lineage
315+
#[derive(Debug, Clone)]
316+
struct ConsumerStackTrace {
317+
/// The consumer for which we're building the stack trace
318+
consumer: ReportedConsumer,
319+
/// The trace from immediate parent to oldest ancestor (excluding the consumer itself)
320+
trace: Vec<ReportedConsumer>,
321+
}
322+
323+
impl std::fmt::Display for ConsumerStackTrace {
324+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325+
if self.trace.is_empty() {
326+
// If no trace, just display the consumer
327+
write!(f, "{}", self.consumer)?;
328+
} else {
329+
// If there's a trace, show the full stack backtrace
330+
writeln!(f, "{}:", self.consumer)?;
331+
writeln!(f, "stack backtrace:")?;
332+
writeln!(f, " 0: {}", self.consumer)?;
333+
for (i, consumer) in self.trace.iter().enumerate() {
334+
writeln!(f, " {}: {}", i + 1, consumer)?;
335+
}
336+
}
337+
Ok(())
338+
}
339+
}
340+
341+
/// Builds a stack trace for a consumer, following parent relationships until reaching a root
342+
///
343+
/// # Arguments
344+
/// * `consumer` - The consumer to build a stack trace for
345+
/// * `map` - HashMap mapping consumer_id to ReportedConsumer
346+
///
347+
/// # Returns
348+
/// A ConsumerStackTrace containing the consumer and its parent lineage
349+
fn build_consumer_stack_trace(
350+
consumer: ReportedConsumer,
351+
map: &HashMap<usize, ReportedConsumer>,
352+
) -> ConsumerStackTrace {
353+
let mut trace = Vec::new();
354+
let mut current_parent_id = consumer.parent_id;
355+
356+
// Follow the parent chain until we reach a root (parent_id is None)
357+
while let Some(parent_id) = current_parent_id {
358+
if let Some(parent_consumer) = map.get(&parent_id) {
359+
trace.push(parent_consumer.clone());
360+
current_parent_id = parent_consumer.parent_id;
361+
} else {
362+
// Parent not found in map, stop traversal
363+
break;
364+
}
365+
}
366+
367+
ConsumerStackTrace { consumer, trace }
368+
}
369+
312370
impl TrackedConsumer {
313371
/// Shorthand to return the currently reserved value
314372
fn reserved(&self) -> usize {
@@ -440,6 +498,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
440498
TrackedConsumer {
441499
name: consumer.name().to_string(),
442500
can_spill: consumer.can_spill(),
501+
parent_id: consumer.parent_id(),
443502
reserved: Default::default(),
444503
peak: Default::default(),
445504
},
@@ -823,4 +882,74 @@ mod tests {
823882
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
824883
");
825884
}
885+
886+
#[test]
887+
fn test_build_consumer_stack_trace_no_parent() {
888+
// Test case 1: Consumer with no parent (root consumer)
889+
let mut reported_consumers = HashMap::new();
890+
891+
let root_consumer = ReportedConsumer {
892+
consumer_id: 1,
893+
name: "root".to_string(),
894+
can_spill: false,
895+
parent_id: None,
896+
reserved: 100,
897+
peak: 150,
898+
};
899+
900+
reported_consumers.insert(1, root_consumer.clone());
901+
902+
let stack_trace =
903+
build_consumer_stack_trace(root_consumer.clone(), &reported_consumers);
904+
905+
assert_eq!(stack_trace.consumer.consumer_id, 1);
906+
assert_eq!(stack_trace.trace.len(), 0); // No parents
907+
908+
assert_eq!(
909+
format!("{stack_trace}"),
910+
"root#1(can spill: false) consumed 100.0 B, peak 150.0 B"
911+
);
912+
}
913+
914+
#[test]
915+
fn test_build_consumer_stack_trace_hierarchy() {
916+
let mut reported_consumers = HashMap::new();
917+
918+
// Create: great_grandparent(1) -> grandparent(2) -> parent(3) -> child(4) -> grandchild(5)
919+
for i in 1..=5 {
920+
let parent_id = if i == 1 { None } else { Some(i - 1) };
921+
let consumer = ReportedConsumer {
922+
consumer_id: i,
923+
name: format!("consumer_{i}"),
924+
can_spill: i % 2 == 0,
925+
parent_id,
926+
reserved: i * 10,
927+
peak: i * 15,
928+
};
929+
reported_consumers.insert(i, consumer);
930+
}
931+
932+
let grandchild = reported_consumers.get(&5).unwrap().clone();
933+
let stack_trace = build_consumer_stack_trace(grandchild, &reported_consumers);
934+
935+
assert_eq!(stack_trace.consumer.consumer_id, 5);
936+
assert_eq!(stack_trace.trace.len(), 4);
937+
// Verify the order: immediate parent (4) to oldest ancestor (1)
938+
assert_eq!(stack_trace.trace[0].consumer_id, 4); // immediate parent
939+
assert_eq!(stack_trace.trace[1].consumer_id, 3); // grandparent
940+
assert_eq!(stack_trace.trace[2].consumer_id, 2); // great-grandparent
941+
assert_eq!(stack_trace.trace[3].consumer_id, 1); // oldest ancestor
942+
943+
assert_eq!(
944+
format!("{stack_trace}"),
945+
"consumer_5#5(can spill: false) consumed 50.0 B, peak 75.0 B:
946+
stack backtrace:
947+
0: consumer_5#5(can spill: false) consumed 50.0 B, peak 75.0 B
948+
1: consumer_4#4(can spill: true) consumed 40.0 B, peak 60.0 B
949+
2: consumer_3#3(can spill: false) consumed 30.0 B, peak 45.0 B
950+
3: consumer_2#2(can spill: true) consumed 20.0 B, peak 30.0 B
951+
4: consumer_1#1(can spill: false) consumed 10.0 B, peak 15.0 B
952+
"
953+
);
954+
}
826955
}

0 commit comments

Comments
 (0)