Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,32 @@ impl TryFrom<Format> for Scalar {
)]
#[internal_api]
pub(crate) struct Metadata {
// TODO: Make the struct fields private to force using the try_new function.
/// Unique identifier for this table
pub(crate) id: String,
id: String,
/// User-provided identifier for this table
pub(crate) name: Option<String>,
name: Option<String>,
/// User-provided description for this table
pub(crate) description: Option<String>,
description: Option<String>,
/// Specification of the encoding for the files stored in the table
pub(crate) format: Format,
format: Format,
/// Schema of the table
pub(crate) schema_string: String,
schema_string: String,
/// Column names by which the data should be partitioned
pub(crate) partition_columns: Vec<String>,
partition_columns: Vec<String>,
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub(crate) created_time: Option<i64>,
created_time: Option<i64>,
/// Configuration options for the metadata action. These are parsed into [`TableProperties`].
pub(crate) configuration: HashMap<String, String>,
configuration: HashMap<String, String>,
}

impl Metadata {
/// Create a new [`Metadata`] instance.
///
/// # Errors
///
/// Returns an error if there are any metadata columns in the schema.
// TODO: remove allow(dead_code) after we use this API in CREATE TABLE, etc.
#[internal_api]
#[allow(dead_code)]
pub(crate) fn try_new(
name: Option<String>,
Expand Down Expand Up @@ -246,6 +251,13 @@ impl Metadata {
})
}

/// Extract a [`Metadata`] action from engine data.
///
/// Runs a default-constructed `MetadataVisitor` over the rows of `data` and returns
/// whatever the visitor stored in its `metadata` field.
///
/// NOTE(review): presumably this yields `Ok(None)` when `data` contains no metadata
/// action; confirm against `MetadataVisitor`.
///
/// # Errors
///
/// Propagates any error returned by `visit_rows_of`.
#[internal_api]
pub(crate) fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Metadata>> {
let mut visitor = MetadataVisitor::default();
visitor.visit_rows_of(data)?;
Ok(visitor.metadata)
}

// TODO(#1068/1069): make these just pub directly or make better internal_api macro for fields
#[internal_api]
#[allow(dead_code)]
Expand All @@ -271,12 +283,6 @@ impl Metadata {
self.created_time
}

/// Extract a [`Metadata`] action from engine data.
///
/// Runs a default-constructed `MetadataVisitor` over the rows of `data` and returns
/// whatever the visitor stored in its `metadata` field.
///
/// NOTE(review): presumably this yields `Ok(None)` when `data` contains no metadata
/// action; confirm against `MetadataVisitor`.
///
/// # Errors
///
/// Propagates any error returned by `visit_rows_of`.
pub(crate) fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Metadata>> {
let mut visitor = MetadataVisitor::default();
visitor.visit_rows_of(data)?;
Ok(visitor.metadata)
}

