Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions server/src/executor_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::{
},
executor_api::executor_api_pb::{FunctionExecutorState, FunctionExecutorTerminationReason},
executors::ExecutorManager,
metrics::GrpcMetrics,
pb_helpers::blob_store_url_to_path,
state_store::{
IndexifyState,
Expand Down Expand Up @@ -548,6 +549,7 @@ pub struct ExecutorAPIService {
indexify_state: Arc<IndexifyState>,
executor_manager: Arc<ExecutorManager>,
blob_storage_registry: Arc<BlobStorageRegistry>,
grpc_metrics: GrpcMetrics,
}

impl ExecutorAPIService {
Expand All @@ -560,6 +562,7 @@ impl ExecutorAPIService {
indexify_state,
executor_manager,
blob_storage_registry,
grpc_metrics: GrpcMetrics::new(),
}
}

Expand Down Expand Up @@ -833,6 +836,8 @@ impl ExecutorApi for ExecutorAPIService {
request: Request<ReportExecutorStateRequest>,
) -> Result<Response<ReportExecutorStateResponse>, Status> {
let start = Instant::now();
let _timer = self.grpc_metrics.record_request("report_executor_state");

let executor_state = request
.get_ref()
.executor_state
Expand Down Expand Up @@ -916,6 +921,10 @@ impl ExecutorApi for ExecutorAPIService {
&self,
request: Request<GetDesiredExecutorStatesRequest>,
) -> Result<Response<Self::get_desired_executor_statesStream>, Status> {
let _timer = self
.grpc_metrics
.record_request("get_desired_executor_states");

let executor_id = request
.get_ref()
.executor_id
Expand Down Expand Up @@ -963,6 +972,8 @@ impl ExecutorApi for ExecutorAPIService {
&self,
request: Request<executor_api_pb::FunctionCallRequest>,
) -> Result<Response<executor_api_pb::FunctionCallResponse>, Status> {
let _timer = self.grpc_metrics.record_request("call_function");

let req = request.into_inner();
let updates = req
.updates
Expand Down
71 changes: 65 additions & 6 deletions server/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,71 @@ pub mod queue {
}
}

/// Drop guard that times a single gRPC handler invocation.
///
/// Holds the instant timing started at, a handle to the duration histogram,
/// and the handler name. When the value is dropped, the elapsed time in
/// seconds is recorded on the histogram with a `handler` attribute (see the
/// `Drop` impl for this type).
///
/// Bind the guard to a named variable such as `_timer` so that it lives until
/// the end of the handler. A bare expression statement or `let _ = ...` drops
/// it immediately and records a near-zero duration.
#[must_use = "bind to `_timer`; dropping it at once records a near-zero duration"]
pub struct GrpcTimer {
    /// Instant at which timing began.
    start: Instant,
    /// Histogram that receives the elapsed seconds on drop.
    histogram: Histogram<f64>,
    /// Handler name attached as the `handler` attribute.
    handler: &'static str,
}

impl Drop for GrpcTimer {
    /// Records the seconds elapsed since `start` on the histogram, tagged
    /// with a `handler` attribute holding this timer's handler name.
    fn drop(&mut self) {
        let elapsed_secs = self.start.elapsed().as_secs_f64();
        let attributes = [KeyValue::new("handler", self.handler)];
        self.histogram.record(elapsed_secs, &attributes);
    }
}

/// Instruments used by the gRPC Executor API handlers in `executor_api.rs`.
#[derive(Debug, Clone)]
pub struct GrpcMetrics {
    /// Number of requests received, attributed per handler.
    pub requests: Counter<u64>,
    /// Time taken by requests, attributed per handler.
    pub request_duration: Histogram<f64>,
}

impl Default for GrpcMetrics {
fn default() -> Self {
Self::new()
}
}

impl GrpcMetrics {
    /// Builds the request counter and the request-duration histogram on the
    /// global `grpc_executor_api` meter.
    pub fn new() -> Self {
        let grpc_meter = opentelemetry::global::meter("grpc_executor_api");

        let request_counter = grpc_meter
            .u64_counter("indexify.grpc.requests_total")
            .with_description("Total number of gRPC requests by handler")
            .build();

        let duration_histogram = grpc_meter
            .f64_histogram("indexify.grpc.request_duration")
            .with_unit("s")
            .with_boundaries(low_latency_boundaries())
            .with_description("gRPC request duration in seconds by handler")
            .build();

        Self {
            requests: request_counter,
            request_duration: duration_histogram,
        }
    }

    /// Counts one request for `handler`, then starts timing it.
    ///
    /// The returned `GrpcTimer` records the elapsed duration when it is
    /// dropped, so keep it alive for as long as the handler runs.
    pub fn record_request(&self, handler: &'static str) -> GrpcTimer {
        let attributes = [KeyValue::new("handler", handler)];
        self.requests.add(1, &attributes);

        // Start the clock only after the counter has been bumped.
        let start = Instant::now();
        let histogram = self.request_duration.clone();
        GrpcTimer {
            start,
            histogram,
            handler,
        }
    }
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct StateStoreMetrics {
Expand All @@ -306,12 +371,6 @@ pub struct StateStoreMetrics {
pub state_write_request_state_change: Histogram<f64>,
}

impl Default for StateStoreMetrics {
fn default() -> Self {
Self::new()
}
}

impl StateStoreMetrics {
pub fn new() -> Self {
let meter = opentelemetry::global::meter("state_store");
Expand Down