Skip to content

Commit f7cf1c9

Browse files
feat(multicatalog): Phase 1 Postgres multi-catalog support
Add per-tenant catalog isolation for the RuntimeDB use case. Multiple catalogs share one metadata database; each has independent snapshots, schemas, and a dense per-catalog schema_version sequence.

New public types:
- MulticatalogManager: create/list catalogs
- PostgresMetadataWriter: implements MetadataWriter, bound to catalog_id
- MulticatalogProvider: implements MetadataProvider, bound to catalog_id
- initialize_multicatalog_schema: idempotent DDL bootstrap

New tables:
- ducklake_catalog (name registry)
- ducklake_catalog_snapshot_map, ducklake_catalog_schema_map
- ducklake_schema_versions
- schema_version column on ducklake_snapshot

Correctness:
- Per-catalog FOR UPDATE serializes concurrent writers (30s lock_timeout)
- Cross-catalog table_id/schema_id rejected at write time
- Partial unique index on (schema_id, table_name) WHERE end_snapshot IS NULL
- set_data_path rejects silent overwrites

Tests (testcontainers Postgres):
- 8 writer + 10 reader + 11 hardening integration tests
- 7 unit tests for DDL/DML classification

Existing single-catalog providers and writers untouched.

Refs #107 Phase 1.
1 parent 536729a commit f7cf1c9

9 files changed

