Skip to content

Commit 06843c1

Browse files
committed
wip
1 parent a76a914 commit 06843c1

File tree

7 files changed

+132
-86
lines changed

7 files changed

+132
-86
lines changed

agent-control/src/agent_control/agent_control.rs

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::event::broadcaster::unbounded::UnboundedBroadcast;
1414
use crate::event::{
1515
AgentControlEvent, ApplicationEvent, OpAMPEvent, SubAgentEvent, channel::EventConsumer,
1616
};
17-
use crate::health::health_checker::{Health, Healthy, Unhealthy};
17+
use crate::health::health_checker::{Health, Healthy};
1818
use crate::health::with_start_time::HealthWithStartTime;
1919
use crate::opamp::remote_config::hash::ConfigState;
2020
use crate::opamp::remote_config::report::OpampRemoteConfigStatus;
@@ -205,7 +205,7 @@ where
205205
>,
206206
) {
207207
let _ = self
208-
.report_healthy(Healthy::new(String::default()))
208+
.report_health(Healthy::new(String::default()).into())
209209
.inspect_err(
210210
|err| error!(error_msg = %err,"Error reporting health on Agent Control start"),
211211
);
@@ -371,29 +371,18 @@ where
371371
Ok(())
372372
}
373373

374-
pub(crate) fn report_healthy(&self, healthy: Healthy) -> Result<(), AgentError> {
375-
self.report_health(healthy.clone().into())?;
376-
self.agent_control_publisher
377-
.broadcast(AgentControlEvent::AgentControlBecameHealthy(healthy));
378-
Ok(())
379-
}
380-
381-
pub(crate) fn report_unhealthy(&self, unhealthy: Unhealthy) -> Result<(), AgentError> {
382-
self.report_health(unhealthy.clone().into())?;
383-
self.agent_control_publisher
384-
.broadcast(AgentControlEvent::AgentControlBecameUnhealthy(unhealthy));
385-
Ok(())
386-
}
387-
388-
fn report_health(&self, health: Health) -> Result<(), AgentError> {
374+
pub(super) fn report_health(&self, health: Health) -> Result<(), AgentError> {
375+
let health = HealthWithStartTime::new(health, self.start_time);
389376
if let Some(handle) = &self.opamp_client {
390377
debug!(
391378
is_healthy = health.is_healthy().to_string(),
392379
"Sending agent-control health"
393380
);
394381

395-
handle.set_health(HealthWithStartTime::new(health, self.start_time).into())?;
382+
handle.set_health(health.clone().into())?;
396383
}
384+
self.agent_control_publisher
385+
.broadcast(AgentControlEvent::HealthUpdated(health));
397386
Ok(())
398387
}
399388
}
@@ -433,6 +422,7 @@ mod tests {
433422
use crate::event::channel::{EventConsumer, pub_sub};
434423
use crate::event::{AgentControlEvent, ApplicationEvent, OpAMPEvent};
435424
use crate::health::health_checker::{Healthy, Unhealthy};
425+
use crate::health::with_start_time::HealthWithStartTime;
436426
use crate::opamp::client_builder::tests::MockStartedOpAMPClient;
437427
use crate::opamp::remote_config::hash::{ConfigState, Hash};
438428
use crate::opamp::remote_config::{ConfigurationMap, OpampRemoteConfig};
@@ -444,7 +434,7 @@ mod tests {
444434
use std::collections::HashMap;
445435
use std::sync::Arc;
446436
use std::thread::{sleep, spawn};
447-
use std::time::Duration;
437+
use std::time::{Duration, SystemTime};
448438

449439
#[test]
450440
fn run_and_stop_supervisors_no_agents() {
@@ -607,7 +597,7 @@ mod tests {
607597
let running_agent = spawn({
608598
move || {
609599
// two agents in the supervisor group
610-
let agent = AgentControl::new(
600+
let mut agent = AgentControl::new(
611601
Some(started_client),
612602
sub_agent_builder,
613603
Arc::new(sa_dynamic_config_store),
@@ -620,6 +610,7 @@ mod tests {
620610
NoOpUpdater,
621611
ac_config,
622612
);
613+
agent.start_time = SystemTime::UNIX_EPOCH; // Patch time to allow comparison
623614
agent.run()
624615
}
625616
});
@@ -671,7 +662,7 @@ agents:
671662
let running_agent = spawn({
672663
move || {
673664
// two agents in the supervisor group
674-
let agent = AgentControl::new(
665+
let mut agent = AgentControl::new(
675666
Some(started_client),
676667
sub_agent_builder,
677668
Arc::new(sa_dynamic_config_store),
@@ -684,14 +675,18 @@ agents:
684675
NoOpUpdater,
685676
AgentControlConfig::default(),
686677
);
678+
agent.start_time = SystemTime::UNIX_EPOCH; // Path time to allow comparison
687679
agent.process_events(sub_agents)
688680
}
689681
});
690682

691683
opamp_publisher.publish(OpAMPEvent::Connected).unwrap();
692684

693685
// process_events always starts with AgentControlHealthy
694-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
686+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
687+
Healthy::default().into(),
688+
SystemTime::UNIX_EPOCH,
689+
));
695690
let ev = agent_control_consumer.as_ref().recv().unwrap();
696691
assert_eq!(expected, ev);
697692

@@ -728,7 +723,7 @@ agents:
728723
let running_agent = spawn({
729724
move || {
730725
// two agents in the supervisor group
731-
let agent = AgentControl::new(
726+
let mut agent = AgentControl::new(
732727
Some(started_client),
733728
sub_agent_builder,
734729
Arc::new(sa_dynamic_config_store),
@@ -741,6 +736,7 @@ agents:
741736
NoOpUpdater,
742737
AgentControlConfig::default(),
743738
);
739+
agent.start_time = SystemTime::UNIX_EPOCH; // Patch time to allow comparison
744740
agent.process_events(sub_agents)
745741
}
746742
});
@@ -753,7 +749,10 @@ agents:
753749
.unwrap();
754750

755751
// process_events always starts with AgentControlHealthy
756-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
752+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
753+
Healthy::default().into(),
754+
SystemTime::UNIX_EPOCH,
755+
));
757756
let ev = agent_control_consumer.as_ref().recv().unwrap();
758757
assert_eq!(expected, ev);
759758

@@ -1214,7 +1213,7 @@ agents:
12141213

12151214
let event_processor = spawn({
12161215
move || {
1217-
let agent = AgentControl::new(
1216+
let mut agent = AgentControl::new(
12181217
Some(started_client),
12191218
sub_agent_builder,
12201219
Arc::new(sa_dynamic_config_store),
@@ -1227,7 +1226,7 @@ agents:
12271226
NoOpUpdater,
12281227
AgentControlConfig::default(),
12291228
);
1230-
1229+
agent.start_time = SystemTime::UNIX_EPOCH; // Path time to allow comparison
12311230
agent.process_events(sub_agents);
12321231
}
12331232
});
@@ -1243,11 +1242,17 @@ agents:
12431242
assert!(event_processor.join().is_ok());
12441243

12451244
// process_events always starts with AgentControlHealthy
1246-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
1245+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
1246+
Healthy::default().into(),
1247+
SystemTime::UNIX_EPOCH,
1248+
));
12471249
let ev = agent_control_consumer.as_ref().recv().unwrap();
12481250
assert_eq!(expected, ev);
12491251

1250-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
1252+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
1253+
Healthy::default().into(),
1254+
SystemTime::UNIX_EPOCH,
1255+
));
12511256
let ev = agent_control_consumer.as_ref().recv().unwrap();
12521257
assert_eq!(expected, ev);
12531258
}
@@ -1291,7 +1296,7 @@ agents:
12911296

