Skip to content

Commit 3cb8902

Browse files
Implements Phase 1 of ducklake_table_changes() table function
1 parent 02eaf8d commit 3cb8902

8 files changed

Lines changed: 749 additions & 44 deletions

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub mod metadata_provider;
4343
pub mod path_resolver;
4444
pub mod schema;
4545
pub mod table;
46+
pub mod table_changes;
4647
pub mod table_functions;
4748
pub mod types;
4849

src/metadata_provider.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,24 @@ pub const SQL_TABLE_EXISTS: &str = "SELECT EXISTS(
6767
AND (? < end_snapshot OR end_snapshot IS NULL)
6868
)";
6969

70+
// Queries for table_changes (CDC) - data files and delete files added between snapshots
71+
72+
/// SQL returning the `begin_snapshot` of every `ducklake_data_file` row of one table
/// whose `begin_snapshot` falls in the range (start, end].
///
/// Positional `?` parameters, in order: table id, start snapshot (exclusive),
/// end snapshot (inclusive). Rows come back ordered by `begin_snapshot`.
pub const SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS: &str = "
73+
SELECT data.begin_snapshot
74+
FROM ducklake_data_file AS data
75+
WHERE data.table_id = ?
76+
AND data.begin_snapshot > ?
77+
AND data.begin_snapshot <= ?
78+
ORDER BY data.begin_snapshot";
79+
80+
/// SQL returning the `begin_snapshot` of every `ducklake_delete_file` row of one table
/// whose `begin_snapshot` falls in the range (start, end].
///
/// Positional `?` parameters, in order: table id, start snapshot (exclusive),
/// end snapshot (inclusive). Rows come back ordered by `begin_snapshot`.
pub const SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS: &str = "
81+
SELECT del.begin_snapshot
82+
FROM ducklake_delete_file AS del
83+
WHERE del.table_id = ?
84+
AND del.begin_snapshot > ?
85+
AND del.begin_snapshot <= ?
86+
ORDER BY del.begin_snapshot";
87+
7088
// Bulk queries for information_schema (avoids N+1 query problem)
7189

7290
pub const SQL_LIST_ALL_TABLES: &str = "
@@ -270,6 +288,18 @@ impl DuckLakeTableFile {
270288
}
271289
}
272290

