From 5cb8e3a1c3d52239bc68b66d6402bd959812ce61 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 23 Oct 2025 12:45:32 +0200 Subject: [PATCH 1/2] feat: tracing spans across threadpool Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> --- crates/core/src/kernel/mod.rs | 23 +++++++++++++++++++++-- crates/core/src/kernel/snapshot/mod.rs | 21 ++++++++++++--------- crates/core/src/logstore/mod.rs | 5 ++--- crates/core/src/protocol/checkpoints.rs | 10 +++++----- 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/crates/core/src/kernel/mod.rs b/crates/core/src/kernel/mod.rs index adc22d6690..7fea67efec 100644 --- a/crates/core/src/kernel/mod.rs +++ b/crates/core/src/kernel/mod.rs @@ -2,9 +2,11 @@ //! //! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log. -use std::sync::{Arc, LazyLock}; - use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler; +use std::sync::{Arc, LazyLock}; +use tokio::task::JoinHandle; +use tracing::dispatcher; +use tracing::Span; pub mod arrow; pub mod error; @@ -23,3 +25,20 @@ pub use snapshot::*; pub(crate) static ARROW_HANDLER: LazyLock> = LazyLock::new(|| Arc::new(ArrowEvaluationHandler {})); + +pub(crate) fn spawn_blocking_with_span(f: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + // Capture the current dispatcher and span + let dispatch = dispatcher::get_default(|d| d.clone()); + let span = Span::current(); + + tokio::task::spawn_blocking(move || { + dispatcher::with_default(&dispatch, || { + let _enter = span.enter(); + f() + }) + }) +} diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index b858dcda86..810b899af5 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -40,11 +40,12 @@ use object_store::path::Path; use object_store::ObjectStore; use serde_json::Deserializer; use tokio::task::spawn_blocking; +use tracing::Instrument; use url::Url; use super::{Action, CommitInfo, Metadata, Protocol}; use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ExpressionEvaluatorExt}; -use crate::kernel::{StructType, ARROW_HANDLER}; +use crate::kernel::{spawn_blocking_with_span, StructType, ARROW_HANDLER}; use crate::logstore::{LogStore, LogStoreExt}; use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter}; @@ -80,7 +81,7 @@ impl Snapshot { config: DeltaTableConfig, version: Option, ) -> DeltaResult { - let snapshot = match spawn_blocking(move || { + let snapshot = match spawn_blocking_with_span(move || { let mut builder = KernelSnapshot::builder_for(table_root); if let Some(version) = version { builder = builder.at_version(version); @@ -162,7 +163,7 @@ impl Snapshot { // TODO: bundle operation id with log store ... let engine = log_store.engine(None); let current = self.inner.clone(); - let snapshot = spawn_blocking(move || { + let snapshot = spawn_blocking_with_span(move || { let mut builder = KernelSnapshot::builder_from(current); if let Some(version) = target_version { builder = builder.at_version(version); @@ -432,9 +433,10 @@ impl Snapshot { // TODO: bundle operation id with log store ... let engine = log_store.engine(None); let inner = self.inner.clone(); - let version = spawn_blocking(move || inner.get_app_id_version(&app_id, engine.as_ref())) - .await - .map_err(|e| DeltaTableError::GenericError { source: e.into() })??; + let version = + spawn_blocking_with_span(move || inner.get_app_id_version(&app_id, engine.as_ref())) + .await + .map_err(|e| DeltaTableError::GenericError { source: e.into() })??; Ok(version) } @@ -451,9 +453,10 @@ impl Snapshot { let engine = log_store.engine(None); let inner = self.inner.clone(); let domain = domain.to_string(); - let metadata = spawn_blocking(move || inner.get_domain_metadata(&domain, engine.as_ref())) - .await - .map_err(|e| DeltaTableError::GenericError { source: e.into() })??; + let metadata = + spawn_blocking_with_span(move || inner.get_domain_metadata(&domain, engine.as_ref())) + .await + .map_err(|e| DeltaTableError::GenericError { source: e.into() })??; Ok(metadata) } } diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 8dd05508a4..08b65e8419 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -69,13 +69,12 @@ use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; use serde_json::Deserializer; use tokio::runtime::RuntimeFlavor; -use tokio::task::spawn_blocking; use tracing::*; use url::Url; use uuid::Uuid; use crate::kernel::transaction::TransactionError; -use crate::kernel::Action; +use crate::kernel::{spawn_blocking_with_span, Action}; use crate::{DeltaResult, DeltaTableError}; pub use self::config::StorageConfig; @@ -660,7 +659,7 @@ pub async fn get_latest_version( let storage = log_store.engine(None).storage_handler(); let log_root = log_store.log_root_url(); - let segment = spawn_blocking(move || { + let segment = spawn_blocking_with_span(move || { LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None) }) .await diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index e050ea8948..cab278f250 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -17,10 +17,10 @@ use object_store::ObjectStore; use parquet::arrow::async_writer::ParquetObjectWriter; use parquet::arrow::AsyncArrowWriter; use regex::Regex; -use tokio::task::spawn_blocking; use tracing::{debug, error}; use uuid::Uuid; +use crate::kernel::spawn_blocking_with_span; use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX}; use crate::table::config::TablePropertiesExt as _; use crate::{open_table_with_version, DeltaTable}; @@ -45,7 +45,7 @@ pub(crate) async fn create_checkpoint_for( let engine = log_store.engine(operation_id); let task_engine = engine.clone(); - let snapshot = spawn_blocking(move || { + let snapshot = spawn_blocking_with_span(move || { Snapshot::builder_for(table_root) .at_version(version) .build(task_engine.as_ref()) @@ -59,7 +59,7 @@ pub(crate) async fn create_checkpoint_for( let cp_path = Path::from_url_path(cp_url.path())?; let mut cp_data = cp_writer.checkpoint_data(engine.as_ref())?; - let (first_batch, mut cp_data) = spawn_blocking(move || { + let (first_batch, mut cp_data) = spawn_blocking_with_span(move || { let Some(first_batch) = cp_data.next() else { return Err(DeltaTableError::Generic("No data".to_string())); }; @@ -82,7 +82,7 @@ pub(crate) async fn create_checkpoint_for( let mut current_batch; loop { - (current_batch, cp_data) = spawn_blocking(move || { + (current_batch, cp_data) = spawn_blocking_with_span(move || { let Some(first_batch) = cp_data.next() else { return Ok::<_, DeltaTableError>((None, cp_data)); }; @@ -116,7 +116,7 @@ pub(crate) async fn create_checkpoint_for( last_modified: file_meta.last_modified.timestamp_millis(), }; - spawn_blocking(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data)) + spawn_blocking_with_span(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data)) .await .map_err(|e| DeltaTableError::Generic(e.to_string()))??; From af21bd57ab5af48135a8ff8691d6ca3f56d3b19a Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 23 Oct 2025 23:50:18 +0200 Subject: [PATCH 2/2] feat: spawn_blocking with span in streambuilder Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> --- crates/core/src/kernel/snapshot/stream.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/crates/core/src/kernel/snapshot/stream.rs b/crates/core/src/kernel/snapshot/stream.rs index 8e04300c52..58ee969dfb 100644 --- a/crates/core/src/kernel/snapshot/stream.rs +++ b/crates/core/src/kernel/snapshot/stream.rs @@ -1,12 +1,13 @@ //! the code in this file is hoisted from datafusion with only slight modifications //! -use std::pin::Pin; - use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use futures::stream::BoxStream; use futures::{Future, Stream, StreamExt}; +use std::pin::Pin; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinSet; +use tracing::dispatcher; +use tracing::Span; use crate::errors::DeltaResult; use crate::DeltaTableError; @@ -106,7 +107,16 @@ impl ReceiverStreamBuilder { F: FnOnce() -> DeltaResult<()>, F: Send + 'static, { - self.join_set.spawn_blocking(f); + // Capture the current dispatcher and span + let dispatch = dispatcher::get_default(|d| d.clone()); + let span = Span::current(); + + self.join_set.spawn_blocking(move || { + dispatcher::with_default(&dispatch, || { + let _enter = span.enter(); + f() + }) + }); } /// Create a stream of all data written to `tx`