Skip to content

Commit 5ab32ec

Browse files
authored
Raise concurrency errors properly for glue tables (apache#1875) (#26)
1 parent 8768883 commit 5ab32ec

1 file changed

Lines changed: 69 additions & 40 deletions

File tree

crates/catalog/glue/src/catalog.rs

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,62 @@ impl GlueCatalog {
196196
pub fn file_io(&self) -> FileIO {
197197
self.file_io.clone()
198198
}
199+
200+
/// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
201+
///
202+
/// # Returns
203+
/// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
204+
/// from Glue that should be used for optimistic concurrency control when updating the table.
205+
///
206+
/// # Errors
207+
/// This function may return an error in several scenarios, including:
208+
/// - Failure to validate the namespace.
209+
/// - Failure to retrieve the table from the Glue Catalog.
210+
/// - Absence of metadata location information in the table's properties.
211+
/// - Issues reading or deserializing the table's metadata file.
212+
async fn load_table_with_version_id(
213+
&self,
214+
table: &TableIdent,
215+
) -> Result<(Table, Option<String>)> {
216+
let db_name = validate_namespace(table.namespace())?;
217+
let table_name = table.name();
218+
219+
let builder = self
220+
.client
221+
.0
222+
.get_table()
223+
.database_name(&db_name)
224+
.name(table_name);
225+
let builder = with_catalog_id!(builder, self.config);
226+
227+
let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
228+
229+
let glue_table = glue_table_output.table().ok_or_else(|| {
230+
Error::new(
231+
ErrorKind::TableNotFound,
232+
format!(
233+
"Table object for database: {db_name} and table: {table_name} does not exist"
234+
),
235+
)
236+
})?;
237+
238+
let version_id = glue_table.version_id.clone();
239+
let metadata_location = get_metadata_location(&glue_table.parameters)?;
240+
241+
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
242+
243+
let table = Table::builder()
244+
.file_io(self.file_io())
245+
.metadata_location(metadata_location)
246+
.metadata(metadata)
247+
.identifier(TableIdent::new(
248+
NamespaceIdent::new(db_name),
249+
table_name.to_owned(),
250+
))
251+
.build()?;
252+
253+
Ok((table, version_id))
254+
}
199255
}
200256

201257
#[async_trait]
@@ -514,43 +570,8 @@ impl Catalog for GlueCatalog {
514570
/// - Absence of metadata location information in the table's properties.
515571
/// - Issues reading or deserializing the table's metadata file.
516572
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
517-
let db_name = validate_namespace(table.namespace())?;
518-
let table_name = table.name();
519-
520-
let builder = self
521-
.client
522-
.0
523-
.get_table()
524-
.database_name(&db_name)
525-
.name(table_name);
526-
let builder = with_catalog_id!(builder, self.config);
527-
528-
let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
529-
530-
match glue_table_output.table() {
531-
None => Err(Error::new(
532-
ErrorKind::TableNotFound,
533-
format!(
534-
"Table object for database: {} and table: {} does not exist",
535-
db_name, table_name
536-
),
537-
)),
538-
Some(table) => {
539-
let metadata_location = get_metadata_location(&table.parameters)?;
540-
541-
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
542-
543-
Table::builder()
544-
.file_io(self.file_io())
545-
.metadata_location(metadata_location)
546-
.metadata(metadata)
547-
.identifier(TableIdent::new(
548-
NamespaceIdent::new(db_name),
549-
table_name.to_owned(),
550-
))
551-
.build()
552-
}
553-
}
573+
let (table, _) = self.load_table_with_version_id(table).await?;
574+
Ok(table)
554575
}
555576

556577
/// Asynchronously drops a table from the database.
@@ -767,7 +788,9 @@ impl Catalog for GlueCatalog {
767788
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
768789
let table_ident = commit.identifier().clone();
769790
let table_namespace = validate_namespace(table_ident.namespace())?;
770-
let current_table = self.load_table(&table_ident).await?;
791+
792+
let (current_table, current_version_id) =
793+
self.load_table_with_version_id(&table_ident).await?;
771794
let current_metadata_location = current_table.metadata_location_result()?.to_string();
772795

773796
let staged_table = commit.apply(current_table)?;
@@ -779,8 +802,8 @@ impl Catalog for GlueCatalog {
779802
.write_to(staged_table.file_io(), staged_metadata_location)
780803
.await?;
781804

782-
// Persist staged table to Glue
783-
let builder = self
805+
// Persist staged table to Glue with optimistic locking
806+
let mut builder = self
784807
.client
785808
.0
786809
.update_table()
@@ -793,6 +816,12 @@ impl Catalog for GlueCatalog {
793816
staged_table.metadata().properties(),
794817
Some(current_metadata_location),
795818
)?);
819+
820+
// Add VersionId for optimistic locking
821+
if let Some(version_id) = current_version_id {
822+
builder = builder.version_id(version_id);
823+
}
824+
796825
let builder = with_catalog_id!(builder, self.config);
797826
let _ = builder.send().await.map_err(|e| {
798827
let error = e.into_service_error();

0 commit comments

Comments
 (0)