Skip to content

Commit b1ca86f

Browse files
Shefeek JinnahShefeek Jinnah
authored andcommitted
Aligning it as per ducklake spec
1 parent 13b1135 commit b1ca86f

2 files changed

Lines changed: 9 additions & 49 deletions

File tree

src/information_schema.rs

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -725,9 +725,6 @@ impl TableChangesTable {
725725
let schema = Arc::new(Schema::new(vec![
726726
Field::new("snapshot_id", DataType::Int64, false),
727727
Field::new("change_type", DataType::Utf8, false),
728-
Field::new("file_path", DataType::Utf8, false),
729-
Field::new("file_size_bytes", DataType::Int64, false),
730-
Field::new("row_count", DataType::Int64, true),
731728
]));
732729
Self {
733730
provider,
@@ -763,9 +760,6 @@ impl TableChangesTable {
763760
struct ChangeRecord {
764761
snapshot_id: i64,
765762
change_type: &'static str,
766-
file_path: String,
767-
file_size_bytes: i64,
768-
row_count: Option<i64>,
769763
}
770764

771765
let mut changes: Vec<ChangeRecord> =
@@ -776,9 +770,6 @@ impl TableChangesTable {
776770
changes.push(ChangeRecord {
777771
snapshot_id: data_file.begin_snapshot,
778772
change_type: "insert",
779-
file_path: data_file.file.path.clone(),
780-
file_size_bytes: data_file.file.file_size_bytes,
781-
row_count: None,
782773
});
783774
}
784775

@@ -787,9 +778,6 @@ impl TableChangesTable {
787778
changes.push(ChangeRecord {
788779
snapshot_id: delete_file.begin_snapshot,
789780
change_type: "delete",
790-
file_path: delete_file.delete_file.path.clone(),
791-
file_size_bytes: delete_file.delete_file.file_size_bytes,
792-
row_count: delete_file.delete_count,
793781
});
794782
}
795783

@@ -803,27 +791,9 @@ impl TableChangesTable {
803791
let change_types: ArrayRef = Arc::new(StringArray::from(
804792
changes.iter().map(|c| c.change_type).collect::<Vec<_>>(),
805793
));
806-
let file_paths: ArrayRef = Arc::new(StringArray::from(
807-
changes
808-
.iter()
809-
.map(|c| c.file_path.as_str())
810-
.collect::<Vec<_>>(),
811-
));
812-
let file_sizes: ArrayRef = Arc::new(Int64Array::from(
813-
changes
814-
.iter()
815-
.map(|c| c.file_size_bytes)
816-
.collect::<Vec<_>>(),
817-
));
818-
let row_counts: ArrayRef = Arc::new(Int64Array::from(
819-
changes.iter().map(|c| c.row_count).collect::<Vec<_>>(),
820-
));
821794

822-
RecordBatch::try_new(
823-
self.schema.clone(),
824-
vec![snapshot_ids, change_types, file_paths, file_sizes, row_counts],
825-
)
826-
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
795+
RecordBatch::try_new(self.schema.clone(), vec![snapshot_ids, change_types])
796+
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
827797
}
828798
}
829799

tests/table_changes_tests.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ mod integration_tests {
9292

9393
// Query changes from snapshot 0 to current (should include all inserts)
9494
let df = ctx
95-
.sql("SELECT change_type, file_path FROM ducklake_table_changes('main.events', 0, 10)")
95+
.sql(
96+
"SELECT snapshot_id, change_type FROM ducklake_table_changes('main.events', 0, 10)",
97+
)
9698
.await?;
9799

98100
let batches: Vec<RecordBatch> = df.collect().await?;
@@ -101,9 +103,9 @@ mod integration_tests {
101103
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
102104
assert!(total_rows > 0, "Should have some changes");
103105

104-
// Check that we have insert changes
106+
// Check that we have insert changes (change_type is column 1)
105107
for batch in &batches {
106-
let change_types = get_string_column(batch, 0);
108+
let change_types = get_string_column(batch, 1);
107109
for change_type in change_types {
108110
assert!(
109111
change_type == "insert" || change_type == "delete",
@@ -130,7 +132,7 @@ mod integration_tests {
130132
// Query only delete changes (from last snapshot which has the delete)
131133
// We need to find the delete files by querying a range that includes the delete operation
132134
let df = ctx
133-
.sql("SELECT change_type, row_count FROM ducklake_table_changes('main.events', 0, 100) WHERE change_type = 'delete'")
135+
.sql("SELECT snapshot_id, change_type FROM ducklake_table_changes('main.events', 0, 100) WHERE change_type = 'delete'")
134136
.await?;
135137

136138
let batches: Vec<RecordBatch> = df.collect().await?;
@@ -187,7 +189,7 @@ mod integration_tests {
187189

188190
let schema = df.schema();
189191

190-
// Verify the expected columns are present
192+
// Verify the expected columns are present (Phase 1: only spec columns)
191193
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
192194

193195
assert!(
@@ -198,18 +200,6 @@ mod integration_tests {
198200
field_names.contains(&"change_type"),
199201
"Schema should contain 'change_type'"
200202
);
201-
assert!(
202-
field_names.contains(&"file_path"),
203-
"Schema should contain 'file_path'"
204-
);
205-
assert!(
206-
field_names.contains(&"file_size_bytes"),
207-
"Schema should contain 'file_size_bytes'"
208-
);
209-
assert!(
210-
field_names.contains(&"row_count"),
211-
"Schema should contain 'row_count'"
212-
);
213203

214204
Ok(())
215205
}

0 commit comments

Comments
 (0)