Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,6 @@ impl Protocol {
/// Check if writing to a table with this protocol is supported. That is: does the kernel
/// support the specified protocol writer version and all enabled writer features?
pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
#[cfg(feature = "catalog-managed")]
require!(
!self.is_catalog_managed(),
Error::unsupported("Writes are not yet supported for catalog-managed tables")
);
match &self.writer_features {
Some(writer_features) if self.min_writer_version == 7 => {
// if we're on version 7, make sure we support all the specified features
Expand Down Expand Up @@ -1401,7 +1396,7 @@ mod tests {
.unwrap();
assert_result_error_with_message(
protocol.ensure_write_supported(),
r#"Unsupported: Unknown WriterFeatures: "identityColumns". Supported WriterFeatures: "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
r#"Unsupported: Unknown WriterFeatures: "identityColumns". Supported WriterFeatures: "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview", "v2Checkpoint""#,
);

// Unknown writer features should cause an error
Expand All @@ -1414,7 +1409,7 @@ mod tests {
.unwrap();
assert_result_error_with_message(
protocol.ensure_write_supported(),
r#"Unsupported: Unknown WriterFeatures: "unsupported writer". Supported WriterFeatures: "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
r#"Unsupported: Unknown WriterFeatures: "unsupported writer". Supported WriterFeatures: "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview", "v2Checkpoint""#,
);
}

Expand Down Expand Up @@ -1470,24 +1465,25 @@ mod tests {
assert_eq!(parse_features::<ReaderFeature>(features), expected);
}

#[cfg(feature = "catalog-managed")]
#[test]
fn test_no_catalog_managed_writes() {
fn test_catalog_managed_writes() {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeature::CatalogManaged]),
Some([WriterFeature::CatalogManaged]),
)
.unwrap();
assert!(protocol.ensure_write_supported().is_err());
assert!(protocol.ensure_write_supported().is_ok());
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeature::CatalogOwnedPreview]),
Some([WriterFeature::CatalogOwnedPreview]),
)
.unwrap();
assert!(protocol.ensure_write_supported().is_err());
assert!(protocol.ensure_write_supported().is_ok());
}

