Skip to content

Commit a4704e8

Browse files
Forward migrate action to endpoint (#9801)
1 parent cb9c826 commit a4704e8

File tree

7 files changed

+270
-23
lines changed

7 files changed

+270
-23
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_migrate.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import (
2525
const ()
2626

2727
type migrateCoordinator interface {
28-
Migrate(_ context.Context, _ *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff) error
28+
actionCoordinator
29+
30+
Migrate(_ context.Context, _ *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff, _ func(context.Context, *fleetapi.ActionMigrate) error) error
2931
ReExec(callback reexec.ShutdownCallbackFn, argOverrides ...string)
3032
Protection() protection.Config
3133
}
@@ -90,7 +92,7 @@ func (h *Migrate) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
9092

9193
action.Data.EnrollmentToken = enrollmentToken
9294

93-
if err := h.coord.Migrate(ctx, action, fleetgateway.RequestBackoff); err != nil {
95+
if err := h.coord.Migrate(ctx, action, fleetgateway.RequestBackoff, h.notifyComponents); err != nil {
9496
// this should not happen, unmanaged agent should not receive the action
9597
// defensive coding to avoid misbehavior
9698
if errors.Is(err, coordinator.ErrNotManaged) {
@@ -112,6 +114,22 @@ func (h *Migrate) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
112114
return nil
113115
}
114116

117+
func (h *Migrate) notifyComponents(ctx context.Context, migrateAction *fleetapi.ActionMigrate) error {
118+
state := h.coord.State()
119+
ucs := findMatchingUnitsByActionType(state, fleetapi.ActionTypeMigrate)
120+
if len(ucs) > 0 {
121+
err := notifyUnitsOfProxiedAction(ctx, h.log, migrateAction, ucs, h.coord.PerformAction)
122+
if err != nil {
123+
return err
124+
}
125+
} else {
126+
// Log and continue
127+
h.log.Debugf("No components running for %v action type", fleetapi.ActionTypeMigrate)
128+
}
129+
130+
return nil
131+
}
132+
115133
func (h *Migrate) ackFailure(ctx context.Context, err error, action *fleetapi.ActionMigrate, acker acker.Acker) {
116134
action.Err = err
117135

internal/pkg/agent/application/actions/handlers/handler_action_migrate_test.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/elastic/elastic-agent/internal/pkg/agent/protection"
2323
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
2424
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
25+
"github.com/elastic/elastic-agent/pkg/component"
2526
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
2627
mockinfo "github.com/elastic/elastic-agent/testing/mocks/internal_/pkg/agent/application/info"
2728
)
@@ -38,7 +39,7 @@ func TestActionMigratelHandler(t *testing.T) {
3839
ack.On("Commit", t.Context()).Return(nil)
3940

4041
coord := &fakeMigrateCoordinator{}
41-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
42+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
4243
coord.On("ReExec", mock.Anything, mock.Anything)
4344
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
4445

@@ -77,7 +78,8 @@ func TestActionMigratelHandler(t *testing.T) {
7778
ack.On("Commit", t.Context()).Return(nil)
7879

7980
coord := &fakeMigrateCoordinator{}
80-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
81+
coord.On("State").Return(coordinator.State{})
82+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
8183
coord.On("ReExec", mock.Anything, mock.Anything)
8284
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil, Enabled: tc.protectionEnabled})
8385

@@ -114,7 +116,8 @@ func TestActionMigratelHandler(t *testing.T) {
114116
ack.On("Commit", t.Context()).Return(nil)
115117

116118
coord := &fakeMigrateCoordinator{}
117-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
119+
coord.On("State").Return(coordinator.State{})
120+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
118121
coord.On("ReExec", mock.Anything, mock.Anything)
119122
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
120123

@@ -163,7 +166,8 @@ func TestActionMigratelHandler(t *testing.T) {
163166
ack.On("Commit", t.Context()).Return(nil)
164167

165168
coord := &fakeMigrateCoordinator{}
166-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
169+
coord.On("State").Return(coordinator.State{})
170+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
167171
coord.On("ReExec", mock.Anything, mock.Anything)
168172
coord.On("Protection").Return(protection.Config{SignatureValidationKey: signatureValidationKey})
169173

@@ -199,7 +203,7 @@ func TestActionMigratelHandler(t *testing.T) {
199203
ack.On("Commit", t.Context()).Return(nil)
200204

201205
coord := &fakeMigrateCoordinator{}
202-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
206+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
203207
coord.On("ReExec", mock.Anything, mock.Anything)
204208
coord.On("Protection").Return(protection.Config{SignatureValidationKey: signatureValidationKey})
205209

@@ -248,7 +252,8 @@ func TestActionMigratelHandler(t *testing.T) {
248252
ack.On("Commit", t.Context()).Return(nil)
249253

250254
coord := &fakeMigrateCoordinator{}
251-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
255+
coord.On("State").Return(coordinator.State{})
256+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
252257
coord.On("ReExec", mock.Anything, mock.Anything)
253258
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
254259

@@ -300,7 +305,7 @@ func TestActionMigratelHandler(t *testing.T) {
300305
ack.On("Commit", t.Context()).Return(nil)
301306

302307
coord := &fakeMigrateCoordinator{}
303-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
308+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
304309
coord.On("ReExec", mock.Anything, mock.Anything)
305310
coord.On("Protection").Return(protection.Config{SignatureValidationKey: signatureValidationKey})
306311

@@ -322,7 +327,8 @@ func TestActionMigratelHandler(t *testing.T) {
322327
ack.On("Commit", t.Context()).Return(nil)
323328

324329
coord := &fakeMigrateCoordinator{}
325-
coord.On("Migrate", mock.Anything, mock.Anything).Return(coordinator.ErrFleetServer)
330+
coord.On("State").Return(coordinator.State{})
331+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(coordinator.ErrFleetServer)
326332
coord.On("ReExec", mock.Anything, mock.Anything)
327333
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
328334

@@ -343,11 +349,21 @@ type fakeMigrateCoordinator struct {
343349
mock.Mock
344350
}
345351

346-
func (f *fakeMigrateCoordinator) Migrate(ctx context.Context, a *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff) error {
347-
args := f.Called(ctx, a)
352+
func (f *fakeMigrateCoordinator) Migrate(ctx context.Context, a *fleetapi.ActionMigrate, b func(done <-chan struct{}) backoff.Backoff, n func(context.Context, *fleetapi.ActionMigrate) error) error {
353+
args := f.Called(ctx, a, b, n)
348354
return args.Error(0)
349355
}
350356

357+
func (f *fakeMigrateCoordinator) State() coordinator.State {
358+
args := f.Called()
359+
return args.Get(0).(coordinator.State)
360+
}
361+
362+
func (f *fakeMigrateCoordinator) PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) {
363+
args := f.Called(ctx, comp, unit, name, params)
364+
return args.Get(0).(map[string]interface{}), args.Error(1)
365+
}
366+
351367
func (f *fakeMigrateCoordinator) ReExec(callback reexec.ShutdownCallbackFn, argOverrides ...string) {
352368
f.Called(callback, argOverrides)
353369
}

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"reflect"
1212
"strings"
13+
"sync"
1314
"sync/atomic"
1415
"time"
1516

@@ -361,6 +362,11 @@ type Coordinator struct {
361362

362363
// Abstraction for diagnostics AddSecretMarkers function for testability
363364
secretMarkerFunc func(*logger.Logger, *config.Config) error
365+
366+
// migrationProgressWg is used to block processing of incoming policies after enroll is done
367+
// incomming policies are blocked until we reboot so components receiving proxied MIGRATE action
368+
// are not confused
369+
migrationProgressWg sync.WaitGroup
364370
}
365371

366372
// The channels Coordinator reads to receive updates from the various managers.
@@ -596,7 +602,12 @@ func (c *Coordinator) ReExec(callback reexec.ShutdownCallbackFn, argOverrides ..
596602

597603
// Migrate migrates agent to a new cluster and ACKs success to the old one.
598604
// In case of failure no ack is performed and error is returned.
599-
func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrate, backoffFactory func(done <-chan struct{}) backoff.Backoff) error {
605+
func (c *Coordinator) Migrate(
606+
ctx context.Context,
607+
action *fleetapi.ActionMigrate,
608+
backoffFactory func(done <-chan struct{}) backoff.Backoff,
609+
notifyFn func(context.Context, *fleetapi.ActionMigrate) error,
610+
) error {
600611
if !c.isManaged {
601612
return ErrNotManaged
602613
}
@@ -666,6 +677,24 @@ func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrat
666677
return errors.Join(fmt.Errorf("failed to enroll: %w", err), restoreErr)
667678
}
668679

680+
// lock processing of new config before notifying components
681+
// hold lock until notification failure or reexec
682+
c.migrationProgressWg.Add(1)
683+
if notifyFn != nil {
684+
// notify before completing migration
685+
// components such endpoint are crucial to work even though it's on stale cluster
686+
// error on component side is returned as part of Action response
687+
if err := notifyFn(ctx, action); err != nil {
688+
restoreErr := RestoreConfig()
689+
690+
// in case of failure no need to lock processing
691+
// safe to forward policy from source cluster
692+
c.migrationProgressWg.Done()
693+
694+
return errors.Join(fmt.Errorf("failed to notify components: %w", err), restoreErr)
695+
}
696+
}
697+
669698
// ACK success to source fleet server
670699
if err := c.ackMigration(ctx, action, c.fleetAcker); err != nil {
671700
c.logger.Warnf("failed to ACK success: %v", err)
@@ -677,23 +706,30 @@ func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrat
677706
return fmt.Errorf("failed to clean backup config: %w", err)
678707
}
679708

709+
c.bestEffortUnenroll(ctx, originalOptions)
710+
711+
return nil
712+
}
713+
714+
func (c *Coordinator) bestEffortUnenroll(ctx context.Context, originalOptions enroll.EnrollOptions) {
680715
originalRemoteConfig, err := originalOptions.RemoteConfig(false)
681716
if err != nil {
682-
return fmt.Errorf("failed to construct original remote config: %w", err)
717+
c.logger.Warnf("failed to construct original remote config: %v", err)
718+
return
683719
}
684720

685721
originalClient, err := fleetapiClient.NewAuthWithConfig(
686722
c.logger, originalOptions.EnrollAPIKey, originalRemoteConfig)
687723
if err != nil {
688-
return fmt.Errorf("failed to create original fleet client: %w", err)
724+
c.logger.Warnf("failed to create original fleet client: %v", err)
725+
return
689726
}
690727

691728
// Best effort: call unenroll on source cluster once done
692729
if err := c.unenroll(ctx, originalClient); err != nil {
693730
c.logger.Warnf("failed to unenroll from original cluster: %v", err)
731+
return
694732
}
695-
696-
return nil
697733
}
698734

699735
type upgradeOpts struct {
@@ -1532,6 +1568,8 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
15321568

15331569
// Always called on the main Coordinator goroutine.
15341570
func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) {
1571+
c.migrationProgressWg.Wait()
1572+
15351573
if c.otelMgr != nil {
15361574
c.otelCfg = cfg.OTel
15371575
}

0 commit comments

Comments
 (0)