Lines changed: 3351 additions & 1 deletion

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,7 @@ encryption = ["parquet/encryption", "datafusion/parquet_encryption", "dep:base64
7171

7272
# Write support
7373
write = ["dep:uuid"]
74-
write-sqlite = ["write", "metadata-sqlite"]
74+
write-sqlite = ["write", "metadata-sqlite"]
75+
write-postgres = ["write", "metadata-postgres", "multicatalog-postgres"]
76+
# Multicatalog read provider (independent of write support)
77+
multicatalog-postgres = ["metadata-postgres"]

examples/multicatalog_write.rs

Lines changed: 380 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
//! End-to-end demo of the multicatalog Postgres writer.
2+
//!
3+
//! Walks through:
4+
//! 1. Bootstrapping the catalog tables in Postgres
5+
//! 2. Creating two catalogs ("pg_prod", "mysql_prod")
6+
//! 3. Writing real Parquet data through each catalog via DuckLakeTableWriter
7+
//! 4. Dumping the resulting catalog rows so you can see multi-catalog isolation
8+
//! 5. Reading the Parquet files back through DataFusion to prove the data is real
9+
//!
10+
//! Usage:
11+
//! cargo run --example multicatalog_write --no-default-features \
12+
//! --features write-postgres,metadata-postgres -- <POSTGRES_URL> <DATA_DIR>
13+
//!
14+
//! Example:
15+
//! cargo run --example multicatalog_write --no-default-features \
16+
//! --features write-postgres,metadata-postgres -- \
17+
//! "postgresql://postgres:postgres@127.0.0.1:55432/postgres" /tmp/ducklake-mc
18+
19+
use std::sync::Arc;
20+
21+
use arrow::array::{Float64Array, Int32Array, Int64Array, StringArray};
22+
use arrow::datatypes::{DataType, Field, Schema};
23+
use arrow::record_batch::RecordBatch;
24+
use datafusion::execution::runtime_env::RuntimeEnv;
25+
use datafusion::prelude::*;
26+
use datafusion_ducklake::{
27+
DuckLakeCatalog, DuckLakeTableWriter, MetadataProvider, MetadataWriter, MulticatalogManager,
28+
MulticatalogProvider, PostgresMetadataWriter, initialize_multicatalog_schema,
29+
};
30+
use object_store::local::LocalFileSystem;
31+
use sqlx::Row;
32+
use sqlx::postgres::PgPoolOptions;
33+
34+
#[tokio::main]
35+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
36+
let args: Vec<String> = std::env::args().collect();
37+
if args.len() < 3 {
38+
eprintln!("Usage: {} <POSTGRES_URL> <DATA_DIR>", args[0]);
39+
std::process::exit(1);
40+
}
41+
let pg_url = &args[1];
42+
let data_dir = std::path::PathBuf::from(&args[2]);
43+
std::fs::create_dir_all(&data_dir)?;
44+
let data_dir_str = data_dir.canonicalize()?.to_string_lossy().to_string();
45+
46+
println!("== Multicatalog Postgres writer demo ==");
47+
println!("postgres : {}", pg_url);
48+
println!("data dir : {}", data_dir_str);
49+
println!();
50+
51+
// ── Step 1: connect + bootstrap schema ────────────────────────────────────
52+
let pool = PgPoolOptions::new()
53+
.max_connections(5)
54+
.connect(pg_url)
55+
.await?;
56+
initialize_multicatalog_schema(&pool).await?;
57+
println!("✓ schema bootstrapped");
58+
59+
// ── Step 2: create catalogs ───────────────────────────────────────────────
60+
let mgr = MulticatalogManager::new(pool.clone());
61+
let cat_pg = mgr.create_catalog("pg_prod").await?;
62+
let cat_mysql = mgr.create_catalog("mysql_prod").await?;
63+
println!("✓ catalogs: pg_prod -> {}, mysql_prod -> {}", cat_pg, cat_mysql);
64+
65+
// ── Step 3: write through each catalog ────────────────────────────────────
66+
let object_store: Arc<dyn object_store::ObjectStore> = Arc::new(LocalFileSystem::new());
67+
68+
// pg_prod.public.users
69+
let writer_pg = Arc::new(PostgresMetadataWriter::with_pool(pool.clone(), cat_pg).await?);
70+
writer_pg.set_data_path(&data_dir_str)?;
71+
let users_batch = build_users_batch();
72+
let tw_pg = DuckLakeTableWriter::new(writer_pg.clone(), Arc::clone(&object_store))?;
73+
let users_result = tw_pg
74+
.write_table("public", "users", std::slice::from_ref(&users_batch))
75+
.await?;
76+
println!(
77+
"✓ wrote pg_prod.public.users — snapshot {}, file count {}, rows {}",
78+
users_result.snapshot_id, users_result.files_written, users_result.records_written
79+
);
80+
81+
// pg_prod.public.users again (DML, same schema) — should carry forward schema_version.
82+
let users_dml = tw_pg
83+
.write_table("public", "users", std::slice::from_ref(&users_batch))
84+
.await?;
85+
println!(
86+
"✓ wrote pg_prod.public.users AGAIN (DML) — snapshot {}",
87+
users_dml.snapshot_id
88+
);
89+
90+
// mysql_prod.public.orders
91+
let writer_mysql =
92+
Arc::new(PostgresMetadataWriter::with_pool(pool.clone(), cat_mysql).await?);
93+
let orders_batch = build_orders_batch();
94+
let tw_mysql = DuckLakeTableWriter::new(writer_mysql.clone(), Arc::clone(&object_store))?;
95+
let orders_result = tw_mysql
96+
.write_table("public", "orders", std::slice::from_ref(&orders_batch))
97+
.await?;
98+
println!(
99+
"✓ wrote mysql_prod.public.orders — snapshot {}, file count {}, rows {}",
100+
orders_result.snapshot_id, orders_result.files_written, orders_result.records_written
101+
);
102+
103+
// pg_prod.public.users with an added column — DDL, schema_version bumps.
104+
let users_v2_batch = build_users_v2_batch();
105+
let users_v2 = tw_pg
106+
.write_table("public", "users", std::slice::from_ref(&users_v2_batch))
107+
.await?;
108+
println!(
109+
"✓ wrote pg_prod.public.users WITH age column (DDL) — snapshot {}",
110+
users_v2.snapshot_id
111+
);
112+
113+
// ── Step 4: dump catalog state ────────────────────────────────────────────
114+
println!();
115+
println!("== Catalog state ==");
116+
dump_query(
117+
&pool,
118+
"ducklake_catalog",
119+
"SELECT catalog_id, catalog_name FROM ducklake_catalog ORDER BY catalog_id",
120+
)
121+
.await?;
122+
dump_query(
123+
&pool,
124+
"ducklake_catalog_snapshot_map",
125+
"SELECT catalog_id, snapshot_id FROM ducklake_catalog_snapshot_map ORDER BY catalog_id, snapshot_id",
126+
)
127+
.await?;
128+
dump_query(
129+
&pool,
130+
"ducklake_catalog_schema_map",
131+
"SELECT catalog_id, schema_id FROM ducklake_catalog_schema_map ORDER BY catalog_id",
132+
)
133+
.await?;
134+
dump_query(
135+
&pool,
136+
"ducklake_snapshot",
137+
"SELECT snapshot_id, schema_version FROM ducklake_snapshot ORDER BY snapshot_id",
138+
)
139+
.await?;
140+
dump_query(
141+
&pool,
142+
"ducklake_schema",
143+
"SELECT schema_id, schema_name, path, begin_snapshot, end_snapshot FROM ducklake_schema ORDER BY schema_id",
144+
)
145+
.await?;
146+
dump_query(
147+
&pool,
148+
"ducklake_table",
149+
"SELECT table_id, schema_id, table_name, begin_snapshot, end_snapshot FROM ducklake_table ORDER BY table_id",
150+
)
151+
.await?;
152+
dump_query(
153+
&pool,
154+
"ducklake_schema_versions",
155+
"SELECT begin_snapshot, schema_version, table_id FROM ducklake_schema_versions ORDER BY begin_snapshot",
156+
)
157+
.await?;
158+
dump_query(
159+
&pool,
160+
"ducklake_data_file",
161+
"SELECT data_file_id, table_id, path, record_count, begin_snapshot, end_snapshot FROM ducklake_data_file ORDER BY data_file_id",
162+
)
163+
.await?;
164+
165+
// ── Step 5: read back through the catalog layer ───────────────────────────
166+
println!();
167+
println!("== Reading via MulticatalogProvider + DuckLakeCatalog ==");
168+
println!();
169+
170+
// Each catalog gets its own MulticatalogProvider and SessionContext, mimicking
171+
// how RuntimeDB would isolate per-tenant connections.
172+
read_via_multicatalog(&pool, "pg_prod", "users", "SELECT * FROM users ORDER BY id").await?;
173+
read_via_multicatalog(
174+
&pool,
175+
"mysql_prod",
176+
"orders",
177+
"SELECT * FROM orders ORDER BY order_id",
178+
)
179+
.await?;
180+
181+
// Cross-check: pg_prod's catalog should NOT see "orders" (lives in mysql_prod).
182+
println!("\n -- cross-catalog leakage check (pg_prod must NOT see 'orders') --");
183+
let cross = pg_prod_sees_orders(&pool).await?;
184+
if cross {
185+
println!(" LEAK! pg_prod can see mysql_prod's table");
186+
} else {
187+
println!(" ✓ pg_prod cannot see mysql_prod.orders — isolation works");
188+
}
189+
190+
println!("\n✓ end-to-end demo complete");
191+
Ok(())
192+
}
193+
194+
async fn read_via_multicatalog(
195+
pool: &sqlx::PgPool,
196+
catalog_name: &str,
197+
expected_table: &str,
198+
sql: &str,
199+
) -> Result<(), Box<dyn std::error::Error>> {
200+
println!("\n -- {} via MulticatalogProvider --", catalog_name);
201+
let provider = MulticatalogProvider::with_pool(pool.clone(), catalog_name).await?;
202+
let snapshot = provider.get_current_snapshot()?;
203+
println!(
204+
" catalog_id={}, current snapshot={}",
205+
provider.catalog_id(),
206+
snapshot
207+
);
208+
209+
let catalog = DuckLakeCatalog::with_snapshot(Arc::new(provider), snapshot)?;
210+
let runtime = Arc::new(RuntimeEnv::default());
211+
let config = SessionConfig::new().with_default_catalog_and_schema(catalog_name, "public");
212+
let ctx = SessionContext::new_with_config_rt(config, runtime);
213+
ctx.register_catalog(catalog_name, Arc::new(catalog));
214+
215+
// List what this catalog can see — should be exactly the one table.
216+
if let Some(cat) = ctx.catalog(catalog_name) {
217+
for schema_name in cat.schema_names() {
218+
if schema_name == "information_schema" {
219+
continue;
220+
}
221+
let schema = cat.schema(&schema_name).unwrap();
222+
println!(" schema {} -> tables {:?}", schema_name, schema.table_names());
223+
}
224+
}
225+
226+
println!(" query: {}", sql);
227+
let df = ctx.sql(sql).await?;
228+
df.show().await?;
229+
let _ = expected_table; // illustrative arg
230+
Ok(())
231+
}
232+
233+
async fn pg_prod_sees_orders(pool: &sqlx::PgPool) -> Result<bool, Box<dyn std::error::Error>> {
234+
let provider = MulticatalogProvider::with_pool(pool.clone(), "pg_prod").await?;
235+
let sn = provider.get_current_snapshot()?;
236+
let catalog = DuckLakeCatalog::with_snapshot(Arc::new(provider), sn)?;
237+
let ctx = SessionContext::new();
238+
ctx.register_catalog("pg_prod", Arc::new(catalog));
239+
let cat = ctx.catalog("pg_prod").unwrap();
240+
let schema = match cat.schema("public") {
241+
Some(s) => s,
242+
None => return Ok(false),
243+
};
244+
Ok(schema.table_names().iter().any(|n| n == "orders"))
245+
}
246+
247+
fn build_users_batch() -> RecordBatch {
248+
let schema = Arc::new(Schema::new(vec![
249+
Field::new("id", DataType::Int32, false),
250+
Field::new("name", DataType::Utf8, true),
251+
]));
252+
RecordBatch::try_new(
253+
schema,
254+
vec![
255+
Arc::new(Int32Array::from(vec![1, 2, 3])),
256+
Arc::new(StringArray::from(vec![
257+
Some("Alice"),
258+
Some("Bob"),
259+
Some("Carol"),
260+
])),
261+
],
262+
)
263+
.unwrap()
264+
}
265+
266+
fn build_users_v2_batch() -> RecordBatch {
267+
let schema = Arc::new(Schema::new(vec![
268+
Field::new("id", DataType::Int32, false),
269+
Field::new("name", DataType::Utf8, true),
270+
Field::new("age", DataType::Int32, true),
271+
]));
272+
RecordBatch::try_new(
273+
schema,
274+
vec![
275+
Arc::new(Int32Array::from(vec![1, 2, 3])),
276+
Arc::new(StringArray::from(vec![
277+
Some("Alice"),
278+
Some("Bob"),
279+
Some("Carol"),
280+
])),
281+
Arc::new(Int32Array::from(vec![Some(30), Some(25), None])),
282+
],
283+
)
284+
.unwrap()
285+
}
286+
287+
fn build_orders_batch() -> RecordBatch {
288+
let schema = Arc::new(Schema::new(vec![
289+
Field::new("order_id", DataType::Int64, false),
290+
Field::new("amount", DataType::Float64, false),
291+
]));
292+
RecordBatch::try_new(
293+
schema,
294+
vec![
295+
Arc::new(Int64Array::from(vec![100, 101, 102])),
296+
Arc::new(Float64Array::from(vec![19.99, 4.50, 250.00])),
297+
],
298+
)
299+
.unwrap()
300+
}
301+
302+
async fn dump_query(
303+
pool: &sqlx::PgPool,
304+
label: &str,
305+
sql: &str,
306+
) -> Result<(), Box<dyn std::error::Error>> {
307+
println!("\n -- {} --", label);
308+
let rows = sqlx::query(sql).fetch_all(pool).await?;
309+
if rows.is_empty() {
310+
println!(" (no rows)");
311+
return Ok(());
312+
}
313+
// Print header from column names of the first row.
314+
let header: Vec<String> = rows[0]
315+
.columns()
316+
.iter()
317+
.map(|c| sqlx::Column::name(c).to_string())
318+
.collect();
319+
println!(" {}", header.join(" | "));
320+
println!(" {}", "-".repeat(header.iter().map(|s| s.len()).sum::<usize>() + header.len() * 3));
321+
for row in &rows {
322+
let cols: Vec<String> = (0..row.len())
323+
.map(|i| format_col(row, i))
324+
.collect();
325+
println!(" {}", cols.join(" | "));
326+
}
327+
Ok(())
328+
}
329+
330+
fn format_col(row: &sqlx::postgres::PgRow, i: usize) -> String {
331+
// Try a few common types; fall back to "<binary>".
332+
if let Ok(v) = row.try_get::<Option<i64>, _>(i) {
333+
return v.map(|x| x.to_string()).unwrap_or("NULL".into());
334+
}
335+
if let Ok(v) = row.try_get::<Option<i32>, _>(i) {
336+
return v.map(|x| x.to_string()).unwrap_or("NULL".into());
337+
}
338+
if let Ok(v) = row.try_get::<Option<bool>, _>(i) {
339+
return v.map(|x| x.to_string()).unwrap_or("NULL".into());
340+
}
341+
if let Ok(v) = row.try_get::<Option<String>, _>(i) {
342+
return v.unwrap_or("NULL".into());
343+
}
344+
"<unprintable>".into()
345+
}
346+
347+
#[allow(dead_code)]
348+
async fn visible_files_for_catalog(
349+
pool: &sqlx::PgPool,
350+
catalog_id: i64,
351+
table_name: &str,
352+
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
353+
// Current snapshot for the catalog
354+
let cur: i64 = sqlx::query(
355+
"SELECT COALESCE(MAX(snapshot_id), 0) FROM ducklake_catalog_snapshot_map WHERE catalog_id = $1",
356+
)
357+
.bind(catalog_id)
358+
.fetch_one(pool)
359+
.await?
360+
.try_get(0)?;
361+
362+
let rows = sqlx::query(
363+
"SELECT f.path FROM ducklake_data_file f
364+
JOIN ducklake_table t ON t.table_id = f.table_id
365+
JOIN ducklake_schema s ON s.schema_id = t.schema_id
366+
JOIN ducklake_catalog_schema_map m ON m.schema_id = s.schema_id
367+
WHERE m.catalog_id = $1
368+
AND t.table_name = $2
369+
AND f.begin_snapshot <= $3
370+
AND (f.end_snapshot IS NULL OR f.end_snapshot > $3)
371+
ORDER BY f.path",
372+
)
373+
.bind(catalog_id)
374+
.bind(table_name)
375+
.bind(cur)
376+
.fetch_all(pool)
377+
.await?;
378+
379+
Ok(rows.into_iter().map(|r| r.try_get(0).unwrap()).collect())
380+
}

0 commit comments

Comments
 (0)