1616// under the License.
1717
1818use crate :: memory_pool:: { MemoryConsumer , MemoryPool , MemoryReservation } ;
19+ use chrono:: Utc ;
1920use datafusion_common:: { resources_datafusion_err, DataFusionError , Result } ;
2021use hashbrown:: HashMap ;
2122use log:: debug;
2223use parking_lot:: Mutex ;
2324use std:: {
2425 num:: NonZeroUsize ,
25- sync:: atomic:: { AtomicU64 , AtomicUsize , Ordering } ,
26+ sync:: {
27+ atomic:: { AtomicU64 , AtomicUsize , Ordering } ,
28+ Arc ,
29+ } ,
2630} ;
2731
2832/// A [`MemoryPool`] that enforces no limit
@@ -261,7 +265,7 @@ fn insufficient_capacity_err(
261265pub struct TrackConsumersPool < I > {
262266 inner : I ,
263267 top : NonZeroUsize ,
264- tracked_consumers : Mutex < HashMap < MemoryConsumer , AtomicU64 > > ,
268+ tracked_consumers : Arc < Mutex < HashMap < MemoryConsumer , AtomicU64 > > > ,
265269}
266270
267271impl < I : MemoryPool > TrackConsumersPool < I > {
@@ -270,28 +274,53 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
270274 /// The `top` determines how many Top K [`MemoryConsumer`]s to include
271275 /// in the reported [`DataFusionError::ResourcesExhausted`].
272276 pub fn new ( inner : I , top : NonZeroUsize ) -> Self {
277+ let tracked_consumers = Default :: default ( ) ;
278+
279+ let _captured: Arc <
280+ parking_lot:: lock_api:: Mutex <
281+ parking_lot:: RawMutex ,
282+ HashMap < MemoryConsumer , AtomicU64 > ,
283+ > ,
284+ > = Arc :: clone ( & tracked_consumers) ;
285+ #[ allow( clippy:: disallowed_methods) ]
286+ tokio:: spawn ( async move {
287+ loop {
288+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 100 ) ) . await ;
289+
290+ println ! (
291+ "REPORT: {}, {}" ,
292+ Utc :: now( ) ,
293+ Self :: _report_top( 5 , Arc :: clone( & _captured) )
294+ ) ;
295+ }
296+ } ) ;
297+
273298 Self {
274299 inner,
275300 top,
276- tracked_consumers : Default :: default ( ) ,
301+ tracked_consumers,
277302 }
278303 }
279304
280305 /// Determine if there are multiple [`MemoryConsumer`]s registered
281306 /// which have the same name.
282307 ///
283308 /// This is very tied to the implementation of the memory consumer.
284- fn has_multiple_consumers ( & self , name : & String ) -> bool {
309+ fn has_multiple_consumers (
310+ name : & String ,
311+ tracked_consumers : & Arc < Mutex < HashMap < MemoryConsumer , AtomicU64 > > > ,
312+ ) -> bool {
285313 let consumer = MemoryConsumer :: new ( name) ;
286314 let consumer_with_spill = consumer. clone ( ) . with_can_spill ( true ) ;
287- let guard = self . tracked_consumers . lock ( ) ;
315+ let guard = tracked_consumers. lock ( ) ;
288316 guard. contains_key ( & consumer) && guard. contains_key ( & consumer_with_spill)
289317 }
290318
291- /// The top consumers in a report string.
292- pub fn report_top ( & self , top : usize ) -> String {
293- let mut consumers = self
294- . tracked_consumers
319+ fn _report_top (
320+ top : usize ,
321+ tracked_consumers : Arc < Mutex < HashMap < MemoryConsumer , AtomicU64 > > > ,
322+ ) -> String {
323+ let mut consumers = tracked_consumers
295324 . lock ( )
296325 . iter ( )
297326 . map ( |( consumer, reserved) | {
@@ -301,12 +330,13 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
301330 )
302331 } )
303332 . collect :: < Vec < _ > > ( ) ;
304- consumers. sort_by ( |a, b| b. 1 . cmp ( & a. 1 ) ) ; // inverse ordering
333+ // consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
334+ consumers. sort_by ( |a, b| b. 0 . cmp ( & a. 0 ) ) ; // sort by name, make stdout easily parsible
305335
306336 consumers[ 0 ..std:: cmp:: min ( top, consumers. len ( ) ) ]
307337 . iter ( )
308338 . map ( |( ( name, can_spill) , size) | {
309- if self . has_multiple_consumers ( name) {
339+ if Self :: has_multiple_consumers ( name, & tracked_consumers ) {
310340 format ! ( "{name}(can_spill={}) consumed {:?} bytes" , can_spill, size)
311341 } else {
312342 format ! ( "{name} consumed {:?} bytes" , size)
@@ -315,6 +345,11 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
315345 . collect :: < Vec < _ > > ( )
316346 . join ( ", " )
317347 }
348+
349+ /// The top consumers in a report string.
350+ pub fn report_top ( & self , top : usize ) -> String {
351+ Self :: _report_top ( top, Arc :: clone ( & self . tracked_consumers ) )
352+ }
318353}
319354
320355impl < I : MemoryPool > MemoryPool for TrackConsumersPool < I > {
@@ -380,6 +415,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
380415 . and_modify ( |bytes| {
381416 bytes. fetch_add ( additional as u64 , Ordering :: AcqRel ) ;
382417 } ) ;
418+
383419 Ok ( ( ) )
384420 }
385421
0 commit comments