291+
// Change tracking structures for table_changes (CDC) functionality
292+
293+
/// One data file that was added to a table inside a queried snapshot range.
///
/// Produced by `MetadataProvider::get_data_files_added_between_snapshots`; the
/// table_changes code reports each of these as an "insert" change.
#[derive(Debug, Clone)]
294+
pub struct DataFileChange {
295+
/// Value of the file's `begin_snapshot` column, read from column 0 of the query result.
pub begin_snapshot: i64,
296+
}
297+
298+
/// One delete file that was added to a table inside a queried snapshot range.
///
/// Produced by `MetadataProvider::get_delete_files_added_between_snapshots`; the
/// table_changes code reports each of these as a "delete" change.
#[derive(Debug, Clone)]
299+
pub struct DeleteFileChange {
300+
/// Value of the file's `begin_snapshot` column, read from column 0 of the query result.
pub begin_snapshot: i64,
301+
}
302+
273303
pub trait MetadataProvider: Send + Sync + std::fmt::Debug {
274304
/// Get the current snapshot ID (dynamic, not cached)
275305
fn get_current_snapshot(&self) -> Result<i64>;
@@ -323,6 +353,28 @@ pub trait MetadataProvider: Send + Sync + std::fmt::Debug {
323353

324354
/// List all files across all tables for a snapshot
325355
fn list_all_files(&self, snapshot_id: i64) -> Result<Vec<FileWithTable>>;
356+
357+
// Change tracking methods for table_changes (CDC) functionality
358+
359+
/// Get data files added between two snapshots (exclusive start, inclusive end)
360+
/// Returns files where begin_snapshot > start_snapshot AND begin_snapshot <= end_snapshot
361+
/// These represent INSERT changes - new rows added to the table
362+
fn get_data_files_added_between_snapshots(
363+
&self,
364+
table_id: i64,
365+
start_snapshot: i64,
366+
end_snapshot: i64,
367+
) -> Result<Vec<DataFileChange>>;
368+
369+
/// Get delete files added between two snapshots (exclusive start, inclusive end)
370+
/// Returns delete files where begin_snapshot > start_snapshot AND begin_snapshot <= end_snapshot
371+
/// These represent DELETE changes - rows removed from the table
372+
fn get_delete_files_added_between_snapshots(
373+
&self,
374+
table_id: i64,
375+
start_snapshot: i64,
376+
end_snapshot: i64,
377+
) -> Result<Vec<DeleteFileChange>>;
326378
}
327379

328380
#[cfg(feature = "metadata-postgres")]

src/metadata_provider_duckdb.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use crate::DuckLakeError;
22
use crate::metadata_provider::{
3-
ColumnWithTable, DuckLakeFileData, DuckLakeTableColumn, DuckLakeTableFile, FileWithTable,
4-
MetadataProvider, SQL_GET_DATA_FILES, SQL_GET_DATA_PATH, SQL_GET_LATEST_SNAPSHOT,
5-
SQL_GET_SCHEMA_BY_NAME, SQL_GET_TABLE_BY_NAME, SQL_GET_TABLE_COLUMNS, SQL_LIST_ALL_COLUMNS,
6-
SQL_LIST_ALL_FILES, SQL_LIST_ALL_TABLES, SQL_LIST_SCHEMAS, SQL_LIST_SNAPSHOTS, SQL_LIST_TABLES,
7-
SQL_TABLE_EXISTS, SchemaMetadata, SnapshotMetadata, TableMetadata, TableWithSchema,
3+
ColumnWithTable, DataFileChange, DeleteFileChange, DuckLakeFileData, DuckLakeTableColumn,
4+
DuckLakeTableFile, FileWithTable, MetadataProvider, SQL_GET_DATA_FILES,
5+
SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS, SQL_GET_DATA_PATH,
6+
SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS, SQL_GET_LATEST_SNAPSHOT, SQL_GET_SCHEMA_BY_NAME,
7+
SQL_GET_TABLE_BY_NAME, SQL_GET_TABLE_COLUMNS, SQL_LIST_ALL_COLUMNS, SQL_LIST_ALL_FILES,
8+
SQL_LIST_ALL_TABLES, SQL_LIST_SCHEMAS, SQL_LIST_SNAPSHOTS, SQL_LIST_TABLES, SQL_TABLE_EXISTS,
9+
SchemaMetadata, SnapshotMetadata, TableMetadata, TableWithSchema,
810
};
911
use duckdb::AccessMode::ReadOnly;
1012
use duckdb::{Config, Connection, params};
@@ -378,4 +380,44 @@ impl MetadataProvider for DuckdbMetadataProvider {
378380

379381
Ok(files)
380382
}
383+
384+
/// DuckDB implementation: returns one `DataFileChange` per row produced by
/// `SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS` for `table_id`.
///
/// `start_snapshot` is exclusive and `end_snapshot` is inclusive (the SQL compares
/// with `>` and `<=`). The connection is obtained through `self.open_connection()`.
///
/// # Errors
/// Propagates any error from obtaining the connection, preparing the statement,
/// running the query, or reading column 0 of a row.
fn get_data_files_added_between_snapshots(
385+
&self,
386+
table_id: i64,
387+
start_snapshot: i64,
388+
end_snapshot: i64,
389+
) -> crate::Result<Vec<DataFileChange>> {
390+
let conn = self.open_connection()?;
391+
let mut stmt = conn.prepare(SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS)?;
392+
393+
// Parameter order must match the three `?` placeholders: table id, start, end.
let files = stmt
394+
.query_map(params![table_id, start_snapshot, end_snapshot], |row| {
395+
Ok(DataFileChange {
396+
// Column 0 is `begin_snapshot`, the only column the query selects.
begin_snapshot: row.get(0)?,
397+
})
398+
})?
399+
// Collecting into `Result` stops at the first row that failed to map.
.collect::<Result<Vec<_>, _>>()?;
400+
401+
Ok(files)
402+
}
403+
404+
/// DuckDB implementation: returns one `DeleteFileChange` per row produced by
/// `SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS` for `table_id`.
///
/// `start_snapshot` is exclusive and `end_snapshot` is inclusive (the SQL compares
/// with `>` and `<=`). The connection is obtained through `self.open_connection()`.
///
/// # Errors
/// Propagates any error from obtaining the connection, preparing the statement,
/// running the query, or reading column 0 of a row.
fn get_delete_files_added_between_snapshots(
405+
&self,
406+
table_id: i64,
407+
start_snapshot: i64,
408+
end_snapshot: i64,
409+
) -> crate::Result<Vec<DeleteFileChange>> {
410+
let conn = self.open_connection()?;
411+
let mut stmt = conn.prepare(SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS)?;
412+
413+
// Parameter order must match the three `?` placeholders: table id, start, end.
let files = stmt
414+
.query_map(params![table_id, start_snapshot, end_snapshot], |row| {
415+
Ok(DeleteFileChange {
416+
// Column 0 is `begin_snapshot`, the only column the query selects.
begin_snapshot: row.get(0)?,
417+
})
418+
})?
419+
// Collecting into `Result` stops at the first row that failed to map.
.collect::<Result<Vec<_>, _>>()?;
420+
421+
Ok(files)
422+
}
381423
}

