Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions crates/calciforge/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};

mod observability;
pub mod validator;

pub use observability::ProxyObservabilityConfig;

// ---------------------------------------------------------------------------
// Schema types
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -685,23 +688,12 @@ pub struct ProxyConfig {
#[serde(default = "default_proxy_default_policy")]
pub default_policy: ProxyAccessPolicy,

/// Legacy root provider adapter type for proxy.
///
/// Supported values include "http", "helicone", "litellm", "portkey",
/// "tensorzero", "future-agi", "openrouter", and "mock". All non-mock
/// values use the same OpenAI-compatible HTTP core plus engine-specific
/// policy/metadata overlays.
///
/// Provider-specific routes under `[[proxy.providers]]` have their own
/// narrower `backend_type` surface.
/// Legacy root provider adapter type for proxy. Prefer explicit
/// `[[proxy.providers]]` routes for operational configs.
#[serde(default = "default_proxy_backend_type")]
pub backend_type: String,

/// Optional operator UI URL for the selected provider adapter.
///
/// External engines such as Helicone may expose their own dashboard. Engines
/// without a built-in dashboard can point this at a lightweight Calciforge
/// status page or an observability frontend.
#[serde(default)]
pub gateway_ui_url: Option<String>,

Expand Down Expand Up @@ -754,6 +746,10 @@ pub struct ProxyConfig {
/// misconfiguration problems.
#[serde(default = "default_gateway_fallback_on")]
pub fallback_on: Vec<GatewayFailureKind>,

/// Fanout destinations for model-gateway attempt telemetry.
#[serde(default)]
pub observability: Vec<ProxyObservabilityConfig>,
}

/// Stable failure classes used by gateway retry and synthetic fallback policy.
Expand Down Expand Up @@ -956,6 +952,7 @@ impl Default for ProxyConfig {
voice: None,
retry: GatewayRetryConfig::default(),
fallback_on: default_gateway_fallback_on(),
observability: Vec::new(),
}
}
}
Expand Down
50 changes: 50 additions & 0 deletions crates/calciforge/src/config/observability.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Observability sink for model-gateway attempt telemetry.
///
/// One entry of the `[[proxy.observability]]` list. Every field has a serde
/// default, so an empty table deserializes to the same value as
/// `ProxyObservabilityConfig::default()`: an enabled "log" sink with no
/// endpoint, no headers, and a 250 ms timeout.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct ProxyObservabilityConfig {
/// Sink type. Supported values: "log", "http-json" (alias "webhook"),
/// "otel" (alias "otlp"), and "traceloop"; underscores normalize to
/// hyphens.
///
/// Defaults to "log". Config validation also trims surrounding whitespace
/// and lowercases the value before comparing it with the supported kinds.
#[serde(default = "default_observability_kind")]
pub kind: String,
/// Whether this sink is active. Defaults to `true`.
#[serde(default = "default_true")]
pub enabled: bool,
/// Destination endpoint for network sinks. For OTLP sinks this may be a
/// collector base URL or a full `/v1/traces` endpoint.
///
/// Config validation rejects "http-json", "webhook", "otel", "otlp", and
/// "traceloop" sinks whose endpoint is missing or blank, and warns when a
/// "log" sink sets one because that kind ignores it.
#[serde(default)]
pub endpoint: Option<String>,
/// Extra headers for network sinks. Values must be operator-controlled and
/// are never copied into telemetry payloads.
///
/// Config validation requires every name and value to be a well-formed HTTP
/// header name and header value.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Per-sink timeout in milliseconds. Defaults to 250; config validation
/// rejects zero.
#[serde(default = "default_observability_timeout_ms")]
pub timeout_ms: u64,
}

impl Default for ProxyObservabilityConfig {
    /// Builds the sink serde would produce for an empty table: the
    /// `default_*` helpers supply `kind`, `enabled`, and `timeout_ms`, while
    /// `endpoint` and `headers` start out empty.
    fn default() -> Self {
        let kind = default_observability_kind();
        let enabled = default_true();
        let timeout_ms = default_observability_timeout_ms();

        Self {
            kind,
            enabled,
            endpoint: Option::None,
            headers: HashMap::default(),
            timeout_ms,
        }
    }
}

