From c1778efb88eadbe16b67839e7cccecb8214bbd53 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 27 Aug 2025 14:14:50 +0200 Subject: [PATCH 1/5] Allow to change tracing callback and level --- ffi/src/ffi_tracing.rs | 397 +++++++++++++++++++++++++++++++++-------- 1 file changed, 323 insertions(+), 74 deletions(-) diff --git a/ffi/src/ffi_tracing.rs b/ffi/src/ffi_tracing.rs index a84f8224f..dedc8d06c 100644 --- a/ffi/src/ffi_tracing.rs +++ b/ffi/src/ffi_tracing.rs @@ -1,5 +1,6 @@ //! FFI functions to allow engines to receive log and tracing events from kernel +use std::sync::LazyLock; use std::sync::{Arc, Mutex}; use std::{fmt, io}; @@ -8,6 +9,9 @@ use tracing::{ field::{Field as TracingField, Visit}, Event as TracingEvent, Subscriber, }; + +use tracing::trace; +use tracing::warn; use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::{filter::LevelFilter, layer::Context, registry::LookupSpan, Layer}; @@ -242,7 +246,7 @@ impl Visit for MessageFieldVisitor { } struct EventLayer { - callback: TracingEventFn, + callback: Arc>, } impl Layer for EventLayer @@ -267,25 +271,99 @@ where line: metadata.line().unwrap_or(0), file: kernel_string_slice!(file), }; - (self.callback)(event); + let cb = self.callback.lock().unwrap(); + (cb)(event); } } } -fn get_event_dispatcher(callback: TracingEventFn, max_level: Level) -> tracing_core::Dispatch { +struct GlobalTracingState { + dispatch: Option, + reload_handle: + Option>, + /// callback for event subscriber + event_callback: Option>>, + /// callback for log line subscriber + log_line_callback: Option>>, +} + +static TRACING_STATE: LazyLock> = LazyLock::new(|| { + Mutex::new(GlobalTracingState { + dispatch: None, + reload_handle: None, + event_callback: None, + log_line_callback: None, + }) +}); + +fn get_event_dispatcher( + callback: TracingEventFn, + max_level: Level, +) -> ( + tracing_core::Dispatch, + tracing_subscriber::reload::Handle, + Arc>, +) { use tracing_subscriber::{layer::SubscriberExt, registry::Registry}; - let filter: LevelFilter = max_level.into(); - let event_layer = EventLayer { callback }.with_filter(filter); + + let callback_arc = Arc::new(Mutex::new(callback)); + let (filter_layer, reload_handle) = + tracing_subscriber::reload::Layer::new(LevelFilter::from(max_level)); + let event_layer = EventLayer { + callback: callback_arc.clone(), + } + .with_filter(filter_layer); + let subscriber = Registry::default().with(event_layer); - tracing_core::Dispatch::new(subscriber) + ( + tracing_core::Dispatch::new(subscriber), + reload_handle, + callback_arc, + ) } fn setup_event_subscriber(callback: TracingEventFn, max_level: Level) -> DeltaResult<()> { if !max_level.is_valid() { return Err(Error::generic("max_level out of range")); } - let dispatch = get_event_dispatcher(callback, max_level); - set_global_default(dispatch) + let mut state = TRACING_STATE.lock().unwrap(); + // If already initialized - just update callback and level + if let Some(reload_handle) = &state.reload_handle { + if let Some(event_callback) = &state.event_callback { + *event_callback.lock().unwrap() = callback; + } + return reload_handle + .reload(LevelFilter::from(max_level)) + .map_err(|e| { + warn!("Failed to reload tracing level: {e}"); + Error::generic(format!("Unable to reload subscriber: {e}")) + }); + } + // First-time setup + let (dispatch, reload_handle, event_callback) = get_event_dispatcher(callback, max_level); + match set_global_default(dispatch.clone()) { + Ok(()) => { + state.dispatch = Some(dispatch); + state.reload_handle = Some(reload_handle); + state.event_callback = Some(event_callback); + Ok(()) + } + Err(e) => { + trace!("Failed to set global default: {e}. Will try to reload"); + match reload_handle.reload(LevelFilter::from(max_level)) { + Ok(_) => { + state.dispatch = Some(dispatch); + state.reload_handle = Some(reload_handle); + state.event_callback = Some(event_callback); + Ok(()) + } + Err(e) => { + warn!("Failed to reload fallback tracing level: {e}"); + Err(Error::generic(format!("Unable to reload subscriber: {e}"))) + } + } + } + } } // utility code below for setting up the tracing subscriber for log lines @@ -294,7 +372,7 @@ type SharedBuffer = Arc>>; struct TriggerLayer { buf: SharedBuffer, - callback: TracingLogLineFn, + callback: Arc>, } impl Layer for TriggerLayer @@ -306,13 +384,15 @@ where Ok(mut buf) => { let message = String::from_utf8_lossy(&buf); let message = kernel_string_slice!(message); - (self.callback)(message); + let cb = self.callback.lock().unwrap(); + (cb)(message); buf.clear(); } Err(_) => { let message = "INTERNAL KERNEL ERROR: Could not lock message buffer."; let message = kernel_string_slice!(message); - (self.callback)(message); + let cb = self.callback.lock().unwrap(); + (cb)(message); } } } @@ -327,7 +407,7 @@ impl io::Write for BufferedMessageWriter { fn write(&mut self, buf: &[u8]) -> io::Result { self.current_buffer .lock() - .map_err(|_| io::Error::other("Could not lock buffer"))? + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Could not lock buffer"))? .extend_from_slice(buf); Ok(buf.len()) } @@ -355,34 +435,47 @@ fn get_log_line_dispatch( with_time: bool, with_level: bool, with_target: bool, -) -> tracing_core::Dispatch { +) -> ( + tracing_core::Dispatch, + tracing_subscriber::reload::Handle, + Arc>, +) { use tracing_subscriber::{layer::SubscriberExt, registry::Registry}; + let buffer = Arc::new(Mutex::new(vec![])); let writer = BufferedMessageWriter { current_buffer: buffer.clone(), }; + let fmt_layer = tracing_subscriber::fmt::layer() .with_writer(writer) .with_ansi(ansi) .with_level(with_level) .with_target(with_target); - let filter: LevelFilter = max_level.into(); + + let (filter_layer, reload_handle) = + tracing_subscriber::reload::Layer::new(LevelFilter::from(max_level)); + + let callback_arc = Arc::new(Mutex::new(callback)); let tracking_layer = TriggerLayer { buf: buffer.clone(), - callback, + callback: callback_arc.clone(), }; - // This repeats some code, but avoids some insane generic wrangling if we try to abstract the - // type of `fmt_layer` over the formatter macro_rules! setup_subscriber { ($($transform:ident()).*) => {{ - let fmt_layer = fmt_layer$(.$transform())*.with_filter(filter); let subscriber = Registry::default() + .with(filter_layer) .with(fmt_layer) - .with(tracking_layer.with_filter(filter)); - tracing_core::Dispatch::new(subscriber) + .with(tracking_layer); + ( + tracing_core::Dispatch::new(subscriber), + reload_handle, + callback_arc.clone(), + ) }}; } + use LogLineFormat::*; match (format, with_time) { (FULL, true) => setup_subscriber!(), @@ -408,7 +501,25 @@ fn setup_log_line_subscriber( if !max_level.is_valid() { return Err(Error::generic("max_level out of range")); } - let dispatch = get_log_line_dispatch( + + let mut state = TRACING_STATE.lock().unwrap(); + + // Already initialized - just update callback and level + if let Some(reload_handle) = &state.reload_handle { + if let Some(log_line_callback) = &state.log_line_callback { + *log_line_callback.lock().unwrap() = callback; + } + + return reload_handle + .reload(LevelFilter::from(max_level)) + .map_err(|e| { + warn!("Failed to reload log level: {e}"); + Error::generic(format!("Unable to reload subscriber: {e}")) + }); + } + + // First-time setup + let (dispatch, reload_handle, log_line_callback) = get_log_line_dispatch( callback, max_level, format, @@ -417,14 +528,39 @@ fn setup_log_line_subscriber( with_level, with_target, ); - set_global_default(dispatch) + + match set_global_default(dispatch.clone()) { + Ok(()) => { + state.dispatch = Some(dispatch); + state.reload_handle = Some(reload_handle); + state.log_line_callback = Some(log_line_callback); + Ok(()) + } + Err(e) => { + trace!("Failed to set global default subscriber: {e}. Will try to reload"); + match reload_handle.reload(LevelFilter::from(max_level)) { + Ok(_) => { + state.dispatch = Some(dispatch); + state.reload_handle = Some(reload_handle); + state.log_line_callback = Some(log_line_callback); + Ok(()) + } + Err(e) => { + warn!("Failed to reload fallback tracing level: {e}"); + Err(Error::generic(format!("Unable to reload subscriber: {e}"))) + } + } + } + } } #[cfg(test)] mod tests { use std::sync::LazyLock; + use tracing::debug; use tracing::info; + use tracing::trace; use tracing_subscriber::fmt::time::FormatTime; use crate::TryFromStringSlice; @@ -436,15 +572,31 @@ mod tests { static TEST_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); static MESSAGES: Mutex>> = Mutex::new(None); - extern "C" fn record_callback(line: KernelStringSlice) { - let s: &str = unsafe { TryFromStringSlice::try_from_slice(&line).unwrap() }; - let s = s.to_string(); - let mut lock = MESSAGES.lock().unwrap(); - if let Some(ref mut msgs) = *lock { - msgs.push(s); + extern "C" fn record_callback_impl(line: KernelStringSlice, expected_log_lines: Vec<&str>) { + let line_str: &str = unsafe { TryFromStringSlice::try_from_slice(&line).unwrap() }; + let line_str = line_str.to_string(); + let ok = expected_log_lines.is_empty() + || expected_log_lines.iter().any(|expected_log_line| { + let res = line_str.ends_with(expected_log_line); + //println!("Line: {line_str}, expected: {expected_log_line}, res: {res}"); + res + }); + if ok { + let mut lock = MESSAGES.lock().unwrap(); + if let Some(ref mut msgs) = *lock { + msgs.push(line_str); + } } } + extern "C" fn record_callback(line: KernelStringSlice) { + record_callback_impl(line, vec!["Testing 1\n", "Another line\n"]) + } + + extern "C" fn record_callback_2(line: KernelStringSlice) { + record_callback_impl(line, vec!["Testing 2\n", "Yet another line\n"]) + } + fn setup_messages() { *MESSAGES.lock().unwrap() = Some(vec![]); } @@ -481,58 +633,70 @@ mod tests { Some(tstr[..19].to_string()) } + fn check_messages( + expected_lines: Vec<&str>, + expected_time_str: Option, + expected_level_str: &str, + ) { + let lock = MESSAGES.lock().unwrap(); + if let Some(ref msgs) = *lock { + assert_eq!(msgs.len(), expected_lines.len()); + for (got, expect) in msgs.iter().zip(expected_lines) { + println!("Got: {got}"); + assert!(got.ends_with(expect)); + assert!(got.contains(expected_level_str)); + assert!(got.contains("delta_kernel_ffi::ffi_tracing::tests")); + if let Some(ref tstr) = expected_time_str { + assert!(got.contains(tstr)); + } + } + } else { + panic!("Messages wasn't Some"); + } + } + // IMPORTANT: This is the only test that should call the actual `extern "C"` function, as we can // only call it once to set the global subscriber. Other tests ALL need to use // `get_X_dispatcher` and set it locally using `with_default` #[test] - fn info_logs_with_log_line_tracing() { + fn test_enable_log_line_tracing() { let _lock = TEST_LOCK.lock().unwrap(); setup_messages(); unsafe { enable_log_line_tracing(record_callback, Level::INFO); } - let lines = ["Testing 1\n", "Another line\n"]; + let lines = ["Testing 1\n", "Another line\n", "Testing 2\n", "Yet another line\n"]; + let expected_lines = vec!["Testing 1\n", "Another line\n"]; let test_time_str = get_time_test_str(); - for line in lines { + for line in &lines { // remove final newline which will be added back by logging info!("{}", &line[..(line.len() - 1)]); } - let lock = MESSAGES.lock().unwrap(); - if let Some(ref msgs) = *lock { - assert_eq!(msgs.len(), lines.len()); - for (got, expect) in msgs.iter().zip(lines) { - assert!(got.ends_with(expect)); - assert!(got.contains("INFO")); - assert!(got.contains("delta_kernel_ffi::ffi_tracing::tests")); - if let Some(ref tstr) = test_time_str { - assert!(got.contains(tstr)); - } - } - } else { - panic!("Messages wasn't Some"); - } - // ensure we can't setup again + check_messages(expected_lines, test_time_str, "INFO"); + setup_messages(); + + // ensure we can setup again with a new callback and a new tracing level // do in the same test to ensure ordering - let ok = unsafe { - enable_formatted_log_line_tracing( - record_callback, - Level::TRACE, - LogLineFormat::FULL, - true, // ansi - true, // with_time - true, // with_level - true, // with_target - ) - }; - assert!(!ok, "Should have not set up a second time") + let ok = unsafe { enable_log_line_tracing(record_callback_2, Level::DEBUG) }; + assert!(ok, "Failed to set up second time"); + + // ensure both callback and tracing level are reloaded. + let expected_lines = vec!["Testing 2\n", "Yet another line\n"]; + let test_time_str = get_time_test_str(); + for line in &lines { + debug!("{}", &line[..(line.len() - 1)]); + // trace must not be visible in messages, because we changed level to debug + trace!("{}", &line[..(line.len() - 1)]); + } + check_messages(expected_lines, test_time_str, "DEBUG"); } #[test] fn info_logs_with_formatted_log_line_tracing() { let _lock = TEST_LOCK.lock().unwrap(); setup_messages(); - let dispatch = get_log_line_dispatch( + let (dispatch, _, _) = get_log_line_dispatch( record_callback, Level::INFO, LogLineFormat::COMPACT, @@ -565,27 +729,78 @@ mod tests { }) } - static EVENTS_OK: Mutex>> = Mutex::new(None); + static EVENTS_OK: Mutex>> = Mutex::new(None); fn setup_events() { *EVENTS_OK.lock().unwrap() = Some(vec![]); } - extern "C" fn event_callback(event: Event) { + fn events_to_string(events: Vec<(String, tracing::Level)>) -> String { + let events_str = events + .iter() + .map(|(s, lvl)| format!("{}:{}", s, lvl)) + .collect::>() + .join(", "); + events_str + } + + fn convert_level(level: Level) -> tracing::Level { + match level { + Level::ERROR => tracing::Level::ERROR, + Level::WARN => tracing::Level::WARN, + Level::INFO => tracing::Level::INFO, + Level::DEBUG => tracing::Level::DEBUG, + Level::TRACE => tracing::Level::TRACE, + } + } + + extern "C" fn event_callback_impl(event: Event, expected_log_lines: Vec<&str>) { let msg: &str = unsafe { TryFromStringSlice::try_from_slice(&event.message).unwrap() }; let target: &str = unsafe { TryFromStringSlice::try_from_slice(&event.target).unwrap() }; let file: &str = unsafe { TryFromStringSlice::try_from_slice(&event.file).unwrap() }; // file path will use \ on windows use std::path::MAIN_SEPARATOR; - let expected_file = format!("ffi{MAIN_SEPARATOR}src{MAIN_SEPARATOR}ffi_tracing.rs"); + let expected_file = format!("ffi{}src{}ffi_tracing.rs", MAIN_SEPARATOR, MAIN_SEPARATOR); - let ok = event.level == Level::INFO - && target == "delta_kernel_ffi::ffi_tracing::tests" + let ok = target == "delta_kernel_ffi::ffi_tracing::tests" && file == expected_file - && (msg == "Testing 1" || msg == "Another line"); - let mut lock = EVENTS_OK.lock().unwrap(); - if let Some(ref mut events) = *lock { - events.push(ok); + && expected_log_lines + .iter() + .any(|expected_log_line| *expected_log_line == msg); + if ok { + let mut lock = EVENTS_OK.lock().unwrap(); + if let Some(ref mut events) = *lock { + events.push((msg.to_string(), convert_level(event.level))); + } + } + } + + extern "C" fn event_callback(event: Event) { + event_callback_impl(event, vec!["Testing 1", "Another line"]) + } + + extern "C" fn event_callback_2(event: Event) { + event_callback_impl(event, vec!["Testing 2", "Yet another line"]) + } + + fn check_events(expected_level: tracing::Level, expected_messages: Vec<&str>) { + let lock = EVENTS_OK.lock().unwrap(); + if let Some(ref results) = *lock { + assert!(!results.is_empty(), "No events were captured"); + + assert!( + results.iter().all(|(_msg, lvl)| *lvl == expected_level), + "Not all events were {expected_level}" + ); + let events_str = events_to_string((&results).to_vec()); + assert!( + results + .iter() + .all(|(msg, _lvl)| expected_messages.contains(&msg.as_str())), + "Not all messages have expected format: {events_str}" + ) + } else { + panic!("Events wasn't Some"); } } @@ -593,19 +808,53 @@ mod tests { fn trace_event_tracking() { let _lock = TEST_LOCK.lock().unwrap(); setup_events(); - let dispatch = get_event_dispatcher(event_callback, Level::TRACE); + let (dispatch, _filter, _) = get_event_dispatcher(event_callback, Level::TRACE); tracing_core::dispatcher::with_default(&dispatch, || { let lines = ["Testing 1", "Another line"]; for line in lines { info!("{line}"); } }); - let lock = EVENTS_OK.lock().unwrap(); - if let Some(ref results) = *lock { - assert!(results.iter().all(|x| *x)); - } else { - panic!("Events wasn't Some"); + check_events(tracing::Level::INFO, vec!["Testing 1", "Another line"]); + } + + #[test] + #[ignore] // We cannot run this test if test_enable_log_line_tracing was run before - see comment there, however this test works if run individually. + fn test_enable_event_tracing() { + let _lock = TEST_LOCK.lock().unwrap(); + setup_events(); + unsafe { + enable_event_tracing(event_callback, Level::INFO); + } + let lines = ["Testing 1", "Another line", "Testing 2", "Yet another line"]; + let expected_lines = vec!["Testing 1", "Another line"]; + for line in &lines { + // remove final newline which will be added back by logging + info!("{}", &line); + } + + check_events(tracing::Level::INFO, expected_lines); + setup_events(); + assert!(EVENTS_OK + .lock() + .unwrap() + .as_ref() + .map_or(true, |v| v.is_empty())); + + // ensure we can setup again + // do in the same test to ensure ordering + unsafe { + enable_event_tracing(event_callback_2, Level::DEBUG); + }; + + // ensure both callback is reloaded. + let expected_lines = vec!["Testing 2", "Yet another line"]; + for line in &lines { + // changing log level for log_line_callback does not work + debug!("{}", &line); + trace!("{}", &line); } + check_events(tracing::Level::DEBUG, expected_lines); } #[test] From 007e76c74fb1c2c876db2e6954ffdf3a7342fd22 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 27 Aug 2025 17:32:51 +0200 Subject: [PATCH 2/5] Refactor --- ffi/src/ffi_tracing.rs | 147 +++++++++++++++++------------------------ 1 file changed, 62 insertions(+), 85 deletions(-) diff --git a/ffi/src/ffi_tracing.rs b/ffi/src/ffi_tracing.rs index dedc8d06c..2173024e3 100644 --- a/ffi/src/ffi_tracing.rs +++ b/ffi/src/ffi_tracing.rs @@ -278,23 +278,16 @@ where } struct GlobalTracingState { - dispatch: Option, - reload_handle: - Option>, + dispatch: tracing_core::Dispatch, + reload_handle: tracing_subscriber::reload::Handle, /// callback for event subscriber event_callback: Option>>, /// callback for log line subscriber log_line_callback: Option>>, } -static TRACING_STATE: LazyLock> = LazyLock::new(|| { - Mutex::new(GlobalTracingState { - dispatch: None, - reload_handle: None, - event_callback: None, - log_line_callback: None, - }) -}); +static TRACING_STATE: LazyLock>> = + LazyLock::new(|| Mutex::new(None)); fn get_event_dispatcher( callback: TracingEventFn, @@ -326,42 +319,35 @@ fn setup_event_subscriber(callback: TracingEventFn, max_level: Level) -> DeltaRe if !max_level.is_valid() { return Err(Error::generic("max_level out of range")); } - let mut state = TRACING_STATE.lock().unwrap(); - // If already initialized - just update callback and level - if let Some(reload_handle) = &state.reload_handle { + let mut locked_state = TRACING_STATE.lock().unwrap(); + if let Some(state) = locked_state.as_mut() { if let Some(event_callback) = &state.event_callback { *event_callback.lock().unwrap() = callback; + } else { + state.event_callback = Some(Arc::new(Mutex::new(callback))); } - return reload_handle + return state + .reload_handle .reload(LevelFilter::from(max_level)) .map_err(|e| { warn!("Failed to reload tracing level: {e}"); Error::generic(format!("Unable to reload subscriber: {e}")) }); - } - // First-time setup - let (dispatch, reload_handle, event_callback) = get_event_dispatcher(callback, max_level); - match set_global_default(dispatch.clone()) { - Ok(()) => { - state.dispatch = Some(dispatch); - state.reload_handle = Some(reload_handle); - state.event_callback = Some(event_callback); - Ok(()) - } - Err(e) => { - trace!("Failed to set global default: {e}. Will try to reload"); - match reload_handle.reload(LevelFilter::from(max_level)) { - Ok(_) => { - state.dispatch = Some(dispatch); - state.reload_handle = Some(reload_handle); - state.event_callback = Some(event_callback); - Ok(()) - } - Err(e) => { - warn!("Failed to reload fallback tracing level: {e}"); - Err(Error::generic(format!("Unable to reload subscriber: {e}"))) - } + } else { + let (dispatch, reload_handle, event_callback) = get_event_dispatcher(callback, max_level); + match set_global_default(dispatch.clone()) { + Ok(()) => { + *locked_state = Some(GlobalTracingState { + dispatch, + reload_handle, + event_callback: Some(event_callback), + log_line_callback: None, + }); + Ok(()) } + Err(e) => Err(Error::generic(format!( + "Unable to set default subscriber: {e}" + ))), } } } @@ -407,7 +393,7 @@ impl io::Write for BufferedMessageWriter { fn write(&mut self, buf: &[u8]) -> io::Result { self.current_buffer .lock() - .map_err(|_| io::Error::new(io::ErrorKind::Other, "Could not lock buffer"))? + .map_err(|_| io::Error::other("Could not lock buffer"))? .extend_from_slice(buf); Ok(buf.len()) } @@ -501,55 +487,44 @@ fn setup_log_line_subscriber( if !max_level.is_valid() { return Err(Error::generic("max_level out of range")); } - - let mut state = TRACING_STATE.lock().unwrap(); - - // Already initialized - just update callback and level - if let Some(reload_handle) = &state.reload_handle { + let mut locked_state = TRACING_STATE.lock().unwrap(); + if let Some(state) = locked_state.as_mut() { if let Some(log_line_callback) = &state.log_line_callback { *log_line_callback.lock().unwrap() = callback; + } else { + state.log_line_callback = Some(Arc::new(Mutex::new(callback))); } - return reload_handle + return state + .reload_handle .reload(LevelFilter::from(max_level)) .map_err(|e| { warn!("Failed to reload log level: {e}"); Error::generic(format!("Unable to reload subscriber: {e}")) }); - } - - // First-time setup - let (dispatch, reload_handle, log_line_callback) = get_log_line_dispatch( - callback, - max_level, - format, - ansi, - with_time, - with_level, - with_target, - ); - - match set_global_default(dispatch.clone()) { - Ok(()) => { - state.dispatch = Some(dispatch); - state.reload_handle = Some(reload_handle); - state.log_line_callback = Some(log_line_callback); - Ok(()) - } - Err(e) => { - trace!("Failed to set global default subscriber: {e}. Will try to reload"); - match reload_handle.reload(LevelFilter::from(max_level)) { - Ok(_) => { - state.dispatch = Some(dispatch); - state.reload_handle = Some(reload_handle); - state.log_line_callback = Some(log_line_callback); - Ok(()) - } - Err(e) => { - warn!("Failed to reload fallback tracing level: {e}"); - Err(Error::generic(format!("Unable to reload subscriber: {e}"))) - } + } else { + let (dispatch, reload_handle, log_line_callback) = get_log_line_dispatch( + callback, + max_level, + format, + ansi, + with_time, + with_level, + with_target, + ); + match set_global_default(dispatch.clone()) { + Ok(()) => { + *locked_state = Some(GlobalTracingState { + dispatch, + reload_handle, + event_callback: None, + log_line_callback: Some(log_line_callback), + }); + Ok(()) } + Err(e) => Err(Error::generic(format!( + "Unable to set default subscriber: {e}" + ))), } } } @@ -665,7 +640,12 @@ mod tests { unsafe { enable_log_line_tracing(record_callback, Level::INFO); } - let lines = ["Testing 1\n", "Another line\n", "Testing 2\n", "Yet another line\n"]; + let lines = [ + "Testing 1\n", + "Another line\n", + "Testing 2\n", + "Yet another line\n", + ]; let expected_lines = vec!["Testing 1\n", "Another line\n"]; let test_time_str = get_time_test_str(); for line in &lines { @@ -677,7 +657,6 @@ mod tests { setup_messages(); // ensure we can setup again with a new callback and a new tracing level - // do in the same test to ensure ordering let ok = unsafe { enable_log_line_tracing(record_callback_2, Level::DEBUG) }; assert!(ok, "Failed to set up second time"); @@ -829,7 +808,6 @@ mod tests { let lines = ["Testing 1", "Another line", "Testing 2", "Yet another line"]; let expected_lines = vec!["Testing 1", "Another line"]; for line in &lines { - // remove final newline which will be added back by logging info!("{}", &line); } @@ -841,17 +819,16 @@ mod tests { .as_ref() .map_or(true, |v| v.is_empty())); - // ensure we can setup again - // do in the same test to ensure ordering + // ensure we can setup again with a new callback and a new tracing level unsafe { enable_event_tracing(event_callback_2, Level::DEBUG); }; - // ensure both callback is reloaded. + // ensure both callback and tracing level are reloaded. let expected_lines = vec!["Testing 2", "Yet another line"]; for line in &lines { - // changing log level for log_line_callback does not work debug!("{}", &line); + // trace must not be visible in messages, because we changed level to debug trace!("{}", &line); } check_events(tracing::Level::DEBUG, expected_lines); From 9e7b4f0bde27733b7e7e64d582e23379fb4df372 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 24 Sep 2025 16:14:32 +0200 Subject: [PATCH 3/5] Review fixes --- ffi/src/ffi_tracing.rs | 185 +++++++++++++++++++++-------------------- 1 file changed, 97 insertions(+), 88 deletions(-) diff --git a/ffi/src/ffi_tracing.rs b/ffi/src/ffi_tracing.rs index 2173024e3..1df028de6 100644 --- a/ffi/src/ffi_tracing.rs +++ b/ffi/src/ffi_tracing.rs @@ -10,7 +10,6 @@ use tracing::{ Event as TracingEvent, Subscriber, }; -use tracing::trace; use tracing::warn; use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::{filter::LevelFilter, layer::Context, registry::LookupSpan, Layer}; @@ -278,18 +277,93 @@ where } struct GlobalTracingState { - dispatch: tracing_core::Dispatch, - reload_handle: tracing_subscriber::reload::Handle, + dispatch: Option, + reload_handle: + Option>, /// callback for event subscriber event_callback: Option>>, /// callback for log line subscriber log_line_callback: Option>>, } -static TRACING_STATE: LazyLock>> = - LazyLock::new(|| Mutex::new(None)); +impl GlobalTracingState { + fn uninitialized() -> Self { + GlobalTracingState { + dispatch: None, + reload_handle: None, + event_callback: None, + log_line_callback: None, + } + } + + fn register_event_callback( + &mut self, + callback: TracingEventFn, + max_level: Level, + ) -> DeltaResult<()> { + if !max_level.is_valid() { + return Err(Error::generic("max_level out of range")); + } + + if let (Some(reload), Some(event_cb)) = (&self.reload_handle, &self.event_callback) { + *event_cb.lock().unwrap() = callback; + return reload.reload(LevelFilter::from(max_level)).map_err(|e| { + warn!("Failed to reload tracing level: {e}"); + Error::generic(format!("Unable to reload subscriber: {e}")) + }); + } -fn get_event_dispatcher( + let (dispatch, reload_handle, event_callback) = create_event_dispatch(callback, max_level); + set_global_default(dispatch.clone())?; + self.dispatch = Some(dispatch); + self.reload_handle = Some(reload_handle); + self.event_callback = Some(event_callback); + Ok(()) + } + + fn register_log_line_callback( + &mut self, + callback: TracingLogLineFn, + max_level: Level, + format: LogLineFormat, + ansi: bool, + with_time: bool, + with_level: bool, + with_target: bool, + ) -> DeltaResult<()> { + if !max_level.is_valid() { + return Err(Error::generic("max_level out of range")); + } + + if let (Some(reload), Some(log_cb)) = (&self.reload_handle, &self.log_line_callback) { + *log_cb.lock().unwrap() = callback; + return reload.reload(LevelFilter::from(max_level)).map_err(|e| { + warn!("Failed to reload log level: {e}"); + Error::generic(format!("Unable to reload subscriber: {e}")) + }); + } + + let (dispatch, reload_handle, log_line_callback) = create_log_line_dispatch( + callback, + max_level, + format, + ansi, + with_time, + with_level, + with_target, + ); + set_global_default(dispatch.clone())?; + self.dispatch = Some(dispatch); + self.reload_handle = Some(reload_handle); + self.log_line_callback = Some(log_line_callback); + Ok(()) + } +} + +static TRACING_STATE: LazyLock> = + LazyLock::new(|| Mutex::new(GlobalTracingState::uninitialized())); + +fn create_event_dispatch( callback: TracingEventFn, max_level: Level, ) -> ( @@ -316,40 +390,8 @@ fn get_event_dispatcher( } fn setup_event_subscriber(callback: TracingEventFn, max_level: Level) -> DeltaResult<()> { - if !max_level.is_valid() { - return Err(Error::generic("max_level out of range")); - } - let mut locked_state = TRACING_STATE.lock().unwrap(); - if let Some(state) = locked_state.as_mut() { - if let Some(event_callback) = &state.event_callback { - *event_callback.lock().unwrap() = callback; - } else { - state.event_callback = Some(Arc::new(Mutex::new(callback))); - } - return state - .reload_handle - .reload(LevelFilter::from(max_level)) - .map_err(|e| { - warn!("Failed to reload tracing level: {e}"); - Error::generic(format!("Unable to reload subscriber: {e}")) - }); - } else { - let (dispatch, reload_handle, event_callback) = get_event_dispatcher(callback, max_level); - match set_global_default(dispatch.clone()) { - Ok(()) => { - *locked_state = Some(GlobalTracingState { - dispatch, - reload_handle, - event_callback: Some(event_callback), - log_line_callback: None, - }); - Ok(()) - } - Err(e) => Err(Error::generic(format!( - "Unable to set default subscriber: {e}" - ))), - } - } + let mut state = TRACING_STATE.lock().unwrap(); + state.register_event_callback(callback, max_level) } // utility code below for setting up the tracing subscriber for log lines @@ -413,7 +455,7 @@ impl<'a> MakeWriter<'a> for BufferedMessageWriter { } } -fn get_log_line_dispatch( +fn create_log_line_dispatch( callback: TracingLogLineFn, max_level: Level, format: LogLineFormat, @@ -484,49 +526,16 @@ fn setup_log_line_subscriber( with_level: bool, with_target: bool, ) -> DeltaResult<()> { - if !max_level.is_valid() { - return Err(Error::generic("max_level out of range")); - } - let mut locked_state = TRACING_STATE.lock().unwrap(); - if let Some(state) = locked_state.as_mut() { - if let Some(log_line_callback) = &state.log_line_callback { - *log_line_callback.lock().unwrap() = callback; - } else { - state.log_line_callback = Some(Arc::new(Mutex::new(callback))); - } - - return state - .reload_handle - .reload(LevelFilter::from(max_level)) - .map_err(|e| { - warn!("Failed to reload log level: {e}"); - Error::generic(format!("Unable to reload subscriber: {e}")) - }); - } else { - let (dispatch, reload_handle, log_line_callback) = get_log_line_dispatch( - callback, - max_level, - format, - ansi, - with_time, - with_level, - with_target, - ); - match set_global_default(dispatch.clone()) { - Ok(()) => { - *locked_state = Some(GlobalTracingState { - dispatch, - reload_handle, - event_callback: None, - log_line_callback: Some(log_line_callback), - }); - Ok(()) - } - Err(e) => Err(Error::generic(format!( - "Unable to set default subscriber: {e}" - ))), - } - } + let mut state = TRACING_STATE.lock().unwrap(); + state.register_log_line_callback( + callback, + max_level, + format, + ansi, + with_time, + with_level, + with_target, + ) } #[cfg(test)] @@ -547,7 +556,7 @@ mod tests { static TEST_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); static MESSAGES: Mutex>> = Mutex::new(None); - extern "C" fn record_callback_impl(line: KernelStringSlice, expected_log_lines: Vec<&str>) { + fn record_callback_impl(line: KernelStringSlice, expected_log_lines: Vec<&str>) { let line_str: &str = unsafe { TryFromStringSlice::try_from_slice(&line).unwrap() }; let line_str = line_str.to_string(); let ok = expected_log_lines.is_empty() @@ -675,7 +684,7 @@ mod tests { fn info_logs_with_formatted_log_line_tracing() { let _lock = TEST_LOCK.lock().unwrap(); setup_messages(); - let (dispatch, _, _) = get_log_line_dispatch( + let (dispatch, _, _) = create_log_line_dispatch( record_callback, Level::INFO, LogLineFormat::COMPACT, @@ -732,7 +741,7 @@ mod tests { } } - extern "C" fn event_callback_impl(event: Event, expected_log_lines: Vec<&str>) { + fn event_callback_impl(event: Event, expected_log_lines: Vec<&str>) { let msg: &str = unsafe { TryFromStringSlice::try_from_slice(&event.message).unwrap() }; let target: &str = unsafe { TryFromStringSlice::try_from_slice(&event.target).unwrap() }; let file: &str = unsafe { TryFromStringSlice::try_from_slice(&event.file).unwrap() }; @@ -787,7 +796,7 @@ mod tests { fn trace_event_tracking() { let _lock = TEST_LOCK.lock().unwrap(); setup_events(); - let (dispatch, _filter, _) = get_event_dispatcher(event_callback, Level::TRACE); + let (dispatch, _filter, _) = create_event_dispatch(event_callback, Level::TRACE); tracing_core::dispatcher::with_default(&dispatch, || { let lines = ["Testing 1", "Another line"]; for line in lines { From 7a0c019ee7dd3750d984720106471663b94e8fa0 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 8 Oct 2025 13:42:34 +0200 Subject: [PATCH 4/5] Fix review comments; fix CI --- ffi/src/ffi_tracing.rs | 45 ++++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/ffi/src/ffi_tracing.rs b/ffi/src/ffi_tracing.rs index 1df028de6..4b78278b9 100644 --- a/ffi/src/ffi_tracing.rs +++ b/ffi/src/ffi_tracing.rs @@ -5,12 +5,11 @@ use std::sync::{Arc, Mutex}; use std::{fmt, io}; use delta_kernel::{DeltaResult, Error}; +use tracing::{error, warn}; use tracing::{ field::{Field as TracingField, Visit}, Event as TracingEvent, Subscriber, }; - -use tracing::warn; use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::{filter::LevelFilter, layer::Context, registry::LookupSpan, Layer}; @@ -270,8 +269,11 @@ where line: metadata.line().unwrap_or(0), file: kernel_string_slice!(file), }; - let cb = self.callback.lock().unwrap(); - (cb)(event); + if let Ok(cb) = self.callback.lock() { + (cb)(event); + } else { + error!("Failed to lock event callback (mutex poisoned)."); + } } } } @@ -306,7 +308,11 @@ impl GlobalTracingState { } if let (Some(reload), Some(event_cb)) = (&self.reload_handle, &self.event_callback) { - *event_cb.lock().unwrap() = callback; + if let Ok(mut event_cb) = event_cb.lock() { + *event_cb = callback; + } else { + error!("Failed to acquire lock for event callback (mutex poisoned)."); + } return reload.reload(LevelFilter::from(max_level)).map_err(|e| { warn!("Failed to reload tracing level: {e}"); Error::generic(format!("Unable to reload subscriber: {e}")) @@ -321,6 +327,7 @@ impl GlobalTracingState { Ok(()) } + #[allow(clippy::too_many_arguments)] fn register_log_line_callback( &mut self, callback: TracingLogLineFn, @@ -336,7 +343,11 @@ impl GlobalTracingState { } if let (Some(reload), Some(log_cb)) = (&self.reload_handle, &self.log_line_callback) { - *log_cb.lock().unwrap() = callback; + if let Ok(mut log_cb) = log_cb.lock() { + *log_cb = callback; + } else { + error!("Failed to acquire lock for log callback (mutex poisoned)."); + } return reload.reload(LevelFilter::from(max_level)).map_err(|e| { warn!("Failed to reload log level: {e}"); Error::generic(format!("Unable to reload subscriber: {e}")) @@ -412,15 +423,21 @@ where Ok(mut buf) => { let message = String::from_utf8_lossy(&buf); let message = kernel_string_slice!(message); - let cb = self.callback.lock().unwrap(); - (cb)(message); + if let Ok(cb) = self.callback.lock() { + (cb)(message); + } else { + error!("Failed to lock event callback (mutex poisoned)."); + } buf.clear(); } Err(_) => { let message = "INTERNAL KERNEL ERROR: Could not lock message buffer."; let message = kernel_string_slice!(message); - let cb = self.callback.lock().unwrap(); - (cb)(message); + if let Ok(cb) = self.callback.lock() { + (cb)(message); + } else { + error!("Failed to lock event callback (mutex poisoned)."); + } } } } @@ -561,9 +578,7 @@ mod tests { let line_str = line_str.to_string(); let ok = expected_log_lines.is_empty() || expected_log_lines.iter().any(|expected_log_line| { - let res = line_str.ends_with(expected_log_line); - //println!("Line: {line_str}, expected: {expected_log_line}, res: {res}"); - res + line_str.ends_with(expected_log_line) }); if ok { let mut lock = MESSAGES.lock().unwrap(); @@ -780,7 +795,7 @@ mod tests { results.iter().all(|(_msg, lvl)| *lvl == expected_level), "Not all events were {expected_level}" ); - let events_str = events_to_string((&results).to_vec()); + let events_str = events_to_string(results.to_vec()); assert!( results .iter() @@ -826,7 +841,7 @@ mod tests { .lock() .unwrap() .as_ref() - .map_or(true, |v| v.is_empty())); + .is_none_or(|v| v.is_empty())); // ensure we can setup again with a new callback and a new tracing level unsafe { From 95672f9306d2d17676c9e96ffda605b1186340af Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 27 Oct 2025 14:31:58 +0100 Subject: [PATCH 5/5] Review fixes --- ffi/src/ffi_tracing.rs | 55 +++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/ffi/src/ffi_tracing.rs b/ffi/src/ffi_tracing.rs index 4b78278b9..33be06eef 100644 --- a/ffi/src/ffi_tracing.rs +++ b/ffi/src/ffi_tracing.rs @@ -308,11 +308,10 @@ impl GlobalTracingState { } if let (Some(reload), Some(event_cb)) = (&self.reload_handle, &self.event_callback) { - if let Ok(mut event_cb) = event_cb.lock() { - *event_cb = callback; - } else { - error!("Failed to acquire lock for event callback (mutex poisoned)."); - } + let mut event_cb = event_cb.lock().map_err(|_e| { + Error::generic("Failed to acquire lock for event callback (mutex poisoned).") + })?; + *event_cb = callback; return reload.reload(LevelFilter::from(max_level)).map_err(|e| { warn!("Failed to reload tracing level: {e}"); Error::generic(format!("Unable to reload subscriber: {e}")) @@ -343,11 +342,10 @@ impl GlobalTracingState { } if let (Some(reload), Some(log_cb)) = (&self.reload_handle, &self.log_line_callback) { - if let Ok(mut log_cb) = log_cb.lock() { - *log_cb = callback; - } else { - error!("Failed to acquire lock for log callback (mutex poisoned)."); - } + let mut log_cb = log_cb.lock().map_err(|_e| { + Error::generic("Failed to acquire lock for log callback (mutex poisoned).") + })?; + *log_cb = callback; return reload.reload(LevelFilter::from(max_level)).map_err(|e| { warn!("Failed to reload log level: {e}"); Error::generic(format!("Unable to reload subscriber: {e}")) @@ -401,7 +399,9 @@ fn create_event_dispatch( } fn setup_event_subscriber(callback: TracingEventFn, max_level: Level) -> DeltaResult<()> { - let mut state = TRACING_STATE.lock().unwrap(); + let mut state = TRACING_STATE + .lock() + .map_err(|_e| Error::generic("Poisoned mutex while setting up event subscriber"))?; state.register_event_callback(callback, max_level) } @@ -543,7 +543,9 @@ fn setup_log_line_subscriber( with_level: bool, with_target: bool, ) -> DeltaResult<()> { - let mut state = TRACING_STATE.lock().unwrap(); + let mut state = TRACING_STATE + .lock() + .map_err(|_e| Error::generic("Poisoned mutex while setting up log_line_subscriber"))?; state.register_log_line_callback( callback, max_level, @@ -577,9 +579,9 @@ mod tests { let line_str: &str = unsafe { TryFromStringSlice::try_from_slice(&line).unwrap() }; let line_str = line_str.to_string(); let ok = expected_log_lines.is_empty() - || expected_log_lines.iter().any(|expected_log_line| { - line_str.ends_with(expected_log_line) - }); + || expected_log_lines + .iter() + .any(|expected_log_line| line_str.ends_with(expected_log_line)); if ok { let mut lock = MESSAGES.lock().unwrap(); if let Some(ref mut msgs) = *lock { @@ -638,19 +640,18 @@ mod tests { expected_level_str: &str, ) { let lock = MESSAGES.lock().unwrap(); - if let Some(ref msgs) = *lock { - assert_eq!(msgs.len(), expected_lines.len()); - for (got, expect) in msgs.iter().zip(expected_lines) { - println!("Got: {got}"); - assert!(got.ends_with(expect)); - assert!(got.contains(expected_level_str)); - assert!(got.contains("delta_kernel_ffi::ffi_tracing::tests")); - if let Some(ref tstr) = expected_time_str { - assert!(got.contains(tstr)); - } - } - } else { + let Some(ref msgs) = *lock else { panic!("Messages wasn't Some"); + }; + assert_eq!(msgs.len(), expected_lines.len()); + for (got, expect) in msgs.iter().zip(expected_lines) { + println!("Got: {got}"); + assert!(got.ends_with(expect)); + assert!(got.contains(expected_level_str)); + assert!(got.contains("delta_kernel_ffi::ffi_tracing::tests")); + if let Some(ref tstr) = expected_time_str { + assert!(got.contains(tstr)); + } } }