Skip to content

Commit 850f10f

Browse files
committed
chore: add docs for how to use TrackConsumersPool
1 parent f3941b2 commit 850f10f

2 files changed

Lines changed: 252 additions & 2 deletions

File tree

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 250 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,228 @@ impl TrackedConsumer {
295295
///
296296
/// By tracking memory reservations more carefully this pool
297297
/// can provide better error messages on the largest memory users
298+
/// when memory allocation fails.
298299
///
299300
/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
300301
/// The same consumer can have multiple reservations.
302+
///
303+
/// # Automatic Usage with RuntimeEnvBuilder
304+
///
305+
/// The easiest way to use `TrackConsumersPool` is through [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit),
306+
/// which automatically creates a `TrackConsumersPool` wrapping a [`GreedyMemoryPool`] with tracking
307+
/// for the top 5 memory consumers:
308+
///
309+
/// ```no_run
310+
/// # use datafusion_execution::runtime_env::RuntimeEnvBuilder;
311+
/// # use datafusion_execution::config::SessionConfig;
312+
/// # use std::sync::Arc;
313+
/// # async fn example() -> datafusion_common::Result<()> {
314+
/// // Create a runtime with 20MB memory limit and consumer tracking
315+
/// let runtime = RuntimeEnvBuilder::new()
316+
/// .with_memory_limit(20_000_000, 1.0) // 20MB, 100% utilization
317+
/// .build_arc()?;
318+
///
319+
/// let config = SessionConfig::new();
320+
///
321+
/// // Note: In real usage, you would use datafusion::prelude::SessionContext
322+
/// // let ctx = SessionContext::new_with_config_rt(config, runtime);
323+
///
324+
/// // Register your table
325+
/// // ctx.register_table("t", table)?;
326+
///
327+
/// // Run a memory-intensive query
328+
/// let query = "
329+
/// COPY (select * from t)
330+
/// TO '/tmp/output.parquet'
331+
/// STORED AS PARQUET OPTIONS (compression 'uncompressed')";
332+
///
333+
/// // If memory is exhausted, you'll get detailed error messages like:
334+
/// // "Additional allocation failed with top memory consumers (across reservations) as:
335+
/// // ParquetSink(ArrowColumnWriter)#123(can spill: false) consumed 15.2 MB,
336+
/// // HashJoin#456(can spill: false) consumed 3.1 MB,
337+
/// // Sort#789(can spill: true) consumed 1.8 MB,
338+
/// // Aggregation#101(can spill: false) consumed 892.0 KB,
339+
/// // Filter#202(can spill: false) consumed 156.0 KB.
340+
/// // Error: Failed to allocate additional 2.5 MB..."
341+
///
342+
/// // let result = ctx.sql(query).await?.collect().await;
343+
/// # Ok(())
344+
/// # }
345+
/// ```
346+
///
347+
/// # How to use in ExecutionPlan implementations
348+
///
349+
/// When implementing custom ExecutionPlans that need to use significant memory, you should
350+
/// use the memory pool to track and limit memory usage:
351+
///
352+
/// ```rust
353+
/// use std::sync::Arc;
354+
/// use datafusion_execution::memory_pool::{MemoryPool, MemoryConsumer, MemoryReservation};
355+
/// use datafusion_common::Result;
356+
///
357+
/// /// Example of an external batch bufferer that uses memory reservation.
358+
/// ///
359+
/// /// It's a simple example which spills all existing data to disk
360+
/// /// whenever the memory limit is reached.
361+
/// struct MyExternalBatchBufferer {
362+
/// buffer: Vec<u8>,
363+
/// reservation: MemoryReservation,
364+
/// }
365+
///
366+
/// impl MyExternalBatchBufferer {
367+
/// fn new(reservation: MemoryReservation) -> Self {
368+
/// Self {
369+
/// buffer: Vec::new(),
370+
/// reservation,
371+
/// }
372+
/// }
373+
///
374+
/// fn add_batch(&mut self, batch_data: Vec<u8>) -> Result<()> {
375+
/// let additional_memory = batch_data.len();
376+
///
377+
/// // Try to reserve memory before allocating
378+
/// if self.reservation.try_grow(additional_memory).is_err() {
379+
/// // Memory limit reached - spill existing data to disk,
380+
/// // then retry the reservation once memory has been freed.
381+
/// // (Retrying only in the error branch avoids reserving
382+
/// // the same bytes twice when the first attempt succeeds.)
383+
/// self.spill_to_disk()?;
384+
/// self.reservation.try_grow(additional_memory)?;
385+
/// }
386+
///
385+
/// self.buffer.extend_from_slice(&batch_data);
386+
/// Ok(())
387+
/// }
388+
///
389+
/// fn spill_to_disk(&mut self) -> Result<()> {
390+
/// // Write buffer to disk
391+
/// // ... spilling logic ...
392+
///
393+
/// // Free the memory after spilling
394+
/// let freed_bytes = self.buffer.len();
395+
/// self.buffer.clear();
396+
/// self.reservation.shrink(freed_bytes);
397+
///
398+
/// Ok(())
399+
/// }
400+
///
401+
/// fn finish(&mut self) -> Vec<u8> {
402+
/// let result = std::mem::take(&mut self.buffer);
403+
/// // Free the memory when done
404+
/// self.reservation.free();
405+
/// result
406+
/// }
407+
/// }
408+
///
409+
/// # #[cfg(feature = "example")]
410+
/// # {
411+
/// use futures::StreamExt;
412+
/// use datafusion_execution::{SendableRecordBatchStream, TaskContext};
413+
/// use datafusion_physical_plan::{ExecutionPlan, DisplayAs, DisplayFormatType};
414+
/// use datafusion_common::{internal_err, DataFusionError};
415+
/// use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
416+
/// use arrow_schema::{Schema, SchemaRef};
417+
/// use futures::stream; // `StreamExt` is already imported above
418+
/// use std::any::Any;
419+
/// use std::fmt;
420+
///
421+
/// /// Example of an ExecutionPlan that uses the MyExternalBatchBufferer.
422+
/// #[derive(Debug)]
423+
/// struct MyBufferingExecutionPlan {
424+
/// schema: SchemaRef,
425+
/// input: Arc<dyn ExecutionPlan>,
426+
/// }
427+
///
428+
/// impl MyBufferingExecutionPlan {
429+
/// fn new(schema: SchemaRef, input: Arc<dyn ExecutionPlan>) -> Self {
430+
/// Self { schema, input }
431+
/// }
432+
/// }
433+
///
434+
/// impl DisplayAs for MyBufferingExecutionPlan {
435+
/// fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
436+
/// write!(f, "MyBufferingExecutionPlan")
437+
/// }
438+
/// }
439+
///
440+
/// impl ExecutionPlan for MyBufferingExecutionPlan {
441+
/// fn name(&self) -> &'static str {
442+
/// "MyBufferingExecutionPlan"
443+
/// }
444+
///
445+
/// fn as_any(&self) -> &dyn Any {
446+
/// self
447+
/// }
448+
///
449+
/// fn schema(&self) -> SchemaRef {
450+
/// self.schema.clone()
451+
/// }
452+
///
453+
/// fn properties(&self) -> &datafusion_physical_expr::PlanProperties {
454+
/// todo!()
455+
/// }
456+
///
457+
/// fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
458+
/// vec![&self.input]
459+
/// }
460+
///
461+
/// fn with_new_children(
462+
/// self: Arc<Self>,
463+
/// children: Vec<Arc<dyn ExecutionPlan>>,
464+
/// ) -> Result<Arc<dyn ExecutionPlan>> {
465+
/// todo!()
466+
/// }
467+
///
468+
/// fn execute(
469+
/// &self,
470+
/// partition: usize,
471+
/// context: Arc<TaskContext>,
472+
/// ) -> Result<SendableRecordBatchStream> {
473+
/// // Register memory consumer with the context's memory pool
474+
/// let reservation = MemoryConsumer::new("MyExternalBatchBufferer")
475+
/// .with_can_spill(true)
476+
/// .register(context.memory_pool());
477+
///
478+
/// let mut operator = MyExternalBatchBufferer::new(reservation);
479+
///
480+
/// // Process incoming stream of batches
481+
/// let stream = self.input.execute(partition, context)?;
482+
///
483+
/// // Process the stream and collect all batches
484+
/// let processed_stream = stream
485+
/// .map(|batch_result| {
486+
/// batch_result.map(|batch| {
487+
/// // Convert RecordBatch to bytes for this example
488+
/// vec![1u8; batch.get_array_memory_size()]
489+
/// })
490+
/// })
491+
/// .try_for_each(move |batch_data| async move {
492+
/// operator.add_batch(batch_data)?;
493+
/// Ok(())
494+
/// })
495+
/// .map(move |result| {
496+
/// match result {
497+
/// Ok(_) => {
498+
/// // Finish processing and get results
499+
/// let _final_result = operator.finish();
500+
/// // In a real implementation, you would convert final_result back to RecordBatches
501+
/// }
502+
/// Err(_) => {
503+
/// // Handle error case
504+
/// }
505+
/// }
506+
/// });
507+
///
508+
/// // Since this is a simplified example, return an empty stream
509+
/// // In a real implementation, you would create a stream from the processed results
510+
/// let result_stream = stream::empty();
511+
/// Ok(Box::pin(result_stream))
512+
/// }
513+
/// }
514+
/// # }
515+
/// ```
516+
///
517+
/// # Runtime Monitoring
518+
///
519+
/// TODO(kosiew/wiedld): Docs to be added after <https://github.com/apache/datafusion/pull/17021/>.
301520
#[derive(Debug)]
302521
pub struct TrackConsumersPool<I> {
303522
/// The wrapped memory pool that actually handles reservation logic
@@ -311,6 +530,36 @@ pub struct TrackConsumersPool<I> {
311530
impl<I: MemoryPool> TrackConsumersPool<I> {
312531
/// Creates a new [`TrackConsumersPool`].
313532
///
533+
/// # Arguments
534+
/// * `inner` - The underlying memory pool that handles actual memory allocation
535+
/// * `top` - The number of top memory consumers to include in error messages
536+
///
537+
/// # Note
538+
/// In most cases, you should use [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit)
539+
/// instead of creating this pool manually, as it automatically sets up tracking with
540+
/// sensible defaults (top 5 consumers).
541+
///
542+
/// # Example
543+
///
544+
/// ```rust
545+
/// use std::num::NonZeroUsize;
546+
/// use datafusion_execution::memory_pool::{TrackConsumersPool, GreedyMemoryPool, FairSpillPool};
547+
///
548+
/// // Create with a greedy pool backend, reporting top 3 consumers in error messages
549+
/// let tracked_greedy = TrackConsumersPool::new(
550+
/// GreedyMemoryPool::new(1024 * 1024), // 1MB limit
551+
/// NonZeroUsize::new(3).unwrap(),
552+
/// );
553+
///
554+
/// // Create with a fair spill pool backend, reporting top 5 consumers in error messages
555+
/// let tracked_fair = TrackConsumersPool::new(
556+
/// FairSpillPool::new(2 * 1024 * 1024), // 2MB limit
557+
/// NonZeroUsize::new(5).unwrap(),
558+
/// );
559+
/// ```
560+
///
561+
/// # Impact on Error Messages
562+
///
314563
/// The `top` determines how many Top K [`MemoryConsumer`]s to include
315564
/// in the reported [`DataFusionError::ResourcesExhausted`].
316565
pub fn new(inner: I, top: NonZeroUsize) -> Self {
@@ -321,7 +570,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
321570
}
322571
}
323572

324-
/// The top consumers in a report string.
573+
/// Returns a formatted string with the top memory consumers.
325574
pub fn report_top(&self, top: usize) -> String {
326575
let mut consumers = self
327576
.tracked_consumers

datafusion/execution/src/runtime_env.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,8 @@ impl RuntimeEnvBuilder {
258258
/// Specify the total memory to use while running the DataFusion
259259
/// plan to `max_memory * memory_fraction` in bytes.
260260
///
261-
/// This defaults to using [`GreedyMemoryPool`]
261+
/// This defaults to using [`GreedyMemoryPool`] wrapped in the
262+
/// [`TrackConsumersPool`], which reports the top 5 memory consumers
/// in error messages when an allocation fails.
262263
///
263264
/// Note DataFusion does not yet respect this limit in all cases.
264265
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {

0 commit comments

Comments
 (0)