diff --git a/.gitignore b/.gitignore index 425f7371..6e00c7dc 100644 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,7 @@ venv *.db-shm # Cluster logs -cluster_logs/ \ No newline at end of file +cluster_logs/ +*.duckdb +*.duckdb.wal +checkpoints/**/*.parquet \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 59c68bea..7ef07a18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,6 +938,7 @@ dependencies = [ "clap", "data-generation", "etl", + "parquet", "system-adapter-protocol", "tokio", "tokio-util", diff --git a/crates/checkpointer/Cargo.toml b/crates/checkpointer/Cargo.toml index 0bd66451..4f533dd5 100644 --- a/crates/checkpointer/Cargo.toml +++ b/crates/checkpointer/Cargo.toml @@ -17,6 +17,7 @@ clap = { workspace = true, features = ["derive"] } data-generation = { path = "../data-generation" } etl = { path = "../etl", features = ["duckdb"] } system-adapter-protocol = { path = "../system-adapter-protocol" } +parquet.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true diff --git a/crates/checkpointer/src/main.rs b/crates/checkpointer/src/main.rs index eeb17d50..826523bd 100644 --- a/crates/checkpointer/src/main.rs +++ b/crates/checkpointer/src/main.rs @@ -14,17 +14,29 @@ See the License for the specific language governing permissions and limitations under the License. */ -use std::path::PathBuf; +use std::fs; +use std::path::{Path, PathBuf}; use std::sync::Arc; +use arrow::array::RecordBatch; use clap::Parser; use data_generation::config::{DatasetConfig, TargetConfig}; use data_generation::dataset::MutationConfig; use data_generation::storage::s3::S3Storage; use etl::sink::duckdb::DuckDBSink; use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason}; +use parquet::arrow::ArrowWriter; use tracing_subscriber::EnvFilter; +/// Static list of checkpoint queries to run against the DuckDB database at +/// each checkpoint boundary. +const CHECKPOINT_QUERIES: &[&str] = &[ + "SELECT COUNT(*) AS cnt FROM lineitem", + "SELECT COUNT(*) AS cnt FROM orders", + "SELECT COUNT(*) AS cnt FROM customer", + "SELECT * FROM lineitem ORDER BY l_orderkey LIMIT 1000", +]; + #[derive(Parser)] #[command( about = "Run an ETL pipeline that reads from S3, rehydrates data, and writes directly to a SUT via ADBC" @@ -64,6 +76,10 @@ struct Cli { /// Every N steps to take a checkpoint #[arg(long, default_value_t = 100)] checkpoint_interval_steps: u64, + + /// Directory to write checkpoint parquet files into + #[arg(long, default_value = "./checkpoints")] + checkpoint_dir: PathBuf, } impl Cli { @@ -95,6 +111,54 @@ impl Cli { } } +/// Run all checkpoint queries against the DuckDB sink and write each result +/// set to a parquet file at `//.parquet`. +async fn run_checkpoint_queries( + sink: &DuckDBSink, + checkpoint_dir: &Path, + checkpoint_idx: usize, +) -> anyhow::Result<()> { + let resolved_checkpoint_dir = checkpoint_dir.join(checkpoint_idx.to_string()); + fs::create_dir_all(&resolved_checkpoint_dir)?; + + for (query_idx, sql) in CHECKPOINT_QUERIES.iter().enumerate() { + tracing::info!( + checkpoint = checkpoint_idx, + query = query_idx, + sql = %sql, + "Running checkpoint query" + ); + + let batches = sink.query(sql).await?; + let out_path = resolved_checkpoint_dir.join(format!("{query_idx}.parquet")); + write_batches_to_parquet(&batches, &out_path)?; + + tracing::info!( + checkpoint = checkpoint_idx, + query = query_idx, + path = %out_path.display(), + "Checkpoint query result written" + ); + } + + Ok(()) +} + +/// Write a slice of `RecordBatch`es to a single parquet file. +fn write_batches_to_parquet(batches: &[RecordBatch], path: &Path) -> anyhow::Result<()> { + if batches.is_empty() { + anyhow::bail!("No record batches to write to parquet"); + } + let schema = batches[0].schema(); + let file = fs::File::create(path)?; + let mut writer = ArrowWriter::try_new(file, schema, None)?; + for batch in batches { + writer.write(batch)?; + } + writer.close()?; + Ok(()) +} + #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() @@ -109,11 +173,17 @@ async fn main() -> anyhow::Result<()> { let source = Arc::new(S3Storage::new(&cli.source_config())?); let target = Arc::new(DuckDBSink::new(&cli.duckdb_path)?); + let target_sink: Arc = Arc::clone(&target) as Arc; let mutations = MutationConfig::new(0.1, 0.1); - let mut pipeline = - ETLPipeline::new(dataset_source, &dataset_config, source, target, &mutations)?; + let mut pipeline = ETLPipeline::new( + dataset_source, + &dataset_config, + source, + target_sink, + &mutations, + )?; tracing::info!( dataset = %cli.dataset, @@ -122,6 +192,8 @@ async fn main() -> anyhow::Result<()> { duckdb_path = %cli.duckdb_path.display(), scale_factor = cli.scale_factor, num_steps = cli.num_steps, + checkpoint_interval = cli.checkpoint_interval_steps, + checkpoint_dir = %cli.checkpoint_dir.display(), "Starting Checkpointer" ); @@ -134,24 +206,47 @@ async fn main() -> anyhow::Result<()> { pipeline.initialize().await?; pipeline.run(cli.checkpoint_interval_steps as usize)?; - match pipeline.wait().await { - PipelineState::Paused => {} - PipelineState::Stopped(StopReason::Completed) => { - tracing::info!("Checkpointer completed successfully"); - } - PipelineState::Stopped(StopReason::Cancelled) => { - tracing::warn!("Checkpointer was cancelled"); - } - PipelineState::Stopped(StopReason::Error(e)) => { - tracing::error!(error = %e, "Checkpointer stopped with error"); - anyhow::bail!("Checkpointer failed: {e}"); - } - other => { - anyhow::bail!("Unexpected final pipeline state: {other:?}"); + let mut checkpoint_idx: usize = 0; + + loop { + let state = pipeline.wait().await; + + match state { + PipelineState::Paused => { + // Pipeline paused after a batch of steps — take a checkpoint. + tracing::info!( + checkpoint = checkpoint_idx, + "Pipeline paused, running checkpoint queries" + ); + run_checkpoint_queries(&target, &cli.checkpoint_dir, checkpoint_idx).await?; + checkpoint_idx += 1; + + // Resume the pipeline for the next batch of steps. + pipeline.continue_pipeline()?; + } + PipelineState::Stopped(StopReason::Completed) => { + // Take a final checkpoint at completion. + tracing::info!( + checkpoint = checkpoint_idx, + "Pipeline completed, running final checkpoint queries" + ); + run_checkpoint_queries(&target, &cli.checkpoint_dir, checkpoint_idx).await?; + tracing::info!("Checkpointer completed successfully"); + break; + } + PipelineState::Stopped(StopReason::Cancelled) => { + tracing::warn!("Checkpointer was cancelled"); + break; + } + PipelineState::Stopped(StopReason::Error(e)) => { + tracing::error!(error = %e, "Checkpointer stopped with error"); + anyhow::bail!("Checkpointer failed: {e}"); + } + other => { + anyhow::bail!("Unexpected final pipeline state: {other:?}"); + } } } - // TODO: checkpoint the current state, continue the pipeline, and continue checkpointing until completion. - Ok(()) } diff --git a/crates/etl/src/sink/duckdb.rs b/crates/etl/src/sink/duckdb.rs index ac637fc9..d493c0db 100644 --- a/crates/etl/src/sink/duckdb.rs +++ b/crates/etl/src/sink/duckdb.rs @@ -21,6 +21,7 @@ use std::sync::{Arc, Mutex}; use arrow::array::RecordBatch; use arrow::datatypes::{DataType, Schema}; use async_trait::async_trait; +use duckdb::arrow::record_batch::RecordBatch as DuckDBRecordBatch; use tokio::sync::Mutex as TokioMutex; use super::{InsertOp, Sink}; @@ -250,6 +251,55 @@ impl DuckDBSink { } } +impl DuckDBSink { + /// Executes an arbitrary SQL query against the underlying DuckDB connection + /// and returns all result rows collected into `RecordBatch`es. + pub async fn query(&self, sql: &str) -> anyhow::Result> { + let conn = Arc::clone(&self.conn); + let sql = sql.to_string(); + tokio::task::spawn_blocking(move || { + let guard = conn + .lock() + .map_err(|e| anyhow::anyhow!("DuckDB connection lock poisoned: {e}"))?; + let mut stmt = guard + .prepare(&sql) + .map_err(|e| anyhow::anyhow!("Failed to prepare DuckDB query: {e}"))?; + let duckdb_batches: Vec = stmt + .query_arrow([]) + .map_err(|e| anyhow::anyhow!("Failed to execute DuckDB query: {e}"))? + .collect(); + + // Convert from duckdb::arrow RecordBatch to arrow::array::RecordBatch + // via IPC serialization round-trip for crate compatibility. + let mut batches = Vec::with_capacity(duckdb_batches.len()); + for db_batch in duckdb_batches { + let mut buf = Vec::new(); + { + let mut writer = duckdb::arrow::ipc::writer::FileWriter::try_new( + &mut buf, + &db_batch.schema(), + ) + .map_err(|e| anyhow::anyhow!("IPC write init failed: {e}"))?; + writer + .write(&db_batch) + .map_err(|e| anyhow::anyhow!("IPC write failed: {e}"))?; + writer + .finish() + .map_err(|e| anyhow::anyhow!("IPC finish failed: {e}"))?; + } + let reader = + arrow::ipc::reader::FileReader::try_new(std::io::Cursor::new(buf), None) + .map_err(|e| anyhow::anyhow!("IPC read failed: {e}"))?; + for batch in reader { + batches.push(batch.map_err(|e| anyhow::anyhow!("IPC batch read failed: {e}"))?); + } + } + Ok::<_, anyhow::Error>(batches) + }) + .await? + } +} + #[async_trait] impl Sink for DuckDBSink { async fn write(