Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ venv
*.db-shm

# Cluster logs
cluster_logs/
cluster_logs/
*.duckdb
*.duckdb.wal
checkpoints/**/*.parquet
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/checkpointer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ clap = { workspace = true, features = ["derive"] }
data-generation = { path = "../data-generation" }
etl = { path = "../etl", features = ["duckdb"] }
system-adapter-protocol = { path = "../system-adapter-protocol" }
parquet.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
Expand Down
133 changes: 114 additions & 19 deletions crates/checkpointer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,29 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::path::PathBuf;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::array::RecordBatch;
use clap::Parser;
use data_generation::config::{DatasetConfig, TargetConfig};
use data_generation::dataset::MutationConfig;
use data_generation::storage::s3::S3Storage;
use etl::sink::duckdb::DuckDBSink;
use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
use parquet::arrow::ArrowWriter;
use tracing_subscriber::EnvFilter;

/// Static list of checkpoint queries to run against the DuckDB database at
/// each checkpoint boundary.
const CHECKPOINT_QUERIES: &[&str] = &[
"SELECT COUNT(*) AS cnt FROM lineitem",
"SELECT COUNT(*) AS cnt FROM orders",
"SELECT COUNT(*) AS cnt FROM customer",
"SELECT * FROM lineitem ORDER BY l_orderkey LIMIT 1000",
];

#[derive(Parser)]
#[command(
about = "Run an ETL pipeline that reads from S3, rehydrates data, and writes directly to a SUT via ADBC"
Expand Down Expand Up @@ -64,6 +76,10 @@ struct Cli {
/// Every N steps to take a checkpoint
#[arg(long, default_value_t = 100)]
checkpoint_interval_steps: u64,

/// Directory to write checkpoint parquet files into
#[arg(long, default_value = "./checkpoints")]
checkpoint_dir: PathBuf,
}

impl Cli {
Expand Down Expand Up @@ -95,6 +111,54 @@ impl Cli {
}
}

/// Run all checkpoint queries against the DuckDB sink and write each result
/// set to a parquet file at `<checkpoint_dir>/<checkpoint_idx>/<query_idx>.parquet`.
async fn run_checkpoint_queries(
sink: &DuckDBSink,
checkpoint_dir: &Path,
checkpoint_idx: usize,
) -> anyhow::Result<()> {
let resolved_checkpoint_dir = checkpoint_dir.join(checkpoint_idx.to_string());
fs::create_dir_all(&resolved_checkpoint_dir)?;

for (query_idx, sql) in CHECKPOINT_QUERIES.iter().enumerate() {
tracing::info!(
checkpoint = checkpoint_idx,
query = query_idx,
sql = %sql,
"Running checkpoint query"
);

let batches = sink.query(sql).await?;
let out_path = resolved_checkpoint_dir.join(format!("{query_idx}.parquet"));
write_batches_to_parquet(&batches, &out_path)?;

tracing::info!(
checkpoint = checkpoint_idx,
query = query_idx,
path = %out_path.display(),
"Checkpoint query result written"
);
}

Ok(())
}

