@@ -23,6 +23,7 @@ use anyhow::Result;
2323
2424use opentelemetry:: metrics:: { Meter , MeterProvider } ;
2525
26+ use opentelemetry_sdk:: metrics:: PeriodicReader ;
2627use opentelemetry_sdk:: metrics:: exporter:: PushMetricExporter ;
2728use opentelemetry_sdk:: metrics:: reader:: MetricReader ;
2829use opentelemetry_sdk:: {
@@ -229,3 +230,99 @@ impl Telemetry {
229230 Ok ( ( ) )
230231 }
231232}
233+
234+ /// A dedicated periodic metrics pipeline for SUT resource metrics.
235+ ///
236+ /// Always exports to the Arrow backend (when `SPICEAI_BENCHMARK_METRICS_KEY` is set).
237+ /// Additionally exports to an OTLP endpoint when configured.
238+ /// Instruments created on the returned [`Meter`] are exported every 5 seconds.
239+ pub struct SutMetricsPipeline {
240+ provider : SdkMeterProvider ,
241+ meter : Meter ,
242+ }
243+
244+ impl SutMetricsPipeline {
245+ /// Create the pipeline.
246+ ///
247+ /// - `api_key_name`: env var name for the Arrow backend API key (e.g. `"SPICEAI_BENCHMARK_METRICS_KEY"`).
248+ /// - `otlp_endpoint`: optional OTLP gRPC endpoint to also export to.
249+ /// - `resource`: OTel resource attributes for exported metrics.
250+ pub async fn new (
251+ api_key_name : & str ,
252+ otlp_endpoint : Option < & str > ,
253+ resource : Resource ,
254+ ) -> Result < Self > {
255+ let mut builder = SdkMeterProvider :: builder ( ) . with_resource ( resource) ;
256+
257+ match std:: env:: var ( api_key_name) {
258+ Ok ( key) => {
259+ let token = Arc :: new ( SecretString :: new ( key. into ( ) ) ) ;
260+ // Arrow periodic reader (always, when API key is present)
261+ let arrow_exporter = otel_arrow:: OtelArrowExporter :: new (
262+ TelemetryExporterBuilder :: new ( )
263+ . with_credentials ( flight_client:: Credentials :: Bearer {
264+ token,
265+ prefix : false ,
266+ } )
267+ . with_service_name ( "benchmarks_telemetry" . into ( ) )
268+ . with_endpoint ( Arc :: clone ( & ENDPOINT ) )
269+ . build ( )
270+ . await ?,
271+ ) ;
272+ let reader = PeriodicReader :: builder ( arrow_exporter)
273+ . with_interval ( Duration :: from_secs ( 5 ) )
274+ . build ( ) ;
275+ builder = builder. with_reader ( reader) ;
276+ println ! (
277+ "SUT metrics: Arrow periodic exporter enabled (endpoint: {})" ,
278+ * ENDPOINT
279+ )
280+ }
281+ Err ( e) => {
282+ eprintln ! ( "Failed to create Arrow exporter for SUT metrics: {e}" ) ;
283+ }
284+ } ;
285+
286+ // OTLP periodic reader (when --otlp-endpoint is configured)
287+ if let Some ( endpoint) = otlp_endpoint {
288+ match MetricExporter :: builder ( )
289+ . with_tonic ( )
290+ . with_timeout ( Duration :: from_secs ( 10 ) )
291+ . with_endpoint ( endpoint)
292+ . build ( )
293+ {
294+ Ok ( otlp_exporter) => {
295+ let reader = PeriodicReader :: builder ( otlp_exporter)
296+ . with_interval ( Duration :: from_secs ( 5 ) )
297+ . build ( ) ;
298+ builder = builder. with_reader ( reader) ;
299+ println ! ( "SUT metrics: OTLP periodic exporter enabled (endpoint: {endpoint})" ) ;
300+ }
301+ Err ( e) => {
302+ eprintln ! ( "Failed to create OTLP exporter for SUT metrics: {e}" ) ;
303+ }
304+ }
305+ }
306+
307+ let provider = builder. build ( ) ;
308+ let meter = provider. meter ( "spicebench-sut" ) ;
309+
310+ Ok ( Self { provider, meter } )
311+ }
312+
313+ /// Get the meter for creating SUT instruments.
314+ #[ must_use]
315+ pub fn meter ( & self ) -> Meter {
316+ self . meter . clone ( )
317+ }
318+
319+ /// Flush and shut down the pipeline.
320+ pub fn shutdown ( & self ) {
321+ if let Err ( e) = self . provider . force_flush ( ) {
322+ eprintln ! ( "Failed to flush SUT metrics pipeline: {e}" ) ;
323+ }
324+ if let Err ( e) = self . provider . shutdown ( ) {
325+ eprintln ! ( "Failed to shutdown SUT metrics pipeline: {e}" ) ;
326+ }
327+ }
328+ }
0 commit comments