Skip to content

Commit 718d830

Browse files
author
Jenkins
committed
feat: add real-time streaming security analysis (Loop 30)
Incremental regex-based security analysis during SSE streaming: - Add StreamingSecurityMonitor that runs lightweight pattern checks every N tokens (configurable, default 50) during SSE streaming - Detect injection patterns, PII, and data leakage mid-stream before the response completes — early warning layer - Fire alerts for critical findings mid-stream via the alert engine (don't wait for stream completion) - Tag all streaming findings with "detection": "streaming" metadata so consumers can distinguish from full post-stream analysis - Add StreamingAnalysisConfig (streaming_analysis.enabled, streaming_analysis.token_interval) to ProxyConfig - Expose RegexSecurityAnalyzer pattern methods publicly for direct synchronous use by the streaming monitor - Full post-stream analysis still runs after completion (unchanged) - 13 new tests covering: mid-stream injection/PII/leakage detection, streaming metadata tagging, delta-only analysis, interval behavior, findings draining, multi-finding detection, edge cases - Update config.example.yaml with streaming_analysis section
1 parent 5746cd4 commit 718d830

6 files changed

Lines changed: 542 additions & 5 deletions

File tree

config.example.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,21 @@ grpc:
204204
# Address and port to bind the gRPC server to.
205205
listen_addr: "0.0.0.0:50051"
206206

207+
# ---------------------------------------------------------------------------
208+
# Streaming security analysis — real-time analysis during SSE streaming
209+
# ---------------------------------------------------------------------------
210+
211+
streaming_analysis:
212+
# Enable incremental regex-based security checks during SSE streaming.
213+
# When enabled, the proxy runs lightweight pattern matching every N tokens
214+
# while the stream is still in progress. Findings are tagged with
215+
# "detection": "streaming" metadata and critical issues trigger alerts
216+
# mid-stream rather than waiting for stream completion.
217+
enabled: false
218+
# Number of completion tokens between each incremental analysis check.
219+
# Lower values detect threats faster but add marginal CPU overhead per chunk.
220+
token_interval: 50
221+
207222
# ---------------------------------------------------------------------------
208223
# Health check endpoint
209224
# ---------------------------------------------------------------------------

crates/llmtrace-core/src/lib.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,9 @@ pub struct ProxyConfig {
847847
/// Anomaly detection configuration.
848848
#[serde(default)]
849849
pub anomaly_detection: AnomalyDetectionConfig,
850+
/// Streaming security analysis configuration.
851+
#[serde(default)]
852+
pub streaming_analysis: StreamingAnalysisConfig,
850853
}
851854

852855
impl Default for ProxyConfig {
@@ -879,6 +882,7 @@ impl Default for ProxyConfig {
879882
auth: AuthConfig::default(),
880883
grpc: GrpcConfig::default(),
881884
anomaly_detection: AnomalyDetectionConfig::default(),
885+
streaming_analysis: StreamingAnalysisConfig::default(),
882886
}
883887
}
884888
}
@@ -1237,6 +1241,43 @@ impl Default for AnomalyDetectionConfig {
12371241
}
12381242
}
12391243

1244+
/// Configuration for real-time streaming security analysis.
1245+
///
1246+
/// When enabled, the proxy runs lightweight regex-based security pattern checks
1247+
/// incrementally during SSE streaming — every N tokens — producing interim
1248+
/// `SecurityFinding`s before the stream completes. This provides an early
1249+
/// warning layer; the full security analysis still runs after stream completion.
1250+
///
1251+
/// # Example (YAML)
1252+
///
1253+
/// ```yaml
1254+
/// streaming_analysis:
1255+
/// enabled: true
1256+
/// token_interval: 50
1257+
/// ```
1258+
#[derive(Debug, Clone, Serialize, Deserialize)]
1259+
pub struct StreamingAnalysisConfig {
1260+
/// Enable incremental security analysis during SSE streaming.
1261+
#[serde(default)]
1262+
pub enabled: bool,
1263+
/// Number of tokens between each incremental analysis check.
1264+
#[serde(default = "default_streaming_token_interval")]
1265+
pub token_interval: u32,
1266+
}
1267+
1268+
fn default_streaming_token_interval() -> u32 {
1269+
50
1270+
}
1271+
1272+
impl Default for StreamingAnalysisConfig {
1273+
fn default() -> Self {
1274+
Self {
1275+
enabled: false,
1276+
token_interval: default_streaming_token_interval(),
1277+
}
1278+
}
1279+
}
1280+
12401281
/// Security analysis configuration for ML-based prompt injection detection.
12411282
///
12421283
/// Controls whether ML-based detection is enabled alongside regex-based analysis,
@@ -2197,6 +2238,7 @@ mod tests {
21972238
auth: AuthConfig::default(),
21982239
grpc: GrpcConfig::default(),
21992240
anomaly_detection: AnomalyDetectionConfig::default(),
2241+
streaming_analysis: StreamingAnalysisConfig::default(),
22002242
};
22012243

