Skip to content

Commit 5c9a16e

Browse files
committed
feat: implement metrics and monitoring
1 parent 7564111 commit 5c9a16e

File tree

9 files changed

+1561
-11
lines changed

9 files changed

+1561
-11
lines changed

Cargo.lock

Lines changed: 477 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[workspace]
22
resolver = "2"
3-
members = ["actions", "bus", "engine", "engine_core", "rules"]
3+
members = ["actions", "bus", "engine", "engine_core", "metrics", "rules"]
44

55
[workspace.package]
6-
version = "0.1.3"
6+
version = "0.1.4"
77
edition = "2024"
88
authors = ["Your Name"]
99
license = "MIT"

README.md

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,53 @@ sc stop WinEventEngine
226226
engine.exe --uninstall
227227
```
228228

229+
## Metrics and Monitoring
230+
231+
The engine includes built-in metrics collection with a local HTTP endpoint for monitoring.
232+
233+
### Available Metrics
234+
235+
The following metrics are collected automatically:
236+
237+
- **events_total** - Total events processed by source and type
238+
- **events_dropped_total** - Events dropped due to full buffer
239+
- **events_processing_duration_seconds** - Event processing latency
240+
- **rules_evaluated_total** - Rule evaluations by rule name
241+
- **rules_matched_total** - Successful rule matches
242+
- **rules_match_duration_seconds** - Rule matching latency
243+
- **actions_executed_total** - Action executions by name and status
244+
- **actions_execution_duration_seconds** - Action execution latency
245+
- **plugins_events_generated_total** - Events generated per plugin
246+
- **plugins_errors_total** - Plugin errors
247+
- **config_reload_total** - Configuration reloads
248+
- **engine_uptime_seconds** - Engine uptime
249+
250+
### Retention
251+
252+
Metrics are retained with a sliding window:
253+
- **Regular metrics**: 1 hour
254+
- **Error metrics**: 24 hours
255+
256+
### Accessing Metrics
257+
258+
The metrics endpoint runs on `127.0.0.1:9090` (localhost only):
259+
260+
```bash
261+
# Prometheus format
262+
curl http://127.0.0.1:9090/metrics
263+
264+
# JSON snapshot
265+
curl http://127.0.0.1:9090/api/snapshot
266+
267+
# Health check
268+
curl http://127.0.0.1:9090/health
269+
270+
# Web UI
271+
curl http://127.0.0.1:9090/
272+
```
273+
274+
**Note**: The metrics endpoint is only accessible from localhost for security.
275+
229276
## Architecture
230277

231278
```
@@ -255,6 +302,9 @@ engine.exe --uninstall
255302
│ ├── PowerShell Script │
256303
│ ├── Log Message │
257304
│ └── HTTP Request (extensible) │
305+
│ │
306+
│ Metrics Collector (1h sliding window) │
307+
│ └── HTTP Endpoint @ 127.0.0.1:9090 │
258308
└─────────────────────────────────────────────────────────┘
259309
```
260310

@@ -291,6 +341,11 @@ win_event_engine/
291341
│ ├── src/
292342
│ │ └── lib.rs
293343
│ └── Cargo.toml
344+
├── metrics/ # Metrics collection and HTTP endpoint
345+
│ ├── src/
346+
│ │ ├── lib.rs # Metrics collector
347+
│ │ └── server.rs # HTTP server
348+
│ └── Cargo.toml
294349
├── config.toml.example # Example configuration
295350
├── rules.toml.example # Example rules
296351
├── AGENTS.md # Developer guidelines
@@ -445,8 +500,8 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
445500
- [x] Rule engine
446501
- [x] Configuration hot-reloading
447502
- [x] Windows service wrapper
503+
- [x] Metrics and monitoring
448504
- [ ] Web dashboard
449-
- [ ] Metrics and monitoring
450505
- [ ] Plugin system for custom actions
451506

452507
## Support

engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ engine_core = { path = "../engine_core" }
2727
bus = { path = "../bus" }
2828
rules = { path = "../rules" }
2929
actions = { path = "../actions" }
30+
metrics = { path = "../metrics" }
3031

3132
[dev-dependencies]
3233
tempfile = "3"

engine/src/engine.rs

Lines changed: 96 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@ use actions::{Action, ActionExecutor, ExecuteAction, LogAction, LogLevel, PowerS
77
use bus::create_event_bus;
88
use engine_core::event::EventKind;
99
use engine_core::plugin::EventSourcePlugin;
10+
use metrics::{
11+
record_action_execution, record_config_reload, record_event,
12+
record_event_processing_duration, record_rule_match, record_rule_match_duration,
13+
record_rule_evaluation, MetricsCollector,
14+
};
1015
use rules::{EventKindMatcher, FilePatternMatcher, Rule, RuleMatcher, WindowMatcher, WindowEventType};
1116
use std::path::PathBuf;
1217
use std::sync::Arc;
1318
use tokio::sync::mpsc;
14-
use tokio::time::{Duration, timeout};
19+
use tokio::time::{Duration, timeout, Instant};
1520
use tracing::{error, info, warn};
1621

1722
pub struct Engine {
@@ -23,10 +28,13 @@ pub struct Engine {
2328
event_sender: Option<mpsc::Sender<engine_core::event::Event>>,
2429
shutdown_flag: Arc<std::sync::atomic::AtomicBool>,
2530
config_reload_rx: Option<mpsc::Receiver<()>>,
31+
metrics: Arc<MetricsCollector>,
2632
}
2733

2834
impl Engine {
2935
pub fn new(config: Config, config_path: Option<PathBuf>) -> Self {
36+
let metrics = Arc::new(MetricsCollector::new());
37+
3038
Self {
3139
config,
3240
config_path,
@@ -36,8 +44,14 @@ impl Engine {
3644
event_sender: None,
3745
shutdown_flag: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3846
config_reload_rx: None,
47+
metrics,
3948
}
4049
}
50+
51+
/// Get a reference to the metrics collector
52+
pub fn metrics(&self) -> Arc<MetricsCollector> {
53+
self.metrics.clone()
54+
}
4155

4256
pub fn take_config_reload_rx(&mut self) -> Option<mpsc::Receiver<()>> {
4357
self.config_reload_rx.take()
@@ -62,28 +76,66 @@ impl Engine {
6276
// Start event processing loop
6377
let rules = self.rules.clone();
6478
let action_executor = self.action_executor.clone();
79+
let metrics = self.metrics.clone();
6580

6681
tokio::spawn(async move {
6782
info!("Event processing loop started");
6883

6984
while let Some(event) = receiver.recv().await {
85+
let start_time = Instant::now();
86+
let event_source = event.source.clone();
87+
let event_type = format!("{:?}", event.kind);
88+
89+
// Record event received
90+
record_event(&metrics, &event_source, &event_type);
91+
7092
tracing::debug!("Processing event: {:?} from {}", event.kind, event.source);
7193

7294
for (idx, rule) in rules.iter().enumerate() {
7395
if !rule.enabled {
7496
continue;
7597
}
7698

77-
if rule.matches(&event) {
99+
// Record rule evaluation
100+
record_rule_evaluation(&metrics, &rule.name);
101+
102+
let match_start = Instant::now();
103+
let matched = rule.matches(&event);
104+
record_rule_match_duration(&metrics, &rule.name, match_start.elapsed());
105+
106+
if matched {
107+
// Record successful rule match
108+
record_rule_match(&metrics, &rule.name);
78109
info!("Rule '{}' matched event from {}", rule.name, event.source);
79110

80111
let action_name = format!("rule_{}_action", idx);
112+
let action_start = Instant::now();
113+
81114
match action_executor.execute(&action_name, &event) {
82-
Ok(result) => info!("Action executed successfully: {:?}", result),
83-
Err(e) => error!("Action execution failed: {}", e),
115+
Ok(result) => {
116+
record_action_execution(
117+
&metrics,
118+
&action_name,
119+
true,
120+
action_start.elapsed()
121+
);
122+
info!("Action executed successfully: {:?}", result);
123+
}
124+
Err(e) => {
125+
record_action_execution(
126+
&metrics,
127+
&action_name,
128+
false,
129+
action_start.elapsed()
130+
);
131+
error!("Action execution failed: {}", e);
132+
}
84133
}
85134
}
86135
}
136+
137+
// Record total event processing duration
138+
record_event_processing_duration(&metrics, start_time.elapsed());
87139
}
88140

89141
info!("Event processing loop stopped");
@@ -485,6 +537,7 @@ public class MediaKeys {
485537
"New configuration validation failed: {}, keeping current config",
486538
e
487539
);
540+
record_config_reload(&self.metrics, false);
488541
return Err(EngineError::Config(e.to_string()));
489542
}
490543

@@ -509,30 +562,66 @@ public class MediaKeys {
509562
if let Some(_sender) = &self.event_sender {
510563
let rules = self.rules.clone();
511564
let action_executor = self.action_executor.clone();
565+
let metrics = self.metrics.clone();
512566
let mut receiver = bus::create_event_bus(self.config.engine.event_buffer_size).1;
513567

514568
tokio::spawn(async move {
515569
while let Some(event) = receiver.recv().await {
570+
let start_time = Instant::now();
571+
let event_source = event.source.clone();
572+
let event_type = format!("{:?}", event.kind);
573+
574+
record_event(&metrics, &event_source, &event_type);
575+
516576
tracing::debug!("Processing event: {:?} from {}", event.kind, event.source);
517577

518578
for (idx, rule) in rules.iter().enumerate() {
519579
if !rule.enabled {
520580
continue;
521581
}
522582

523-
if rule.matches(&event) {
583+
record_rule_evaluation(&metrics, &rule.name);
584+
585+
let match_start = Instant::now();
586+
let matched = rule.matches(&event);
587+
record_rule_match_duration(&metrics, &rule.name, match_start.elapsed());
588+
589+
if matched {
590+
record_rule_match(&metrics, &rule.name);
524591
info!("Rule '{}' matched event from {}", rule.name, event.source);
525592

526593
let action_name = format!("rule_{}_action", idx);
594+
let action_start = Instant::now();
595+
527596
match action_executor.execute(&action_name, &event) {
528-
Ok(result) => info!("Action executed successfully: {:?}", result),
529-
Err(e) => error!("Action execution failed: {}", e),
597+
Ok(result) => {
598+
record_action_execution(
599+
&metrics,
600+
&action_name,
601+
true,
602+
action_start.elapsed()
603+
);
604+
info!("Action executed successfully: {:?}", result);
605+
}
606+
Err(e) => {
607+
record_action_execution(
608+
&metrics,
609+
&action_name,
610+
false,
611+
action_start.elapsed()
612+
);
613+
error!("Action execution failed: {}", e);
614+
}
530615
}
531616
}
532617
}
618+
619+
record_event_processing_duration(&metrics, start_time.elapsed());
533620
}
534621
});
535622
}
623+
624+
record_config_reload(&self.metrics, true);
536625

537626
let status = self.get_status();
538627
info!(

engine/src/main.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ mod service;
77
mod integration_tests;
88

99
use clap::Parser;
10+
use metrics::server::MetricsServer;
1011
use std::path::PathBuf;
12+
use std::sync::Arc;
1113
use tracing::{Level, error, info};
1214
use tracing_subscriber;
1315

@@ -216,6 +218,17 @@ async fn main() {
216218
std::process::exit(1);
217219
}
218220

221+
// Start metrics server and cleanup task
222+
let metrics = engine_instance.metrics();
223+
metrics.start_cleanup_task().await;
224+
let metrics_server = MetricsServer::new(metrics, 9090);
225+
tokio::spawn(async move {
226+
if let Err(e) = metrics_server.start().await {
227+
error!("Metrics server error: {}", e);
228+
}
229+
});
230+
info!("Metrics server available at http://127.0.0.1:9090");
231+
219232
let status = engine_instance.get_status();
220233
info!(
221234
"Engine running with {} plugins and {} rules",
@@ -290,6 +303,7 @@ async fn main() {
290303
}
291304

292305
// Shutdown
306+
engine_for_shutdown.metrics().stop_cleanup_task().await;
293307
engine_for_shutdown.shutdown().await;
294308
info!("Engine stopped");
295309
}

metrics/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "metrics"
3+
version.workspace = true
4+
edition.workspace = true
5+
6+
[dependencies]
7+
serde = { version = "1", features = ["derive"] }
8+
tokio = { version = "1", features = ["full"] }
9+
tracing = "0.1"
10+
chrono = { version = "0.4", features = ["serde"] }
11+
dashmap = "5"
12+
axum = "0.7"
13+
14+
[dev-dependencies]
15+
tokio-test = "0.4"

0 commit comments

Comments
 (0)