12921297
let event_processor = spawn({
12931298
move || {
1294-
let agent = AgentControl::new(
1299+
let mut agent = AgentControl::new(
12951300
Some(started_client),
12961301
sub_agent_builder,
12971302
Arc::new(sa_dynamic_config_store),
@@ -1304,7 +1309,7 @@ agents:
13041309
NoOpUpdater,
13051310
AgentControlConfig::default(),
13061311
);
1307-
1312+
agent.start_time = SystemTime::UNIX_EPOCH; // Path time to allow comparison
13081313
agent.process_events(sub_agents);
13091314
}
13101315
});
@@ -1322,15 +1327,21 @@ agents:
13221327
assert!(event_processor.join().is_ok());
13231328

13241329
// process_events always starts with AgentControlHealthy
1325-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
1330+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
1331+
Healthy::default().into(),
1332+
SystemTime::UNIX_EPOCH,
1333+
));
13261334
let ev = agent_control_consumer.as_ref().recv().unwrap();
13271335
assert_eq!(expected, ev);
13281336

1329-
let expected = AgentControlEvent::AgentControlBecameUnhealthy(Unhealthy::new(
1337+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
1338+
Unhealthy::new(
13301339
String::default(),
13311340
String::from(
13321341
"Error applying Agent Control remote config: remote config error: `config hash: `a-hash` config error: `some error message``",
13331342
),
1343+
).into(),
1344+
SystemTime::UNIX_EPOCH,
13341345
));
13351346
let ev = agent_control_consumer.as_ref().recv().unwrap();
13361347
assert_eq!(expected, ev);
@@ -1360,7 +1371,7 @@ agents:
13601371

