Skip to content

Commit 5cb04e1

Browse files
Add etl_type telemetry dimension (events/changes) (#218)
* Add etl_type telemetry dimension (events/changes)

  Emit an etl_type dimension on all benchmark telemetry to distinguish append-only (events) from mutation (changes) runs.

  - Derive etl_type from VersionMetadata mutation ratios
  - Add as OTel resource attribute and per-metric attribute
  - Include in system adapter setup metadata
  - Rename workflow dropdown from append-only/mutations to events/changes
  - Add ETL Type dropdown to Grafana dashboard (All/events/changes)
  - Filter all 20 dashboard panels by etl_type (same pattern as outcome)

* chore: auto-fix cargo fmt + clippy

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 972dbcd commit 5cb04e1

7 files changed

Lines changed: 89 additions & 38 deletions

File tree

.github/workflows/run_spicebench.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ on:
2121
etl_type:
2222
description: 'ETL type'
2323
required: true
24-
default: 'append-only'
24+
default: 'events'
2525
type: choice
2626
options:
27-
- append-only
28-
- mutations
27+
- events
28+
- changes
2929
scale_factor:
3030
description: 'Scale Factor'
3131
required: true
@@ -223,7 +223,7 @@ jobs:
223223
SYSTEM_ADAPTER: ${{ github.event.inputs.system_under_test || 'spice_cloud' }}
224224
NUM_QUERY_CLIENTS: '8'
225225
ETL_BUCKET: 'spicebench'
226-
ETL_PREFIX: ${{ github.event.inputs.etl_type == 'mutations' && 'data-gen-mutable' || 'data-gen' }}
226+
ETL_PREFIX: ${{ github.event.inputs.etl_type == 'changes' && 'data-gen-mutable' || 'data-gen' }}
227227
SCALE_FACTOR: ${{ github.event.inputs.scale_factor || '1' }}
228228
ETL_REGION: 'us-east-1'
229229
ETL_SINK: 'adbc'

crates/adbc_client/src/pool.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,7 @@ impl AdbcConnectionManager {
7272
/// comparisons against BIGINT columns fail with `DATATYPE_MISMATCH`
7373
/// unless Int64 literals are explicitly suffixed with `L`.
7474
pub fn bigint_suffix(driver_name: &str) -> bool {
75-
match driver_name {
76-
d if d.eq_ignore_ascii_case("databricks") => true,
77-
_ => false,
78-
}
75+
matches!(driver_name, d if d.eq_ignore_ascii_case("databricks"))
7976
}
8077
}
8178

crates/data-generation/src/version.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,19 @@ impl VersionMetadata {
103103
pub fn mutation_config(&self) -> MutationConfig {
104104
MutationConfig::new(self.mutations.update_ratio, self.mutations.delete_ratio)
105105
}
106+
107+
/// Returns the ETL type based on the mutation configuration.
108+
///
109+
/// - `"events"` — append-only data (no updates or deletes).
110+
/// - `"changes"` — data with mutations (updates and/or deletes).
111+
#[must_use]
112+
pub fn etl_type(&self) -> &'static str {
113+
if self.mutations.update_ratio == 0.0 && self.mutations.delete_ratio == 0.0 {
114+
"events"
115+
} else {
116+
"changes"
117+
}
118+
}
106119
}
107120

108121
/// Mutation configuration stored in version metadata.

crates/etl/src/sink/adbc.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ const ADBC_BULK_INGEST_STREAM_BUFFER_ENV: &str = "SPICEBENCH_ADBC_BULK_INGEST_ST
6767
/// - `statement` — row-by-row `UPDATE … SET … WHERE …` statements (default)
6868
/// - `staging_table` — bulk ingest into temp staging table + single `MERGE INTO`
6969
/// - `bulk_ingest_upsert` — bulk ingest directly into the target table (relies on the
70-
/// target system's `on_conflict: upsert` or equivalent to merge)
70+
/// target system's `on_conflict: upsert` or equivalent to merge)
7171
const ADBC_UPDATE_STRATEGY_ENV: &str = "SPICEBENCH_ADBC_UPDATE_STRATEGY";
7272

7373
/// Strategy for executing UPDATE operations.
@@ -482,9 +482,11 @@ impl AdbcSink {
482482
.collect::<anyhow::Result<Vec<_>>>()?
483483
.join(", ");
484484

485-
let partition_clause = (!partition_by.is_empty())
486-
.then(|| format!("PARTITION BY ({})", partition_by.join(", ")))
487-
.unwrap_or_default();
485+
let partition_clause = if !partition_by.is_empty() {
486+
format!("PARTITION BY ({})", partition_by.join(", "))
487+
} else {
488+
String::new()
489+
};
488490

489491
let primary_key_statement = if !primary_keys.is_empty() {
490492
let key_idents: Vec<String> = primary_keys
@@ -510,7 +512,7 @@ impl AdbcSink {
510512
.iter()
511513
.map(|(table_name, config)| {
512514
self.create_table_sql(
513-
&table_name,
515+
table_name,
514516
config.schema.as_ref(),
515517
config.partition_columns.clone(),
516518
&config.primary_key_columns,
@@ -924,6 +926,7 @@ impl AdbcSink {
924926
))
925927
}
926928

929+
#[expect(clippy::too_many_arguments)]
927930
fn update_sql_for_row(
928931
&self,
929932
table_name: &str,

0 commit comments

Comments
 (0)