#[internal_api]
#[allow(dead_code)]
pub(crate) fn configuration(&self) -> &HashMap<String, String> {
Expand Down
19 changes: 13 additions & 6 deletions kernel/src/checkpoint/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use crate::action_reconciliation::{
deleted_file_retention_timestamp_with_time, DEFAULT_RETENTION_SECS,
Expand All @@ -9,6 +9,7 @@ use crate::arrow::datatypes::{DataType, Schema};
use crate::checkpoint::create_last_checkpoint_data;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
use crate::schema::{DataType as KernelDataType, StructField, StructType};
use crate::utils::test_utils::Action;
use crate::{DeltaResult, FileMeta, Snapshot};

Expand Down Expand Up @@ -206,11 +207,17 @@ fn create_v2_checkpoint_protocol_action() -> Action {

/// Create a Metadata action
fn create_metadata_action() -> Action {
Action::Metadata(Metadata {
id: "test-table".into(),
schema_string: "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}".to_string(),
..Default::default()
})
Action::Metadata(
Metadata::try_new(
Some("test-table".into()),
None,
StructType::new_unchecked([StructField::nullable("value", KernelDataType::INTEGER)]),
vec![],
0,
HashMap::new(),
)
.unwrap(),
)
}

/// Create an Add action with the specified path
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,9 @@ mod tests {
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap();
let metadata = Metadata::try_new_from_data(parsed.as_ref())?.unwrap();
assert_eq!(metadata.id, "aff5cb91-8cd9-4195-aef9-446908507302");
assert_eq!(metadata.created_time, Some(1670892997849));
assert_eq!(metadata.partition_columns, vec!("c1", "c2"));
assert_eq!(metadata.id(), "aff5cb91-8cd9-4195-aef9-446908507302");
assert_eq!(metadata.created_time(), Some(1670892997849));
assert_eq!(*metadata.partition_columns(), vec!("c1", "c2"));
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl ScanBuilder {
let logical_schema = self.schema.unwrap_or_else(|| self.snapshot.schema());
let state_info = StateInfo::try_new(
logical_schema.as_ref(),
&self.snapshot.metadata().partition_columns,
self.snapshot.metadata().partition_columns(),
self.snapshot.table_configuration().column_mapping_mode(),
)?;

Expand Down
105 changes: 61 additions & 44 deletions kernel/src/table_changes/log_replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,26 @@ fn result_to_sv(iter: impl Iterator<Item = DeltaResult<TableChangesScanMetadata>
async fn metadata_protocol() {
let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();
let schema_string = serde_json::to_string(&get_schema()).unwrap();
mock_table
.commit([
Action::Metadata(Metadata {
schema_string,
configuration: HashMap::from([
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.columnMapping.mode".to_string(), "none".to_string()),
]),
..Default::default()
}),
Action::Metadata(
Metadata::try_new(
None,
None,
get_schema(),
vec![],
0,
HashMap::from([
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.columnMapping.mode".to_string(), "none".to_string()),
]),
)
.unwrap(),
),
Action::Protocol(
Protocol::try_new(
3,
Expand All @@ -95,16 +100,21 @@ async fn metadata_protocol() {
async fn cdf_not_enabled() {
let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();
let schema_string = serde_json::to_string(&get_schema()).unwrap();
mock_table
.commit([Action::Metadata(Metadata {
schema_string,
configuration: HashMap::from([(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
)]),
..Default::default()
})])
.commit([Action::Metadata(
Metadata::try_new(
None,
None,
get_schema(),
vec![],
0,
HashMap::from([(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
)]),
)
.unwrap(),
)])
.await;

let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
Expand Down Expand Up @@ -150,20 +160,25 @@ async fn unsupported_reader_feature() {
async fn column_mapping_should_fail() {
let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();
let schema_string = serde_json::to_string(&get_schema()).unwrap();
mock_table
.commit([Action::Metadata(Metadata {
schema_string,
configuration: HashMap::from([
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
("delta.columnMapping.mode".to_string(), "id".to_string()),
]),
..Default::default()
})])
.commit([Action::Metadata(
Metadata::try_new(
None,
None,
get_schema(),
vec![],
0,
HashMap::from([
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
("delta.columnMapping.mode".to_string(), "id".to_string()),
]),
)
.unwrap(),
)])
.await;

let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
Expand All @@ -185,16 +200,18 @@ async fn incompatible_schemas_fail() {
let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();

let schema_string = serde_json::to_string(&commit_schema).unwrap();
mock_table
.commit([Action::Metadata(Metadata {
schema_string,
configuration: HashMap::from([(
"delta.enableChangeDataFeed".to_string(),
"true".to_string(),
)]),
..Default::default()
})])
.commit([Action::Metadata(
Metadata::try_new(
None,
None,
commit_schema,
vec![],
0,
HashMap::from([("delta.enableChangeDataFeed".to_string(), "true".to_string())]),
)
.unwrap(),
)])
.await;

let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl TableChanges {
}
/// The partition columns that will be read.
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
self.end_snapshot.metadata().partition_columns()
}

/// Create a [`TableChangesScanBuilder`] for an `Arc<TableChanges>`.
Expand Down
Loading
Loading