Skip to content

Commit 4f2e16e

Browse files
Shefeek JinnahShefeek Jinnah
authored andcommitted
Refactoring .. Moved to table_changes
1 parent 5f39cbe commit 4f2e16e

3 files changed

Lines changed: 3 additions & 120 deletions

File tree

src/information_schema.rs

Lines changed: 0 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -705,125 +705,6 @@ impl TableProvider for FilesTable {
705705
}
706706
}
707707

708-
/// Live table provider for table changes (CDC) - queries metadata for files added between snapshots
709-
#[derive(Debug)]
710-
pub struct TableChangesTable {
711-
provider: Arc<dyn MetadataProvider>,
712-
table_id: i64,
713-
start_snapshot: i64,
714-
end_snapshot: i64,
715-
schema: SchemaRef,
716-
}
717-
718-
impl TableChangesTable {
719-
pub fn new(
720-
provider: Arc<dyn MetadataProvider>,
721-
table_id: i64,
722-
start_snapshot: i64,
723-
end_snapshot: i64,
724-
) -> Self {
725-
let schema = Arc::new(Schema::new(vec![
726-
Field::new("snapshot_id", DataType::Int64, false),
727-
Field::new("change_type", DataType::Utf8, false),
728-
]));
729-
Self {
730-
provider,
731-
table_id,
732-
start_snapshot,
733-
end_snapshot,
734-
schema,
735-
}
736-
}
737-
738-
fn query_changes(&self) -> DataFusionResult<RecordBatch> {
739-
// Get data files added (INSERT changes)
740-
let data_files = self
741-
.provider
742-
.get_data_files_added_between_snapshots(
743-
self.table_id,
744-
self.start_snapshot,
745-
self.end_snapshot,
746-
)
747-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
748-
749-
// Get delete files added (DELETE changes)
750-
let delete_files = self
751-
.provider
752-
.get_delete_files_added_between_snapshots(
753-
self.table_id,
754-
self.start_snapshot,
755-
self.end_snapshot,
756-
)
757-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
758-
759-
// Collect all changes into a sortable structure
760-
struct ChangeRecord {
761-
snapshot_id: i64,
762-
change_type: &'static str,
763-
}
764-
765-
let mut changes: Vec<ChangeRecord> =
766-
Vec::with_capacity(data_files.len() + delete_files.len());
767-
768-
// Add INSERT changes (data files added)
769-
for data_file in &data_files {
770-
changes.push(ChangeRecord {
771-
snapshot_id: data_file.begin_snapshot,
772-
change_type: "insert",
773-
});
774-
}
775-
776-
// Add DELETE changes (delete files added)
777-
for delete_file in &delete_files {
778-
changes.push(ChangeRecord {
779-
snapshot_id: delete_file.begin_snapshot,
780-
change_type: "delete",
781-
});
782-
}
783-
784-
// Sort by snapshot_id for deterministic output
785-
changes.sort_by_key(|c| c.snapshot_id);
786-
787-
// Build arrays from sorted changes
788-
let snapshot_ids: ArrayRef = Arc::new(Int64Array::from(
789-
changes.iter().map(|c| c.snapshot_id).collect::<Vec<_>>(),
790-
));
791-
let change_types: ArrayRef = Arc::new(StringArray::from(
792-
changes.iter().map(|c| c.change_type).collect::<Vec<_>>(),
793-
));
794-
795-
RecordBatch::try_new(self.schema.clone(), vec![snapshot_ids, change_types])
796-
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
797-
}
798-
}
799-
800-
#[async_trait::async_trait]
801-
impl TableProvider for TableChangesTable {
802-
fn as_any(&self) -> &dyn Any {
803-
self
804-
}
805-
806-
fn schema(&self) -> SchemaRef {
807-
self.schema.clone()
808-
}
809-
810-
fn table_type(&self) -> TableType {
811-
TableType::View
812-
}
813-
814-
async fn scan(
815-
&self,
816-
state: &dyn Session,
817-
projection: Option<&Vec<usize>>,
818-
filters: &[datafusion::prelude::Expr],
819-
limit: Option<usize>,
820-
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
821-
let batch = self.query_changes()?;
822-
let mem_table = MemTable::try_new(self.schema.clone(), vec![vec![batch]])?;
823-
mem_table.scan(state, projection, filters, limit).await
824-
}
825-
}
826-
827708
/// Schema provider for information_schema
828709
///
829710
/// Provides live metadata tables that query the catalog database on every access.

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub mod metadata_provider;
4343
pub mod path_resolver;
4444
pub mod schema;
4545
pub mod table;
46+
pub mod table_changes;
4647
pub mod table_functions;
4748
pub mod types;
4849

src/table_functions.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use datafusion::datasource::TableProvider;
66
use datafusion::logical_expr::Expr;
77
use std::sync::Arc;
88

9-
use crate::information_schema::{FilesTable, SnapshotsTable, TableChangesTable, TableInfoTable};
9+
use crate::information_schema::{FilesTable, SnapshotsTable, TableInfoTable};
1010
use crate::metadata_provider::MetadataProvider;
11+
use crate::table_changes::TableChangesTable;
1112

1213
#[derive(Debug)]
1314
pub struct DucklakeSnapshotsFunction {

0 commit comments

Comments
 (0)