diff --git a/crates/data-generation/src/storage/s3.rs b/crates/data-generation/src/storage/s3.rs index 4c79522c..acf6fbcc 100644 --- a/crates/data-generation/src/storage/s3.rs +++ b/crates/data-generation/src/storage/s3.rs @@ -103,14 +103,11 @@ impl S3Storage { } } - pub(crate) fn batch_metadata_object_path(&self, table_name: &str, batch_id: u64) -> ObjectPath { + pub(crate) fn table_metadata_object_path(&self, table_name: &str) -> ObjectPath { if self.prefix.is_empty() { - ObjectPath::from(format!("{table_name}/batch-{batch_id:06}.metadata.json")) + ObjectPath::from(format!("{table_name}/metadata.json")) } else { - ObjectPath::from(format!( - "{}/{table_name}/batch-{batch_id:06}.metadata.json", - self.prefix - )) + ObjectPath::from(format!("{}/{table_name}/metadata.json", self.prefix)) } } @@ -119,7 +116,7 @@ impl S3Storage { table_name: &str, batch_id: u64, ) -> anyhow::Result { - let metadata_path = self.batch_metadata_object_path(table_name, batch_id); + let metadata_path = self.table_metadata_object_path(table_name); let get_result = match self.store.get(&metadata_path).await { Ok(r) => r, Err(object_store::Error::NotFound { .. }) => return Ok(BatchOperation::Insert), @@ -127,7 +124,14 @@ impl S3Storage { }; let bytes = get_result.bytes().await?; - let json: serde_json::Value = serde_json::from_slice(&bytes)?; + let table_meta: serde_json::Value = serde_json::from_slice(&bytes)?; + + // Look up the batch entry (keyed by batch_id string) under "batches". + let batch_key = batch_id.to_string(); + let json = match table_meta.get("batches").and_then(|b| b.get(&batch_key)) { + Some(entry) => entry, + None => return Ok(BatchOperation::Insert), + }; let op = json .get("operation") @@ -139,14 +143,16 @@ impl S3Storage { let parse_key_columns = || -> anyhow::Result> { let Some(keys_value) = json.get("key_columns") else { anyhow::bail!( - "Missing 'key_columns' in metadata sidecar for {}", - metadata_path + "Missing 'key_columns' in metadata for table '{}' batch {}", + table_name, + batch_id ); }; let Some(keys) = keys_value.as_array() else { anyhow::bail!( - "Invalid 'key_columns' (expected string array) in metadata sidecar for {}", - metadata_path + "Invalid 'key_columns' (expected string array) in metadata for table '{}' batch {}", + table_name, + batch_id ); }; @@ -155,8 +161,9 @@ impl S3Storage { .map(|v| { v.as_str().map(ToOwned::to_owned).ok_or_else(|| { anyhow::anyhow!( - "Invalid key column entry (expected string) in metadata sidecar for {}", - metadata_path + "Invalid key column entry (expected string) in metadata for table '{}' batch {}", + table_name, + batch_id ) }) }) @@ -164,8 +171,9 @@ impl S3Storage { if parsed.is_empty() { anyhow::bail!( - "'key_columns' cannot be empty in metadata sidecar for {}", - metadata_path + "'key_columns' cannot be empty in metadata for table '{}' batch {}", + table_name, + batch_id ); } @@ -181,8 +189,9 @@ impl S3Storage { key_columns: parse_key_columns()?, }), other => anyhow::bail!( - "Unsupported operation '{other}' in metadata sidecar for {}", - metadata_path + "Unsupported operation '{other}' in metadata for table '{}' batch {}", + table_name, + batch_id ), } } @@ -269,9 +278,20 @@ impl DataStorage for S3Storage { batch_id: u64, operation: &BatchOperation, ) -> anyhow::Result<()> { - let path = self.batch_metadata_object_path(table_name, batch_id); + let path = self.table_metadata_object_path(table_name); + + // Read existing table-level metadata (if any) so we can merge. + let mut table_meta = match self.store.get(&path).await { + Ok(r) => { + let bytes = r.bytes().await?; + serde_json::from_slice::(&bytes) + .unwrap_or_else(|_| serde_json::json!({ "batches": {} })) + } + Err(object_store::Error::NotFound { .. }) => serde_json::json!({ "batches": {} }), + Err(e) => return Err(e.into()), + }; - let value = match operation { + let batch_entry = match operation { BatchOperation::Insert => serde_json::json!({ "operation": "insert" }), @@ -285,7 +305,18 @@ impl DataStorage for S3Storage { }), }; - let bytes = serde_json::to_vec(&value)?; + // Ensure the "batches" object exists and insert/update the entry. + let batches = table_meta + .as_object_mut() + .expect("table metadata should be an object") + .entry("batches") + .or_insert_with(|| serde_json::json!({})); + batches + .as_object_mut() + .expect("batches should be an object") + .insert(batch_id.to_string(), batch_entry); + + let bytes = serde_json::to_vec_pretty(&table_meta)?; self.store.put(&path, PutPayload::from(bytes)).await?; Ok(()) }