Skip to content

Commit bc6adb0

Browse files
committed
Add shared proxy telemetry sinks
1 parent 215e22e commit bc6adb0

11 files changed

Lines changed: 946 additions & 26 deletions

File tree

crates/calciforge/src/config.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ use serde::{Deserialize, Serialize};
1010
use std::collections::HashMap;
1111
use std::path::{Path, PathBuf};
1212

13+
mod observability;
1314
pub mod validator;
1415

16+
pub use observability::ProxyObservabilityConfig;
17+
1518
// ---------------------------------------------------------------------------
1619
// Schema types
1720
// ---------------------------------------------------------------------------
@@ -685,23 +688,12 @@ pub struct ProxyConfig {
685688
#[serde(default = "default_proxy_default_policy")]
686689
pub default_policy: ProxyAccessPolicy,
687690

688-
/// Legacy root provider adapter type for proxy.
689-
///
690-
/// Supported values include "http", "helicone", "litellm", "portkey",
691-
/// "tensorzero", "future-agi", "openrouter", and "mock". All non-mock
692-
/// values use the same OpenAI-compatible HTTP core plus engine-specific
693-
/// policy/metadata overlays.
694-
///
695-
/// Provider-specific routes under `[[proxy.providers]]` have their own
696-
/// narrower `backend_type` surface.
691+
/// Legacy root provider adapter type for proxy. Prefer explicit
692+
/// `[[proxy.providers]]` routes for operational configs.
697693
#[serde(default = "default_proxy_backend_type")]
698694
pub backend_type: String,
699695

700696
/// Optional operator UI URL for the selected provider adapter.
701-
///
702-
/// External engines such as Helicone may expose their own dashboard. Engines
703-
/// without a built-in dashboard can point this at a lightweight Calciforge
704-
/// status page or an observability frontend.
705697
#[serde(default)]
706698
pub gateway_ui_url: Option<String>,
707699

@@ -754,6 +746,10 @@ pub struct ProxyConfig {
754746
/// misconfiguration problems.
755747
#[serde(default = "default_gateway_fallback_on")]
756748
pub fallback_on: Vec<GatewayFailureKind>,
749+
750+
/// Fanout destinations for model-gateway attempt telemetry.
751+
#[serde(default)]
752+
pub observability: Vec<ProxyObservabilityConfig>,
757753
}
758754

759755
/// Stable failure classes used by gateway retry and synthetic fallback policy.
@@ -956,6 +952,7 @@ impl Default for ProxyConfig {
956952
voice: None,
957953
retry: GatewayRetryConfig::default(),
958954
fallback_on: default_gateway_fallback_on(),
955+
observability: Vec::new(),
959956
}
960957
}
961958
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use serde::{Deserialize, Serialize};
2+
use std::collections::HashMap;
3+
4+
/// Observability sink for model-gateway attempt telemetry.
5+
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
6+
pub struct ProxyObservabilityConfig {
7+
/// Sink type. Supported values: "log", "http-json", "otel", "traceloop".
8+
#[serde(default = "default_observability_kind")]
9+
pub kind: String,
10+
/// Whether this sink is active.
11+
#[serde(default = "default_true")]
12+
pub enabled: bool,
13+
/// Destination endpoint for network sinks. For OTLP sinks this may be a
14+
/// collector base URL or a full `/v1/traces` endpoint.
15+
#[serde(default)]
16+
pub endpoint: Option<String>,
17+
/// Extra headers for network sinks. Values must be operator-controlled and
18+
/// are never copied into telemetry payloads.
19+
#[serde(default)]
20+
pub headers: HashMap<String, String>,
21+
/// Per-sink timeout in milliseconds.
22+
#[serde(default = "default_observability_timeout_ms")]
23+
pub timeout_ms: u64,
24+
}
25+
26+
impl Default for ProxyObservabilityConfig {
27+
fn default() -> Self {
28+
Self {
29+
kind: default_observability_kind(),
30+
enabled: default_true(),
31+
endpoint: None,
32+
headers: HashMap::new(),
33+
timeout_ms: default_observability_timeout_ms(),
34+
}
35+
}
36+
}
37+
38+
/// Serde default for `ProxyObservabilityConfig::kind`: the "log" sink.
fn default_observability_kind() -> String {
    String::from("log")
}
41+
42+
/// Serde default helper for boolean fields that are on unless configured
/// otherwise.
fn default_true() -> bool {
    const ON_BY_DEFAULT: bool = true;
    ON_BY_DEFAULT
}
45+
46+
/// Serde default for `ProxyObservabilityConfig::timeout_ms`, in milliseconds.
fn default_observability_timeout_ms() -> u64 {
    const DEFAULT_TIMEOUT_MS: u64 = 250;
    DEFAULT_TIMEOUT_MS
}