/// Write a slice of `RecordBatch`es to a single parquet file.
fn write_batches_to_parquet(batches: &[RecordBatch], path: &Path) -> anyhow::Result<()> {
if batches.is_empty() {
anyhow::bail!("No record batches to write to parquet");
}
let schema = batches[0].schema();
let file = fs::File::create(path)?;
let mut writer = ArrowWriter::try_new(file, schema, None)?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
Expand All @@ -109,11 +173,17 @@ async fn main() -> anyhow::Result<()> {
let source = Arc::new(S3Storage::new(&cli.source_config())?);

let target = Arc::new(DuckDBSink::new(&cli.duckdb_path)?);
let target_sink: Arc<dyn etl::sink::Sink> = Arc::clone(&target) as Arc<dyn etl::sink::Sink>;

let mutations = MutationConfig::new(0.1, 0.1);

let mut pipeline =
ETLPipeline::new(dataset_source, &dataset_config, source, target, &mutations)?;
let mut pipeline = ETLPipeline::new(
dataset_source,
&dataset_config,
source,
target_sink,
&mutations,
)?;

tracing::info!(
dataset = %cli.dataset,
Expand All @@ -122,6 +192,8 @@ async fn main() -> anyhow::Result<()> {
duckdb_path = %cli.duckdb_path.display(),
scale_factor = cli.scale_factor,
num_steps = cli.num_steps,
checkpoint_interval = cli.checkpoint_interval_steps,
checkpoint_dir = %cli.checkpoint_dir.display(),
"Starting Checkpointer"
);

Expand All @@ -134,24 +206,47 @@ async fn main() -> anyhow::Result<()> {
pipeline.initialize().await?;
pipeline.run(cli.checkpoint_interval_steps as usize)?;

match pipeline.wait().await {
PipelineState::Paused => {}
PipelineState::Stopped(StopReason::Completed) => {
tracing::info!("Checkpointer completed successfully");
}
PipelineState::Stopped(StopReason::Cancelled) => {
tracing::warn!("Checkpointer was cancelled");
}
PipelineState::Stopped(StopReason::Error(e)) => {
tracing::error!(error = %e, "Checkpointer stopped with error");
anyhow::bail!("Checkpointer failed: {e}");
}
other => {
anyhow::bail!("Unexpected final pipeline state: {other:?}");
let mut checkpoint_idx: usize = 0;

loop {
let state = pipeline.wait().await;

match state {
PipelineState::Paused => {
// Pipeline paused after a batch of steps — take a checkpoint.
tracing::info!(
checkpoint = checkpoint_idx,
"Pipeline paused, running checkpoint queries"
);
run_checkpoint_queries(&target, &cli.checkpoint_dir, checkpoint_idx).await?;
checkpoint_idx += 1;

// Resume the pipeline for the next batch of steps.
pipeline.continue_pipeline()?;
}
PipelineState::Stopped(StopReason::Completed) => {
// Take a final checkpoint at completion.
tracing::info!(
checkpoint = checkpoint_idx,
"Pipeline completed, running final checkpoint queries"
);
run_checkpoint_queries(&target, &cli.checkpoint_dir, checkpoint_idx).await?;
tracing::info!("Checkpointer completed successfully");
break;
}
PipelineState::Stopped(StopReason::Cancelled) => {
tracing::warn!("Checkpointer was cancelled");
break;
}
PipelineState::Stopped(StopReason::Error(e)) => {
tracing::error!(error = %e, "Checkpointer stopped with error");
anyhow::bail!("Checkpointer failed: {e}");
}
other => {
anyhow::bail!("Unexpected final pipeline state: {other:?}");
}
}
}

// TODO: checkpoint the current state, continue the pipeline, and continue checkpointing until completion.

Ok(())
}
50 changes: 50 additions & 0 deletions crates/etl/src/sink/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::{Arc, Mutex};
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Schema};
use async_trait::async_trait;
use duckdb::arrow::record_batch::RecordBatch as DuckDBRecordBatch;
use tokio::sync::Mutex as TokioMutex;

use super::{InsertOp, Sink};
Expand Down Expand Up @@ -250,6 +251,55 @@ impl DuckDBSink {
}
}

impl DuckDBSink {
/// Executes an arbitrary SQL query against the underlying DuckDB connection
/// and returns all result rows collected into `RecordBatch`es.
pub async fn query(&self, sql: &str) -> anyhow::Result<Vec<RecordBatch>> {
let conn = Arc::clone(&self.conn);
let sql = sql.to_string();
tokio::task::spawn_blocking(move || {
let guard = conn
.lock()
.map_err(|e| anyhow::anyhow!("DuckDB connection lock poisoned: {e}"))?;
let mut stmt = guard
.prepare(&sql)
.map_err(|e| anyhow::anyhow!("Failed to prepare DuckDB query: {e}"))?;
let duckdb_batches: Vec<DuckDBRecordBatch> = stmt
.query_arrow([])
.map_err(|e| anyhow::anyhow!("Failed to execute DuckDB query: {e}"))?
.collect();

// Convert from duckdb::arrow RecordBatch to arrow::array::RecordBatch
// via IPC serialization round-trip for crate compatibility.
let mut batches = Vec::with_capacity(duckdb_batches.len());
for db_batch in duckdb_batches {
let mut buf = Vec::new();
{
let mut writer = duckdb::arrow::ipc::writer::FileWriter::try_new(
&mut buf,
&db_batch.schema(),
)
.map_err(|e| anyhow::anyhow!("IPC write init failed: {e}"))?;
writer
.write(&db_batch)
.map_err(|e| anyhow::anyhow!("IPC write failed: {e}"))?;
writer
.finish()
.map_err(|e| anyhow::anyhow!("IPC finish failed: {e}"))?;
}
let reader =
arrow::ipc::reader::FileReader::try_new(std::io::Cursor::new(buf), None)
.map_err(|e| anyhow::anyhow!("IPC read failed: {e}"))?;
for batch in reader {
batches.push(batch.map_err(|e| anyhow::anyhow!("IPC batch read failed: {e}"))?);
}
}
Ok::<_, anyhow::Error>(batches)
})
.await?
}
}

#[async_trait]
impl Sink for DuckDBSink {
async fn write(
Expand Down