Skip to content

Commit be039df

Browse files
committed
style: cargo fmt
CI rustfmt was failing on the branch (only whitespace / wrapping nits — no logic change). Applying `cargo fmt` so the build goes green.
1 parent 521e1b8 commit be039df

5 files changed

Lines changed: 75 additions & 81 deletions

File tree

examples/compare_rowid_against_duckdb.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
161161
.with_region("auto")
162162
.build()?,
163163
);
164-
runtime.register_object_store(
165-
&Url::parse(&format!("s3://{}/", cfg.s3_bucket))?,
166-
s3,
167-
);
164+
runtime.register_object_store(&Url::parse(&format!("s3://{}/", cfg.s3_bucket))?, s3);
168165

169166
let catalog = DuckLakeCatalog::new(provider)?.with_row_lineage(true);
170167
let cfg = SessionConfig::new().with_default_catalog_and_schema("dl", "main");
@@ -175,7 +172,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
175172
// ----- 3. Compare table by table -----
176173
let mut overall_ok = true;
177174
for &(schema, table, key, key_type, limit) in TEST_TABLES {
178-
print_header(&format!("Comparing {schema}.{table} (key={key}, limit={limit})"));
175+
print_header(&format!(
176+
"Comparing {schema}.{table} (key={key}, limit={limit})"
177+
));
179178
match compare_table(&duckdb_conn, &ctx, schema, table, key, key_type, limit).await {
180179
Ok(true) => println!("✅ rowid + {key} match between DuckDB and DataFusion"),
181180
Ok(false) => {
@@ -275,7 +274,10 @@ fn duckdb_query(
275274
} else {
276275
return Err("unsupported key type from DuckDB".into());
277276
};
278-
out.push(Row { rowid, key });
277+
out.push(Row {
278+
rowid,
279+
key,
280+
});
279281
}
280282
Ok(out)
281283
}
@@ -331,7 +333,10 @@ fn extract_rows(
331333
)
332334
.into());
333335
};
334-
out.push(Row { rowid, key });
336+
out.push(Row {
337+
rowid,
338+
key,
339+
});
335340
}
336341
}
337342
Ok(out)

examples/rowid_lifecycle.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,7 @@ async fn run_lifecycle(
138138
// Step 3: DELETE a single row → exercises DeleteFilterExec
139139
// -------------------------------------------------------------------
140140
header("Step 3: DELETE id=4");
141-
duckdb_conn.execute_batch(&format!(
142-
"DELETE FROM dl.main.{TABLE} WHERE id = 4;"
143-
))?;
141+
duckdb_conn.execute_batch(&format!("DELETE FROM dl.main.{TABLE} WHERE id = 4;"))?;
144142
all_passed &= compare_step("After DELETE id=4", duckdb_conn, runtime, cfg).await?;
145143

146144
// -------------------------------------------------------------------
@@ -263,7 +261,11 @@ fn duckdb_read(conn: &duckdb::Connection, sql: &str) -> Result<Vec<Row>, Box<dyn
263261
r.get::<_, Option<i64>>(1)?.unwrap_or(0)
264262
};
265263
let name: Option<String> = r.get(2)?;
266-
out.push(Row { rowid, id, name });
264+
out.push(Row {
265+
rowid,
266+
id,
267+
name,
268+
});
267269
}
268270
Ok(out)
269271
}
@@ -304,9 +306,17 @@ fn rows_from_batches(batches: &[RecordBatch]) -> Result<Vec<Row>, Box<dyn Error>
304306

305307
// id may come back as Int32 or Int64; coerce to i64.
306308
let id: i64 = if let Some(arr) = id_col.as_any().downcast_ref::<Int32Array>() {
307-
if arr.is_null(i) { 0 } else { arr.value(i) as i64 }
309+
if arr.is_null(i) {
310+
0
311+
} else {
312+
arr.value(i) as i64
313+
}
308314
} else if let Some(arr) = id_col.as_any().downcast_ref::<Int64Array>() {
309-
if arr.is_null(i) { 0 } else { arr.value(i) }
315+
if arr.is_null(i) {
316+
0
317+
} else {
318+
arr.value(i)
319+
}
310320
} else {
311321
return Err(format!("unexpected id type {:?}", id_col.data_type()).into());
312322
};
@@ -321,7 +331,11 @@ fn rows_from_batches(batches: &[RecordBatch]) -> Result<Vec<Row>, Box<dyn Error>
321331
return Err(format!("unexpected name type {:?}", name_col.data_type()).into());
322332
};
323333

