-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathmetadata_provider.rs
More file actions
415 lines (364 loc) · 14.1 KB
/
Copy pathmetadata_provider.rs
File metadata and controls
415 lines (364 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
use crate::Result;
// SQL queries for DuckLake catalog tables
// These queries are database-agnostic and work with DuckDB, SQLite, PostgreSQL, MySQL
pub const SQL_GET_LATEST_SNAPSHOT: &str =
"SELECT COALESCE(MAX(snapshot_id), 0) FROM ducklake_snapshot";
pub const SQL_LIST_SNAPSHOTS: &str = "SELECT snapshot_id, CAST(snapshot_time AS VARCHAR) as timestamp FROM ducklake_snapshot ORDER BY snapshot_id";
pub const SQL_LIST_SCHEMAS: &str =
"SELECT schema_id, schema_name, path, path_is_relative FROM ducklake_schema
WHERE ? >= begin_snapshot AND (? < end_snapshot OR end_snapshot IS NULL)";
pub const SQL_LIST_TABLES: &str =
"SELECT table_id, table_name, path, path_is_relative FROM ducklake_table
WHERE schema_id = ?
AND ? >= begin_snapshot
AND (? < end_snapshot OR end_snapshot IS NULL)";
pub const SQL_GET_TABLE_COLUMNS: &str = "SELECT column_id, column_name, column_type, nulls_allowed
FROM ducklake_column
WHERE table_id = ?
ORDER BY column_order";
pub const SQL_GET_DATA_FILES: &str = "
SELECT
data.data_file_id,
data.path AS data_file_path,
data.path_is_relative AS data_path_is_relative,
data.file_size_bytes AS data_file_size,
data.footer_size AS data_footer_size,
data.encryption_key AS data_encryption_key,
del.delete_file_id,
del.path AS delete_file_path,
del.path_is_relative AS delete_path_is_relative,
del.file_size_bytes AS delete_file_size,
del.footer_size AS delete_footer_size,
del.encryption_key AS delete_encryption_key,
del.delete_count
FROM ducklake_data_file AS data
LEFT JOIN ducklake_delete_file AS del
ON data.data_file_id = del.data_file_id
AND del.table_id = ?
AND ? >= del.begin_snapshot
AND (? < del.end_snapshot OR del.end_snapshot IS NULL)
WHERE data.table_id = ?
AND ? >= data.begin_snapshot
AND (? < data.end_snapshot OR data.end_snapshot IS NULL)";
pub const SQL_GET_DATA_PATH: &str =
"SELECT value FROM ducklake_metadata WHERE key = 'data_path' AND scope IS NULL";
pub const SQL_GET_SCHEMA_BY_NAME: &str =
"SELECT schema_id, schema_name, path, path_is_relative FROM ducklake_schema
WHERE schema_name = ?
AND ? >= begin_snapshot
AND (? < end_snapshot OR end_snapshot IS NULL)";
pub const SQL_GET_TABLE_BY_NAME: &str =
"SELECT table_id, table_name, path, path_is_relative FROM ducklake_table
WHERE schema_id = ?
AND table_name = ?
AND ? >= begin_snapshot
AND (? < end_snapshot OR end_snapshot IS NULL)";
pub const SQL_TABLE_EXISTS: &str = "SELECT EXISTS(
SELECT 1 FROM ducklake_table
WHERE schema_id = ?
AND table_name = ?
AND ? >= begin_snapshot
AND (? < end_snapshot OR end_snapshot IS NULL)
)";
// Queries for table_changes (CDC) - files added/removed between snapshots
pub const SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS: &str = "
SELECT
data.begin_snapshot,
data.path,
data.path_is_relative,
data.file_size_bytes,
data.footer_size,
data.encryption_key
FROM ducklake_data_file AS data
WHERE data.table_id = ?
AND data.begin_snapshot > ?
AND data.begin_snapshot <= ?
ORDER BY data.begin_snapshot";
pub const SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS: &str = "
SELECT del.begin_snapshot
FROM ducklake_delete_file AS del
WHERE del.table_id = ?
AND del.begin_snapshot > ?
AND del.begin_snapshot <= ?
ORDER BY del.begin_snapshot";
// Bulk queries for information_schema (avoids N+1 query problem)
pub const SQL_LIST_ALL_TABLES: &str = "
SELECT
s.schema_name,
t.table_id,
t.table_name,
t.path,
t.path_is_relative
FROM ducklake_schema s
JOIN ducklake_table t ON s.schema_id = t.schema_id
WHERE ? >= s.begin_snapshot
AND (? < s.end_snapshot OR s.end_snapshot IS NULL)
AND ? >= t.begin_snapshot
AND (? < t.end_snapshot OR t.end_snapshot IS NULL)
ORDER BY s.schema_name, t.table_name";
pub const SQL_LIST_ALL_COLUMNS: &str = "
SELECT
s.schema_name,
t.table_name,
c.column_id,
c.column_name,
c.column_type,
c.nulls_allowed
FROM ducklake_schema s
JOIN ducklake_table t ON s.schema_id = t.schema_id
JOIN ducklake_column c ON t.table_id = c.table_id
WHERE ? >= s.begin_snapshot
AND (? < s.end_snapshot OR s.end_snapshot IS NULL)
AND ? >= t.begin_snapshot
AND (? < t.end_snapshot OR t.end_snapshot IS NULL)
ORDER BY s.schema_name, t.table_name, c.column_order";
pub const SQL_LIST_ALL_FILES: &str = "
SELECT
s.schema_name,
t.table_name,
data.data_file_id,
data.path AS data_file_path,
data.path_is_relative AS data_path_is_relative,
data.file_size_bytes AS data_file_size,
data.footer_size AS data_footer_size,
data.encryption_key AS data_encryption_key,
del.delete_file_id,
del.path AS delete_file_path,
del.path_is_relative AS delete_path_is_relative,
del.file_size_bytes AS delete_file_size,
del.footer_size AS delete_footer_size,
del.encryption_key AS delete_encryption_key,
del.delete_count
FROM ducklake_schema s
JOIN ducklake_table t ON s.schema_id = t.schema_id
JOIN ducklake_data_file data ON t.table_id = data.table_id
LEFT JOIN ducklake_delete_file del
ON data.data_file_id = del.data_file_id
AND del.table_id = t.table_id
AND ? >= del.begin_snapshot
AND (? < del.end_snapshot OR del.end_snapshot IS NULL)
WHERE ? >= s.begin_snapshot
AND (? < s.end_snapshot OR s.end_snapshot IS NULL)
AND ? >= t.begin_snapshot
AND (? < t.end_snapshot OR t.end_snapshot IS NULL)
AND ? >= data.begin_snapshot
AND (? < data.end_snapshot OR data.end_snapshot IS NULL)
ORDER BY s.schema_name, t.table_name, data.path";
/// Metadata for a snapshot in the DuckLake catalog
#[derive(Debug, Clone)]
pub struct SnapshotMetadata {
/// Unique identifier for this snapshot
pub snapshot_id: i64,
/// Timestamp when the snapshot was created (optional)
pub timestamp: Option<String>,
}
/// Metadata for a schema in the DuckLake catalog
#[derive(Debug, Clone)]
pub struct SchemaMetadata {
/// Unique identifier for this schema in the catalog
pub schema_id: i64,
/// Name of the schema as it appears in SQL queries
pub schema_name: String,
/// Path to the schema's data directory (may be relative or absolute)
pub path: String,
/// Whether the path is relative to the catalog's data_path
pub path_is_relative: bool,
}
/// Metadata for a table in the DuckLake catalog
#[derive(Debug, Clone)]
pub struct TableMetadata {
/// Unique identifier for this table in the catalog
pub table_id: i64,
/// Name of the table as it appears in SQL queries
pub table_name: String,
/// Path to the table's data directory (may be relative or absolute)
pub path: String,
/// Whether the path is relative to the schema's path
pub path_is_relative: bool,
}
/// Table metadata with its schema name (for bulk queries)
#[derive(Debug, Clone)]
pub struct TableWithSchema {
/// Name of the schema this table belongs to
pub schema_name: String,
/// Table metadata
pub table: TableMetadata,
}
/// Column metadata with its schema and table names (for bulk queries)
#[derive(Debug, Clone)]
pub struct ColumnWithTable {
/// Name of the schema this column's table belongs to
pub schema_name: String,
/// Name of the table this column belongs to
pub table_name: String,
/// Column metadata
pub column: DuckLakeTableColumn,
}
/// File metadata with its schema and table names (for bulk queries)
#[derive(Debug, Clone)]
pub struct FileWithTable {
/// Name of the schema this file's table belongs to
pub schema_name: String,
/// Name of the table this file belongs to
pub table_name: String,
/// File metadata
pub file: DuckLakeTableFile,
}
/// Column definition for a DuckLake table
#[derive(Debug, Clone)]
pub struct DuckLakeTableColumn {
/// Unique identifier for this column in the catalog
pub column_id: i64,
/// Name of the column
pub column_name: String,
/// DuckLake type string (e.g., "varchar", "int64", "decimal(10,2)")
pub column_type: String,
/// Whether this column allows NULL values
pub is_nullable: bool,
}
impl DuckLakeTableColumn {
pub fn new(
column_id: i64,
column_name: String,
column_type: String,
is_nullable: bool,
) -> Self {
Self {
column_id,
column_name,
column_type,
is_nullable,
}
}
}
/// Metadata for a data file or delete file in DuckLake
#[derive(Debug, Clone)]
pub struct DuckLakeFileData {
/// Path to the file (may be relative or absolute)
pub path: String,
/// Whether the path is relative to the table's path
pub path_is_relative: bool,
/// Encryption key for the file (used for Parquet Modular Encryption)
pub encryption_key: Option<String>,
/// Size of the file in bytes
pub file_size_bytes: i64,
/// Size of the Parquet footer in bytes (optional optimization hint)
pub footer_size: Option<i64>,
}
impl DuckLakeFileData {
pub fn new(path: String, path_is_relative: bool, file_size_bytes: i64) -> Self {
Self {
path,
path_is_relative,
encryption_key: None,
file_size_bytes,
footer_size: None,
}
}
}
/// Represents a data file and its associated delete file (if any) for a DuckLake table
#[derive(Debug, Clone)]
pub struct DuckLakeTableFile {
/// Metadata for the data file
pub file: DuckLakeFileData,
/// Optional associated delete file containing deleted row positions
pub delete_file: Option<DuckLakeFileData>,
/// Starting row ID for this file (reserved for future use)
pub row_id_start: Option<i64>,
/// Snapshot ID when this file was created (reserved for future use)
pub snapshot_id: Option<i64>,
/// Maximum number of rows in this file (reserved for future use)
pub max_row_count: Option<i64>,
}
impl DuckLakeTableFile {
pub fn new(file: DuckLakeFileData) -> Self {
Self {
file,
delete_file: None,
row_id_start: None,
snapshot_id: None,
max_row_count: None,
}
}
}
// Change tracking structures for table_changes (CDC) functionality
#[derive(Debug, Clone)]
pub struct DataFileChange {
pub begin_snapshot: i64,
pub path: String,
pub path_is_relative: bool,
pub file_size_bytes: i64,
pub footer_size: Option<i64>,
pub encryption_key: Option<String>,
}
#[derive(Debug, Clone)]
pub struct DeleteFileChange {
pub begin_snapshot: i64,
}
pub trait MetadataProvider: Send + Sync + std::fmt::Debug {
/// Get the current snapshot ID (dynamic, not cached)
fn get_current_snapshot(&self) -> Result<i64>;
/// Get the data path from catalog metadata (not snapshot-dependent)
fn get_data_path(&self) -> Result<String>;
/// List all snapshots in the catalog
fn list_snapshots(&self) -> Result<Vec<SnapshotMetadata>>;
/// List schemas for a specific snapshot
fn list_schemas(&self, snapshot_id: i64) -> Result<Vec<SchemaMetadata>>;
/// List tables for a specific snapshot
fn list_tables(&self, schema_id: i64, snapshot_id: i64) -> Result<Vec<TableMetadata>>;
/// Get table structure (columns) - not snapshot-dependent as column definitions don't change
fn get_table_structure(&self, table_id: i64) -> Result<Vec<DuckLakeTableColumn>>;
/// Get table files for a specific snapshot
fn get_table_files_for_select(
&self,
table_id: i64,
snapshot_id: i64,
) -> Result<Vec<DuckLakeTableFile>>;
// todo: support select with file pruning
// Dynamic lookup methods for on-demand metadata retrieval
/// Get schema by name for a specific snapshot
fn get_schema_by_name(&self, name: &str, snapshot_id: i64) -> Result<Option<SchemaMetadata>>;
/// Get table by name for a specific snapshot
fn get_table_by_name(
&self,
schema_id: i64,
name: &str,
snapshot_id: i64,
) -> Result<Option<TableMetadata>>;
/// Check if table exists for a specific snapshot
fn table_exists(&self, schema_id: i64, name: &str, snapshot_id: i64) -> Result<bool>;
// Bulk query methods for information_schema
/// List all tables across all schemas for a snapshot
fn list_all_tables(&self, snapshot_id: i64) -> Result<Vec<TableWithSchema>>;
/// List all columns across all tables for a snapshot
fn list_all_columns(&self, snapshot_id: i64) -> Result<Vec<ColumnWithTable>>;
/// List all files across all tables for a snapshot
fn list_all_files(&self, snapshot_id: i64) -> Result<Vec<FileWithTable>>;
// Change tracking methods for table_changes (CDC) functionality
/// Get data files added between two snapshots (exclusive start, inclusive end)
/// Returns files where begin_snapshot > start_snapshot AND begin_snapshot <= end_snapshot
/// These represent INSERT changes - new rows added to the table
fn get_data_files_added_between_snapshots(
&self,
table_id: i64,
start_snapshot: i64,
end_snapshot: i64,
) -> Result<Vec<DataFileChange>>;
/// Get delete files added between two snapshots (exclusive start, inclusive end)
/// Returns delete files where begin_snapshot > start_snapshot AND begin_snapshot <= end_snapshot
/// These represent DELETE changes - rows removed from the table
fn get_delete_files_added_between_snapshots(
&self,
table_id: i64,
start_snapshot: i64,
end_snapshot: i64,
) -> Result<Vec<DeleteFileChange>>;
}
#[cfg(any(feature = "metadata-postgres", feature = "metadata-mysql"))]
/// Helper function to bridge async sqlx operations to sync MetadataProvider trait
pub(crate) fn block_on<F, T>(f: F) -> T
where
F: std::future::Future<Output = T>,
{
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
}