Skip to content

Commit 2b2b1c4

Browse files
committed
Logging
1 parent 9277344 commit 2b2b1c4

5 files changed

Lines changed: 133 additions & 12 deletions

File tree

crates/checkpointer/src/main.rs

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ struct Cli {
8686
/// Directory to write checkpoint parquet files into
8787
#[arg(long, default_value = "./checkpoints")]
8888
checkpoint_dir: PathBuf,
89+
90+
/// Local directory to check for pre-extracted data before downloading from S3
91+
#[arg(long, default_value = "./spicebench-data")]
92+
data_dir: PathBuf,
8993
}
9094

9195
#[cfg(feature = "duckdb")]
@@ -105,6 +109,32 @@ impl Cli {
105109
}
106110
}
107111

112+
/// Log the row count for each table in the DuckDB sink.
113+
#[cfg(feature = "duckdb")]
114+
async fn log_table_row_counts(
115+
sink: &DuckDBSink,
116+
table_names: &[String],
117+
checkpoint_idx: usize,
118+
) -> anyhow::Result<()> {
119+
for table in table_names {
120+
let sql = format!("SELECT COUNT(*) AS cnt FROM {table}");
121+
let batches = sink.query(&sql).await?;
122+
let count: i64 = batches
123+
.first()
124+
.and_then(|b| {
125+
b.column(0)
126+
.as_any()
127+
.downcast_ref::<arrow::array::Int64Array>()
128+
.map(|a| a.value(0))
129+
})
130+
.unwrap_or(0);
131+
if table == "lineitem" {
132+
eprintln!("[checkpoint] Checkpoint {checkpoint_idx} | {table}: {count} rows");
133+
}
134+
}
135+
Ok(())
136+
}
137+
108138
/// Run all checkpoint queries against the DuckDB sink and write each result
109139
/// set to a parquet file at `<checkpoint_dir>/<checkpoint_idx>/<query_idx>.parquet`.
110140
#[cfg(feature = "duckdb")]
@@ -198,20 +228,34 @@ async fn main() -> anyhow::Result<()> {
198228

199229
let source_config = cli.source_config();
200230
let version_prefix = source_config.prefix.clone();
201-
let archive_storage: Arc<dyn DataStorage> = Arc::new(S3Storage::new(&source_config)?);
202231

203-
// Download and extract the archive to a temporary local directory.
204-
let extract_dir = tempfile::tempdir()?;
205-
ETLPipeline::download(archive_storage, extract_dir.path()).await?;
206-
let source: Arc<dyn DataStorage> = Arc::new(FileStorage::new(extract_dir.path()));
232+
// Check local directory first; download from S3 only if version.json is missing.
233+
let (_extract_dir_handle, extract_path);
234+
let version_json_path = cli.data_dir.join("version.json");
235+
if version_json_path.exists() {
236+
tracing::info!(
237+
data_dir = %cli.data_dir.display(),
238+
"Using pre-existing local data, skipping S3 download"
239+
);
240+
_extract_dir_handle = None;
241+
extract_path = cli.data_dir.clone();
242+
} else {
243+
tracing::info!("Local data not found at {}, downloading from S3", cli.data_dir.display());
244+
let archive_storage: Arc<dyn DataStorage> = Arc::new(S3Storage::new(&source_config)?);
245+
let tmp = tempfile::tempdir()?;
246+
ETLPipeline::download(archive_storage, tmp.path()).await?;
247+
extract_path = tmp.path().to_path_buf();
248+
_extract_dir_handle = Some(tmp);
249+
}
250+
let source: Arc<dyn DataStorage> = Arc::new(FileStorage::new(&extract_path));
207251

208252
// Read version metadata to derive dataset config and mutations.
209253
let version_metadata = source.read_version_metadata().await?.ok_or_else(|| {
210-
anyhow::anyhow!(
211-
"No version.json found in extracted data at {}. Was data generation run for this version?",
212-
extract_dir.path().display()
213-
)
214-
})?;
254+
anyhow::anyhow!(
255+
"No version.json found in extracted data at {}. Was data generation run for this version?",
256+
extract_path.display()
257+
)
258+
})?;
215259

