Skip to content

Commit de6490d

Browse files
authored
Command channel metadata (#261)
* feat: integrations run by cmd channel submit event when finished containing exit code * fix centos5 build * refactor: var rename * feat: cmd_stop_mode = process-not-found * fix: notify 0 exit code * feat: exit code is -3 on unknown errors * feat: command channel cmd metadata support * fix: old import * fix: old import * test: event submission
1 parent ea1d9e2 commit de6490d

File tree

8 files changed

+118
-41
lines changed

8 files changed

+118
-41
lines changed

internal/agent/cmdchannel/runintegration/runintegration.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan
6060
return
6161
}
6262

63-
cmdChanReq := ctx2.NewCmdChannelRequest(cmdName, args.Hash(), args.IntegrationName, args.IntegrationArgs)
63+
cmdChanReq := ctx2.NewCmdChannelRequest(cmdName, args.Hash(), args.IntegrationName, args.IntegrationArgs, cmd.Metadata)
6464
def.CmdChanReq = &cmdChanReq
6565

6666
definitionQ <- def
6767

68-
ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
69-
ev["cmd_stop_hash"] = args.Hash()
68+
ev := cmdChanReq.Event("cmd-api")
69+
7070
NotifyPlatform(dmEmitter, def, ev)
7171

7272
return

internal/agent/cmdchannel/runintegration/runintegration_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
1111
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
1212
dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils"
13+
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
1314
"github.com/newrelic/infrastructure-agent/pkg/log"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
@@ -53,3 +54,44 @@ func TestHandle_queuesIntegrationToBeRun(t *testing.T) {
5354
assert.Equal(t, "nri-foo", d.Name)
5455
// Definition won't allow assert further
5556
}
57+
58+
func TestHandle_notifiesPlatform(t *testing.T) {
59+
defQueue := make(chan integration.Definition, 1)
60+
il := integration.InstancesLookup{
61+
Legacy: func(_ integration.DefinitionCommandConfig) (integration.Definition, error) {
62+
return integration.Definition{}, nil
63+
},
64+
ByName: func(_ string) (string, error) {
65+
return "/path/to/nri-foo", nil
66+
},
67+
}
68+
em := dm.NewRecordEmitter()
69+
h := NewHandler(defQueue, il, em, l)
70+
71+
cmd := commandapi.Command{
72+
Args: []byte(`{ "integration_name": "nri-foo", "integration_args": ["bar", "baz"] }`),
73+
Metadata: map[string]interface{}{
74+
"meta key": "meta value",
75+
},
76+
}
77+
78+
err := h.Handle(context.Background(), cmd, false)
79+
require.NoError(t, err)
80+
81+
gotFRs := em.Received()
82+
require.Len(t, gotFRs, 1)
83+
require.Len(t, gotFRs[0].Data.DataSets, 1)
84+
gotEvents := gotFRs[0].Data.DataSets[0].Events
85+
require.Len(t, gotEvents, 1)
86+
expectedEvent := protocol.EventData{
87+
"eventType": "InfrastructureEvent",
88+
"category": "notifications",
89+
"summary": "cmd-api",
90+
"cmd_name": "run_integration",
91+
"cmd_hash": "nri-foo#[bar baz]",
92+
"cmd_args_name": "nri-foo",
93+
"cmd_args_args": "[bar baz]",
94+
"cmd_metadata.meta key": "meta value",
95+
}
96+
assert.Equal(t, expectedEvent, gotEvents[0])
97+
}

internal/agent/cmdchannel/service/service_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,18 @@ func TestSrv_InitialFetch_EnablesRegisterAndHandlesBackoff(t *testing.T) {
136136
assert.True(t, c.RegisterEnabled)
137137
}
138138

