Skip to content

Commit 5c5b02e

Browse files
feat(task-history): record Ballista stages for distributed queries (spiceai#10831)
* feat(task-history): capture distributed query observability For distributed Ballista queries, the task_history table now records the full parent + per-stage tree: sql_query (parent) duration, error, datasets, summary labels └── ballista_stage (child) stage_id, executors, task_count, stage timing, plan tree Previously the task_history span was created in submit_distributed_internal but never instrumented across the job's lifetime — it dropped at submission time with duration ≈ 0 and no error_message. Stage/executor detail was not recorded at all. Implementation: - QueryHandle owns the sql_query span behind Arc<Mutex<Option<Span>>>. spawn_finalize *takes* it (rather than cloning), so the OTel span closes when the spawned finalize future drops its instrumented clone — capturing the query's runtime, not arbitrary post-completion handle lifetime. - New stage_history module walks the in-process ExecutionGraph at job completion and emits one child ballista_stage span per stage via the existing OTel pipeline. Stage labels include executor_histogram, slowest_task_ms, partitions, attempt_num, total_executor_ms. Stage input is the plan rendered with ExplainFormat::Tree. - QueryHandle::cancel() now finalizes the tracker with JobCancelled before returning, so callers that cancel() then drop the handle record an accurate cancellation row instead of the Drop guard's client-disconnected fallback. - Drop guard finalizes orphaned handles: cancels the cancel_token sync, then spawns a finalize task that also calls scheduler.cancel_job so executors don't keep running an unobserved query. - Errors raised before QueryHandle creation (planning/validation/submission) emit tracing::error on the parent span so the row's error_message is populated (mirrors the sync run_internal path). Tests use a process-wide AsyncMutex (TEST_LOCK) to serialize against shared DF_SLOT, condition-driven wait_for_row_count polling instead of fixed sleeps, and parent.job_id-scoped assertions so other tests' task_history rows in the same binary can't pollute our row counts. Requires the matching Ballista fork PR (#38, merged onto spiceai-52.5 as 8afc1b74a605) which persists executor_id on terminal TaskInfo and bumps get_job_execution_graph to pub. * Bump ballista to pick up stuck-query detection (spiceai/datafusion-ballista#39) Updates the ballista pin from 8afc1b74 to 7e9872a5 (the merge commit of spiceai/datafusion-ballista#39) so the next cluster bench run picks up the new operator-facing warning when a distributed query stops making progress. Surgical lockfile update: only the three ballista source URLs change. Vortex stays pinned at the spiceai-52 commit (c536c9ae) the workspace was already using; without this, running cargo update -p ballista-* re-resolves the branch-tracked vortex transitive and bumps it to a newer rev whose arrow-rs version conflicts with the pinned datafusion-table-providers (CI symptom: no method as_list on dyn Array, Arc<Schema>: From<Schema> not satisfied, in adbc compilation). Verified: cargo check -p runtime --offline and cargo check -p spiced --features adbc --offline both clean. * feat(task-history): pluggable span middleware + Ballista stage timeline rendering Introduces two trait-based extension points on the task_history OTel exporter and removes the hardcoded `ballista_stage` branches that lived in the exporter: - `SpanTransform`: mutates a `SpanData` before conversion to a row. Implementors can adjust timestamps, inject attributes, redact fields, etc. - `SpanRetention`: declares a retention dependency between sibling spans in a batch. Returning `Some(parent_span_id)` from `parent_dependency(span)` means "keep this span iff that span is kept by base rules" — expressing the parent/child relationship directly instead of hardcoding it. `TaskHistoryExporter` now stores `Vec<Arc<dyn SpanTransform>>` and `Vec<Arc<dyn SpanRetention>>` with `.with_transform` / `.with_retention` builder methods. The exporter no longer mentions any specific span type — base retention checks PLAN_CAPTURE_LABEL and `min_sql_duration_ms`, retention rules then override per-span. Concrete consumer: `BallistaStageMiddleware` in `crates/runtime/src/datafusion/query/stage_history.rs` implements both traits: - As a `SpanTransform`, it rewrites `start_time` / `end_time` on `ballista_stage` spans using the `stage_started_at` / `stage_ended_at` attributes (millis since UNIX epoch), so the task_history row reflects the actual stage execution window. This is what makes per-stage execution visible on the timeline view. - As a `SpanRetention`, it declares that a `ballista_stage` row depends on its parent `sql_query` row, replacing the prior "if task.as_ref() == ballista_stage" branch in the exporter. `BallistaStageMiddleware::pair()` returns a single instance reused as both trait objects so callers register one Arc in two slots; the three exporter construction sites (production tracing, generic test util, distributed_task_history test) all use it. Includes unit tests covering both hooks: time override on the matching span name, no-op on non-matching names and missing/inverted/zero attributes; retention reports the parent dependency for stage spans and abstains for non-stage spans and orphan stages. * feat(observability): propagate request trace context across distributed query boundaries (spiceai#10896) Issue spiceai#10202. Distributed query executors no longer create a fresh internal request context; they inherit the originating request's protocol and W3C trace ids so executor-side metrics, telemetry dimensions, and any future task_history rows correlate end-to-end with the scheduler. * New `SpiceRequestContextConfig` (`spice_ctx` prefix) carries `protocol`, `trace_id`, and `span_id` through DataFusion's `ConfigExtension` mechanism — round-tripped opaquely by Ballista as `TaskDefinition` config props, with zero fork changes. * Registered as a default option extension at all 3 config_producer / session_builder sites in `runtime/src/cluster/mod.rs`. The scheduler session_builder now reads any `SpiceRequestContextConfig` set on the per-job session config and re-injects it on the built session config (it previously ignored its `_cfg` argument). * `Query::submit_distributed_internal` populates the extension from the current `RequestContext` before `create_or_update_session`. * `resolve_request_context(TaskContext)` shared helper establishes the canonical lookup order: typed `Arc<RequestContext>` extension → config extension → `Protocol::Internal` fallback (or `None`). * `BytesProcessedExec::execute` switched to the helper. Same panic behavior preserved when no context is found and `fallback_to_new_context` is off. * `FlightSqlExec` gains an optional `trace_parent: Option<String>` field with a `with_trace_parent` builder. `execute()` sets it as a `traceparent` gRPC metadata header (via `FlightSqlServiceClient::set_header`), falling back to formatting from the typed `RequestContext` extension on the `TaskContext` session config when not preset. `PartialAggregationFlightSqlExec` inherits and forwards on its own `execute()`. * W3C span semantics preserved: the propagated `span_id` is the sender's current span and becomes the parent of any new spans the receiver creates — task_history's existing `parent_span_id` chain extends naturally one level per executor hop. Tests: 4 config extension roundtrip + 4 resolver lookup-order + 1 `BytesProcessedExec` integration test asserting metric dimensions carry the correct protocol when only the config extension is present. * chore(deps): bump datafusion-ballista to c25e25b9 (spiceai-52.5) * fix(flightsql): propagate traceparent in pushed-down aggregate exec PartialAggregationFlightSqlExec was sending no traceparent header when its source FlightSqlExec was built by the aggregate-pushdown optimizer (which leaves trace_parent unset). Fall back to TaskContext like the non-pushed FlightSqlExec path does, so pushed-down aggregate calls preserve the same propagation behavior. Also picks up fmt drift in two trace-context files merged via spiceai#10896.
1 parent c6b06dc commit 5c5b02e

20 files changed

Lines changed: 2402 additions & 362 deletions

File tree

Cargo.lock

Lines changed: 467 additions & 306 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,9 +426,9 @@ datafusion-substrait = { git = "https://github.com/spiceai/datafusion.git", rev
426426

427427
datafusion-table-providers = { git = "https://github.com/datafusion-contrib/datafusion-table-providers.git", rev = "b798c391b6566c172d44361f8acc8472c958ca75" } # spiceai-52
428428

429-
ballista-core = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "383e165a080d648c313a2530a3a53eae6077fdba" } # spiceai-52.5
430-
ballista-executor = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "383e165a080d648c313a2530a3a53eae6077fdba" } # spiceai-52.5
431-
ballista-scheduler = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "383e165a080d648c313a2530a3a53eae6077fdba" } # spiceai-52.5
429+
ballista-core = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "c25e25b9e88c50404e230c698a35cbde2fdd30ae" } # spiceai-52.5
430+
ballista-executor = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "c25e25b9e88c50404e230c698a35cbde2fdd30ae" } # spiceai-52.5
431+
ballista-scheduler = { git = "https://github.com/spiceai/datafusion-ballista.git", rev = "c25e25b9e88c50404e230c698a35cbde2fdd30ae" } # spiceai-52.5
432432

