@@ -53,6 +53,7 @@ use anyhow::Result;
5353use std:: sync:: Arc ;
5454use std:: time:: Duration ;
5555use tokio:: sync:: mpsc;
56+ use tokio:: task:: JoinHandle ;
5657
5758pub use event:: { AuditEvent , EventResult , EventType } ;
5859pub use exporter:: { AuditExporter , NullExporter } ;
@@ -128,19 +129,34 @@ impl AuditConfig {
128129 }
129130
130131 /// Set the buffer size.
132+ ///
133+ /// # Panics
134+ ///
135+ /// Panics if size is 0.
131136 pub fn with_buffer_size ( mut self , size : usize ) -> Self {
137+ assert ! ( size >= 1 , "buffer_size must be at least 1" ) ;
132138 self . buffer_size = size;
133139 self
134140 }
135141
136142 /// Set the batch size.
143+ ///
144+ /// # Panics
145+ ///
146+ /// Panics if size is 0.
137147 pub fn with_batch_size ( mut self , size : usize ) -> Self {
148+ assert ! ( size >= 1 , "batch_size must be at least 1" ) ;
138149 self . batch_size = size;
139150 self
140151 }
141152
142153 /// Set the flush interval.
154+ ///
155+ /// # Panics
156+ ///
157+ /// Panics if secs is 0.
143158 pub fn with_flush_interval ( mut self , secs : u64 ) -> Self {
159+ assert ! ( secs >= 1 , "flush_interval_secs must be at least 1" ) ;
144160 self . flush_interval_secs = secs;
145161 self
146162 }
@@ -166,6 +182,9 @@ pub struct AuditManager {
166182
167183 /// Whether audit logging is enabled
168184 enabled : bool ,
185+
186+ /// Handle to the background worker task
187+ worker_handle : Option < JoinHandle < ( ) > > ,
169188}
170189
171190impl AuditManager {
@@ -208,23 +227,26 @@ impl AuditManager {
208227 exporters. push ( exporter) ;
209228 }
210229
211- let manager = Self {
212- exporters : exporters. clone ( ) ,
213- sender,
214- enabled : config. enabled ,
215- } ;
216-
217230 // Start background worker
218- if config. enabled {
231+ let worker_handle = if config. enabled {
219232 let batch_size = config. batch_size ;
220233 let flush_interval = Duration :: from_secs ( config. flush_interval_secs ) ;
221- tokio:: spawn ( Self :: worker (
234+ Some ( tokio:: spawn ( Self :: worker (
222235 receiver,
223- exporters,
236+ exporters. clone ( ) ,
224237 batch_size,
225238 flush_interval,
226- ) ) ;
227- }
239+ ) ) )
240+ } else {
241+ None
242+ } ;
243+
244+ let manager = Self {
245+ exporters,
246+ sender,
247+ enabled : config. enabled ,
248+ worker_handle,
249+ } ;
228250
229251 Ok ( manager)
230252 }
@@ -263,26 +285,32 @@ impl AuditManager {
263285
264286 loop {
265287 tokio:: select! {
266- Some ( event) = receiver. recv( ) => {
267- buffer. push( event) ;
268-
269- // Flush if buffer is full
270- if buffer. len( ) >= batch_size {
271- Self :: flush_buffer( & exporters, & mut buffer) . await ;
288+ biased;
289+
290+ event_opt = receiver. recv( ) => {
291+ match event_opt {
292+ Some ( event) => {
293+ buffer. push( event) ;
294+
295+ // Flush if buffer is full
296+ if buffer. len( ) >= batch_size {
297+ Self :: flush_buffer( & exporters, & mut buffer) . await ;
298+ }
299+ }
300+ None => {
301+ // Channel closed, flush remaining events and exit
302+ if !buffer. is_empty( ) {
303+ Self :: flush_buffer( & exporters, & mut buffer) . await ;
304+ }
305+ break ;
306+ }
272307 }
273308 }
274309 _ = flush_timer. tick( ) => {
275310 if !buffer. is_empty( ) {
276311 Self :: flush_buffer( & exporters, & mut buffer) . await ;
277312 }
278313 }
279- else => {
280- // Channel closed, flush remaining events and exit
281- if !buffer. is_empty( ) {
282- Self :: flush_buffer( & exporters, & mut buffer) . await ;
283- }
284- break ;
285- }
286314 }
287315 }
288316
@@ -297,7 +325,7 @@ impl AuditManager {
297325 /// Flush the event buffer to all exporters.
298326 async fn flush_buffer ( exporters : & [ Arc < dyn AuditExporter > ] , buffer : & mut Vec < AuditEvent > ) {
299327 for exporter in exporters {
300- if let Err ( e) = exporter. export_batch ( buffer. clone ( ) ) . await {
328+ if let Err ( e) = exporter. export_batch ( buffer) . await {
301329 tracing:: error!( "Audit export failed: {}" , e) ;
302330 }
303331 }
@@ -319,6 +347,33 @@ impl AuditManager {
319347 pub fn is_enabled ( & self ) -> bool {
320348 self . enabled
321349 }
350+
351+ /// Gracefully shut down the audit manager.
352+ ///
353+ /// This method:
354+ /// 1. Drops the sender to signal the worker to stop accepting new events
355+ /// 2. Waits for the worker to finish processing buffered events
356+ /// 3. Ensures all exporters are properly closed
357+ ///
358+ /// After calling this method, the AuditManager should not be used.
359+ ///
360+ /// # Errors
361+ ///
362+ /// Returns an error if the worker task panicked or if there was an issue
363+ /// waiting for the worker to complete.
364+ pub async fn shutdown ( mut self ) -> Result < ( ) > {
365+ // Drop the sender to signal the worker to exit
366+ drop ( self . sender ) ;
367+
368+ // Wait for the worker to finish
369+ if let Some ( handle) = self . worker_handle . take ( ) {
370+ handle
371+ . await
372+ . map_err ( |e| anyhow:: anyhow!( "Worker task panicked: {}" , e) ) ?;
373+ }
374+
375+ Ok ( ( ) )
376+ }
322377}
323378
324379#[ cfg( test) ]
@@ -448,4 +503,63 @@ mod tests {
448503 // Wait for flush interval
449504 tokio:: time:: sleep ( Duration :: from_millis ( 1100 ) ) . await ;
450505 }
506+
507+ #[ tokio:: test( flavor = "multi_thread" ) ]
508+ async fn test_audit_manager_shutdown ( ) {
509+ let config = AuditConfig :: new ( )
510+ . with_enabled ( true )
511+ . with_buffer_size ( 10 )
512+ . with_batch_size ( 5 )
513+ . with_flush_interval ( 1 ) ;
514+
515+ let manager = AuditManager :: new ( & config) . unwrap ( ) ;
516+
517+ // Send some events
518+ for i in 0 ..3 {
519+ let event = AuditEvent :: new (
520+ EventType :: FileUploaded ,
521+ format ! ( "user{}" , i) ,
522+ format ! ( "session-{}" , i) ,
523+ ) ;
524+ manager. log ( event) . await ;
525+ }
526+
527+ // Give a small amount of time for events to be queued
528+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
529+
530+ // Shutdown should wait for all events to be processed
531+ let result = tokio:: time:: timeout ( Duration :: from_secs ( 10 ) , manager. shutdown ( ) ) . await ;
532+ assert ! ( result. is_ok( ) , "Shutdown timed out" ) ;
533+ assert ! ( result. unwrap( ) . is_ok( ) , "Shutdown failed" ) ;
534+ }
535+
536+ #[ test]
537+ #[ should_panic( expected = "buffer_size must be at least 1" ) ]
538+ fn test_audit_config_invalid_buffer_size ( ) {
539+ let _config = AuditConfig :: new ( ) . with_buffer_size ( 0 ) ;
540+ }
541+
542+ #[ test]
543+ #[ should_panic( expected = "batch_size must be at least 1" ) ]
544+ fn test_audit_config_invalid_batch_size ( ) {
545+ let _config = AuditConfig :: new ( ) . with_batch_size ( 0 ) ;
546+ }
547+
548+ #[ test]
549+ #[ should_panic( expected = "flush_interval_secs must be at least 1" ) ]
550+ fn test_audit_config_invalid_flush_interval ( ) {
551+ let _config = AuditConfig :: new ( ) . with_flush_interval ( 0 ) ;
552+ }
553+
554+ #[ test]
555+ fn test_audit_config_valid_minimum_values ( ) {
556+ let config = AuditConfig :: new ( )
557+ . with_buffer_size ( 1 )
558+ . with_batch_size ( 1 )
559+ . with_flush_interval ( 1 ) ;
560+
561+ assert_eq ! ( config. buffer_size, 1 ) ;
562+ assert_eq ! ( config. batch_size, 1 ) ;
563+ assert_eq ! ( config. flush_interval_secs, 1 ) ;
564+ }
451565}
0 commit comments