crates/calciforge/src/config/validator.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
//! - No circular dependencies
1010
1111
use anyhow::{Context, Result};
12+
use reqwest::header::{HeaderName, HeaderValue};
1213
use std::collections::{HashMap, HashSet};
1314
use url::Url;
1415

1516
use crate::agent_kinds::{AgentKind, parse_agent_kind};
16-
use crate::config::{CalciforgeConfig, CredentialOwner, GatewayRetryConfig};
17+
use crate::config::{
18+
CalciforgeConfig, CredentialOwner, GatewayRetryConfig, ProxyObservabilityConfig,
19+
};
1720
use crate::model_names::{
1821
configured_agent_selectors, configured_first_class_model_ids, resolve_model_alias_chain,
1922
};
@@ -558,6 +561,10 @@ fn validate_proxy_config(proxy: &crate::config::ProxyConfig, result: &mut Valida
558561
}
559562
}
560563

564+
for (index, sink) in proxy.observability.iter().enumerate() {
565+
validate_proxy_observability_config(index, sink, result);
566+
}
567+
561568
if !proxy.enabled {
562569
return;
563570
}
@@ -789,6 +796,72 @@ fn validate_proxy_config(proxy: &crate::config::ProxyConfig, result: &mut Valida
789796
}
790797
}
791798

799+
fn validate_proxy_observability_config(
800+
index: usize,
801+
sink: &ProxyObservabilityConfig,
802+
result: &mut ValidationResult,
803+
) {
804+
if sink.timeout_ms == 0 {
805+
result.add_error(format!(
806+
"Proxy observability sink #{index} timeout_ms cannot be zero"
807+
));
808+
}
809+
810+
let kind = sink.kind.trim().to_ascii_lowercase().replace('_', "-");
811+
if !crate::proxy::telemetry::SUPPORTED_OBSERVABILITY_KINDS.contains(&kind.as_str()) {
812+
result.add_error(format!(
813+
"Proxy observability sink #{index} kind '{}' is invalid. Use one of: {}",
814+
sink.kind,
815+
crate::proxy::telemetry::SUPPORTED_OBSERVABILITY_KINDS.join(", ")
816+
));
817+
return;
818+
}
819+
820+
if matches!(
821+
kind.as_str(),
822+
"http-json" | "webhook" | "otel" | "otlp" | "traceloop"
823+
) {
824+
match sink.endpoint.as_deref().map(str::trim) {
825+
Some(endpoint) if !endpoint.is_empty() => validate_http_url(
826+
&format!("Proxy observability sink #{index} endpoint"),
827+
endpoint,
828+
result,
829+
false,
830+
),
831+
_ => result.add_error(format!(
832+
"Proxy observability sink #{index} kind '{}' requires endpoint",
833+
sink.kind
834+
)),
835+
}
836+
}
837+
838+
if kind == "log"
839+
&& sink
840+
.endpoint
841+
.as_deref()
842+
.is_some_and(|s| !s.trim().is_empty())
843+
{
844+
result.add_warning(format!(
845+
"Proxy observability sink #{index} kind='log' ignores endpoint"
846+
));
847+
}
848+
849+
for (name, value) in &sink.headers {
850+
if HeaderName::from_bytes(name.as_bytes()).is_err() {
851+
result.add_error(format!(
852+
"Proxy observability sink #{index} header name '{}' is invalid",
853+
name
854+
));
855+
}
856+
if HeaderValue::from_str(value).is_err() {
857+
result.add_error(format!(
858+
"Proxy observability sink #{index} header '{}' has an invalid value",
859+
name
860+
));
861+
}
862+
}
863+
}
864+
792865
fn validate_gateway_retry_config(
793866
label: &str,
794867
retry: &GatewayRetryConfig,

crates/calciforge/src/config/validator_tests_3.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,72 @@ fn named_openai_compatible_backend_types_are_validated_from_shared_allowlist() {
390390
}
391391
}
392392

