Skip to content

Commit ea1d9e2

Browse files
varascarlosroman
andauthored
Event submission on run_integration exit with exit code (#259)
* feat: integrations run by cmd channel submit event when finished containing exit code * fix centos5 build * refactor: var rename * feat: cmd_stop_mode = process-not-found * fix: notify 0 exit code * feat: exit code is -3 on unknown errors * fix: old import * Update pkg/integrations/track/ctx/ctx.go Co-authored-by: Carlos <croman@newrelic.com> * Update pkg/integrations/track/tracker_test.go Co-authored-by: Carlos <croman@newrelic.com> Co-authored-by: Carlos <croman@newrelic.com>
1 parent 0f330af commit ea1d9e2

File tree

24 files changed

+337
-187
lines changed

24 files changed

+337
-187
lines changed

cmd/newrelic-infra/newrelic-infra.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
3131
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/v3legacy"
3232
"github.com/newrelic/infrastructure-agent/internal/socketapi"
33-
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
33+
"github.com/newrelic/infrastructure-agent/pkg/integrations/track"
3434
"github.com/newrelic/infrastructure-agent/pkg/plugins"
3535
"github.com/sirupsen/logrus"
3636

@@ -288,14 +288,14 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {
288288
// queues integration run requests
289289
definitionQ := make(chan integration.Definition, 100)
290290

291-
// track stoppable integrations
292-
tracker := stoppable.NewTracker()
293-
294291
emitterWithRegister := dm.NewEmitter(agt.GetContext(), dmSender, registerClient)
295292
nonRegisterEmitter := dm.NewNonRegisterEmitter(agt.GetContext(), dmSender)
296293

297294
dmEmitter := dm.NewEmitterWithFF(emitterWithRegister, nonRegisterEmitter, ffManager)
298295

296+
// track stoppable integrations
297+
tracker := track.NewTracker(dmEmitter)
298+
299299
integrationEmitter := emitter.NewIntegrationEmittor(agt, dmEmitter, ffManager)
300300
integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, tracker)
301301

internal/agent/cmdchannel/runintegration/runintegration.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@ import (
1414
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
1515
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
1616
"github.com/newrelic/infrastructure-agent/pkg/fwrequest"
17+
ctx2 "github.com/newrelic/infrastructure-agent/pkg/integrations/track/ctx"
1718
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/config"
1819
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
1920
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
2021
"github.com/newrelic/infrastructure-agent/pkg/log"
2122
"github.com/newrelic/infrastructure-agent/pkg/trace"
2223
)
2324

25+
const cmdName = "run_integration"
26+
2427
// Errors
2528
var (
2629
ErrNoIntName = errors.New("missing required \"integration_name\"")
@@ -56,7 +59,9 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan
5659
LogDecorated(logger, cmd, args).WithError(err).Warn("cannot create handler for cmd channel run_integration requests")
5760
return
5861
}
59-
def.CmdChannelHash = args.Hash()
62+
63+
cmdChanReq := ctx2.NewCmdChannelRequest(cmdName, args.Hash(), args.IntegrationName, args.IntegrationArgs)
64+
def.CmdChanReq = &cmdChanReq
6065

6166
definitionQ <- def
6267

@@ -67,7 +72,7 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan
6772
return
6873
}
6974

70-
return cmdchannel.NewCmdHandler("run_integration", handleF)
75+
return cmdchannel.NewCmdHandler(cmdName, handleF)
7176
}
7277

