From d2d5761acd6e5cd94a48087e9ee63af715622bc4 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:08:10 -0600 Subject: [PATCH 1/2] fix: Split batches correctly by InsertOp --- crates/data-generation/src/storage/mod.rs | 9 +- crates/data-generation/src/storage/s3.rs | 100 ++++------- crates/etl/src/lib.rs | 209 ++++++++++++++++------ 3 files changed, 192 insertions(+), 126 deletions(-) diff --git a/crates/data-generation/src/storage/mod.rs b/crates/data-generation/src/storage/mod.rs index 4567d130..e60dc15a 100644 --- a/crates/data-generation/src/storage/mod.rs +++ b/crates/data-generation/src/storage/mod.rs @@ -32,7 +32,7 @@ pub struct ReadResult { pub batches: Vec, pub rows_read: u64, pub bytes_read: u64, - pub operation: BatchOperation, + pub key_columns: Vec, } pub struct WriteResult { @@ -75,6 +75,13 @@ pub trait DataStorage: Send + Sync + 'static { Ok(()) } + /// Reads the key columns from the table-level metadata. + /// + /// Returns `Ok(Vec::new())` if no key columns are defined (pure inserts). + async fn read_key_columns(&self, _table_name: &str) -> anyhow::Result> { + Ok(Vec::new()) + } + fn table_params(&self, table_name: &str) -> HashMap; /// Returns the list of file paths/URIs that would exist after a successful diff --git a/crates/data-generation/src/storage/s3.rs b/crates/data-generation/src/storage/s3.rs index ebd0b209..f69c6e10 100644 --- a/crates/data-generation/src/storage/s3.rs +++ b/crates/data-generation/src/storage/s3.rs @@ -112,89 +112,45 @@ impl S3Storage { } } - async fn read_batch_operation( + /// Reads the key columns from the table-level metadata. + /// + /// Scans the batch entries in `metadata.json` and returns the + /// `key_columns` array from the first entry that contains one. + /// Returns an empty `Vec` if no key columns are found (pure inserts). + async fn read_key_columns_from_metadata( &self, table_name: &str, - batch_id: u64, - ) -> anyhow::Result { + ) -> anyhow::Result> { let metadata_path = self.table_metadata_object_path(table_name); let get_result = match self.store.get(&metadata_path).await { Ok(r) => r, - Err(object_store::Error::NotFound { .. }) => return Ok(BatchOperation::Insert), + Err(object_store::Error::NotFound { .. }) => return Ok(Vec::new()), Err(e) => return Err(e.into()), }; let bytes = get_result.bytes().await?; let table_meta: serde_json::Value = serde_json::from_slice(&bytes)?; - // Look up the batch entry (keyed by batch_id string) under "batches". - let batch_key = batch_id.to_string(); - let json = match table_meta.get("batches").and_then(|b| b.get(&batch_key)) { - Some(entry) => entry, - None => return Ok(BatchOperation::Insert), + let Some(batches) = table_meta.get("batches").and_then(|b| b.as_object()) else { + return Ok(Vec::new()); }; - let op = json - .get("operation") - .and_then(serde_json::Value::as_str) - .or_else(|| json.get("op").and_then(serde_json::Value::as_str)) - .unwrap_or("insert") - .to_ascii_lowercase(); - - let parse_key_columns = || -> anyhow::Result> { - let Some(keys_value) = json.get("key_columns") else { - anyhow::bail!( - "Missing 'key_columns' in metadata for table '{}' batch {}", - table_name, - batch_id - ); - }; - let Some(keys) = keys_value.as_array() else { - anyhow::bail!( - "Invalid 'key_columns' (expected string array) in metadata for table '{}' batch {}", - table_name, - batch_id - ); - }; - - let parsed = keys - .iter() - .map(|v| { - v.as_str().map(ToOwned::to_owned).ok_or_else(|| { - anyhow::anyhow!( - "Invalid key column entry (expected string) in metadata for table '{}' batch {}", - table_name, - batch_id - ) - }) - }) - .collect::>>()?; - - if parsed.is_empty() { - anyhow::bail!( - "'key_columns' cannot be empty in metadata for table '{}' batch {}", - table_name, - batch_id - ); + // Find the first batch entry that has key_columns defined. + for (_batch_key, entry) in batches { + if let Some(keys_value) = entry.get("key_columns") { + if let Some(keys) = keys_value.as_array() { + let parsed: Vec = keys + .iter() + .filter_map(|v| v.as_str().map(ToOwned::to_owned)) + .collect(); + if !parsed.is_empty() { + return Ok(parsed); + } + } } - - Ok(parsed) - }; - - match op.as_str() { - "insert" => Ok(BatchOperation::Insert), - "update" => Ok(BatchOperation::Update { - key_columns: parse_key_columns()?, - }), - "delete" => Ok(BatchOperation::Delete { - key_columns: parse_key_columns()?, - }), - other => anyhow::bail!( - "Unsupported operation '{other}' in metadata for table '{}' batch {}", - table_name, - batch_id - ), } + + Ok(Vec::new()) } } @@ -359,6 +315,10 @@ impl DataStorage for S3Storage { Ok(VecDeque::from(ids)) } + async fn read_key_columns(&self, table_name: &str) -> anyhow::Result> { + self.read_key_columns_from_metadata(table_name).await + } + async fn read_batch( &self, table_name: &str, @@ -384,13 +344,13 @@ impl DataStorage for S3Storage { batches.push(batch); } - let operation = self.read_batch_operation(table_name, batch_id).await?; + let key_columns = self.read_key_columns_from_metadata(table_name).await?; Ok(Some(ReadResult { batches, rows_read, bytes_read, - operation, + key_columns, })) } } diff --git a/crates/etl/src/lib.rs b/crates/etl/src/lib.rs index 59c73f32..6eafbf70 100644 --- a/crates/etl/src/lib.rs +++ b/crates/etl/src/lib.rs @@ -18,13 +18,14 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use arrow::array::{RecordBatch, TimestampMicrosecondArray}; +use arrow::array::{Array, RecordBatch, StringArray, TimestampMicrosecondArray}; +use arrow::compute; use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use data_generation::config::DatasetConfig as GenerationDatasetConfig; use data_generation::dataset::simple_sequence::SimpleSequenceDataset; use data_generation::dataset::tpch::TpchDataset; use data_generation::dataset::{Dataset, MutationConfig}; -use data_generation::storage::{BatchOperation, DataStorage}; +use data_generation::storage::DataStorage; use std::collections::{BTreeMap, HashSet}; use std::sync::Arc as StdArc; use std::sync::Mutex as StdMutex; @@ -44,7 +45,7 @@ pub mod sink; const CREATED_AT_COLUMN: &str = "__created_at"; /// Internal columns that must be stripped before writing to the sink. -const INTERNAL_COLUMNS: &[&str] = &["__op", "__op_index"]; +const INTERNAL_COLUMNS: &[&str] = &["_op", "_op_index"]; /// Returns a new schema with the `__created_at` timestamp column appended. fn schema_with_created_at(schema: &SchemaRef) -> SchemaRef { @@ -75,7 +76,7 @@ fn append_created_at(batch: &RecordBatch) -> anyhow::Result { Ok(RecordBatch::try_new(new_schema, columns)?) } -/// Removes internal bookkeeping columns (`__op`, `__op_index`) from a +/// Removes internal bookkeeping columns (`_op`, `_op_index`) from a /// [`RecordBatch`] so they are not persisted to the sink. fn strip_internal_columns(batch: &RecordBatch) -> anyhow::Result { let schema = batch.schema(); @@ -106,6 +107,110 @@ fn strip_internal_columns(batch: &RecordBatch) -> anyhow::Result { )?) } +/// A sub-batch of rows sharing the same operation type, derived from the +/// `_op` column values. +struct OpSegment { + /// The sink operation for this segment. + op: InsertOp, + /// The `RecordBatch` containing only the rows for this segment, with + /// internal columns (`_op`, `_op_index`) already stripped. + batch: RecordBatch, +} + +/// Sorts rows by `_op_index`, then splits the batch into consecutive segments +/// of the same `_op` value. Each segment is returned as an [`OpSegment`] +/// with internal columns stripped. +/// +/// If the batch has no `_op` / `_op_index` columns (e.g. a pure-insert +/// initial batch), a single `Insert` segment covering all rows is returned. +fn split_batch_by_op( + batch: &RecordBatch, + key_columns: &[String], +) -> anyhow::Result> { + let schema = batch.schema(); + + // If there is no _op column, treat the whole batch as an insert. + let op_idx = match schema.index_of("_op") { + Ok(idx) => idx, + Err(_) => { + let stripped = strip_internal_columns(batch)?; + return Ok(vec![OpSegment { + op: InsertOp::Insert, + batch: stripped, + }]); + } + }; + + // Sort by _op_index to ensure correct replay order. + let sorted_batch = if let Ok(oi_idx) = schema.index_of("_op_index") { + let op_index_col = batch.column(oi_idx); + let sort_indices = compute::sort_to_indices(op_index_col, None, None)?; + let columns: Vec<_> = batch + .columns() + .iter() + .map(|c| compute::take(c.as_ref(), &sort_indices, None).map_err(|e| e.into())) + .collect::>>()?; + RecordBatch::try_new(batch.schema(), columns)? + } else { + batch.clone() + }; + + let op_array = sorted_batch + .column(op_idx) + .as_any() + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("_op column is not a StringArray"))?; + + let num_rows = sorted_batch.num_rows(); + if num_rows == 0 { + return Ok(Vec::new()); + } + + // Walk through rows and group consecutive runs of the same operation. + let mut segments = Vec::new(); + let mut run_start = 0usize; + let mut current_op = op_array.value(0); + + for i in 1..num_rows { + let row_op = op_array.value(i); + if row_op != current_op { + // Flush the current run. + let slice = sorted_batch.slice(run_start, i - run_start); + let stripped = strip_internal_columns(&slice)?; + segments.push(OpSegment { + op: op_str_to_insert_op(current_op, key_columns), + batch: stripped, + }); + run_start = i; + current_op = row_op; + } + } + + // Flush the final run. + let slice = sorted_batch.slice(run_start, num_rows - run_start); + let stripped = strip_internal_columns(&slice)?; + segments.push(OpSegment { + op: op_str_to_insert_op(current_op, key_columns), + batch: stripped, + }); + + Ok(segments) +} + +/// Maps a `_op` column value (`"c"`, `"u"`, `"d"`) to an [`InsertOp`]. +fn op_str_to_insert_op(op: &str, key_columns: &[String]) -> InsertOp { + match op { + "u" => InsertOp::Update { + key_columns: key_columns.to_vec(), + }, + "d" => InsertOp::Delete { + key_columns: key_columns.to_vec(), + }, + // "c" and anything else default to Insert. + _ => InsertOp::Insert, + } +} + /// Specifies which dataset implementation to use for the ETL pipeline. #[derive(Debug, Clone)] pub enum DatasetSource { @@ -335,22 +440,25 @@ impl ETLPipeline { format!("No data for table {table_name} at batch {first_batch_id}") })?; - let op = sink_op_from_batch_op(&read_result.operation); + let key_columns = &read_result.key_columns; - for batch in read_result.batches { - let stripped = strip_internal_columns(&batch).map_err(|e| { + for batch in &read_result.batches { + let segments = split_batch_by_op(batch, key_columns).map_err(|e| { format!( - "strip internal columns from {table_name} batch {first_batch_id}: {e}" + "split batch by op for {table_name} batch {first_batch_id}: {e}" ) })?; - let rehydrated = append_created_at(&stripped).map_err(|e| { - format!("append __created_at to {table_name} batch {first_batch_id}: {e}") - })?; - target - .write(&table_name, first_batch_id, rehydrated, op.clone()) - .await - .map_err(|e| format!("write {table_name} batch {first_batch_id}: {e}"))?; + for segment in segments { + let rehydrated = append_created_at(&segment.batch).map_err(|e| { + format!("append __created_at to {table_name} batch {first_batch_id}: {e}") + })?; + + target + .write(&table_name, first_batch_id, rehydrated, segment.op) + .await + .map_err(|e| format!("write {table_name} batch {first_batch_id}: {e}"))?; + } } debug!( @@ -676,51 +784,54 @@ async fn run_pipeline( } }; - let op = sink_op_from_batch_op(&read_result.operation); + let key_columns = &read_result.key_columns; - // 2. Strip internal columns, append __created_at, and write to target - for batch in read_result.batches { - let stripped = match strip_internal_columns(&batch) { - Ok(b) => b, + // 2. Split by _op, strip internal columns, append __created_at, and write to target + for batch in &read_result.batches { + let segments = match split_batch_by_op(batch, key_columns) { + Ok(s) => s, Err(e) => { error!( table = %table_name, batch_id, error = %e, - "Failed to strip internal columns" + "Failed to split batch by operation" ); return Err(format!( - "strip internal columns from {table_name} batch {batch_id}: {e}" + "split batch by op for {table_name} batch {batch_id}: {e}" )); } }; - let rehydrated = match append_created_at(&stripped) { - Ok(b) => b, - Err(e) => { + + for segment in segments { + let rehydrated = match append_created_at(&segment.batch) { + Ok(b) => b, + Err(e) => { + error!( + table = %table_name, + batch_id, + error = %e, + "Failed to append __created_at column" + ); + return Err(format!( + "append __created_at to {table_name} batch {batch_id}: {e}" + )); + } + }; + + // 3. Write to sink + if let Err(e) = data_sink + .write(&table_name, batch_id, rehydrated, segment.op) + .await + { error!( table = %table_name, batch_id, error = %e, - "Failed to append __created_at column" + "Failed to write batch to target" ); - return Err(format!( - "append __created_at to {table_name} batch {batch_id}: {e}" - )); + return Err(format!("write {table_name} batch {batch_id}: {e}")); } - }; - - // 3. Write to sink - if let Err(e) = data_sink - .write(&table_name, batch_id, rehydrated, op.clone()) - .await - { - error!( - table = %table_name, - batch_id, - error = %e, - "Failed to write batch to target" - ); - return Err(format!("write {table_name} batch {batch_id}: {e}")); } } @@ -773,15 +884,3 @@ async fn run_pipeline( ); PipelineState::Stopped(StopReason::Completed) } - -fn sink_op_from_batch_op(op: &BatchOperation) -> InsertOp { - match op { - BatchOperation::Insert => InsertOp::Insert, - BatchOperation::Update { key_columns } => InsertOp::Update { - key_columns: key_columns.clone(), - }, - BatchOperation::Delete { key_columns } => InsertOp::Delete { - key_columns: key_columns.clone(), - }, - } -} From aa3e5d53550cb25ceba76442af6cc464b1735482 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:25:56 -0600 Subject: [PATCH 2/2] fix --- crates/etl/src/lib.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/etl/src/lib.rs b/crates/etl/src/lib.rs index 6eafbf70..0bd96738 100644 --- a/crates/etl/src/lib.rs +++ b/crates/etl/src/lib.rs @@ -444,20 +444,22 @@ impl ETLPipeline { for batch in &read_result.batches { let segments = split_batch_by_op(batch, key_columns).map_err(|e| { - format!( - "split batch by op for {table_name} batch {first_batch_id}: {e}" - ) + format!("split batch by op for {table_name} batch {first_batch_id}: {e}") })?; for segment in segments { let rehydrated = append_created_at(&segment.batch).map_err(|e| { - format!("append __created_at to {table_name} batch {first_batch_id}: {e}") + format!( + "append __created_at to {table_name} batch {first_batch_id}: {e}" + ) })?; target .write(&table_name, first_batch_id, rehydrated, segment.op) .await - .map_err(|e| format!("write {table_name} batch {first_batch_id}: {e}"))?; + .map_err(|e| { + format!("write {table_name} batch {first_batch_id}: {e}") + })?; } }