22022244
let serialized = serde_json::to_string(&config).unwrap();

crates/llmtrace-proxy/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ pub use cost::CostEstimator;
2727
pub use cost_caps::CostTracker;
2828
pub use grpc::run_grpc_server;
2929
pub use proxy::{health_handler, proxy_handler, AppState};
30+
pub use streaming::StreamingSecurityMonitor;

crates/llmtrace-proxy/src/proxy.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use crate::circuit_breaker::CircuitBreaker;
88
use crate::cost::CostEstimator;
99
use crate::provider::{self, ParsedResponse};
10-
use crate::streaming::StreamingAccumulator;
10+
use crate::streaming::{StreamingAccumulator, StreamingSecurityMonitor};
1111
use axum::body::Body;
1212
use axum::extract::State;
1313
use axum::http::{HeaderMap, Request, Response, StatusCode};
@@ -360,6 +360,13 @@ pub async fn proxy_handler(
360360
} else {
361361
None
362362
};
363+
// Initialise the streaming security monitor (only for SSE streams
364+
// when streaming analysis is enabled).
365+
let mut streaming_monitor = if is_streaming {
366+
StreamingSecurityMonitor::new(&state_bg.config.streaming_analysis)
367+
} else {
368+
None
369+
};
363370
let mut raw_collected = Vec::new();
364371
let mut ttft_ms: Option<u64> = None;
365372

@@ -373,6 +380,26 @@ pub async fn proxy_handler(
373380
let elapsed = Utc::now().signed_duration_since(start_time);
374381
ttft_ms = Some(elapsed.num_milliseconds().max(0) as u64);
375382
}
383+
384+
// --- Real-time streaming security analysis ---
385+
if let Some(ref mut monitor) = streaming_monitor {
386+
if monitor.should_analyze(acc.completion_token_count) {
387+
let new_findings = monitor
388+
.analyze_incremental(&acc.content, acc.completion_token_count);
389+
// Fire mid-stream alerts for critical findings
390+
if !new_findings.is_empty() {
391+
info!(
392+
%trace_id,
393+
count = new_findings.len(),
394+
tokens = acc.completion_token_count,
395+
"Streaming security findings detected mid-stream"
396+
);
397+
if let Some(ref engine) = state_bg.alert_engine {
398+
engine.check_and_alert(trace_id, tenant_id, &new_findings);
399+
}
400+
}
401+
}
402+
}
376403
}
377404
raw_collected.extend_from_slice(&bytes);
378405
if body_sender.send(Ok(bytes)).await.is_err() {
@@ -390,6 +417,29 @@ pub async fn proxy_handler(
390417
// body_sender is dropped here, closing the stream to the client.
391418
drop(body_sender);
392419

420+
// Run one final streaming analysis on any remaining content that
421+
// didn't cross a token-interval boundary.
422+
if let (Some(ref acc), Some(ref mut monitor)) = (&sse_accumulator, &mut streaming_monitor) {
423+
let final_findings =
424+
monitor.analyze_incremental(&acc.content, acc.completion_token_count);
425+
if !final_findings.is_empty() {
426+
info!(
427+
%trace_id,
428+
count = final_findings.len(),
429+
"Streaming security findings in final flush"
430+
);
431+
if let Some(ref engine) = state_bg.alert_engine {
432+
engine.check_and_alert(trace_id, tenant_id, &final_findings);
433+
}
434+
}
435+
}
436+
437+
// Collect streaming security findings for attachment to the trace span.
438+
let streaming_findings: Vec<SecurityFinding> = streaming_monitor
439+
.as_mut()
440+
.map(|m| m.take_findings())
441+
.unwrap_or_default();
442+
393443
// Build the captured interaction with streaming metrics if applicable
394444
let (response_text, prompt_tokens, completion_tokens, total_tokens) =
395445
if let Some(acc) = sse_accumulator {
@@ -454,6 +504,11 @@ pub async fn proxy_handler(
454504
// --- Security analysis first, so findings can be persisted with the trace ---
455505
let mut security_findings = run_security_analysis(&state_bg, &captured).await;
456506

507+
// Merge in any findings detected during streaming (early warning layer).
508+
// These have already been alerted on mid-stream; now we persist them
509+
// alongside the full post-stream analysis findings.
510+
security_findings.extend(streaming_findings);
511+
457512
// --- Anomaly detection (async, non-blocking) ---
458513
if let Some(ref detector) = state_bg.anomaly_detector {
459514
let anomaly_findings = detector

0 commit comments

Comments
 (0)