7378
func NotifyPlatform(dmEmitter dm.Emitter, def integration.Definition, ev protocol.EventData) {

internal/agent/cmdchannel/stopintegration/stopintegration.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,21 @@ import (
1212
"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration"
1313
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
1414
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
15-
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
15+
"github.com/newrelic/infrastructure-agent/pkg/integrations/track"
16+
"github.com/newrelic/infrastructure-agent/pkg/integrations/track/ctx"
1617
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
1718
"github.com/newrelic/infrastructure-agent/pkg/log"
1819
"github.com/newrelic/infrastructure-agent/pkg/trace"
1920
"github.com/shirou/gopsutil/process"
2021
)
2122

2223
const (
24+
cmdName = "stop_integration"
2325
terminationGracePeriod = 1 * time.Minute
2426
)
2527

2628
// NewHandler creates a cmd-channel handler for stop-integration requests.
27-
func NewHandler(tracker *stoppable.Tracker, il integration.InstancesLookup, dmEmitter dm.Emitter, l log.Entry) *cmdchannel.CmdHandler {
29+
func NewHandler(tracker *track.Tracker, il integration.InstancesLookup, dmEmitter dm.Emitter, l log.Entry) *cmdchannel.CmdHandler {
2830
handleF := func(ctx context.Context, cmd commandapi.Command, initialFetch bool) (err error) {
2931
if runtime.GOOS == "windows" {
3032
return cmdchannel.ErrOSNotSupported
@@ -58,6 +60,7 @@ func NewHandler(tracker *stoppable.Tracker, il integration.InstancesLookup, dmEm
5860
p, err := process.NewProcess(int32(<-pidC))
5961
if err != nil {
6062
runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot retrieve process")
63+
notifyPlatformWithLog(dmEmitter, il, cmd, args, "process-not-found", l)
6164
return
6265
}
6366

@@ -85,18 +88,21 @@ func NewHandler(tracker *stoppable.Tracker, il integration.InstancesLookup, dmEm
8588
stopModeUsed = "sigkill"
8689
}
8790

88-
// notify platform
89-
if err = notifyPlatform(dmEmitter, il, cmd, args, stopModeUsed); err != nil {
90-
runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot notify platform about command")
91-
}
91+
notifyPlatformWithLog(dmEmitter, il, cmd, args, stopModeUsed, l)
9292

9393
// no further error handling required
9494
err = nil
9595

9696
return
9797
}
9898

99-
return cmdchannel.NewCmdHandler("stop_integration", handleF)
99+
return cmdchannel.NewCmdHandler(cmdName, handleF)
100+
}
101+
102+
func notifyPlatformWithLog(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd commandapi.Command, args runintegration.RunIntArgs, stopModeUsed string, l log.Entry) {
103+
if err := notifyPlatform(dmEmitter, il, cmd, args, stopModeUsed); err != nil {
104+
runintegration.LogDecorated(l, cmd, args).WithError(err).Warn("cannot notify platform about command")
105+
}
100106
}
101107

102108
func notifyPlatform(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd commandapi.Command, args runintegration.RunIntArgs, stopModeUsed string) error {
@@ -105,7 +111,8 @@ func notifyPlatform(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd co
105111
return err
106112
}
107113

108-
def.CmdChannelHash = args.Hash()
114+
ccReq := ctx.NewCmdChannelRequest(cmdName, cmd.Hash, args.IntegrationName, args.IntegrationArgs)
115+
def.CmdChanReq = &ccReq
109116
ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
110117
ev["cmd_stop_hash"] = args.Hash()
111118
ev["cmd_stop_mode"] = stopModeUsed

internal/agent/cmdchannel/stopintegration/stopintegration_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration"
1414
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
1515
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
16-
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
16+
"github.com/newrelic/infrastructure-agent/pkg/integrations/track"
1717
dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils"
1818
"github.com/newrelic/infrastructure-agent/pkg/log"
1919
"github.com/shirou/gopsutil/process"
@@ -30,13 +30,14 @@ func TestHandle_returnsErrorOnMissingName(t *testing.T) {
3030
t.Skip("CC stop-intergation is not supported on Windows")
3131
}
3232

33-
h := NewHandler(stoppable.NewTracker(), integration.ErrLookup, dm.NewNoopEmitter(), l)
33+
h := NewHandler(track.NewTracker(nil), integration.ErrLookup, dm.NewNoopEmitter(), l)
3434

3535
cmdArgsMissingPID := commandapi.Command{
3636
Args: []byte(`{ "integration_args": ["foo"] }`),
3737
}
3838

3939
err := h.Handle(context.Background(), cmdArgsMissingPID, false)
40+
require.Error(t, err)
4041
assert.Equal(t, cmdchannel.NewArgsErr(runintegration.ErrNoIntName).Error(), err.Error())
4142
}
4243

@@ -46,12 +47,12 @@ func TestHandle_signalStopProcess(t *testing.T) {
4647
}
4748

4849
// Given a handler with an stoppables tracker
49-
tracker := stoppable.NewTracker()
50+
tracker := track.NewTracker(nil)
5051
h := NewHandler(tracker, integration.ErrLookup, dm.NewNoopEmitter(), l)
5152

5253
// When a process context is tracked
5354
ctx := context.Background()
54-
ctx, pidC := tracker.Track(ctx, "foo#")
55+
ctx, pidC := tracker.Track(ctx, "foo#", nil)
5556

5657
proc := exec.CommandContext(ctx, "sleep", "5")
5758

internal/gobackfill/gte_1_12.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright 2020 New Relic Corporation. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Golang code for Go versions greater or equal than 1.12.
5+
//
6+
// +build go1.12
7+
8+
package gobackfill
9+
10+
import "os/exec"
11+
12+
// ExitCode wrapping to backfill old Golang versions.
13+
// Negative values mean:
14+
// * -1: not exited
15+
// * -2: unknown
16+
func ExitCode(exitError *exec.ExitError) int {
17+
return exitError.ExitCode()
18+
}

internal/gobackfill/lt_1_12.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright 2020 New Relic Corporation. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Golang code for Go versions lower than 1.12.
5+
//
6+
// +build !go1.12
7+
8+
package gobackfill
9+
10+
import "os/exec"
11+
12+
// ExitCode not supported, always value -2, as -1 belong to not finished.
13+
func ExitCode(exitError *exec.ExitError) int {
14+
return -2
15+
}

internal/integrations/v4/executor/executor.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@ import (
1313
"os/exec"
1414
"sync"
1515

16+
"github.com/newrelic/infrastructure-agent/internal/gobackfill"
1617
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/constants"
1718
"github.com/newrelic/infrastructure-agent/pkg/helpers"
1819
"github.com/newrelic/infrastructure-agent/pkg/log"
1920
)
2021

22+
const unknownErrExitCode = -3
23+
2124
var illog = log.WithComponent("integrations.Executor")
2225

2326
// Executor handles Executable commands asynchronously.
@@ -38,7 +41,7 @@ func FromCmdSlice(cmd []string, cfg *Config) Executor {
3841
// The executed process can be cancelled via the provided Context.
3942
// When writable PID channel is provided, generated PID will be written, so process could be signaled by 3rd parties.
4043
// When the process ends, all the channels are closed.
41-
func (r *Executor) Execute(ctx context.Context, pidChan chan<- int) OutputReceive {
44+
func (r *Executor) Execute(ctx context.Context, pidChan, exitCodeCh chan<- int) OutputReceive {
4245
out, receiver := NewOutput()
4346
commandCtx, cancelCommand := context.WithCancel(ctx)
4447

@@ -99,7 +102,17 @@ func (r *Executor) Execute(ctx context.Context, pidChan chan<- int) OutputReceiv
99102
<-commandCtx.Done()
100103
if err := cmd.Wait(); err != nil {
101104
out.Errors <- err
105+
if exitCodeCh != nil {
106+
if exitError, ok := err.(*exec.ExitError); ok {
107+
exitCodeCh <- gobackfill.ExitCode(exitError)
108+
} else {
109+
exitCodeCh <- unknownErrExitCode
110+
}
111+
}
112+
} else if exitCodeCh != nil {
113+
exitCodeCh <- 0
102114
}
115+
103116
allOutputForwarded.Wait() // waiting again to avoid closing output before the data is received during cancellation
104117
out.Close()
105118
}()

internal/integrations/v4/executor/executor_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestRunnable_CLI_Execute(t *testing.T) {
3131
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd), execConfig(t))
3232

3333
// WHEN it is executed
34-
to := r.Execute(context.Background(), nil)
34+
to := r.Execute(context.Background(), nil, nil)
3535

3636
// THEN no errors are returned
3737
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
@@ -51,7 +51,7 @@ func TestRunnable_CLI_Execute_with_spaces(t *testing.T) {
5151
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmdWithSpace), execConfig(t))
5252

5353
// WHEN it is executed
54-
to := r.Execute(context.Background(), nil)
54+
to := r.Execute(context.Background(), nil, nil)
5555

5656
// THEN no errors are returned
5757
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
@@ -76,7 +76,7 @@ func TestRunnable_Execute_WithUser(t *testing.T) {
7676
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd), cfg)
7777

7878
// WHEN it is executed
79-
to := r.Execute(context.Background(), nil)
79+
to := r.Execute(context.Background(), nil, nil)
8080

8181
// THEN no errors are returned
8282
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
@@ -95,7 +95,7 @@ func TestRunnable_Execute_WithArgs(t *testing.T) {
9595
cfg := execConfig(t)
9696
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd, "world"), cfg)
9797

98-
to := r.Execute(context.Background(), nil)
98+
to := r.Execute(context.Background(), nil, nil)
9999
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
100100
assert.Equal(t, "stdout line", testhelp.ChannelRead(to.Stdout))
101101
assert.Equal(t, "-world", testhelp.ChannelRead(to.Stdout))
@@ -113,7 +113,7 @@ func TestRunnable_Execute_WithArgs_WithEnv(t *testing.T) {
113113
cfg.Environment = map[string]string{"PREFIX": "hello"}
114114
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd, "world"), cfg)
115115

116-
to := r.Execute(context.Background(), nil)
116+
to := r.Execute(context.Background(), nil, nil)
117117
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
118118
assert.Equal(t, "stdout line", testhelp.ChannelRead(to.Stdout))
119119
assert.Equal(t, "hello-world", testhelp.ChannelRead(to.Stdout))
@@ -127,7 +127,7 @@ func TestRunnable_Execute_Error(t *testing.T) {
127127
r := FromCmdSlice(testhelp.Command(fixtures.ErrorCmd), execConfig(t))
128128

129129
// WHEN it is executed
130-
to := r.Execute(context.Background(), nil)
130+
to := r.Execute(context.Background(), nil, nil)
131131

132132
// THEN an error is returned
133133
assert.Error(t, testhelp.ChannelErrClosed(to.Errors))
@@ -149,7 +149,7 @@ func TestRunnable_Execute_Blocked(t *testing.T) {
149149
r := FromCmdSlice(testhelp.Command(fixtures.BlockedCmd), cfg)
150150

151151
// THAT is normally working
152-
to := r.Execute(ctx, nil)
152+
to := r.Execute(ctx, nil, nil)
153153
assert.Equal(t, "starting", testhelp.ChannelRead(to.Stdout))
154154
assert.Error(t, testhelp.ChannelErrClosedTimeout(to.Errors, 100*time.Millisecond))
155155

@@ -176,7 +176,7 @@ func TestNoRaces(t *testing.T) {
176176
for i := 0; i < 100; i++ {
177177
ctx, cancel := context.WithCancel(context.Background())
178178
cmd := FromCmdSlice([]string{"echo", hugeLine}, &Config{})
179-
go cmd.Execute(ctx, nil)
179+
go cmd.Execute(ctx, nil, nil)
180180
cancel()
181181
}
182182
}
@@ -191,7 +191,7 @@ func TestRunnable_Execute_Verbose(t *testing.T) {
191191
ctx := context.WithValue(context.Background(), constants.EnableVerbose, 1)
192192

193193
// WHEN it is executed
194-
to := r.Execute(ctx, nil)
194+
to := r.Execute(ctx, nil, nil)
195195

196196
// THEN no errors are returned
197197
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
@@ -213,7 +213,7 @@ func TestRunnable_Execute_VerboseFalse(t *testing.T) {
213213
ctx := context.WithValue(context.Background(), constants.EnableVerbose, 0)
214214

215215
// WHEN it is executed
216-
to := r.Execute(ctx, nil)
216+
to := r.Execute(ctx, nil, nil)
217217

218218
// THEN no errors are returned
219219
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
@@ -232,7 +232,7 @@ func TestRunnable_Execute_NoVerboseSet(t *testing.T) {
232232
r := FromCmdSlice(testhelp.Command(fixtures.IntegrationVerboseScript), execConfig(t))
233233

234234
// WHEN it is executed
235-
to := r.Execute(context.Background(), nil)
235+
to := r.Execute(context.Background(), nil, nil)
236236

237237
// THEN no errors are returned
238238
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))

0 commit comments

Comments
 (0)