diff --git a/crates/catalog-unity/src/datafusion.rs b/crates/catalog-unity/src/datafusion.rs index 90b5cdea2e..ee81efaabc 100644 --- a/crates/catalog-unity/src/datafusion.rs +++ b/crates/catalog-unity/src/datafusion.rs @@ -183,14 +183,22 @@ impl UnitySchemaProvider { table: &str, ) -> Result { tracing::debug!("Fetching new credential for: {catalog}.{schema}.{table}",); - self.client - .get_temp_table_credentials(catalog, schema, table) - .map(|resp| match resp { - Ok(TableTempCredentialsResponse::Success(temp_creds)) => Ok(temp_creds), - Ok(TableTempCredentialsResponse::Error(err)) => Err(err.into()), - Err(err) => Err(err), - }) + match self + .client + .get_temp_table_credentials_with_permission(catalog, schema, table, "READ_WRITE") .await + { + Ok(TableTempCredentialsResponse::Success(temp_creds)) => Ok(temp_creds), + Ok(TableTempCredentialsResponse::Error(_err)) => match self + .client + .get_temp_table_credentials(catalog, schema, table) + .await? + { + TableTempCredentialsResponse::Success(temp_creds) => Ok(temp_creds), + _ => Err(UnityCatalogError::TemporaryCredentialsFetchFailure), + }, + Err(err) => Err(err), + } } } diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs index cf1f345101..4c8c85ebc9 100644 --- a/crates/catalog-unity/src/lib.rs +++ b/crates/catalog-unity/src/lib.rs @@ -565,15 +565,30 @@ impl UnityCatalogBuilder { let storage_location = unity_catalog .get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name) .await?; + // Attempt to get read/write permissions to begin with. let temp_creds_res = unity_catalog - .get_temp_table_credentials(catalog_id, database_name, table_name) + .get_temp_table_credentials_with_permission( + catalog_id, + database_name, + table_name, + "READ_WRITE", + ) .await?; let credentials = match temp_creds_res { TableTempCredentialsResponse::Success(temp_creds) => temp_creds .get_credentials() .ok_or_else(|| UnityCatalogError::MissingCredential)?, TableTempCredentialsResponse::Error(_error) => { - return Err(UnityCatalogError::TemporaryCredentialsFetchFailure) + // If that fails attempt to get just read permissions. + match unity_catalog + .get_temp_table_credentials(catalog_id, database_name, table_name) + .await? + { + TableTempCredentialsResponse::Success(temp_creds) => temp_creds + .get_credentials() + .ok_or_else(|| UnityCatalogError::MissingCredential)?, + _ => return Err(UnityCatalogError::TemporaryCredentialsFetchFailure), + } } }; Ok((storage_location, credentials)) @@ -816,6 +831,22 @@ impl UnityCatalog { catalog_id: impl AsRef, database_name: impl AsRef, table_name: impl AsRef, + ) -> Result { + self.get_temp_table_credentials_with_permission( + catalog_id, + database_name, + table_name, + "READ", + ) + .await + } + + pub async fn get_temp_table_credentials_with_permission( + &self, + catalog_id: impl AsRef, + database_name: impl AsRef, + table_name: impl AsRef, + operation: impl AsRef, ) -> Result { let token = self.get_credential().await?; let table_info = self @@ -823,7 +854,8 @@ impl UnityCatalog { .await?; let response = match table_info { GetTableResponse::Success(table) => { - let request = TemporaryTableCredentialsRequest::new(&table.table_id, "READ"); + let request = + TemporaryTableCredentialsRequest::new(&table.table_id, operation.as_ref()); Ok(self .client .post(format!( diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index f375cb80fe..b5132996c0 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -947,22 +947,6 @@ impl ExecutionPlan for DeltaScan { })) } - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - self.parquet_scan.execute(partition, context) - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - fn partition_statistics(&self, partition: Option) -> Result { - self.parquet_scan.partition_statistics(partition) - } - fn repartitioned( &self, target_partitions: usize, @@ -980,6 +964,22 @@ impl ExecutionPlan for DeltaScan { Ok(None) } } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.parquet_scan.execute(partition, context) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.parquet_scan.partition_statistics(partition) + } } /// The logical schema for a Deltatable is different from the protocol level schema since partition