Skip to content

Commit 05e0f08

Browse files
Don't run any agent service when running otel subcommand (#5748) (#5797)
* Remove need to run agent while running otel subcommand.
* Cleanup.
* Fix tests.
* Fix tests.
* Few more cleanups.
* Mage check.

(cherry picked from commit 3f1e4da)

Co-authored-by: Blake Rouse <[email protected]>
1 parent be1c1ea commit 05e0f08

File tree

12 files changed

+101
-279
lines changed

12 files changed

+101
-279
lines changed

internal/pkg/agent/application/application.go

Lines changed: 6 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
2929
"github.com/elastic/elastic-agent/internal/pkg/composable"
3030
"github.com/elastic/elastic-agent/internal/pkg/config"
31-
"github.com/elastic/elastic-agent/internal/pkg/otel"
3231
"github.com/elastic/elastic-agent/internal/pkg/release"
3332
"github.com/elastic/elastic-agent/pkg/component"
3433
"github.com/elastic/elastic-agent/pkg/component/runtime"
@@ -47,13 +46,11 @@ func New(
4746
testingMode bool,
4847
fleetInitTimeout time.Duration,
4948
disableMonitoring bool,
50-
runAsOtel bool,
5149
modifiers ...component.PlatformModifier,
5250
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {
5351

5452
err := version.InitVersionError()
55-
if err != nil && !runAsOtel {
56-
// ignore this error when running in otel mode
53+
if err != nil {
5754
// non-fatal error, log a warning and move on
5855
log.With("error.message", err).Warnf("Error initializing version information: falling back to %s", release.Version())
5956
}
@@ -92,13 +89,7 @@ func New(
9289
log.Infof("Loading baseline config from %v", pathConfigFile)
9390
rawConfig, err = config.LoadFile(pathConfigFile)
9491
if err != nil {
95-
if !runAsOtel {
96-
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
97-
}
98-
99-
// initialize with empty config, configuration file is not necessary in otel mode,
100-
// best effort is fine
101-
rawConfig = config.New()
92+
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
10293
}
10394
}
10495
if err := info.InjectAgentConfig(rawConfig); err != nil {
@@ -124,7 +115,6 @@ func New(
124115
tracer,
125116
monitor,
126117
cfg.Settings.GRPC,
127-
runAsOtel,
128118
)
129119
if err != nil {
130120
return nil, nil, nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
@@ -141,9 +131,6 @@ func New(
141131

142132
// testing mode uses a config manager that takes configuration from over the control protocol
143133
configMgr = newTestingModeConfigManager(log)
144-
} else if runAsOtel {
145-
// ignoring configuration in elastic-agent.yml
146-
configMgr = otel.NewOtelModeConfigManager()
147134
} else if configuration.IsStandalone(cfg.Fleet) {
148135
log.Info("Parsed configuration and determined agent is managed locally")
149136

@@ -189,13 +176,10 @@ func New(
189176
}
190177
}
191178

192-
var varsManager composable.Controller
193-
if !runAsOtel {
194-
// no need for vars in otel mode
195-
varsManager, err = composable.New(log, rawConfig, composableManaged)
196-
if err != nil {
197-
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
198-
}
179+
// no need for vars in otel mode
180+
varsManager, err := composable.New(log, rawConfig, composableManaged)
181+
if err != nil {
182+
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
199183
}
200184

201185
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, compModifiers...)

internal/pkg/agent/application/application_test.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,6 @@ func TestLimitsLog(t *testing.T) {
6363
true, // testingMode
6464
time.Millisecond, // fleetInitTimeout
6565
true, // disable monitoring
66-
false, // not otel mode
6766
)
6867
require.NoError(t, err)
6968

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -895,7 +895,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
895895
monitoringMgr := newTestMonitoringMgr()
896896
cfg := configuration.DefaultGRPCConfig()
897897
cfg.Port = 0
898-
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg, false)
898+
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg)
899899
require.NoError(t, err)
900900

901901
caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)

internal/pkg/agent/cmd/otel.go

Lines changed: 1 addition & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -8,14 +8,11 @@ package cmd
88

99
import (
1010
"context"
11-
goerrors "errors"
12-
"sync"
1311

1412
"github.com/spf13/cobra"
1513
"github.com/spf13/pflag"
1614

1715
"github.com/elastic/elastic-agent-libs/service"
18-
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
1916
"github.com/elastic/elastic-agent/internal/pkg/cli"
2017
"github.com/elastic/elastic-agent/internal/pkg/otel"
2118
)
@@ -79,42 +76,5 @@ func runCollector(cmdCtx context.Context, configFiles []string) error {
7976
defer cancel()
8077
go service.ProcessWindowsControlEvents(stopCollector)
8178

82-
var otelStartWg sync.WaitGroup
83-
var errs []error
84-
var awaiters awaiters
85-
86-
otelAwaiter := make(chan struct{})
87-
awaiters = append(awaiters, otelAwaiter)
88-
89-
otelStartWg.Add(1)
90-
go func() {
91-
otelStartWg.Done()
92-
if err := otel.Run(ctx, stop, configFiles); err != nil {
93-
errs = append(errs, err)
94-
// otel collector finished with an error, exit run loop
95-
cancel()
96-
}
97-
98-
// close awaiter handled in run loop
99-
close(otelAwaiter)
100-
}()
101-
102-
// wait for otel to start
103-
otelStartWg.Wait()
104-
105-
if err := runElasticAgent(
106-
ctx,
107-
cancel,
108-
nil, // no config overrides
109-
stop, // service hook
110-
false, // not in testing mode
111-
0, // no fleet config
112-
true, // is otel mode
113-
awaiters, // wait for otel to finish
114-
); err != nil && !errors.Is(err, context.Canceled) {
115-
errs = append(errs, err)
116-
}
117-
118-
return goerrors.Join(errs...)
119-
79+
return otel.Run(ctx, stop, configFiles)
12080
}

internal/pkg/agent/cmd/run.go

Lines changed: 7 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,6 @@ const (
6464

6565
type (
6666
cfgOverrider func(cfg *configuration.Configuration)
67-
awaiters []<-chan struct{}
6867
)
6968

7069
func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
@@ -155,7 +154,7 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration
155154
defer cancel()
156155
go service.ProcessWindowsControlEvents(stopBeat)
157156

158-
return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, false, nil, modifiers...)
157+
return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...)
159158
}
160159

