Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion crates/data-generation/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct ReadResult {
pub batches: Vec<RecordBatch>,
pub rows_read: u64,
pub bytes_read: u64,
pub operation: BatchOperation,
pub key_columns: Vec<String>,
}

pub struct WriteResult {
Expand Down Expand Up @@ -75,6 +75,13 @@ pub trait DataStorage: Send + Sync + 'static {
Ok(())
}

/// Looks up the key columns recorded in the table-level metadata.
///
/// This default implementation does not consult any storage: it ignores
/// `_table_name` and always succeeds with an empty `Vec`, which means no
/// key columns are defined (pure inserts).
async fn read_key_columns(&self, _table_name: &str) -> anyhow::Result<Vec<String>> {
Ok(vec![])
}

fn table_params(&self, table_name: &str) -> HashMap<String, serde_json::Value>;

/// Returns the list of file paths/URIs that would exist after a successful
Expand Down
100 changes: 30 additions & 70 deletions crates/data-generation/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,89 +112,45 @@ impl S3Storage {
}
}

async fn read_batch_operation(
/// Reads the key columns from the table-level metadata.
///
/// Scans the batch entries in `metadata.json` and returns the
/// `key_columns` array from the first entry that contains one.
/// Returns an empty `Vec` if no key columns are found (pure inserts).
async fn read_key_columns_from_metadata(
&self,
table_name: &str,
batch_id: u64,
) -> anyhow::Result<BatchOperation> {
) -> anyhow::Result<Vec<String>> {
let metadata_path = self.table_metadata_object_path(table_name);
let get_result = match self.store.get(&metadata_path).await {
Ok(r) => r,
Err(object_store::Error::NotFound { .. }) => return Ok(BatchOperation::Insert),
Err(object_store::Error::NotFound { .. }) => return Ok(Vec::new()),
Err(e) => return Err(e.into()),
};

let bytes = get_result.bytes().await?;
let table_meta: serde_json::Value = serde_json::from_slice(&bytes)?;

// Look up the batch entry (keyed by batch_id string) under "batches".
let batch_key = batch_id.to_string();
let json = match table_meta.get("batches").and_then(|b| b.get(&batch_key)) {
Some(entry) => entry,
None => return Ok(BatchOperation::Insert),
let Some(batches) = table_meta.get("batches").and_then(|b| b.as_object()) else {
return Ok(Vec::new());
};

let op = json
.get("operation")
.and_then(serde_json::Value::as_str)
.or_else(|| json.get("op").and_then(serde_json::Value::as_str))
.unwrap_or("insert")
.to_ascii_lowercase();

let parse_key_columns = || -> anyhow::Result<Vec<String>> {
let Some(keys_value) = json.get("key_columns") else {
anyhow::bail!(
"Missing 'key_columns' in metadata for table '{}' batch {}",
table_name,
batch_id
);
};
let Some(keys) = keys_value.as_array() else {
anyhow::bail!(
"Invalid 'key_columns' (expected string array) in metadata for table '{}' batch {}",
table_name,
batch_id
);
};

let parsed = keys
.iter()
.map(|v| {
v.as_str().map(ToOwned::to_owned).ok_or_else(|| {
anyhow::anyhow!(
"Invalid key column entry (expected string) in metadata for table '{}' batch {}",
table_name,
batch_id
)
})
})
.collect::<anyhow::Result<Vec<String>>>()?;

if parsed.is_empty() {
anyhow::bail!(
"'key_columns' cannot be empty in metadata for table '{}' batch {}",
table_name,
batch_id
);
// Find the first batch entry that has key_columns defined.
for (_batch_key, entry) in batches {
if let Some(keys_value) = entry.get("key_columns") {
if let Some(keys) = keys_value.as_array() {
let parsed: Vec<String> = keys
.iter()
.filter_map(|v| v.as_str().map(ToOwned::to_owned))
.collect();
if !parsed.is_empty() {
return Ok(parsed);
}
}
}

Ok(parsed)
};

match op.as_str() {
"insert" => Ok(BatchOperation::Insert),
"update" => Ok(BatchOperation::Update {
key_columns: parse_key_columns()?,
}),
"delete" => Ok(BatchOperation::Delete {
key_columns: parse_key_columns()?,
}),
other => anyhow::bail!(
"Unsupported operation '{other}' in metadata for table '{}' batch {}",
table_name,
batch_id
),
}

Ok(Vec::new())
}
}

Expand Down Expand Up @@ -359,6 +315,10 @@ impl DataStorage for S3Storage {
Ok(VecDeque::from(ids))
}

/// Returns the key columns for `table_name`.
///
/// Delegates to `read_key_columns_from_metadata` and passes its result
/// (success or error) through unchanged.
async fn read_key_columns(&self, table_name: &str) -> anyhow::Result<Vec<String>> {
let lookup = self.read_key_columns_from_metadata(table_name);
lookup.await
}

async fn read_batch(
&self,
table_name: &str,
Expand All @@ -384,13 +344,13 @@ impl DataStorage for S3Storage {
batches.push(batch);
}

let operation = self.read_batch_operation(table_name, batch_id).await?;
let key_columns = self.read_key_columns_from_metadata(table_name).await?;

Ok(Some(ReadResult {
batches,
rows_read,
bytes_read,
operation,
key_columns,
}))
}
}
Loading