Skip to content

Commit c234763

Browse files
fix: memory leak replacing long lived spans (#1281)
* fix: replace long lived spans * docs * review * activate self inst on canaries * test cluster * use agent_id * unify span guard
1 parent 20dd216 commit c234763

File tree

15 files changed

+158
-36
lines changed

15 files changed

+158
-36
lines changed

agent-control/src/agent_control/agent_id.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ pub(crate) mod tests {
9292
use crate::agent_control::agent_id::AgentID;
9393
use crate::agent_control::defaults::AGENT_CONTROL_ID;
9494

95+
impl Default for AgentID {
96+
fn default() -> Self {
97+
AgentID::new("default").unwrap()
98+
}
99+
}
100+
95101
#[test]
96102
fn agent_control_id() {
97103
let agent_id = AgentID::new_agent_control_id();

agent-control/src/opamp/operations.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ pub fn stop_opamp_client<C: StartedClient>(
117117
"Stopping OpAMP client for supervised agent type: {}",
118118
agent_id
119119
);
120-
// TODO We should call disconnect here as this means a graceful shutdown
121120
client.stop()?;
122121
}
123122
Ok(())

agent-control/src/sub_agent/health/health_checker.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
use super::with_start_time::StartTime;
2+
use crate::agent_control::agent_id::AgentID;
23
use crate::agent_type::runtime_config::HealthCheckInterval;
34
use crate::event::SubAgentInternalEvent;
45
use crate::event::cancellation::CancellationMessage;
56
use crate::event::channel::{EventConsumer, EventPublisher};
67

78
use crate::k8s;
89
use crate::sub_agent::health::with_start_time::HealthWithStartTime;
10+
use crate::sub_agent::identity::ID_ATTRIBUTE_NAME;
911
use crate::sub_agent::supervisor::starter::SupervisorStarterError;
1012
use crate::utils::thread_context::{NotStartedThreadContext, StartedThreadContext};
1113
use std::time::{SystemTime, SystemTimeError};
12-
use tracing::{debug, error};
14+
use tracing::{debug, error, info_span};
1315

14-
const HEALTH_CHECKER_THREAD_NAME: &str = "health checker";
16+
const HEALTH_CHECKER_THREAD_NAME: &str = "health_checker";
1517

1618
pub type StatusTime = SystemTime;
1719

@@ -212,6 +214,7 @@ pub trait HealthChecker {
212214
}
213215

214216
pub(crate) fn spawn_health_checker<H>(
217+
agent_id: AgentID,
215218
health_checker: H,
216219
sub_agent_internal_publisher: EventPublisher<SubAgentInternalEvent>,
217220
interval: HealthCheckInterval,
@@ -221,6 +224,12 @@ where
221224
H: HealthChecker + Send + 'static,
222225
{
223226
let callback = move |stop_consumer: EventConsumer<CancellationMessage>| loop {
227+
let span = info_span!(
228+
"health_check",
229+
{ ID_ATTRIBUTE_NAME } = %agent_id
230+
);
231+
let _guard = span.enter();
232+
224233
debug!("starting to check health with the configured checker");
225234

226235
let health = health_checker.check_health().unwrap_or_else(|err| {
@@ -378,6 +387,7 @@ pub mod tests {
378387
});
379388

380389
let started_thread_context = spawn_health_checker(
390+
AgentID::default(),
381391
health_checker,
382392
health_publisher,
383393
Duration::from_millis(10).into(), // Give room to publish and consume the events
@@ -441,6 +451,7 @@ pub mod tests {
441451
});
442452

443453
let started_thread_context = spawn_health_checker(
454+
AgentID::default(),
444455
health_checker,
445456
health_publisher,
446457
Duration::from_millis(10).into(), // Give room to publish and consume the events
@@ -499,6 +510,7 @@ pub mod tests {
499510
let start_time = SystemTime::now();
500511

501512
let started_thread_context = spawn_health_checker(
513+
AgentID::default(),
502514
health_checker,
503515
health_publisher,
504516
Duration::from_millis(10).into(), // Give room to publish and consume the events

agent-control/src/sub_agent/identity.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use crate::agent_control::defaults::{
55
use crate::agent_type::agent_type_id::AgentTypeID;
66
use std::fmt::{Display, Formatter};
77

8+
pub const ID_ATTRIBUTE_NAME: &str = "id";
9+
810
// This could be SubAgentIdentity
911
#[derive(Clone, Debug, PartialEq)]
1012
pub struct AgentIdentity {

agent-control/src/sub_agent/k8s/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ where
8585
{
8686
type NotStartedSubAgent = SubAgent<O::Client, SA, R, H, Y>;
8787

88-
#[instrument(skip_all, fields(%agent_identity),name = "build_sub_agent")]
88+
#[instrument(skip_all, fields(id = %agent_identity.id),name = "build_agent")]
8989
fn build(
9090
&self,
9191
agent_identity: &AgentIdentity,

agent-control/src/sub_agent/k8s/supervisor.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::k8s::labels::Labels;
1111
use crate::sub_agent::health::health_checker::spawn_health_checker;
1212
use crate::sub_agent::health::k8s::health_checker::SubAgentHealthChecker;
1313
use crate::sub_agent::health::with_start_time::StartTime;
14-
use crate::sub_agent::identity::AgentIdentity;
14+
use crate::sub_agent::identity::{AgentIdentity, ID_ATTRIBUTE_NAME};
1515
use crate::sub_agent::supervisor::starter::{SupervisorStarter, SupervisorStarterError};
1616
use crate::sub_agent::supervisor::stopper::SupervisorStopper;
1717
use crate::sub_agent::version::k8s::checkers::K8sAgentVersionChecker;
@@ -24,10 +24,10 @@ use k8s_openapi::serde_json;
2424
use kube::{api::DynamicObject, core::TypeMeta};
2525
use std::sync::Arc;
2626
use std::time::Duration;
27-
use tracing::{debug, error, info, trace, warn};
27+
use tracing::{debug, error, info, info_span, trace, warn};
2828

2929
const OBJECTS_SUPERVISOR_INTERVAL_SECONDS: u64 = 30;
30-
const SUPERVISOR_THREAD_NAME: &str = "k8s objects supervisor";
30+
const SUPERVISOR_THREAD_NAME: &str = "supervisor";
3131

3232
pub struct NotStartedSupervisorK8s {
3333
agent_identity: AgentIdentity,
@@ -126,7 +126,15 @@ impl NotStartedSupervisorK8s {
126126
) -> StartedThreadContext {
127127
let k8s_client = self.k8s_client.clone();
128128
let interval = self.interval;
129+
let agent_id = self.agent_identity.id.clone();
130+
129131
let callback = move |stop_consumer: EventConsumer<CancellationMessage>| loop {
132+
let span = info_span!(
133+
"reconcile_resources",
134+
{ ID_ATTRIBUTE_NAME } = %agent_id
135+
);
136+
let _guard = span.enter();
137+
130138
// Check and apply k8s objects
131139
if let Err(err) = Self::apply_resources(resources.iter(), k8s_client.clone()) {
132140
warn!(%err, "K8s resources apply failed");
@@ -161,6 +169,7 @@ impl NotStartedSupervisorK8s {
161169
};
162170

163171
let started_thread_context = spawn_health_checker(
172+
self.agent_identity.id.clone(),
164173
k8s_health_checker,
165174
sub_agent_internal_publisher,
166175
health_config.interval,
@@ -182,6 +191,7 @@ impl NotStartedSupervisorK8s {
182191
)?;
183192

184193
Some(spawn_version_checker(
194+
self.agent_identity.id.clone(),
185195
k8s_version_checker,
186196
sub_agent_internal_publisher,
187197
VersionCheckerInterval::default(),

agent-control/src/sub_agent/on_host/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ where
8181
{
8282
type NotStartedSubAgent = SubAgent<O::Client, SA, R, H, Y>;
8383

84-
#[instrument(skip_all, fields(%agent_identity),name = "build_sub_agent")]
84+
#[instrument(skip_all, fields(id = %agent_identity.id),name = "build_agent")]
8585
fn build(
8686
&self,
8787
agent_identity: &AgentIdentity,

agent-control/src/sub_agent/on_host/supervisor.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::sub_agent::health::health_checker::{
1212
use crate::sub_agent::health::health_checker::{Healthy, Unhealthy};
1313
use crate::sub_agent::health::on_host::health_checker::OnHostHealthChecker;
1414
use crate::sub_agent::health::with_start_time::{HealthWithStartTime, StartTime};
15-
use crate::sub_agent::identity::AgentIdentity;
15+
use crate::sub_agent::identity::{AgentIdentity, ID_ATTRIBUTE_NAME};
1616
use crate::sub_agent::on_host::command::command::CommandError;
1717
use crate::sub_agent::on_host::command::command_os::CommandOSNotStarted;
1818
use crate::sub_agent::on_host::command::executable_data::ExecutableData;
@@ -140,6 +140,7 @@ impl NotStartedSupervisorOnHost {
140140
let health_checker =
141141
OnHostHealthChecker::try_new(http_client, health_config.clone(), start_time)?;
142142
let started_thread_context = spawn_health_checker(
143+
self.agent_identity.id.clone(),
143144
health_checker,
144145
sub_agent_internal_publisher,
145146
health_config.interval,
@@ -159,6 +160,7 @@ impl NotStartedSupervisorOnHost {
159160
OnHostAgentVersionChecker::checked_new(self.agent_identity.agent_type_id.clone())?;
160161

161162
Some(spawn_version_checker(
163+
self.agent_identity.id.clone(),
162164
onhost_version_checker,
163165
sub_agent_internal_publisher,
164166
VersionCheckerInterval::default(),
@@ -176,10 +178,16 @@ impl NotStartedSupervisorOnHost {
176178
_ = wait_for_termination(current_pid.clone(), self.ctx.clone(), shutdown_ctx.clone());
177179

178180
let executable_data_clone = executable_data.clone();
181+
let agent_id = self.agent_identity.id.clone();
179182
// NotStartedThreadContext takes as input a callback that requires a EventConsumer<CancellationMessage>
180183
// as input. In that specific case it's not used, but we need to pass it to comply with the signature.
181184
// This should be refactored to work as the other threads used by the supervisor.
182185
let callback = move |_| loop {
186+
let span = info_span!(
187+
"start_executable",
188+
{ ID_ATTRIBUTE_NAME } = %agent_id
189+
);
190+
let span_guard = span.enter();
183191
// locks the current_pid to prevent `wait_for_termination` finishing before the process
184192
// is started and the pid is set.
185193
// In case starting the process fail the guard will be dropped and `wait_for_termination`
@@ -189,17 +197,11 @@ impl NotStartedSupervisorOnHost {
189197
// A context cancelled means that the supervisor has been gracefully stopped
190198
// before the process was started.
191199
if *Context::get_lock_cvar(&self.ctx).0.lock().unwrap() {
192-
debug!(
193-
supervisor = executable_data_clone.bin,
194-
msg = "supervisor stopped before starting the process"
195-
);
200+
debug!("supervisor stopped before starting the process");
196201
break;
197202
}
198203

199-
info!(
200-
supervisor = executable_data_clone.bin,
201-
msg = "starting supervisor process"
202-
);
204+
info!("starting supervisor process");
203205

204206
shutdown_ctx.reset().unwrap();
205207
// Signals return exit_code 0, if in the future we need to act on them we can import
@@ -215,7 +217,14 @@ impl NotStartedSupervisorOnHost {
215217
HealthWithStartTime::new(init_health.into(), supervisor_start_time).into(),
216218
);
217219

218-
let exit_code = start_command(not_started_command, pid_guard)
220+
let command_result = start_command(not_started_command, pid_guard, span_guard);
221+
let span = info_span!(
222+
"stop_executable",
223+
{ ID_ATTRIBUTE_NAME } = %agent_id
224+
);
225+
let _span_guard = span.enter();
226+
227+
let exit_code = command_result
219228
.inspect_err(|err| {
220229
error!(
221230
supervisor = executable_data_clone.bin,
@@ -340,6 +349,7 @@ fn handle_termination(
340349
fn start_command(
341350
not_started_command: CommandOSNotStarted,
342351
mut pid: std::sync::MutexGuard<Option<u32>>,
352+
span_guard: tracing::span::Entered<'_>,
343353
) -> Result<ExitStatus, CommandError> {
344354
// run and stream the process
345355
let started = not_started_command.start()?;
@@ -351,6 +361,8 @@ fn start_command(
351361
// free the lock so the wait_for_termination can lock it on graceful shutdown
352362
drop(pid);
353363

364+
drop(span_guard);
365+
354366
streaming.wait()
355367
}
356368

@@ -361,11 +373,11 @@ fn wait_for_termination(
361373
ctx: Context<bool>,
362374
shutdown_ctx: Context<bool>,
363375
) -> JoinHandle<()> {
364-
let s = info_span!("termination_signal_listener");
376+
let span = info_span!("termination_signal");
365377
spawn_named_thread("OnHost Termination signal listener", move || {
366-
let _guards = s.enter();
367378
let (lck, cvar) = Context::get_lock_cvar(&ctx);
368379
drop(cvar.wait_while(lck.lock().unwrap(), |finish| !*finish));
380+
let _span_guard = span.enter();
369381

370382
// context is unlocked here so locking it again in other thread that is blocking current_pid is safe.
371383
if let Some(pid) = *current_pid.lock().unwrap() {

agent-control/src/sub_agent/sub_agent.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ where
129129
}
130130

131131
pub fn runtime(self) -> JoinHandle<Result<(), SubAgentError>> {
132-
let s = info_span!("agent", id=%self.identity.id);
133132
spawn_named_thread("Subagent runtime", move || {
134-
let _guards = s.enter();
133+
let span = info_span!("start_agent", id=%self.identity.id);
134+
let _span_guard = span.enter();
135135

136136
let mut supervisor = self.assemble_and_start_supervisor();
137137
// Stores the current health state for logging purposes.
@@ -142,7 +142,9 @@ where
142142
.publish(SubAgentStarted(self.identity.clone(),SystemTime::now()))
143143
.inspect_err(|err| error!(error_msg = %err,"Cannot publish sub_agent_event::sub_agent_started"));
144144

145-
Option::as_ref(&self.maybe_opamp_client).map(|client| client.update_effective_config());
145+
self.maybe_opamp_client
146+
.as_ref()
147+
.map(|client| client.update_effective_config());
146148

147149
// The below two lines are used to create a channel that never receives any message
148150
// if the sub_agent_opamp_consumer is None. Thus, we avoid erroring if there is no
@@ -166,11 +168,15 @@ where
166168
let _ = uptime_reporter.report();
167169
}
168170

171+
drop(_span_guard);
172+
169173
// Count the received remote configs during execution
170174
let mut remote_config_count = 0;
171175
loop {
172176
select! {
173177
recv(opamp_receiver.as_ref()) -> opamp_event_res => {
178+
let span = info_span!("process_fleet_event", id=%self.identity.id);
179+
let _span_guard = span.enter();
174180
match opamp_event_res {
175181
Err(e) => {
176182
debug!(error = %e, select_arm = "sub_agent_opamp_consumer", "Channel closed");
@@ -220,6 +226,8 @@ where
220226
}
221227
},
222228
recv(&self.sub_agent_internal_consumer.as_ref()) -> sub_agent_internal_event_res => {
229+
let span = info_span!("process_event", id=%self.identity.id);
230+
let _span_guard = span.enter();
223231
match sub_agent_internal_event_res {
224232
Err(e) => {
225233
debug!(error = %e, select_arm = "sub_agent_internal_consumer", "Channel closed");

agent-control/src/sub_agent/version/version_checker.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
use crate::agent_control::agent_id::AgentID;
12
use crate::agent_type::version_config::VersionCheckerInterval;
23
use crate::event::SubAgentInternalEvent;
34
use crate::event::cancellation::CancellationMessage;
45
use crate::event::channel::{EventConsumer, EventPublisher};
6+
use crate::sub_agent::identity::ID_ATTRIBUTE_NAME;
57
use crate::utils::thread_context::{NotStartedThreadContext, StartedThreadContext};
6-
use tracing::{debug, error, info, warn};
8+
use tracing::{debug, error, info, info_span, warn};
79

8-
const VERSION_CHECKER_THREAD_NAME: &str = "version checker";
10+
const VERSION_CHECKER_THREAD_NAME: &str = "version_checker";
911

1012
pub trait VersionChecker {
1113
/// Use it to report the agent version for the opamp client
@@ -43,6 +45,7 @@ pub enum VersionCheckError {
4345
}
4446

4547
pub(crate) fn spawn_version_checker<V>(
48+
agent_id: AgentID,
4649
version_checker: V,
4750
sub_agent_internal_publisher: EventPublisher<SubAgentInternalEvent>,
4851
interval: VersionCheckerInterval,
@@ -53,6 +56,12 @@ where
5356
// Stores if the version was retrieved in last iteration for logging purposes.
5457
let mut version_retrieved = false;
5558
let callback = move |stop_consumer: EventConsumer<CancellationMessage>| loop {
59+
let span = info_span!(
60+
"version_check",
61+
{ ID_ATTRIBUTE_NAME } = %agent_id
62+
);
63+
let _guard = span.enter();
64+
5665
debug!("starting to check version with the configured checker");
5766

5867
match version_checker.check_agent_version() {
@@ -99,6 +108,7 @@ pub(crate) fn publish_version_event(
99108

100109
#[cfg(test)]
101110
pub mod tests {
111+
use crate::agent_control::agent_id::AgentID;
102112
use crate::agent_control::defaults::OPAMP_CHART_VERSION_ATTRIBUTE_KEY;
103113
use crate::event::SubAgentInternalEvent::AgentVersionInfo;
104114
use crate::event::channel::pub_sub;
@@ -142,6 +152,7 @@ pub mod tests {
142152
});
143153

144154
let started_thread_context = spawn_version_checker(
155+
AgentID::default(),
145156
version_checker,
146157
version_publisher,
147158
Duration::from_millis(10).into(),

0 commit comments

Comments
 (0)