Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 215 additions & 2 deletions src/metadata_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ pub const SQL_LIST_TABLES: &str =
AND ? >= begin_snapshot
AND (? < end_snapshot OR end_snapshot IS NULL)";

pub const SQL_GET_TABLE_COLUMNS: &str = "SELECT column_id, column_name, column_type, nulls_allowed
pub const SQL_GET_TABLE_COLUMNS: &str =
"SELECT column_id, column_name, column_type, nulls_allowed, parent_column
FROM ducklake_column
WHERE table_id = ? AND end_snapshot IS NULL
ORDER BY column_order";
Expand Down Expand Up @@ -218,7 +219,8 @@ pub const SQL_LIST_ALL_COLUMNS: &str = "
c.column_id,
c.column_name,
c.column_type,
c.nulls_allowed
c.nulls_allowed,
c.parent_column
FROM ducklake_schema s
JOIN ducklake_table t ON s.schema_id = t.schema_id
JOIN ducklake_column c ON t.table_id = c.table_id
Expand Down Expand Up @@ -356,6 +358,96 @@ impl DuckLakeTableColumn {
}
}

/// Reconstruct list types from parent-child column rows.
///
/// DuckLake stores list columns as two rows in `ducklake_column`:
/// - Parent row: `column_type = "list"`, `parent_column = NULL`
/// - Child row: `column_type = "<element_type>"`, `parent_column = <parent_column_id>`
///
/// This function rewrites the parent's `column_type` to `list<element_type>`
/// and removes child rows from the result.
///
/// Only handles `list` parent types. Struct, map, etc. are left unchanged.
pub fn reconstruct_list_columns(
rows: Vec<(DuckLakeTableColumn, Option<i64>)>,
) -> Vec<DuckLakeTableColumn> {
use std::collections::HashMap;

// Index: column_id -> position in rows
let id_to_index: HashMap<i64, usize> = rows
.iter()
.enumerate()
.map(|(i, (col, _))| (col.column_id, i))
.collect();

// Separate into columns and parent_column arrays
let mut columns: Vec<DuckLakeTableColumn> = Vec::with_capacity(rows.len());
let mut parent_columns: Vec<Option<i64>> = Vec::with_capacity(rows.len());
for (col, parent) in rows {
columns.push(col);
parent_columns.push(parent);
}

// Find children of list parents and rewrite parent types
let mut skip: std::collections::HashSet<usize> = std::collections::HashSet::new();
for (i, parent_id) in parent_columns.iter().enumerate() {
if let Some(pid) = parent_id
&& let Some(&parent_idx) = id_to_index.get(pid)
&& columns[parent_idx].column_type == "list"
{
columns[parent_idx].column_type = format!("list<{}>", columns[i].column_type);
skip.insert(i);
}
}

// Return only top-level columns (not children)
columns
.into_iter()
.enumerate()
.filter(|(i, _)| !skip.contains(i))
.map(|(_, col)| col)
.collect()
}

/// Same as [`reconstruct_list_columns`] but for [`ColumnWithTable`] rows.
pub fn reconstruct_list_columns_with_table(
rows: Vec<(ColumnWithTable, Option<i64>)>,
) -> Vec<ColumnWithTable> {
use std::collections::HashMap;

let id_to_index: HashMap<i64, usize> = rows
.iter()
.enumerate()
.map(|(i, (cwt, _))| (cwt.column.column_id, i))
.collect();

let mut entries: Vec<ColumnWithTable> = Vec::with_capacity(rows.len());
let mut parent_columns: Vec<Option<i64>> = Vec::with_capacity(rows.len());
for (cwt, parent) in rows {
entries.push(cwt);
parent_columns.push(parent);
}

let mut skip: std::collections::HashSet<usize> = std::collections::HashSet::new();
for (i, parent_id) in parent_columns.iter().enumerate() {
if let Some(pid) = parent_id
&& let Some(&parent_idx) = id_to_index.get(pid)
&& entries[parent_idx].column.column_type == "list"
{
entries[parent_idx].column.column_type =
format!("list<{}>", entries[i].column.column_type);
skip.insert(i);
}
}

entries
.into_iter()
.enumerate()
.filter(|(i, _)| !skip.contains(i))
.map(|(_, e)| e)
.collect()
}

