Skip to content

Commit ae498fc

Browse files
committed
Update how NGINX & NAP processes are discovered
1 parent 5cc2544 commit ae498fc

File tree

9 files changed

+471
-266
lines changed

9 files changed

+471
-266
lines changed

api/grpc/mpi/v1/command.pb.go

Lines changed: 313 additions & 204 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/grpc/mpi/v1/command.pb.validate.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/watcher/instance/instance_watcher_service.go

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ import (
1313
"sync"
1414
"time"
1515

16+
"github.com/nginx/agent/v3/pkg/nginxprocess"
17+
1618
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
19+
"github.com/nginx/agent/v3/internal/watcher/process"
20+
1721
"github.com/nginx/agent/v3/internal/config"
1822
"github.com/nginx/agent/v3/internal/datasource/host/exec"
1923
"github.com/nginx/agent/v3/internal/logger"
2024
"github.com/nginx/agent/v3/internal/model"
21-
"github.com/nginx/agent/v3/internal/watcher/process"
22-
"github.com/nginx/agent/v3/pkg/nginxprocess"
2325
)
2426

2527
const defaultAgentPath = "/run/nginx-agent"
@@ -40,16 +42,17 @@ type (
4042
}
4143

4244
InstanceWatcherService struct {
43-
processOperator process.ProcessOperatorInterface
44-
nginxConfigParser nginxConfigParser
45-
executer exec.ExecInterface
46-
agentConfig *config.Config
47-
instanceCache map[string]*mpi.Instance
48-
nginxConfigCache map[string]*model.NginxConfigContext
49-
instancesChannel chan<- InstanceUpdatesMessage
50-
nginxConfigContextChannel chan<- NginxConfigContextMessage
51-
processParsers []processParser
52-
cacheMutex sync.Mutex
45+
processOperator process.ProcessOperatorInterface
46+
nginxConfigParser nginxConfigParser
47+
executer exec.ExecInterface
48+
agentConfig *config.Config
49+
instanceCache map[string]*mpi.Instance
50+
nginxConfigCache map[string]*model.NginxConfigContext
51+
instancesChannel chan<- InstanceUpdatesMessage
52+
nginxConfigContextChannel chan<- NginxConfigContextMessage
53+
nginxParser processParser
54+
nginxAppProtectProcessParser processParser
55+
cacheMutex sync.Mutex
5356
}
5457

