@@ -14,7 +14,7 @@ use anyhow::Context;
1414#[ cfg( target_os = "linux" ) ]
1515use tracing:: error;
1616#[ cfg( target_os = "linux" ) ]
17- use libbpf_rs:: { RingBufferBuilder , Object , ObjectBuilder , RingBuffer } ;
17+ use libbpf_rs:: { RingBufferBuilder , Object , ObjectBuilder } ;
1818
1919pub mod events;
2020
@@ -202,27 +202,38 @@ impl EbpfLoader {
202202
203203 #[ cfg( target_os = "linux" ) ]
204204 async fn initialize_linux ( & self ) -> Result < Object > {
205- let object = ObjectBuilder :: default ( )
205+ let mut object = ObjectBuilder :: default ( )
206206 . open_file ( "kernel-agent/src/sentinel.bpf.o" )
207207 . context ( "Cannot open eBPF object file" ) ?
208208 . load ( )
209209 . context ( "Cannot load eBPF program" ) ?;
210210
211- object. attach ( ) . context ( "Cannot attach eBPF program to kernel" ) ?;
211+ // Attach all programs in the object
212+ for prog in object. progs_iter_mut ( ) {
213+ if let Err ( e) = prog. attach ( ) {
214+ warn ! ( "Failed to attach program {}: {}" , prog. name( ) , e) ;
215+ } else {
216+ info ! ( "Successfully attached program: {}" , prog. name( ) ) ;
217+ }
218+ }
212219
213220 let rb_map = object
214221 . map ( "events" )
215222 . context ( "Cannot find ring buffer map" ) ?;
216223
217- // Create async ring buffer processor
224+ // Clone the map for the processor
225+ let rb_map_clone = rb_map. clone ( ) ;
226+
227+ // Create async ring buffer processor
218228 let sender = self . event_sender . clone ( ) ;
219229 let metrics = Arc :: clone ( & self . metrics ) ;
220230 let rate_limiter = Arc :: clone ( & self . rate_limiter ) ;
221231 let shutdown = Arc :: clone ( & self . shutdown_signal ) ;
222232 let config = self . config . clone ( ) ;
223233
234+ // Spawn the processor with cloned map
224235 tokio:: spawn ( Self :: ring_buffer_processor (
225- rb_map ,
236+ rb_map_clone ,
226237 sender,
227238 metrics,
228239 rate_limiter,
@@ -242,106 +253,106 @@ impl EbpfLoader {
242253 shutdown : Arc < tokio:: sync:: Notify > ,
243254 config : EbpfConfig ,
244255 ) {
245- let mut rb_builder = RingBufferBuilder :: new ( ) ;
256+ info ! ( "📡 Starting async ring buffer processor" ) ;
257+
258+ // Create a channel for events from the ring buffer callback
259+ let ( event_tx, mut event_rx) = mpsc:: unbounded_channel :: < Vec < u8 > > ( ) ;
246260
247- rb_builder. add ( & rb_map, move |data : & [ u8 ] | {
248- let sender = sender. clone ( ) ;
249- let metrics = Arc :: clone ( & metrics) ;
250- let rate_limiter = Arc :: clone ( & rate_limiter) ;
261+ // Build ring buffer in a blocking context to avoid Send issues
262+ let rb_result = tokio:: task:: spawn_blocking ( move || -> Result < _ > {
263+ let mut rb_builder = RingBufferBuilder :: new ( ) ;
251264
252- // Rate limiting
253- if let Err ( _) = rate_limiter. try_acquire ( ) {
254- tokio:: spawn ( async move {
255- let mut metrics_guard = metrics. write ( ) . await ;
256- metrics_guard. events_dropped += 1 ;
257- } ) ;
258- return 0 ;
259- }
260-
261- // Parse event
262- match Self :: parse_event_sync ( data) {
263- Ok ( event) => {
264- tokio:: spawn ( async move {
265- if let Err ( _) = sender. send ( event) . await {
266- // Channel full, update metrics
267- let mut metrics_guard = metrics. write ( ) . await ;
268- metrics_guard. events_dropped += 1 ;
269- } else {
270- let mut metrics_guard = metrics. write ( ) . await ;
271- metrics_guard. events_processed += 1 ;
272- metrics_guard. last_event_timestamp = Some ( Instant :: now ( ) ) ;
273- }
274- } ) ;
275- }
276- Err ( e) => {
277- tokio:: spawn ( async move {
278- let mut metrics_guard = metrics. write ( ) . await ;
279- metrics_guard. processing_errors += 1 ;
280- debug ! ( "Event parsing error: {}" , e) ;
281- } ) ;
265+ rb_builder. add ( & rb_map, {
266+ let event_tx = event_tx. clone ( ) ;
267+ move |data : & [ u8 ] | {
268+ // Copy data and send through channel for async processing
269+ if let Err ( _) = event_tx. send ( data. to_vec ( ) ) {
270+ // Channel closed, stop processing
271+ return -1 ;
272+ }
273+ 0
282274 }
283- }
284- 0
285- } ) ;
275+ } ) ;
286276
287- let mut rb = match rb_builder. build ( ) {
288- Ok ( rb) => rb,
289- Err ( e) => {
277+ rb_builder. build ( )
278+ } ) . await ;
279+
280+ let rb = match rb_result {
281+ Ok ( Ok ( rb) ) => rb,
282+ Ok ( Err ( e) ) => {
290283 error ! ( "Cannot create ring buffer: {}" , e) ;
291284 return ;
292285 }
286+ Err ( e) => {
287+ error ! ( "Ring buffer task error: {}" , e) ;
288+ return ;
289+ }
293290 } ;
294291
295- info ! ( "📡 Async ring buffer processor started" ) ;
292+ // Spawn a blocking task for ring buffer polling
293+ let poll_handle = tokio:: task:: spawn_blocking ( move || {
294+ loop {
295+ match rb. poll ( Duration :: from_millis ( 100 ) ) {
296+ Ok ( ( ) ) => {
297+ // Events are handled through the callback
298+ }
299+ Err ( e) => {
300+ error ! ( "Ring buffer polling error: {}" , e) ;
301+ break ;
302+ }
303+ }
304+ }
305+ } ) ;
296306
297- let mut poll_interval = interval ( Duration :: from_millis ( config . poll_timeout_ms ) ) ;
307+ info ! ( "📡 Ring buffer processor started" ) ;
298308
309+ // Main event processing loop
299310 loop {
300311 tokio:: select! {
301- _ = poll_interval. tick( ) => {
302- // Use spawn_blocking to handle the synchronous poll operation
303- // We need to use a different approach since RingBuffer doesn't support clone
304- match tokio:: task:: spawn_blocking( {
305- // Move ownership temporarily using Option
306- let timeout = Duration :: from_millis( config. poll_timeout_ms) ;
307- move || {
308- // This is a workaround - we need to restructure to avoid cloning
309- // For now, we'll use a shorter timeout in a different way
310- std:: thread:: sleep( Duration :: from_millis( 10 ) ) ;
311- Ok :: <( ) , anyhow:: Error >( ( ) )
312- }
313- } ) . await {
314- Ok ( Ok ( ( ) ) ) => {
315- // Poll the ring buffer directly without clone
316- if let Err ( e) = rb. poll( Duration :: from_millis( 1 ) ) {
317- error!( "Ring buffer polling error: {}" , e) ;
318- if config. auto_recovery {
319- warn!( "Attempting auto recovery..." ) ;
320- tokio:: time:: sleep( Duration :: from_secs( 1 ) ) . await ;
321- continue ;
322- }
323- break ;
324- }
325- }
326- Ok ( Err ( e) ) => {
327- error!( "Ring buffer polling error: {}" , e) ;
328- if config. auto_recovery {
329- warn!( "Attempting auto recovery..." ) ;
330- tokio:: time:: sleep( Duration :: from_secs( 1 ) ) . await ;
331- continue ;
312+ // Process events from ring buffer
313+ Some ( data) = event_rx. recv( ) => {
314+ // Rate limiting
315+ if rate_limiter. try_acquire( ) . is_err( ) {
316+ let mut metrics_guard = metrics. write( ) . await ;
317+ metrics_guard. events_dropped += 1 ;
318+ continue ;
319+ }
320+
321+ // Parse and send event
322+ match Self :: parse_event_sync( & data) {
323+ Ok ( event) => {
324+ if let Err ( _) = sender. send( event) . await {
325+ // Channel full, update metrics
326+ let mut metrics_guard = metrics. write( ) . await ;
327+ metrics_guard. events_dropped += 1 ;
328+ } else {
329+ let mut metrics_guard = metrics. write( ) . await ;
330+ metrics_guard. events_processed += 1 ;
331+ metrics_guard. last_event_timestamp = Some ( Instant :: now( ) ) ;
332332 }
333- break ;
334333 }
335334 Err ( e) => {
336- error!( "Ring buffer task error: {}" , e) ;
337- break ;
335+ let mut metrics_guard = metrics. write( ) . await ;
336+ metrics_guard. processing_errors += 1 ;
337+ debug!( "Event parsing error: {}" , e) ;
338338 }
339339 }
340340 }
341+
342+ // Handle shutdown
341343 _ = shutdown. notified( ) => {
342344 info!( "Received shutdown signal, stopping ring buffer processor" ) ;
343345 break ;
344346 }
347+
348+ // Check if polling task finished (error case)
349+ _ = & mut poll_handle => {
350+ error!( "Ring buffer polling task terminated unexpectedly" ) ;
351+ if config. auto_recovery {
352+ warn!( "Auto recovery not implemented for polling task termination" ) ;
353+ }
354+ break ;
355+ }
345356 }
346357 }
347358
@@ -647,7 +658,7 @@ impl EbpfLoader {
647658 _ => {
648659 // Create error event
649660 Ok ( RawEvent :: Error ( EventError {
650- timestamp : chrono:: Utc :: now ( ) . timestamp_nanos ( ) as u64 ,
661+ timestamp : chrono:: Utc :: now ( ) . timestamp_nanos_opt ( ) . unwrap_or ( 0 ) as u64 ,
651662 error_type : ErrorType :: ParseError ,
652663 message : "Unknown event type" . to_string ( ) ,
653664 context : "parse_event_sync" . to_string ( ) ,
0 commit comments