diff --git a/ffi/src/ffi_tracing.rs b/ffi/src/ffi_tracing.rs index a84f8224f..33be06eef 100644 --- a/ffi/src/ffi_tracing.rs +++ b/ffi/src/ffi_tracing.rs @@ -1,9 +1,11 @@ //! FFI functions to allow engines to receive log and tracing events from kernel +use std::sync::LazyLock; use std::sync::{Arc, Mutex}; use std::{fmt, io}; use delta_kernel::{DeltaResult, Error}; +use tracing::{error, warn}; use tracing::{ field::{Field as TracingField, Visit}, Event as TracingEvent, Subscriber, @@ -242,7 +244,7 @@ impl Visit for MessageFieldVisitor { } struct EventLayer { - callback: TracingEventFn, + callback: Arc>, } impl Layer for EventLayer @@ -267,25 +269,140 @@ where line: metadata.line().unwrap_or(0), file: kernel_string_slice!(file), }; - (self.callback)(event); + if let Ok(cb) = self.callback.lock() { + (cb)(event); + } else { + error!("Failed to lock event callback (mutex poisoned)."); + } + } + } +} + +struct GlobalTracingState { + dispatch: Option, + reload_handle: + Option>, + /// callback for event subscriber + event_callback: Option>>, + /// callback for log line subscriber + log_line_callback: Option>>, +} + +impl GlobalTracingState { + fn uninitialized() -> Self { + GlobalTracingState { + dispatch: None, + reload_handle: None, + event_callback: None, + log_line_callback: None, + } + } + + fn register_event_callback( + &mut self, + callback: TracingEventFn, + max_level: Level, + ) -> DeltaResult<()> { + if !max_level.is_valid() { + return Err(Error::generic("max_level out of range")); + } + + if let (Some(reload), Some(event_cb)) = (&self.reload_handle, &self.event_callback) { + let mut event_cb = event_cb.lock().map_err(|_e| { + Error::generic("Failed to acquire lock for event callback (mutex poisoned).") + })?; + *event_cb = callback; + return reload.reload(LevelFilter::from(max_level)).map_err(|e| { + warn!("Failed to reload tracing level: {e}"); + Error::generic(format!("Unable to reload subscriber: {e}")) + }); } + + let (dispatch, reload_handle, event_callback) = create_event_dispatch(callback, max_level); + set_global_default(dispatch.clone())?; + self.dispatch = Some(dispatch); + self.reload_handle = Some(reload_handle); + self.event_callback = Some(event_callback); + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn register_log_line_callback( + &mut self, + callback: TracingLogLineFn, + max_level: Level, + format: LogLineFormat, + ansi: bool, + with_time: bool, + with_level: bool, + with_target: bool, + ) -> DeltaResult<()> { + if !max_level.is_valid() { + return Err(Error::generic("max_level out of range")); + } + + if let (Some(reload), Some(log_cb)) = (&self.reload_handle, &self.log_line_callback) { + let mut log_cb = log_cb.lock().map_err(|_e| { + Error::generic("Failed to acquire lock for log callback (mutex poisoned).") + })?; + *log_cb = callback; + return reload.reload(LevelFilter::from(max_level)).map_err(|e| { + warn!("Failed to reload log level: {e}"); + Error::generic(format!("Unable to reload subscriber: {e}")) + }); + } + + let (dispatch, reload_handle, log_line_callback) = create_log_line_dispatch( + callback, + max_level, + format, + ansi, + with_time, + with_level, + with_target, + ); + set_global_default(dispatch.clone())?; + self.dispatch = Some(dispatch); + self.reload_handle = Some(reload_handle); + self.log_line_callback = Some(log_line_callback); + Ok(()) } } -fn get_event_dispatcher(callback: TracingEventFn, max_level: Level) -> tracing_core::Dispatch { +static TRACING_STATE: LazyLock> = + LazyLock::new(|| Mutex::new(GlobalTracingState::uninitialized())); + +fn create_event_dispatch( + callback: TracingEventFn, + max_level: Level, +) -> ( + tracing_core::Dispatch, + tracing_subscriber::reload::Handle, + Arc>, +) { use tracing_subscriber::{layer::SubscriberExt, registry::Registry}; - let filter: LevelFilter = max_level.into(); - let event_layer = EventLayer { callback }.with_filter(filter); + + let callback_arc = Arc::new(Mutex::new(callback)); + let (filter_layer, reload_handle) = + tracing_subscriber::reload::Layer::new(LevelFilter::from(max_level)); + let event_layer = EventLayer { + callback: callback_arc.clone(), + } + .with_filter(filter_layer); + let subscriber = Registry::default().with(event_layer); - tracing_core::Dispatch::new(subscriber) + ( + tracing_core::Dispatch::new(subscriber), + reload_handle, + callback_arc, + ) } fn setup_event_subscriber(callback: TracingEventFn, max_level: Level) -> DeltaResult<()> { - if !max_level.is_valid() { - return Err(Error::generic("max_level out of range")); - } - let dispatch = get_event_dispatcher(callback, max_level); - set_global_default(dispatch) + let mut state = TRACING_STATE + .lock() + .map_err(|_e| Error::generic("Poisoned mutex while setting up event subscriber"))?; + state.register_event_callback(callback, max_level) } // utility code below for setting up the tracing subscriber for log lines @@ -294,7 +411,7 @@ type SharedBuffer = Arc>>; struct TriggerLayer { buf: SharedBuffer, - callback: TracingLogLineFn, + callback: Arc>, } impl Layer for TriggerLayer @@ -306,13 +423,21 @@ where Ok(mut buf) => { let message = String::from_utf8_lossy(&buf); let message = kernel_string_slice!(message); - (self.callback)(message); + if let Ok(cb) = self.callback.lock() { + (cb)(message); + } else { + error!("Failed to lock event callback (mutex poisoned)."); + } buf.clear(); } Err(_) => { let message = "INTERNAL KERNEL ERROR: Could not lock message buffer."; let message = kernel_string_slice!(message); - (self.callback)(message); + if let Ok(cb) = self.callback.lock() { + (cb)(message); + } else { + error!("Failed to lock event callback (mutex poisoned)."); + } } } } @@ -347,7 +472,7 @@ impl<'a> MakeWriter<'a> for BufferedMessageWriter { } } -fn get_log_line_dispatch( +fn create_log_line_dispatch( callback: TracingLogLineFn, max_level: Level, format: LogLineFormat, @@ -355,34 +480,47 @@ fn get_log_line_dispatch( with_time: bool, with_level: bool, with_target: bool, -) -> tracing_core::Dispatch { +) -> ( + tracing_core::Dispatch, + tracing_subscriber::reload::Handle, + Arc>, +) { use tracing_subscriber::{layer::SubscriberExt, registry::Registry}; + let buffer = Arc::new(Mutex::new(vec![])); let writer = BufferedMessageWriter { current_buffer: buffer.clone(), }; + let fmt_layer = tracing_subscriber::fmt::layer() .with_writer(writer) .with_ansi(ansi) .with_level(with_level) .with_target(with_target); - let filter: LevelFilter = max_level.into(); + + let (filter_layer, reload_handle) = + tracing_subscriber::reload::Layer::new(LevelFilter::from(max_level)); + + let callback_arc = Arc::new(Mutex::new(callback)); let tracking_layer = TriggerLayer { buf: buffer.clone(), - callback, + callback: callback_arc.clone(), }; - // This repeats some code, but avoids some insane generic wrangling if we try to abstract the - // type of `fmt_layer` over the formatter macro_rules! setup_subscriber { ($($transform:ident()).*) => {{ - let fmt_layer = fmt_layer$(.$transform())*.with_filter(filter); let subscriber = Registry::default() + .with(filter_layer) .with(fmt_layer) - .with(tracking_layer.with_filter(filter)); - tracing_core::Dispatch::new(subscriber) + .with(tracking_layer); + ( + tracing_core::Dispatch::new(subscriber), + reload_handle, + callback_arc.clone(), + ) }}; } + use LogLineFormat::*; match (format, with_time) { (FULL, true) => setup_subscriber!(), @@ -405,10 +543,10 @@ fn setup_log_line_subscriber( with_level: bool, with_target: bool, ) -> DeltaResult<()> { - if !max_level.is_valid() { - return Err(Error::generic("max_level out of range")); - } - let dispatch = get_log_line_dispatch( + let mut state = TRACING_STATE + .lock() + .map_err(|_e| Error::generic("Poisoned mutex while setting up log_line_subscriber"))?; + state.register_log_line_callback( callback, max_level, format, @@ -416,15 +554,16 @@ fn setup_log_line_subscriber( with_time, with_level, with_target, - ); - set_global_default(dispatch) + ) } #[cfg(test)] mod tests { use std::sync::LazyLock; + use tracing::debug; use tracing::info; + use tracing::trace; use tracing_subscriber::fmt::time::FormatTime; use crate::TryFromStringSlice; @@ -436,15 +575,29 @@ mod tests { static TEST_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); static MESSAGES: Mutex>> = Mutex::new(None); - extern "C" fn record_callback(line: KernelStringSlice) { - let s: &str = unsafe { TryFromStringSlice::try_from_slice(&line).unwrap() }; - let s = s.to_string(); - let mut lock = MESSAGES.lock().unwrap(); - if let Some(ref mut msgs) = *lock { - msgs.push(s); + fn record_callback_impl(line: KernelStringSlice, expected_log_lines: Vec<&str>) { + let line_str: &str = unsafe { TryFromStringSlice::try_from_slice(&line).unwrap() }; + let line_str = line_str.to_string(); + let ok = expected_log_lines.is_empty() + || expected_log_lines + .iter() + .any(|expected_log_line| line_str.ends_with(expected_log_line)); + if ok { + let mut lock = MESSAGES.lock().unwrap(); + if let Some(ref mut msgs) = *lock { + msgs.push(line_str); + } } } + extern "C" fn record_callback(line: KernelStringSlice) { + record_callback_impl(line, vec!["Testing 1\n", "Another line\n"]) + } + + extern "C" fn record_callback_2(line: KernelStringSlice) { + record_callback_impl(line, vec!["Testing 2\n", "Yet another line\n"]) + } + fn setup_messages() { *MESSAGES.lock().unwrap() = Some(vec![]); } @@ -481,58 +634,73 @@ mod tests { Some(tstr[..19].to_string()) } + fn check_messages( + expected_lines: Vec<&str>, + expected_time_str: Option, + expected_level_str: &str, + ) { + let lock = MESSAGES.lock().unwrap(); + let Some(ref msgs) = *lock else { + panic!("Messages wasn't Some"); + }; + assert_eq!(msgs.len(), expected_lines.len()); + for (got, expect) in msgs.iter().zip(expected_lines) { + println!("Got: {got}"); + assert!(got.ends_with(expect)); + assert!(got.contains(expected_level_str)); + assert!(got.contains("delta_kernel_ffi::ffi_tracing::tests")); + if let Some(ref tstr) = expected_time_str { + assert!(got.contains(tstr)); + } + } + } + // IMPORTANT: This is the only test that should call the actual `extern "C"` function, as we can // only call it once to set the global subscriber. Other tests ALL need to use // `get_X_dispatcher` and set it locally using `with_default` #[test] - fn info_logs_with_log_line_tracing() { + fn test_enable_log_line_tracing() { let _lock = TEST_LOCK.lock().unwrap(); setup_messages(); unsafe { enable_log_line_tracing(record_callback, Level::INFO); } - let lines = ["Testing 1\n", "Another line\n"]; + let lines = [ + "Testing 1\n", + "Another line\n", + "Testing 2\n", + "Yet another line\n", + ]; + let expected_lines = vec!["Testing 1\n", "Another line\n"]; let test_time_str = get_time_test_str(); - for line in lines { + for line in &lines { // remove final newline which will be added back by logging info!("{}", &line[..(line.len() - 1)]); } - let lock = MESSAGES.lock().unwrap(); - if let Some(ref msgs) = *lock { - assert_eq!(msgs.len(), lines.len()); - for (got, expect) in msgs.iter().zip(lines) { - assert!(got.ends_with(expect)); - assert!(got.contains("INFO")); - assert!(got.contains("delta_kernel_ffi::ffi_tracing::tests")); - if let Some(ref tstr) = test_time_str { - assert!(got.contains(tstr)); - } - } - } else { - panic!("Messages wasn't Some"); - } - // ensure we can't setup again - // do in the same test to ensure ordering - let ok = unsafe { - enable_formatted_log_line_tracing( - record_callback, - Level::TRACE, - LogLineFormat::FULL, - true, // ansi - true, // with_time - true, // with_level - true, // with_target - ) - }; - assert!(!ok, "Should have not set up a second time") + check_messages(expected_lines, test_time_str, "INFO"); + setup_messages(); + + // ensure we can setup again with a new callback and a new tracing level + let ok = unsafe { enable_log_line_tracing(record_callback_2, Level::DEBUG) }; + assert!(ok, "Failed to set up second time"); + + // ensure both callback and tracing level are reloaded. + let expected_lines = vec!["Testing 2\n", "Yet another line\n"]; + let test_time_str = get_time_test_str(); + for line in &lines { + debug!("{}", &line[..(line.len() - 1)]); + // trace must not be visible in messages, because we changed level to debug + trace!("{}", &line[..(line.len() - 1)]); + } + check_messages(expected_lines, test_time_str, "DEBUG"); } #[test] fn info_logs_with_formatted_log_line_tracing() { let _lock = TEST_LOCK.lock().unwrap(); setup_messages(); - let dispatch = get_log_line_dispatch( + let (dispatch, _, _) = create_log_line_dispatch( record_callback, Level::INFO, LogLineFormat::COMPACT, @@ -565,27 +733,78 @@ mod tests { }) } - static EVENTS_OK: Mutex>> = Mutex::new(None); + static EVENTS_OK: Mutex>> = Mutex::new(None); fn setup_events() { *EVENTS_OK.lock().unwrap() = Some(vec![]); } - extern "C" fn event_callback(event: Event) { + fn events_to_string(events: Vec<(String, tracing::Level)>) -> String { + let events_str = events + .iter() + .map(|(s, lvl)| format!("{}:{}", s, lvl)) + .collect::>() + .join(", "); + events_str + } + + fn convert_level(level: Level) -> tracing::Level { + match level { + Level::ERROR => tracing::Level::ERROR, + Level::WARN => tracing::Level::WARN, + Level::INFO => tracing::Level::INFO, + Level::DEBUG => tracing::Level::DEBUG, + Level::TRACE => tracing::Level::TRACE, + } + } + + fn event_callback_impl(event: Event, expected_log_lines: Vec<&str>) { let msg: &str = unsafe { TryFromStringSlice::try_from_slice(&event.message).unwrap() }; let target: &str = unsafe { TryFromStringSlice::try_from_slice(&event.target).unwrap() }; let file: &str = unsafe { TryFromStringSlice::try_from_slice(&event.file).unwrap() }; // file path will use \ on windows use std::path::MAIN_SEPARATOR; - let expected_file = format!("ffi{MAIN_SEPARATOR}src{MAIN_SEPARATOR}ffi_tracing.rs"); + let expected_file = format!("ffi{}src{}ffi_tracing.rs", MAIN_SEPARATOR, MAIN_SEPARATOR); - let ok = event.level == Level::INFO - && target == "delta_kernel_ffi::ffi_tracing::tests" + let ok = target == "delta_kernel_ffi::ffi_tracing::tests" && file == expected_file - && (msg == "Testing 1" || msg == "Another line"); - let mut lock = EVENTS_OK.lock().unwrap(); - if let Some(ref mut events) = *lock { - events.push(ok); + && expected_log_lines + .iter() + .any(|expected_log_line| *expected_log_line == msg); + if ok { + let mut lock = EVENTS_OK.lock().unwrap(); + if let Some(ref mut events) = *lock { + events.push((msg.to_string(), convert_level(event.level))); + } + } + } + + extern "C" fn event_callback(event: Event) { + event_callback_impl(event, vec!["Testing 1", "Another line"]) + } + + extern "C" fn event_callback_2(event: Event) { + event_callback_impl(event, vec!["Testing 2", "Yet another line"]) + } + + fn check_events(expected_level: tracing::Level, expected_messages: Vec<&str>) { + let lock = EVENTS_OK.lock().unwrap(); + if let Some(ref results) = *lock { + assert!(!results.is_empty(), "No events were captured"); + + assert!( + results.iter().all(|(_msg, lvl)| *lvl == expected_level), + "Not all events were {expected_level}" + ); + let events_str = events_to_string(results.to_vec()); + assert!( + results + .iter() + .all(|(msg, _lvl)| expected_messages.contains(&msg.as_str())), + "Not all messages have expected format: {events_str}" + ) + } else { + panic!("Events wasn't Some"); } } @@ -593,19 +812,51 @@ mod tests { fn trace_event_tracking() { let _lock = TEST_LOCK.lock().unwrap(); setup_events(); - let dispatch = get_event_dispatcher(event_callback, Level::TRACE); + let (dispatch, _filter, _) = create_event_dispatch(event_callback, Level::TRACE); tracing_core::dispatcher::with_default(&dispatch, || { let lines = ["Testing 1", "Another line"]; for line in lines { info!("{line}"); } }); - let lock = EVENTS_OK.lock().unwrap(); - if let Some(ref results) = *lock { - assert!(results.iter().all(|x| *x)); - } else { - panic!("Events wasn't Some"); + check_events(tracing::Level::INFO, vec!["Testing 1", "Another line"]); + } + + #[test] + #[ignore] // We cannot run this test if test_enable_log_line_tracing was run before - see comment there, however this test works if run individually. + fn test_enable_event_tracing() { + let _lock = TEST_LOCK.lock().unwrap(); + setup_events(); + unsafe { + enable_event_tracing(event_callback, Level::INFO); + } + let lines = ["Testing 1", "Another line", "Testing 2", "Yet another line"]; + let expected_lines = vec!["Testing 1", "Another line"]; + for line in &lines { + info!("{}", &line); + } + + check_events(tracing::Level::INFO, expected_lines); + setup_events(); + assert!(EVENTS_OK + .lock() + .unwrap() + .as_ref() + .is_none_or(|v| v.is_empty())); + + // ensure we can setup again with a new callback and a new tracing level + unsafe { + enable_event_tracing(event_callback_2, Level::DEBUG); + }; + + // ensure both callback and tracing level are reloaded. + let expected_lines = vec!["Testing 2", "Yet another line"]; + for line in &lines { + debug!("{}", &line); + // trace must not be visible in messages, because we changed level to debug + trace!("{}", &line); } + check_events(tracing::Level::DEBUG, expected_lines); } #[test]