Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions orion-interner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
edition.workspace = true
license-file.workspace = true
name = "orion-interner"
rust-version.workspace = true
version.workspace = true

[lints]
workspace = true

[dependencies]
lasso = { version = "0.7.3", features = [
"ahash",
"ahasher",
"dashmap",
"multi-threaded",
] }
serde.workspace = true
37 changes: 37 additions & 0 deletions orion-interner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// SPDX-FileCopyrightText: © 2025 kmesh authors
// SPDX-License-Identifier: Apache-2.0
//
// Copyright 2025 kmesh authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

use std::sync::OnceLock;

use lasso::ThreadedRodeo;

static GLOBAL_INTERNER: OnceLock<ThreadedRodeo> = OnceLock::new();

pub fn to_static_str(s: &str) -> &'static str {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this worth a separate package

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And when and where is it used

let interner = GLOBAL_INTERNER.get_or_init(ThreadedRodeo::new);
let key = interner.get_or_intern(s);
let static_ref = interner.resolve(&key);

// SAFETY: The `GLOBAL_INTERNER` is a `static` variable, meaning it has a `'static`
// lifetime and is never dropped. Therefore, the string slices stored within it
// are also valid for the `'static` lifetime. This transmute is safe because
// we are extending a lifetime that is already effectively `'static`.
unsafe { std::mem::transmute::<&str, &'static str>(static_ref) }
}
37 changes: 37 additions & 0 deletions orion-metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[package]
edition.workspace = true
license-file.workspace = true
name = "orion-metrics"
rust-version.workspace = true
version.workspace = true

[features]
jemalloc = ["dep:tikv-jemalloc-ctl"]
metrics = []

[dependencies]
ahash = "0.8.11"
dashmap = "6.1.0"
http.workspace = true
memory-stats = "1.2.0"
nohash = "0.2.0"
opentelemetry = "0.29.0"
opentelemetry-otlp = { version = "0.29.0", features = [
"grpc-tonic",
"http-json",
"hyper-client",
"serde",
"serde_json",
"serialize",
"tokio",
"tonic",
] }
opentelemetry_sdk = "0.29.0"
orion-configuration.workspace = true
orion-interner.workspace = true
parking_lot = "0.12.3"
serde.workspace = true
thread-id = "5.0.0"
tikv-jemalloc-ctl = { version = "0.6.0", optional = true, features = ["stats"] }
tracing.workspace = true
tracing-subscriber.workspace = true
137 changes: 137 additions & 0 deletions orion-metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// SPDX-FileCopyrightText: © 2025 kmesh authors
// SPDX-License-Identifier: Apache-2.0
//
// Copyright 2025 kmesh authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#[macro_use]
pub mod macros;
pub mod metrics;
pub mod sharded;

use orion_configuration::config::{Bootstrap, metrics::StatsSink};
use serde::{Deserialize, Serialize};

#[cfg(feature = "metrics")]
use {
opentelemetry::global,
opentelemetry_otlp::{Protocol, WithExportConfig},
opentelemetry_sdk::metrics::PeriodicReader,
parking_lot::{Condvar, Mutex},
std::sync::LazyLock,
std::time::Duration,
tracing::info,
};

#[cfg(feature = "metrics")]
const DEFAULT_EXPORT_PERIOD: std::time::Duration = std::time::Duration::from_secs(5);
#[cfg(feature = "metrics")]
static SETUP_BARRIER: LazyLock<(Mutex<bool>, Condvar)> = LazyLock::new(|| (Mutex::new(false), Condvar::new()));

pub struct VecMetrics(pub Vec<Metrics>);

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Metrics {
pub prefix: String,
pub endpoint: String,
pub stat_prefix: Option<String>,
pub export_period: Option<std::time::Duration>,
}

impl From<&Bootstrap> for VecMetrics {
fn from(bootstrap: &Bootstrap) -> Self {
let metrics: Vec<_> = bootstrap
.stats_sinks
.iter()
.filter_map(|sink| match sink {
StatsSink::OpenTelemetry(config) => {
let endpoint = config.grpc_service.google_grpc.as_ref().map(|g| g.target_uri.clone());
let stat_prefix = config.grpc_service.google_grpc.as_ref().map(|g| g.stat_prefix.clone());
let prefix = config.prefix.clone();
let export_period = bootstrap.stats_flush_interval;
endpoint.map(|endpoint| Metrics { prefix, endpoint, stat_prefix, export_period })
},
})
.collect();

Self(metrics)
}
}

