Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions crates/core/src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
//!
//! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log.
use std::sync::{Arc, LazyLock};

use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler;
use std::sync::{Arc, LazyLock};
use tokio::task::JoinHandle;
use tracing::dispatcher;
use tracing::Span;

pub mod arrow;
pub mod error;
Expand All @@ -23,3 +25,20 @@ pub use snapshot::*;

/// Crate-wide [`ArrowEvaluationHandler`] instance.
///
/// The handler is constructed on first access (via [`LazyLock`]) and kept
/// behind an [`Arc`], so users can clone the pointer rather than building a
/// new handler each time.
pub(crate) static ARROW_HANDLER: LazyLock<Arc<ArrowEvaluationHandler>> =
LazyLock::new(|| Arc::new(ArrowEvaluationHandler {}));

/// Runs `f` via [`tokio::task::spawn_blocking`] while preserving the caller's
/// tracing context.
///
/// The tracing dispatcher and span that are current *at call time* are
/// captured here. Inside the spawned closure the captured dispatcher is
/// installed as the default and the captured span is entered for the whole
/// duration of `f`, so events emitted by `f` are attributed to the span that
/// was active where this function was called.
///
/// Returns the [`JoinHandle`] produced by `spawn_blocking`, resolving to the
/// value returned by `f`.
pub(crate) fn spawn_blocking_with_span<F, R>(f: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    // Snapshot the tracing context of the calling thread before hopping threads.
    let caller_dispatch = dispatcher::get_default(|current| current.clone());
    let caller_span = Span::current();

    // Re-establish that context around `f` on the blocking thread:
    // dispatcher first, then the span, which stays entered until `f` returns.
    let traced = move || dispatcher::with_default(&caller_dispatch, || caller_span.in_scope(f));

    tokio::task::spawn_blocking(traced)
}
21 changes: 12 additions & 9 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
use object_store::path::Path;
use object_store::ObjectStore;
use serde_json::Deserializer;
use tokio::task::spawn_blocking;

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (LakeFS v1.48)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / check

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / cloud (gcp)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.9

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.10

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.11

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / test-minimal

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.12

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / aws-native-tls

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / cloud (azure)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 PyArrow latest)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / cloud (aws)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 LakeFS Integration tests)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (HDFS)

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / PySpark Integration Tests

unused import: `tokio::task::spawn_blocking`

Check warning on line 42 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused import: `tokio::task::spawn_blocking`
use tracing::Instrument;

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (LakeFS v1.48)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / check

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / cloud (gcp)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.9

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.10

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.11

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / test-minimal

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.12

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / aws-native-tls

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / cloud (azure)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 PyArrow latest)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / cloud (aws)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 LakeFS Integration tests)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (HDFS)

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / PySpark Integration Tests

unused import: `tracing::Instrument`

Check warning on line 43 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused import: `tracing::Instrument`
use url::Url;

use super::{Action, CommitInfo, Metadata, Protocol};
use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ExpressionEvaluatorExt};
use crate::kernel::{StructType, ARROW_HANDLER};
use crate::kernel::{spawn_blocking_with_span, StructType, ARROW_HANDLER};
use crate::logstore::{LogStore, LogStoreExt};
use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter};