src/metadata_provider_postgres.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
33
use crate::Result;
44
use crate::metadata_provider::{
5-
ColumnWithTable, DuckLakeFileData, DuckLakeTableColumn, DuckLakeTableFile, FileWithTable,
6-
MetadataProvider, SchemaMetadata, SnapshotMetadata, TableMetadata, TableWithSchema, block_on,
5+
ColumnWithTable, DataFileChange, DeleteFileChange, DuckLakeFileData, DuckLakeTableColumn,
6+
DuckLakeTableFile, FileWithTable, MetadataProvider, SchemaMetadata, SnapshotMetadata,
7+
TableMetadata, TableWithSchema, block_on,
78
};
89
use sqlx::Row;
910
use sqlx::postgres::{PgPool, PgPoolOptions};
@@ -485,4 +486,66 @@ impl MetadataProvider for PostgresMetadataProvider {
485486
.collect()
486487
})
487488
}
489+
490+
/// Postgres implementation: returns one `DataFileChange` per `ducklake_data_file`
/// row of `table_id` whose `begin_snapshot` is greater than `start_snapshot` and at
/// most `end_snapshot`, ordered by `begin_snapshot`.
///
/// The query future is driven through the project's `block_on` helper; presumably
/// that blocks the calling thread until the query finishes — confirm against
/// `block_on` in `metadata_provider`.
///
/// # Errors
/// Propagates errors from executing the query on `self.pool` and from decoding
/// column 0 of any row.
fn get_data_files_added_between_snapshots(
491+
&self,
492+
table_id: i64,
493+
start_snapshot: i64,
494+
end_snapshot: i64,
495+
) -> Result<Vec<DataFileChange>> {
496+
block_on(async {
497+
// NOTE(review): this statement repeats SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS
// with `$n` placeholders instead of `?`; the two copies must be kept in sync.
let rows = sqlx::query(
498+
"SELECT data.begin_snapshot
499+
FROM ducklake_data_file AS data
500+
WHERE data.table_id = $1
501+
AND data.begin_snapshot > $2
502+
AND data.begin_snapshot <= $3
503+
ORDER BY data.begin_snapshot",
504+
)
505+
// Binds are positional: $1 = table id, $2 = exclusive start, $3 = inclusive end.
.bind(table_id)
506+
.bind(start_snapshot)
507+
.bind(end_snapshot)
508+
.fetch_all(&self.pool)
509+
.await?;
510+
511+
// Convert each row; column 0 is `begin_snapshot`, the only selected column.
rows.into_iter()
512+
.map(|row| {
513+
Ok(DataFileChange {
514+
begin_snapshot: row.try_get(0)?,
515+
})
516+
})
517+
.collect()
518+
})
519+
}
520+
521+
/// Postgres implementation: returns one `DeleteFileChange` per `ducklake_delete_file`
/// row of `table_id` whose `begin_snapshot` is greater than `start_snapshot` and at
/// most `end_snapshot`, ordered by `begin_snapshot`.
///
/// The query future is driven through the project's `block_on` helper; presumably
/// that blocks the calling thread until the query finishes — confirm against
/// `block_on` in `metadata_provider`.
///
/// # Errors
/// Propagates errors from executing the query on `self.pool` and from decoding
/// column 0 of any row.
fn get_delete_files_added_between_snapshots(
522+
&self,
523+
table_id: i64,
524+
start_snapshot: i64,
525+
end_snapshot: i64,
526+
) -> Result<Vec<DeleteFileChange>> {
527+
block_on(async {
528+
// NOTE(review): this statement repeats SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS
// with `$n` placeholders instead of `?`; the two copies must be kept in sync.
let rows = sqlx::query(
529+
"SELECT del.begin_snapshot
530+
FROM ducklake_delete_file AS del
531+
WHERE del.table_id = $1
532+
AND del.begin_snapshot > $2
533+
AND del.begin_snapshot <= $3
534+
ORDER BY del.begin_snapshot",
535+
)
536+
// Binds are positional: $1 = table id, $2 = exclusive start, $3 = inclusive end.
.bind(table_id)
537+
.bind(start_snapshot)
538+
.bind(end_snapshot)
539+
.fetch_all(&self.pool)
540+
.await?;
541+
542+
// Convert each row; column 0 is `begin_snapshot`, the only selected column.
rows.into_iter()
543+
.map(|row| {
544+
Ok(DeleteFileChange {
545+
begin_snapshot: row.try_get(0)?,
546+
})
547+
})
548+
.collect()
549+
})
550+
}
488551
}

