Skip to content

Commit 564dc1d

Browse files
committed
Prepare for trunk
1 parent 9a80630 commit 564dc1d

9 files changed

Lines changed: 356 additions & 75 deletions

File tree

.github/workflows/run_spicebench.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,9 @@ jobs:
324324
if [ -n "${DATABRICKS_STAGING_VOLUME_PATH:-}" ]; then
325325
ADAPTER_ENVS="${ADAPTER_ENVS} --system-adapter-env DATABRICKS_STAGING_VOLUME_PATH=${DATABRICKS_STAGING_VOLUME_PATH}"
326326
fi
327+
328+
export SPICEBENCH_TARGET_BATCH_ROWS="500000"
329+
export SPICEBENCH_ADBC_MAX_INGEST_BATCH_BYTES="1268435456"
327330
else
328331
ADAPTER_CMD="docker"
329332
ADAPTER_ARGS="run -i -e SPICEAI_API_KEY -e SPICE_CLOUD_API_URL -e AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY} -e SPIDAPTER_ICEBERG_REGION -e SPIDAPTER_ICEBERG_CATALOG_FROM ghcr.io/spiceai/spidapter:latest stdio --verbose --channel nightly"

crates/data-generation/src/storage/file.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,15 @@ impl DataStorage for FileStorage {
157157
.await
158158
.map_err(|e| anyhow::anyhow!("spawn_blocking panicked reading parquet: {e}"))??;
159159

160+
if table_name == "lineitem"
161+
&& let Some(result) = result.as_ref()
162+
{
163+
eprintln!(
164+
"[etl-read] table={table_name} batch_id={batch_id} rows={}",
165+
result.rows_read,
166+
);
167+
}
168+
160169
Ok(result)
161170
}
162171

crates/etl/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,6 +1738,14 @@ async fn run_pipeline(
17381738
}
17391739
};
17401740

