Skip to content

Hacky branch for those who want to experiment with end-to-end tracing #9847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 298 additions & 36 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ re_tracing = { path = "crates/utils/re_tracing", version = "=0.24.0-alpha.1", de
re_tuid = { path = "crates/utils/re_tuid", version = "=0.24.0-alpha.1", default-features = false }
re_uri = { path = "crates/utils/re_uri", version = "=0.24.0-alpha.1", default-features = false }
re_video = { path = "crates/utils/re_video", version = "=0.24.0-alpha.1", default-features = false }
# TODO(cmc): decide on redap_telemetry's final home before merging this experiment.
redap_telemetry = { path = "crates/utils/redap_telemetry", version = "=0.24.0-alpha.1", default-features = false }

# crates/viewer:
re_blueprint_tree = { path = "crates/viewer/re_blueprint_tree", version = "=0.24.0-alpha.1", default-features = false }
Expand Down Expand Up @@ -220,6 +222,7 @@ gltf = "1.1"
half = { version = "2.3.1", features = ["bytemuck"] }
hexasphere = "14.1.0"
home = "=0.5.9"
http = "1.2.0"
image = { version = "0.25", default-features = false }
indent = "0.1"
indexmap = "2.1" # Version chosen to align with other dependencies
Expand Down Expand Up @@ -365,6 +368,17 @@ wgpu = { version = "24.0", default-features = false, features = [
xshell = "0.2.7"



# TODO(cmc): telemetry dependencies for the end-to-end tracing experiment; revisit before merging.
opentelemetry = { version = "0.29.0", features = ["metrics"] }
opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] }
opentelemetry-otlp = "0.29.0"
opentelemetry-stdout = "0.29.0"
tracing-opentelemetry = "0.30.0"
tracing-subscriber = { version = "0.3.18", features = ["tracing-log", "fmt", "env-filter"] }
tower-service = "0.3"


# ---------------------------------------------------------------------------------
[profile]

Expand Down
1 change: 1 addition & 0 deletions crates/store/re_datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ futures-util.workspace = true
itertools.workspace = true
tokio-stream.workspace = true
tonic.workspace = true
tracing.workspace = true

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions crates/store/re_datafusion/src/datafusion_connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use datafusion::{catalog::TableProvider, error::DataFusionError};
use tracing::instrument;

use re_grpc_client::redap::RedapClient;
use re_log_types::{external::re_tuid::Tuid, EntryId};
Expand All @@ -23,6 +24,7 @@ impl DataFusionConnector {
}

