Skip to content

Commit 5d2cf48

Browse files
committed
fix: tombstone replay
Signed-off-by: Robert Pack <[email protected]>
1 parent 51349f4 commit 5d2cf48

File tree

3 files changed

+58
-49
lines changed

3 files changed

+58
-49
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ __blobstorage__
2222
.githubchangeloggenerator.cache.log
2323
.githubchangeloggenerator.cache/
2424
.githubchangeloggenerator*
25-
data
2625
.zed/
2726

2827
# Add all Cargo.lock files except for those in binary crates

crates/core/src/kernel/snapshot_next/lazy.rs

Lines changed: 57 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,24 @@ use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionSc
99
use delta_kernel::actions::{get_log_schema, REMOVE_NAME};
1010
use delta_kernel::actions::{Metadata, Protocol, SetTransaction};
1111
use delta_kernel::engine::arrow_data::ArrowEngineData;
12+
use delta_kernel::engine::arrow_expression::evaluate_expression;
1213
use delta_kernel::engine::default::executor::tokio::{
1314
TokioBackgroundExecutor, TokioMultiThreadExecutor,
1415
};
1516
use delta_kernel::engine::default::DefaultEngine;
1617
use delta_kernel::log_segment::LogSegment;
17-
use delta_kernel::schema::Schema;
18+
use delta_kernel::schema::{DataType, Schema};
1819
use delta_kernel::snapshot::Snapshot as SnapshotInner;
1920
use delta_kernel::table_properties::TableProperties;
20-
use delta_kernel::{Engine, Expression, ExpressionRef, Table, Version};
21+
use delta_kernel::{Engine, Expression, ExpressionHandler, ExpressionRef, Table, Version};
2122
use itertools::Itertools;
2223
use object_store::path::Path;
2324
use object_store::ObjectStore;
2425
use url::Url;
2526

2627
use super::cache::CommitCacheObjectStore;
2728
use super::{replay_file_actions, Snapshot};
28-
use crate::kernel::{Action, CommitInfo};
29+
use crate::kernel::{Action, CommitInfo, ARROW_HANDLER};
2930
use crate::{DeltaResult, DeltaTableError};
3031

3132
// TODO: avoid repetitive parsing of json stats
@@ -94,11 +95,8 @@ impl Snapshot for LazySnapshot {
9495
}
9596

9697
fn tombstones(&self) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>> {
97-
static META_PREDICATE: LazyLock<Option<ExpressionRef>> = LazyLock::new(|| {
98-
Some(Arc::new(
99-
Expression::column([REMOVE_NAME, "path"]).is_not_null(),
100-
))
101-
});
98+
static META_PREDICATE: LazyLock<ExpressionRef> =
99+
LazyLock::new(|| Arc::new(Expression::column([REMOVE_NAME, "path"]).is_not_null()));
102100
let read_schema = get_log_schema().project(&[REMOVE_NAME])?;
103101
Ok(Box::new(
104102
self.inner
@@ -107,9 +105,23 @@ impl Snapshot for LazySnapshot {
107105
self.engine.as_ref(),
108106
read_schema.clone(),
109107
read_schema,
110-
META_PREDICATE.clone(),
108+
Some(META_PREDICATE.clone()),
111109
)?
112-
.map_ok(|(d, _)| Ok(RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?)))
110+
.map_ok(|(d, _)| {
111+
let batch = RecordBatch::from(ArrowEngineData::try_from_engine_data(d)?);
112+
let selection = evaluate_expression(
113+
META_PREDICATE.as_ref(),
114+
&batch,
115+
Some(&DataType::BOOLEAN),
116+
)?;
117+
let filter = selection
118+
.as_any()
119+
.downcast_ref::<BooleanArray>()
120+
.ok_or_else(|| {
121+
DeltaTableError::generic("failed to downcast to BooleanArray")
122+
})?;
123+
Ok(filter_record_batch(&batch, filter)?)
124+
})
113125
.flatten(),
114126
))
115127
}
@@ -247,37 +259,46 @@ impl LazySnapshot {
247259

248260
#[cfg(test)]
249261
mod tests {
250-
use deltalake_test::acceptance::{read_dat_case, TestCaseInfo};
262+
use delta_kernel::schema::StructType;
263+
use deltalake_test::utils::*;
251264
use deltalake_test::TestResult;
252265

253-
use super::super::tests::get_dat_dir;
254266
use super::*;
255267

256268
async fn load_snapshot() -> TestResult<()> {
257-
// some comment
258-
let mut dat_dir = get_dat_dir();
259-
dat_dir.push("multi_partitioned");
260-
261-
let dat_info: TestCaseInfo = read_dat_case(dat_dir)?;
262-
let table_info = dat_info.table_summary()?;
263-
264-
let table = Table::try_from_uri(dat_info.table_root()?)?;
265-
266-
let snapshot = LazySnapshot::try_new(
267-
table,
268-
Arc::new(object_store::local::LocalFileSystem::default()),
269-
None,
270-
)
271-
.await?;
272-
273-
assert_eq!(snapshot.version(), table_info.version);
274-
assert_eq!(
275-
(
276-
snapshot.protocol().min_reader_version(),
277-
snapshot.protocol().min_writer_version()
278-
),
279-
(table_info.min_reader_version, table_info.min_writer_version)
280-
);
269+
let ctx = IntegrationContext::new(Box::<LocalStorageIntegration>::default())?;
270+
ctx.load_table(TestTables::Simple).await?;
271+
272+
let store = ctx
273+
.table_builder(TestTables::Simple)
274+
.build_storage()?
275+
.object_store(None);
276+
let table = Table::try_from_uri("memory:///")?;
277+
let snapshot = LazySnapshot::try_new(table, store, None).await?;
278+
279+
let schema_string = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"#;
280+
let expected: StructType = serde_json::from_str(schema_string)?;
281+
assert_eq!(snapshot.schema(), &expected);
282+
283+
let infos = snapshot.commit_infos(None, None)?.collect_vec();
284+
assert_eq!(infos.len(), 5);
285+
286+
let tombstones: Vec<_> = snapshot.tombstones()?.try_collect()?;
287+
let num_tombstones = tombstones.iter().map(|b| b.num_rows() as i64).sum::<i64>();
288+
assert_eq!(num_tombstones, 31);
289+
290+
let expected = vec![
291+
"part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
292+
"part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet",
293+
"part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet",
294+
"part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet",
295+
"part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet",
296+
];
297+
let file_names: Vec<_> = snapshot
298+
.logical_files_view(None)?
299+
.map_ok(|f| f.path().to_owned())
300+
.try_collect()?;
301+
assert_eq!(file_names, expected);
281302

282303
Ok(())
283304
}

crates/core/src/kernel/snapshot_next/mod.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -361,24 +361,13 @@ fn scan_as_log_data(
361361

362362
#[cfg(test)]
363363
mod tests {
364-
use std::{future::Future, path::PathBuf, pin::Pin};
364+
use std::{future::Future, pin::Pin};
365365

366366
use delta_kernel::Table;
367367
use deltalake_test::utils::*;
368368

369369
use super::*;
370370

371-
pub(super) fn get_dat_dir() -> PathBuf {
372-
let d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
373-
let mut rep_root = d
374-
.parent()
375-
.and_then(|p| p.parent())
376-
.expect("valid directory")
377-
.to_path_buf();
378-
rep_root.push("dat/out/reader_tests/generated");
379-
rep_root
380-
}
381-
382371
fn get_lazy(
383372
ctx: &IntegrationContext,
384373
table: TestTables,

0 commit comments

Comments
 (0)