From e20443bc0cd97c22cca84c4e850c1f7d966e73c5 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 30 Apr 2025 12:39:18 +0200 Subject: [PATCH 1/5] import redap_telemetry --- Cargo.lock | 334 ++++++++++++++++-- Cargo.toml | 14 + crates/utils/redap_telemetry/Cargo.toml | 38 ++ crates/utils/redap_telemetry/README.md | 3 + .../utils/redap_telemetry/examples/basics.rs | 94 +++++ crates/utils/redap_telemetry/src/args.rs | 157 ++++++++ crates/utils/redap_telemetry/src/grpc.rs | 127 +++++++ crates/utils/redap_telemetry/src/lib.rs | 163 +++++++++ crates/utils/redap_telemetry/src/telemetry.rs | 307 ++++++++++++++++ 9 files changed, 1201 insertions(+), 36 deletions(-) create mode 100644 crates/utils/redap_telemetry/Cargo.toml create mode 100644 crates/utils/redap_telemetry/README.md create mode 100644 crates/utils/redap_telemetry/examples/basics.rs create mode 100644 crates/utils/redap_telemetry/src/args.rs create mode 100644 crates/utils/redap_telemetry/src/grpc.rs create mode 100644 crates/utils/redap_telemetry/src/lib.rs create mode 100644 crates/utils/redap_telemetry/src/telemetry.rs diff --git a/Cargo.lock b/Cargo.lock index 1f95e64076e6..f0010a5dde92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -623,7 +623,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -670,7 +670,7 @@ dependencies = [ "enumflags2", "futures-channel", "futures-util", - "rand", + "rand 0.8.5", "raw-window-handle", "serde", "serde_repr", @@ -1221,7 +1221,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c" dependencies = [ "memchr", - "regex-automata", + "regex-automata 0.4.8", "serde", ] @@ -2169,7 +2169,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.8.5", "regex", "sqlparser", "tempfile", @@ -2282,7 +2282,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "rand", + "rand 0.8.5", "tokio", 
"tokio-util", "url", @@ -2310,7 +2310,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.8.5", "tempfile", "url", ] @@ -2390,7 +2390,7 @@ dependencies = [ "itertools 0.14.0", "log", "md-5", - "rand", + "rand 0.8.5", "regex", "sha2", "unicode-segmentation", @@ -2522,7 +2522,7 @@ dependencies = [ "log", "recursive", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -2746,7 +2746,7 @@ name = "dna" version = "0.24.0-alpha.1+dev" dependencies = [ "itertools 0.14.0", - "rand", + "rand 0.8.5", "rerun", ] @@ -4314,7 +4314,7 @@ version = "0.24.0-alpha.1+dev" dependencies = [ "anyhow", "clap", - "rand", + "rand 0.8.5", "rerun", ] @@ -4882,6 +4882,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -5143,7 +5152,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f093b3db6fd194718dcdeea6bd8c829417deae904e3fcc7732dabcd4416d25d8" dependencies = [ "ndarray", - "rand", + "rand 0.8.5", "rand_distr", ] @@ -5239,6 +5248,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.3" @@ -5621,7 +5640,7 @@ dependencies = [ "parking_lot", "percent-encoding", "quick-xml 0.37.2", - "rand", + "rand 0.8.5", "reqwest", "ring", "serde", @@ -5665,6 +5684,86 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.29.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.7", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" +dependencies = [ + "futures-core", + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror 2.0.7", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.9.1", + "serde_json", + "thiserror 2.0.7", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -5708,6 +5807,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "owned_ttf_parser" version = "0.25.0" @@ -5930,7 +6035,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -6003,7 +6108,7 @@ version = "0.24.0-alpha.1+dev" dependencies = [ "anyhow", "clap", - "rand", + "rand 0.8.5", "rand_distr", "re_log", "rerun", @@ -6427,7 +6532,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" dependencies = [ "bytes", - "rand", + "rand 0.8.5", "ring", "rustc-hash 2.0.0", "rustls", @@ -6473,8 +6578,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -6484,7 +6599,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -6496,6 +6621,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", +] + [[package]] name = "rand_distr" version = "0.4.3" @@ -6503,7 +6637,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" dependencies = [ "num-traits", - "rand", + "rand 0.8.5", ] [[package]] @@ -6609,7 +6743,7 @@ version = "0.24.0-alpha.1+dev" dependencies = [ "base64 0.22.1", "jsonwebtoken", - "rand", + "rand 0.8.5", "re_log", "serde", "thiserror 1.0.65", @@ -6700,7 +6834,7 @@ dependencies = [ "half", "itertools 0.14.0", "nohash-hasher", - "rand", + "rand 0.8.5", "re_arrow_util", "re_byte_size", "re_error", @@ -6730,7 +6864,7 @@ dependencies = [ "nohash-hasher", "once_cell", "parking_lot", - "rand", + "rand 0.8.5", "re_arrow_util", "re_byte_size", "re_chunk", @@ -6981,6 +7115,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tracing", "wasm-bindgen-futures", ] @@ -7082,6 +7217,7 @@ dependencies = [ "re_smart_channel", "re_sorbet", "re_uri", + "redap_telemetry", "thiserror 1.0.65", "tokio", "tokio-stream", @@ -7306,7 +7442,7 @@ dependencies = [ "nohash-hasher", "parking_lot", "paste", - "rand", + "rand 0.8.5", "re_arrow_util", "re_byte_size", "re_chunk", @@ -7437,7 +7573,7 @@ dependencies = [ "image", "itertools 0.14.0", "pollster 0.4.0", - "rand", + "rand 0.8.5", "re_log", "re_math", "re_renderer", @@ -7565,7 +7701,7 @@ dependencies = [ "itertools 0.14.0", "nohash-hasher", "once_cell", - "rand", + "rand 0.8.5", "re_chunk_store", "re_context_menu", "re_data_ui", @@ -7605,7 +7741,7 @@ dependencies = [ "document-features", "getrandom 0.2.15", "once_cell", - "rand", + "rand 0.8.5", "re_byte_size", "serde", "web-time", @@ -7727,7 +7863,7 @@ dependencies = [ "itertools 0.14.0", "once_cell", "parking_lot", - "rand", + "rand 0.8.5", "re_arrow_util", "re_entity_db", "re_format", @@ -8262,6 +8398,26 @@ dependencies = [ "syn 2.0.99", ] +[[package]] +name = 
"redap_telemetry" +version = "0.24.0-alpha.1+dev" +dependencies = [ + "anyhow", + "clap", + "http", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", + "rand 0.8.5", + "tokio", + "tonic", + "tower 0.5.2", + "tower-http", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -8310,8 +8466,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.8", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -8322,9 +8487,15 @@ checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -8354,6 +8525,7 @@ checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" dependencies = [ "base64 0.22.1", "bytes", + "futures-channel", "futures-core", "futures-util", "h2", @@ -8500,6 +8672,7 @@ version = "0.24.0-alpha.1+dev" dependencies = [ "arrow", "chrono", + "clap", "crossbeam", "datafusion", "datafusion-ffi", @@ -8514,7 +8687,7 @@ dependencies = [ "prost-types", "pyo3", "pyo3-build-config", - "rand", + "rand 0.8.5", "re_arrow_util", "re_build_info", "re_build_tools", @@ -8535,10 +8708,12 @@ dependencies = [ "re_uri", "re_video", "re_web_viewer_server", + "redap_telemetry", "thiserror 1.0.65", "tokio", "tokio-stream", "tonic", + "tracing", "url", "uuid", ] 
@@ -9177,6 +9352,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shared_recording" version = "0.24.0-alpha.1+dev" @@ -9379,7 +9563,7 @@ version = "0.24.0-alpha.1+dev" dependencies = [ "itertools 0.14.0", "ndarray", - "rand", + "rand 0.8.5", "rand_distr", "re_build_tools", "rerun", @@ -9650,7 +9834,7 @@ dependencies = [ "itertools 0.14.0", "ndarray", "ndarray-rand", - "rand", + "rand 0.8.5", "re_log", "rerun", ] @@ -9660,7 +9844,7 @@ name = "test_data_density_graph" version = "0.24.0-alpha.1+dev" dependencies = [ "anyhow", - "rand", + "rand 0.8.5", "re_log", "rerun", ] @@ -9753,6 +9937,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "thrift" version = "0.17.0" @@ -10117,7 +10311,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -10155,6 +10349,7 @@ dependencies = [ "pin-project-lite", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -10199,6 +10394,67 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.30.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd8e764bd6f5813fd8bebc3117875190c5b0415be8f7f8059bffb6ecd979c444" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", ] [[package]] @@ -10434,6 +10690,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vec1" version = "1.12.1" @@ -11665,7 +11927,7 @@ dependencies = [ "hex", "nix", "ordered-stream", - "rand", + "rand 0.8.5", "serde", "serde_repr", "sha1", diff --git a/Cargo.toml b/Cargo.toml index 53c4ddedeafd..f8b3cbef5b77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,6 +105,8 @@ re_tracing = { path = "crates/utils/re_tracing", version = "=0.24.0-alpha.1", de re_tuid = { path = "crates/utils/re_tuid", version = "=0.24.0-alpha.1", default-features = false } re_uri = { path = "crates/utils/re_uri", version = "=0.24.0-alpha.1", default-features = false } re_video = { path = "crates/utils/re_video", version = "=0.24.0-alpha.1", default-features = false } +# TODO +redap_telemetry = { path = 
"crates/utils/redap_telemetry", version = "=0.24.0-alpha.1", default-features = false } # crates/viewer: re_blueprint_tree = { path = "crates/viewer/re_blueprint_tree", version = "=0.24.0-alpha.1", default-features = false } @@ -220,6 +222,7 @@ gltf = "1.1" half = { version = "2.3.1", features = ["bytemuck"] } hexasphere = "14.1.0" home = "=0.5.9" +http = "1.2.0" image = { version = "0.25", default-features = false } indent = "0.1" indexmap = "2.1" # Version chosen to align with other dependencies @@ -365,6 +368,17 @@ wgpu = { version = "24.0", default-features = false, features = [ xshell = "0.2.7" + +# TODO +opentelemetry = { version = "0.29.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.29.0", features = ["rt-tokio"] } +opentelemetry-otlp = "0.29.0" +opentelemetry-stdout = "0.29.0" +tracing-opentelemetry = "0.30.0" +tracing-subscriber = { version = "0.3.18", features = ["tracing-log", "fmt", "env-filter"] } +tower-service = "0.3" + + # --------------------------------------------------------------------------------- [profile] diff --git a/crates/utils/redap_telemetry/Cargo.toml b/crates/utils/redap_telemetry/Cargo.toml new file mode 100644 index 000000000000..03ba61eb8162 --- /dev/null +++ b/crates/utils/redap_telemetry/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "redap_telemetry" +authors.workspace = true +description = "Rerun's analytics SDK" +edition.workspace = true +homepage.workspace = true +include.workspace = true +license.workspace = true +publish = true +readme = "README.md" +repository.workspace = true +rust-version.workspace = true +version.workspace = true + + +[dependencies] + +# External +anyhow.workspace = true +clap = { workspace = true, features = ["derive", "env"] } +http.workspace = true +opentelemetry = { workspace = true, features = ["metrics"] } +opentelemetry-otlp = { workspace = true, features = ["grpc-tonic"] } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } +tonic.workspace = true +tower-http = { 
workspace = true, features = ["trace"] } +tower.workspace = true +tracing-opentelemetry.workspace = true +tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "json"] } +tracing.workspace = true + + +[dev-dependencies] +rand.workspace = true +tokio.workspace = true + +[lints] +workspace = true diff --git a/crates/utils/redap_telemetry/README.md b/crates/utils/redap_telemetry/README.md new file mode 100644 index 000000000000..d17f2e3816f8 --- /dev/null +++ b/crates/utils/redap_telemetry/README.md @@ -0,0 +1,3 @@ +# redap-telemetry + +Part of the [`dataplatform`](https://github.com/rerun-io/dataplatform) family of crates. diff --git a/crates/utils/redap_telemetry/examples/basics.rs b/crates/utils/redap_telemetry/examples/basics.rs new file mode 100644 index 000000000000..e3576233952e --- /dev/null +++ b/crates/utils/redap_telemetry/examples/basics.rs @@ -0,0 +1,94 @@ +//! Example of using our telemetry tools. +//! +//! Usage: +//! * Start the telemetry stack: `pixi run compose` +//! * Run this example: `cargo r --example basics` +//! * Go to to explore the logs and traces. +//! * Go to to explore the metrics. +//! * Check out to list all available metrics. +//! * Try e.g. this query: `sum(is_even_histogram_bucket) by (le)` + +use tracing::{Instrument as _, instrument}; + +// --- + +#[instrument(err)] +async fn is_even(i: i32) -> anyhow::Result<()> { + simulate_latency().await; + anyhow::ensure!(i % 2 == 0, "oh no, `i` is odd!!"); + Ok(()) +} + +#[tokio::main] +async fn main() { + // Safety: anything touching the env is unsafe, tis what it is. + #[expect(unsafe_code)] + unsafe { + std::env::set_var("OTEL_SERVICE_NAME", "redap-telemetry-example"); + } + + use clap::Parser as _; + // Take a look at `TelemetryArgs` to learn more about all the configurable things. + let args = redap_telemetry::TelemetryArgs::parse_from(std::env::args()); + + // This is the complete telemetry pipeline. Everything will be flushed when this gets dropped. 
+ let _telemetry = + redap_telemetry::Telemetry::init(args, redap_telemetry::TelemetryDropBehavior::Shutdown); + + let scope = opentelemetry::InstrumentationScope::builder("redap-telemetry-example").build(); + let metrics = opentelemetry::global::meter_with_scope(scope); + + let is_even_histogram = metrics + .f64_histogram("is_even_histogram") + .with_description("Latency percentiles for `is_even`") + .with_boundaries(vec![ + 10.0, 20.0, 30.0, 40.0, 60.0, 80.0, 100.0, 200.0, 400.0, 1000.0, + ]) + .build(); + + for batch in [0..20, 20..40, 40..60] { + let span = tracing::info_span!("main_loop", ?batch); + async { + for i in batch.clone() { + let now = tokio::time::Instant::now(); + + if let Err(err) = is_even(i).await { + tracing::error!(%err, i, "not even!"); + } else { + tracing::info!(i, "is even!"); + } + + is_even_histogram.record( + now.elapsed().as_millis() as _, + &[opentelemetry::KeyValue::new("batch", format!("{batch:?}"))], + ); + } + } + .instrument(span) // instrumenting async scopes is tricky! 
+ .await; + } +} + +// --- + +async fn simulate_latency() { + use rand::Rng as _; + let p: u16 = rand::thread_rng().gen_range(1..=1000); + + // p70: 10ms + // p80: 15ms + // p90: 30ms + // p95: 50ms + // p99: 70ms + // p999: 150ms + let delay_ms = match p { + 1..=700 => 10, + 701..=800 => 15, + 801..=900 => 30, + 901..=950 => 50, + 951..=990 => 70, + _ => 150, + }; + + tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; +} diff --git a/crates/utils/redap_telemetry/src/args.rs b/crates/utils/redap_telemetry/src/args.rs new file mode 100644 index 000000000000..d1c8041ef3dd --- /dev/null +++ b/crates/utils/redap_telemetry/src/args.rs @@ -0,0 +1,157 @@ +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LogFormat { + Pretty, + Compact, + Json, +} + +impl std::fmt::Display for LogFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Self::Pretty => "pretty", + Self::Compact => "compact", + Self::Json => "json", + }) + } +} + +impl std::str::FromStr for LogFormat { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + Ok(match s.to_lowercase().as_str() { + "pretty" => Self::Pretty, + "compact" => Self::Compact, + "json" => Self::Json, + unknown => anyhow::bail!("unknown LogFormat: '{unknown}"), + }) + } +} + +// --- + +const fn default_telemetry_attributes() -> &'static str { + concat!( + "service.namespace=redap,service.version=", + env!("CARGO_PKG_VERSION") + ) +} + +const fn default_log_filter() -> &'static str { + if cfg!(debug_assertions) { + "debug" + } else { + "info" + } +} + +/// Complete configuration for all things telemetry. +/// +/// Many of these are part of the official `OpenTelemetry` spec and can be configured directly via +/// the environment. Refer to this command's help as well as [the spec]. 
+/// +/// [the spec]: https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/ +#[derive(Clone, Debug, clap::Parser)] +#[clap(author, version, about)] +pub struct TelemetryArgs { + /// Should telemetry be disabled entirely? + /// + /// Part of the `OpenTelemetry` spec. + #[clap(long, env = "OTEL_SDK_DISABLED", default_value_t = false)] + pub disabled: bool, + + /// The service name used for all things telemetry. + /// + /// Part of the `OpenTelemetry` spec. + #[clap(long, env = "OTEL_SERVICE_NAME")] + pub service_name: String, + + /// The service attributes used for all things telemetry. + /// + /// Expects a comma-separated string of key=value pairs, e.g. `a=b,c=d`. + /// + /// Part of the `OpenTelemetry` spec. + #[clap( + long, + env = "OTEL_RESOURCE_ATTRIBUTES", + default_value = default_telemetry_attributes(), + )] + pub attributes: String, + + /// This is the same as `RUST_LOG`. + /// + /// This only affects logs, not traces nor metrics. + #[clap(long, env = "RUST_LOG", default_value_t = default_log_filter().to_owned())] + pub log_filter: String, + + /// Capture test output as part of the logs. + #[clap(long, env = "RUST_LOG_CAPTURE_TEST_OUTPUT", default_value_t = false)] + pub log_test_output: bool, + + /// Use `json` in production. Pick between `pretty` and `compact` during development according + /// to your preferences. + #[clap(long, env = "RUST_LOG_FORMAT", default_value_t = LogFormat::Pretty)] + pub log_format: LogFormat, + + /// If true, log extra information about all retired spans, including their timings. + #[clap(long, env = "RUST_LOG_CLOSED_SPANS", default_value_t = false)] + pub log_closed_spans: bool, + + /// Same as `RUST_LOG`, but for traces. + /// + /// This only affects traces, not logs nor metrics. + #[clap(long, env = "RUST_TRACE", default_value = "info")] + pub trace_filter: String, + + /// The gRPC OTLP endpoint to send the traces to. + /// + /// It's fine for the target endpoint to be down. 
+ /// + /// Part of the `OpenTelemetry` spec. + #[clap( + long, + env = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + default_value = "http://localhost:4317" + )] + pub trace_endpoint: String, + + /// How are spans sampled? + /// + /// This is applied _after_ `RUST_TRACE`. + /// + /// Part of the `OpenTelemetry` spec. + #[clap( + long, + env = "OTEL_TRACES_SAMPLER", + default_value = "parentbased_traceidratio" + )] + pub trace_sampler: String, + + /// The specified value will only be used if `OTEL_TRACES_SAMPLER` is set. + /// + /// Each Sampler type defines its own expected input, if any. Invalid or unrecognized input + /// MUST be logged and MUST be otherwise ignored, i.e. the implementation MUST behave as if + /// `OTEL_TRACES_SAMPLER_ARG` is not set. + /// + /// Part of the `OpenTelemetry` spec. + #[clap(long, env = "OTEL_TRACES_SAMPLER_ARG", default_value = "1.0")] + pub trace_sampler_args: String, + + /// The HTTP OTLP endpoint to send the metrics to. + /// + /// It's fine for the target endpoint to be down. + /// + /// Part of the `OpenTelemetry` spec. + #[clap( + long, + env = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + default_value = "http://localhost:9090/api/v1/otlp/v1/metrics" + )] + pub metric_endpoint: String, + + /// The interval in milliseconds at which metrics are pushed to the collector. + /// + /// Part of the `OpenTelemetry` spec. + #[clap(long, env = "OTEL_METRIC_EXPORT_INTERVAL", default_value = "10000")] + pub metric_interval: String, +} diff --git a/crates/utils/redap_telemetry/src/grpc.rs b/crates/utils/redap_telemetry/src/grpc.rs new file mode 100644 index 000000000000..1d9f9e6a9944 --- /dev/null +++ b/crates/utils/redap_telemetry/src/grpc.rs @@ -0,0 +1,127 @@ +// --- Telemetry --- + +/// Implements [`tower_http::trace::MakeSpan`] where the trace name is the gRPC method name. 
+#[derive(Debug, Default, Clone, Copy)] +pub struct GrpcSpanMaker; + +impl tower_http::trace::MakeSpan for GrpcSpanMaker { + fn make_span(&mut self, request: &http::Request) -> tracing::Span { + tracing::span!( + tracing::Level::INFO, + "", + otel.name = %request.uri().path(), + method = %request.method(), + uri = %request.uri(), + version = ?request.version(), + headers = ?request.headers(), + ) + } +} + +/// Creates a new [`tower::Layer`] middleware that automatically traces gRPC requests and responses. +/// +/// Works for both clients and servers. +pub fn new_grpc_tracing_layer() +-> tower_http::trace::TraceLayer { + tower_http::trace::TraceLayer::new_for_grpc().make_span_with(GrpcSpanMaker) +} + +// --- Propagation --- + +/// This implements a [`tonic::service::Interceptor`] that injects trace/span metadata into the +/// request headers, according to W3C standards. +/// +/// This trace/span information is extracted from the currently opened [`tracing::Span`], then +/// converting to the `OpenTelemetry` format, and finally injected into the request headers, thereby +/// propagating the trace across network boundaries. +/// +/// See also [`TracingExtractorInterceptor`]. +#[derive(Default, Clone)] +pub struct TracingInjectorInterceptor; + +impl TracingInjectorInterceptor { + /// Creates a new [`tower::Layer`] middleware that automatically applies the injector. + /// + /// See also [`new_grpc_tracing_layer`]. 
+ pub fn new_layer() -> tonic::service::interceptor::InterceptorLayer { + tonic::service::interceptor::interceptor(Self) + } +} + +impl tonic::service::Interceptor for TracingInjectorInterceptor { + fn call(&mut self, mut req: tonic::Request<()>) -> Result, tonic::Status> { + struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); + + impl opentelemetry::propagation::Injector for MetadataMap<'_> { + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } + } + + // Grab the trace information from `tracing`, and convert that into `opentelemetry`. + use tracing_opentelemetry::OpenTelemetrySpanExt as _; + let cx = tracing::Span::current().context(); + + // Inject the opentelemetry-formatted trace information into the headers. + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut MetadataMap(req.metadata_mut())); + }); + + Ok(req) + } +} + +/// This implements a [`tonic::service::Interceptor`] that extracts trace/span metadata from the +/// request headers, according to W3C standards. +/// +/// This trace/span information (which is still an `OpenTelemetry` payload, at that point) is then +/// injected back into the currently opened [`tracing::Span`] (if any), therefore propagating the +/// trace across network boundaries. +#[derive(Default, Clone)] +pub struct TracingExtractorInterceptor; + +impl TracingExtractorInterceptor { + /// Creates a new [`tower::Layer`] middleware that automatically applies the extractor. + /// + /// See also [`new_grpc_tracing_layer`]. 
+ pub fn new_layer() -> tonic::service::interceptor::InterceptorLayer { + tonic::service::interceptor::interceptor(Self) + } +} + +impl tonic::service::Interceptor for TracingExtractorInterceptor { + fn call(&mut self, req: tonic::Request<()>) -> Result, tonic::Status> { + struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); + + impl opentelemetry::propagation::Extractor for MetadataMap<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::>() + } + } + + // Grab the trace information from the headers, in OpenTelemetry format. + let parent_ctx = opentelemetry::global::get_text_map_propagator(|prop| { + prop.extract(&MetadataMap(req.metadata())) + }); + + // Convert the trace information back into `tracing` and inject it into the current span (if any). + use tracing_opentelemetry::OpenTelemetrySpanExt as _; + tracing::Span::current().set_parent(parent_ctx); + + Ok(req) + } +} diff --git a/crates/utils/redap_telemetry/src/lib.rs b/crates/utils/redap_telemetry/src/lib.rs new file mode 100644 index 000000000000..d2c519dcbb91 --- /dev/null +++ b/crates/utils/redap_telemetry/src/lib.rs @@ -0,0 +1,163 @@ +//! Everything needed to set up telemetry (logs, traces, metrics) for both clients and servers. +//! +//! Logging strategy +//! ================ +//! +//! * All our logs go through the structured `tracing` macros. +//! +//! * We always log from `tracing` directly into stdio: we never involve the `OpenTelemetry` +//! logging API. Production is expected to read the logs from the pod's output. +//! There is never any internal buffering going on, besides the buffering of stdio itself. +//! +//! * All logs that happen as part of the larger trace/span will automatically be uploaded +//! with that trace/span. +//! 
This makes our traces a very powerful debugging tool, in addition to a profiler. +//! +//! Tracing strategy +//! ================ +//! +//! * All our traces go through the structured `tracing` macros. We *never* use the +//! `OpenTelemetry` macros. +//! +//! * The traces go through a first layer of filtering based on the value of `RUST_TRACE`, which +//! functions similarly to a `RUST_LOG` filter. +//! +//! * The traces are then sent to the `OpenTelemetry` SDK, where they will go through a pass of +//! sampling before being sent to the OTLP endpoint. +//! The sampling mechanism is controlled by the official OTEL environment variables. +//! span sampling decision. +//! +//! * Spans that contains error logs will properly be marked as failed, and easily findable. +//! +//! Metric strategy +//! =============== +//! +//! * Our metric strategy is basically the opposite of our logging strategy: everything goes +//! through `OpenTelemetry` directly, `tracing` is never involved. +//! +//! * Metrics are uploaded (as opposed to scrapped!) using the OTLP protocol, on a fixed interval +//! defined by the `OTEL_METRIC_EXPORT_INTERVAL` environment variable. + +mod args; +mod grpc; +mod telemetry; + +use opentelemetry_sdk::propagation::TraceContextPropagator; + +pub use self::{ + args::{LogFormat, TelemetryArgs}, + grpc::{ + GrpcSpanMaker, TracingExtractorInterceptor, TracingInjectorInterceptor, + new_grpc_tracing_layer, + }, + telemetry::{Telemetry, TelemetryDropBehavior}, +}; + +pub mod external { + pub use opentelemetry; + pub use tower; + pub use tower_http; + pub use tracing; + pub use tracing_opentelemetry; +} + +// --- + +/// Returns the active [`TraceId`] in the current context, if any. +/// +/// The returned trace ID can be search for in the distributed tracing backend, e.g. in jaeger: +/// ```text +/// http://localhost:16686/trace/{trace_id} +/// ``` +/// +/// Returns `None` if there is no trace *actively being sampled* in the current context. 
+///
+/// [`TraceId`]: [opentelemetry::TraceId]
+pub fn current_trace_id() -> Option<opentelemetry::trace::TraceId> {
+    use opentelemetry::trace::TraceContextExt as _;
+    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
+
+    let cx = tracing::Span::current().context();
+    let span = cx.span();
+    let span_cx = span.span_context();
+
+    (span_cx.is_valid() && span_cx.is_sampled()).then(|| span_cx.trace_id())
+}
+
+/// Export the active trace in the current context as the W3C trace headers, if any.
+///
+/// Returns `None` if there is no trace *actively being sampled* in the current context.
+pub fn current_trace_headers() -> Option<TraceHeaders> {
+    use opentelemetry::propagation::text_map_propagator::TextMapPropagator as _;
+    use opentelemetry::trace::TraceContextExt as _;
+    use tracing_opentelemetry::OpenTelemetrySpanExt as _;
+
+    let cx = tracing::Span::current().context();
+    let span = cx.span();
+    let span_cx = span.span_context();
+
+    if !span_cx.is_valid() || !span_cx.is_sampled() {
+        return None;
+    }
+
+    let propagator = TraceContextPropagator::new();
+    let mut carrier = TraceHeaders::empty();
+
+    propagator.inject_context(&cx, &mut carrier);
+
+    Some(carrier)
+}
+
+#[derive(Debug, Clone)]
+pub struct TraceHeaders {
+    pub traceparent: String,
+    pub tracestate: Option<String>,
+}
+
+impl TraceHeaders {
+    pub const TRACEPARENT_KEY: &'static str = "traceparent";
+    pub const TRACESTATE_KEY: &'static str = "tracestate";
+
+    fn empty() -> Self {
+        Self {
+            traceparent: String::new(),
+            tracestate: None,
+        }
+    }
+}
+
+impl opentelemetry::propagation::Injector for TraceHeaders {
+    fn set(&mut self, key: &str, value: String) {
+        match key {
+            Self::TRACEPARENT_KEY => self.traceparent = value,
+            Self::TRACESTATE_KEY => {
+                if !value.is_empty() {
+                    self.tracestate = Some(value);
+                }
+            }
+            _ => {}
+        }
+    }
+}
+
+impl opentelemetry::propagation::Extractor for TraceHeaders {
+    fn get(&self, key: &str) -> Option<&str> {
+        match key {
+            Self::TRACEPARENT_KEY => Some(self.traceparent.as_str()),
+            Self::TRACESTATE_KEY => self.tracestate.as_deref(),
+            _ => None,
+        }
+    }
+
+    fn keys(&self) -> Vec<&str> {
+        vec![Self::TRACEPARENT_KEY, Self::TRACESTATE_KEY]
+    }
+}
+
+impl From<&TraceHeaders> for opentelemetry::Context {
+    fn from(value: &TraceHeaders) -> Self {
+        use opentelemetry::propagation::text_map_propagator::TextMapPropagator as _;
+        let propagator = TraceContextPropagator::new();
+        propagator.extract(value)
+    }
+}
diff --git a/crates/utils/redap_telemetry/src/telemetry.rs b/crates/utils/redap_telemetry/src/telemetry.rs
new file mode 100644
index 000000000000..df24a86a9ad8
--- /dev/null
+++ b/crates/utils/redap_telemetry/src/telemetry.rs
@@ -0,0 +1,307 @@
+use opentelemetry::trace::TracerProvider as _;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_sdk::trace::SdkTracerProvider;
+use tracing_subscriber::layer::SubscriberExt as _;
+use tracing_subscriber::util::SubscriberInitExt as _;
+use tracing_subscriber::{EnvFilter, Layer as _};
+
+use crate::{LogFormat, TelemetryArgs};
+
+// ---
+
+/// The Redap telemetry pipeline.
+///
+/// Keep this alive for as long as you need to log, trace and/or measure.
+///
+/// Will flush everything on drop.
+#[derive(Debug, Clone)]
+pub struct Telemetry {
+    traces: Option<SdkTracerProvider>,
+    metrics: Option<SdkMeterProvider>,
+
+    drop_behavior: TelemetryDropBehavior,
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub enum TelemetryDropBehavior {
+    /// The telemetry pipeline will be flushed every time a [`Telemetry`] is dropped.
+    ///
+    /// This is particularly useful to use in conjunction with the fact that [`Telemetry`]
+    /// is `Clone`: lazy initialize a [`Telemetry`] into a static `LazyCell`/`LazyLock`, and keep
+    /// returning clones of that value.
+    /// You are guaranteed that the pipeline will get flushed every time one of these clones goes out
+    /// of scope.
+    Flush,
+
+    /// The telemetry pipeline will be flushed and shutdown the first time a [`Telemetry`] is dropped.
+    ///
+    /// The pipeline is then inactive, and all logs, traces and metrics are dropped.
+    #[default]
+    Shutdown,
+}
+
+impl Telemetry {
+    pub fn flush(&mut self) {
+        let Self {
+            traces,
+            metrics,
+            drop_behavior: _,
+        } = self;
+
+        if let Some(traces) = traces {
+            if let Err(err) = traces.force_flush() {
+                tracing::error!(%err, "failed to flush otel trace provider");
+            }
+        }
+
+        if let Some(metrics) = metrics {
+            if let Err(err) = metrics.force_flush() {
+                tracing::error!(%err, "failed to flush otel metric provider");
+            }
+        }
+    }
+
+    pub fn shutdown(&mut self) {
+        // NOTE: We do both `force_flush` and `shutdown` because, even though they both flush the
+        // pipeline, sometimes one has better error messages than the other (although, more often
+        // than not, they both provide useless errors and you should make sure to look into the
+        // DEBUG logs: this is generally where they end up).
+        self.flush();
+
+        let Self {
+            traces,
+            metrics,
+            drop_behavior: _,
+        } = self;
+
+        if let Some(traces) = traces {
+            if let Err(err) = traces.shutdown() {
+                tracing::error!(%err, "failed to shutdown otel trace provider");
+            }
+        }
+
+        if let Some(metrics) = metrics {
+            if let Err(err) = metrics.shutdown() {
+                tracing::error!(%err, "failed to shutdown otel metric provider");
+            }
+        }
+    }
+}
+
+impl Drop for Telemetry {
+    fn drop(&mut self) {
+        match self.drop_behavior {
+            TelemetryDropBehavior::Flush => self.flush(),
+            TelemetryDropBehavior::Shutdown => self.shutdown(),
+        }
+    }
+}
+
+impl Telemetry {
+    #[must_use = "dropping this will flush and shutdown all telemetry systems"]
+    pub fn init(args: TelemetryArgs, drop_behavior: TelemetryDropBehavior) -> anyhow::Result<Self> {
+        let TelemetryArgs {
+            disabled,
+            service_name,
+            attributes,
+            log_filter,
+            log_test_output,
+            log_format,
+            log_closed_spans,
+            trace_filter,
+            trace_endpoint,
+            trace_sampler,
+            trace_sampler_args,
+            metric_endpoint,
+            metric_interval,
+        } = args;
+
+        if disabled {
+            // TODO(open-telemetry/opentelemetry-rust#1936): must be handled manually at the
+            // moment: <https://github.com/open-telemetry/opentelemetry-rust/issues/1936>.
+ + return Ok(Self { + metrics: None, + traces: None, + drop_behavior, + }); + } + + // For these things, all we need to do is make sure that the right OTEL env var is set. + // All the downstream libraries will do the right thing if they are. + // + // Safety: anything touching the env is unsafe, tis what it is. + #[expect(unsafe_code)] + unsafe { + std::env::set_var("OTEL_SERVICE_NAME", &service_name); + std::env::set_var("OTEL_RESOURCE_ATTRIBUTES", attributes); + std::env::set_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", trace_endpoint); + std::env::set_var("OTEL_TRACES_SAMPLER", trace_sampler); + std::env::set_var("OTEL_TRACES_SAMPLER_ARG", trace_sampler_args); + std::env::set_var("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", metric_endpoint); + std::env::set_var("OTEL_METRIC_EXPORT_INTERVAL", metric_interval); + } + + let create_filter = |base: &str, forced: &str| { + Ok::<_, anyhow::Error>( + EnvFilter::new(base) + // TODO(cmc): do not override user's choice, bring back the logic from re_log + .add_directive(format!("aws_smithy_runtime={forced}").parse()?) + .add_directive(format!("datafusion={forced}").parse()?) + .add_directive(format!("datafusion_optimizer={forced}").parse()?) + .add_directive(format!("h2={forced}").parse()?) + .add_directive(format!("hyper={forced}").parse()?) + .add_directive(format!("hyper_util={forced}").parse()?) + .add_directive(format!("lance-arrow={forced}").parse()?) + .add_directive(format!("lance-core={forced}").parse()?) + .add_directive(format!("lance-datafusion={forced}").parse()?) + .add_directive(format!("lance-encoding={forced}").parse()?) + .add_directive(format!("lance-file={forced}").parse()?) + .add_directive(format!("lance-index={forced}").parse()?) + .add_directive(format!("lance-io={forced}").parse()?) + .add_directive(format!("lance-linalg={forced}").parse()?) + .add_directive(format!("lance-table={forced}").parse()?) + .add_directive(format!("lance={forced}").parse()?) 
+ .add_directive(format!("opentelemetry-otlp={forced}").parse()?) + .add_directive(format!("opentelemetry={forced}").parse()?) + .add_directive(format!("opentelemetry_sdk={forced}").parse()?) + .add_directive(format!("rustls={forced}").parse()?) + .add_directive(format!("sqlparser={forced}").parse()?) + .add_directive(format!("tonic={forced}").parse()?) + .add_directive(format!("tonic_web={forced}").parse()?) + .add_directive(format!("tower={forced}").parse()?) + .add_directive(format!("tower_http={forced}").parse()?) + .add_directive(format!("tower_web={forced}").parse()?) + // + .add_directive("lance_encoding=off".parse()?), // this one is a real nightmare + ) + }; + + // Logging strategy + // ================ + // + // * All our logs go through the structured `tracing` macros. + // + // * We always log from `tracing` directly into stdio: we never involve the OpenTelemetry + // logging API. Production is expected to read the logs from the pod's output. + // There is never any internal buffering going on, besides the buffering of stdio itself. + // + // * All logs that happen as part of the larger trace/span will automatically be uploaded + // with that trace/span. + // This makes our traces a very powerful debugging tool, in addition to a profiler. + + let layer_logs_and_traces_stdio = { + let layer = tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .with_file(true) + .with_line_number(true) + .with_target(false) + .with_thread_ids(true) + .with_thread_names(true) + .with_span_events(if log_closed_spans { + tracing_subscriber::fmt::format::FmtSpan::CLOSE + } else { + tracing_subscriber::fmt::format::FmtSpan::NONE + }); + + // Everything is generically typed, which is why this is such a nightmare to do. + macro_rules! 
handle_format { + ($format:ident) => {{ + let layer = layer.event_format(tracing_subscriber::fmt::format().$format()); + if log_test_output { + layer.with_test_writer().boxed() + } else { + layer.boxed() + } + }}; + } + let layer = match log_format { + LogFormat::Pretty => handle_format!(pretty), + LogFormat::Compact => handle_format!(compact), + LogFormat::Json => handle_format!(json), + }; + + layer.with_filter(create_filter(&log_filter, "warn")?) + }; + + // Tracing strategy + // ================ + // + // * All our traces go through the structured `tracing` macros. We *never* use the + // OpenTelemetry macros. + // + // * The traces go through a first layer of filtering based on the value of `RUST_TRACE`, which + // functions similarly to a `RUST_LOG` filter. + // + // * The traces are then sent to the OpenTelemetry SDK, where they will go through a pass of + // sampling before being sent to the OTLP endpoint. + // The sampling mechanism is controlled by the official OTEL environment variables. + // span sampling decision. + // + // * Spans that contains error logs will properly be marked as failed, and easily findable. + + let (tracer_provider, layer_traces_otlp) = { + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() // There's no good reason to use HTTP for traces (at the moment, that is) + .build()?; + + let provider = SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .build(); + + // This will be used by the `TracingInjectorInterceptor` & `TracingExtractorInterceptor` to + // encode the trace information into the request headers. + opentelemetry::global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::new(), + ); + + // This is to make sure that if some third-party system is logging raw OpenTelemetry + // spans (as opposed to `tracing` spans), we will catch them and forward them + // appropriately. 
+ opentelemetry::global::set_tracer_provider(provider.clone()); + + let layer = tracing_opentelemetry::layer() + .with_tracer(provider.tracer(service_name.clone())) + .with_filter(create_filter(&trace_filter, "info")?); + + (provider, layer) + }; + + // Metric strategy + // =============== + // + // * Our metric strategy is basically the opposite of our logging strategy: everything goes + // through OpenTelemetry directly, `tracing` is never involved. + // + // * Metrics are uploaded (as opposed to scrapped!) using the OTLP protocol, on a fixed interval + // defined by the OTEL_METRIC_EXPORT_INTERVAL environment variable. + + let metric_provider = { + let exporter = opentelemetry_otlp::MetricExporter::builder() + // That's the only thing Prometheus supports. + .with_temporality(opentelemetry_sdk::metrics::Temporality::Cumulative) + .with_http() // Prometheus only supports HTTP-based OTLP + .build()?; + + let provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter) + .build(); + + // All metrics are logged directly via `opentelemetry`. 
+ opentelemetry::global::set_meter_provider(provider.clone()); + + provider + }; + + tracing_subscriber::registry() + .with(layer_logs_and_traces_stdio) + .with(layer_traces_otlp) + .try_init()?; + + Ok(Self { + drop_behavior, + traces: Some(tracer_provider), + metrics: Some(metric_provider), + }) + } +} From 3380f028a1654639b7776910a36706db5686434c Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 30 Apr 2025 12:41:48 +0200 Subject: [PATCH 2/5] integrate telemetry in re_grpc_client --- crates/store/re_grpc_client/Cargo.toml | 2 ++ crates/store/re_grpc_client/src/redap/mod.rs | 29 ++++++++++---------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml index 76486ff17196..a8b9fda73118 100644 --- a/crates/store/re_grpc_client/Cargo.toml +++ b/crates/store/re_grpc_client/Cargo.toml @@ -29,6 +29,8 @@ re_smart_channel.workspace = true re_sorbet.workspace = true re_uri.workspace = true +redap_telemetry.workspace = true + async-stream.workspace = true thiserror.workspace = true tokio-stream.workspace = true diff --git a/crates/store/re_grpc_client/src/redap/mod.rs b/crates/store/re_grpc_client/src/redap/mod.rs index 7d65602526f3..929fc8978112 100644 --- a/crates/store/re_grpc_client/src/redap/mod.rs +++ b/crates/store/re_grpc_client/src/redap/mod.rs @@ -10,6 +10,8 @@ use re_protos::{ }; use re_uri::{DatasetDataUri, Origin}; +use redap_telemetry::external::{tower, tower_http}; + use crate::{spawn_future, StreamError, MAX_DECODING_MESSAGE_SIZE}; pub enum Command { @@ -115,6 +117,7 @@ pub async fn channel(origin: Origin) -> Result; @@ -127,26 +130,24 @@ pub async fn client( } #[cfg(not(target_arch = "wasm32"))] -pub type RedapClient = FrontendServiceClient; -// TODO(cmc): figure out how we integrate redap_telemetry in mainline Rerun -// pub type RedapClient = FrontendServiceClient< -// tower_http::trace::Trace< -// tonic::service::interceptor::InterceptedService< -// 
tonic::transport::Channel, -// redap_telemetry::TracingInjectorInterceptor, -// >, -// tower_http::classify::SharedClassifier, -// >, -// >; +pub type RedapClient = FrontendServiceClient< + tower_http::trace::Trace< + tonic::service::interceptor::InterceptedService< + tonic::transport::Channel, + redap_telemetry::TracingInjectorInterceptor, + >, + tower_http::classify::SharedClassifier, + redap_telemetry::GrpcSpanMaker, + >, +>; #[cfg(not(target_arch = "wasm32"))] pub async fn client(origin: Origin) -> Result { let channel = channel(origin).await?; let middlewares = tower::ServiceBuilder::new() - // TODO(cmc): figure out how we integrate redap_telemetry in mainline Rerun - // .layer(redap_telemetry::new_grpc_tracing_layer()) - // .layer(redap_telemetry::TracingInjectorInterceptor::new_layer()) + .layer(redap_telemetry::new_grpc_tracing_layer()) + .layer(redap_telemetry::TracingInjectorInterceptor::new_layer()) .into_inner(); let svc = tower::ServiceBuilder::new() From 6f104370effc8ad6148d68198066f3d6888c1fd3 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 30 Apr 2025 12:42:09 +0200 Subject: [PATCH 3/5] add some instrumentation to re_datafusion --- crates/store/re_datafusion/Cargo.toml | 1 + crates/store/re_datafusion/src/datafusion_connector.rs | 4 ++++ crates/store/re_datafusion/src/partition_table.rs | 3 +++ crates/store/re_datafusion/src/search_provider.rs | 3 +++ crates/store/re_datafusion/src/table_entry_provider.rs | 4 ++++ 5 files changed, 15 insertions(+) diff --git a/crates/store/re_datafusion/Cargo.toml b/crates/store/re_datafusion/Cargo.toml index 31bc0632d655..cf1ae667f87f 100644 --- a/crates/store/re_datafusion/Cargo.toml +++ b/crates/store/re_datafusion/Cargo.toml @@ -46,6 +46,7 @@ futures-util.workspace = true itertools.workspace = true tokio-stream.workspace = true tonic.workspace = true +tracing.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] futures.workspace = true diff --git 
a/crates/store/re_datafusion/src/datafusion_connector.rs b/crates/store/re_datafusion/src/datafusion_connector.rs index ca37bb2b0fc9..47affeb96a11 100644 --- a/crates/store/re_datafusion/src/datafusion_connector.rs +++ b/crates/store/re_datafusion/src/datafusion_connector.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use datafusion::{catalog::TableProvider, error::DataFusionError}; +use tracing::instrument; use re_grpc_client::redap::RedapClient; use re_log_types::{external::re_tuid::Tuid, EntryId}; @@ -23,6 +24,7 @@ impl DataFusionConnector { } impl DataFusionConnector { + #[instrument(skip_all, err)] pub async fn get_entry_list(&mut self) -> Result, DataFusionError> { // TODO(jleibs): Clean this up with better helpers let entry: EntryDetails = self @@ -48,6 +50,7 @@ impl DataFusionConnector { .await } + #[instrument(skip(self), err)] pub async fn get_dataset_entry( &mut self, id: Tuid, @@ -64,6 +67,7 @@ impl DataFusionConnector { Ok(entry) } + #[instrument(skip(self), err)] pub async fn get_partition_table( &self, dataset_id: EntryId, diff --git a/crates/store/re_datafusion/src/partition_table.rs b/crates/store/re_datafusion/src/partition_table.rs index c48e2b8f7cbb..cc826b2049c6 100644 --- a/crates/store/re_datafusion/src/partition_table.rs +++ b/crates/store/re_datafusion/src/partition_table.rs @@ -6,6 +6,7 @@ use datafusion::{ catalog::TableProvider, error::{DataFusionError, Result as DataFusionResult}, }; +use tracing::instrument; use re_grpc_client::redap::RedapClient; use re_log_encoding::codec::wire::decoder::Decode as _; @@ -40,6 +41,7 @@ impl PartitionTableProvider { impl GrpcStreamToTable for PartitionTableProvider { type GrpcStreamData = ScanPartitionTableResponse; + #[instrument(skip(self), err)] async fn fetch_schema(&mut self) -> DataFusionResult { let request = GetPartitionTableSchemaRequest { dataset_id: Some(self.dataset_id.into()), @@ -60,6 +62,7 @@ impl GrpcStreamToTable for PartitionTableProvider { )) } + #[instrument(skip(self), err)] async fn 
send_streaming_request( &mut self, ) -> DataFusionResult>> { diff --git a/crates/store/re_datafusion/src/search_provider.rs b/crates/store/re_datafusion/src/search_provider.rs index afea25018481..3a4a38b8984b 100644 --- a/crates/store/re_datafusion/src/search_provider.rs +++ b/crates/store/re_datafusion/src/search_provider.rs @@ -7,6 +7,7 @@ use datafusion::{ error::{DataFusionError, Result as DataFusionResult}, }; use tokio_stream::StreamExt as _; +use tracing::instrument; use re_grpc_client::redap::RedapClient; use re_log_encoding::codec::wire::decoder::Decode as _; @@ -48,6 +49,7 @@ impl SearchResultsTableProvider { impl GrpcStreamToTable for SearchResultsTableProvider { type GrpcStreamData = SearchDatasetResponse; + #[instrument(skip(self), err)] async fn fetch_schema(&mut self) -> DataFusionResult { let mut request = self.request.clone(); request.scan_parameters = Some(ScanParameters { @@ -84,6 +86,7 @@ impl GrpcStreamToTable for SearchResultsTableProvider { Ok(schema) } + #[instrument(skip(self), err)] async fn send_streaming_request( &mut self, ) -> DataFusionResult>> { diff --git a/crates/store/re_datafusion/src/table_entry_provider.rs b/crates/store/re_datafusion/src/table_entry_provider.rs index d25b95d3e14e..319b4b525c17 100644 --- a/crates/store/re_datafusion/src/table_entry_provider.rs +++ b/crates/store/re_datafusion/src/table_entry_provider.rs @@ -7,6 +7,7 @@ use datafusion::{ catalog::TableProvider, error::{DataFusionError, Result as DataFusionResult}, }; +use tracing::instrument; use re_grpc_client::redap::RedapClient; use re_log_encoding::codec::wire::decoder::Decode as _; @@ -41,6 +42,7 @@ impl TableEntryTableProvider { Ok(GrpcStreamProvider::prepare(self).await?) 
} + #[instrument(skip(self), err)] async fn table_id(&mut self) -> Result { if let Some(table_id) = self.table_id { return Ok(table_id); @@ -91,6 +93,7 @@ impl TableEntryTableProvider { impl GrpcStreamToTable for TableEntryTableProvider { type GrpcStreamData = ScanTableResponse; + #[instrument(skip(self), err)] async fn fetch_schema(&mut self) -> DataFusionResult { let request = GetTableSchemaRequest { table_id: Some(self.table_id().await?.into()), @@ -111,6 +114,7 @@ impl GrpcStreamToTable for TableEntryTableProvider { )) } + #[instrument(skip(self), err)] async fn send_streaming_request( &mut self, ) -> DataFusionResult>> { From 7b9c9b6f089d9f3828a319da4fe415c938e23d0d Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 30 Apr 2025 12:43:21 +0200 Subject: [PATCH 4/5] integrate telemetry in rerun_py --- rerun_py/Cargo.toml | 5 ++++ rerun_py/src/catalog/catalog_client.rs | 6 ++++ rerun_py/src/catalog/connection_handle.rs | 10 +++++++ rerun_py/src/catalog/dataframe_query.rs | 4 +++ rerun_py/src/catalog/dataset.rs | 9 ++++++ rerun_py/src/catalog/table.rs | 5 ++++ rerun_py/src/python_bridge.rs | 35 ++++++++++++++++++++++- 7 files changed, 73 insertions(+), 1 deletion(-) diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 370fe546e8dd..7a10b4443b16 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -67,6 +67,11 @@ re_uri.workspace = true re_video.workspace = true re_web_viewer_server = { workspace = true, optional = true } +# TODO +redap_telemetry.workspace = true +tracing.workspace = true +clap.workspace = true + arrow = { workspace = true, features = ["pyarrow"] } chrono.workspace = true #TODO(#9317): migrate to jiff when upgrading to pyo3 0.24 crossbeam.workspace = true diff --git a/rerun_py/src/catalog/catalog_client.rs b/rerun_py/src/catalog/catalog_client.rs index e68f5a0ffc52..4800f27a877d 100644 --- a/rerun_py/src/catalog/catalog_client.rs +++ b/rerun_py/src/catalog/catalog_client.rs @@ -6,6 +6,8 @@ use pyo3::{ types::PyAnyMethods as _, 
FromPyObject, Py, PyAny, PyResult, Python, }; +use tracing::instrument; + use re_log_types::EntryId; use re_protos::catalog::v1alpha1::EntryFilter; @@ -51,6 +53,7 @@ impl PyCatalogClient { } /// Get a list of all entries in the catalog. + #[instrument(skip_all, err)] fn entries(self_: Py, py: Python<'_>) -> PyResult>> { let mut connection = self_.borrow(py).connection.clone(); @@ -81,6 +84,7 @@ impl PyCatalogClient { } /// Get a dataset by name or id. + #[instrument(skip_all)] fn get_dataset( self_: Py, name_or_id: EntryIdLike, @@ -112,6 +116,7 @@ impl PyCatalogClient { //TODO(#9369): `datasets()` (needs FindDatasetsEntries rpc) /// Create a new dataset with the provided name. + #[instrument(skip_all)] fn create_dataset(self_: Py, py: Python<'_>, name: &str) -> PyResult> { let mut connection = self_.borrow_mut(py).connection.clone(); @@ -137,6 +142,7 @@ impl PyCatalogClient { /// Get a table by name or id. /// /// Note: the entry table is named `__entries`. + #[instrument(skip_all)] fn get_table( self_: Py, name_or_id: EntryIdLike, diff --git a/rerun_py/src/catalog/connection_handle.rs b/rerun_py/src/catalog/connection_handle.rs index 923e53c885c2..0de3f4e8d2d6 100644 --- a/rerun_py/src/catalog/connection_handle.rs +++ b/rerun_py/src/catalog/connection_handle.rs @@ -8,6 +8,7 @@ use pyo3::{ }; use re_log_encoding::codec::wire::decoder::Decode as _; use tokio_stream::StreamExt as _; +use tracing::instrument; use re_chunk::{LatestAtQuery, RangeQuery}; use re_chunk_store::ChunkStore; @@ -59,6 +60,7 @@ impl ConnectionHandle { // TODO(ab): all these request wrapper should be implemented in a more general client wrapper also // used in e.g. the redap browser, etc. The present connection handle should just forward them. impl ConnectionHandle { + #[instrument(skip(self, py), err)] pub fn find_entries( &mut self, py: Python<'_>, @@ -83,6 +85,7 @@ impl ConnectionHandle { Ok(entries?) 
} + #[instrument(skip(self, py), err)] pub fn delete_entry(&mut self, py: Python<'_>, entry_id: EntryId) -> PyResult<()> { let _response = wait_for_future( py, @@ -95,6 +98,7 @@ impl ConnectionHandle { Ok(()) } + #[instrument(skip(self, py), err)] pub fn create_dataset(&mut self, py: Python<'_>, name: String) -> PyResult { let response = wait_for_future( py, @@ -110,6 +114,7 @@ impl ConnectionHandle { .try_into()?) } + #[instrument(skip(self, py), err)] pub fn read_dataset(&mut self, py: Python<'_>, entry_id: EntryId) -> PyResult { let response = wait_for_future( py, @@ -126,6 +131,7 @@ impl ConnectionHandle { .try_into()?) } + #[instrument(skip(self, py), err)] pub fn read_table(&mut self, py: Python<'_>, entry_id: EntryId) -> PyResult { let response = wait_for_future( py, @@ -142,6 +148,7 @@ impl ConnectionHandle { .try_into()?) } + #[instrument(skip(self, py), err)] pub fn get_dataset_schema( &mut self, py: Python<'_>, @@ -160,6 +167,7 @@ impl ConnectionHandle { }) } + #[instrument(skip(self, py), err)] pub fn register_with_dataset( &mut self, py: Python<'_>, @@ -187,6 +195,7 @@ impl ConnectionHandle { }) } + #[instrument(skip(self, py), err)] pub fn wait_for_task( &mut self, py: Python<'_>, @@ -246,6 +255,7 @@ impl ConnectionHandle { } #[allow(clippy::too_many_arguments)] + #[instrument(skip(self, py, partition_ids), err)] pub fn get_chunks_for_dataframe_query( &mut self, py: Python<'_>, diff --git a/rerun_py/src/catalog/dataframe_query.rs b/rerun_py/src/catalog/dataframe_query.rs index 34c26e9cc793..6682fbe2d0a3 100644 --- a/rerun_py/src/catalog/dataframe_query.rs +++ b/rerun_py/src/catalog/dataframe_query.rs @@ -8,6 +8,7 @@ use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::PyAnyMethods as _; use pyo3::types::{PyCapsule, PyDict, PyTuple}; use pyo3::{pyclass, pymethods, Bound, Py, PyAny, PyRef, PyResult, Python}; +use tracing::instrument; use re_chunk::ComponentName; use re_chunk_store::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, 
ViewContentsSelector}; @@ -36,6 +37,7 @@ pub struct PyDataframeQueryView { impl PyDataframeQueryView { #[expect(clippy::fn_params_excessive_bools)] + #[instrument(skip(dataset, contents, py))] pub fn new( dataset: Py, index: String, @@ -396,6 +398,7 @@ impl PyDataframeQueryView { } /// Returns a DataFusion table provider capsule. + #[instrument(skip_all)] fn __datafusion_table_provider__<'py>( self_: PyRef<'py, Self>, py: Python<'py>, @@ -446,6 +449,7 @@ impl PyDataframeQueryView { } /// Register this view to the global DataFusion context and return a DataFrame. + #[instrument(skip_all)] fn df(self_: PyRef<'_, Self>) -> PyResult> { let py = self_.py(); diff --git a/rerun_py/src/catalog/dataset.rs b/rerun_py/src/catalog/dataset.rs index d8dfe1fb6814..e9b44a328b5b 100644 --- a/rerun_py/src/catalog/dataset.rs +++ b/rerun_py/src/catalog/dataset.rs @@ -6,6 +6,7 @@ use arrow::pyarrow::PyArrowType; use pyo3::{exceptions::PyRuntimeError, pyclass, pymethods, Py, PyAny, PyRef, PyResult, Python}; use re_grpc_client::redap::get_chunks_response_to_chunk_and_partition_id; use tokio_stream::StreamExt as _; +use tracing::instrument; use re_chunk_store::{ChunkStore, ChunkStoreHandle}; use re_dataframe::{ComponentColumnSelector, TimeColumnSelector}; @@ -47,6 +48,7 @@ impl PyDataset { /// Return the Arrow schema of the data contained in the dataset. //TODO(#9457): there should be another `schema` method which returns a `PySchema` + #[instrument(skip_all)] fn arrow_schema(self_: PyRef<'_, Self>) -> PyResult> { let super_ = self_.as_super(); let mut connection = super_.client.borrow_mut(self_.py()).connection().clone(); @@ -57,6 +59,7 @@ impl PyDataset { } /// Return the partition table as a Datafusion table provider. + #[instrument(skip_all)] fn partition_table(self_: PyRef<'_, Self>) -> PyResult { let super_ = self_.as_super(); let connection = super_.client.borrow(self_.py()).connection().clone(); @@ -95,6 +98,7 @@ impl PyDataset { } /// Register a RRD URI to the dataset. 
+ #[instrument(skip(self_), err)] fn register(self_: PyRef<'_, Self>, recording_uri: String) -> PyResult<()> { // TODO(#9731): In order to make the `register` method to appear synchronous, // we need to hard-code a max timeout for waiting for the task. @@ -113,6 +117,7 @@ impl PyDataset { } /// Download a partition from the dataset. + #[instrument(skip(self_), err)] fn download_partition(self_: PyRef<'_, Self>, partition_id: String) -> PyResult { let super_ = self_.as_super(); let mut client = super_.client.borrow(self_.py()).connection().client(); @@ -208,6 +213,7 @@ impl PyDataset { store_position = false, base_tokenizer = "simple", ))] + #[instrument(skip(self_, column, time_index), err)] fn create_fts_index( self_: PyRef<'_, Self>, column: PyComponentColumnSelector, @@ -273,6 +279,7 @@ impl PyDataset { num_sub_vectors = 16, distance_metric = VectorDistanceMetricLike::VectorDistanceMetric(crate::catalog::PyVectorDistanceMetric::Cosine), ))] + #[instrument(skip(self_, column, time_index, distance_metric), err)] fn create_vector_index( self_: PyRef<'_, Self>, column: PyComponentColumnSelector, @@ -327,6 +334,7 @@ impl PyDataset { } /// Search the dataset using a full-text search query. + #[instrument(skip(self_, column), err)] fn search_fts( self_: PyRef<'_, Self>, query: String, @@ -390,6 +398,7 @@ impl PyDataset { } /// Search the dataset using a vector search query. 
+ #[instrument(skip(self_, query, column), err)] fn search_vector( self_: PyRef<'_, Self>, query: VectorLike<'_>, diff --git a/rerun_py/src/catalog/table.rs b/rerun_py/src/catalog/table.rs index 9e0fe192b26c..6fb3814d994e 100644 --- a/rerun_py/src/catalog/table.rs +++ b/rerun_py/src/catalog/table.rs @@ -8,9 +8,12 @@ use pyo3::{ types::{PyAnyMethods as _, PyCapsule}, Bound, PyAny, PyRef, PyRefMut, PyResult, }; +use tracing::instrument; use re_datafusion::TableEntryTableProvider; +use redap_telemetry::external::tracing; + use crate::{ catalog::PyEntry, utils::{get_tokio_runtime, wait_for_future}, @@ -28,6 +31,7 @@ pub struct PyTable { #[pymethods] impl PyTable { /// Returns a DataFusion table provider capsule. + #[instrument(skip_all)] fn __datafusion_table_provider__( mut self_: PyRefMut<'_, Self>, ) -> PyResult> { @@ -65,6 +69,7 @@ impl PyTable { } /// Registers the table with the DataFusion context and return a DataFrame. + #[instrument(skip_all)] fn df(self_: PyRef<'_, Self>) -> PyResult> { let py = self_.py(); diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 1e58737f69cd..bb844e892994 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -43,6 +43,7 @@ impl PyRuntimeErrorExt for PyRuntimeError { use once_cell::sync::{Lazy, OnceCell}; use crate::dataframe::PyRecording; +use crate::utils::get_tokio_runtime; // The bridge needs to have complete control over the lifetimes of the individual recordings, // otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python @@ -105,13 +106,43 @@ fn global_web_viewer_server( WEB_HANDLE.get_or_init(Default::default).lock() } +// TODO +fn telemetry() -> parking_lot::MutexGuard<'static, redap_telemetry::Telemetry> { + static TELEMETRY: OnceCell> = OnceCell::new(); + TELEMETRY + .get_or_init(|| { + #[allow(unsafe_code)] + unsafe { + std::env::set_var("OTEL_SERVICE_NAME", "rerun-py"); + } + + use clap::Parser as _; + let args = 
redap_telemetry::TelemetryArgs::parse_from::<_, String>(vec![]); + + let runtime = get_tokio_runtime(); // TODO + runtime.block_on(async { + let telemetry = redap_telemetry::Telemetry::init( + args, + redap_telemetry::TelemetryDropBehavior::Shutdown, + ) + .unwrap(); + parking_lot::Mutex::new(telemetry) + }) + }) + .lock() +} + /// The python module is called "rerun_bindings". #[pymodule] fn rerun_bindings(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { // NOTE: We do this here because some the inner init methods don't respond too kindly to being // called more than once. // The SDK should not be as noisy as the CLI, so we set log filter to warning if not specified otherwise. - re_log::setup_logging_with_filter(&re_log::log_filter_from_env_or_default("warn")); + // + // TODO: how do we coexist with that? + // re_log::setup_logging_with_filter(&re_log::log_filter_from_env_or_default("warn")); + + let _telemetry = telemetry(); // These two components are necessary for imports to work m.add_class::()?; @@ -347,6 +378,8 @@ fn shutdown(py: Python<'_>) { flush_garbage_queue(); }); + + telemetry().shutdown(); } // --- Recordings --- From c67b1158b3c6062b3b9acd5b5f547c0010128999 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 30 Apr 2025 12:43:26 +0200 Subject: [PATCH 5/5] add test script --- test.py | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 test.py diff --git a/test.py b/test.py new file mode 100644 index 000000000000..1b2e38a6fcff --- /dev/null +++ b/test.py @@ -0,0 +1,58 @@ +from pathlib import Path + +import numpy as np +import rerun as rr + +RRD_DIR = Path("/tmp/redap_rrd") +RRD_DIR.mkdir(exist_ok=True) + + +def create_rrds(number: int) -> list[Path]: + N_POINTS = 100 + + + paths = [] + + for i in range(number): + frames = rr.TimeColumn("frames", sequence=range(i * N_POINTS, (i+1) * N_POINTS)) + + rec_path = RRD_DIR / f"rrd{i}.rrd" + rec_path.unlink(missing_ok=True) + + 
paths.append(rec_path) + + rec = rr.RecordingStream("rerun_example_redap_data", recording_id=f"rec_id_{i}") + rec.save(rec_path) + rec.send_recording_name(f"data_{i}") + rec.send_columns("/data", [frames], rr.Scalars.columns(scalars=np.random.rand(N_POINTS))) + rec.flush(blocking=True) + + + return paths + + + +DATASET_NAME = "rerun_example_redap_data" + +c = rr.catalog.CatalogClient("rerun+http://localhost:51234") + + +try: + dataset = c.get_dataset(DATASET_NAME) + dataset.delete() +except: + pass # who cares + + +d = c.create_dataset(DATASET_NAME) + +for path in create_rrds(10): + print(f"register {path}") + d.register(f"file://{path.absolute()}") + +df = d.dataframe_query_view( + index="frames", + contents={"/data": "rerun.components.Scalar"}, +).df() + +df.show(1000)