Merged

17 commits
fab79e1
feat(tracing): add tracing spans to all I/O sections
fvaleye Sep 30, 2025
3c1b02c
feat(tracing): enable tracing from the Python binding
fvaleye Oct 1, 2025
0726ee4
feat(tracing): enable tracing from the Python binding
fvaleye Oct 1, 2025
f3ca503
feat(tracing): add more tracing on I/O operations
fvaleye Oct 1, 2025
44b478c
feat(tracing): add environment variables documentation for OTEL with …
fvaleye Oct 2, 2025
ada34d4
Merge origin/main into feature/add-tracing-for-performance-analysis
fvaleye Oct 7, 2025
9d4bc9b
fix(commit): revert post commit properties for initial table creation
fvaleye Oct 8, 2025
ae18dab
feat(version): add version to commit entry read error log
fvaleye Oct 8, 2025
4d25f4f
chore: remove unused tracing from DeltaIOStorageBackend
fvaleye Oct 8, 2025
d122206
chore: add path to multipart upload debug logs
fvaleye Oct 8, 2025
27405f7
refactor: use SOURCE_COUNT_ID constant instead of hardcoded string
fvaleye Oct 8, 2025
e04ffd1
fix(writer): revert writer due to bad merge conflict
fvaleye Oct 8, 2025
7de306d
Merge branch 'main' into feature/add-tracing-for-performance-analysis
ion-elgreco Oct 9, 2025
16041b7
Merge main into feature/add-tracing-for-performance-analysis
fvaleye Oct 11, 2025
7ec96e8
Merge branch 'main' into feature/add-tracing-for-performance-analysis
fvaleye Oct 11, 2025
1520e97
Merge branch 'main' into feature/add-tracing-for-performance-analysis
fvaleye Oct 11, 2025
610b4a6
Merge branch 'main' into feature/add-tracing-for-performance-analysis
ion-elgreco Oct 13, 2025
7 changes: 7 additions & 0 deletions Cargo.toml
@@ -72,6 +72,13 @@ uuid = { version = "1" }
async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }

# opentelemetry
tracing-opentelemetry = { version = "0.32" }
opentelemetry = { version = "0.31" }
opentelemetry-otlp = { version = "0.31", features = ["http-proto"] }
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
num_cpus = { version = "1" }

[workspace.metadata.typos]
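These workspace pins add an OTLP-over-HTTP export path on top of `tracing`. For readers unfamiliar with the stack, a minimal sketch of how such a pipeline is typically wired is below; it is not taken from this PR, and the endpoint, service name, and simple (non-batching) exporter are illustrative assumptions.

```rust
// Hypothetical setup sketch -- not part of this diff. Assumes the pinned
// opentelemetry 0.31 / opentelemetry-otlp 0.31 / tracing-opentelemetry 0.32 APIs.
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

fn init_tracing() -> Result<(), Box<dyn std::error::Error>> {
    // HTTP/protobuf exporter, matching the "http-proto" feature enabled above.
    // The endpoint is an illustrative default OTLP collector address.
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_endpoint("http://localhost:4318/v1/traces")
        .build()?;

    // A simple (synchronous) exporter keeps the sketch small; a batch exporter
    // would be the usual choice in production.
    let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_simple_exporter(exporter)
        .build();

    // Bridge `tracing` spans into OpenTelemetry and honor RUST_LOG-style filters.
    let otel_layer = tracing_opentelemetry::layer().with_tracer(provider.tracer("deltalake"));
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(otel_layer)
        .init();
    Ok(())
}
```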
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -100,6 +100,7 @@ rstest = { version = "0.26.1" }
serial_test = "3"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[features]
default = ["rustls"]
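The `tracing-subscriber` dev-dependency makes it possible to surface the new spans while running the crate's tests. A hedged sketch of what that could look like (the target name and filter string are assumptions, not from this diff):

```rust
// Hypothetical test-side initialization -- not taken from this diff.
use tracing_subscriber::EnvFilter;

fn init_test_tracing() {
    // Honors RUST_LOG if set, otherwise falls back to an example filter.
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("deltalake_core=debug"));
    // `try_init` avoids panicking when several tests initialize the subscriber.
    let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
}
```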
54 changes: 44 additions & 10 deletions crates/core/src/delta_datafusion/find_files.rs
@@ -13,6 +13,7 @@ use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::ExecutionPlan;
use itertools::Itertools;
use tracing::*;

use crate::delta_datafusion::{
df_logical_schema, get_path_column, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
@@ -31,6 +32,15 @@ pub(crate) struct FindFiles {
}

/// Finds files in a snapshot that match the provided predicate.
#[instrument(
skip_all,
fields(
version = snapshot.version(),
has_predicate = predicate.is_some(),
partition_scan = field::Empty,
candidate_count = field::Empty
)
)]
pub(crate) async fn find_files(
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
@@ -53,24 +63,35 @@ pub(crate) async fn find_files(

if expr_properties.partition_only {
let candidates = scan_memory_table(snapshot, predicate).await?;
Ok(FindFiles {
let result = FindFiles {
candidates,
partition_scan: true,
})
};
Span::current().record("partition_scan", result.partition_scan);
Span::current().record("candidate_count", result.candidates.len());
Ok(result)
} else {
let candidates =
find_files_scan(snapshot, log_store, session, predicate.to_owned()).await?;

Ok(FindFiles {
let result = FindFiles {
candidates,
partition_scan: false,
})
};
Span::current().record("partition_scan", result.partition_scan);
Span::current().record("candidate_count", result.candidates.len());
Ok(result)
}
}
None => Ok(FindFiles {
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
partition_scan: true,
}),
None => {
let result = FindFiles {
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
partition_scan: true,
};
Span::current().record("partition_scan", result.partition_scan);
Span::current().record("candidate_count", result.candidates.len());
Ok(result)
}
}
}

@@ -188,6 +209,14 @@ fn join_batches_with_add_actions(
}

/// Determine which files contain a record that satisfies the predicate
#[instrument(
skip_all,
fields(
version = snapshot.version(),
total_files = field::Empty,
matching_files = field::Empty
)
)]
async fn find_files_scan(
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
@@ -204,6 +233,8 @@ async fn find_files_scan(
})
.collect();

Span::current().record("total_files", candidate_map.len());

let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(true)
.build(snapshot)?;
@@ -240,12 +271,15 @@
let task_ctx = Arc::new(TaskContext::from(session));
let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;

join_batches_with_add_actions(
let result = join_batches_with_add_actions(
path_batches,
candidate_map,
config.file_column_name.as_ref().unwrap(),
true,
)
)?;

Span::current().record("matching_files", result.len());
Ok(result)
}

async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaResult<Vec<Add>> {
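The instrumentation pattern used throughout this file is: declare span fields as `field::Empty` in `#[instrument]`, then fill them with `Span::current().record(...)` once the values are known. A self-contained illustration with made-up names:

```rust
// Minimal illustration of the field::Empty + record() pattern; the function
// and field names are invented for the example.
use tracing::{field, instrument, Span};

#[instrument(skip_all, fields(input_len = items.len(), match_count = field::Empty))]
fn filter_items(items: &[i64]) -> Vec<i64> {
    let matches: Vec<i64> = items.iter().copied().filter(|v| *v > 0).collect();
    // The field was declared empty above so it can be recorded after the work is done.
    Span::current().record("match_count", matches.len());
    matches
}
```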