tansu-schema/src/lake/berg.rs (206 changes: 200 additions & 6 deletions)
@@ -187,7 +187,10 @@ impl Iceberg {
}

async fn load_or_create_table(&self, name: &str, schema: Schema) -> Result<Table> {
- if let Some(table) = self.tables.lock().map(|guard| guard.get(name).cloned())? {
+ debug!(name, ?schema);
+ if let Some(table) = self.tables.lock().map(|guard| guard.get(name).cloned())?
+     && table.metadata().current_schema().as_ref() == &schema
+ {
return Ok(table);
}

@@ -201,26 +204,43 @@ impl Iceberg {
.await
.inspect_err(|err| debug!(?err))?;

- if table.metadata().current_schema().as_ref() != &schema {
+ if table.metadata().current_schema().as_ref() == &schema {
+     table
+ } else {
debug!(current = ?table.metadata(), ?schema);

- _ = TableMetadataBuilder::new_from_metadata(
+ let _result = TableMetadataBuilder::new_from_metadata(
table.metadata().to_owned(),
table
.metadata_location()
.map(|location| location.to_owned()),
)
.add_schema(schema.clone())
- .set_current_schema(-1)
+ .set_current_schema(TableMetadataBuilder::LAST_ADDED)
.and_then(|builder| builder.build())
.inspect(|update| {
debug!(?update.metadata);
debug!(?update.changes);
debug!(?update.expired_metadata_logs);
})?;
- }
-
- table
+ // let abc = TableCommit::builder()
+ //     .ident(table.identifier().to_owned())
+ //     .updates(result.changes)
+ //     .build();
+
+ // let q = self.catalog.update_table(commit).await?;
+
+ // let commit_uuid = Uuid::now_v7();
+ // debug!(%commit_uuid);
+
+ // let tx = Transaction::new(&table);
+
+ self.catalog
+     .load_table(&table_ident)
+     .await
+     .inspect_err(|err| debug!(?err))?
+ }
} else {
self.catalog
.create_table(
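
A note and sketch on the migration branch above: the builder output is bound as `_result`, while the commented-out commit path refers to `result.changes`, so reloading via `load_table` is currently the only way the new schema becomes visible. `TableMetadataBuilder::LAST_ADDED` is the crate's sentinel for the most recently added schema, replacing the bare `-1`. A minimal sketch of how the commented-out commit path might be completed, assuming iceberg-rust's `TableCommit` builder and `Catalog::update_table` as named in those comments (builder field names and the elided update requirements are assumptions, not verified against the crate):

```rust
// Hedged sketch only: one way to finish the commented-out commit path.
// Assumes iceberg-rust's `TableCommit` builder and `Catalog::update_table`
// as named in the comments above; fields and elided requirements are
// assumptions, not a verified API.
let result = TableMetadataBuilder::new_from_metadata(
    table.metadata().to_owned(),
    table.metadata_location().map(ToOwned::to_owned),
)
.add_schema(schema.clone())
.set_current_schema(TableMetadataBuilder::LAST_ADDED)
.and_then(|builder| builder.build())?;

// Commit the accumulated metadata changes through the catalog and use the
// returned table, instead of reloading it with `load_table`.
let commit = TableCommit::builder()
    .ident(table.identifier().to_owned())
    .updates(result.changes)
    .build();

let table = self.catalog.update_table(commit).await?;
```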
@@ -336,11 +356,21 @@ impl LakeHouse for Iceberg {

#[cfg(test)]
mod tests {
use crate::{AsArrow as _, Generator as _};

use super::*;
use arrow::util::pretty::pretty_format_batches;
use bytes::Bytes;
use datafusion::prelude::SessionContext;
use dotenv::dotenv;
use iceberg::spec::{NestedField, PrimitiveType, Type};
use iceberg_datafusion::IcebergCatalogProvider;
use rand::{distr::Alphanumeric, prelude::*, rng};
use std::{env::var, fs::File, marker::PhantomData, sync::Arc, thread};
use tansu_sans_io::{
ConfigResource, ErrorCode, describe_configs_response::DescribeConfigsResourceResult,
record::inflated::Batch,
};
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -556,4 +586,168 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn migrate_schema() -> Result<()> {
_ = dotenv().ok();
let _guard = init_tracing()?;

let catalog_uri = &var("ICEBERG_CATALOG").unwrap_or("http://localhost:8181".into())[..];
let location_uri = &var("DATA_LAKE").unwrap_or("s3://lake".into())[..];
let warehouse = var("ICEBERG_WAREHOUSE").ok();
let namespace = &alphanumeric_string(5)[..];
let table_name = &alphanumeric_string(5)[..];

debug!(catalog_uri, location_uri, ?warehouse, namespace, table_name);

let lake_house = Iceberg::try_from(
Builder::<PhantomData<Url>, PhantomData<Url>>::default()
.location(Url::parse(location_uri)?)
.catalog(Url::parse(catalog_uri)?)
.warehouse(warehouse.clone())
.namespace(Some(namespace.into())),
)?;

let catalog = lake_house.catalog.clone();

let lake_house = House::Iceberg(lake_house);

let partition = 32123;

let record_batch_001 = {
let schema = crate::proto::Schema::try_from(Bytes::from_static(include_bytes!(
"../../tests/migrate-001.proto"
)))?;

Batch::builder()
.record(schema.generate()?)
.base_timestamp(119_731_017_000)
.build()
.map_err(Into::into)
.and_then(|batch| schema.as_arrow(partition, &batch, LakeHouseType::Iceberg))?
};

let config = DescribeConfigsResult::default()
.error_code(ErrorCode::None.into())
.error_message(None)
.resource_type(ConfigResource::Topic.into())
.resource_name(table_name.into())
.configs(Some(vec![
DescribeConfigsResourceResult::default()
.name(String::from("tansu.lake.normalize"))
.value(Some(String::from("true")))
.read_only(true),
]));

let offset = 543212345;

lake_house
.store(
table_name,
partition,
offset,
record_batch_001,
config.clone(),
)
.await
.inspect(|result| debug!(?result))
.inspect_err(|err| debug!(?err))?;

let ctx = SessionContext::new();
assert!(
ctx.register_catalog(
"iceberg",
IcebergCatalogProvider::try_new(catalog.clone())
.await
.map(Arc::new)?,
)
.is_none()
);

let provider = ctx.catalog("iceberg").expect("provider");

debug!(schemas = ?provider.schema_names());

debug!(
tables = ?provider
.schema_names()
.iter()
.flat_map(|schema_name| {
provider
.schema(schema_name)
.map(|schema| {
schema
.table_names()
.iter()
.map(|table_name| format!("{schema_name}.{table_name}"))
.collect::<Vec<_>>()
})
.unwrap_or_default()
})
.collect::<Vec<_>>()
);

let df = ctx
.sql(&format!(r#"select * from iceberg."{namespace}"."{table_name}""#)[..])
.await?;
let results = df.collect().await?;

let pretty_results = pretty_format_batches(&results)?.to_string();

let expected = vec![
"+------------------------------------------------------------------------------------+------------------------+",
"| meta | value |",
"+------------------------------------------------------------------------------------+------------------------+",
"| {partition: 32123, timestamp: 1973-10-17T18:36:57, year: 1973, month: 10, day: 17} | {email_address: lorem} |",
"+------------------------------------------------------------------------------------+------------------------+",
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);

let record_batch_002 = {
let schema = crate::proto::Schema::try_from(Bytes::from_static(include_bytes!(
"../../tests/migrate-002.proto"
)))?;

Batch::builder()
.record(schema.generate()?)
.base_timestamp(119_731_017_000)
.build()
.map_err(Into::into)
.and_then(|batch| schema.as_arrow(partition, &batch, LakeHouseType::Iceberg))?
};

let offset = 654323456;

lake_house
.store(
table_name,
partition,
offset,
record_batch_002,
config.clone(),
)
.await
.inspect(|result| debug!(?result))
.inspect_err(|err| debug!(?err))?;

let df = ctx
.sql(&format!(r#"select * from iceberg."{namespace}"."{table_name}""#)[..])
.await?;
let results = df.collect().await?;

let pretty_results = pretty_format_batches(&results)?.to_string();

let expected = vec![
"+------------------------------------------------------------------------------------+------------------------+",
"| meta | value |",
"+------------------------------------------------------------------------------------+------------------------+",
"| {partition: 32123, timestamp: 1973-10-17T18:36:57, year: 1973, month: 10, day: 17} | {email_address: lorem} |",
"+------------------------------------------------------------------------------------+------------------------+",
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);

Ok(())
}
}
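
The assertions above go through DataFusion's pretty-printed output. A hedged sketch of a more direct check on the migrated schema, using the `catalog` handle already cloned in the test; the `TableIdent`/`NamespaceIdent` construction and the `field_by_name` lookup are assumptions about the iceberg crate's API rather than calls taken from this file:

```rust
// Hedged sketch: assert the migration directly against the catalog instead
// of via query output. `TableIdent::new`, `NamespaceIdent::new` and
// `Schema::field_by_name` are assumed iceberg APIs, not verified here.
let table = catalog
    .load_table(&TableIdent::new(
        NamespaceIdent::new(namespace.into()),
        table_name.into(),
    ))
    .await?;

let current = table.metadata().current_schema();
debug!(?current);

// Both expected outputs above show top-level `meta` and `value` columns,
// so the current schema should still expose them after the second store.
assert!(current.field_by_name("value").is_some());
assert!(current.field_by_name("meta").is_some());
```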