Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 52 additions & 21 deletions crates/data-generation/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,11 @@ impl S3Storage {
}
}

pub(crate) fn batch_metadata_object_path(&self, table_name: &str, batch_id: u64) -> ObjectPath {
pub(crate) fn table_metadata_object_path(&self, table_name: &str) -> ObjectPath {
if self.prefix.is_empty() {
ObjectPath::from(format!("{table_name}/batch-{batch_id:06}.metadata.json"))
ObjectPath::from(format!("{table_name}/metadata.json"))
} else {
ObjectPath::from(format!(
"{}/{table_name}/batch-{batch_id:06}.metadata.json",
self.prefix
))
ObjectPath::from(format!("{}/{table_name}/metadata.json", self.prefix))
}
}

Expand All @@ -119,15 +116,22 @@ impl S3Storage {
table_name: &str,
batch_id: u64,
) -> anyhow::Result<BatchOperation> {
let metadata_path = self.batch_metadata_object_path(table_name, batch_id);
let metadata_path = self.table_metadata_object_path(table_name);
let get_result = match self.store.get(&metadata_path).await {
Ok(r) => r,
Err(object_store::Error::NotFound { .. }) => return Ok(BatchOperation::Insert),
Err(e) => return Err(e.into()),
};

let bytes = get_result.bytes().await?;
let json: serde_json::Value = serde_json::from_slice(&bytes)?;
let table_meta: serde_json::Value = serde_json::from_slice(&bytes)?;

// Look up the batch entry (keyed by batch_id string) under "batches".
let batch_key = batch_id.to_string();
let json = match table_meta.get("batches").and_then(|b| b.get(&batch_key)) {
Some(entry) => entry,
None => return Ok(BatchOperation::Insert),
};

let op = json
.get("operation")
Expand All @@ -139,14 +143,16 @@ impl S3Storage {
let parse_key_columns = || -> anyhow::Result<Vec<String>> {
let Some(keys_value) = json.get("key_columns") else {
anyhow::bail!(
"Missing 'key_columns' in metadata sidecar for {}",
metadata_path
"Missing 'key_columns' in metadata for table '{}' batch {}",
table_name,
batch_id
);
};
let Some(keys) = keys_value.as_array() else {
anyhow::bail!(
"Invalid 'key_columns' (expected string array) in metadata sidecar for {}",
metadata_path
"Invalid 'key_columns' (expected string array) in metadata for table '{}' batch {}",
table_name,
batch_id
);
};

Expand All @@ -155,17 +161,19 @@ impl S3Storage {
.map(|v| {
v.as_str().map(ToOwned::to_owned).ok_or_else(|| {
anyhow::anyhow!(
"Invalid key column entry (expected string) in metadata sidecar for {}",
metadata_path
"Invalid key column entry (expected string) in metadata for table '{}' batch {}",
table_name,
batch_id
)
})
})
.collect::<anyhow::Result<Vec<String>>>()?;

if parsed.is_empty() {
anyhow::bail!(
"'key_columns' cannot be empty in metadata sidecar for {}",
metadata_path
"'key_columns' cannot be empty in metadata for table '{}' batch {}",
table_name,
batch_id
);
}

Expand All @@ -181,8 +189,9 @@ impl S3Storage {
key_columns: parse_key_columns()?,
}),
other => anyhow::bail!(
"Unsupported operation '{other}' in metadata sidecar for {}",
metadata_path
"Unsupported operation '{other}' in metadata for table '{}' batch {}",
table_name,
batch_id
),
}
}
Expand Down Expand Up @@ -269,9 +278,20 @@ impl DataStorage for S3Storage {
batch_id: u64,
operation: &BatchOperation,
) -> anyhow::Result<()> {
let path = self.batch_metadata_object_path(table_name, batch_id);
let path = self.table_metadata_object_path(table_name);

// Read existing table-level metadata (if any) so we can merge.
let mut table_meta = match self.store.get(&path).await {
Ok(r) => {
let bytes = r.bytes().await?;
serde_json::from_slice::<serde_json::Value>(&bytes)
.unwrap_or_else(|_| serde_json::json!({ "batches": {} }))
}
Err(object_store::Error::NotFound { .. }) => serde_json::json!({ "batches": {} }),
Err(e) => return Err(e.into()),
};

let value = match operation {
let batch_entry = match operation {
BatchOperation::Insert => serde_json::json!({
"operation": "insert"
}),
Expand All @@ -285,7 +305,18 @@ impl DataStorage for S3Storage {
}),
};

let bytes = serde_json::to_vec(&value)?;
// Ensure the "batches" object exists and insert/update the entry.
let batches = table_meta
.as_object_mut()
.expect("table metadata should be an object")
.entry("batches")
.or_insert_with(|| serde_json::json!({}));
batches
.as_object_mut()
.expect("batches should be an object")
.insert(batch_id.to_string(), batch_entry);

let bytes = serde_json::to_vec_pretty(&table_meta)?;
self.store.put(&path, PutPayload::from(bytes)).await?;
Ok(())
}
Expand Down