Skip to content

Commit 2fbfcd7

Browse files
committed
feat(wip): rework impementation
1 parent 21b7be4 commit 2fbfcd7

File tree

4 files changed

+48
-36
lines changed

4 files changed

+48
-36
lines changed

agent-control/src/sub_agent/k8s/new_supervisor.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
sub_agent::{
88
effective_agents_assembler::EffectiveAgent,
99
k8s::supervisor::{NotStartedSupervisorK8s, StartedSupervisorK8s},
10-
supervisor::{Supervisor, SupervisorStarter, starter::SupervisorStarterError},
10+
supervisor::{ApplyError, Supervisor, SupervisorStarter, starter::SupervisorStarterError},
1111
},
1212
utils::thread_context::{ThreadCollectionStopperExt, ThreadContextStopperError},
1313
};
@@ -53,52 +53,64 @@ impl SupervisorStarter for NotStartedSupervisorK8s {
5353
}
5454

5555
impl Supervisor for StartedSupervisorK8s {
56-
type ApplyError = SupervisorStarterError;
5756
type StopError = ThreadContextStopperError;
5857

59-
fn apply(self, effective_agent: EffectiveAgent) -> Result<Self, Self::ApplyError> {
58+
fn apply(self, effective_agent: EffectiveAgent) -> Result<Self, ApplyError<Self>> {
6059
let new_k8s_config = effective_agent
6160
.get_k8s_config()
62-
.map_err(|e| SupervisorStarterError::ConfigError(e.to_string()))?;
61+
.map_err(|e| SupervisorStarterError::ConfigError(e.to_string()));
6362

64-
if &self.k8s_config == new_k8s_config {
63+
if let Ok(cfg) = new_k8s_config
64+
&& cfg == &self.k8s_config
65+
{
6566
// No changes, return same supervisor
6667
return Ok(self);
6768
}
6869

69-
// Reuse started supervisor's contents
70-
let Self {
71-
thread_contexts,
72-
agent_identity,
73-
k8s_client,
74-
sub_agent_internal_publisher,
75-
..
76-
} = self;
77-
7870
debug!(
79-
agent_id = %agent_identity.id,
71+
agent_id = %self.agent_identity.id,
8072
"Applying new configuration to K8s supervisor"
8173
);
8274

83-
// Attach the agent id info to the logs emitted by `thread_contexts.stop()`.
84-
let span = info_span!("stopping_supervisor", agent_id = %agent_identity.id);
85-
let stop_threads_result = span.in_scope(|| thread_contexts.stop());
86-
if let Err(e) = stop_threads_result {
87-
warn!(agent_id = %agent_identity.id, "Errors stopping supervisor threads: {e}");
88-
}
89-
9075
// A new non-started supervisor to build dynamic objects from the new config
91-
let temp_starter = NotStartedSupervisorK8s::new(
92-
agent_identity,
93-
k8s_client.clone(),
94-
new_k8s_config.clone(),
95-
);
96-
let resources = temp_starter.build_dynamic_objects()?;
97-
98-
// Apply resources directly
99-
Self::apply_resources(resources.iter(), &k8s_client)?;
100-
101-
SupervisorStarter::start(temp_starter, sub_agent_internal_publisher)
76+
let temp_starter = new_k8s_config.and_then(|new_k8s_config| {
77+
Ok(NotStartedSupervisorK8s::new(
78+
self.agent_identity.clone(),
79+
self.k8s_client.clone(),
80+
new_k8s_config.clone(),
81+
))
82+
});
83+
84+
let resources = temp_starter
85+
.clone() // try to remove, need a reference to `temp_starter`
86+
.and_then(|temp_starter| temp_starter.build_dynamic_objects())
87+
.and_then(|res| Self::apply_resources(res.iter(), &self.k8s_client.clone()));
88+
89+
let new_supervisor = temp_starter.and_then(|temp_starter| {
90+
SupervisorStarter::start(temp_starter, self.sub_agent_internal_publisher.clone())
91+
});
92+
93+
match new_supervisor {
94+
Ok(supervisor) => {
95+
// Stop old supervisor
96+
let Self {
97+
thread_contexts,
98+
agent_identity,
99+
..
100+
} = self;
101+
// Attach the agent id info to the logs emitted by `thread_contexts.stop()`.
102+
let span = info_span!("stopping_supervisor", agent_id = %agent_identity.id);
103+
let stop_threads_result = span.in_scope(|| thread_contexts.stop());
104+
if let Err(e) = stop_threads_result {
105+
warn!(agent_id = %agent_identity.id, "Errors stopping supervisor threads: {e}");
106+
}
107+
Ok(supervisor)
108+
}
109+
Err(e) => Err(ApplyError {
110+
reason: e.to_string(),
111+
supervisor: self,
112+
}),
113+
}
102114
}
103115

104116
fn stop(self) -> Result<(), Self::StopError> {

agent-control/src/sub_agent/k8s/supervisor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use tracing::{debug, info, info_span, trace, warn};
3232
const OBJECTS_SUPERVISOR_INTERVAL_SECONDS: u64 = 30;
3333
const SUPERVISOR_THREAD_NAME: &str = "supervisor";
3434

35+
#[derive(Debug, Clone)]
3536
pub struct NotStartedSupervisorK8s {
3637
pub(super) agent_identity: AgentIdentity,
3738
pub(super) k8s_client: Arc<SyncK8sClient>,

agent-control/src/sub_agent/supervisor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,9 @@ pub mod tests {
191191
mock! {
192192
pub Supervisor {}
193193
impl Supervisor for Supervisor {
194-
type ApplyError = MockError;
195194
type StopError = MockError;
196195

197-
fn apply(self, effective_agent: EffectiveAgent) -> Result<Self, <Self as Supervisor>::ApplyError>;
196+
fn apply(self, effective_agent: EffectiveAgent) -> Result<Self, ApplyError<Self>>;
198197
fn stop(self) -> Result<(), <Self as Supervisor>::StopError>;
199198
}
200199
}

agent-control/src/sub_agent/supervisor/starter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::sub_agent::error::SubAgentBuilderError;
66
use crate::sub_agent::supervisor::stopper::SupervisorStopper;
77
use thiserror::Error;
88

9-
#[derive(Debug, Error)]
9+
#[derive(Debug, Error, Clone)]
1010
pub enum SupervisorStarterError {
1111
#[error("the kube client returned an error: {0}")]
1212
Generic(#[from] crate::k8s::error::K8sError),

0 commit comments

Comments
 (0)