/// Metadata for a data file or delete file in DuckLake
#[derive(Debug, Clone)]
pub struct DuckLakeFileData {
Expand Down Expand Up @@ -534,3 +626,124 @@ where
{
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_reconstruct_list_columns_basic() {
let rows = vec![
(
DuckLakeTableColumn::new(1, "id".into(), "int64".into(), false),
None,
),
(
DuckLakeTableColumn::new(6, "vector".into(), "list".into(), true),
None,
),
(
DuckLakeTableColumn::new(7, "element".into(), "float64".into(), true),
Some(6),
),
];

let result = reconstruct_list_columns(rows);
assert_eq!(result.len(), 2);
assert_eq!(result[0].column_name, "id");
assert_eq!(result[0].column_type, "int64");
assert_eq!(result[1].column_name, "vector");
assert_eq!(result[1].column_type, "list<float64>");
}

#[test]
fn test_reconstruct_list_columns_no_lists() {
let rows = vec![
(
DuckLakeTableColumn::new(1, "id".into(), "int64".into(), false),
None,
),
(
DuckLakeTableColumn::new(2, "name".into(), "varchar".into(), true),
None,
),
];

let result = reconstruct_list_columns(rows);
assert_eq!(result.len(), 2);
assert_eq!(result[0].column_type, "int64");
assert_eq!(result[1].column_type, "varchar");
}

#[test]
fn test_reconstruct_list_columns_struct_parent_unchanged() {
// Struct parents should NOT be rewritten — child stays in result
let rows = vec![
(
DuckLakeTableColumn::new(1, "data".into(), "struct".into(), true),
None,
),
(
DuckLakeTableColumn::new(2, "field_a".into(), "int32".into(), true),
Some(1),
),
];

let result = reconstruct_list_columns(rows);
assert_eq!(result.len(), 2); // both remain
assert_eq!(result[0].column_type, "struct"); // unchanged
}

#[test]
fn test_reconstruct_list_columns_multiple_lists() {
let rows = vec![
(
DuckLakeTableColumn::new(1, "tags".into(), "list".into(), true),
None,
),
(
DuckLakeTableColumn::new(2, "element".into(), "varchar".into(), true),
Some(1),
),
(
DuckLakeTableColumn::new(3, "scores".into(), "list".into(), true),
None,
),
(
DuckLakeTableColumn::new(4, "element".into(), "float64".into(), true),
Some(3),
),
];

let result = reconstruct_list_columns(rows);
assert_eq!(result.len(), 2);
assert_eq!(result[0].column_type, "list<varchar>");
assert_eq!(result[1].column_type, "list<float64>");
}

#[test]
fn test_reconstruct_list_columns_with_table_basic() {
let rows = vec![
(
ColumnWithTable {
schema_name: "main".into(),
table_name: "t".into(),
column: DuckLakeTableColumn::new(6, "vector".into(), "list".into(), true),
},
None,
),
(
ColumnWithTable {
schema_name: "main".into(),
table_name: "t".into(),
column: DuckLakeTableColumn::new(7, "element".into(), "float64".into(), true),
},
Some(6),
),
];

let result = reconstruct_list_columns_with_table(rows);
assert_eq!(result.len(), 1);
assert_eq!(result[0].column.column_type, "list<float64>");
}
}
39 changes: 24 additions & 15 deletions src/metadata_provider_duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::metadata_provider::{
SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS, SQL_GET_LATEST_SNAPSHOT, SQL_GET_SCHEMA_BY_NAME,
SQL_GET_TABLE_BY_NAME, SQL_GET_TABLE_COLUMNS, SQL_LIST_ALL_COLUMNS, SQL_LIST_ALL_FILES,
SQL_LIST_ALL_TABLES, SQL_LIST_SCHEMAS, SQL_LIST_SNAPSHOTS, SQL_LIST_TABLES, SQL_TABLE_EXISTS,
SchemaMetadata, SnapshotMetadata, TableMetadata, TableWithSchema,
SchemaMetadata, SnapshotMetadata, TableMetadata, TableWithSchema, reconstruct_list_columns,
reconstruct_list_columns_with_table,
};
use duckdb::AccessMode::ReadOnly;
use duckdb::{Config, Connection, params};
Expand Down Expand Up @@ -145,22 +146,26 @@ impl MetadataProvider for DuckdbMetadataProvider {
let conn = self.connection();
let mut stmt = conn.prepare(SQL_GET_TABLE_COLUMNS)?;

let columns = stmt
let raw_columns: Vec<(DuckLakeTableColumn, Option<i64>)> = stmt
.query_map([table_id], |row| {
let column_id: i64 = row.get(0)?;
let column_name: String = row.get(1)?;
let column_type: String = row.get(2)?;
let nulls_allowed: Option<bool> = row.get(3)?;
Ok(DuckLakeTableColumn::new(
column_id,
column_name,
column_type,
nulls_allowed.unwrap_or(true),
let parent_column: Option<i64> = row.get(4)?;
Ok((
DuckLakeTableColumn::new(
column_id,
column_name,
column_type,
nulls_allowed.unwrap_or(true),
),
parent_column,
))
})?
.collect::<Result<Vec<_>, _>>()?;

Ok(columns)
Ok(reconstruct_list_columns(raw_columns))
}

fn get_table_files_for_select(
Expand Down Expand Up @@ -307,29 +312,33 @@ impl MetadataProvider for DuckdbMetadataProvider {
let conn = self.connection();
let mut stmt = conn.prepare(SQL_LIST_ALL_COLUMNS)?;

let columns = stmt
let raw_columns: Vec<(ColumnWithTable, Option<i64>)> = stmt
.query_map(
params![snapshot_id, snapshot_id, snapshot_id, snapshot_id],
|row| {
let schema_name: String = row.get(0)?;
let table_name: String = row.get(1)?;
let nulls_allowed: Option<bool> = row.get(5)?;
let parent_column: Option<i64> = row.get(6)?;
let column = DuckLakeTableColumn {
column_id: row.get(2)?,
column_name: row.get(3)?,
column_type: row.get(4)?,
is_nullable: nulls_allowed.unwrap_or(true),
};
Ok(ColumnWithTable {
schema_name,
table_name,
column,
})
Ok((
ColumnWithTable {
schema_name,
table_name,
column,
},
parent_column,
))
},
)?
.collect::<Result<Vec<_>, _>>()?;

Ok(columns)
Ok(reconstruct_list_columns_with_table(raw_columns))
}

fn list_all_files(&self, snapshot_id: i64) -> crate::Result<Vec<FileWithTable>> {
Expand Down
Loading
Loading