Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 60 additions & 19 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use common::LocationArgs;
use delta_kernel::actions::visitors::{
visit_metadata_at, visit_protocol_at, AddVisitor, CdcVisitor, RemoveVisitor,
SetTransactionVisitor,
visit_metadata_at, visit_protocol_at, AddVisitor, CdcVisitor, DomainMetadataVisitor,
RemoveVisitor, SetTransactionVisitor,
};
use delta_kernel::actions::{
get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SET_TRANSACTION_NAME,
get_log_schema, ADD_NAME, CDC_NAME, DOMAIN_METADATA_NAME, METADATA_NAME, PROTOCOL_NAME,
REMOVE_NAME, SET_TRANSACTION_NAME,
};
use delta_kernel::engine_data::{GetData, RowVisitor, TypedGetData as _};
use delta_kernel::expressions::ColumnName;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{ColumnNamesAndTypes, DataType};
use delta_kernel::schema::{ColumnNamesAndTypes, DataType, Schema, SchemaRef};
use delta_kernel::{DeltaResult, Error, ExpressionRef, Snapshot};

use std::collections::HashMap;
Expand Down Expand Up @@ -46,6 +46,8 @@ enum Commands {
/// Show the log in reverse order (default is log replay order -- newest first)
#[arg(short, long)]
oldest_first: bool,
#[arg(short, long)]
stats_only: bool,
},
}

Expand All @@ -67,19 +69,29 @@ enum Action {
Add(delta_kernel::actions::Add),
SetTransaction(delta_kernel::actions::SetTransaction),
Cdc(delta_kernel::actions::Cdc),
DomainMetadata(delta_kernel::actions::DomainMetadata),
}

fn custom_log_schema() -> SchemaRef {
let fields = get_log_schema()
.fields()
.filter(|f| f.name() != "commitInfo")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a follow-up to actually support reading commit infos as I imagine that could be something useful to dump from a table.

.cloned();
Schema::try_new(fields).unwrap().into()
}
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| get_log_schema().leaves(None));
LazyLock::new(|| custom_log_schema().leaves(None));

struct LogVisitor {
actions: Vec<(Action, usize)>,
stats: HashMap<&'static str, usize>,
offsets: HashMap<String, (usize, usize)>,
previous_rows_seen: usize,
stats_only: bool,
}