216260
let dataset_source = DatasetSource::from_dataset_type(&version_metadata.dataset_type)?;
217261
let dataset_config = version_metadata.dataset_config();
@@ -234,7 +278,7 @@ async fn main() -> anyhow::Result<()> {
234278
bucket = %cli.bucket,
235279
prefix = %cli.prefix,
236280
version_prefix = %version_prefix,
237-
extract_dir = %extract_dir.path().display(),
281+
extract_dir = %extract_path.display(),
238282
duckdb_path = %cli.duckdb_path.display(),
239283
scale_factor = version_metadata.scale_factor,
240284
num_steps = version_metadata.num_steps,
@@ -243,6 +287,9 @@ async fn main() -> anyhow::Result<()> {
243287
"Starting Checkpointer"
244288
);
245289

290+
let mut table_names: Vec<String> = version_metadata.tables.keys().cloned().collect();
291+
table_names.sort();
292+
246293
pipeline.initialize().await?;
247294
pipeline.run(cli.checkpoint_interval_steps as usize).await?;
248295

@@ -258,6 +305,7 @@ async fn main() -> anyhow::Result<()> {
258305
checkpoint = checkpoint_idx,
259306
"Pipeline paused, running checkpoint queries"
260307
);
308+
log_table_row_counts(&target, &table_names, checkpoint_idx).await?;
261309
run_checkpoint_queries(
262310
&target,
263311
&checkpoint_queries,
@@ -276,6 +324,7 @@ async fn main() -> anyhow::Result<()> {
276324
checkpoint = checkpoint_idx,
277325
"Pipeline completed, running final checkpoint queries"
278326
);
327+
log_table_row_counts(&target, &table_names, checkpoint_idx).await?;
279328
run_checkpoint_queries(
280329
&target,
281330
&checkpoint_queries,

crates/data-generation/src/storage/file.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,15 @@ impl DataStorage for FileStorage {
157157
.await
158158
.map_err(|e| anyhow::anyhow!("spawn_blocking panicked reading parquet: {e}"))??;
159159

160+
if table_name == "lineitem"
161+
&& let Some(result) = result.as_ref()
162+
{
163+
eprintln!(
164+
"[etl-read] table={table_name} batch_id={batch_id} rows={}",
165+
result.rows_read,
166+
);
167+
}
168+
160169
Ok(result)
161170
}
162171

crates/etl/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1791,6 +1791,14 @@ async fn run_pipeline(
17911791
}
17921792
};
17931793

1794+
let source_rows: usize = source_batches.iter().map(|b| b.num_rows()).sum();
1795+
if table_name == "lineitem" {
1796+
eprintln!(
1797+
"!!!!!!!!!!!!!!! [etl] table={table_name} batch_id={batch_id} source_batches={} source_rows={source_rows} consumed_work_units={consumed_work_units}",
1798+
source_batches.len(),
1799+
);
1800+
}
1801+
17941802
if source_batches.is_empty() {
17951803
debug!(
17961804
table = %table_name,

crates/etl/src/sink/adbc.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ limitations under the License.
1515
*/
1616

1717
use std::collections::HashMap;
18+
use std::sync::Mutex;
1819
use std::time::Instant;
1920

2021
use adbc_client::{
@@ -88,6 +89,7 @@ pub struct AdbcSink {
8889
pool: AdbcConnectionPool,
8990
target_db_catalog: Option<String>,
9091
target_db_schema: Option<String>,
92+
row_counts: Mutex<HashMap<String, u64>>,
9193
/// Character used to quote SQL identifiers (e.g. '"' for ANSI, '`' for Databricks).
9294
identifier_quote_char: char,
9395
/// Whether Int64/UInt64 literals need an `L` suffix (Databricks).
@@ -130,6 +132,7 @@ impl AdbcSink {
130132
pool,
131133
target_db_catalog,
132134
target_db_schema,
135+
row_counts: Mutex::new(HashMap::new()),
133136
identifier_quote_char,
134137
bigint_suffix,
135138
})
@@ -788,6 +791,7 @@ impl AdbcSink {
788791
WHEN MATCHED THEN UPDATE SET {set_clause}"
789792
)
790793
}
794+
791795
}
792796

793797
#[async_trait]
@@ -809,6 +813,18 @@ impl Sink for AdbcSink {
809813
.get()
810814
.map_err(|e| anyhow::anyhow!("Failed to get ADBC connection from pool: {e}"))?;
811815

816+
let rows_current = batch.num_rows() as u64;
817+
let op_label = match &op {
818+
InsertOp::Insert => "insert",
819+
InsertOp::Update { .. } => "update",
820+
InsertOp::Delete { .. } => "delete",
821+
};
822+
823+
let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f UTC");
824+
// if table_name == "lineitem" {
825+
eprintln!("[adbc-write] {now} | {table_name} | {op_label} | rows: {rows_current}");
826+
// }
827+
812828
match op {
813829
InsertOp::Insert => {
814830
self.ingest_insert_batch(&mut conn, table_name, batch)?;
@@ -882,6 +898,24 @@ impl Sink for AdbcSink {
882898
}
883899
}
884900

901+
let rows_total = {
902+
let mut counts = self.row_counts.lock().unwrap();
903+
let total = counts.entry(table_name.to_string()).or_insert(0);
904+
match op_label {
905+
"insert" => *total += rows_current,
906+
"delete" => *total = total.saturating_sub(rows_current),
907+
_ => {} // updates don't change row count
908+
}
909+
*total
910+
};
911+
912+
let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f UTC");
913+
// if table_name == "lineitem" {
914+
eprintln!(
915+
"[adbc] WRITTEN {now} | {table_name} | {op_label} | rows: {rows_current} | total: {rows_total}"
916+
);
917+
// }
918+
885919
Ok(())
886920
}
887921
}

crates/etl/src/sink/duckdb.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
use std::collections::HashSet;
17+
use std::collections::{HashMap, HashSet};
1818
use std::path::Path;
1919
use std::sync::{Arc, Mutex};
2020

@@ -35,6 +35,7 @@ use super::{InsertOp, Sink};
3535
pub struct DuckDBSink {
3636
conn: Arc<Mutex<duckdb::Connection>>,
3737
created_tables: TokioMutex<HashSet<String>>,
38+
row_counts: Mutex<HashMap<String, u64>>,
3839
}
3940

4041
impl DuckDBSink {
@@ -45,6 +46,7 @@ impl DuckDBSink {
4546
Ok(Self {
4647
conn: Arc::new(Mutex::new(conn)),
4748
created_tables: TokioMutex::new(HashSet::new()),
49+
row_counts: Mutex::new(HashMap::new()),
4850
})
4951
}
5052

@@ -390,6 +392,25 @@ impl Sink for DuckDBSink {
390392
created.insert(table_name.to_string());
391393
}
392394

395+
let rows_current = num_rows as u64;
396+
let op_label = match &op {
397+
InsertOp::Insert => "insert",
398+
InsertOp::Update { .. } => "update",
399+
InsertOp::Delete { .. } => "delete",
400+
};
401+
let rows_total = {
402+
let mut counts = self.row_counts.lock().unwrap();
403+
let total = counts.entry(table_name.to_string()).or_insert(0);
404+
*total += rows_current;
405+
*total
406+
};
407+
let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f UTC");
408+
if table_name == "lineitem" {
409+
eprintln!(
410+
"[duckdb] WRITTEN {now} | {table_name} | {op_label} | rows: {rows_current} | total: {rows_total}"
411+
);
412+
}
413+
393414
Ok(())
394415
}
395416
}

0 commit comments

Comments
 (0)