#[test]
Expand Down
18 changes: 8 additions & 10 deletions kernel/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mod tests {

#[cfg(feature = "catalog-managed")]
#[tokio::test]
async fn catalog_managed_tables_block_transactions() {
async fn disallow_filesystem_committer_for_catalog_managed_tables() {
let storage = Arc::new(InMemory::new());
let table_root = Url::parse("memory:///").unwrap();
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
Expand All @@ -226,18 +226,16 @@ mod tests {
let snapshot = crate::snapshot::SnapshotBuilder::new_for(table_root)
.build(&engine)
.unwrap();
// Try to create a transaction with FileSystemCommitter
// Try to commit a transaction with FileSystemCommitter
let committer = Box::new(FileSystemCommitter::new());
let err = snapshot.transaction(committer).unwrap_err();
let err = snapshot
.transaction(committer)
.unwrap()
.commit(&engine)
.unwrap_err();
assert!(matches!(
err,
crate::Error::Unsupported(e) if e.contains("Writes are not yet supported for catalog-managed tables")
crate::Error::Generic(e) if e.contains("The FileSystemCommitter cannot be used to commit to catalog-managed tables. Please provide a committer for your catalog via Transaction::with_committer().")
));
// after allowing writes, we will check that this disallows default committer for
// catalog-managed tables.
// assert!(matches!(
// err,
// crate::Error::Generic(e) if e.contains("Cannot use the default committer for a catalog-managed table")
// ));
}
}
6 changes: 6 additions & 0 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,21 @@ pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<Vec<ReaderFeature>> = Lazy
pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock<Vec<WriterFeature>> = LazyLock::new(|| {
vec![
WriterFeature::AppendOnly,
#[cfg(feature = "catalog-managed")]
WriterFeature::CatalogManaged,
#[cfg(feature = "catalog-managed")]
WriterFeature::CatalogOwnedPreview,
WriterFeature::DeletionVectors,
WriterFeature::DomainMetadata,
WriterFeature::InCommitTimestamp,
WriterFeature::Invariants,
WriterFeature::RowTracking,
WriterFeature::TimestampWithoutTimezone,
WriterFeature::VacuumProtocolCheck,
WriterFeature::VariantType,
WriterFeature::VariantTypePreview,
WriterFeature::VariantShreddingPreview,
WriterFeature::V2Checkpoint,
Comment on lines +244 to +248
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the test tables I was playing with have V2Checkpoint and VacuumProtocolCheck enabled, I went ahead and listed them here. I think this is safe (and even desirable) since (1) we don't yet write v2 checkpoints and (2) we don't vacuum, and it will unblock writes to such tables.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, V2Checkpoint disallows multi-part checkpoints. I guess we don't support that in kernel so we're okay?

]
});

Expand Down
1 change: 1 addition & 0 deletions uc-catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
url = "2"
uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
delta_kernel = { path = "../kernel", features = ["arrow-56", "default-engine-rustls", "catalog-managed"] }
108 changes: 108 additions & 0 deletions uc-catalog/src/committer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::sync::Arc;

use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer};
use delta_kernel::{DeltaResult, Engine, Error as DeltaError, FilteredEngineData};
use uc_client::models::commits::{Commit, CommitRequest};
use uc_client::prelude::UCClient;

use url::Url;

/// A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to a specific
/// delta table in UC.
#[derive(Debug, Clone)]
pub struct UCCommitter {
/// Client used to send the commit request to Unity Catalog.
client: Arc<UCClient>,
/// Identifier of the table, passed along in every commit request.
table_id: String,
/// Table URI; parsed as a URL and used as the base for the `_delta_log/_staged_commits` paths.
table_uri: String,
}

impl UCCommitter {
pub fn new(
client: Arc<UCClient>,
table_id: impl Into<String>,
table_uri: impl Into<String>,
) -> Self {
UCCommitter {
client,
table_id: table_id.into(),
table_uri: table_uri.into(),
}
}
}

impl Committer for UCCommitter {
fn commit(
&self,
engine: &dyn Engine,
actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
let uuid = uuid::Uuid::new_v4();
let filename = format!(
"{version:020}.{uuid}.json",
version = commit_metadata.version()
);
// FIXME use table path from commit_metadata?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a FIXME or a TODO?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ the ones after

let mut commit_path = Url::parse(&self.table_uri)?;
commit_path.path_segments_mut().unwrap().extend(&[
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the unwraps should be ok_ors

"_delta_log",
"_staged_commits",
&filename,
]);

engine
.json_handler()
.write_json_file(&commit_path, Box::new(actions), false)?;

let mut other = Url::parse(&self.table_uri)?;
other.path_segments_mut().unwrap().extend(&[
"_delta_log",
"_staged_commits",
&format!("{:020}", commit_metadata.version()),
]);
let committed = engine
.storage_handler()
.list_from(&other)?
.next()
.unwrap()
.unwrap();
println!("wrote commit file: {:?}", committed);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is part of a lib right? we should info!


// FIXME: ?
let timestamp: i64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.map_err(|e| DeltaError::Generic(format!("Unable to convert timestamp to i64: {e}")))?;

// let last_backfilled_version =
// commit_metadata.latest_published_version.map(|v| v.try_into().unwrap()); // FIXME
let last_backfilled_version = None;
let commit_req = CommitRequest::new(
self.table_id.clone(),
self.table_uri.clone(),
Commit::new(
commit_metadata.version().try_into().unwrap(),
timestamp,
filename,
committed.size as i64,
committed.last_modified,
),
last_backfilled_version,
);
// FIXME
let handle = tokio::runtime::Handle::current();
tokio::task::block_in_place(|| {
handle.block_on(async move {
self.client
.commit(commit_req)
.await
.map_err(|e| DeltaError::Generic(format!("UC commit error: {e}")))
})
})?;
Ok(CommitResponse::Committed {
version: commit_metadata.version(),
})
}
}
104 changes: 101 additions & 3 deletions uc-catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! UCCatalog implements a high-level interface for interacting with Delta Tables in Unity Catalog.

mod committer;
pub use committer::UCCommitter;

use std::sync::Arc;

use delta_kernel::{Engine, LogPath, Snapshot, Version};
Expand Down Expand Up @@ -63,8 +66,10 @@ impl<'a> UCCatalog<'a> {
start_version: Some(0),
end_version: version.and_then(|v| v.try_into().ok()),
};
// TODO: does it paginate?
let commits = self.client.get_commits(req).await?;
let mut commits = self.client.get_commits(req).await?;
if let Some(commits) = &mut commits.commits {
commits.sort_by_key(|c| c.version);
}

// if commits are present, we ensure they are sorted+contiguous
if let Some(commits) = &commits.commits {
Expand All @@ -88,7 +93,11 @@ impl<'a> UCCatalog<'a> {
};

// consume uc-client's Commit and hand back a delta_kernel LogPath
let table_url = Url::parse(&table_uri)?;
let mut table_url = Url::parse(&table_uri)?;
// add trailing slash
if !table_url.path().ends_with('/') {
table_url.path_segments_mut().unwrap().push("");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we trying to push "/" ?

}
let commits: Vec<_> = commits
.commits
.unwrap_or_default()
Expand Down Expand Up @@ -120,6 +129,7 @@ mod tests {

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::transaction::CommitResult;

use super::*;

Expand Down Expand Up @@ -192,4 +202,92 @@ mod tests {

Ok(())
}

// Ignored test which you can run manually to play around with writing to a UC table. It reads
// the ENDPOINT, TOKEN and TABLENAME environment variables. Run with:
// `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored`
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn write_uc_table() -> Result<(), Box<dyn std::error::Error>> {
let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set");
let token = env::var("TOKEN").expect("TOKEN environment variable not set");
let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set");

// build UC client, get table info and credentials
let client = Arc::new(UCClient::builder(endpoint, &token).build()?);
let (table_id, table_uri) = get_table(&client, &table_name).await?;
let creds = client
.get_credentials(&table_id, Operation::ReadWrite)
.await
.map_err(|e| format!("Failed to get credentials: {}", e))?;

// build catalog
let catalog = UCCatalog::new(&client);

// TODO: support non-AWS
let creds = creds
.aws_temp_credentials
.ok_or("No AWS temporary credentials found")?;

// NOTE(review): the region is hard-coded to us-west-2; tables stored in another region need a
// different value here.
let options = [
("region", "us-west-2"),
("access_key_id", &creds.access_key_id),
("secret_access_key", &creds.secret_access_key),
("session_token", &creds.session_token),
];

// build an object store for the table location and an engine on top of it
let table_url = Url::parse(&table_uri)?;
let (store, _path) = object_store::parse_url_opts(&table_url, options)?;
let store: Arc<dyn object_store::ObjectStore> = store.into();

let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
// commits for this transaction go through the UC committer
let committer = Box::new(UCCommitter::new(
client.clone(),
table_id.clone(),
table_uri.clone(),
));
let snapshot = catalog
.load_snapshot(&table_id, &table_uri, &engine)
.await?;
println!("latest snapshot version: {:?}", snapshot.version());
let txn = snapshot.clone().transaction(committer)?;
let _write_context = txn.get_write_context();
// do a write.
// (nothing is added to the transaction yet, so it is committed as-is below)

// print the log
// use futures::stream::StreamExt;
// let mut stream = store.list(Some(&path));
// while let Some(path) = stream.next().await {
// println!("object: {:?}", path.unwrap());
// }

// commit and report which of the three outcomes occurred
match txn.commit(&engine)? {
CommitResult::CommittedTransaction(t) => {
println!("🎉 committed version {}", t.commit_version());
// FIXME! ideally should use post-commit snapshot (need to make sure to plumb
// through log tail)
// let snapshot = t.unwrap().post_commit_snapshot(&engine)?;
let _snapshot = catalog
.load_snapshot_at(&table_id, &table_uri, t.commit_version(), &engine)
.await?;
// let published = Arc::into_inner(snapshot).unwrap().publish(&engine)?;
// println!("published snapshot: {published:?}");
}
CommitResult::ConflictedTransaction(t) => {
println!("💥 commit conflicted at version {}", t.conflict_version());
}
CommitResult::RetryableTransaction(_) => {
println!("we should retry...");
}
}

// some debug
// let commit = store
// .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/3553b6b9-91ce-47d7-b2f9-8eb4ba5e93f5/_delta_log/_staged_commits/00000000000000000001.5cabaf8b-037d-4a84-b493-2e3d8ab899df.json"))
// .await
// .unwrap();
// let bytes = commit.bytes().await?;
// println!("commit file contents:\n{}", String::from_utf8_lossy(&bytes));

Ok(())
}
}
8 changes: 7 additions & 1 deletion uc-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;
use std::time::Duration;

use reqwest::{header, Client, Response, StatusCode};
use serde::Deserialize;
use tracing::{instrument, warn};
use url::Url;

Expand Down Expand Up @@ -80,7 +81,12 @@ impl UCClient {
})
.await?;

self.handle_response(response).await
// Note: can't just deserialize to () directly so we make an empty struct to deserialize
// the `{}` into. Externally we still return unit type for ease of use/understanding.
#[derive(Deserialize)]
struct EmptyResponse {}
let _: EmptyResponse = self.handle_response(response).await?;
Ok(())
Comment on lines +84 to +89
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed latent bug

}

/// Resolve the table by name.
Expand Down
Loading