-
Couldn't load subscription status.
- Fork 118
feat(catalog-managed): UCCommitter #1418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| use std::sync::Arc; | ||
|
|
||
| use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer}; | ||
| use delta_kernel::{DeltaResult, Engine, Error as DeltaError, FilteredEngineData}; | ||
| use uc_client::models::commits::{Commit, CommitRequest}; | ||
| use uc_client::prelude::UCClient; | ||
|
|
||
| use url::Url; | ||
|
|
||
| // A [UCCommitter] is a Unity Catalog [Committer] implementation for committing to a specific | ||
| /// delta table in UC. | ||
| #[derive(Debug, Clone)] | ||
| pub struct UCCommitter { | ||
| client: Arc<UCClient>, | ||
| table_id: String, | ||
| table_uri: String, | ||
| } | ||
|
|
||
| impl UCCommitter { | ||
| pub fn new( | ||
| client: Arc<UCClient>, | ||
| table_id: impl Into<String>, | ||
| table_uri: impl Into<String>, | ||
| ) -> Self { | ||
| UCCommitter { | ||
| client, | ||
| table_id: table_id.into(), | ||
| table_uri: table_uri.into(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Committer for UCCommitter { | ||
| fn commit( | ||
| &self, | ||
| engine: &dyn Engine, | ||
| actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>, | ||
| commit_metadata: CommitMetadata, | ||
| ) -> DeltaResult<CommitResponse> { | ||
| let uuid = uuid::Uuid::new_v4(); | ||
| let filename = format!( | ||
| "{version:020}.{uuid}.json", | ||
| version = commit_metadata.version() | ||
| ); | ||
| // FIXME use table path from commit_metadata? | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be a FIXME or a TODO? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ++ the ones after |
||
| let mut commit_path = Url::parse(&self.table_uri)?; | ||
| commit_path.path_segments_mut().unwrap().extend(&[ | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all the |
||
| "_delta_log", | ||
| "_staged_commits", | ||
| &filename, | ||
| ]); | ||
|
|
||
| engine | ||
| .json_handler() | ||
| .write_json_file(&commit_path, Box::new(actions), false)?; | ||
|
|
||
| let mut other = Url::parse(&self.table_uri)?; | ||
| other.path_segments_mut().unwrap().extend(&[ | ||
| "_delta_log", | ||
| "_staged_commits", | ||
| &format!("{:020}", commit_metadata.version()), | ||
| ]); | ||
| let committed = engine | ||
| .storage_handler() | ||
| .list_from(&other)? | ||
| .next() | ||
| .unwrap() | ||
| .unwrap(); | ||
| println!("wrote commit file: {:?}", committed); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is part of a lib right? we should |
||
|
|
||
| // FIXME: ? | ||
| let timestamp: i64 = std::time::SystemTime::now() | ||
| .duration_since(std::time::UNIX_EPOCH) | ||
| .unwrap() | ||
| .as_millis() | ||
| .try_into() | ||
| .map_err(|e| DeltaError::Generic(format!("Unable to convert timestamp to i64: {e}")))?; | ||
|
|
||
| // let last_backfilled_version = | ||
| // commit_metadata.latest_published_version.map(|v| v.try_into().unwrap()); // FIXME | ||
| let last_backfilled_version = None; | ||
| let commit_req = CommitRequest::new( | ||
| self.table_id.clone(), | ||
| self.table_uri.clone(), | ||
| Commit::new( | ||
| commit_metadata.version().try_into().unwrap(), | ||
| timestamp, | ||
| filename, | ||
| committed.size as i64, | ||
| committed.last_modified, | ||
| ), | ||
| last_backfilled_version, | ||
| ); | ||
| // FIXME | ||
| let handle = tokio::runtime::Handle::current(); | ||
| tokio::task::block_in_place(|| { | ||
| handle.block_on(async move { | ||
| self.client | ||
| .commit(commit_req) | ||
| .await | ||
| .map_err(|e| DeltaError::Generic(format!("UC commit error: {e}"))) | ||
| }) | ||
| })?; | ||
| Ok(CommitResponse::Committed { | ||
| version: commit_metadata.version(), | ||
| }) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,8 @@ | ||
| //! UCCatalog implements a high-level interface for interacting with Delta Tables in Unity Catalog. | ||
|
|
||
| mod committer; | ||
| pub use committer::UCCommitter; | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use delta_kernel::{Engine, LogPath, Snapshot, Version}; | ||
|
|
@@ -63,8 +66,10 @@ impl<'a> UCCatalog<'a> { | |
| start_version: Some(0), | ||
| end_version: version.and_then(|v| v.try_into().ok()), | ||
| }; | ||
| // TODO: does it paginate? | ||
| let commits = self.client.get_commits(req).await?; | ||
| let mut commits = self.client.get_commits(req).await?; | ||
| if let Some(commits) = &mut commits.commits { | ||
| commits.sort_by_key(|c| c.version); | ||
| } | ||
|
|
||
| // if commits are present, we ensure they are sorted+contiguous | ||
| if let Some(commits) = &commits.commits { | ||
|
|
@@ -88,7 +93,11 @@ impl<'a> UCCatalog<'a> { | |
| }; | ||
|
|
||
| // consume uc-client's Commit and hand back a delta_kernel LogPath | ||
| let table_url = Url::parse(&table_uri)?; | ||
| let mut table_url = Url::parse(&table_uri)?; | ||
| // add trailing slash | ||
| if !table_url.path().ends_with('/') { | ||
| table_url.path_segments_mut().unwrap().push(""); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are we trying to push "/"? |
||
| } | ||
| let commits: Vec<_> = commits | ||
| .commits | ||
| .unwrap_or_default() | ||
|
|
@@ -120,6 +129,7 @@ mod tests { | |
|
|
||
| use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; | ||
| use delta_kernel::engine::default::DefaultEngine; | ||
| use delta_kernel::transaction::CommitResult; | ||
|
|
||
| use super::*; | ||
|
|
||
|
|
@@ -192,4 +202,92 @@ mod tests { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| // ignored test which you can run manually to play around with writing to a UC table. run with: | ||
| // `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored` | ||
| #[ignore] | ||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn write_uc_table() -> Result<(), Box<dyn std::error::Error>> { | ||
| let endpoint = env::var("ENDPOINT").expect("ENDPOINT environment variable not set"); | ||
| let token = env::var("TOKEN").expect("TOKEN environment variable not set"); | ||
| let table_name = env::var("TABLENAME").expect("TABLENAME environment variable not set"); | ||
|
|
||
| // build UC client, get table info and credentials | ||
| let client = Arc::new(UCClient::builder(endpoint, &token).build()?); | ||
| let (table_id, table_uri) = get_table(&client, &table_name).await?; | ||
| let creds = client | ||
| .get_credentials(&table_id, Operation::ReadWrite) | ||
| .await | ||
| .map_err(|e| format!("Failed to get credentials: {}", e))?; | ||
|
|
||
| // build catalog | ||
| let catalog = UCCatalog::new(&client); | ||
|
|
||
| // TODO: support non-AWS | ||
| let creds = creds | ||
| .aws_temp_credentials | ||
| .ok_or("No AWS temporary credentials found")?; | ||
|
|
||
| let options = [ | ||
| ("region", "us-west-2"), | ||
| ("access_key_id", &creds.access_key_id), | ||
| ("secret_access_key", &creds.secret_access_key), | ||
| ("session_token", &creds.session_token), | ||
| ]; | ||
|
|
||
| let table_url = Url::parse(&table_uri)?; | ||
| let (store, _path) = object_store::parse_url_opts(&table_url, options)?; | ||
| let store: Arc<dyn object_store::ObjectStore> = store.into(); | ||
|
|
||
| let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); | ||
| let committer = Box::new(UCCommitter::new( | ||
| client.clone(), | ||
| table_id.clone(), | ||
| table_uri.clone(), | ||
| )); | ||
| let snapshot = catalog | ||
| .load_snapshot(&table_id, &table_uri, &engine) | ||
| .await?; | ||
| println!("latest snapshot version: {:?}", snapshot.version()); | ||
| let txn = snapshot.clone().transaction(committer)?; | ||
| let _write_context = txn.get_write_context(); | ||
| // do a write. | ||
|
|
||
| // print the log | ||
| // use futures::stream::StreamExt; | ||
| // let mut stream = store.list(Some(&path)); | ||
| // while let Some(path) = stream.next().await { | ||
| // println!("object: {:?}", path.unwrap()); | ||
| // } | ||
|
|
||
| match txn.commit(&engine)? { | ||
| CommitResult::CommittedTransaction(t) => { | ||
| println!("🎉 committed version {}", t.commit_version()); | ||
| // FIXME! ideally should use post-commit snapshot (need to make sure to plumb | ||
| // through log tail) | ||
| // let snapshot = t.unwrap().post_commit_snapshot(&engine)?; | ||
| let _snapshot = catalog | ||
| .load_snapshot_at(&table_id, &table_uri, t.commit_version(), &engine) | ||
| .await?; | ||
| // let published = Arc::into_inner(snapshot).unwrap().publish(&engine)?; | ||
| // println!("published snapshot: {published:?}"); | ||
| } | ||
| CommitResult::ConflictedTransaction(t) => { | ||
| println!("💥 commit conflicted at version {}", t.conflict_version()); | ||
| } | ||
| CommitResult::RetryableTransaction(_) => { | ||
| println!("we should retry..."); | ||
| } | ||
| } | ||
|
|
||
| // some debug | ||
| // let commit = store | ||
| // .get(&object_store::path::Path::from("19a85dee-54bc-43a2-87ab-023d0ec16013/tables/3553b6b9-91ce-47d7-b2f9-8eb4ba5e93f5/_delta_log/_staged_commits/00000000000000000001.5cabaf8b-037d-4a84-b493-2e3d8ab899df.json")) | ||
| // .await | ||
| // .unwrap(); | ||
| // let bytes = commit.bytes().await?; | ||
| // println!("commit file contents:\n{}", String::from_utf8_lossy(&bytes)); | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ use std::future::Future; | |
| use std::time::Duration; | ||
|
|
||
| use reqwest::{header, Client, Response, StatusCode}; | ||
| use serde::Deserialize; | ||
| use tracing::{instrument, warn}; | ||
| use url::Url; | ||
|
|
||
|
|
@@ -80,7 +81,12 @@ impl UCClient { | |
| }) | ||
| .await?; | ||
|
|
||
| self.handle_response(response).await | ||
| // Note: can't just deserialize to () directly so we make an empty struct to deserialize | ||
| // the `{}` into. Externally we still return unit type for ease of use/understanding. | ||
| #[derive(Deserialize)] | ||
| struct EmptyResponse {} | ||
| let _: EmptyResponse = self.handle_response(response).await?; | ||
| Ok(()) | ||
|
Comment on lines
+84
to
+89
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed latent bug |
||
| } | ||
|
|
||
| /// Resolve the table by name. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the test tables I was playing with have V2Checkpoint and VacuumProtocolCheck, I went ahead and listed them here. I think this is safe (and even desirable) since (1) we don't yet write V2 checkpoints and (2) we don't vacuum, and this will unblock writes to such tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mmm, V2Checkpoint disallows multi-part checkpoints. I guess we don't support that in kernel so we're okay?