13611372
let event_processor = spawn({
13621373
move || {
1363-
let agent = AgentControl::new(
1374+
let mut agent = AgentControl::new(
13641375
Some(started_client),
13651376
sub_agent_builder,
13661377
Arc::new(sa_dynamic_config_store),
@@ -1373,7 +1384,7 @@ agents:
13731384
NoOpUpdater,
13741385
AgentControlConfig::default(),
13751386
);
1376-
1387+
agent.start_time = SystemTime::UNIX_EPOCH; // Path time to allow comparison
13771388
agent.process_events(sub_agents);
13781389
}
13791390
});
@@ -1387,7 +1398,10 @@ agents:
13871398
assert!(event_processor.join().is_ok());
13881399

13891400
// process_events always starts with AgentControlHealthy
1390-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
1401+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
1402+
Healthy::default().into(),
1403+
SystemTime::UNIX_EPOCH,
1404+
));
13911405
let ev = agent_control_consumer.as_ref().recv().unwrap();
13921406
assert_eq!(expected, ev);
13931407

@@ -1469,7 +1483,7 @@ agents:
14691483

14701484
let event_processor = spawn({
14711485
move || {
1472-
let agent = AgentControl::new(
1486+
let mut agent = AgentControl::new(
14731487
Some(started_client),
14741488
sub_agent_builder,
14751489
Arc::new(sa_dynamic_config_store),
@@ -1482,7 +1496,7 @@ agents:
14821496
NoOpUpdater,
14831497
AgentControlConfig::default(),
14841498
);
1485-
1499+
agent.start_time = SystemTime::UNIX_EPOCH; // Path time to allow comparison
14861500
agent.process_events(sub_agents);
14871501
}
14881502
});
@@ -1498,15 +1512,21 @@ agents:
14981512
assert!(event_processor.join().is_ok());
14991513

15001514
// process_events always starts with AgentControlHealthy
1501-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
1515+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
1516+
Healthy::default().into(),
1517+
SystemTime::UNIX_EPOCH,
1518+
));
15021519
let ev = agent_control_consumer.as_ref().recv().unwrap();
15031520
assert_eq!(expected, ev);
15041521

15051522
let expected = AgentControlEvent::SubAgentRemoved(agent_id);
15061523
let ev = agent_control_consumer.as_ref().recv().unwrap();
15071524
assert_eq!(expected, ev);
15081525