433433
delta_kernel = { git = "https://github.com/spiceai/delta-kernel-rs.git", rev = "47034733a0477f72e4f6abbbf6a27d0da069860a" } # spiceai-0.18.2
434434

bin/spiced/src/tracing.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ where
243243
Some(format!("{host}:{port}").into())
244244
});
245245

246+
let (ballista_transform, ballista_retention) =
247+
runtime::datafusion::query::stage_history::BallistaStageMiddleware::pair();
246248
let task_history_exporter = task_history::otel_exporter::TaskHistoryExporter::new(
247249
df,
248250
captured_output,
@@ -251,7 +253,9 @@ where
251253
captured_plan,
252254
min_plan_duration_ms,
253255
node_id,
254-
);
256+
)
257+
.with_transform(ballista_transform)
258+
.with_retention(ballista_retention);
255259

256260
let zipkin_exporter = zipkin_task_history_otel_exporter(config).await?;
257261

crates/data_components/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ rdkafka = { workspace = true, features = ["ssl-vendored"], optional = true }
8888
regex.workspace = true
8989
reqwest.workspace = true
9090
runtime-rate-control = { path = "../runtime-rate-control" }
91+
runtime-request-context = { path = "../runtime-request-context" }
9192
rusqlite = { workspace = true, optional = true }
9293
elasticsearch = { path = "../elasticsearch", optional = true }
9394
s3_vectors = { path = "../s3_vectors", optional = true }