139-
func TestSrv_InitialFetch_HandlesRunIntegration(t *testing.T) {
139+
func TestSrv_InitialFetch_HandlesRunIntegrationAndMetadata(t *testing.T) {
140140
serializedCmds := `
141141
{
142142
"return_value": [
143143
{
144144
"name": "run_integration",
145145
"arguments": {
146146
"integration_name": "nri-foo"
147+
},
148+
"metadata": {
149+
"target_pid": 123,
150+
"target_strategy": "<STRATEGY>"
147151
}
148152
}
149153
]
@@ -164,6 +168,11 @@ func TestSrv_InitialFetch_HandlesRunIntegration(t *testing.T) {
164168

165169
d := <-defQueue
166170
assert.Equal(t, "nri-foo", d.Name)
171+
require.NotNil(t, d.CmdChanReq)
172+
require.Contains(t, d.CmdChanReq.Metadata, "target_pid")
173+
require.Contains(t, d.CmdChanReq.Metadata, "target_strategy")
174+
assert.Equal(t, float64(123), d.CmdChanReq.Metadata["target_pid"])
175+
assert.Equal(t, "<STRATEGY>", d.CmdChanReq.Metadata["target_strategy"])
167176
}
168177

169178
func TestSrv_Run(t *testing.T) {

internal/agent/cmdchannel/stopintegration/stopintegration.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,12 @@ func notifyPlatform(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd co
111111
return err
112112
}
113113

114-
ccReq := ctx.NewCmdChannelRequest(cmdName, cmd.Hash, args.IntegrationName, args.IntegrationArgs)
114+
ccReq := ctx.NewCmdChannelRequest(cmdName, cmd.Hash, args.IntegrationName, args.IntegrationArgs, cmd.Metadata)
115115
def.CmdChanReq = &ccReq
116-
ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
117-
ev["cmd_stop_hash"] = args.Hash()
116+
117+
ev := ccReq.Event("cmd-api")
118118
ev["cmd_stop_mode"] = stopModeUsed
119+
119120
runintegration.NotifyPlatform(dmEmitter, def, ev)
120121

121122
return nil

pkg/backend/commandapi/client.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
backendhttp "github.com/newrelic/infrastructure-agent/pkg/backend/http"
1313
"github.com/newrelic/infrastructure-agent/pkg/entity"
14-
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
1514
)
1615

1716
type Client interface {
@@ -20,25 +19,11 @@ type Client interface {
2019
}
2120

2221
type Command struct {
23-
ID int `json:"id"`
24-
Hash string `json:"hash"`
25-
Name string `json:"name"`
26-
Args json.RawMessage `json:"arguments"`
27-
}
28-
29-
// Event creates an event from command.
30-
func (c *Command) Event(integrationName string, integrationArgs []string) protocol.EventData {
31-
return protocol.EventData{
32-
"eventType": "InfrastructureEvent",
33-
"category": "notifications",
34-
"summary": "cmd-api",
35-
"cmd_id": c.ID,
36-
"cmd_hash": c.Hash,
37-
"cmd_name": c.Name,
38-
"cmd_args": string(c.Args),
39-
"cmd_args_name": integrationName,
40-
"cmd_args_args": fmt.Sprintf("%+v", integrationArgs),
41-
}
22+
ID int `json:"id"`
23+
Hash string `json:"hash"`
24+
Metadata map[string]interface{} `json:"metadata"`
25+
Name string `json:"name"`
26+
Args json.RawMessage `json:"arguments"`
4227
}
4328

4429
type client struct {

pkg/integrations/track/ctx/ctx.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,44 @@
33

44
package ctx
55

6+
import (
7+
"fmt"
8+
9+
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
10+
)
11+
612
// CmdChannelRequest DTO storing context required to handle actions on integration run exit.
713
type CmdChannelRequest struct {
814
CmdChannelCmdName string
915
CmdChannelCmdHash string
1016
IntegrationName string
1117
IntegrationArgs []string
18+
Metadata map[string]interface{}
1219
}
1320

1421
// NewCmdChannelRequest create new CmdChannelRequest.
15-
func NewCmdChannelRequest(cmdChanCmdName, cmdChanCmdHash, integrationName string, integrationArgs []string) CmdChannelRequest {
22+
func NewCmdChannelRequest(cmdChanCmdName, cmdChanCmdHash, integrationName string, integrationArgs []string, metadata map[string]interface{}) CmdChannelRequest {
1623
return CmdChannelRequest{
1724
CmdChannelCmdName: cmdChanCmdName,
1825
CmdChannelCmdHash: cmdChanCmdHash,
1926
IntegrationName: integrationName,
2027
IntegrationArgs: integrationArgs,
28+
Metadata: metadata,
29+
}
30+
}
31+
32+
func (r *CmdChannelRequest) Event(summary string) protocol.EventData {
33+
ev := protocol.EventData{
34+
"eventType": "InfrastructureEvent",
35+
"category": "notifications",
36+
"summary": summary,
37+
"cmd_name": r.CmdChannelCmdName,
38+
"cmd_hash": r.CmdChannelCmdHash,
39+
"cmd_args_name": r.IntegrationName,
40+
"cmd_args_args": fmt.Sprintf("%+v", r.IntegrationArgs),
41+
}
42+
for k, v := range r.Metadata {
43+
ev["cmd_metadata."+k] = v
2144
}
45+
return ev
2246
}

pkg/integrations/track/tracker.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package track
55

66
import (
77
"context"
8-
"fmt"
98
"sync"
109
"time"
1110

@@ -113,16 +112,10 @@ func (t *Tracker) NotifyExit(hash string, exitCode int) {
113112
return
114113
}
115114

116-
ds := protocol.NewEventDataset(ts, protocol.EventData{
117-
"eventType": "InfrastructureEvent",
118-
"category": "notifications",
119-
"summary": "integration-exited",
120-
"cmd_name": iCtx.def.CmdChanReq.CmdChannelCmdName,
121-
"cmd_hash": iCtx.def.CmdChanReq.CmdChannelCmdHash,
122-
"cmd_args_name": iCtx.def.CmdChanReq.IntegrationName,
123-
"cmd_args_args": fmt.Sprintf("%+v", iCtx.def.CmdChanReq.IntegrationArgs),
124-
"cmd_exit_code": exitCode,
125-
})
115+
ev := iCtx.def.CmdChanReq.Event("integration-exited")
116+
ev["cmd_exit_code"] = exitCode
117+
118+
ds := protocol.NewEventDataset(ts, ev)
126119
data := protocol.NewData("tracker.notifyexit", "1", []protocol.Dataset{ds})
127120
t.eventEmitter.Send(fwrequest.NewFwRequest(iCtx.def, nil, nil, data))
128121
}

pkg/integrations/v4/dm/testutils/testutils.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,33 @@ import (
77
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
88
)
99

10+
// NoopEmitter /dev/null sink.
1011
type NoopEmitter struct{}
1112

12-
func (e *NoopEmitter) Send(_ fwrequest.FwRequest) {}
13-
1413
func NewNoopEmitter() dm.Emitter {
1514
return &NoopEmitter{}
1615
}
16+
17+
func (e *NoopEmitter) Send(_ fwrequest.FwRequest) {}
18+
19+
// RecordEmitter stores all received requests.
20+
type RecordEmitter struct {
21+
received []fwrequest.FwRequest
22+
}
23+
24+
// implementation fulfills the interface.
25+
var _ dm.Emitter = &RecordEmitter{}
26+
27+
func NewRecordEmitter() *RecordEmitter {
28+
return &RecordEmitter{
29+
received: []fwrequest.FwRequest{},
30+
}
31+
}
32+
33+
func (e *RecordEmitter) Send(r fwrequest.FwRequest) {
34+
e.received = append(e.received, r)
35+
}
36+
37+
func (e *RecordEmitter) Received() []fwrequest.FwRequest {
38+
return e.received
39+
}

0 commit comments

Comments
 (0)