Skip to content

Commit 1befab9

Browse files
committed
fix: reading single latest version in cdf
Signed-off-by: Ion Koutsouris <[email protected]>
1 parent 8edc7e7 commit 1befab9

File tree

8 files changed

+107
-45
lines changed

8 files changed

+107
-45
lines changed

crates/core/src/errors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ pub enum DeltaTableError {
231231
#[error("Reading a table version: {version} that does not have change data enabled")]
232232
ChangeDataNotEnabled { version: i64 },
233233

234-
#[error("Invalid version start version {start} is greater than version {end}")]
234+
#[error("Invalid version. Start version {start} is greater than end version {end}")]
235235
ChangeDataInvalidVersionRange { start: i64, end: i64 },
236236

237237
#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]

crates/core/src/logstore/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,9 @@ pub async fn get_latest_version(
467467
// This implies no files were fetched during list_offset so either the starting_version is the latest
468468
// or starting_version is invalid, so we use current_version -1, and do one more try.
469469
if empty_stream {
470-
let obj_meta = object_store.head(&commit_uri_from_version(max_version)).await;
470+
let obj_meta = object_store
471+
.head(&commit_uri_from_version(max_version))
472+
.await;
471473
if obj_meta.is_err() {
472474
return Box::pin(get_latest_version(log_store, -1)).await;
473475
}
@@ -480,7 +482,6 @@ pub async fn get_latest_version(
480482
Ok(version)
481483
}
482484

483-
484485
/// Default implementation for retrieving the earliest version
485486
pub async fn get_earliest_version(
486487
log_store: &dyn LogStore,

crates/core/src/operations/load_cdf.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl CdfLoadBuilder {
164164
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
165165
};
166166
}
167-
if start >= latest_version {
167+
if start > latest_version {
168168
return if self.allow_out_of_range {
169169
Ok((change_files, add_files, remove_files))
170170
} else {
@@ -671,10 +671,10 @@ pub(crate) mod tests {
671671
.await;
672672

673673
assert!(table.is_err());
674-
assert!(matches!(
675-
table.unwrap_err(),
676-
DeltaTableError::InvalidVersion { .. }
677-
));
674+
assert!(table
675+
.unwrap_err()
676+
.to_string()
677+
.contains("Invalid version. Start version 5 is greater than end version 4"));
678678

679679
Ok(())
680680
}

crates/core/src/writer/record_batch.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,12 @@ mod tests {
503503

504504
#[tokio::test]
505505
async fn test_buffer_len_includes_unflushed_row_group() {
506+
let table_dir = tempfile::tempdir().unwrap();
507+
let table_path = table_dir.path().to_str().unwrap();
508+
506509
let batch = get_record_batch(None, false);
507510
let partition_cols = vec![];
508-
let table = create_initialized_table(&partition_cols).await;
511+
let table = create_initialized_table(table_path, &partition_cols).await;
509512
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
510513

511514
writer.write(batch).await.unwrap();
@@ -515,9 +518,12 @@ mod tests {
515518

516519
#[tokio::test]
517520
async fn test_divide_record_batch_no_partition() {
521+
let table_dir = tempfile::tempdir().unwrap();
522+
let table_path = table_dir.path().to_str().unwrap();
523+
518524
let batch = get_record_batch(None, false);
519525
let partition_cols = vec![];
520-
let table = create_initialized_table(&partition_cols).await;
526+
let table = create_initialized_table(table_path, &partition_cols).await;
521527
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
522528

523529
let partitions = writer.divide_by_partition_values(&batch).unwrap();
@@ -528,9 +534,12 @@ mod tests {
528534

529535
#[tokio::test]
530536
async fn test_divide_record_batch_single_partition() {
537+
let table_dir = tempfile::tempdir().unwrap();
538+
let table_path = table_dir.path().to_str().unwrap();
539+
531540
let batch = get_record_batch(None, false);
532541
let partition_cols = vec!["modified".to_string()];
533-
let table = create_initialized_table(&partition_cols).await;
542+
let table = create_initialized_table(table_path, &partition_cols).await;
534543
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
535544

536545
let partitions = writer.divide_by_partition_values(&batch).unwrap();
@@ -613,9 +622,11 @@ mod tests {
613622

614623
#[tokio::test]
615624
async fn test_divide_record_batch_multiple_partitions() {
625+
let table_dir = tempfile::tempdir().unwrap();
626+
let table_path = table_dir.path().to_str().unwrap();
616627
let batch = get_record_batch(None, false);
617628
let partition_cols = vec!["modified".to_string(), "id".to_string()];
618-
let table = create_initialized_table(&partition_cols).await;
629+
let table = create_initialized_table(table_path, &partition_cols).await;
619630
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
620631

621632
let partitions = writer.divide_by_partition_values(&batch).unwrap();
@@ -631,9 +642,11 @@ mod tests {
631642

632643
#[tokio::test]
633644
async fn test_write_no_partitions() {
645+
let table_dir = tempfile::tempdir().unwrap();
646+
let table_path = table_dir.path().to_str().unwrap();
634647
let batch = get_record_batch(None, false);
635648
let partition_cols = vec![];
636-
let table = create_initialized_table(&partition_cols).await;
649+
let table = create_initialized_table(table_path, &partition_cols).await;
637650
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
638651

639652
writer.write(batch).await.unwrap();
@@ -643,9 +656,11 @@ mod tests {
643656

644657
#[tokio::test]
645658
async fn test_write_multiple_partitions() {
659+
let table_dir = tempfile::tempdir().unwrap();
660+
let table_path = table_dir.path().to_str().unwrap();
646661
let batch = get_record_batch(None, false);
647662
let partition_cols = vec!["modified".to_string(), "id".to_string()];
648-
let table = create_initialized_table(&partition_cols).await;
663+
let table = create_initialized_table(table_path, &partition_cols).await;
649664
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
650665

651666
writer.write(batch).await.unwrap();
@@ -714,9 +729,12 @@ mod tests {
714729

715730
#[tokio::test]
716731
async fn test_write_mismatched_schema() {
732+
let table_dir = tempfile::tempdir().unwrap();
733+
let table_path = table_dir.path().to_str().unwrap();
734+
717735
let batch = get_record_batch(None, false);
718736
let partition_cols = vec![];
719-
let table = create_initialized_table(&partition_cols).await;
737+
let table = create_initialized_table(table_path, &partition_cols).await;
720738
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
721739

722740
// Write the first batch with the first schema to the table
@@ -895,9 +913,12 @@ mod tests {
895913

896914
#[tokio::test]
897915
async fn test_schema_evolution_column_type_mismatch() {
916+
let table_dir = tempfile::tempdir().unwrap();
917+
let table_path = table_dir.path().to_str().unwrap();
918+
898919
let batch = get_record_batch(None, false);
899920
let partition_cols = vec![];
900-
let mut table = create_initialized_table(&partition_cols).await;
921+
let mut table = create_initialized_table(table_path, &partition_cols).await;
901922

902923
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
903924

crates/core/src/writer/test_utils.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -290,13 +290,10 @@ pub fn create_bare_table() -> DeltaTable {
290290
.unwrap()
291291
}
292292

293-
pub async fn create_initialized_table(partition_cols: &[String]) -> DeltaTable {
293+
pub async fn create_initialized_table(table_path: &str, partition_cols: &[String]) -> DeltaTable {
294294
let table_schema: StructType = get_delta_schema();
295-
let table_dir = tempfile::tempdir().unwrap();
296-
let table_path = table_dir.path();
297-
298295
CreateBuilder::new()
299-
.with_location(table_path.to_str().unwrap())
296+
.with_location(table_path)
300297
.with_table_name("test-table")
301298
.with_comment("A table for running tests")
302299
.with_columns(table_schema.fields().cloned())

crates/core/tests/command_restore.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,13 @@ use std::fs;
1313
use std::sync::Arc;
1414
use std::thread;
1515
use std::time::Duration;
16-
use tempfile::TempDir;
1716

1817
#[derive(Debug)]
1918
struct Context {
20-
pub tmp_dir: TempDir,
2119
pub table: DeltaTable,
2220
}
2321

24-
async fn setup_test() -> Result<Context, Box<dyn Error>> {
22+
async fn setup_test(table_uri: &str) -> Result<Context, Box<dyn Error>> {
2523
let columns = vec![
2624
StructField::new(
2725
"id".to_string(),
@@ -34,9 +32,6 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
3432
true,
3533
),
3634
];
37-
38-
let tmp_dir = tempfile::tempdir().unwrap();
39-
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
4035
let table = DeltaOps::try_from_uri(table_uri)
4136
.await?
4237
.create()
@@ -65,7 +60,7 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
6560
.await
6661
.unwrap();
6762

68-
Ok(Context { tmp_dir, table })
63+
Ok(Context { table })
6964
}
7065

7166
fn get_record_batch() -> RecordBatch {
@@ -95,13 +90,16 @@ fn get_record_batch() -> RecordBatch {
9590

9691
#[tokio::test]
9792
async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {
98-
let context = setup_test().await?;
93+
let tmp_dir = tempfile::tempdir().unwrap();
94+
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
95+
96+
let context = setup_test(table_uri).await?;
9997
let table = context.table;
10098
let result = DeltaOps(table).restore().with_version_to_restore(1).await?;
10199
assert_eq!(result.1.num_restored_file, 1);
102100
assert_eq!(result.1.num_removed_file, 2);
103101
assert_eq!(result.0.snapshot()?.version(), 4);
104-
let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap();
102+
105103
let mut table = DeltaOps::try_from_uri(table_uri).await?;
106104
table.0.load_version(1).await?;
107105
let curr_files = table.0.snapshot()?.file_paths_iter().collect_vec();
@@ -118,7 +116,9 @@ async fn test_restore_by_version() -> Result<(), Box<dyn Error>> {
118116

119117
#[tokio::test]
120118
async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {
121-
let context = setup_test().await?;
119+
let tmp_dir = tempfile::tempdir().unwrap();
120+
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
121+
let context = setup_test(table_uri).await?;
122122
let table = context.table;
123123
let version = 1;
124124

@@ -142,7 +142,9 @@ async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {
142142

143143
#[tokio::test]
144144
async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
145-
let context = setup_test().await?;
145+
let tmp_dir = tempfile::tempdir().unwrap();
146+
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
147+
let context = setup_test(table_uri).await?;
146148
let table = context.table;
147149
let history = table.history(Some(10)).await?;
148150
let timestamp = history.get(1).unwrap().timestamp.unwrap();
@@ -157,7 +159,6 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
157159
assert!(result.is_err());
158160

159161
// version too large
160-
let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap();
161162
let ops = DeltaOps::try_from_uri(table_uri).await?;
162163
let result = ops.restore().with_version_to_restore(5).await;
163164
assert!(result.is_err());
@@ -166,10 +167,12 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
166167

167168
#[tokio::test]
168169
async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
169-
let context = setup_test().await?;
170+
let tmp_dir = tempfile::tempdir().unwrap();
171+
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
172+
let context = setup_test(table_uri).await?;
170173

171174
for file in context.table.snapshot()?.log_data() {
172-
let p = context.tmp_dir.path().join(file.path().as_ref());
175+
let p = tmp_dir.path().join(file.path().as_ref());
173176
fs::remove_file(p).unwrap();
174177
}
175178

@@ -179,7 +182,7 @@ async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
179182
.all_tombstones(context.table.object_store().clone())
180183
.await?
181184
{
182-
let p = context.tmp_dir.path().join(file.clone().path);
185+
let p = tmp_dir.path().join(file.clone().path);
183186
fs::remove_file(p).unwrap();
184187
}
185188

@@ -193,10 +196,12 @@ async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
193196

194197
#[tokio::test]
195198
async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
196-
let context = setup_test().await?;
199+
let tmp_dir = tempfile::tempdir().unwrap();
200+
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
201+
let context = setup_test(table_uri).await?;
197202

198203
for file in context.table.snapshot()?.log_data() {
199-
let p = context.tmp_dir.path().join(file.path().as_ref());
204+
let p = tmp_dir.path().join(file.path().as_ref());
200205
fs::remove_file(p).unwrap();
201206
}
202207

@@ -206,7 +211,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
206211
.all_tombstones(context.table.object_store().clone())
207212
.await?
208213
{
209-
let p = context.tmp_dir.path().join(file.clone().path);
214+
let p = tmp_dir.path().join(file.clone().path);
210215
fs::remove_file(p).unwrap();
211216
}
212217

@@ -221,7 +226,9 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
221226

222227
#[tokio::test]
223228
async fn test_restore_transaction_conflict() -> Result<(), Box<dyn Error>> {
224-
let context = setup_test().await?;
229+
let tmp_dir = tempfile::tempdir().unwrap();
230+
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
231+
let context = setup_test(table_uri).await?;
225232
let mut table = context.table;
226233
table.load_version(2).await?;
227234

crates/core/tests/integration_datafusion.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,9 +1184,11 @@ async fn simple_query(context: &IntegrationContext) -> TestResult {
11841184
}
11851185

11861186
mod date_partitions {
1187+
use tempfile::TempDir;
1188+
11871189
use super::*;
11881190

1189-
async fn setup_test() -> Result<DeltaTable, Box<dyn Error>> {
1191+
async fn setup_test(table_uri: &str) -> Result<DeltaTable, Box<dyn Error>> {
11901192
let columns = vec![
11911193
StructField::new(
11921194
"id".to_owned(),
@@ -1200,8 +1202,6 @@ mod date_partitions {
12001202
),
12011203
];
12021204

1203-
let tmp_dir = tempfile::tempdir().unwrap();
1204-
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
12051205
let dt = DeltaOps::try_from_uri(table_uri)
12061206
.await?
12071207
.create()
@@ -1238,7 +1238,9 @@ mod date_partitions {
12381238
#[tokio::test]
12391239
async fn test_issue_1445_date_partition() -> Result<()> {
12401240
let ctx = SessionContext::new();
1241-
let mut dt = setup_test().await.unwrap();
1241+
let tmp_dir = tempfile::tempdir().unwrap();
1242+
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
1243+
let mut dt = setup_test(table_uri).await.unwrap();
12421244
let mut writer = RecordBatchWriter::for_table(&dt)?;
12431245
write(
12441246
&mut writer,

0 commit comments

Comments
 (0)