crates/data_components/src/flightsql.rs

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,23 @@ use datafusion::{
5757
},
5858
sql::TableReference,
5959
};
60+
use runtime_request_context::RequestContext;
6061
use tonic::codegen::Bytes;
6162
use tonic::transport::{Channel, channel};
6263

6364
use crate::Read;
6465

66+
/// Build a W3C `traceparent` header value (`00-{trace_id}-{span_id}-01`)
67+
/// from the typed `Arc<RequestContext>` extension on the
68+
/// [`TaskContext`]'s session config, if one is present. The fixed `01`
69+
/// trace flag marks the trace as sampled.
70+
#[must_use]
71+
pub fn trace_parent_from_task_context(context: &TaskContext) -> Option<String> {
72+
let request_context = context.session_config().get_extension::<RequestContext>()?;
73+
let tp = request_context.trace_parent().as_ref()?;
74+
Some(format!("00-{}-{}-01", tp.trace_id, tp.span_id))
75+
}
76+
6577
pub mod federation;
6678

6779
#[derive(Debug, Snafu)]
@@ -432,6 +444,12 @@ pub struct FlightSqlExec {
432444
properties: PlanProperties,
433445
cookie_store: Arc<CookieStore>,
434446
metrics: ExecutionPlanMetricsSet,
447+
/// Optional W3C `traceparent` value (e.g. `00-{trace_id}-{span_id}-01`)
448+
/// to attach as a gRPC metadata header on outgoing `execute()` and
449+
/// `do_get()` calls. When `None`, `execute()` falls back to reading
450+
/// the typed `Arc<RequestContext>` extension from the `TaskContext`
451+
/// session config and constructs a header from its `trace_parent()`.
452+
trace_parent: Option<String>,
435453
}
436454

437455
impl FlightSqlExec {
@@ -460,9 +478,27 @@ impl FlightSqlExec {
460478
),
461479
cookie_store,
462480
metrics: ExecutionPlanMetricsSet::new(),
481+
trace_parent: None,
463482
})
464483
}
465484

485+
/// Set an explicit W3C `traceparent` header value to forward on each
486+
/// outgoing `FlightSQL` call. Useful when the plan-creation path has
487+
/// access to an `Arc<RequestContext>` but the executor-side
488+
/// `TaskContext` will not (e.g. when this `ExecutionPlan` is shipped
489+
/// to a remote executor via Ballista codecs).
490+
#[must_use]
491+
pub fn with_trace_parent(mut self, trace_parent: Option<String>) -> Self {
492+
self.trace_parent = trace_parent;
493+
self
494+
}
495+
496+
/// Returns the currently configured W3C `traceparent` value, if any.
497+
#[must_use]
498+
pub fn trace_parent(&self) -> Option<&str> {
499+
self.trace_parent.as_deref()
500+
}
501+
466502
/// Returns a reference to the underlying `FlightSqlClient`.
467503
#[must_use]
468504
pub fn client(&self) -> &FlightSqlClient {
@@ -655,6 +691,7 @@ impl ExecutionPlan for FlightSqlExec {
655691
),
656692
cookie_store: Arc::clone(&self.cookie_store),
657693
metrics: ExecutionPlanMetricsSet::new(),
694+
trace_parent: self.trace_parent.clone(),
658695
};
659696

