Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,9 @@ fn try_main() -> DeltaResult<()> {
}
Commands::Actions { oldest_first } => {
let log_schema = get_log_schema();
let actions = snapshot.log_segment().read_actions(
&engine,
log_schema.clone(),
log_schema.clone(),
None,
)?;
let actions = snapshot
.log_segment()
.read_actions(&engine, log_schema.clone(), None)?;

let mut visitor = LogVisitor::new();
for action in actions {
Expand Down
1 change: 0 additions & 1 deletion kernel/src/actions/domain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ fn replay_for_domain_metadatas(
log_segment.read_actions(
engine,
schema.clone(), // Arc clone
schema.clone(), // Arc clone
META_PREDICATE.clone(),
)
}
87 changes: 74 additions & 13 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,20 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
StructField::nullable(COMMIT_INFO_NAME, CommitInfo::to_schema()),
StructField::nullable(CDC_NAME, Cdc::to_schema()),
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
StructField::nullable(CHECKPOINT_METADATA_NAME, CheckpointMetadata::to_schema()),
StructField::nullable(DOMAIN_METADATA_NAME, DomainMetadata::to_schema()),
]))
});

// Schema containing every field of [`get_log_schema`] plus the sidecar action,
// i.e. all actions that can appear anywhere in the log (commits or checkpoints).
static ALL_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let sidecar_field = StructField::nullable(SIDECAR_NAME, Sidecar::to_schema());
    let all_fields = get_log_schema().fields().cloned().chain([sidecar_field]);
    Arc::new(StructType::new_unchecked(all_fields))
});

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
ADD_NAME,
Expand Down Expand Up @@ -111,10 +119,19 @@ static LOG_DOMAIN_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
});

#[internal_api]
/// Gets the schema for all actions that can appear in commit
/// logs. This excludes actions that can only appear in checkpoints.
pub(crate) fn get_log_schema() -> &'static SchemaRef {
    &LOG_SCHEMA
}

#[internal_api]
#[allow(dead_code)] // NOTE(review): presumably kept for upcoming callers — confirm it is still needed
/// Gets a schema for all actions defined by the delta spec: the commit-log
/// actions from [`get_log_schema`] plus the sidecar action.
pub(crate) fn get_all_actions_schema() -> &'static SchemaRef {
    &ALL_ACTIONS_SCHEMA
}

#[internal_api]
pub(crate) fn get_log_add_schema() -> &'static SchemaRef {
&LOG_ADD_SCHEMA
Expand All @@ -132,6 +149,13 @@ pub(crate) fn get_log_domain_metadata_schema() -> &'static SchemaRef {
&LOG_DOMAIN_METADATA_SCHEMA
}

/// Reports whether `schema` has any file-action (add or remove) columns.
#[internal_api]
pub(crate) fn schema_contains_file_actions(schema: &SchemaRef) -> bool {
    // Add and remove are the only two file actions; either one suffices.
    if schema.contains(ADD_NAME) {
        return true;
    }
    schema.contains(REMOVE_NAME)
}

