Skip to content

Commit 2e0e564

Browse files
chore(multicatalog): cargo fmt
1 parent bcfecf2 commit 2e0e564

7 files changed

Lines changed: 165 additions & 136 deletions

examples/multicatalog_write.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6060
let mgr = MulticatalogManager::new(pool.clone());
6161
let cat_pg = mgr.create_catalog("pg_prod").await?;
6262
let cat_mysql = mgr.create_catalog("mysql_prod").await?;
63-
println!("✓ catalogs: pg_prod -> {}, mysql_prod -> {}", cat_pg, cat_mysql);
63+
println!(
64+
"✓ catalogs: pg_prod -> {}, mysql_prod -> {}",
65+
cat_pg, cat_mysql
66+
);
6467

6568
// ── Step 3: write through each catalog ────────────────────────────────────
6669
let object_store: Arc<dyn object_store::ObjectStore> = Arc::new(LocalFileSystem::new());
@@ -88,8 +91,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8891
);
8992

9093
// mysql_prod.public.orders
91-
let writer_mysql =
92-
Arc::new(PostgresMetadataWriter::with_pool(pool.clone(), cat_mysql).await?);
94+
let writer_mysql = Arc::new(PostgresMetadataWriter::with_pool(pool.clone(), cat_mysql).await?);
9395
let orders_batch = build_orders_batch();
9496
let tw_mysql = DuckLakeTableWriter::new(writer_mysql.clone(), Arc::clone(&object_store))?;
9597
let orders_result = tw_mysql
@@ -219,7 +221,11 @@ async fn read_via_multicatalog(
219221
continue;
220222
}
221223
let schema = cat.schema(&schema_name).unwrap();
222-
println!(" schema {} -> tables {:?}", schema_name, schema.table_names());
224+
println!(
225+
" schema {} -> tables {:?}",
226+
schema_name,
227+
schema.table_names()
228+
);
223229
}
224230
}
225231

