Skip to content

Commit 12ecd1c

Browse files
authored
Properly add support for __created_at column in etl crate (#66)
* Support for e2e latency metric * Fix * __created_at column * Lint * Fix
1 parent e278dcc commit 12ecd1c

4 files changed

Lines changed: 57 additions & 27 deletions

File tree

crates/data-generation/src/dataset/simple_sequence.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ impl SimpleSequenceDataset {
7878
/// Returns the static Arrow schema for the `integer_sequence` table.
7979
///
8080
/// Includes change-tracking columns (`_op`, `_op_index`).
81+
///
82+
/// The time column (`__created_at`) is not included; it will be added during
83+
/// ETL rehydration.
8184
pub fn schema() -> SchemaRef {
8285
Arc::new(Schema::new(vec![
8386
Field::new("id", DataType::Int64, false),

crates/data-generation/src/dataset/tpch.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use crate::dataset::MutationConfig;
2929

3030
use super::{Dataset, DatasetTable};
3131

32-
/// TPC-H table definitions: (table_name, time_column, schema_fn).
33-
const TPCH_TABLE_TIME_COLUMNS: &[(&str, &str)] = &[
32+
/// TPC-H table definitions: `(table_name, time_column)`.
33+
const TPCH_TABLES: &[(&str, &str)] = &[
3434
("region", "r_created_at"),
3535
("nation", "n_created_at"),
3636
("supplier", "s_created_at"),
@@ -217,7 +217,7 @@ impl TpchDataset {
217217
/// Drop the `_new` tables created by `advance_step()`.
218218
fn drop_step_tables(&self) -> anyhow::Result<()> {
219219
let mut sql = String::new();
220-
for (table, _) in TPCH_TABLE_TIME_COLUMNS {
220+
for (table, _) in TPCH_TABLES {
221221
sql.push_str(&format!("DROP TABLE IF EXISTS {table}_new;"));
222222
}
223223
let conn = self
@@ -256,10 +256,7 @@ impl Dataset for TpchDataset {
256256
}
257257

258258
fn num_batches(&self, table: &str) -> u64 {
259-
if !TPCH_TABLE_TIME_COLUMNS
260-
.iter()
261-
.any(|(name, _)| *name == table)
262-
{
259+
if !TPCH_TABLES.iter().any(|(name, _)| *name == table) {
263260
return 0;
264261
}
265262

@@ -274,7 +271,7 @@ impl Dataset for TpchDataset {
274271
.consumed_tables
275272
.read()
276273
.map_err(|e| anyhow::anyhow!("lock poisoned: {e}"))?;
277-
if consumed.len() >= TPCH_TABLE_TIME_COLUMNS.len() {
274+
if consumed.len() >= TPCH_TABLES.len() {
278275
drop(consumed);
279276
if self.current_step.load(Ordering::SeqCst) > 0 {
280277
self.drop_step_tables()?;
@@ -302,10 +299,7 @@ impl Dataset for TpchDataset {
302299
}
303300

304301
// Validate the table name
305-
if !TPCH_TABLE_TIME_COLUMNS
306-
.iter()
307-
.any(|(name, _)| *name == table)
308-
{
302+
if !TPCH_TABLES.iter().any(|(name, _)| *name == table) {
309303
anyhow::bail!("Unknown TPC-H table: {table}");
310304
}
311305

@@ -358,7 +352,7 @@ impl Dataset for TpchDataset {
358352
}
359353

360354
fn tables(&self) -> HashMap<String, DatasetTable> {
361-
TPCH_TABLE_TIME_COLUMNS
355+
TPCH_TABLES
362356
.iter()
363357
.map(|(name, time_col)| {
364358
(

crates/etl/src/lib.rs

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ limitations under the License.
1616

1717
use std::collections::HashMap;
1818
use std::sync::Arc;
19+
use std::time::{SystemTime, UNIX_EPOCH};
1920

21+
use arrow::array::{RecordBatch, TimestampMicrosecondArray};
22+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
2023
use data_generation::config::DatasetConfig as GenerationDatasetConfig;
2124
use data_generation::dataset::simple_sequence::SimpleSequenceDataset;
2225
use data_generation::dataset::tpch::TpchDataset;
@@ -36,6 +39,38 @@ use crate::sink::{InsertOp, Sink};
3639

3740
pub mod sink;
3841

42+
/// Column name appended by the ETL pipeline to every batch.
43+
const CREATED_AT_COLUMN: &str = "__created_at";
44+
45+
/// Returns a new schema with the `__created_at` timestamp column appended.
46+
fn schema_with_created_at(schema: &SchemaRef) -> SchemaRef {
47+
let mut fields: Vec<_> = schema.fields().iter().cloned().collect();
48+
fields.push(Arc::new(Field::new(
49+
CREATED_AT_COLUMN,
50+
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
51+
true,
52+
)));
53+
Arc::new(Schema::new(fields))
54+
}
55+
56+
/// Appends a `__created_at` column (current wall-clock time, microsecond UTC)
57+
/// to the given batch.
58+
fn append_created_at(batch: &RecordBatch) -> anyhow::Result<RecordBatch> {
59+
let now_us = SystemTime::now()
60+
.duration_since(UNIX_EPOCH)
61+
.expect("system time before UNIX epoch")
62+
.as_micros() as i64;
63+
64+
let timestamps =
65+
TimestampMicrosecondArray::from(vec![Some(now_us); batch.num_rows()]).with_timezone("UTC");
66+
67+
let new_schema = schema_with_created_at(&batch.schema());
68+
let mut columns: Vec<_> = batch.columns().to_vec();
69+
columns.push(Arc::new(timestamps));
70+
71+
Ok(RecordBatch::try_new(new_schema, columns)?)
72+
}
73+
3974
/// Specifies which dataset implementation to use for the ETL pipeline.
4075
#[derive(Debug, Clone)]
4176
pub enum DatasetSource {
@@ -191,7 +226,7 @@ impl ETLPipeline {
191226
.into_iter()
192227
.map(|(name, table)| {
193228
let config = ProtocolDatasetConfig {
194-
schema: table.rehydrated_schema(),
229+
schema: schema_with_created_at(&table.schema),
195230
};
196231
(name, config)
197232
})
@@ -220,7 +255,6 @@ impl ETLPipeline {
220255

221256
let mut join_set: JoinSet<Result<String, String>> = JoinSet::new();
222257
for table_name in tables.keys() {
223-
let dataset = Arc::clone(&self.dataset);
224258
let source = Arc::clone(&self.data_storage);
225259
let target = Arc::clone(&self.data_sink);
226260
let table_name = table_name.clone();
@@ -237,8 +271,8 @@ impl ETLPipeline {
237271
let op = sink_op_from_batch_op(&read_result.operation);
238272

239273
for batch in read_result.batches {
240-
let rehydrated = dataset.rehydrate(&table_name, &batch).map_err(|e| {
241-
format!("rehydrate {table_name} batch {first_batch_id}: {e}")
274+
let rehydrated = append_created_at(&batch).map_err(|e| {
275+
format!("append __created_at to {table_name} batch {first_batch_id}: {e}")
242276
})?;
243277

244278
target
@@ -289,8 +323,8 @@ impl ETLPipeline {
289323
/// each batch the task:
290324
///
291325
/// 1. Reads the batch from the [`Source`].
292-
/// 2. Rehydrates it through the [`Dataset`] (appending time columns, etc.).
293-
/// 3. Writes the rehydrated batch to the [`Sink`].
326+
/// 2. Appends the `__created_at` timestamp column.
327+
/// 3. Writes the enriched batch to the [`Sink`].
294328
///
295329
/// The task transitions to [`PipelineState::Stopped`] when all batches are
296330
/// processed, the [`CancellationToken`] is triggered, or an error occurs.
@@ -329,7 +363,7 @@ impl ETLPipeline {
329363
work.sort_by_key(|(_, id)| *id);
330364

331365
let handle = tokio::spawn(async move {
332-
let reason = run_pipeline(dataset, source, target, work, cancel).await;
366+
let reason = run_pipeline(source, target, work, cancel).await;
333367
let _ = state_tx.send(PipelineState::Stopped(reason));
334368
});
335369

@@ -355,7 +389,6 @@ impl ETLPipeline {
355389
/// Groups work items by batch ID and processes all tables within each step
356390
/// concurrently, checking for cancellation between steps.
357391
async fn run_pipeline(
358-
dataset: Arc<dyn Dataset>,
359392
data_storage: Arc<dyn DataStorage>,
360393
data_sink: Arc<dyn Sink>,
361394
work: Vec<(String, u64)>,
@@ -433,7 +466,6 @@ async fn run_pipeline(
433466
// Process all tables for this batch_id concurrently.
434467
let mut join_set: JoinSet<Result<(String, bool), String>> = JoinSet::new();
435468
for table_name in active_tables {
436-
let dataset = Arc::clone(&dataset);
437469
let data_storage = Arc::clone(&data_storage);
438470
let data_sink = Arc::clone(&data_sink);
439471

@@ -462,18 +494,20 @@ async fn run_pipeline(
462494

463495
let op = sink_op_from_batch_op(&read_result.operation);
464496

465-
// 2. Rehydrate each record batch and write to target
497+
// 2. Append __created_at and write to target
466498
for batch in read_result.batches {
467-
let rehydrated = match dataset.rehydrate(&table_name, &batch) {
499+
let rehydrated = match append_created_at(&batch) {
468500
Ok(b) => b,
469501
Err(e) => {
470502
error!(
471503
table = %table_name,
472504
batch_id,
473505
error = %e,
474-
"Failed to rehydrate batch"
506+
"Failed to append __created_at column"
475507
);
476-
return Err(format!("rehydrate {table_name} batch {batch_id}: {e}"));
508+
return Err(format!(
509+
"append __created_at to {table_name} batch {batch_id}: {e}"
510+
));
477511
}
478512
};
479513

src/metrics.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,6 @@ pub static EFFICIENCY_QUERIES_PER_CORE: LazyLock<Gauge<f64>> = LazyLock::new(||
219219

220220
// --- E2E Latency ---
221221

222-
#[allow(dead_code)]
223222
pub static E2E_LATENCY_P99_MS: LazyLock<Histogram<f64>> = LazyLock::new(|| {
224223
meter()
225224
.f64_histogram("e2e_latency_p99_ms")

0 commit comments

Comments
 (0)