Skip to content

Commit 24889f2

Browse files
committed
feat(metrics): add smg_pd_* PD disaggregation metrics (#1776)
Add per-request, engine-agnostic PD signals that only SMG can measure (it is the only component that observes both the prefill and decode legs):

- smg_pd_prefill_duration_seconds{backend_type,model,runtime}
- smg_pd_kv_transfer_duration_seconds{backend_type,model,runtime} (vLLM sequential PD: prefill-drain to decode-send window)
- smg_pd_ttft_seconds{backend_type,model,runtime}: honest TTFT (prefill start to first decode token); kept alongside the decode-only smg_router_ttft_seconds{backend_type=pd}
- smg_pd_kv_connector_mode_total{mode}
- smg_pd_bootstrap_failures_total, smg_pd_kv_transfer_failures_total

Also wire per-leg worker-error parity (record_worker_error for prefill/decode) into the gRPC PD pipeline to match HTTP.

Timings use a monotonic clock and are recorded once per request (HTTP records on the success path so retries never double-count).

Tests: unit tests for the new recorders; an integration test driving a non-streaming PD request and asserting that the smg_pd_* metrics (incl. smg_pd_ttft_seconds) are emitted.

Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
1 parent 1288d16 commit 24889f2

9 files changed

Lines changed: 503 additions & 17 deletions

File tree

model_gateway/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -133,6 +133,10 @@ toml = "1.0"
133133
[dev-dependencies]
134134
criterion = { version = "0.8", features = ["html_reports"] }
135135
tower = { version = "0.5", features = ["util"] }
136+
# Used by PD metrics integration tests to capture metric emission via a
137+
# thread-local Prometheus recorder (same versions as the runtime deps above).
138+
metrics = "0.24.2"
139+
metrics-exporter-prometheus = { version = "0.18.1", default-features = false }
136140
http-body-util = "0.1"
137141
portpicker = "0.1"
138142
lazy_static = "1.4"

model_gateway/src/observability/metrics.rs

Lines changed: 221 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -201,6 +201,33 @@ pub(crate) fn init_metrics() {
201201
"Total generation time by router_type, backend_type, model, endpoint (gRPC only)"
202202
);
203203

204+
// Layer 2: PD disaggregation metrics (signals only SMG can measure — it is the
205+
// only component that observes both the prefill and decode legs of a request).
206+
describe_histogram!(
207+
"smg_pd_prefill_duration_seconds",
208+
"Prefill-leg RPC duration by backend_type, model, runtime"
209+
);
210+
describe_histogram!(
211+
"smg_pd_kv_transfer_duration_seconds",
212+
"KV-transfer window (prefill drain to decode send) by backend_type, model, runtime (vLLM sequential PD)"
213+
);
214+
describe_histogram!(
215+
"smg_pd_ttft_seconds",
216+
"Honest end-to-end TTFT (prefill start to first decode token) by backend_type, model, runtime"
217+
);
218+
describe_counter!(
219+
"smg_pd_kv_connector_mode_total",
220+
"KV connector mode decisions by mode (mooncake/nixl/passthrough)"
221+
);
222+
describe_counter!(
223+
"smg_pd_bootstrap_failures_total",
224+
"PD bootstrap injection failures"
225+
);
226+
describe_counter!(
227+
"smg_pd_kv_transfer_failures_total",
228+
"PD KV-transfer failures (missing connector params at decode handoff)"
229+
);
230+
204231
// Layer 3: Worker metrics
205232
describe_gauge!(
206233
"smg_worker_pool_size",
@@ -430,6 +457,11 @@ pub mod metrics_labels {
430457
pub const TOKEN_INPUT: &str = "input";
431458
pub const TOKEN_OUTPUT: &str = "output";
432459

460+
// PD KV connector modes (smg_pd_kv_connector_mode_total)
461+
pub const KV_CONNECTOR_MOONCAKE: &str = "mooncake";
462+
pub const KV_CONNECTOR_NIXL: &str = "nixl";
463+
pub const KV_CONNECTOR_PASSTHROUGH: &str = "passthrough";
464+
433465
// Storage types
434466
pub const STORAGE_RESPONSE: &str = "response";
435467
pub const STORAGE_CONVERSATION: &str = "conversation";
@@ -835,6 +867,96 @@ impl Metrics {
835867
.increment(output_tokens);
836868
}
837869

870+
// ========================================================================
871+
// Layer 2: PD disaggregation metrics
872+
//
873+
// Per-request, engine-agnostic signals that no backend can self-report: SMG
874+
// is the only component that sees both the prefill and decode legs. All
875+
// durations come from a monotonic clock and are recorded once per request
876+
// (never per retry attempt).
877+
// ========================================================================
878+
879+
/// Record prefill-leg RPC duration.
880+
/// Uses string interning for model_id and runtime.
881+
pub fn record_pd_prefill_duration(
882+
backend_type: &'static str,
883+
model_id: &str,
884+
runtime: &str,
885+
duration: Duration,
886+
) {
887+
let model = intern_string(model_id);
888+
let runtime = intern_string(runtime);
889+
histogram!(
890+
"smg_pd_prefill_duration_seconds",
891+
"backend_type" => backend_type,
892+
"model" => model,
893+
"runtime" => runtime
894+
)
895+
.record(duration.as_secs_f64());
896+
}
897+
898+
/// Record the KV-transfer window (prefill drain to decode send) for vLLM
899+
/// sequential PD. Uses string interning for model_id and runtime.
900+
pub fn record_pd_kv_transfer_duration(
901+
backend_type: &'static str,
902+
model_id: &str,
903+
runtime: &str,
904+
duration: Duration,
905+
) {
906+
let model = intern_string(model_id);
907+
let runtime = intern_string(runtime);
908+
histogram!(
909+
"smg_pd_kv_transfer_duration_seconds",
910+
"backend_type" => backend_type,
911+
"model" => model,
912+
"runtime" => runtime
913+
)
914+
.record(duration.as_secs_f64());
915+
}
916+
917+
/// Record honest end-to-end TTFT: prefill start to first decode token.
918+
///
919+
/// INVARIANT: this is the user-facing complement to
920+
/// `smg_router_ttft_seconds{backend_type="pd"}`, which measures only the
921+
/// decode leg (first decode token minus decode-send). For sequential PD the
922+
/// two differ by the prefill + KV-transfer time; both are kept on purpose.
923+
/// Uses string interning for model_id and runtime.
924+
pub fn record_pd_ttft(
925+
backend_type: &'static str,
926+
model_id: &str,
927+
runtime: &str,
928+
duration: Duration,
929+
) {
930+
let model = intern_string(model_id);
931+
let runtime = intern_string(runtime);
932+
histogram!(
933+
"smg_pd_ttft_seconds",
934+
"backend_type" => backend_type,
935+
"model" => model,
936+
"runtime" => runtime
937+
)
938+
.record(duration.as_secs_f64());
939+
}
940+
941+
/// Record a KV connector mode decision (mooncake/nixl/passthrough).
942+
pub fn record_pd_kv_connector_mode(mode: &'static str) {
943+
counter!(
944+
"smg_pd_kv_connector_mode_total",
945+
"mode" => mode
946+
)
947+
.increment(1);
948+
}
949+
950+
/// Record a PD bootstrap injection failure.
951+
pub fn record_pd_bootstrap_failure() {
952+
counter!("smg_pd_bootstrap_failures_total").increment(1);
953+
}
954+
955+
/// Record a PD KV-transfer failure (missing connector params at handoff).
956+
pub fn record_pd_kv_transfer_failure() {
957+
counter!("smg_pd_kv_transfer_failures_total").increment(1);
958+
}
959+
838960
// ========================================================================
839961
// Layer 3: Worker metrics
840962
// ========================================================================
@@ -1561,4 +1683,103 @@ mod tests {
15611683
assert_eq!(method_to_static_str("POST"), "POST");
15621684
assert_eq!(method_to_static_str("UNKNOWN"), "OTHER");
15631685
}
1686+
1687+
// ========================================================================
1688+
// PD disaggregation metric tests
1689+
// ========================================================================
1690+
1691+
/// Run `f` with a Prometheus recorder installed thread-locally and return
1692+
/// the rendered /metrics text. Mirrors the helper in `runtime_metrics`.
1693+
fn with_test_recorder<T>(f: impl FnOnce() -> T) -> (String, T) {
1694+
let recorder = PrometheusBuilder::new().build_recorder();
1695+
let handle = recorder.handle();
1696+
let result = metrics::with_local_recorder(&recorder, f);
1697+
(handle.render(), result)
1698+
}
1699+
1700+
#[test]
1701+
fn test_record_pd_prefill_duration_emits_histogram() {
1702+
let (rendered, ()) = with_test_recorder(|| {
1703+
Metrics::record_pd_prefill_duration(
1704+
metrics_labels::BACKEND_PD,
1705+
"test-model",
1706+
"vllm",
1707+
Duration::from_millis(42),
1708+
);
1709+
});
1710+
assert!(
1711+
rendered.contains("smg_pd_prefill_duration_seconds_count{")
1712+
&& rendered.contains(r#"backend_type="pd""#)
1713+
&& rendered.contains(r#"model="test-model""#)
1714+
&& rendered.contains(r#"runtime="vllm""#),
1715+
"prefill duration histogram not emitted; rendered:\n{rendered}"
1716+
);
1717+
}
1718+
1719+
#[test]
1720+
fn test_record_pd_kv_transfer_duration_emits_histogram() {
1721+
let (rendered, ()) = with_test_recorder(|| {
1722+
Metrics::record_pd_kv_transfer_duration(
1723+
metrics_labels::BACKEND_PD,
1724+
"m",
1725+
"vllm",
1726+
Duration::from_millis(7),
1727+
);
1728+
});
1729+
assert!(
1730+
rendered.contains("smg_pd_kv_transfer_duration_seconds_count"),
1731+
"kv transfer histogram not emitted; rendered:\n{rendered}"
1732+
);
1733+
}
1734+
1735+
#[test]
1736+
fn test_record_pd_ttft_emits_histogram() {
1737+
let (rendered, ()) = with_test_recorder(|| {
1738+
Metrics::record_pd_ttft(
1739+
metrics_labels::BACKEND_PD,
1740+
"m",
1741+
"sglang",
1742+
Duration::from_millis(123),
1743+
);
1744+
});
1745+
assert!(
1746+
rendered.contains("smg_pd_ttft_seconds_count")
1747+
&& rendered.contains(r#"runtime="sglang""#),
1748+
"pd ttft histogram not emitted; rendered:\n{rendered}"
1749+
);
1750+
}
1751+
1752+
#[test]
1753+
fn test_record_pd_kv_connector_mode_counts_by_mode() {
1754+
let (rendered, ()) = with_test_recorder(|| {
1755+
Metrics::record_pd_kv_connector_mode(metrics_labels::KV_CONNECTOR_MOONCAKE);
1756+
Metrics::record_pd_kv_connector_mode(metrics_labels::KV_CONNECTOR_MOONCAKE);
1757+
Metrics::record_pd_kv_connector_mode(metrics_labels::KV_CONNECTOR_NIXL);
1758+
});
1759+
assert!(
1760+
rendered.contains(r#"smg_pd_kv_connector_mode_total{mode="mooncake"} 2"#),
1761+
"mooncake connector counter wrong; rendered:\n{rendered}"
1762+
);
1763+
assert!(
1764+
rendered.contains(r#"smg_pd_kv_connector_mode_total{mode="nixl"} 1"#),
1765+
"nixl connector counter wrong; rendered:\n{rendered}"
1766+
);
1767+
}
1768+
1769+
#[test]
1770+
fn test_record_pd_failure_counters() {
1771+
let (rendered, ()) = with_test_recorder(|| {
1772+
Metrics::record_pd_bootstrap_failure();
1773+
Metrics::record_pd_kv_transfer_failure();
1774+
Metrics::record_pd_kv_transfer_failure();
1775+
});
1776+
assert!(
1777+
rendered.contains("smg_pd_bootstrap_failures_total 1"),
1778+
"bootstrap failure counter wrong; rendered:\n{rendered}"
1779+
);
1780+
assert!(
1781+
rendered.contains("smg_pd_kv_transfer_failures_total 2"),
1782+
"kv transfer failure counter wrong; rendered:\n{rendered}"
1783+
);
1784+
}
15641785
}

model_gateway/src/routers/grpc/common/response_collection.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ pub(crate) async fn collect_responses(
3939
ExecutionResult::Dual {
4040
mut prefill,
4141
decode,
42+
..
4243
} => {
4344
// Collect prefill for input_logprobs (don't mark completed yet)
4445
let prefill_responses = collect_stream_responses(&mut prefill, "Prefill").await?;

0 commit comments

Comments
 (0)