Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions crates/catalog-unity/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,22 @@ impl UnitySchemaProvider {
table: &str,
) -> Result<TemporaryTableCredentials, UnityCatalogError> {
tracing::debug!("Fetching new credential for: {catalog}.{schema}.{table}",);
self.client
.get_temp_table_credentials(catalog, schema, table)
.map(|resp| match resp {
Ok(TableTempCredentialsResponse::Success(temp_creds)) => Ok(temp_creds),
Ok(TableTempCredentialsResponse::Error(err)) => Err(err.into()),
Err(err) => Err(err),
})
match self
.client
.get_temp_table_credentials_with_permission(catalog, schema, table, "READ_WRITE")
.await
{
Ok(TableTempCredentialsResponse::Success(temp_creds)) => Ok(temp_creds),
Ok(TableTempCredentialsResponse::Error(_err)) => match self
.client
.get_temp_table_credentials(catalog, schema, table)
.await?
{
TableTempCredentialsResponse::Success(temp_creds) => Ok(temp_creds),
_ => Err(UnityCatalogError::TemporaryCredentialsFetchFailure),
},
Err(err) => Err(err),
}
}
}

Expand Down
38 changes: 35 additions & 3 deletions crates/catalog-unity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,15 +565,30 @@ impl UnityCatalogBuilder {
let storage_location = unity_catalog
.get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
.await?;
// Attempt to get read/write permissions to begin with.
let temp_creds_res = unity_catalog
.get_temp_table_credentials(catalog_id, database_name, table_name)
.get_temp_table_credentials_with_permission(
catalog_id,
database_name,
table_name,
"READ_WRITE",
)
.await?;
let credentials = match temp_creds_res {
TableTempCredentialsResponse::Success(temp_creds) => temp_creds
.get_credentials()
.ok_or_else(|| UnityCatalogError::MissingCredential)?,
TableTempCredentialsResponse::Error(_error) => {
return Err(UnityCatalogError::TemporaryCredentialsFetchFailure)
// If that fails attempt to get just read permissions.
match unity_catalog
.get_temp_table_credentials(catalog_id, database_name, table_name)
.await?
{
TableTempCredentialsResponse::Success(temp_creds) => temp_creds
.get_credentials()
.ok_or_else(|| UnityCatalogError::MissingCredential)?,
_ => return Err(UnityCatalogError::TemporaryCredentialsFetchFailure),
}
}
};
Ok((storage_location, credentials))
Expand Down Expand Up @@ -816,14 +831,31 @@ impl UnityCatalog {
catalog_id: impl AsRef<str>,
database_name: impl AsRef<str>,
table_name: impl AsRef<str>,
) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
self.get_temp_table_credentials_with_permission(
catalog_id,
database_name,
table_name,
"READ",
)
.await
}

pub async fn get_temp_table_credentials_with_permission(
&self,
catalog_id: impl AsRef<str>,
database_name: impl AsRef<str>,
table_name: impl AsRef<str>,
operation: impl AsRef<str>,
) -> Result<TableTempCredentialsResponse, UnityCatalogError> {
let token = self.get_credential().await?;
let table_info = self
.get_table(catalog_id, database_name, table_name)
.await?;
let response = match table_info {
GetTableResponse::Success(table) => {
let request = TemporaryTableCredentialsRequest::new(&table.table_id, "READ");
let request =
TemporaryTableCredentialsRequest::new(&table.table_id, operation.as_ref());
Ok(self
.client
.post(format!(
Expand Down
32 changes: 16 additions & 16 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,22 +947,6 @@ impl ExecutionPlan for DeltaScan {
}))
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.parquet_scan.execute(partition, context)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.parquet_scan.partition_statistics(partition)
}

fn repartitioned(
&self,
target_partitions: usize,
Expand All @@ -980,6 +964,22 @@ impl ExecutionPlan for DeltaScan {
Ok(None)
}
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.parquet_scan.execute(partition, context)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.parquet_scan.partition_statistics(partition)
}
}

/// The logical schema for a Deltatable is different from the protocol level schema since partition
Expand Down
Loading