161160
func logReturn(l *logger.Logger, err error) error {
@@ -165,8 +164,8 @@ func logReturn(l *logger.Logger, err error) error {
165164
return err
166165
}
167166

168-
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error {
169-
cfg, err := loadConfig(ctx, override, runAsOtel)
167+
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
168+
cfg, err := loadConfig(ctx, override)
170169
if err != nil {
171170
return err
172171
}
@@ -205,7 +204,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
205204
pathConfigFile := paths.AgentConfigFile()
206205

207206
// agent ID needs to stay empty in bootstrap mode
208-
createAgentID := !runAsOtel
207+
createAgentID := true
209208
if cfg.Fleet != nil && cfg.Fleet.Server != nil && cfg.Fleet.Server.Bootstrap {
210209
createAgentID = false
211210
}
@@ -285,7 +284,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
285284
l.Info("APM instrumentation disabled")
286285
}
287286

288-
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...)
287+
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
289288
if err != nil {
290289
return logReturn(l, err)
291290
}
@@ -397,9 +396,6 @@ LOOP:
397396
}
398397
cancel()
399398
err = <-appErr
400-
for _, a := range awaiters {
401-
<-a // wait for awaiter to be done
402-
}
403399

404400
if logShutdown {
405401
l.Info("Shutting down completed.")
@@ -410,16 +406,7 @@ LOOP:
410406
return logReturn(l, err)
411407
}
412408

413-
func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) {
414-
if runAsOtel {
415-
defaultCfg := configuration.DefaultConfiguration()
416-
// disable monitoring to avoid injection of monitoring components
417-
// in case inputs are not empty
418-
defaultCfg.Settings.MonitoringConfig.Enabled = false
419-
defaultCfg.Settings.V1MonitoringEnabled = false
420-
return defaultCfg, nil
421-
}
422-
409+
func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
423410
pathConfigFile := paths.ConfigFile()
424411
rawConfig, err := config.LoadFile(pathConfigFile)
425412
if err != nil {
@@ -592,7 +579,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati
592579
errors.M("path", enrollPath)))
593580
}
594581
logger.Info("Successfully performed delayed enrollment of this Elastic Agent.")
595-
return loadConfig(ctx, override, false)
582+
return loadConfig(ctx, override)
596583
}
597584