impl DataFusionConnector {
#[instrument(skip_all, err)]
pub async fn get_entry_list(&mut self) -> Result<Arc<dyn TableProvider>, DataFusionError> {
// TODO(jleibs): Clean this up with better helpers
let entry: EntryDetails = self
Expand All @@ -48,6 +50,7 @@ impl DataFusionConnector {
.await
}

#[instrument(skip(self), err)]
pub async fn get_dataset_entry(
&mut self,
id: Tuid,
Expand All @@ -64,6 +67,7 @@ impl DataFusionConnector {
Ok(entry)
}

#[instrument(skip(self), err)]
pub async fn get_partition_table(
&self,
dataset_id: EntryId,
Expand Down
3 changes: 3 additions & 0 deletions crates/store/re_datafusion/src/partition_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use datafusion::{
catalog::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
};
use tracing::instrument;

use re_grpc_client::redap::RedapClient;
use re_log_encoding::codec::wire::decoder::Decode as _;
Expand Down Expand Up @@ -40,6 +41,7 @@ impl PartitionTableProvider {
impl GrpcStreamToTable for PartitionTableProvider {
type GrpcStreamData = ScanPartitionTableResponse;

#[instrument(skip(self), err)]
async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
let request = GetPartitionTableSchemaRequest {
dataset_id: Some(self.dataset_id.into()),
Expand All @@ -60,6 +62,7 @@ impl GrpcStreamToTable for PartitionTableProvider {
))
}

#[instrument(skip(self), err)]
async fn send_streaming_request(
&mut self,
) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
Expand Down
3 changes: 3 additions & 0 deletions crates/store/re_datafusion/src/search_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use datafusion::{
error::{DataFusionError, Result as DataFusionResult},
};
use tokio_stream::StreamExt as _;
use tracing::instrument;

use re_grpc_client::redap::RedapClient;
use re_log_encoding::codec::wire::decoder::Decode as _;
Expand Down Expand Up @@ -48,6 +49,7 @@ impl SearchResultsTableProvider {
impl GrpcStreamToTable for SearchResultsTableProvider {
type GrpcStreamData = SearchDatasetResponse;

#[instrument(skip(self), err)]
async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
let mut request = self.request.clone();
request.scan_parameters = Some(ScanParameters {
Expand Down Expand Up @@ -84,6 +86,7 @@ impl GrpcStreamToTable for SearchResultsTableProvider {
Ok(schema)
}

#[instrument(skip(self), err)]
async fn send_streaming_request(
&mut self,
) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
Expand Down
4 changes: 4 additions & 0 deletions crates/store/re_datafusion/src/table_entry_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use datafusion::{
catalog::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
};
use tracing::instrument;

use re_grpc_client::redap::RedapClient;
use re_log_encoding::codec::wire::decoder::Decode as _;
Expand Down Expand Up @@ -41,6 +42,7 @@ impl TableEntryTableProvider {
Ok(GrpcStreamProvider::prepare(self).await?)
}

#[instrument(skip(self), err)]
async fn table_id(&mut self) -> Result<EntryId, DataFusionError> {
if let Some(table_id) = self.table_id {
return Ok(table_id);
Expand Down Expand Up @@ -91,6 +93,7 @@ impl TableEntryTableProvider {
impl GrpcStreamToTable for TableEntryTableProvider {
type GrpcStreamData = ScanTableResponse;

#[instrument(skip(self), err)]
async fn fetch_schema(&mut self) -> DataFusionResult<SchemaRef> {
let request = GetTableSchemaRequest {
table_id: Some(self.table_id().await?.into()),
Expand All @@ -111,6 +114,7 @@ impl GrpcStreamToTable for TableEntryTableProvider {
))
}

#[instrument(skip(self), err)]
async fn send_streaming_request(
&mut self,
) -> DataFusionResult<tonic::Response<tonic::Streaming<Self::GrpcStreamData>>> {
Expand Down
2 changes: 2 additions & 0 deletions crates/store/re_grpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ re_smart_channel.workspace = true
re_sorbet.workspace = true
re_uri.workspace = true

redap_telemetry.workspace = true

async-stream.workspace = true
thiserror.workspace = true
tokio-stream.workspace = true
Expand Down
29 changes: 15 additions & 14 deletions crates/store/re_grpc_client/src/redap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use re_protos::{
};
use re_uri::{DatasetDataUri, Origin};

use redap_telemetry::external::{tower, tower_http};

use crate::{spawn_future, StreamError, MAX_DECODING_MESSAGE_SIZE};

pub enum Command {
Expand Down Expand Up @@ -115,6 +117,7 @@ pub async fn channel(origin: Origin) -> Result<tonic::transport::Channel, Connec
}
}

// TODO(cmc): the wasm32 client below is still untraced; figure out how to wire the
// tracing middleware into the web path as well.
#[cfg(target_arch = "wasm32")]
pub type RedapClient = FrontendServiceClient<tonic_web_wasm_client::Client>;

Expand All @@ -127,26 +130,24 @@ pub async fn client(
}

#[cfg(not(target_arch = "wasm32"))]
pub type RedapClient = FrontendServiceClient<tonic::transport::Channel>;
// TODO(cmc): figure out how we integrate redap_telemetry in mainline Rerun
// pub type RedapClient = FrontendServiceClient<
// tower_http::trace::Trace<
// tonic::service::interceptor::InterceptedService<
// tonic::transport::Channel,
// redap_telemetry::TracingInjectorInterceptor,
// >,
// tower_http::classify::SharedClassifier<tower_http::classify::GrpcErrorsAsFailures>,
// >,
// >;
/// Native (non-wasm) client type: a `FrontendServiceClient` over a tonic channel
/// wrapped in two middleware layers (innermost first):
/// 1. [`redap_telemetry::TracingInjectorInterceptor`] — presumably injects the
///    current trace context into outgoing request metadata so traces continue
///    across the gRPC boundary (NOTE(review): confirm against `redap_telemetry`).
/// 2. `tower_http::trace::Trace` — emits one span per request, built by
///    [`redap_telemetry::GrpcSpanMaker`], with gRPC status codes classified as
///    success/failure by `GrpcErrorsAsFailures`.
pub type RedapClient = FrontendServiceClient<
    tower_http::trace::Trace<
        tonic::service::interceptor::InterceptedService<
            tonic::transport::Channel,
            redap_telemetry::TracingInjectorInterceptor,
        >,
        tower_http::classify::SharedClassifier<tower_http::classify::GrpcErrorsAsFailures>,
        redap_telemetry::GrpcSpanMaker,
    >,
>;

#[cfg(not(target_arch = "wasm32"))]
pub async fn client(origin: Origin) -> Result<RedapClient, ConnectionError> {
let channel = channel(origin).await?;

let middlewares = tower::ServiceBuilder::new()
// TODO(cmc): figure out how we integrate redap_telemetry in mainline Rerun
// .layer(redap_telemetry::new_grpc_tracing_layer())
// .layer(redap_telemetry::TracingInjectorInterceptor::new_layer())
.layer(redap_telemetry::new_grpc_tracing_layer())
.layer(redap_telemetry::TracingInjectorInterceptor::new_layer())
.into_inner();

let svc = tower::ServiceBuilder::new()
Expand Down
38 changes: 38 additions & 0 deletions crates/utils/redap_telemetry/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[package]
name = "redap_telemetry"
authors.workspace = true
description = "Rerun's telemetry SDK (logs, traces, metrics)"
edition.workspace = true
homepage.workspace = true
include.workspace = true
license.workspace = true
publish = true
readme = "README.md"
repository.workspace = true
rust-version.workspace = true
version.workspace = true


[dependencies]

# External
anyhow.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
http.workspace = true
opentelemetry = { workspace = true, features = ["metrics"] }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
tonic.workspace = true
tower-http = { workspace = true, features = ["trace"] }
tower.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "json"] }
tracing.workspace = true


[dev-dependencies]
rand.workspace = true
tokio.workspace = true

[lints]
workspace = true
3 changes: 3 additions & 0 deletions crates/utils/redap_telemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# redap-telemetry

Part of the [`dataplatform`](https://github.com/rerun-io/dataplatform) family of crates.
94 changes: 94 additions & 0 deletions crates/utils/redap_telemetry/examples/basics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! Example of using our telemetry tools.
//!
//! Usage:
//! * Start the telemetry stack: `pixi run compose`
//! * Run this example: `cargo r --example basics`
//! * Go to <http://localhost:16686/search> to explore the logs and traces.
//! * Go to <http://localhost:9090/query> to explore the metrics.
//! * Check out <http://localhost:9090/api/v1/label/__name__/values> to list all available metrics.
//! * Try e.g. this query: `sum(is_even_histogram_bucket) by (le)`

use tracing::{Instrument as _, instrument};

// ---

/// Succeeds when `i` is even; fails (after simulated latency) when it is odd.
///
/// The `err` option on `#[instrument]` records the error on the span as well.
#[instrument(err)]
async fn is_even(i: i32) -> anyhow::Result<()> {
    simulate_latency().await;

    if i % 2 != 0 {
        anyhow::bail!("oh no, `i` is odd!!");
    }

    Ok(())
}

#[tokio::main]
async fn main() {
// Safety: anything touching the env is unsafe, tis what it is.
#[expect(unsafe_code)]
unsafe {
std::env::set_var("OTEL_SERVICE_NAME", "redap-telemetry-example");
}

use clap::Parser as _;
// Take a look at `TelemetryArgs` to learn more about all the configurable things.
let args = redap_telemetry::TelemetryArgs::parse_from(std::env::args());

// This is the complete telemetry pipeline. Everything will be flushed when this gets dropped.
let _telemetry =
redap_telemetry::Telemetry::init(args, redap_telemetry::TelemetryDropBehavior::Shutdown);

let scope = opentelemetry::InstrumentationScope::builder("redap-telemetry-example").build();
let metrics = opentelemetry::global::meter_with_scope(scope);

let is_even_histogram = metrics
.f64_histogram("is_even_histogram")
.with_description("Latency percentiles for `is_even`")
.with_boundaries(vec![
10.0, 20.0, 30.0, 40.0, 60.0, 80.0, 100.0, 200.0, 400.0, 1000.0,
])
.build();

for batch in [0..20, 20..40, 40..60] {
let span = tracing::info_span!("main_loop", ?batch);
async {
for i in batch.clone() {
let now = tokio::time::Instant::now();

if let Err(err) = is_even(i).await {
tracing::error!(%err, i, "not even!");
} else {
tracing::info!(i, "is even!");
}

is_even_histogram.record(
now.elapsed().as_millis() as _,
&[opentelemetry::KeyValue::new("batch", format!("{batch:?}"))],
);
}
}
.instrument(span) // instrumenting async scopes is tricky!
.await;
}
}

// ---

/// Sleeps for a randomly chosen duration, shaped to match these percentiles:
///
/// p70: 10ms · p80: 15ms · p90: 30ms · p95: 50ms · p99: 70ms · p999: 150ms
async fn simulate_latency() {
    use rand::Rng as _;

    // Uniform roll in [1, 1000]; each threshold below covers the cumulative
    // share of rolls for the corresponding percentile bucket.
    let roll: u16 = rand::thread_rng().gen_range(1..=1000);

    let delay_ms = if roll <= 700 {
        10
    } else if roll <= 800 {
        15
    } else if roll <= 900 {
        30
    } else if roll <= 950 {
        50
    } else if roll <= 990 {
        70
    } else {
        150
    };

    tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
}
Loading
Loading