/// Serde default for `ProxyObservabilityConfig::kind`: the "log" sink.
fn default_observability_kind() -> String {
    String::from("log")
}

/// Serde default for boolean fields that should be on unless the operator
/// turns them off (used by `ProxyObservabilityConfig::enabled`).
fn default_true() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}

/// Serde default for `ProxyObservabilityConfig::timeout_ms`, in milliseconds.
fn default_observability_timeout_ms() -> u64 {
    const DEFAULT_TIMEOUT_MS: u64 = 250;
    DEFAULT_TIMEOUT_MS
}
83 changes: 77 additions & 6 deletions crates/calciforge/src/config/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
//! - No circular dependencies

use anyhow::{Context, Result};
use reqwest::header::{HeaderName, HeaderValue};
use std::collections::{HashMap, HashSet};
use url::Url;

use crate::agent_kinds::{AgentKind, parse_agent_kind};
use crate::config::{CalciforgeConfig, CredentialOwner, GatewayRetryConfig};
use crate::config::{
CalciforgeConfig, CredentialOwner, GatewayRetryConfig, ProxyObservabilityConfig,
};
use crate::model_names::{
configured_agent_selectors, configured_first_class_model_ids, resolve_model_alias_chain,
};
Expand Down Expand Up @@ -558,6 +561,10 @@ fn validate_proxy_config(proxy: &crate::config::ProxyConfig, result: &mut Valida
}
}

for (index, sink) in proxy.observability.iter().enumerate() {
validate_proxy_observability_config(index, sink, result);
}

if !proxy.enabled {
return;
}
Expand Down Expand Up @@ -603,11 +610,9 @@ fn validate_proxy_config(proxy: &crate::config::ProxyConfig, result: &mut Valida
));
}