// This function launches the metrics exporter based on the provided configuration.
// It's designed to be called during the application startup to initialize the OpenTelemetry metrics exporter.
//
#[cfg(feature = "metrics")]
pub async fn launch_metrics_exporter(
multi_metrics: &[Metrics],
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
info!("Launching OpenTelemetry exporter...");
let mut provider_builder = opentelemetry_sdk::metrics::SdkMeterProvider::builder();

for metrics in multi_metrics {
info!("Building gRPC exporter {}, with endpoint {}...", metrics.prefix, metrics.endpoint);
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(&metrics.endpoint)
.with_protocol(Protocol::Grpc)
.with_timeout(Duration::from_secs(5))
.build()?;

let exporter_period = metrics.export_period.unwrap_or(DEFAULT_EXPORT_PERIOD);

info!("Building periodic reader, with period {:?}...", exporter_period);
let periodic_reader = PeriodicReader::builder(exporter).with_interval(exporter_period).build();

info!("Updating provider with the periodic_reader...");
provider_builder = provider_builder.with_reader(periodic_reader);
}

info!("Building and setting global Meter provider...");
let provider = provider_builder.build();
global::set_meter_provider(provider);

signal_setup_complete();
info!("OpenTelemetry exporter launched successfully.");

Ok(())
}

#[cfg(not(feature = "metrics"))]
pub async fn launch_metrics_exporter(
_multi_metrics: &[Metrics],
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
Ok(())
}

#[cfg(feature = "metrics")]
fn signal_setup_complete() {
let (lock, cvar) = &*SETUP_BARRIER;
let mut setup_complete = lock.lock();
*setup_complete = true;
cvar.notify_all();
}

#[cfg(feature = "metrics")]
pub fn wait_for_metrics_setup() {
let (lock, cvar) = &*SETUP_BARRIER;
let mut setup_complete = lock.lock();
while !*setup_complete {
cvar.wait(&mut setup_complete);
}
}
#[cfg(not(feature = "metrics"))]
pub fn wait_for_metrics_setup() {}
95 changes: 95 additions & 0 deletions orion-metrics/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// SPDX-FileCopyrightText: © 2025 kmesh authors
// SPDX-License-Identifier: Apache-2.0
//
// Copyright 2025 kmesh authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#[cfg(feature = "metrics")]
macro_rules! init_observable_counter {
($counter: ident, $prefix: literal, $name: literal, $descr: literal) => {
_ = $counter.set(Metric::new($prefix, $name, $descr, ShardedU64::new()));
_ = global::meter(concat!("orion.", $prefix))
.u64_observable_counter($name)
.with_description($descr)
.with_callback(move |observer| {
let values = $counter.get().unwrap().value.load_all();
values.iter().for_each(|(key, value)| {
observer.observe(*value, key);
});
})
.build();
};
}

#[cfg(feature = "metrics")]
macro_rules! init_observable_gauge {
($counter: ident, $prefix: literal, $name: literal, $descr: literal) => {
_ = $counter.set(Metric::new($prefix, $name, $descr, ShardedU64::new()));
_ = global::meter(concat!("orion.", $prefix))
.u64_observable_gauge($name)
.with_description($descr)
.with_callback(move |observer| {
let values = $counter.get().unwrap().value.load_all();
values.iter().for_each(|(key, value)| {
observer.observe(*value, key);
});
})
.build();
};
}

#[macro_export]
#[cfg(feature = "metrics")]
macro_rules! with_metric {
($counter: expr, $method: ident, $($args: expr),*) => {
$counter.get().inspect(|c| c.value.$method($($args),*));
};
}
#[macro_export]
#[cfg(not(feature = "metrics"))]
macro_rules! with_metric {
($counter: expr, $method: ident, $($args: expr),*) => {
// No-op if metrics feature is not enabled
if false {
// This creates a tuple containing the results of the expressions,
// effectively "using" them without generating runtime code.
let _ = $counter;
let _ = ($($args),*);
}
};
}

#[macro_export]
#[cfg(feature = "metrics")]
macro_rules! with_histogram {
($counter: expr, $method: ident, $($args: expr),*) => {
$counter.get().inspect(|c| c.$method($($args),*));
};
}
#[macro_export]
#[cfg(not(feature = "metrics"))]
macro_rules! with_histogram {
($counter: expr, $method: ident, $($args: expr),*) => {
// No-op if metrics feature is not enabled
if false {
// This creates a tuple containing the results of the expressions,
// effectively "using" them without generating runtime code.
let _ = $counter;
let _ = ($($args),*);
}
};
}
Loading
Loading