598585
func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) {

pkg/component/runtime/manager.go

Lines changed: 50 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -145,8 +145,7 @@ type Manager struct {
145145

146146
// doneChan is closed when Manager is shutting down to signal that any
147147
// pending requests should be canceled.
148-
doneChan chan struct{}
149-
runAsOtel bool
148+
doneChan chan struct{}
150149
}
151150

152151
// NewManager creates a new manager.
@@ -157,7 +156,6 @@ func NewManager(
157156
tracer *apm.Tracer,
158157
monitor MonitoringManager,
159158
grpcConfig *configuration.GRPCConfig,
160-
runAsOtel bool,
161159
) (*Manager, error) {
162160
ca, err := authority.NewCA()
163161
if err != nil {
@@ -193,7 +191,6 @@ func NewManager(
193191
grpcConfig: grpcConfig,
194192
serverReady: make(chan struct{}),
195193
doneChan: make(chan struct{}),
196-
runAsOtel: runAsOtel,
197194
}
198195
return m, nil
199196
}
@@ -212,56 +209,54 @@ func (m *Manager) Run(ctx context.Context) error {
212209
server *grpc.Server
213210
wgServer sync.WaitGroup
214211
)
215-
if !m.runAsOtel {
216-
if m.isLocal {
217-
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
218-
} else {
219-
listener, err = net.Listen("tcp", m.listenAddr)
220-
}
221-
222-
if err != nil {
223-
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
224-
}
225-
226-
if m.isLocal {
227-
defer ipc.CleanupListener(m.logger, m.listenAddr)
228-
} else {
229-
m.listenPort = listener.Addr().(*net.TCPAddr).Port
230-
}
212+
if m.isLocal {
213+
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
214+
} else {
215+
listener, err = net.Listen("tcp", m.listenAddr)
216+
}
231217

232-
certPool := x509.NewCertPool()
233-
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
234-
return errors.New("failed to append root CA")
235-
}
236-
creds := credentials.NewTLS(&tls.Config{
237-
ClientAuth: tls.RequireAndVerifyClientCert,
238-
ClientCAs: certPool,
239-
GetCertificate: m.getCertificate,
240-
MinVersion: tls.VersionTLS12,
241-
})
242-
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
243-
if m.tracer != nil {
244-
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
245-
server = grpc.NewServer(
246-
grpc.UnaryInterceptor(apmInterceptor),
247-
grpc.Creds(creds),
248-
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
249-
)
250-
} else {
251-
server = grpc.NewServer(
252-
grpc.Creds(creds),
253-
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
254-
)
255-
}
256-
proto.RegisterElasticAgentServer(server, m)
218+
if err != nil {
219+
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
220+
}
257221

258-
// start serving GRPC connections
259-
wgServer.Add(1)
260-
go func() {
261-
defer wgServer.Done()
262-
m.serverLoop(ctx, listener, server)
263-
}()
222+
if m.isLocal {
223+
defer ipc.CleanupListener(m.logger, m.listenAddr)
224+
} else {
225+
m.listenPort = listener.Addr().(*net.TCPAddr).Port
226+
}
227+
228+
certPool := x509.NewCertPool()
229+
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
230+
return errors.New("failed to append root CA")
231+
}
232+
creds := credentials.NewTLS(&tls.Config{
233+
ClientAuth: tls.RequireAndVerifyClientCert,
234+
ClientCAs: certPool,
235+
GetCertificate: m.getCertificate,
236+
MinVersion: tls.VersionTLS12,
237+
})
238+
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
239+
if m.tracer != nil {
240+
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
241+
server = grpc.NewServer(
242+
grpc.UnaryInterceptor(apmInterceptor),
243+
grpc.Creds(creds),
244+
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
245+
)
246+
} else {
247+
server = grpc.NewServer(
248+
grpc.Creds(creds),
249+
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
250+
)
264251
}
252+
proto.RegisterElasticAgentServer(server, m)
253+
254+
// start serving GRPC connections
255+
wgServer.Add(1)
256+
go func() {
257+
defer wgServer.Done()
258+
m.serverLoop(ctx, listener, server)
259+
}()
265260

266261
// Start the run loop, which continues on the main goroutine
267262
// until the context is canceled.
@@ -271,13 +266,11 @@ func (m *Manager) Run(ctx context.Context) error {
271266
m.shutdown()
272267

273268
// Close the rpc listener and wait for serverLoop to return
274-
if !m.runAsOtel {
275-
listener.Close()
276-
wgServer.Wait()
269+
listener.Close()
270+
wgServer.Wait()
277271

278-
// Cancel any remaining connections
279-
server.Stop()
280-
}
272+
// Cancel any remaining connections
273+
server.Stop()
281274
return ctx.Err()
282275
}
283276

0 commit comments

Comments
 (0)