// Validate backend_type against the same allowlist the runtime uses.
if !crate::proxy::supported_root_gateway_backend_types()
.iter()
.any(|backend_type| *backend_type == proxy.backend_type)
{
// Validate backend_type with the same parser the runtime uses so accepted
// aliases stay aligned with the provider adapter boundary.
if root_gateway_type.is_none() {
result.add_error(format!(
"Proxy backend_type '{}' is unsupported. Use one of: {}. CLI-backed agents and experimental external gateways must be configured as agents or explicit provider adapters.",
proxy.backend_type,
Expand Down Expand Up @@ -789,6 +794,72 @@ fn validate_proxy_config(proxy: &crate::config::ProxyConfig, result: &mut Valida
}
}

/// Validates one `[[proxy.observability]]` sink and records every problem in
/// `result`.
///
/// `index` is the sink's position in `proxy.observability`; it is only used to
/// label messages. Checks run in this order:
///
/// 1. `timeout_ms` must not be zero.
/// 2. `kind`, after trimming, lowercasing, and mapping `_` to `-`, must be one
///    of `SUPPORTED_OBSERVABILITY_KINDS`. An unknown kind stops validation of
///    this sink, so the checks below are skipped.
/// 3. Network kinds need a non-blank `endpoint`, which is passed on to
///    `validate_http_url`.
/// 4. A "log" sink that sets a non-blank endpoint gets a warning.
/// 5. Each header name and value must parse as a `HeaderName` / `HeaderValue`.
fn validate_proxy_observability_config(
    index: usize,
    sink: &ProxyObservabilityConfig,
    result: &mut ValidationResult,
) {
    use crate::proxy::telemetry::SUPPORTED_OBSERVABILITY_KINDS;

    if sink.timeout_ms == 0 {
        result.add_error(format!(
            "Proxy observability sink #{index} timeout_ms cannot be zero"
        ));
    }

    let normalized_kind = sink.kind.trim().to_ascii_lowercase().replace('_', "-");
    let kind_is_supported = SUPPORTED_OBSERVABILITY_KINDS.contains(&normalized_kind.as_str());
    if !kind_is_supported {
        result.add_error(format!(
            "Proxy observability sink #{index} kind '{}' is invalid. Use one of: {}",
            sink.kind,
            SUPPORTED_OBSERVABILITY_KINDS.join(", ")
        ));
        return;
    }

    // A blank or whitespace-only endpoint is treated the same as a missing one.
    let configured_endpoint = sink
        .endpoint
        .as_deref()
        .map(str::trim)
        .filter(|candidate| !candidate.is_empty());

    let needs_endpoint = matches!(
        normalized_kind.as_str(),
        "http-json" | "webhook" | "otel" | "otlp" | "traceloop"
    );
    if needs_endpoint {
        if let Some(url) = configured_endpoint {
            validate_http_url(
                &format!("Proxy observability sink #{index} endpoint"),
                url,
                result,
                false,
            );
        } else {
            result.add_error(format!(
                "Proxy observability sink #{index} kind '{}' requires endpoint",
                sink.kind
            ));
        }
    }

    if normalized_kind == "log" && configured_endpoint.is_some() {
        result.add_warning(format!(
            "Proxy observability sink #{index} kind='log' ignores endpoint"
        ));
    }

    for (header_name, header_value) in &sink.headers {
        let name_is_valid = HeaderName::from_bytes(header_name.as_bytes()).is_ok();
        if !name_is_valid {
            result.add_error(format!(
                "Proxy observability sink #{index} header name '{}' is invalid",
                header_name
            ));
        }

        // Only the header name is reported; the value is left out of the message.
        let value_is_valid = HeaderValue::from_str(header_value).is_ok();
        if !value_is_valid {
            result.add_error(format!(
                "Proxy observability sink #{index} header '{}' has an invalid value",
                header_name
            ));
        }
    }
}

fn validate_gateway_retry_config(
label: &str,
retry: &GatewayRetryConfig,
Expand Down
82 changes: 82 additions & 0 deletions crates/calciforge/src/config/validator_tests_3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,88 @@ fn named_openai_compatible_backend_types_are_validated_from_shared_allowlist() {
}
}

/// Given a root proxy `backend_type` written as an alias spelling,
/// when validate_config runs,
/// then the config validates because `GatewayType::from_str` accepts the
/// alias.
#[test]
fn root_backend_type_aliases_validate_with_runtime_parser() {
    const ALIASES: [&str; 4] = ["direct", "builtin_http", "lite_llm", "open_router"];

    for backend_type in ALIASES {
        let toml_source = format!(
            "{MIN_VALID}\n[proxy]\nenabled = true\nbind = \"127.0.0.1:18083\"\nbackend_type = \"{backend_type}\"\nbackend_url = \"https://gateway.example.invalid/v1\"\n"
        );
        let parsed = parse(&toml_source);
        let outcome = validate_config(&parsed);
        let accepted = outcome.is_valid();

        assert!(
            accepted,
            "{backend_type} should validate because GatewayType::from_str accepts it; errors: {:?}",
            outcome.errors
        );
    }
}

/// Given a `traceloop` observability sink with an endpoint, configured next
/// to a `litellm` root backend,
/// when validate_config runs,
/// then the config is valid: the sink is checked on its own and does not have
/// to be a provider adapter.
#[test]
fn traceloop_observability_sink_validates_as_separate_surface() {
    let toml_source = format!(
        "{MIN_VALID}\n[proxy]\nenabled = true\nbind = \"127.0.0.1:18083\"\nbackend_type = \"litellm\"\nbackend_url = \"https://gateway.example.invalid/v1\"\n\n[[proxy.observability]]\nkind = \"traceloop\"\nendpoint = \"https://api.traceloop.example/v1/traces\"\n"
    );
    let parsed = parse(&toml_source);
    let outcome = validate_config(&parsed);
    let accepted = outcome.is_valid();

    assert!(
        accepted,
        "traceloop observability should validate without becoming a provider adapter; errors: {:?}",
        outcome.errors
    );
}

/// Given an `http-json` observability sink that sets no endpoint,
/// when validate_config runs,
/// then the config is invalid and one error mentions both "observability" and
/// "requires endpoint".
#[test]
fn network_observability_sink_requires_endpoint() {
    let toml_source = format!(
        "{MIN_VALID}\n[proxy]\nenabled = true\nbind = \"127.0.0.1:18083\"\nbackend_type = \"litellm\"\nbackend_url = \"https://gateway.example.invalid/v1\"\n\n[[proxy.observability]]\nkind = \"http-json\"\n"
    );
    let parsed = parse(&toml_source);
    let outcome = validate_config(&parsed);

    let rejected = !outcome.is_valid();
    assert!(
        rejected,
        "http-json observability without endpoint must fail"
    );

    let names_missing_endpoint = outcome.errors.iter().any(|message| {
        ["observability", "requires endpoint"]
            .iter()
            .all(|needle| message.contains(*needle))
    });
    assert!(
        names_missing_endpoint,
        "error should name observability endpoint requirement; errors: {:?}",
        outcome.errors
    );
}

/// Given an `http-json` observability sink whose header table contains the
/// name "bad header",
/// when validate_config runs,
/// then the config is invalid and one error mentions both "observability" and
/// "header name".
#[test]
fn observability_sink_headers_must_be_valid_http_headers() {
    let toml_source = format!(
        "{MIN_VALID}\n[proxy]\nenabled = true\nbind = \"127.0.0.1:18083\"\nbackend_type = \"litellm\"\nbackend_url = \"https://gateway.example.invalid/v1\"\n\n[[proxy.observability]]\nkind = \"http-json\"\nendpoint = \"https://observability.example.invalid/events\"\n\n[proxy.observability.headers]\n\"bad header\" = \"ok\"\n"
    );
    let parsed = parse(&toml_source);
    let outcome = validate_config(&parsed);

    let rejected = !outcome.is_valid();
    assert!(rejected, "invalid observability header must fail");

    let names_bad_header = outcome.errors.iter().any(|message| {
        ["observability", "header name"]
            .iter()
            .all(|needle| message.contains(*needle))
    });
    assert!(
        names_bad_header,
        "error should name observability header validation; errors: {:?}",
        outcome.errors
    );
}

/// Given a disabled proxy with a configured gateway UI link,
/// when validate_config runs,
/// then the UI URL is still validated because chat help can surface it from
Expand Down
15 changes: 13 additions & 2 deletions crates/calciforge/src/proxy/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl HttpBackend {
.unwrap_or_else(|_| "Unknown error".to_string());
return Err(BackendError::http_status_error(
status,
format!("API error {}: {}", status, error_text),
upstream_api_error_message(status, &error_text),
));
}

Expand Down Expand Up @@ -433,6 +433,17 @@ impl HttpBackend {
}
}

/// Builds the error message for an upstream API failure response.
///
/// The message carries `status` plus either a note that the body was empty or
/// the body's length in bytes. The body text itself is never included
/// (presumably so upstream payloads do not end up in errors or logs).
fn upstream_api_error_message(status: reqwest::StatusCode, body: &str) -> String {
    match body.len() {
        0 => format!("API error {status}: upstream response body was empty"),
        byte_count => format!(
            "API error {status}: upstream response body omitted ({} bytes)",
            byte_count
        ),
    }
}

fn is_kimi_backend(base_url: &str) -> bool {
let base_url = base_url.to_ascii_lowercase();
base_url.contains("api.kimi.com") || base_url.contains("moonshot")
Expand Down Expand Up @@ -501,7 +512,7 @@ impl SecretsBackend for HttpBackend {
.unwrap_or_else(|_| "Unknown error".to_string());
return Err(BackendError::http_status_error(
status,
format!("API error {}: {}", status, error_text),
upstream_api_error_message(status, &error_text),
));
}

Expand Down
Loading
Loading