393+
/// Scenario: a "traceloop" observability sink with an explicit traces endpoint
/// is configured next to a "litellm" proxy backend.
/// Expectation: `validate_config` reports no errors, i.e. the sink is checked
/// on its own and is not treated as a provider adapter.
#[test]
fn traceloop_observability_sink_validates_as_separate_surface() {
    const PROXY_SECTION: &str = concat!(
        "[proxy]\n",
        "enabled = true\n",
        "bind = \"127.0.0.1:18083\"\n",
        "backend_type = \"litellm\"\n",
        "backend_url = \"https://gateway.example.invalid/v1\"\n",
    );
    const SINK_SECTION: &str = concat!(
        "[[proxy.observability]]\n",
        "kind = \"traceloop\"\n",
        "endpoint = \"https://api.traceloop.example/v1/traces\"\n",
    );

    let toml = format!("{MIN_VALID}\n{PROXY_SECTION}\n{SINK_SECTION}");
    let parsed = parse(&toml);
    let outcome = validate_config(&parsed);

    assert!(
        outcome.is_valid(),
        "traceloop observability should validate without becoming a provider adapter; errors: {:?}",
        outcome.errors
    );
}
411+
412+
/// Scenario: an "http-json" observability sink is declared with no endpoint.
/// Expectation: `validate_config` fails and one error mentions both
/// "observability" and "requires endpoint".
#[test]
fn network_observability_sink_requires_endpoint() {
    const PROXY_SECTION: &str = concat!(
        "[proxy]\n",
        "enabled = true\n",
        "bind = \"127.0.0.1:18083\"\n",
        "backend_type = \"litellm\"\n",
        "backend_url = \"https://gateway.example.invalid/v1\"\n",
    );
    const SINK_SECTION: &str = concat!("[[proxy.observability]]\n", "kind = \"http-json\"\n");

    let toml = format!("{MIN_VALID}\n{PROXY_SECTION}\n{SINK_SECTION}");
    let parsed = parse(&toml);
    let outcome = validate_config(&parsed);

    assert!(
        !outcome.is_valid(),
        "http-json observability without endpoint must fail"
    );

    let names_missing_endpoint = outcome
        .errors
        .iter()
        .any(|message| message.contains("observability") && message.contains("requires endpoint"));
    assert!(
        names_missing_endpoint,
        "error should name observability endpoint requirement; errors: {:?}",
        outcome.errors
    );
}
436+
437+
/// Scenario: an "http-json" observability sink has a valid endpoint but a
/// header named "bad header", which is not a legal HTTP header name.
/// Expectation: `validate_config` fails and one error mentions both
/// "observability" and "header name".
#[test]
fn observability_sink_headers_must_be_valid_http_headers() {
    const PROXY_SECTION: &str = concat!(
        "[proxy]\n",
        "enabled = true\n",
        "bind = \"127.0.0.1:18083\"\n",
        "backend_type = \"litellm\"\n",
        "backend_url = \"https://gateway.example.invalid/v1\"\n",
    );
    const SINK_SECTION: &str = concat!(
        "[[proxy.observability]]\n",
        "kind = \"http-json\"\n",
        "endpoint = \"https://observability.example.invalid/events\"\n",
        "\n",
        "[proxy.observability.headers]\n",
        "\"bad header\" = \"ok\"\n",
    );

    let toml = format!("{MIN_VALID}\n{PROXY_SECTION}\n{SINK_SECTION}");
    let parsed = parse(&toml);
    let outcome = validate_config(&parsed);

    assert!(!outcome.is_valid(), "invalid observability header must fail");

    let names_bad_header = outcome
        .errors
        .iter()
        .any(|message| message.contains("observability") && message.contains("header name"));
    assert!(
        names_bad_header,
        "error should name observability header validation; errors: {:?}",
        outcome.errors
    );
}
458+
393459
/// Given a disabled proxy with a configured gateway UI link,
394460
/// when validate_config runs,
395461
/// then the UI URL is still validated because chat help can surface it from

0 commit comments

Comments
 (0)