/// Nest an existing add action schema in an additional [`ADD_NAME`] struct.
///
/// This is useful for JSON conversion, as it allows us to wrap a dynamically maintained add action
Expand Down Expand Up @@ -1159,18 +1183,13 @@ mod tests {

#[test]
fn test_sidecar_schema() {
    // The sidecar action schema must match the Delta spec field-for-field.
    // (The span previously fused the pre-change and post-change test bodies,
    // declaring `schema`/`expected` twice; keep the current version only.)
    let schema = Sidecar::to_schema();
    let expected = StructType::new_unchecked([
        StructField::not_null("path", DataType::STRING),
        StructField::not_null("sizeInBytes", DataType::LONG),
        StructField::not_null("modificationTime", DataType::LONG),
        tags_field(),
    ]);
    assert_eq!(schema, expected);
}

Expand Down Expand Up @@ -1974,4 +1993,46 @@ mod tests {
assert!(record_batch.column(2).is_null(0));
assert!(record_batch.column(3).is_null(0));
}

#[test]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for completeness should probably check that it works if you have ADD_NAME and also something that's not a file action. :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in #1406 (should show up here as well).

fn test_schema_contains_file_actions_with_add() {
    // Add alongside a non-file action, and add alone, are both detected.
    let projected = get_log_schema()
        .project(&[ADD_NAME, PROTOCOL_NAME])
        .unwrap();
    assert!(schema_contains_file_actions(&projected));
    let add_only = projected.project(&[ADD_NAME]).unwrap();
    assert!(schema_contains_file_actions(&add_only));
}

#[test]
fn test_schema_contains_file_actions_with_remove() {
    // Remove alongside a non-file action, and remove alone, are both detected.
    let projected = get_log_schema()
        .project(&[REMOVE_NAME, METADATA_NAME])
        .unwrap();
    assert!(schema_contains_file_actions(&projected));
    let remove_only = projected.project(&[REMOVE_NAME]).unwrap();
    assert!(schema_contains_file_actions(&remove_only));
}

#[test]
fn test_schema_contains_file_actions_with_both() {
    // Both file-action columns present at once must be detected.
    let projected = get_log_schema()
        .project(&[ADD_NAME, REMOVE_NAME])
        .unwrap();
    assert!(schema_contains_file_actions(&projected));
}

#[test]
fn test_schema_contains_file_actions_with_neither() {
    // A schema holding only non-file actions must not be flagged.
    let projected = get_log_schema()
        .project(&[PROTOCOL_NAME, METADATA_NAME])
        .unwrap();
    assert!(!schema_contains_file_actions(&projected));
}

#[test]
fn test_schema_contains_file_actions_empty_schema() {
    // An empty struct schema contains no file actions at all.
    let empty = Arc::new(StructType::new_unchecked([]));
    assert!(!schema_contains_file_actions(&empty));
}
}
1 change: 0 additions & 1 deletion kernel/src/actions/set_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ fn replay_for_app_ids(
log_segment.read_actions(
engine,
txn_schema.clone(), // Arc clone
txn_schema.clone(), // Arc clone
META_PREDICATE.clone(),
)
}
Expand Down
1 change: 0 additions & 1 deletion kernel/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ impl CheckpointWriter {
let actions = self.snapshot.log_segment().read_actions(
engine,
CHECKPOINT_ACTIONS_SCHEMA.clone(),
CHECKPOINT_ACTIONS_SCHEMA.clone(),
None,
)?;

Expand Down
1 change: 0 additions & 1 deletion kernel/src/log_compaction/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ impl LogCompactionWriter {
let actions_iter = compaction_log_segment.read_actions(
engine,
COMPACTION_ACTIONS_SCHEMA.clone(),
COMPACTION_ACTIONS_SCHEMA.clone(),
None, // No predicate - we want all actions in the version range
)?;

Expand Down
65 changes: 45 additions & 20 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::sync::{Arc, LazyLock};

use crate::actions::visitors::SidecarVisitor;
use crate::actions::{
get_log_schema, Metadata, Protocol, ADD_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SIDECAR_NAME,
get_log_schema, schema_contains_file_actions, Metadata, Protocol, Sidecar, METADATA_NAME,
PROTOCOL_NAME, SIDECAR_NAME,
};
use crate::last_checkpoint_hint::LastCheckpointHint;
use crate::log_replay::ActionsBatch;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::schema::{SchemaRef, StructField, ToSchema as _};
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, FileMeta, ParquetHandler, Predicate,
Expand Down Expand Up @@ -280,12 +280,18 @@ impl LogSegment {
///
/// `commit_read_schema` is the (physical) schema to read the commit files with, and
/// `checkpoint_read_schema` is the (physical) schema to read checkpoint files with. This can be
/// used to project the log files to a subset of the columns.
/// used to project the log files to a subset of the columns. Having two different
/// schemas can be useful as a cheap way of doing additional filtering on the checkpoint files
/// (e.g. filtering out remove actions).
///
/// The engine data returned might have extra non-log actions (e.g. sidecar
/// actions) that are not part of the schema but this is an implementation
/// detail that should not be relied on and will likely change.
///
/// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the
/// query's predicate, but rather a predicate for filtering log files themselves.
#[internal_api]
pub(crate) fn read_actions(
pub(crate) fn read_actions_with_projected_checkpoint_actions(
&self,
engine: &dyn Engine,
commit_read_schema: SchemaRef,
Expand All @@ -309,6 +315,22 @@ impl LogSegment {
Ok(commit_stream.chain(checkpoint_stream))
}

/// Reads actions from this log segment using the same `action_schema` for both
/// commit and checkpoint files.
///
/// This delegates to [`Self::read_actions_with_projected_checkpoint_actions`],
/// passing `action_schema` as both the commit read schema and the checkpoint
/// read schema.
///
/// `meta_predicate` is an optional predicate for filtering the log files
/// themselves; it is _NOT_ the query's predicate.
#[internal_api]
pub(crate) fn read_actions(
    &self,
    engine: &dyn Engine,
    action_schema: SchemaRef,
    meta_predicate: Option<PredicateRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
    // Arc clone is cheap; the second use moves the original.
    self.read_actions_with_projected_checkpoint_actions(
        engine,
        action_schema.clone(),
        action_schema,
        meta_predicate,
    )
}

/// find a minimal set to cover the range of commits we want. This is greedy so not always
/// optimal, but we assume there are rarely overlapping compactions so this is okay. NB: This
returns files in DESCENDING ORDER, as that's what `replay` expects. This function assumes
Expand Down Expand Up @@ -366,21 +388,24 @@ impl LogSegment {
fn create_checkpoint_stream(
&self,
engine: &dyn Engine,
checkpoint_read_schema: SchemaRef,
action_schema: SchemaRef,
meta_predicate: Option<PredicateRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
let need_file_actions = checkpoint_read_schema.contains(ADD_NAME)
|| checkpoint_read_schema.contains(REMOVE_NAME);

// Only validate sidecar requirement if we actually have checkpoint files
if !self.checkpoint_parts.is_empty() {
require!(
!need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
Error::invalid_checkpoint(
"If the checkpoint read schema contains file actions, it must contain the sidecar column"
)
);
}
let need_file_actions = schema_contains_file_actions(&action_schema);

// Sidecars only contain file actions so don't add it to the schema if not needed
let checkpoint_read_schema = if !need_file_actions ||
// Don't duplicate the column if it exists
action_schema.contains(SIDECAR_NAME) ||
// With multiple parts the checkpoint can't be v2, so sidecars aren't needed
self.checkpoint_parts.len() > 1
{
action_schema.clone()
} else {
Arc::new(
action_schema.add([StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())])?,
)
};

let checkpoint_file_meta: Vec<_> = self
.checkpoint_parts
Expand Down Expand Up @@ -437,7 +462,7 @@ impl LogSegment {
parquet_handler.clone(), // cheap Arc clone
log_root.clone(),
checkpoint_batch.as_ref(),
checkpoint_read_schema.clone(),
action_schema.clone(),
meta_predicate.clone(),
)?
} else {
Expand Down Expand Up @@ -540,7 +565,7 @@ impl LogSegment {
)))
});
// read the same protocol and metadata schema for both commits and checkpoints
self.read_actions(engine, schema.clone(), schema, META_PREDICATE.clone())
self.read_actions(engine, schema, META_PREDICATE.clone())
}

/// How many commits since a checkpoint, according to this log segment
Expand Down
Loading
Loading