660697
Ok(SortOrderPushdownResult::Exact {
@@ -665,7 +702,7 @@ impl ExecutionPlan for FlightSqlExec {
665702
fn execute(
666703
&self,
667704
partition: usize,
668-
_context: Arc<TaskContext>,
705+
context: Arc<TaskContext>,
669706
) -> DataFusionResult<SendableRecordBatchStream> {
670707
let sql = self.sql().map_err(to_execution_error)?;
671708
let target_schema = self.schema();
@@ -676,9 +713,19 @@ impl ExecutionPlan for FlightSqlExec {
676713

677714
let baseline = datafusion::common::instant::Instant::now();
678715

679-
let inner = query_to_stream(self.client.clone(), sql, Arc::clone(&self.cookie_store)).map(
680-
move |result| result.and_then(|batch| coerce_batch_to_schema(&batch, &target_schema)),
681-
);
716+
let mut client = self.client.clone();
717+
let trace_parent = self
718+
.trace_parent
719+
.clone()
720+
.or_else(|| trace_parent_from_task_context(&context));
721+
if let Some(value) = trace_parent {
722+
client.set_header("traceparent", value);
723+
}
724+
725+
let inner =
726+
query_to_stream(client, sql, Arc::clone(&self.cookie_store)).map(move |result| {
727+
result.and_then(|batch| coerce_batch_to_schema(&batch, &target_schema))
728+
});
682729

683730
let timed_stream = stream! {
684731
futures::pin_mut!(inner);
@@ -727,6 +774,7 @@ impl ExecutionPlan for FlightSqlExec {
727774
properties: self.properties.clone(),
728775
cookie_store: Arc::clone(&self.cookie_store),
729776
metrics: ExecutionPlanMetricsSet::new(),
777+
trace_parent: self.trace_parent.clone(),
730778
};
731779

732780
Some(Arc::new(new_plan))

crates/datafusion-optimizer-rules/src/physical_plan/flightsql/exec.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use datafusion::physical_plan::{
4040
};
4141
use datafusion::sql::TableReference;
4242

43-
use data_components::flightsql::{FlightSqlClient, FlightSqlExec, query_to_stream};
43+
use data_components::flightsql::{
44+
FlightSqlClient, FlightSqlExec, query_to_stream, trace_parent_from_task_context,
45+
};
4446
use data_components::sql_expr::to_sql_preserving_precedence;
4547
use flight_client::cookie::CookieStore;
4648
use futures::StreamExt;
@@ -153,6 +155,11 @@ pub struct PartialAggregationFlightSqlExec {
153155
cookie_store: Arc<CookieStore>,
154156
/// Cached plan properties.
155157
properties: PlanProperties,
158+
/// Optional W3C `traceparent` value inherited from the source
159+
/// `FlightSqlExec`. Forwarded as a gRPC metadata header on each
160+
/// outgoing call so executor-side spans chain back to the originating
161+
/// request.
162+
trace_parent: Option<String>,
156163
}
157164

158165
impl PartialAggregationFlightSqlExec {
@@ -181,6 +188,7 @@ impl PartialAggregationFlightSqlExec {
181188
client: source.client().clone(),
182189
cookie_store: Arc::clone(source.cookie_store()),
183190
properties,
191+
trace_parent: source.trace_parent().map(str::to_string),
184192
}
185193
}
186194

@@ -250,7 +258,7 @@ impl ExecutionPlan for PartialAggregationFlightSqlExec {
250258
fn execute(
251259
&self,
252260
partition: usize,
253-
_context: Arc<TaskContext>,
261+
context: Arc<TaskContext>,
254262
) -> Result<SendableRecordBatchStream> {
255263
if partition != 0 {
256264
return Err(DataFusionError::Execution(format!(
@@ -261,14 +269,20 @@ impl ExecutionPlan for PartialAggregationFlightSqlExec {
261269
let target_schema = self.schema();
262270
let column_mapping = query.column_mapping;
263271

264-
let stream = query_to_stream(
265-
self.client.clone(),
266-
query.sql,
267-
Arc::clone(&self.cookie_store),
268-
)
269-
.map(move |result: std::result::Result<_, DataFusionError>| {
270-
result.and_then(|batch| remap_batch(&batch, &column_mapping, &target_schema))
271-
});
272+
let mut client = self.client.clone();
273+
let trace_parent = self
274+
.trace_parent
275+
.clone()
276+
.or_else(|| trace_parent_from_task_context(&context));
277+
if let Some(value) = trace_parent {
278+
client.set_header("traceparent", value);
279+
}
280+
281+
let stream = query_to_stream(client, query.sql, Arc::clone(&self.cookie_store)).map(
282+
move |result: std::result::Result<_, DataFusionError>| {
283+
result.and_then(|batch| remap_batch(&batch, &column_mapping, &target_schema))
284+
},
285+
);
272286

273287
Ok(Box::pin(RecordBatchStreamAdapter::new(
274288
self.schema(),
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod cluster_config;
2+
pub mod request_context_config;

0 commit comments

Comments
 (0)