Expand Down Expand Up @@ -80,7 +81,7 @@
config: DeltaTableConfig,
version: Option<Version>,
) -> DeltaResult<Self> {
let snapshot = match spawn_blocking(move || {
let snapshot = match spawn_blocking_with_span(move || {
let mut builder = KernelSnapshot::builder_for(table_root);
if let Some(version) = version {
builder = builder.at_version(version);
Expand Down Expand Up @@ -162,7 +163,7 @@
// TODO: bundle operation id with log store ...
let engine = log_store.engine(None);
let current = self.inner.clone();
let snapshot = spawn_blocking(move || {
let snapshot = spawn_blocking_with_span(move || {
let mut builder = KernelSnapshot::builder_from(current);
if let Some(version) = target_version {
builder = builder.at_version(version);
Expand Down Expand Up @@ -432,9 +433,10 @@
// TODO: bundle operation id with log store ...
let engine = log_store.engine(None);
let inner = self.inner.clone();
let version = spawn_blocking(move || inner.get_app_id_version(&app_id, engine.as_ref()))
.await
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
let version =
spawn_blocking_with_span(move || inner.get_app_id_version(&app_id, engine.as_ref()))
.await
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
Ok(version)
}

Expand All @@ -451,9 +453,10 @@
let engine = log_store.engine(None);
let inner = self.inner.clone();
let domain = domain.to_string();
let metadata = spawn_blocking(move || inner.get_domain_metadata(&domain, engine.as_ref()))
.await
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
let metadata =
spawn_blocking_with_span(move || inner.get_domain_metadata(&domain, engine.as_ref()))
.await
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
Ok(metadata)
}
}
Expand Down
16 changes: 13 additions & 3 deletions crates/core/src/kernel/snapshot/stream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! the code in this file is hoisted from datafusion with only slight modifications
//!
use std::pin::Pin;

use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::stream::BoxStream;
use futures::{Future, Stream, StreamExt};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinSet;
use tracing::dispatcher;
use tracing::Span;

use crate::errors::DeltaResult;
use crate::DeltaTableError;
Expand Down Expand Up @@ -106,7 +107,16 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
F: FnOnce() -> DeltaResult<()>,
F: Send + 'static,
{
self.join_set.spawn_blocking(f);
// Capture the current dispatcher and span
let dispatch = dispatcher::get_default(|d| d.clone());
let span = Span::current();

self.join_set.spawn_blocking(move || {
dispatcher::with_default(&dispatch, || {
let _enter = span.enter();
f()
})
});
}

/// Create a stream of all data written to `tx`
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,12 @@ use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use tokio::runtime::RuntimeFlavor;
use tokio::task::spawn_blocking;
use tracing::*;
use url::Url;
use uuid::Uuid;

use crate::kernel::transaction::TransactionError;
use crate::kernel::Action;
use crate::kernel::{spawn_blocking_with_span, Action};
use crate::{DeltaResult, DeltaTableError};

pub use self::config::StorageConfig;
Expand Down Expand Up @@ -660,7 +659,7 @@ pub async fn get_latest_version(
let storage = log_store.engine(None).storage_handler();
let log_root = log_store.log_root_url();

let segment = spawn_blocking(move || {
let segment = spawn_blocking_with_span(move || {
LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None)
})
.await
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use object_store::ObjectStore;
use parquet::arrow::async_writer::ParquetObjectWriter;
use parquet::arrow::AsyncArrowWriter;
use regex::Regex;
use tokio::task::spawn_blocking;
use tracing::{debug, error};
use uuid::Uuid;

use crate::kernel::spawn_blocking_with_span;
use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX};
use crate::table::config::TablePropertiesExt as _;
use crate::{open_table_with_version, DeltaTable};
Expand All @@ -45,7 +45,7 @@ pub(crate) async fn create_checkpoint_for(
let engine = log_store.engine(operation_id);

let task_engine = engine.clone();
let snapshot = spawn_blocking(move || {
let snapshot = spawn_blocking_with_span(move || {
Snapshot::builder_for(table_root)
.at_version(version)
.build(task_engine.as_ref())
Expand All @@ -59,7 +59,7 @@ pub(crate) async fn create_checkpoint_for(
let cp_path = Path::from_url_path(cp_url.path())?;
let mut cp_data = cp_writer.checkpoint_data(engine.as_ref())?;

let (first_batch, mut cp_data) = spawn_blocking(move || {
let (first_batch, mut cp_data) = spawn_blocking_with_span(move || {
let Some(first_batch) = cp_data.next() else {
return Err(DeltaTableError::Generic("No data".to_string()));
};
Expand All @@ -82,7 +82,7 @@ pub(crate) async fn create_checkpoint_for(

let mut current_batch;
loop {
(current_batch, cp_data) = spawn_blocking(move || {
(current_batch, cp_data) = spawn_blocking_with_span(move || {
let Some(first_batch) = cp_data.next() else {
return Ok::<_, DeltaTableError>((None, cp_data));
};
Expand Down Expand Up @@ -116,7 +116,7 @@ pub(crate) async fn create_checkpoint_for(
last_modified: file_meta.last_modified.timestamp_millis(),
};

spawn_blocking(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data))
spawn_blocking_with_span(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data))
.await
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;

Expand Down
Loading