-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathmetadata_writer.rs
More file actions
326 lines (291 loc) · 10.4 KB
/
metadata_writer.rs
File metadata and controls
326 lines (291 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
//! Metadata writer trait and common types for DuckLake catalog writes.
//!
//! This module provides the `MetadataWriter` trait for writing metadata to DuckLake catalogs,
//! along with helper types for column definitions and data file registration.
use crate::Result;
/// Write mode for table operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
/// Drop existing data and replace with new data
Replace,
/// Keep existing data and append new records
Append,
}
use crate::types::{arrow_to_ducklake_type, ducklake_to_arrow_type};
use arrow::datatypes::DataType;
/// Column definition for creating or updating a table's schema.
///
/// Unlike `DuckLakeTableColumn` (used for reading), this struct doesn't have a `column_id`
/// field since IDs are assigned by the catalog during write operations.
#[derive(Debug, Clone)]
pub struct ColumnDef {
/// Column name
pub(crate) name: String,
/// DuckLake type string (e.g., "varchar", "int64", "decimal(10,2)")
pub(crate) ducklake_type: String,
/// Whether this column allows NULL values
pub(crate) is_nullable: bool,
}
impl ColumnDef {
/// Returns the column name.
pub fn name(&self) -> &str {
&self.name
}
/// Returns the DuckLake type string.
pub fn ducklake_type(&self) -> &str {
&self.ducklake_type
}
/// Returns whether this column allows NULL values.
pub fn is_nullable(&self) -> bool {
self.is_nullable
}
/// Create a new column definition.
///
/// Validates that `ducklake_type` is a recognized DuckLake type string by converting
/// it to an Arrow DataType. Returns an error if the type is invalid or unsupported.
pub fn new(
name: impl Into<String>,
ducklake_type: impl Into<String>,
is_nullable: bool,
) -> Result<Self> {
let ducklake_type = ducklake_type.into();
// Validate the type string by attempting to convert it to an Arrow type.
// We discard the result; we only care that the conversion succeeds.
ducklake_to_arrow_type(&ducklake_type)?;
Ok(Self {
name: name.into(),
ducklake_type,
is_nullable,
})
}
/// Create a column definition from an Arrow DataType.
///
/// This is a convenience constructor that converts the Arrow type to a DuckLake type string.
/// The resulting DuckLake type is guaranteed to be valid since it was derived from a known
/// Arrow type.
pub fn from_arrow(
name: impl Into<String>,
data_type: &DataType,
is_nullable: bool,
) -> Result<Self> {
let ducklake_type = arrow_to_ducklake_type(data_type)?;
// We use direct struct construction here since the ducklake_type was just
// produced by arrow_to_ducklake_type, so it is guaranteed to be valid.
Ok(Self {
name: name.into(),
ducklake_type,
is_nullable,
})
}
}
/// Information about a data file to register in the catalog.
///
/// This struct contains the metadata needed to register a Parquet file in the DuckLake catalog.
#[derive(Debug, Clone)]
pub struct DataFileInfo {
/// Path to the file (relative to table path or absolute)
pub path: String,
/// Whether the path is relative to the table's path
pub path_is_relative: bool,
/// Size of the file in bytes
pub file_size_bytes: i64,
/// Size of the Parquet footer in bytes (optimization hint for reads)
pub footer_size: Option<i64>,
/// Number of records in the file
pub record_count: i64,
}
impl DataFileInfo {
/// Create a new data file info with relative path.
///
/// # Panics
///
/// Panics if `record_count` is negative. Record counts originate from
/// `RecordBatch::num_rows()` (always non-negative), so a negative value
/// indicates a programming error.
pub fn new(path: impl Into<String>, file_size_bytes: i64, record_count: i64) -> Self {
assert!(
record_count >= 0,
"record_count must be non-negative, got {}",
record_count
);
Self {
path: path.into(),
path_is_relative: true,
file_size_bytes,
footer_size: None,
record_count,
}
}
/// Set the footer size for read optimization.
pub fn with_footer_size(mut self, footer_size: i64) -> Self {
self.footer_size = Some(footer_size);
self
}
/// Mark this file as having an absolute path.
pub fn with_absolute_path(mut self) -> Self {
self.path_is_relative = false;
self
}
}
/// Result of a write operation.
#[derive(Debug)]
pub struct WriteResult {
/// Snapshot ID of the write operation
pub snapshot_id: i64,
/// Table ID (may be newly created)
pub table_id: i64,
/// Schema ID (may be newly created)
pub schema_id: i64,
/// Number of files written
pub files_written: usize,
/// Total records written
pub records_written: i64,
}
/// Result of a transactional write setup operation.
#[derive(Debug)]
pub struct WriteSetupResult {
/// Snapshot ID created for this write
pub snapshot_id: i64,
/// Schema ID (may be newly created)
pub schema_id: i64,
/// Table ID (may be newly created)
pub table_id: i64,
/// Column IDs in order
pub column_ids: Vec<i64>,
}
/// Trait for writing metadata to DuckLake catalogs.
///
/// Implementations must be thread-safe (`Send + Sync`).
pub trait MetadataWriter: Send + Sync + std::fmt::Debug {
/// Create a new snapshot and return its ID.
fn create_snapshot(&self) -> Result<i64>;
/// Get or create a schema, returning `(schema_id, was_created)`.
fn get_or_create_schema(
&self,
name: &str,
path: Option<&str>,
snapshot_id: i64,
) -> Result<(i64, bool)>;
/// Get or create a table, returning `(table_id, was_created)`.
fn get_or_create_table(
&self,
schema_id: i64,
name: &str,
path: Option<&str>,
snapshot_id: i64,
) -> Result<(i64, bool)>;
/// Set columns for a table, returning assigned column IDs.
/// Ends existing columns using end_snapshot pattern for time travel.
fn set_columns(
&self,
table_id: i64,
columns: &[ColumnDef],
snapshot_id: i64,
) -> Result<Vec<i64>>;
/// Register a new data file. Returns the assigned data_file_id.
fn register_data_file(
&self,
table_id: i64,
snapshot_id: i64,
file: &DataFileInfo,
) -> Result<i64>;
/// End all existing data files for a table. Returns count of files ended.
fn end_table_files(&self, table_id: i64, snapshot_id: i64) -> Result<u64>;
/// Get the data path from catalog metadata.
fn get_data_path(&self) -> Result<String>;
/// Set the data path in catalog metadata.
fn set_data_path(&self, path: &str) -> Result<()>;
/// Initialize DuckLake schema tables if they don't exist.
fn initialize_schema(&self) -> Result<()>;
/// Atomically set up catalog metadata for a write operation.
/// Creates snapshot, schema, table, columns in a single transaction.
/// If mode is `WriteMode::Replace`, ends existing data files.
fn begin_write_transaction(
&self,
schema_name: &str,
table_name: &str,
columns: &[ColumnDef],
mode: WriteMode,
) -> Result<WriteSetupResult>;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::DuckLakeError;
#[test]
fn test_column_def_new() {
let col = ColumnDef::new("test_col", "int32", true).unwrap();
assert_eq!(col.name, "test_col");
assert_eq!(col.ducklake_type, "int32");
assert!(col.is_nullable);
}
#[test]
fn test_column_def_new_valid_types() {
// Various valid type strings should be accepted
assert!(ColumnDef::new("a", "int32", true).is_ok());
assert!(ColumnDef::new("b", "varchar", false).is_ok());
assert!(ColumnDef::new("c", "boolean", true).is_ok());
assert!(ColumnDef::new("d", "float64", true).is_ok());
assert!(ColumnDef::new("e", "decimal(10,2)", true).is_ok());
assert!(ColumnDef::new("f", "timestamp", true).is_ok());
assert!(ColumnDef::new("g", "date", true).is_ok());
assert!(ColumnDef::new("h", "bigint", true).is_ok());
assert!(ColumnDef::new("i", "text", true).is_ok());
}
#[test]
fn test_column_def_new_invalid_type_rejected() {
let result = ColumnDef::new("col", "not_a_type", true);
assert!(result.is_err());
match result {
Err(DuckLakeError::UnsupportedType(msg)) => {
assert_eq!(msg, "not_a_type");
},
other => panic!("Expected UnsupportedType error, got {:?}", other),
}
}
#[test]
fn test_column_def_new_empty_type_rejected() {
let result = ColumnDef::new("col", "", true);
assert!(result.is_err());
match result {
Err(DuckLakeError::UnsupportedType(_)) => {},
other => panic!("Expected UnsupportedType error, got {:?}", other),
}
}
#[test]
fn test_column_def_from_arrow() {
let col = ColumnDef::from_arrow("id", &DataType::Int64, false).unwrap();
assert_eq!(col.name, "id");
assert_eq!(col.ducklake_type, "int64");
assert!(!col.is_nullable);
}
#[test]
fn test_data_file_info_new() {
let file = DataFileInfo::new("test.parquet", 1024, 100);
assert_eq!(file.path, "test.parquet");
assert!(file.path_is_relative);
assert_eq!(file.file_size_bytes, 1024);
assert_eq!(file.record_count, 100);
assert!(file.footer_size.is_none());
}
#[test]
fn test_data_file_info_with_footer_size() {
let file = DataFileInfo::new("test.parquet", 1024, 100).with_footer_size(256);
assert_eq!(file.footer_size, Some(256));
}
#[test]
fn test_data_file_info_with_absolute_path() {
let file = DataFileInfo::new("/absolute/path.parquet", 1024, 100).with_absolute_path();
assert!(!file.path_is_relative);
}
#[test]
fn test_data_file_info_zero_record_count() {
let file = DataFileInfo::new("empty.parquet", 0, 0);
assert_eq!(file.record_count, 0);
}
#[test]
#[should_panic(expected = "record_count must be non-negative")]
fn test_data_file_info_negative_record_count_panics() {
DataFileInfo::new("test.parquet", 1024, -1);
}
}