1741+
let source_rows: usize = source_batches.iter().map(|b| b.num_rows()).sum();
1742+
if table_name == "lineitem" {
1743+
eprintln!(
1744+
"!!!!!!!!!!!!!!! [etl] table={table_name} batch_id={batch_id} source_batches={} source_rows={source_rows} consumed_work_units={consumed_work_units}",
1745+
source_batches.len(),
1746+
);
1747+
}
1748+
17411749
if source_batches.is_empty() {
17421750
debug!(
17431751
table = %table_name,

crates/etl/src/sink/adbc.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ limitations under the License.
1515
*/
1616

1717
use std::collections::HashMap;
18+
use std::sync::Mutex;
1819
use std::time::Instant;
1920

2021
use adbc_client::{
@@ -52,6 +53,7 @@ pub struct AdbcSink {
5253
pool: AdbcConnectionPool,
5354
target_db_catalog: Option<String>,
5455
target_db_schema: Option<String>,
56+
row_counts: Mutex<HashMap<String, u64>>,
5557
/// Character used to quote SQL identifiers (e.g. '"' for ANSI, '`' for Databricks).
5658
identifier_quote_char: char,
5759
/// Whether Int64/UInt64 literals need an `L` suffix (Databricks).
@@ -94,6 +96,7 @@ impl AdbcSink {
9496
pool,
9597
target_db_catalog,
9698
target_db_schema,
99+
row_counts: Mutex::new(HashMap::new()),
97100
identifier_quote_char,
98101
bigint_suffix,
99102
})
@@ -668,6 +671,18 @@ impl Sink for AdbcSink {
668671
.get()
669672
.map_err(|e| anyhow::anyhow!("Failed to get ADBC connection from pool: {e}"))?;
670673

674+
let rows_current = batch.num_rows() as u64;
675+
let op_label = match &op {
676+
InsertOp::Insert => "insert",
677+
InsertOp::Update { .. } => "update",
678+
InsertOp::Delete { .. } => "delete",
679+
};
680+
681+
let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f UTC");
682+
if table_name == "lineitem" {
683+
eprintln!("[adbc-write] {now} | {table_name} | {op_label} | rows: {rows_current}");
684+
}
685+
671686
match op {
672687
InsertOp::Insert => {
673688
self.ingest_insert_batch(&mut conn, table_name, batch)?;
@@ -722,6 +737,20 @@ impl Sink for AdbcSink {
722737
}
723738
}
724739

740+
let rows_total = {
741+
let mut counts = self.row_counts.lock().unwrap();
742+
let total = counts.entry(table_name.to_string()).or_insert(0);
743+
*total += rows_current;
744+
*total
745+
};
746+
747+
let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f UTC");
748+
if table_name == "lineitem" {
749+
eprintln!(
750+
"[adbc] WRITTEN {now} | {table_name} | {op_label} | rows: {rows_current} | total: {rows_total}"
751+
);
752+
}
753+
725754
Ok(())
726755
}
727756
}

crates/etl/src/sink/duckdb.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
use std::collections::HashSet;
17+
use std::collections::{HashMap, HashSet};
1818
use std::path::Path;
1919
use std::sync::{Arc, Mutex};
2020

@@ -35,6 +35,7 @@ use super::{InsertOp, Sink};
3535
pub struct DuckDBSink {
3636
conn: Arc<Mutex<duckdb::Connection>>,
3737
created_tables: TokioMutex<HashSet<String>>,
38+
row_counts: Mutex<HashMap<String, u64>>,
3839
}
3940

4041
impl DuckDBSink {
@@ -45,6 +46,7 @@ impl DuckDBSink {
4546
Ok(Self {
4647
conn: Arc::new(Mutex::new(conn)),
4748
created_tables: TokioMutex::new(HashSet::new()),
49+
row_counts: Mutex::new(HashMap::new()),
4850
})
4951
}
5052

@@ -390,6 +392,25 @@ impl Sink for DuckDBSink {
390392
created.insert(table_name.to_string());
391393
}
392394

395+
let rows_current = num_rows as u64;
396+
let op_label = match &op {
397+
InsertOp::Insert => "insert",
398+
InsertOp::Update { .. } => "update",
399+
InsertOp::Delete { .. } => "delete",
400+
};
401+
let rows_total = {
402+
let mut counts = self.row_counts.lock().unwrap();
403+
let total = counts.entry(table_name.to_string()).or_insert(0);
404+
*total += rows_current;
405+
*total
406+
};
407+
let now = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f UTC");
408+
if table_name == "lineitem" {
409+
eprintln!(
410+
"[duckdb] WRITTEN {now} | {table_name} | {op_label} | rows: {rows_current} | total: {rows_total}"
411+
);
412+
}
413+
393414
Ok(())
394415
}
395416
}

crates/test-framework/src/lib.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,32 @@ pub use rustls;
4141
pub enum Scenario {
4242
#[allow(clippy::upper_case_acronyms)]
4343
TPCH,
44+
45+
#[value(name = "tpch-0.1u-0.1d")]
46+
TPCHWithMutations,
47+
48+
#[value(name = "tpch-0.00003")]
49+
TPCHWithMutations2,
50+
51+
#[value(name = "tpch-old")]
52+
TPCHWithMutationsOld,
53+
54+
#[value(name = "tpch-old-0.00003")]
55+
TPCHWithMutationsOldMutations,
56+
57+
#[value(name = "philip")]
58+
Philip,
4459
}
4560

4661
impl Display for Scenario {
4762
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4863
match self {
4964
Scenario::TPCH => write!(f, "tpch"),
65+
Scenario::TPCHWithMutations => write!(f, "tpch-0.1u-0.1d"),
66+
Scenario::TPCHWithMutations2 => write!(f, "tpch-0.00003"),
67+
Scenario::TPCHWithMutationsOld => write!(f, "tpch-old"),
68+
Scenario::TPCHWithMutationsOldMutations => write!(f, "tpch-old-0.00003"),
69+
Scenario::Philip => write!(f, "philip"),
5070
}
5171
}
5272
}
@@ -56,12 +76,22 @@ impl Scenario {
5676
pub fn load_query_set(&self) -> anyhow::Result<QuerySet> {
5777
match self {
5878
Scenario::TPCH => Ok(QuerySet::Tpch),
79+
Scenario::TPCHWithMutations => Ok(QuerySet::Tpch),
80+
Scenario::TPCHWithMutations2 => Ok(QuerySet::Tpch),
81+
Scenario::TPCHWithMutationsOld => Ok(QuerySet::Tpch),
82+
Scenario::TPCHWithMutationsOldMutations => Ok(QuerySet::Tpch),
83+
Scenario::Philip => Ok(QuerySet::Tpch),
5984
}
6085
}
6186

6287
pub fn end_condition(&self) -> EndCondition {
6388
match self {
6489
Scenario::TPCH => EndCondition::Unlimited,
90+
Scenario::TPCHWithMutations => EndCondition::Unlimited,
91+
Scenario::TPCHWithMutations2 => EndCondition::Unlimited,
92+
Scenario::TPCHWithMutationsOld => EndCondition::Unlimited,
93+
Scenario::TPCHWithMutationsOldMutations => EndCondition::Unlimited,
94+
Scenario::Philip => EndCondition::Unlimited,
6595
}
6696
}
6797
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- Row count of each of the eight tables below, largest table first.
-- The derived table carries an alias because standard SQL (and several
-- engines) reject an unnamed subquery in FROM. Ties on `cnt` are broken by
-- table name so the result order is deterministic.
select * from (
    select 'customer' as tbl, count(*) as cnt from customer
    union all select 'orders', count(*) from orders
    union all select 'lineitem', count(*) from lineitem
    union all select 'supplier', count(*) from supplier
    union all select 'nation', count(*) from nation
    union all select 'part', count(*) from part
    union all select 'partsupp', count(*) from partsupp
    union all select 'region', count(*) from region
) as table_counts
order by cnt desc, tbl

src/commands/adbc_executor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ impl QueryExecutor for AdbcDirectQueryExecutor {
6868
.map(arrow::array::RecordBatch::num_rows)
6969
.sum();
7070

71+
println!("[query]: {} - {:?}", query.name, duration);
72+
7173
Ok(ExecutionResult {
7274
duration,
7375
row_count,

0 commit comments

Comments
 (0)