1509-
let expected = AgentControlEvent::AgentControlBecameHealthy(Healthy::default());
1526+
let expected = AgentControlEvent::HealthUpdated(HealthWithStartTime::new(
1527+
Healthy::default().into(),
1528+
SystemTime::UNIX_EPOCH,
1529+
));
15101530
let ev = agent_control_consumer.as_ref().recv().unwrap();
15111531
assert_eq!(expected, ev);
15121532
}

agent-control/src/agent_control/event_handler/opamp/remote_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ where
5353
error!(error_message);
5454
OpampRemoteConfigStatus::Error(error_message.clone())
5555
.report(opamp_client, opamp_remote_config.hash.get())?;
56-
Ok(self.report_unhealthy(Unhealthy::new(String::default(), error_message))?)
56+
Ok(self.report_health(Unhealthy::new(String::default(), error_message).into())?)
5757
}
5858
Ok(()) => {
5959
self.sa_dynamic_config_store
6060
.update_hash_state(&ConfigState::Applied)?;
6161
OpampRemoteConfigStatus::Applied
6262
.report(opamp_client, opamp_remote_config.hash.get())?;
6363
opamp_client.update_effective_config()?;
64-
Ok(self.report_healthy(Healthy::new(String::default()))?)
64+
Ok(self.report_health(Healthy::new(String::default()).into())?)
6565
}
6666
}
6767
}

agent-control/src/agent_control/http_server/status.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::agent_control::agent_id::AgentID;
22

33
use crate::agent_type::agent_type_id::AgentTypeID;
4-
use crate::health::health_checker::{Healthy, Unhealthy};
4+
use crate::health::health_checker::Health;
55
use crate::health::with_start_time::HealthWithStartTime;
66
use crate::opamp::{LastErrorCode, LastErrorMessage};
77
use crate::sub_agent::identity::AgentIdentity;
@@ -34,16 +34,19 @@ pub struct AgentControlStatus {
3434
}
3535

3636
impl AgentControlStatus {
37-
pub fn healthy(&mut self, healthy: Healthy) {
38-
self.healthy = true;
39-
self.last_error = None;
40-
self.status = healthy.status().to_string();
41-
}
42-
43-
pub fn unhealthy(&mut self, unhealthy: Unhealthy) {
44-
self.healthy = false;
45-
self.last_error = unhealthy.last_error().to_string().into();
46-
self.status = unhealthy.status().to_string();
37+
pub fn set_health(&mut self, health: HealthWithStartTime) {
38+
match Health::from(health) {
39+
Health::Healthy(healthy) => {
40+
self.healthy = true;
41+
self.last_error = None;
42+
self.status = healthy.status().to_string();
43+
}
44+
Health::Unhealthy(unhealthy) => {
45+
self.healthy = false;
46+
self.last_error = unhealthy.last_error().to_string().into();
47+
self.status = unhealthy.status().to_string();
48+
}
49+
}
4750
}
4851
}
4952

agent-control/src/agent_control/http_server/status_handler.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ mod tests {
5656
.with_sub_agents(sub_agents.into())
5757
.with_opamp(Url::try_from("http://127.0.0.1").unwrap());
5858

59-
st.agent_control.healthy(Healthy::default());
59+
st.agent_control.set_health(HealthWithStartTime::new(
60+
Healthy::default().into(),
61+
SystemTime::now(),
62+
));
6063
st.fleet.reachable();
6164

6265
let status = Arc::new(RwLock::new(st));
@@ -100,8 +103,12 @@ mod tests {
100103
.with_sub_agents(sub_agents.into())
101104
.with_opamp(Url::try_from("http://127.0.0.1").unwrap());
102105

103-
st.agent_control
104-
.unhealthy(Unhealthy::default().with_last_error("agent control error".to_string()));
106+
st.agent_control.set_health(HealthWithStartTime::new(
107+
Unhealthy::default()
108+
.with_last_error("agent control error".to_string())
109+
.into(),
110+
SystemTime::now(),
111+
));
105112
st.fleet.reachable();
106113

107114
let status = Arc::new(RwLock::new(st));

0 commit comments

Comments
 (0)