Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion uc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ release = false
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["time"] }
tokio = { version = "1", features = ["full"] }
thiserror = "2.0"
tracing = "0.1"
url = "2.5"
Expand Down
26 changes: 24 additions & 2 deletions uc-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use url::Url;

use crate::config::{ClientConfig, ClientConfigBuilder};
use crate::error::{Error, Result};
use crate::models::commits::{CommitsRequest, CommitsResponse};
use crate::models::commits::{CommitRequest, CommitsRequest, CommitsResponse};
use crate::models::credentials::{CredentialsRequest, Operation, TemporaryTableCredentials};
use crate::models::tables::TablesResponse;

/// An HTTP client for interacting with the Unity Catalog API.
#[derive(Debug, Clone)]
pub struct UCClient {
client: Client,
Expand All @@ -19,6 +20,7 @@ pub struct UCClient {
}

impl UCClient {
/// Create a new client from [ClientConfig].
pub fn new(config: ClientConfig) -> Result<Self> {
// default headers with authorization and content type
let mut headers = header::HeaderMap::new();
Expand All @@ -44,14 +46,15 @@ impl UCClient {
})
}

/// Create a new [UCClientBuilder] to configure and build a [UCClient].
pub fn builder(workspace: impl Into<String>, token: impl Into<String>) -> UCClientBuilder {
UCClientBuilder::new(workspace, token)
}

/// Get the latest commits for the table.
#[instrument(skip(self))]
pub async fn get_commits(&self, request: CommitsRequest) -> Result<CommitsResponse> {
let url = self.base_url.join("delta/preview/commits")?;

let response = self
.execute_with_retry(|| {
self.client
Expand All @@ -64,6 +67,23 @@ impl UCClient {
self.handle_response(response).await
}

/// Commit a new version to the table.
#[instrument(skip(self))]
pub async fn commit(&self, request: CommitRequest) -> Result<()> {
let url = self.base_url.join("delta/preview/commits")?;
let response = self
.execute_with_retry(|| {
self.client
.request(reqwest::Method::POST, url.clone())
.json(&request)
.send()
})
.await?;

self.handle_response(response).await
}

/// Resolve the table by name.
#[instrument(skip(self))]
pub async fn get_table(&self, table_name: &str) -> Result<TablesResponse> {
let url = self.base_url.join(&format!("tables/{}", table_name))?;
Expand All @@ -78,6 +98,7 @@ impl UCClient {
}
}

/// Get temporary cloud storage credentials for accessing a table.
#[instrument(skip(self))]
pub async fn get_credentials(
&self,
Expand Down Expand Up @@ -163,6 +184,7 @@ impl UCClient {
}
}

/// A builder for configuring and creating a [UCClient].
pub struct UCClientBuilder {
config_builder: ClientConfigBuilder,
}
Expand Down
60 changes: 58 additions & 2 deletions uc-client/src/models/commits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,26 @@ pub struct Commit {
pub file_name: String,
pub file_size: i64,
pub file_modification_timestamp: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub is_disown_commit: Option<bool>,
}

impl Commit {
/// Create a new commit to send to UC with the specified version and timestamp.
pub fn new(
version: i64,
timestamp: i64,
file_name: impl Into<String>,
file_size: i64,
file_modification_timestamp: i64,
) -> Self {
Self {
version,
timestamp,
file_name: file_name.into(),
file_size,
file_modification_timestamp,
}
}

pub fn timestamp_as_datetime(&self) -> Option<chrono::DateTime<chrono::Utc>> {
chrono::DateTime::from_timestamp_millis(self.timestamp)
}
Expand All @@ -58,3 +73,44 @@ impl Commit {
chrono::DateTime::from_timestamp_millis(self.file_modification_timestamp)
}
}

/// Request to commit a new version to the table. It must include either a `commit_info` or
/// `latest_backfilled_version`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitRequest {
pub table_id: String,
pub table_uri: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub commit_info: Option<Commit>,
#[serde(skip_serializing_if = "Option::is_none")]
pub latest_backfilled_version: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub protocol: Option<serde_json::Value>,
}

impl CommitRequest {
pub fn new(
table_id: impl Into<String>,
table_uri: impl Into<String>,
commit_info: Commit,
latest_backfilled_version: Option<i64>,
) -> Self {
Self {
table_id: table_id.into(),
table_uri: table_uri.into(),
commit_info: Some(commit_info),
latest_backfilled_version,
metadata: None,
protocol: None,
}
}

pub fn with_latest_backfilled_version(mut self, version: i64) -> Self {
self.latest_backfilled_version = Some(version);
self
}

// TODO: expose metadata/protocol (with_metadata, with_protocol)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put an issue here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea it will generally be part of the larger effort of supporting better UC APIs so gonna wait on those plans to materialize first (this comment is mostly just a marker)

}
2 changes: 1 addition & 1 deletion uc-client/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ pub mod commits;
pub mod credentials;
pub mod tables;

pub use commits::{Commit, CommitsRequest, CommitsResponse};
pub use commits::{Commit, CommitRequest, CommitsRequest, CommitsResponse};
pub use credentials::{AwsTempCredentials, TemporaryTableCredentials};
pub use tables::TablesResponse;
Loading