5558
InstanceUpdates struct {
@@ -71,17 +74,15 @@ type (
7174

7275
func NewInstanceWatcherService(agentConfig *config.Config) *InstanceWatcherService {
7376
return &InstanceWatcherService{
74-
agentConfig: agentConfig,
75-
processOperator: process.NewProcessOperator(),
76-
processParsers: []processParser{
77-
NewNginxProcessParser(),
78-
NewNginxAppProtectProcessParser(),
79-
},
80-
nginxConfigParser: NewNginxConfigParser(agentConfig),
81-
instanceCache: make(map[string]*mpi.Instance),
82-
cacheMutex: sync.Mutex{},
83-
nginxConfigCache: make(map[string]*model.NginxConfigContext),
84-
executer: &exec.Exec{},
77+
agentConfig: agentConfig,
78+
processOperator: process.NewProcessOperator(),
79+
nginxParser: NewNginxProcessParser(),
80+
nginxAppProtectProcessParser: NewNginxAppProtectProcessParser(),
81+
nginxConfigParser: NewNginxConfigParser(agentConfig),
82+
instanceCache: make(map[string]*mpi.Instance),
83+
cacheMutex: sync.Mutex{},
84+
nginxConfigCache: make(map[string]*model.NginxConfigContext),
85+
executer: &exec.Exec{},
8586
}
8687
}
8788

@@ -246,7 +247,7 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) (
246247
instanceUpdates InstanceUpdates,
247248
err error,
248249
) {
249-
processes, err := iw.processOperator.Processes(ctx)
250+
nginxProcesses, nginxAppProtectProcesses, err := iw.processOperator.Processes(ctx)
250251
if err != nil {
251252
return instanceUpdates, err
252253
}
@@ -256,11 +257,14 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) (
256257
agentInstance := iw.agentInstance(ctx)
257258
instancesFound[agentInstance.GetInstanceMeta().GetInstanceId()] = agentInstance
258259

259-
for _, parser := range iw.processParsers {
260-
instances := parser.Parse(ctx, processes)
261-
for _, instance := range instances {
262-
instancesFound[instance.GetInstanceMeta().GetInstanceId()] = instance
263-
}
260+
nginxInstances := iw.nginxParser.Parse(ctx, nginxProcesses)
261+
for _, instance := range nginxInstances {
262+
instancesFound[instance.GetInstanceMeta().GetInstanceId()] = instance
263+
}
264+
265+
nginxAppProtectInstances := iw.nginxAppProtectProcessParser.Parse(ctx, nginxAppProtectProcesses)
266+
for _, instance := range nginxAppProtectInstances {
267+
instancesFound[instance.GetInstanceMeta().GetInstanceId()] = instance
264268
}
265269
newInstances, updatedInstances, deletedInstances := compareInstances(iw.instanceCache, instancesFound)
266270

internal/watcher/instance/instance_watcher_service_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestInstanceWatcherService_checkForUpdates(t *testing.T) {
2828
nginxConfigContext := testModel.GetConfigContext()
2929

3030
fakeProcessWatcher := &processfakes.FakeProcessOperatorInterface{}
31-
fakeProcessWatcher.ProcessesReturns(nil, nil)
31+
fakeProcessWatcher.ProcessesReturns(nil, nil, nil)
3232

3333
fakeProcessParser := &instancefakes.FakeProcessParser{}
3434
fakeProcessParser.ParseReturns(map[string]*mpi.Instance{
@@ -43,7 +43,8 @@ func TestInstanceWatcherService_checkForUpdates(t *testing.T) {
4343

4444
instanceWatcherService := NewInstanceWatcherService(types.AgentConfig())
4545
instanceWatcherService.processOperator = fakeProcessWatcher
46-
instanceWatcherService.processParsers = []processParser{fakeProcessParser}
46+
instanceWatcherService.nginxParser = fakeProcessParser
47+
instanceWatcherService.nginxAppProtectProcessParser = fakeProcessParser
4748
instanceWatcherService.nginxConfigParser = fakeNginxConfigParser
4849
instanceWatcherService.instancesChannel = instanceUpdatesChannel
4950
instanceWatcherService.nginxConfigContextChannel = nginxConfigContextChannel
@@ -131,7 +132,7 @@ func TestInstanceWatcherService_instanceUpdates(t *testing.T) {
131132
for _, test := range tests {
132133
t.Run(test.name, func(tt *testing.T) {
133134
fakeProcessWatcher := &processfakes.FakeProcessOperatorInterface{}
134-
fakeProcessWatcher.ProcessesReturns(nil, nil)
135+
fakeProcessWatcher.ProcessesReturns(nil, nil, nil)
135136

136137
fakeProcessParser := &instancefakes.FakeProcessParser{}
137138
fakeProcessParser.ParseReturns(test.parsedInstances)
@@ -142,7 +143,8 @@ func TestInstanceWatcherService_instanceUpdates(t *testing.T) {
142143

143144
instanceWatcherService := NewInstanceWatcherService(types.AgentConfig())
144145
instanceWatcherService.processOperator = fakeProcessWatcher
145-
instanceWatcherService.processParsers = []processParser{fakeProcessParser}
146+
instanceWatcherService.nginxParser = fakeProcessParser
147+
instanceWatcherService.nginxAppProtectProcessParser = fakeProcessParser
146148
instanceWatcherService.instanceCache = test.oldInstances
147149
instanceWatcherService.executer = fakeExec
148150

internal/watcher/instance/nginx_app_protect_process_parser.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ import (
1111
"os"
1212
"strings"
1313

14+
"github.com/nginx/agent/v3/pkg/nginxprocess"
15+
1416
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
15-
"github.com/nginx/agent/v3/internal/model"
16-
"github.com/nginx/agent/v3/pkg/uuid"
17+
"github.com/nginx/agent/v3/pkg/id"
1718
)
1819

1920
const (
@@ -44,7 +45,10 @@ func NewNginxAppProtectProcessParser() *NginxAppProtectProcessParser {
4445
}
4546
}
4647

47-
func (n NginxAppProtectProcessParser) Parse(ctx context.Context, processes []*model.Process) map[string]*mpi.Instance {
48+
func (n NginxAppProtectProcessParser) Parse(
49+
ctx context.Context,
50+
processes []*nginxprocess.Process,
51+
) map[string]*mpi.Instance {
4852
instanceMap := make(map[string]*mpi.Instance) // key is instanceID
4953

5054
for _, process := range processes {
@@ -82,8 +86,8 @@ func (n NginxAppProtectProcessParser) Parse(ctx context.Context, processes []*mo
8286
return instanceMap
8387
}
8488

85-
func (n NginxAppProtectProcessParser) instanceID(process *model.Process) string {
86-
return uuid.Generate("%s", process.Exe)
89+
func (n NginxAppProtectProcessParser) instanceID(process *nginxprocess.Process) string {
90+
return id.Generate("%s", process.Exe)
8791
}
8892

8993
func (n NginxAppProtectProcessParser) instanceVersion(ctx context.Context) string {

internal/watcher/instance/nginx_app_protect_process_parser_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import (
1010
"os"
1111
"testing"
1212

13+
"github.com/nginx/agent/v3/pkg/nginxprocess"
14+
1315
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
14-
"github.com/nginx/agent/v3/internal/model"
1516
"github.com/nginx/agent/v3/test/helpers"
1617
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/require"
@@ -42,7 +43,7 @@ func TestNginxAppProtectProcessParser_Parse(t *testing.T) {
4243
},
4344
}
4445

45-
processes := []*model.Process{
46+
processes := []*nginxprocess.Process{
4647
{
4748
PID: 789,
4849
PPID: 1234,

internal/watcher/process/process_operator.go

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ package process
77

88
import (
99
"context"
10+
"strings"
11+
12+
"github.com/shirou/gopsutil/v4/process"
1013

1114
"github.com/nginx/agent/v3/pkg/nginxprocess"
1215
)
@@ -18,21 +21,86 @@ type (
1821
ProcessOperator struct{}
1922

2023
ProcessOperatorInterface interface {
21-
Processes(ctx context.Context) ([]*nginxprocess.Process, error)
24+
Processes(ctx context.Context) (
25+
nginxProcesses []*nginxprocess.Process,
26+
nginxAppProtectProcesses []*nginxprocess.Process,
27+
err error,
28+
)
2229
Process(ctx context.Context, pid int32) (*nginxprocess.Process, error)
2330
}
2431
)
2532

33+
func nginxFilter(ctx context.Context, p *process.Process) bool {
34+
name, _ := p.NameWithContext(ctx) // slow: shells out to ps
35+
if name != "nginx" {
36+
return false
37+
}
38+
39+
cmdLine, _ := p.CmdlineWithContext(ctx) // slow: shells out to ps
40+
// ignore nginx processes in the middle of an upgrade
41+
if !strings.HasPrefix(cmdLine, "nginx:") || strings.Contains(cmdLine, "upgrade") {
42+
return false
43+
}
44+
45+
return true
46+
}
47+
48+
func napFilter(ctx context.Context, p *process.Process) bool {
49+
name, _ := p.NameWithContext(ctx) // slow: shells out to ps
50+
return name != "bd-socket-plugin"
51+
}
52+
2653
var _ ProcessOperatorInterface = (*ProcessOperator)(nil)
2754

2855
func NewProcessOperator() *ProcessOperator {
2956
return &ProcessOperator{}
3057
}
3158

32-
func (pw *ProcessOperator) Processes(ctx context.Context) ([]*nginxprocess.Process, error) {
33-
return nginxprocess.List(ctx)
59+
func (pw *ProcessOperator) Processes(ctx context.Context) (
60+
nginxProcesses []*nginxprocess.Process,
61+
nginxAppProtectProcesses []*nginxprocess.Process,
62+
err error,
63+
) {
64+
processes, err := process.ProcessesWithContext(ctx)
65+
if err != nil {
66+
return nil, nil, err
67+
}
68+
69+
var filteredNginxProcesses []*process.Process
70+
71+
for _, p := range processes {
72+
if nginxFilter(ctx, p) {
73+
filteredNginxProcesses = append(filteredNginxProcesses, p)
74+
} else if napFilter(ctx, p) {
75+
nginxAppProtectProcesses = append(nginxAppProtectProcesses, convertProcess(ctx, p))
76+
}
77+
}
78+
79+
nginxProcesses, err = nginxprocess.ListWithProcesses(ctx, filteredNginxProcesses)
80+
if err != nil {
81+
return nil, nil, err
82+
}
83+
84+
return nginxProcesses, nginxAppProtectProcesses, nil
3485
}
3586

3687
func (pw *ProcessOperator) Process(ctx context.Context, pid int32) (*nginxprocess.Process, error) {
3788
return nginxprocess.Find(ctx, pid, nginxprocess.WithStatus(true))
3889
}
90+
91+
func convertProcess(ctx context.Context, proc *process.Process) *nginxprocess.Process {
92+
ppid, _ := proc.PpidWithContext(ctx)
93+
name, _ := proc.NameWithContext(ctx)
94+
cmd, _ := proc.CmdlineWithContext(ctx)
95+
exe, _ := proc.ExeWithContext(ctx)
96+
status, _ := proc.StatusWithContext(ctx)
97+
98+
return &nginxprocess.Process{
99+
PID: proc.Pid,
100+
PPID: ppid,
101+
Name: name,
102+
Cmd: cmd,
103+
Exe: exe,
104+
Status: strings.Join(status, " "),
105+
}
106+
}

internal/watcher/process/processfakes/fake_process_operator_interface.go

Lines changed: 19 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)