@@ -317,11 +323,12 @@ async fn dump_query(
317323
.map(|c| sqlx::Column::name(c).to_string())
318324
.collect();
319325
println!(" {}", header.join(" | "));
320-
println!(" {}", "-".repeat(header.iter().map(|s| s.len()).sum::<usize>() + header.len() * 3));
326+
println!(
327+
" {}",
328+
"-".repeat(header.iter().map(|s| s.len()).sum::<usize>() + header.len() * 3)
329+
);
321330
for row in &rows {
322-
let cols: Vec<String> = (0..row.len())
323-
.map(|i| format_col(row, i))
324-
.collect();
331+
let cols: Vec<String> = (0..row.len()).map(|i| format_col(row, i)).collect();
325332
println!(" {}", cols.join(" | "));
326333
}
327334
Ok(())

src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ pub mod metadata_provider_sqlite;
6666
pub mod insert_exec;
6767
#[cfg(feature = "write")]
6868
pub mod metadata_writer;
69-
#[cfg(feature = "write-sqlite")]
70-
pub mod metadata_writer_sqlite;
7169
#[cfg(feature = "write-postgres")]
7270
pub mod metadata_writer_postgres;
71+
#[cfg(feature = "write-sqlite")]
72+
pub mod metadata_writer_sqlite;
7373
#[cfg(feature = "write-postgres")]
7474
pub mod multicatalog;
7575
#[cfg(feature = "multicatalog-postgres")]
@@ -105,10 +105,10 @@ pub use insert_exec::DuckLakeInsertExec;
105105
pub use metadata_writer::{
106106
ColumnDef, DataFileInfo, MetadataWriter, WriteMode, WriteResult, WriteSetupResult,
107107
};
108-
#[cfg(feature = "write-sqlite")]
109-
pub use metadata_writer_sqlite::SqliteMetadataWriter;
110108
#[cfg(feature = "write-postgres")]
111109
pub use metadata_writer_postgres::PostgresMetadataWriter;
110+
#[cfg(feature = "write-sqlite")]
111+
pub use metadata_writer_sqlite::SqliteMetadataWriter;
112112
#[cfg(feature = "write-postgres")]
113113
pub use multicatalog::{MulticatalogManager, initialize_multicatalog_schema};
114114
#[cfg(feature = "multicatalog-postgres")]

src/metadata_writer_postgres.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -500,12 +500,11 @@ impl MetadataWriter for PostgresMetadataWriter {
500500

501501
fn get_data_path(&self) -> Result<String> {
502502
block_on(async {
503-
let row = sqlx::query(
504-
"SELECT value FROM ducklake_metadata WHERE key = $1 AND scope IS NULL",
505-
)
506-
.bind("data_path")
507-
.fetch_optional(&self.pool)
508-
.await?;
503+
let row =
504+
sqlx::query("SELECT value FROM ducklake_metadata WHERE key = $1 AND scope IS NULL")
505+
.bind("data_path")
506+
.fetch_optional(&self.pool)
507+
.await?;
509508

510509
match row {
511510
Some(r) => Ok(r.try_get(0)?),
@@ -846,10 +845,8 @@ mod tests {
846845

847846
#[test]
848847
fn test_columns_differ_identical() {
849-
let existing = vec![
850-
("id".into(), "int64".into(), false),
851-
("name".into(), "varchar".into(), true),
852-
];
848+
let existing =
849+
vec![("id".into(), "int64".into(), false), ("name".into(), "varchar".into(), true)];
853850
let proposed = vec![
854851
ColumnDef::new("id", "int64", false).unwrap(),
855852
ColumnDef::new("name", "varchar", true).unwrap(),

src/multicatalog.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,16 @@ use sqlx::postgres::PgPool;
1010

1111
/// Bootstrap standard + multicatalog tables. Idempotent.
1212
pub async fn initialize_multicatalog_schema(pool: &PgPool) -> Result<()> {
13-
execute_ddl_statements(pool, crate::metadata_writer_postgres::SQL_CREATE_STANDARD_TABLES)
14-
.await?;
15-
execute_ddl_statements(pool, crate::metadata_writer_postgres::SQL_CREATE_MULTICATALOG_TABLES)
16-
.await?;
13+
execute_ddl_statements(
14+
pool,
15+
crate::metadata_writer_postgres::SQL_CREATE_STANDARD_TABLES,
16+
)
17+
.await?;
18+
execute_ddl_statements(
19+
pool,
20+
crate::metadata_writer_postgres::SQL_CREATE_MULTICATALOG_TABLES,
21+
)
22+
.await?;
1723
Ok(())
1824
}
1925

tests/multicatalog_hardening_tests.rs

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
1313
use std::sync::Arc;
1414

15-
use datafusion_ducklake::metadata_writer::{
16-
ColumnDef, DataFileInfo, MetadataWriter, WriteMode,
17-
};
15+
use datafusion_ducklake::metadata_writer::{ColumnDef, DataFileInfo, MetadataWriter, WriteMode};
1816
use datafusion_ducklake::{
1917
MulticatalogManager, PostgresMetadataWriter, initialize_multicatalog_schema,
2018
};
@@ -85,7 +83,10 @@ async fn concurrent_get_or_create_schema_no_duplicates() {
8583
"all callers must see the same schema_id, got {:?}",
8684
ids
8785
);
88-
assert_eq!(created_count, 1, "exactly one writer should report was_created");
86+
assert_eq!(
87+
created_count, 1,
88+
"exactly one writer should report was_created"
89+
);
8990

9091
// And the catalog only has one schema row.
9192
let n: i64 = sqlx::query(
@@ -114,7 +115,9 @@ async fn concurrent_get_or_create_table_no_duplicates() {
114115
.unwrap(),
115116
);
116117
let snapshot_id = writer.create_snapshot().unwrap();
117-
let (schema_id, _) = writer.get_or_create_schema("public", None, snapshot_id).unwrap();
118+
let (schema_id, _) = writer
119+
.get_or_create_schema("public", None, snapshot_id)
120+
.unwrap();
118121

119122
let mut handles = Vec::new();
120123
for _ in 0..10 {
@@ -199,8 +202,12 @@ async fn seed_two_catalogs(pool: &PgPool) -> (i64, i64, i64, i64) {
199202
let mgr = MulticatalogManager::new(pool.clone());
200203
let cat_a = mgr.create_catalog("cat_a").await.unwrap();
201204
let cat_b = mgr.create_catalog("cat_b").await.unwrap();
202-
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a).await.unwrap();
203-
let wb = PostgresMetadataWriter::with_pool(pool.clone(), cat_b).await.unwrap();
205+
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a)
206+
.await
207+
.unwrap();
208+
let wb = PostgresMetadataWriter::with_pool(pool.clone(), cat_b)
209+
.await
210+
.unwrap();
204211
wa.set_data_path("/data").unwrap();
205212

206213
let setup_a = wa
@@ -217,7 +224,9 @@ async fn seed_two_catalogs(pool: &PgPool) -> (i64, i64, i64, i64) {
217224
async fn register_data_file_rejects_cross_catalog_table_id() {
218225
let (pool, _c) = spin_up_postgres().await.unwrap();
219226
let (cat_a, _cat_b, _table_a, table_b) = seed_two_catalogs(&pool).await;
220-
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a).await.unwrap();
227+
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a)
228+
.await
229+
.unwrap();
221230
let snap = wa.create_snapshot().unwrap();
222231

223232
let result = wa.register_data_file(
@@ -233,13 +242,12 @@ async fn register_data_file_rejects_cross_catalog_table_id() {
233242
);
234243

235244
// And no row was inserted.
236-
let n: i64 =
237-
sqlx::query("SELECT COUNT(*) FROM ducklake_data_file WHERE path = 'evil.parquet'")
238-
.fetch_one(&pool)
239-
.await
240-
.unwrap()
241-
.try_get(0)
242-
.unwrap();
245+
let n: i64 = sqlx::query("SELECT COUNT(*) FROM ducklake_data_file WHERE path = 'evil.parquet'")
246+
.fetch_one(&pool)
247+
.await
248+
.unwrap()
249+
.try_get(0)
250+
.unwrap();
243251
assert_eq!(n, 0);
244252
}
245253

@@ -248,7 +256,9 @@ async fn register_data_file_rejects_cross_catalog_table_id() {
248256
async fn end_table_files_rejects_cross_catalog_table_id() {
249257
let (pool, _c) = spin_up_postgres().await.unwrap();
250258
let (cat_a, _cat_b, _table_a, table_b) = seed_two_catalogs(&pool).await;
251-
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a).await.unwrap();
259+
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a)
260+
.await
261+
.unwrap();
252262
let snap = wa.create_snapshot().unwrap();
253263

254264
let result = wa.end_table_files(table_b, snap);
@@ -265,7 +275,9 @@ async fn end_table_files_rejects_cross_catalog_table_id() {
265275
async fn set_columns_rejects_cross_catalog_table_id() {
266276
let (pool, _c) = spin_up_postgres().await.unwrap();
267277
let (cat_a, _cat_b, _table_a, table_b) = seed_two_catalogs(&pool).await;
268-
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a).await.unwrap();
278+
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a)
279+
.await
280+
.unwrap();
269281
let snap = wa.create_snapshot().unwrap();
270282

271283
let result = wa.set_columns(table_b, &users_cols(), snap);
@@ -284,8 +296,12 @@ async fn get_or_create_table_rejects_cross_catalog_schema_id() {
284296
let mgr = MulticatalogManager::new(pool.clone());
285297
let cat_a = mgr.create_catalog("cat_a").await.unwrap();
286298
let cat_b = mgr.create_catalog("cat_b").await.unwrap();
287-
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a).await.unwrap();
288-
let wb = PostgresMetadataWriter::with_pool(pool.clone(), cat_b).await.unwrap();
299+
let wa = PostgresMetadataWriter::with_pool(pool.clone(), cat_a)
300+
.await
301+
.unwrap();
302+
let wb = PostgresMetadataWriter::with_pool(pool.clone(), cat_b)
303+
.await
304+
.unwrap();
289305
wa.set_data_path("/data").unwrap();
290306

291307
// Set up a schema in each catalog.
@@ -322,12 +338,10 @@ async fn set_data_path_rejects_silent_overwrite() {
322338
// Same value is idempotent.
323339
w.set_data_path("/data/a").unwrap();
324340
// Different value rejected.
325-
let err = w.set_data_path("/data/b").expect_err("must reject overwrite");
326-
assert!(
327-
err.to_string().contains("already set"),
328-
"got: {}",
329-
err
330-
);
341+
let err = w
342+
.set_data_path("/data/b")
343+
.expect_err("must reject overwrite");
344+
assert!(err.to_string().contains("already set"), "got: {}", err);
331345
// Original value is untouched.
332346
assert_eq!(w.get_data_path().unwrap(), "/data/a");
333347
}
@@ -368,15 +382,23 @@ async fn rollback_leaves_no_orphan_rows() {
368382
ColumnDef::new("name", "varchar", true).unwrap(),
369383
];
370384
let snaps_before_fail: i64 = sqlx::query("SELECT COUNT(*) FROM ducklake_snapshot")
371-
.fetch_one(&pool).await.unwrap().try_get(0).unwrap();
372-
let err =
373-
w.begin_write_transaction("public", "users", &bad_cols, WriteMode::Append)
374-
.expect_err("incompatible type change must fail");
385+
.fetch_one(&pool)
386+
.await
387+
.unwrap()
388+
.try_get(0)
389+
.unwrap();
390+
let err = w
391+
.begin_write_transaction("public", "users", &bad_cols, WriteMode::Append)
392+
.expect_err("incompatible type change must fail");
375393
assert!(err.to_string().contains("Schema evolution error"));
376394

377395
// The failed call must not have left a snapshot behind.
378396
let snaps_after_fail: i64 = sqlx::query("SELECT COUNT(*) FROM ducklake_snapshot")
379-
.fetch_one(&pool).await.unwrap().try_get(0).unwrap();
397+
.fetch_one(&pool)
398+
.await
399+
.unwrap()
400+
.try_get(0)
401+
.unwrap();
380402
assert_eq!(
381403
snaps_after_fail, snaps_before_fail,
382404
"rollback should leave snapshot count unchanged"
@@ -458,15 +480,12 @@ async fn unknown_catalog_id_errors_clearly_on_lock() {
458480
// but the map insert will succeed because there's no FK to ducklake_catalog.
459481
// The lock-taking methods are the ones that must reject. begin_write_transaction
460482
// takes the lock first:
461-
let result = bogus.begin_write_transaction(
462-
"public",
463-
"users",
464-
&users_cols(),
465-
WriteMode::Replace,
466-
);
483+
let result =
484+
bogus.begin_write_transaction("public", "users", &users_cols(), WriteMode::Replace);
467485
let err = result.expect_err("bogus catalog_id should error");
468486
assert!(
469-
err.to_string().contains("999999") || err.to_string().contains("not found")
487+
err.to_string().contains("999999")
488+
|| err.to_string().contains("not found")
470489
|| err.to_string().to_lowercase().contains("catalog"),
471490
"expected catalog-related error, got: {}",
472491
err

0 commit comments

Comments
 (0)