impl LogVisitor {
fn new() -> LogVisitor {
fn new(stats_only: bool) -> LogVisitor {
// Grab the start offset for each top-level column name, then compute the end offset by
// skipping the rest of the leaves for that column.
let mut offsets = HashMap::new();
Expand All @@ -93,8 +105,10 @@ impl LogVisitor {
}
LogVisitor {
actions: vec![],
stats: HashMap::new(),
offsets,
previous_rows_seen: 0,
stats_only,
}
}
}
Expand All @@ -104,7 +118,7 @@ impl RowVisitor for LogVisitor {
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
if getters.len() != 55 {
if getters.len() != 59 {
return Err(Error::InternalError(format!(
"Wrong number of LogVisitor getters: {}",
getters.len()
Expand All @@ -116,34 +130,47 @@ impl RowVisitor for LogVisitor {
let (protocol_start, protocol_end) = self.offsets[PROTOCOL_NAME];
let (txn_start, txn_end) = self.offsets[SET_TRANSACTION_NAME];
let (cdc_start, cdc_end) = self.offsets[CDC_NAME];
let (dm_start, dm_end) = self.offsets[DOMAIN_METADATA_NAME];

for i in 0..row_count {
let action = if let Some(path) = getters[add_start].get_opt(i, "add.path")? {
let (action, name) = if let Some(path) = getters[add_start].get_opt(i, "add.path")? {
let add = AddVisitor::visit_add(i, path, &getters[add_start..add_end])?;
Action::Add(add)
(Action::Add(add), "add")
} else if let Some(path) = getters[remove_start].get_opt(i, "remove.path")? {
let remove =
RemoveVisitor::visit_remove(i, path, &getters[remove_start..remove_end])?;
Action::Remove(remove)
(Action::Remove(remove), "remove")
} else if let Some(metadata) =
visit_metadata_at(i, &getters[metadata_start..metadata_end])?
{
Action::Metadata(metadata)
(Action::Metadata(metadata), "metadata")
} else if let Some(protocol) =
visit_protocol_at(i, &getters[protocol_start..protocol_end])?
{
Action::Protocol(protocol)
(Action::Protocol(protocol), "protocol")
} else if let Some(app_id) = getters[txn_start].get_opt(i, "txn.appId")? {
let txn =
SetTransactionVisitor::visit_txn(i, app_id, &getters[txn_start..txn_end])?;
Action::SetTransaction(txn)
(Action::SetTransaction(txn), "setTransaction")
} else if let Some(path) = getters[cdc_start].get_opt(i, "cdc.path")? {
let cdc = CdcVisitor::visit_cdc(i, path, &getters[cdc_start..cdc_end])?;
Action::Cdc(cdc)
(Action::Cdc(cdc), "cdc")
} else if let Some(domain) = getters[dm_start].get_opt(i, "domainMetadata.name")? {
let dm = DomainMetadataVisitor::visit_domain_metadata(
i,
domain,
&getters[dm_start..dm_end],
)?;
(Action::DomainMetadata(dm), "domainMetadata")
} else {
// TODO: Add CommitInfo support (tricky because all fields are optional)
continue;
};
self.actions.push((action, self.previous_rows_seen + i));

*self.stats.entry(name).or_default() += 1;
if !self.stats_only {
self.actions.push((action, self.previous_rows_seen + i));
}
}
self.previous_rows_seen += row_count;
Ok(())
Expand Down Expand Up @@ -202,20 +229,33 @@ fn try_main() -> DeltaResult<()> {
scan_metadata.visit_scan_files((), print_scan_file)?;
}
}
Commands::Actions { oldest_first } => {
let log_schema = get_log_schema();
Commands::Actions {
oldest_first,
stats_only,
} => {
let log_schema = custom_log_schema();
let actions = snapshot.log_segment().read_actions(
&engine,
log_schema.clone(),
log_schema.clone(),
None,
)?;

let mut visitor = LogVisitor::new();
let mut visitor = LogVisitor::new(stats_only);
for action in actions {
visitor.visit_rows_of(action?.actions())?;
}

let mut stats: Vec<_> = visitor
.stats
.iter()
.map(|(&name, &count)| (count, name))
.collect();
stats.sort_by(|(count1, _), (count2, _)| count2.cmp(count1));
for (count, name) in stats {
println!("Found {count} '{name}' actions");
}

if oldest_first {
visitor.actions.reverse();
}
Expand All @@ -227,6 +267,7 @@ fn try_main() -> DeltaResult<()> {
Action::Add(a) => println!("\nAction {row}:\n{a:#?}"),
Action::SetTransaction(t) => println!("\nAction {row}:\n{t:#?}"),
Action::Cdc(c) => println!("\nAction {row}:\n{c:#?}"),
Action::DomainMetadata(d) => println!("\n Action {row}:\n{d:#?}"),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ impl RowVisitor for SidecarVisitor {
/// included to only retain the domain metadata for a specific domain (in order to bound memory
/// requirements).
#[derive(Debug, Default)]
#[internal_api]
pub(crate) struct DomainMetadataVisitor {
domain_metadatas: DomainMetadataMap,
domain_filter: Option<String>,
Expand All @@ -444,6 +445,7 @@ impl DomainMetadataVisitor {
}
}

#[internal_api]
pub(crate) fn visit_domain_metadata<'a>(
row_index: usize,
domain: String,
Expand Down
8 changes: 4 additions & 4 deletions kernel/src/engine/arrow_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use itertools::Itertools;
use crate::error::Error;
use crate::schema::{
ArrayType, DataType, MapType, MetadataValue, PrimitiveType, StructField, StructType,
UNSHREDDED_VARIANT_SCHEMA,
};

pub(crate) const LIST_ARRAY_ROOT: &str = "element";
Expand Down Expand Up @@ -162,7 +163,7 @@ impl TryFromKernel<&DataType> for ArrowDataType {
false,
)),
DataType::Variant(s) => {
if *t == DataType::unshredded_variant() {
if *t == *UNSHREDDED_VARIANT_SCHEMA {
Ok(ArrowDataType::Struct(
s.fields()
.map(TryIntoArrow::try_into_arrow)
Expand Down Expand Up @@ -326,11 +327,10 @@ mod tests {

#[test]
fn test_variant_shredded_type_fail() -> DeltaResult<()> {
let unshredded_variant = DataType::unshredded_variant();
let unshredded_variant_arrow = ArrowDataType::try_from_kernel(&unshredded_variant)?;
let unshredded_variant_arrow = ArrowDataType::try_from_kernel(&UNSHREDDED_VARIANT_SCHEMA)?;
assert!(unshredded_variant_arrow == unshredded_variant_arrow_type());
let shredded_variant = DataType::variant_type([
StructField::nullable("metadata", DataType::BINARY),
StructField::not_null("metadata", DataType::BINARY),
StructField::nullable("value", DataType::BINARY),
StructField::nullable("typed_value", DataType::INTEGER),
])?;
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/engine/arrow_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::{Arc, OnceLock};

use crate::engine::arrow_conversion::{TryFromKernel as _, TryIntoArrow as _};
use crate::engine::ensure_data_types::DataTypeCompat;
use crate::schema::{ColumnMetadataKey, MetadataValue};
use crate::schema::{ColumnMetadataKey, MetadataValue, UNSHREDDED_VARIANT_SCHEMA};
use crate::{
engine::arrow_data::ArrowEngineData,
schema::{DataType, MetadataColumnSpec, Schema, SchemaRef, StructField, StructType},
Expand Down Expand Up @@ -429,7 +429,7 @@ fn get_indices(
{
// If the field is a variant, make sure the parquet schema matches the unshredded variant
// representation. This is to ensure that shredded reads are not performed.
if requested_field.data_type == DataType::unshredded_variant() {
if requested_field.data_type == *UNSHREDDED_VARIANT_SCHEMA {
validate_parquet_variant(field)?;
}
match field.data_type() {
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/engine/ensure_data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod tests {

use crate::engine::arrow_conversion::TryFromKernel as _;
use crate::engine::arrow_data::unshredded_variant_arrow_type;
use crate::schema::{ArrayType, DataType, MapType, StructField};
use crate::schema::{ArrayType, DataType, MapType, StructField, UNSHREDDED_VARIANT_SCHEMA};
use crate::utils::test_utils::assert_result_error_with_message;

use super::*;
Expand Down Expand Up @@ -341,14 +341,14 @@ mod tests {
}

assert!(ensure_data_types(
&DataType::unshredded_variant(),
&UNSHREDDED_VARIANT_SCHEMA,
&unshredded_variant_arrow_type(),
true
)
.is_ok());
assert_result_error_with_message(
ensure_data_types(
&DataType::unshredded_variant(),
&UNSHREDDED_VARIANT_SCHEMA,
&incorrect_variant_arrow_type(),
true,
),
Expand Down
11 changes: 3 additions & 8 deletions kernel/src/engine/parquet_row_group_skipping/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::*;
use crate::expressions::{column_name, column_pred};
use crate::kernel_predicates::DataSkippingPredicateEvaluator as _;
use crate::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use crate::schema::UNSHREDDED_VARIANT_SCHEMA;
use crate::Predicate;
use std::fs::File;

Expand Down Expand Up @@ -214,10 +215,7 @@ fn test_get_stat_values() {
// Read a random column as Variant. The actual read does not need to be performed, as stats on
// Variant should always return None.
assert_eq!(
filter.get_min_stat(
&column_name!("chrono.date32"),
&DataType::unshredded_variant()
),
filter.get_min_stat(&column_name!("chrono.date32"), &UNSHREDDED_VARIANT_SCHEMA,),
None
);

Expand Down Expand Up @@ -396,10 +394,7 @@ fn test_get_stat_values() {
// Read a random column as Variant. The actual read does not need to be performed, as stats on
// Variant should always return None.
assert_eq!(
filter.get_max_stat(
&column_name!("chrono.date32"),
&DataType::unshredded_variant()
),
filter.get_max_stat(&column_name!("chrono.date32"), &UNSHREDDED_VARIANT_SCHEMA,),
None
);

Expand Down
21 changes: 15 additions & 6 deletions kernel/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,15 @@ impl From<SchemaRef> for DataType {
}
}

/// A static reference to the canonical unshredded variant schema. Equivalent to
/// [`DataType::unshredded_variant()`], but static.
pub static UNSHREDDED_VARIANT_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
DataType::Variant(Box::new(StructType::new_unchecked([
StructField::not_null("metadata", DataType::BINARY),
StructField::not_null("value", DataType::BINARY),
])))
});

/// cbindgen:ignore
impl DataType {
pub const STRING: Self = DataType::Primitive(PrimitiveType::String);
Expand Down Expand Up @@ -1361,13 +1370,13 @@ impl DataType {
StructType::new_unchecked(fields).into()
}

/// Create a new unshredded [`DataType::Variant`]. This data type is a struct of two not-null
/// Creates a new unshredded [`DataType::Variant`]. This data type is a struct of two not-null
/// binary fields: `metadata` and `value`.
///
/// NOTE: Callers who only need a borrowed reference can avoid allocations by using
/// [`UNSHREDDED_VARIANT_SCHEMA`] instead.
pub fn unshredded_variant() -> Self {
DataType::Variant(Box::new(StructType::new_unchecked([
StructField::not_null("metadata", DataType::BINARY),
StructField::not_null("value", DataType::BINARY),
])))
UNSHREDDED_VARIANT_SCHEMA.clone()
}

/// Create a new [`DataType::Variant`] from the provided fields. For unshredded variants, you
Expand Down Expand Up @@ -1822,7 +1831,7 @@ mod tests {
}
"#;
let field: StructField = serde_json::from_str(data).unwrap();
assert_eq!(field.data_type, DataType::unshredded_variant());
assert_eq!(field.data_type, *UNSHREDDED_VARIANT_SCHEMA);

let json_str = serde_json::to_string(&field).unwrap();
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/schema/variant_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ pub(crate) fn validate_variant_type_feature_support(
mod tests {
use super::*;
use crate::actions::Protocol;
use crate::schema::{DataType, StructField, StructType};
use crate::schema::{DataType, StructField, StructType, UNSHREDDED_VARIANT_SCHEMA};
use crate::table_features::{ReaderFeature, WriterFeature};
use crate::utils::test_utils::assert_result_error_with_message;

#[test]
fn test_is_unshredded_variant() {
fn is_unshredded_variant(s: &DataType) -> bool {
s == &DataType::unshredded_variant()
*s == *UNSHREDDED_VARIANT_SCHEMA
}
assert!(!is_unshredded_variant(
&DataType::variant_type([
Expand Down
8 changes: 5 additions & 3 deletions kernel/tests/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use serde_json::json;
use serde_json::Deserializer;
use tempfile::tempdir;

use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::schema::{
DataType, SchemaRef, StructField, StructType, UNSHREDDED_VARIANT_SCHEMA,
};

use test_utils::{
assert_result_error_with_message, create_table, engine_store_setup, setup_test_tables,
Expand Down Expand Up @@ -951,7 +953,7 @@ async fn test_append_variant() -> Result<(), Box<dyn std::error::Error>> {
let value_nested_v_array = Arc::new(BinaryArray::from(value_nested_v)) as ArrayRef;
let metadata_nested_v_array = Arc::new(BinaryArray::from(metadata_nested_v)) as ArrayRef;

let variant_arrow = ArrowDataType::try_from_kernel(&DataType::unshredded_variant()).unwrap();
let variant_arrow = ArrowDataType::try_from_kernel(&UNSHREDDED_VARIANT_SCHEMA).unwrap();
let variant_arrow_flipped = variant_arrow_type_flipped();

let i_values = vec![31, 32, 33];
Expand Down Expand Up @@ -1064,7 +1066,7 @@ async fn test_append_variant() -> Result<(), Box<dyn std::error::Error>> {
Some(null_bitmap),
)?);
let variant_arrow_type: ArrowDataType =
ArrowDataType::try_from_kernel(&DataType::unshredded_variant()).unwrap();
ArrowDataType::try_from_kernel(&UNSHREDDED_VARIANT_SCHEMA).unwrap();
let expected_data = RecordBatch::try_new(
Arc::new(expected_schema.as_ref().try_into_arrow()?),
vec![
Expand Down
Loading