Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
530 changes: 335 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

35 changes: 33 additions & 2 deletions docs/catalog/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ values (1, 'New Record', now());

### Limitations for Insertion

- Schema evolution during insert is not supported
- Only append operations are supported (no upserts)
- Complex data types (nested structs, arrays, maps) have limited support

Expand All @@ -371,13 +370,45 @@ When using the `create_table_if_not_exists` option, please be aware of the follo
- **Partitioning**: The automatically created table will use default partitioning settings. You cannot specify custom partition or sort specifications during automatic creation.
- **Identifier Fields**: The automatically created table will not have any identifier fields specified. If you need identifier fields, you must create the Iceberg table manually beforehand.

## Schema Evolution

The Iceberg FDW supports [Apache Iceberg schema evolution](https://iceberg.apache.org/spec/#schema-evolution). When columns are added to an Iceberg table after data has already been written, rows from older data files will return `NULL` for those new columns, which matches Iceberg spec behavior.

For example, given a table that initially has `id` and `name` columns, and later gains a `score` column:

```sql
-- rows written before the column was added return NULL for 'score',
-- while newer rows return the actual value
select id, name, score from iceberg.members order by id;

-- id | name | score
-- ----+-------+-------
-- 1 | alice | NULL
-- 2 | bob | NULL
-- 3 | carol | 42
-- 4 | dave | 99
```

Filter pushdown on newly-added columns also works correctly:

```sql
select name from iceberg.members where score > 50;

-- name
-- ------
-- dave
```

!!! note

The foreign table definition in Postgres must include any new columns before they can be read. Either re-run `import foreign schema` (which refreshes the `schema_id` option), or add the columns manually with `alter foreign table` and then update or drop any pinned `schema_id` option on the foreign table. Otherwise the FDW may keep using an older schema and report `ColumnNotFound` for newly added columns.

## Limitations

This section describes important limitations and considerations when using this FDW:

- Only supports specific data type mappings between Postgres and Iceberg
- UPDATE, DELETE, and TRUNCATE operations are not supported
- [Apache Iceberg schema evolution](https://iceberg.apache.org/spec/#schema-evolution) is not supported
- When using Iceberg REST catalog, only supports AWS S3 (or compatible) as the storage
- Materialized views using these foreign tables may fail during logical backups

Expand Down
26 changes: 16 additions & 10 deletions wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ iceberg_fdw = [
"uuid",
]
duckdb_fdw = [
"arrow-array",
"arrow-schema",
"arrow-json",
"arrow-array-compat",
"arrow-schema-compat",
"arrow-json-compat",
"chrono",
"duckdb",
"regex",
Expand Down Expand Up @@ -281,10 +281,10 @@ async-compression = { version = "0.3.15", features = [
"zlib",
], optional = true }
http = { version = "0.2", optional = true }
parquet = { version = "55.1.0", features = ["async"], optional = true }
arrow-array = { version = "55.1.0", optional = true }
arrow-schema = { version = "55.1.0", optional = true }
arrow-json = { version = "55.1.0", optional = true }
parquet = { version = "57.3.0", features = ["async"], optional = true }
arrow-array = { version = "57.3.0", optional = true }
arrow-schema = { version = "57.3.0", optional = true }
arrow-json = { version = "57.3.0", optional = true }

# for mssql_fdw
tiberius = { version = "0.12.2", features = [
Expand Down Expand Up @@ -319,13 +319,19 @@ anyhow = { version = "1.0.81", optional = true }
uuid = { version = "1.18.0", features = ["v7"], optional = true }

# for iceberg_fdw
iceberg = { version = "0.6.0", optional = true }
iceberg-catalog-s3tables = { git = "https://github.com/burmecia/iceberg-rust", rev = "6548db2cc02b8ecd65e698e58d372d7dfb342b9c", package="iceberg-catalog-s3tables", optional = true }
iceberg-catalog-rest = { version = "0.6.0", optional = true }
iceberg = { version = "0.8.0", optional = true }
iceberg-catalog-s3tables = { version = "0.8.0", optional = true }
iceberg-catalog-rest = { version = "0.8.0", optional = true }
rust_decimal = { version = "1.37.1", optional = true }

# for duckdb_fdw
duckdb = { version = "=1.3.2", features = ["bundled"], optional = true }
# Version-pinned arrow compat deps matching duckdb's bundled arrow (55.x).
# The workspace uses arrow 57.x for iceberg_fdw; using 57.x here causes type
# incompatibilities because duckdb::arrow::* types implement the 55.x traits.
arrow-array-compat = { package = "arrow-array", version = "55", optional = true }
arrow-json-compat = { package = "arrow-json", version = "55", optional = true }
arrow-schema-compat = { package = "arrow-schema", version = "55", optional = true }

[dev-dependencies]
pgrx-tests = "=0.16.1"
57 changes: 57 additions & 0 deletions wrappers/dockerfiles/s3/iceberg_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,66 @@ def create_asks_table(catalog, namespace):
print(data)


def create_schema_evolution_table(catalog, namespace):
    """Seed a table whose schema is evolved after the first write.

    The table starts with (id, name), then gains two nullable columns
    ('score', then 'tag') via update_schema(), and finally receives a
    second batch of rows that populate them. Rows from the first batch
    are expected to read back NULL for both added columns, per the
    Iceberg spec's schema-evolution semantics.
    """
    identifier = f"{namespace}.schema_evolution"
    initial_schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
        identifier_field_ids=[1],
    )

    # Start from a clean slate so reseeding is idempotent.
    if catalog.table_exists(identifier):
        catalog.purge_table(identifier)
    table = catalog.create_table(identifier=identifier, schema=initial_schema)

    # First batch: written against the original (id, name) schema only.
    first_rows = [
        {"id": 1, "name": "alice"},
        {"id": 2, "name": "bob"},
    ]
    table.overwrite(
        pa.Table.from_pylist(first_rows, schema=table.schema().as_arrow())
    )

    # Evolve the schema twice, one nullable column per transaction.
    with table.update_schema() as update:
        update.add_column("score", IntegerType())
    with table.update_schema() as update:
        update.add_column("tag", StringType())

    # Reload the handle so it reflects the evolved schema before writing.
    table = catalog.load_table(identifier)

    # Second batch: rows that populate the newly added columns.
    second_rows = [
        {"id": 3, "name": "carol", "score": 42, "tag": "gold"},
        {"id": 4, "name": "dave", "score": 99, "tag": "silver"},
    ]
    table.append(
        pa.Table.from_pylist(second_rows, schema=table.schema().as_arrow())
    )

    print(table.scan().to_arrow())


# Ensure the target namespace exists before seeding any tables into it.
catalog.create_namespace_if_not_exists(namespace)
# NOTE(review): the results of these two listings are never used below —
# presumably kept as a quick connectivity/sanity check against the catalog;
# confirm or remove.
ns = catalog.list_namespaces()
tables = catalog.list_tables(namespace)

# Seed each fixture table used by the FDW integration tests.
create_bids_table(catalog, namespace)
create_asks_table(catalog, namespace)
create_schema_evolution_table(catalog, namespace)
4 changes: 2 additions & 2 deletions wrappers/src/fdw/duckdb_fdw/mapper.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow_array::{Array, RecordBatch, array::ArrayRef};
use arrow_json::ArrayWriter;
use arrow_array_compat::{Array, RecordBatch, array::ArrayRef};
use arrow_json_compat::ArrayWriter;
use duckdb::{
self,
types::{EnumType, ListType, ValueRef},
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/fdw/duckdb_fdw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum DuckdbFdwError {
NumericError(#[from] pgrx::datum::numeric_support::error::Error),

#[error("arrow error: {0}")]
ArrowError(#[from] arrow_schema::ArrowError),
ArrowError(#[from] arrow_schema_compat::ArrowError),

#[error("uuid error: {0}")]
UuidConversionError(#[from] uuid::Error),
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/fdw/iceberg_fdw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ This is a foreign data wrapper for [Apache Iceberg](https://iceberg.apache.org/)

| Version | Date | Notes |
| ------- | ---------- | ---------------------------------------------------------------------- |
| 0.1.5 | 2026-03-24 | Add schema evolution support |
| 0.1.4 | 2025-11-21 | Add create_table_if_not_exists option and improve insertion performance |
| 0.1.3 | 2025-09-20 | Add data insertion support |
| 0.1.2 | 2025-07-30 | Large data set query performance improvement |
| 0.1.1 | 2025-05-15 | Refactor server options passdown |
| 0.1.0 | 2025-05-07 | Initial version |

88 changes: 54 additions & 34 deletions wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use arrow_array::{Array, RecordBatch, array::ArrayRef, builder::ArrayBuilder};
use futures::StreamExt;
use iceberg::{
Catalog, NamespaceIdent, TableCreation, TableIdent,
Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent,
arrow::schema_to_arrow_schema,
expr::Predicate,
scan::ArrowRecordBatchStream,
spec::{DataFileFormat, NestedFieldRef, PrimitiveType, Type},
spec::{DataFileFormat, NestedFieldRef, PartitionKey, PrimitiveType, Type},
table::Table,
transaction::{ApplyTransactionAction, Transaction},
writer::{
IcebergWriter, IcebergWriterBuilder, base_writer::data_file_writer::DataFileWriterBuilder,
file_writer::ParquetWriterBuilder,
IcebergWriter, IcebergWriterBuilder,
base_writer::data_file_writer::DataFileWriterBuilder,
file_writer::{ParquetWriterBuilder, rolling_writer::RollingFileWriterBuilder},
},
};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_catalog_s3tables::{S3TablesCatalog, S3TablesCatalogConfig};
use iceberg_catalog_rest::{
REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
};
use iceberg_catalog_s3tables::S3TablesCatalogBuilder;
use parquet::file::properties::WriterProperties;
use pgrx::pg_sys;
use std::collections::{HashMap, HashSet, VecDeque};
Expand All @@ -32,7 +36,7 @@ use super::{
use crate::stats;

#[wrappers_fdw(
version = "0.1.4",
version = "0.1.5",
author = "Supabase",
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/iceberg_fdw",
error_type = "IcebergFdwError"
Expand Down Expand Up @@ -200,7 +204,7 @@ impl IcebergFdw {
if let Some(table) = &self.table {
let metadata = table.metadata();
let iceberg_schema = metadata.current_schema();
let schema: arrow_schema::Schema = (iceberg_schema.as_ref()).try_into()?;
let schema = schema_to_arrow_schema(iceberg_schema.as_ref())?;

for partition_rows in partitions.iter() {
// create builder for each column
Expand Down Expand Up @@ -261,19 +265,26 @@ impl IcebergFdw {
// get partition value from location generator
let partition_value = location_generator.partition_value();

let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
schema.clone(),
let parquet_writer_builder =
ParquetWriterBuilder::new(WriterProperties::default(), schema.clone());
let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
parquet_writer_builder,
table.file_io().clone(),
location_generator,
file_name_generator,
);
let data_file_writer_builder = DataFileWriterBuilder::new(
parquet_writer_builder,
partition_value,
metadata.default_partition_spec().spec_id(),
);
let mut data_file_writer = self.rt.block_on(data_file_writer_builder.build())?;
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
// convert Option<Struct> partition value to Option<PartitionKey>
let partition_key = partition_value.map(|pv| {
PartitionKey::new(
metadata.default_partition_spec().as_ref().clone(),
metadata.current_schema().clone(),
pv,
)
});
let mut data_file_writer = self
.rt
.block_on(data_file_writer_builder.build(partition_key))?;

// write the record batch to Iceberg and close the writer and get
// the data file
Expand Down Expand Up @@ -432,21 +443,20 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
// 1. S3 tables
// 2. REST catalog with S3 (or compatible) as backend storage
let catalog: Box<dyn Catalog> =
if let Some(aws_s3table_arn) = props.get("aws_s3table_bucket_arn") {
let catalog_config = S3TablesCatalogConfig::builder()
.table_bucket_arn(aws_s3table_arn.into())
.properties(props)
.build();
Box::new(rt.block_on(S3TablesCatalog::new(catalog_config))?)
if let Some(aws_s3table_arn) = props.get("aws_s3table_bucket_arn").cloned() {
Box::new(
rt.block_on(
S3TablesCatalogBuilder::default()
.with_table_bucket_arn(aws_s3table_arn)
.load("s3tables", props),
)?,
)
} else {
let catalog_uri = require_option("catalog_uri", &props)?;
let warehouse = require_option_or("warehouse", &props, "warehouse");
let catalog_config = RestCatalogConfig::builder()
.warehouse(warehouse.into())
.uri(catalog_uri.into())
.props(props)
.build();
Box::new(RestCatalog::new(catalog_config))
let catalog_uri = require_option("catalog_uri", &props)?.to_string();
let warehouse = require_option_or("warehouse", &props, "warehouse").to_string();
props.insert(REST_CATALOG_PROP_URI.to_string(), catalog_uri);
props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse);
Box::new(rt.block_on(RestCatalogBuilder::default().load("rest", props))?)
};

stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);
Expand Down Expand Up @@ -482,7 +492,16 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
let tbl_ident = TableIdent::from_strs(require_option("table", options)?.split("."))?;
let table = self.rt.block_on(self.catalog.load_table(&tbl_ident))?;

let schema = table.metadata().current_schema();
// resolve schema by id if specified (for schema evolution support), else use current
let schema = if let Some(id_str) = options.get("schema_id") {
let schema_id = id_str.parse::<i32>()?;
table
.metadata()
.schema_by_id(schema_id)
.ok_or_else(|| IcebergFdwError::SchemaNotFound(schema_id))?
} else {
table.metadata().current_schema()
};
for tgt_col in columns {
let col_name = &tgt_col.name;
let field = schema
Expand All @@ -491,7 +510,7 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
self.src_fields.push(field.clone());
}

self.predicate = try_pushdown(&table, quals)?;
self.predicate = try_pushdown(schema, quals)?;
self.table = table.into();
self.tgt_cols = columns.to_vec();

Expand Down Expand Up @@ -670,11 +689,12 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
r#"create foreign table if not exists {} (
{}
)
server {} options (table '{}'{})"#,
server {} options (table '{}', schema_id '{}'{})"#,
tbl.identifier().name,
fields.join(","),
stmt.server_name,
tbl.identifier(),
schema.schema_id(),
rowid_column.unwrap_or_default(),
));
}
Expand Down
3 changes: 3 additions & 0 deletions wrappers/src/fdw/iceberg_fdw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ enum IcebergFdwError {
#[error("cannot import column '{0}' data type '{1}'")]
ImportColumnError(String, String),

#[error("schema with id {0} not found")]
SchemaNotFound(i32),

#[error("vault error: '{0}'")]
VaultError(String),

Expand Down
6 changes: 2 additions & 4 deletions wrappers/src/fdw/iceberg_fdw/pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use super::{IcebergFdwError, IcebergFdwResult};
use chrono::NaiveDate;
use iceberg::{
expr::{Predicate, Reference},
spec::{Datum, PrimitiveType, Type},
table::Table,
spec::{Datum, PrimitiveType, Schema, Type},
};
use pgrx::varlena;
use rust_decimal::Decimal;
Expand Down Expand Up @@ -98,8 +97,7 @@ fn cell_to_iceberg_datum(cell: &Cell, tgt_type: &Type) -> IcebergFdwResult<Optio

// try to translate quals to predicates and push them down to Iceberg,
// return None if pushdown is not possible
pub(super) fn try_pushdown(table: &Table, quals: &[Qual]) -> IcebergFdwResult<Option<Predicate>> {
let schema = table.metadata().current_schema();
pub(super) fn try_pushdown(schema: &Schema, quals: &[Qual]) -> IcebergFdwResult<Option<Predicate>> {
let mut preds: Vec<Predicate> = Vec::new();

for qual in quals {
Expand Down
Loading
Loading