@@ -13,14 +13,14 @@ use crate::health::health_checker::{Health, Unhealthy};
1313use crate :: health:: with_start_time:: HealthWithStartTime ;
1414use crate :: opamp:: operations:: stop_opamp_client;
1515use crate :: opamp:: remote_config:: OpampRemoteConfig ;
16- use crate :: opamp:: remote_config:: hash:: ConfigState ;
16+ use crate :: opamp:: remote_config:: hash:: { ConfigState , Hash } ;
1717use crate :: opamp:: remote_config:: report:: report_state;
1818use crate :: sub_agent:: effective_agents_assembler:: { EffectiveAgent , EffectiveAgentsAssembler } ;
1919use crate :: sub_agent:: error:: { SubAgentBuilderError , SubAgentError , SupervisorCreationError } ;
2020use crate :: sub_agent:: event_handler:: on_health:: on_health;
2121use crate :: sub_agent:: event_handler:: on_version:: on_version;
2222use crate :: sub_agent:: identity:: AgentIdentity ;
23- use crate :: sub_agent:: remote_config_parser:: RemoteConfigParser ;
23+ use crate :: sub_agent:: remote_config_parser:: { RemoteConfigParser , RemoteConfigParserError } ;
2424use crate :: sub_agent:: supervisor:: starter:: { SupervisorStarter , SupervisorStarterError } ;
2525use crate :: sub_agent:: supervisor:: stopper:: SupervisorStopper ;
2626use crate :: utils:: threads:: spawn_named_thread;
@@ -60,6 +60,9 @@ pub trait SubAgentBuilder {
6060 ) -> Result < Self :: NotStartedSubAgent , SubAgentBuilderError > ;
6161}
6262
63+ type BuilderSupervisorStopper < B > =
64+ <<B as SupervisorBuilder >:: SupervisorStarter as SupervisorStarter >:: SupervisorStopper ;
65+
6366/// SubAgentStopper is implementing the StartedSubAgent trait.
6467///
6568/// It stores the runtime JoinHandle and a SubAgentInternalEvent publisher.
@@ -146,10 +149,7 @@ where
146149 /// Any failure to assemble the effective agent or the supervisor, or failure to start the
147150 /// supervisor will mark the existing hash as failed and report the error if there's an
148151 /// OpAMP client present in the sub-agent.
149- fn init_supervisor (
150- & self ,
151- ) -> Option < <<B as SupervisorBuilder >:: SupervisorStarter as SupervisorStarter >:: SupervisorStopper >
152- {
152+ fn init_supervisor ( & self ) -> Option < BuilderSupervisorStopper < B > > {
153153 // An earlier run of Agent Control might have data for this agent identity, so we
154154 // attempt to retrieve an existing remote config,
155155 // falling back to a local config if there's no remote config.
@@ -364,39 +364,21 @@ where
364364 & self ,
365365 opamp_client : & C ,
366366 config : OpampRemoteConfig ,
367- old_supervisor : Option <
368- <<B as SupervisorBuilder >:: SupervisorStarter as SupervisorStarter >:: SupervisorStopper ,
369- > ,
370- ) -> Option < <<B as SupervisorBuilder >:: SupervisorStarter as SupervisorStarter >:: SupervisorStopper >
371- {
372- // We return early if hash is same as the stored and is not on status applying (processing
373- // was incomplete), it won't restart the supervisor but the status will be reported again.
367+ old_supervisor : Option < BuilderSupervisorStopper < B > > ,
368+ ) -> Option < BuilderSupervisorStopper < B > > {
369+ // If hash is same as the stored and is not on status applying (processing was incomplete),
370+ // the previous working supervisor will keep running but the status will be reported again.
374371 if let Ok ( Some ( rc) ) = self . config_repository . get_remote_config ( & self . identity . id ) {
375372 if config. hash == rc. hash && !rc. state . is_applying ( ) {
376373 let _ = report_state ( rc. state , rc. hash , opamp_client) ;
377374 return old_supervisor;
378375 }
379376 }
380377
381- // We return also early if the hash comes failed. This might happen if the pre-processing
382- // steps of the incoming remote config (performed in the OpAMP client callbacks, see
383- // `process_remote_config` in `opamp::callbacks`) fails for any reason.
384- if let Some ( error_message) = config. state . error_message ( ) . cloned ( ) {
385- warn ! (
386- hash = %config. hash,
387- "Remote configuration cannot be applied: {error_message}"
388- ) ;
389- // We report the status but we don't store the failed hash because
390- // the persisted remote and hash are the previous working one.
391- let _ = report_state (
392- ConfigState :: Failed { error_message } ,
393- config. hash ,
394- opamp_client,
395- ) ;
396- // We don't store this failed hash because we know this remote_config is not correct,
397- // and it has already been reported and cached by the OpAMP Client.
398- // The local or a previous remote (stored with its hash) configs will be used,
399- // keeping the old supervisor running
378+ // If the remote hash comes failed from the pre-processing steps (performed in the OpAMP
379+ // client callbacks, see `process_remote_config` in `opamp::callbacks`),
380+ // the previous working supervisor will keep running and the hash won't be updated.
381+ if Self :: check_and_report_config_failed ( opamp_client, & config) {
400382 return old_supervisor;
401383 }
402384
@@ -421,87 +403,131 @@ where
421403 }
422404 } ;
423405
424- if not_started_supervisor. is_ok ( ) {
425- let _ = opamp_client
426- . update_effective_config ( )
427- . inspect_err ( |e| error ! ( "Effective config update failed: {e}" ) ) ;
428- }
429-
430406 // Now, we should have either a Supervisor or an error to handle later,
431407 // which can come from either:
432408 // - a parse failure
433409 // - having empty values
434410 // - the EffectiveAgent assembly attempt
435411 // - the Supervisor assembly attempt
436- // Let's continue.
437- // Prepare remote config state to register outcome
438- let mut state = config. state ;
439- let refreshed_supervisor = match not_started_supervisor {
440- Ok ( new_supervisor) => {
441- // Stop old supervisor if any. This needs to happen before starting the new one
442- stop_supervisor ( old_supervisor) ;
443-
444- // Start the new supervisor
445- self . start_supervisor ( new_supervisor)
446- // Alter the state depending on the outcome
447- . inspect ( |_| {
448- state = ConfigState :: Applied ;
449- self . update_remote_config_state ( state. clone ( ) ) ;
450- } )
451- . inspect_err ( |e| {
452- state = ConfigState :: Failed {
453- error_message : e. to_string ( ) ,
454- } ;
455- self . update_remote_config_state ( state. clone ( ) ) ;
456- } )
457- // Return it
458- . ok ( )
459- }
460- // If we have no configuration, stop the old supervisor if any. Expected outcome.
412+ // We report the state and effective config and return a supervisor if it can be started or reused
413+ match not_started_supervisor {
414+ // If all correct, return new supervisor
415+ Ok ( new_supervisor) => self . start_new_supervisor_reporting_config_and_state (
416+ opamp_client,
417+ & config. hash ,
418+ old_supervisor,
419+ new_supervisor,
420+ ) ,
421+ // If we have no configuration, stop the old supervisor and return None.
461422 Err ( SupervisorCreationError :: NoConfiguration ) => {
462423 // Stop old supervisor if any
463424 stop_supervisor ( old_supervisor) ;
464- // Mark hash as applied
465- state = ConfigState :: Applied ;
466- // Remove supervisor
425+
426+ // Report the config as applied
427+ let _ = report_state ( ConfigState :: Applied , config. hash , opamp_client) ;
428+
429+ // The effective config needs to be reported with the empty config.
430+ let _ = opamp_client
431+ . update_effective_config ( )
432+ . inspect_err ( |e| error ! ( "Effective config update failed: {e}" ) ) ;
467433 None
468434 }
469435 Err ( e) => {
470- // If we fail to build the supervisor, we don't stop the old one and return it back
471436 warn ! ( "Failed to build supervisor: {e}" ) ;
472437
473438 // If the remote config was deleted but creating the supervisor from local failed
474- // the hash should be marked as applied,
475- if let Ok ( None ) = parsed_remote {
476- // Stop old supervisor if any. This needs to happen before starting the new one
439+ // stop the old supervisor and return None.
440+ if Self :: check_and_report_local_failed ( opamp_client, & config. hash , parsed_remote) {
477441 stop_supervisor ( old_supervisor) ;
442+ return None ;
443+ }
478444
479- // Report the empty remote config as applied
480- state = ConfigState :: Applied ;
481- // The effective config needs to be reported with the local config that failed
482- // to start the supervisor (not ideal but better than leaving the deleted remote),
483- // if not FC could still consider the previous remote that has just been deleted.
484- let _ = opamp_client
485- . update_effective_config ( )
486- . inspect_err ( |e| error ! ( "Effective config update failed: {e}" ) ) ;
487-
488- None
489- } else {
490- // Mark hash as failed
491- state = ConfigState :: Failed {
445+ let _ = report_state (
446+ ConfigState :: Failed {
492447 error_message : e. to_string ( ) ,
493- } ;
494- // Use existing supervisor
495- old_supervisor
496- }
448+ } ,
449+ config. hash ,
450+ opamp_client,
451+ ) ;
452+
453+ // If we fail to build the supervisor, we don't stop the old one and return it back
454+ old_supervisor
497455 }
498- } ;
456+ }
457+ }
458+
459+ fn check_and_report_local_failed (
460+ opamp_client : & C ,
461+ hash : & Hash ,
462+ parsed_remote : Result < Option < RemoteConfig > , RemoteConfigParserError > ,
463+ ) -> bool {
464+ if let Ok ( None ) = parsed_remote {
465+ // Report the empty remote config as applied
466+ let _ = report_state ( ConfigState :: Applied , hash. clone ( ) , opamp_client) ;
499467
500- // We report the config status
501- let _ = report_state ( state, config. hash , opamp_client) ;
468+ // The effective config needs to be reported with the local config that failed
469+ // to start the supervisor (not ideal but better than leaving the deleted remote),
470+ // if not FC could still consider the previous remote that has just been deleted.
471+ let _ = opamp_client
472+ . update_effective_config ( )
473+ . inspect_err ( |e| error ! ( "Effective config update failed: {e}" ) ) ;
502474
503- // With everything already handled, return the supervisor if any
504- refreshed_supervisor
475+ return true ;
476+ }
477+ false
478+ }
479+
480+ fn start_new_supervisor_reporting_config_and_state (
481+ & self ,
482+ opamp_client : & C ,
483+ hash : & Hash ,
484+ old_supervisor : Option < BuilderSupervisorStopper < B > > ,
485+ new_supervisor : <B as SupervisorBuilder >:: SupervisorStarter ,
486+ ) -> Option < BuilderSupervisorStopper < B > > {
487+ let _ = opamp_client
488+ . update_effective_config ( )
489+ . inspect_err ( |e| error ! ( "Effective config update failed: {e}" ) ) ;
490+
491+ // Stop old supervisor if any. This needs to happen before starting the new one
492+ stop_supervisor ( old_supervisor) ;
493+
494+ // Start the new supervisor
495+ self . start_supervisor ( new_supervisor)
496+ // Alter the state depending on the outcome
497+ . inspect ( |_| {
498+ self . update_remote_config_state ( ConfigState :: Applied ) ;
499+ // Report the empty remote config as applied
500+ let _ = report_state ( ConfigState :: Applied , hash. clone ( ) , opamp_client) ;
501+ } )
502+ . inspect_err ( |e| {
503+ let state = ConfigState :: Failed {
504+ error_message : e. to_string ( ) ,
505+ } ;
506+ self . update_remote_config_state ( state. clone ( ) ) ;
507+ // Report the empty remote config as applied
508+ let _ = report_state ( ConfigState :: Applied , hash. clone ( ) , opamp_client) ;
509+ } )
510+ // Return it
511+ . ok ( )
512+ }
513+
514+ // check_and_report_config_failed returns true if the config is failed and reports that state
515+ fn check_and_report_config_failed ( opamp_client : & C , config : & OpampRemoteConfig ) -> bool {
516+ if let Some ( error_message) = config. state . error_message ( ) . cloned ( ) {
517+ warn ! (
518+ hash = %config. hash,
519+ "Remote configuration cannot be applied: {error_message}"
520+ ) ;
521+ // Failed configurations are reported but not persisted.
522+ let _ = report_state (
523+ ConfigState :: Failed { error_message } ,
524+ config. hash . clone ( ) ,
525+ opamp_client,
526+ ) ;
527+
528+ return true ;
529+ }
530+ false
505531 }
506532
507533 /// Parses incoming remote config, assembles and builds the supervisor.
@@ -1231,6 +1257,7 @@ deployment:
12311257 . unwrap ( ) ;
12321258
12331259 let supervisor_builder = expect_supervisor_do_not_build ( ) ;
1260+ opamp_client. should_update_effective_config ( 1 ) ;
12341261 opamp_client. should_set_remote_config_status_seq ( vec ! [
12351262 TestAgent :: status_applying( ) ,
12361263 TestAgent :: status_applied( ) ,
0 commit comments