324-
out.push(Row { rowid, id, name });
334+
out.push(Row {
335+
rowid,
336+
id,
337+
name,
338+
});
325339
}
326340
}
327341
Ok(out)

src/row_id.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,7 @@ mod tests {
263263
let b1 = small_batch(input_schema.clone(), &[10, 20, 30]);
264264
let b2 = small_batch(input_schema.clone(), &[40, 50]);
265265
let mem =
266-
MemorySourceConfig::try_new_exec(&[vec![b1, b2]], input_schema.clone(), None)
267-
.unwrap();
266+
MemorySourceConfig::try_new_exec(&[vec![b1, b2]], input_schema.clone(), None).unwrap();
268267

269268
let exec = Arc::new(RowIdExec::new(mem, Some(1000)));
270269
assert_eq!(exec.schema().field(1).name(), ROWID_COLUMN_NAME);
@@ -309,8 +308,7 @@ mod tests {
309308
)
310309
.unwrap();
311310
let mem =
312-
MemorySourceConfig::try_new_exec(&[vec![batch]], input_schema.clone(), None)
313-
.unwrap();
311+
MemorySourceConfig::try_new_exec(&[vec![batch]], input_schema.clone(), None).unwrap();
314312

315313
// Insert rowid at position 1 → schema should be [a, rowid, b]
316314
let exec = Arc::new(RowIdExec::new_at(mem, Some(500), 1));
@@ -364,8 +362,7 @@ mod tests {
364362
)
365363
.unwrap();
366364
let mem =
367-
MemorySourceConfig::try_new_exec(&[vec![batch]], input_schema.clone(), None)
368-
.unwrap();
365+
MemorySourceConfig::try_new_exec(&[vec![batch]], input_schema.clone(), None).unwrap();
369366

370367
let exec = Arc::new(RowIdExec::new(mem, Some(42)));
371368
assert_eq!(exec.schema().fields().len(), 1);
@@ -435,8 +432,7 @@ mod tests {
435432
// ends up appended at the end.
436433
let input_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
437434
let b = small_batch(input_schema.clone(), &[1, 2]);
438-
let mem =
439-
MemorySourceConfig::try_new_exec(&[vec![b]], input_schema.clone(), None).unwrap();
435+
let mem = MemorySourceConfig::try_new_exec(&[vec![b]], input_schema.clone(), None).unwrap();
440436

441437
let exec = Arc::new(RowIdExec::new_at(mem, Some(10), 99));
442438
assert_eq!(exec.schema().fields().len(), 2);
@@ -453,8 +449,7 @@ mod tests {
453449
// Distribution shape — just that it is SinglePartition for the
454450
// sole child.
455451
let input_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
456-
let mem =
457-
MemorySourceConfig::try_new_exec(&[vec![]], input_schema, None).unwrap();
452+
let mem = MemorySourceConfig::try_new_exec(&[vec![]], input_schema, None).unwrap();
458453
let exec = RowIdExec::new(mem, Some(0));
459454

460455
let dists = exec.required_input_distribution();
@@ -476,8 +471,7 @@ mod tests {
476471
async fn emits_null_when_row_id_start_is_none() {
477472
let input_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
478473
let b = small_batch(input_schema.clone(), &[1, 2]);
479-
let mem =
480-
MemorySourceConfig::try_new_exec(&[vec![b]], input_schema.clone(), None).unwrap();
474+
let mem = MemorySourceConfig::try_new_exec(&[vec![b]], input_schema.clone(), None).unwrap();
481475

482476
let exec = Arc::new(RowIdExec::new(mem, None));
483477
let ctx = Arc::new(TaskContext::default());

src/table.rs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ use crate::metadata_provider::{
1111
DuckLakeFileData, DuckLakeTableColumn, DuckLakeTableFile, MetadataProvider,
1212
};
1313
use crate::path_resolver::resolve_path;
14-
use crate::row_id::{
15-
ROW_ID_PARQUET_FIELD_ID, ROWID_COLUMN_NAME, RowIdExec, rowid_field,
16-
};
14+
use crate::row_id::{ROW_ID_PARQUET_FIELD_ID, ROWID_COLUMN_NAME, RowIdExec, rowid_field};
1715
use crate::types::{
1816
build_arrow_schema, build_read_schema_with_field_id_mapping, extract_parquet_field_ids,
1917
};
@@ -259,7 +257,8 @@ impl DuckLakeTable {
259257

260258
/// Index of the synthetic `rowid` column in `self.schema`, when enabled.
261259
fn rowid_index(&self) -> Option<usize> {
262-
self.row_lineage.then(|| self.physical_schema.fields().len())
260+
self.row_lineage
261+
.then(|| self.physical_schema.fields().len())
263262
}
264263

265264
/// Resolve a file path (data or delete file) to its absolute path
@@ -618,16 +617,14 @@ impl DuckLakeTable {
618617
if !key.is_empty() {
619618
let key_bytes = crate::encryption::DuckLakeEncryptionFactory::decode_key(key)?;
620619
let decryption_props =
621-
parquet::encryption::decrypt::FileDecryptionProperties::builder(
622-
key_bytes,
623-
)
624-
.build()
625-
.map_err(|e| {
626-
DataFusionError::Execution(format!(
627-
"Failed to create decryption properties: {}",
628-
e
629-
))
630-
})?;
620+
parquet::encryption::decrypt::FileDecryptionProperties::builder(key_bytes)
621+
.build()
622+
.map_err(|e| {
623+
DataFusionError::Execution(format!(
624+
"Failed to create decryption properties: {}",
625+
e
626+
))
627+
})?;
631628
ArrowReaderOptions::new().with_file_decryption_properties(decryption_props)
632629
} else {
633630
ArrowReaderOptions::new()
@@ -687,9 +684,7 @@ impl DuckLakeTable {
687684

688685
{
689686
let mut cache = self.file_read_config_cache.lock().unwrap();
690-
cache
691-
.entry(resolved_path)
692-
.or_insert_with(|| cfg.clone());
687+
cache.entry(resolved_path).or_insert_with(|| cfg.clone());
693688
}
694689

695690
Ok(cfg)
@@ -720,14 +715,15 @@ impl DuckLakeTable {
720715
.filter(|&&i| i != rowid_idx)
721716
.copied()
722717
.collect();
723-
let rowid_insert_pos: usize = user_proj
724-
.iter()
725-
.position(|&i| i == rowid_idx)
726-
.ok_or_else(|| {
727-
DataFusionError::Internal(
728-
"build_exec_for_file_with_rowid called without rowid in projection".into(),
729-
)
730-
})?;
718+
let rowid_insert_pos: usize =
719+
user_proj
720+
.iter()
721+
.position(|&i| i == rowid_idx)
722+
.ok_or_else(|| {
723+
DataFusionError::Internal(
724+
"build_exec_for_file_with_rowid called without rowid in projection".into(),
725+
)
726+
})?;
731727

732728
// Match the C++ extension: if the file embeds no rowid column AND the
733729
// catalog didn't record a `row_id_start`, lineage cannot be

tests/row_id_tests.rs

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,9 @@ fn create_catalog_rowid_two_files(catalog_path: &Path) -> Result<()> {
3232

3333
conn.execute("CREATE TABLE c.t(i INTEGER);", [])?;
3434
// First file: rows 0..3
35-
conn.execute(
36-
"INSERT INTO c.t SELECT i FROM range(0, 3) t(i);",
37-
[],
38-
)?;
35+
conn.execute("INSERT INTO c.t SELECT i FROM range(0, 3) t(i);", [])?;
3936
// Second file: rows 10..15
40-
conn.execute(
41-
"INSERT INTO c.t SELECT i FROM range(10, 15) t(i);",
42-
[],
43-
)?;
37+
conn.execute("INSERT INTO c.t SELECT i FROM range(10, 15) t(i);", [])?;
4438
Ok(())
4539
}
4640

@@ -131,8 +125,8 @@ fn collect_rowid_i_sorted(batches: &[RecordBatch]) -> Vec<(i64, i32)> {
131125

132126
#[tokio::test]
133127
async fn rowid_disabled_by_default() -> DataFusionResult<()> {
134-
let temp = TempDir::new()
135-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
128+
let temp =
129+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
136130
let path = temp.path().join("rowid_default.ducklake");
137131
create_catalog_rowid_two_files(&path).map_err(common::to_datafusion_error)?;
138132

@@ -168,8 +162,8 @@ async fn rowid_disabled_by_default() -> DataFusionResult<()> {
168162

169163
#[tokio::test]
170164
async fn rowid_sequential_across_files() -> DataFusionResult<()> {
171-
let temp = TempDir::new()
172-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
165+
let temp =
166+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
173167
let path = temp.path().join("rowid_two_files.ducklake");
174168
create_catalog_rowid_two_files(&path).map_err(common::to_datafusion_error)?;
175169

@@ -194,16 +188,7 @@ async fn rowid_sequential_across_files() -> DataFusionResult<()> {
194188
let pairs = collect_rowid_i_sorted(&batches);
195189
assert_eq!(
196190
pairs,
197-
vec![
198-
(0, 0),
199-
(1, 1),
200-
(2, 2),
201-
(3, 10),
202-
(4, 11),
203-
(5, 12),
204-
(6, 13),
205-
(7, 14),
206-
],
191+
vec![(0, 0), (1, 1), (2, 2), (3, 10), (4, 11), (5, 12), (6, 13), (7, 14),],
207192
"rowids should be contiguous 0..8 across the two files",
208193
);
209194

@@ -220,8 +205,8 @@ async fn rowid_sequential_across_files() -> DataFusionResult<()> {
220205

221206
#[tokio::test]
222207
async fn rowid_preserved_under_deletes() -> DataFusionResult<()> {
223-
let temp = TempDir::new()
224-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
208+
let temp =
209+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
225210
let path = temp.path().join("rowid_with_deletes.ducklake");
226211
create_catalog_rowid_with_deletes(&path).map_err(common::to_datafusion_error)?;
227212

@@ -250,8 +235,8 @@ async fn rowid_preserved_under_deletes() -> DataFusionResult<()> {
250235
async fn rowid_only_projection() -> DataFusionResult<()> {
251236
// Edge case: physical projection is empty (just rowid). Verifies that
252237
// RowIdExec works when ParquetExec emits zero-column count batches.
253-
let temp = TempDir::new()
254-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
238+
let temp =
239+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
255240
let path = temp.path().join("rowid_only.ducklake");
256241
create_catalog_rowid_two_files(&path).map_err(common::to_datafusion_error)?;
257242

@@ -283,8 +268,8 @@ async fn rowid_preserved_across_update_rewrite() -> DataFusionResult<()> {
283268
// The critical test: UPDATE rewrites the file with embedded rowids.
284269
// Our scan must read those embedded values rather than compute
285270
// `row_id_start + position`, otherwise the rowids drift.
286-
let temp = TempDir::new()
287-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
271+
let temp =
272+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
288273
let path = temp.path().join("rowid_update.ducklake");
289274
create_catalog_rowid_with_update(&path).map_err(common::to_datafusion_error)?;
290275

@@ -319,8 +304,8 @@ async fn rowid_preserved_across_update_rewrite() -> DataFusionResult<()> {
319304
#[tokio::test]
320305
async fn rowid_count_star_unaffected() -> DataFusionResult<()> {
321306
// COUNT(*) should not trigger the rowid path (it asks for zero columns).
322-
let temp = TempDir::new()
323-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
307+
let temp =
308+
TempDir::new().map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
324309
let path = temp.path().join("rowid_count_star.ducklake");
325310
create_catalog_rowid_two_files(&path).map_err(common::to_datafusion_error)?;
326311

0 commit comments

Comments
 (0)