Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 77 additions & 11 deletions core/src/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,7 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
.with_indexes(indexes.clone());
Comment thread
ewgenius marked this conversation as resolved.

let pool = Arc::new(pool);
make_initial_table(Arc::new(table_definition.clone()), &pool)?;

let write_settings = DuckDBWriteSettings::from_params(&options);

let table_writer_builder = DuckDBTableWriterBuilder::new()
.with_table_definition(table_definition)
.with_pool(pool)
.set_on_conflict(on_conflict)
.with_write_settings(write_settings);
make_initial_table(Arc::new(table_definition), &pool)?;

let dyn_pool: Arc<DynDuckDbConnectionPool> = Arc::new(read_pool);

Expand All @@ -510,9 +502,28 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
self.settings_registry
.apply_settings(conn, &options, DuckDBSettingScope::Global)?;

// Read actual DuckDB schema after table creation (may differ from cmd.schema).
let schema_conn = dyn_pool.connect().await?;
let schema = get_schema(schema_conn, &TableReference::bare(name.clone()))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let table_definition =
TableDefinition::new(RelationName::new(name.clone()), Arc::clone(&schema))
.with_constraints(cmd.constraints.clone())
.with_indexes(indexes.clone());

let write_settings = DuckDBWriteSettings::from_params(&options);

let table_writer_builder = DuckDBTableWriterBuilder::new()
.with_table_definition(table_definition)
.with_pool(pool)
.set_on_conflict(on_conflict)
.with_write_settings(write_settings);

let read_provider = Arc::new(DuckDBTable::new_with_schema(
&dyn_pool,
Arc::clone(&schema),
schema,
TableReference::bare(name.clone()),
None,
Some(self.dialect.clone()),
Expand Down Expand Up @@ -798,7 +809,7 @@ pub(crate) mod tests {
use crate::duckdb::write::DuckDBTableWriter;

use super::*;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::{Constraints, ToDFSchema};
use datafusion::logical_expr::CreateExternalTable;
use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -1117,4 +1128,59 @@ pub(crate) mod tests {
assert_eq!(e.to_string(), "External error: Query execution failed.\nInvalid Input Error: Failed to cast value: Could not convert string 'invalid' to BOOL\nFor details, refer to the DuckDB manual: https://duckdb.org/docs/");
}
}

/// Verifies the read provider advertises actual DuckDB storage types,
/// not the requested cmd.schema types.
#[tokio::test]
async fn test_read_provider_schema_reflects_actual_duckdb_types() {
let table_name = TableReference::bare("test_timestamp_schema");
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
false,
),
]);

let mut options = HashMap::new();
options.insert("mode".to_string(), "memory".to_string());

let factory = DuckDBTableProviderFactory::new(duckdb::AccessMode::ReadWrite);
let ctx = SessionContext::new();
let cmd = CreateExternalTable {
schema: Arc::new(schema.to_dfschema().expect("to df schema")),
name: table_name,
location: "".to_string(),
file_type: "".to_string(),
table_partition_cols: vec![],
if_not_exists: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options,
constraints: Constraints::default(),
column_defaults: HashMap::new(),
temporary: false,
or_replace: false,
};

let table_provider = factory
.create(&ctx.state(), &cmd)
.await
.expect("table provider created");

let read_schema = table_provider.schema();
let ts_field = read_schema
.field_with_name("created_at")
.expect("created_at field exists");

// DuckDB stores TIMESTAMPTZ as Microsecond regardless of requested precision.
match ts_field.data_type() {
DataType::Timestamp(TimeUnit::Microsecond, _) => {}
other => panic!(
"Expected Timestamp(Microsecond, _), got {other:?}"
),
}
}
}
Loading