src/table_changes.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
//! Table changes (CDC) functionality for DuckLake
2+
3+
use std::any::Any;
4+
use std::sync::Arc;
5+
6+
use arrow::array::{ArrayRef, Int64Array, StringArray};
7+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
8+
use arrow::record_batch::RecordBatch;
9+
use datafusion::catalog::Session;
10+
use datafusion::common::Result as DataFusionResult;
11+
use datafusion::datasource::memory::MemTable;
12+
use datafusion::datasource::{TableProvider, TableType};
13+
use datafusion::physical_plan::ExecutionPlan;
14+
15+
use crate::metadata_provider::MetadataProvider;
16+
17+
/// DataFusion table that lists which change files (data files and delete files)
/// were added to one table within a snapshot range.
///
/// The output has two non-nullable columns: `snapshot_id` (Int64) and
/// `change_type` (Utf8, either "insert" or "delete").
#[derive(Debug)]
18+
pub struct TableChangesTable {
19+
/// Metadata backend queried for added data files and delete files.
provider: Arc<dyn MetadataProvider>,
20+
/// Id of the table whose changes are reported.
table_id: i64,
21+
/// Start of the snapshot range; handed to the provider as the exclusive lower bound.
start_snapshot: i64,
22+
/// End of the snapshot range; handed to the provider as the inclusive upper bound.
end_snapshot: i64,
23+
/// Fixed two-column output schema, built once in `new`.
schema: SchemaRef,
24+
}
25+
26+
impl TableChangesTable {
27+
/// Creates a changes table for `table_id` over the snapshot range
/// (`start_snapshot`, `end_snapshot`].
///
/// Only stores the arguments and builds the output schema; the range is not
/// validated and the metadata provider is not contacted here.
pub fn new(
28+
provider: Arc<dyn MetadataProvider>,
29+
table_id: i64,
30+
start_snapshot: i64,
31+
end_snapshot: i64,
32+
) -> Self {
33+
// Both columns are declared non-nullable (third argument `false`).
let schema = Arc::new(Schema::new(vec![
34+
Field::new("snapshot_id", DataType::Int64, false),
35+
Field::new("change_type", DataType::Utf8, false),
36+
]));
37+
Self {
38+
provider,
39+
table_id,
40+
start_snapshot,
41+
end_snapshot,
42+
schema,
43+
}
44+
}
45+
46+
/// Asks the metadata provider for the data files and delete files added in the
/// configured snapshot range and returns them as a single `RecordBatch`.
///
/// The batch has one row per added data file (`change_type` "insert") and one
/// row per added delete file (`change_type` "delete"), ordered by `snapshot_id`.
/// Rows describe files, not individual table rows.
///
/// # Errors
/// Provider failures are wrapped in `DataFusionError::External`; a failure to
/// assemble the batch is wrapped in `DataFusionError::ArrowError`.
fn query_changes(&self) -> DataFusionResult<RecordBatch> {
47+
// Get data files added (INSERT changes)
48+
let data_files = self
49+
.provider
50+
.get_data_files_added_between_snapshots(
51+
self.table_id,
52+
self.start_snapshot,
53+
self.end_snapshot,
54+
)
55+
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
56+
57+
// Get delete files added (DELETE changes)
58+
let delete_files = self
59+
.provider
60+
.get_delete_files_added_between_snapshots(
61+
self.table_id,
62+
self.start_snapshot,
63+
self.end_snapshot,
64+
)
65+
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
66+
67+
// Collect all changes into a sortable structure
68+
struct ChangeRecord {
69+
snapshot_id: i64,
70+
change_type: &'static str,
71+
}
72+
73+
// Capacity is exact: one record per data file plus one per delete file.
let mut changes: Vec<ChangeRecord> =
74+
Vec::with_capacity(data_files.len() + delete_files.len());
75+
76+
// Add INSERT changes (data files added)
77+
for data_file in &data_files {
78+
changes.push(ChangeRecord {
79+
snapshot_id: data_file.begin_snapshot,
80+
change_type: "insert",
81+
});
82+
}
83+
84+
// Add DELETE changes (delete files added)
85+
for delete_file in &delete_files {
86+
changes.push(ChangeRecord {
87+
snapshot_id: delete_file.begin_snapshot,
88+
change_type: "delete",
89+
});
90+
}
91+
92+
// Sort by snapshot_id for deterministic output
93+
// `sort_by_key` is a stable sort: inserts were pushed first, so within one
// snapshot the "insert" rows stay ahead of the "delete" rows.
changes.sort_by_key(|c| c.snapshot_id);
94+
95+
// Build arrays from sorted changes
96+
let snapshot_ids: ArrayRef = Arc::new(Int64Array::from(
97+
changes.iter().map(|c| c.snapshot_id).collect::<Vec<_>>(),
98+
));
99+
let change_types: ArrayRef = Arc::new(StringArray::from(
100+
changes.iter().map(|c| c.change_type).collect::<Vec<_>>(),
101+
));
102+
103+
// Column order must match the schema built in `new`: snapshot_id, then change_type.
RecordBatch::try_new(self.schema.clone(), vec![snapshot_ids, change_types])
104+
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
105+
}
106+
}
107+
108+
/// `TableProvider` implementation that serves the change list as an in-memory table.
#[async_trait::async_trait]
109+
impl TableProvider for TableChangesTable {
110+
/// Returns `self` as `&dyn Any` so callers can downcast to `TableChangesTable`.
fn as_any(&self) -> &dyn Any {
111+
self
112+
}
113+
114+
/// Returns the fixed (`snapshot_id`, `change_type`) schema.
fn schema(&self) -> SchemaRef {
115+
self.schema.clone()
116+
}
117+
118+
/// Reports this table as `TableType::View`.
fn table_type(&self) -> TableType {
119+
TableType::View
120+
}
121+
122+
/// Builds the execution plan for a scan.
///
/// Each call re-runs `query_changes`, wraps the resulting batch in a `MemTable`
/// and forwards `projection`, `filters` and `limit` to `MemTable::scan`.
///
/// NOTE(review): `query_changes` is called synchronously from this async fn; if
/// the metadata provider blocks on I/O this will block the executor thread —
/// confirm that is acceptable for the runtimes this is used with.
async fn scan(
123+
&self,
124+
state: &dyn Session,
125+
projection: Option<&Vec<usize>>,
126+
filters: &[datafusion::prelude::Expr],
127+
limit: Option<usize>,
128+
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
129+
let batch = self.query_changes()?;
130+
// A single partition holding the single batch.
let mem_table = MemTable::try_new(self.schema.clone(), vec![vec![batch]])?;
131+
mem_table.scan(state, projection, filters, limit).await
132+
}
133+
}

0 commit comments

Comments
 (0)