Skip to content

Commit c42a042

Browse files
committed
Merge branch 'main' into add-auxiliary-command-server-proto
2 parents 5b31494 + 9f24720 commit c42a042

File tree

59 files changed

+634
-561
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+634
-561
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ PROTO_DIR := proto
4343
BINARY_NAME := nginx-agent
4444
PROJECT_DIR = cmd/agent
4545
PROJECT_FILE = main.go
46+
COLLECTOR_PATH ?= /etc/nginx-agent/opentelemetry-collector-agent.yaml
47+
MANIFEST_DIR ?= /var/lib/nginx-agent
4648
DIRS = $(BUILD_DIR) $(TEST_BUILD_DIR) $(BUILD_DIR)/$(DOCS_DIR) $(BUILD_DIR)/$(DOCS_DIR)/$(PROTO_DIR)
4749
$(shell mkdir -p $(DIRS))
4850

@@ -181,7 +183,7 @@ run: build ## Run code
181183

182184
dev: ## Run agent executable
183185
@echo "🚀 Running App"
184-
$(GORUN) -ldflags=$(DEBUG_LDFLAGS) $(PROJECT_DIR)/$(PROJECT_FILE)
186+
NGINX_AGENT_COLLECTOR_CONFIG_PATH=$(COLLECTOR_PATH) NGINX_AGENT_MANIFEST_DIR=$(MANIFEST_DIR) $(GORUN) -ldflags=$(DEBUG_LDFLAGS) $(PROJECT_DIR)/$(PROJECT_FILE)
185187

186188
race-condition-dev: ## Run agent executable with race condition detection
187189
@echo "🏎️ Running app with race condition detection enabled"

internal/bus/busfakes/fake_message_pipe.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ func (p *FakeMessagePipe) DeRegister(ctx context.Context, pluginNames []string)
3939
plugins = p.findPlugins(pluginNames, plugins)
4040

4141
for _, plugin := range plugins {
42-
index := p.GetIndex(plugin.Info().Name, p.plugins)
42+
index := p.Index(plugin.Info().Name, p.plugins)
4343
p.unsubscribePlugin(ctx, index, plugin)
4444
}
4545

4646
return nil
4747
}
4848

49-
func (p *FakeMessagePipe) GetIndex(pluginName string, plugins []bus.Plugin) int {
49+
func (p *FakeMessagePipe) Index(pluginName string, plugins []bus.Plugin) int {
5050
for index, plugin := range plugins {
5151
if pluginName == plugin.Info().Name {
5252
return index
@@ -85,14 +85,14 @@ func (p *FakeMessagePipe) Process(_ context.Context, msgs ...*bus.Message) {
8585
p.messages = append(p.messages, msgs...)
8686
}
8787

88-
func (p *FakeMessagePipe) GetMessages() []*bus.Message {
88+
func (p *FakeMessagePipe) Messages() []*bus.Message {
8989
p.messagesLock.Lock()
9090
defer p.messagesLock.Unlock()
9191

9292
return p.messages
9393
}
9494

95-
func (p *FakeMessagePipe) GetProcessedMessages() []*bus.Message {
95+
func (p *FakeMessagePipe) ProcessedMessages() []*bus.Message {
9696
return p.processedMessages
9797
}
9898

@@ -127,14 +127,14 @@ func (p *FakeMessagePipe) RunWithoutInit(ctx context.Context) {
127127
}
128128
}
129129

130-
func (p *FakeMessagePipe) GetPlugins() []bus.Plugin {
130+
func (p *FakeMessagePipe) Plugins() []bus.Plugin {
131131
return p.plugins
132132
}
133133

134134
func (p *FakeMessagePipe) IsPluginRegistered(pluginName string) bool {
135135
pluginAlreadyRegistered := false
136136

137-
for _, plugin := range p.GetPlugins() {
137+
for _, plugin := range p.Plugins() {
138138
if plugin.Info().Name == pluginName {
139139
pluginAlreadyRegistered = true
140140
}

internal/bus/message_pipe.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type (
3535
DeRegister(ctx context.Context, plugins []string) error
3636
Process(ctx context.Context, messages ...*Message)
3737
Run(ctx context.Context)
38-
GetPlugins() []Plugin
38+
Plugins() []Plugin
3939
IsPluginRegistered(pluginName string) bool
4040
}
4141

@@ -93,7 +93,7 @@ func (p *MessagePipe) DeRegister(ctx context.Context, pluginNames []string) erro
9393
plugins := p.findPlugins(pluginNames)
9494

9595
for _, plugin := range plugins {
96-
index := p.GetIndex(plugin.Info().Name, p.plugins)
96+
index := p.Index(plugin.Info().Name, p.plugins)
9797

9898
err := p.unsubscribePlugin(ctx, index, plugin)
9999
if err != nil {
@@ -131,14 +131,14 @@ func (p *MessagePipe) Run(ctx context.Context) {
131131
}
132132
}
133133

134-
func (p *MessagePipe) GetPlugins() []Plugin {
134+
func (p *MessagePipe) Plugins() []Plugin {
135135
return p.plugins
136136
}
137137

138138
func (p *MessagePipe) IsPluginRegistered(pluginName string) bool {
139139
isPluginRegistered := false
140140

141-
for _, plugin := range p.GetPlugins() {
141+
for _, plugin := range p.Plugins() {
142142
if plugin.Info().Name == pluginName {
143143
isPluginRegistered = true
144144
}
@@ -181,7 +181,7 @@ func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin {
181181
return plugins
182182
}
183183

184-
func (p *MessagePipe) GetIndex(pluginName string, plugins []Plugin) int {
184+
func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
185185
for index, plugin := range plugins {
186186
if pluginName == plugin.Info().Name {
187187
return index

internal/bus/message_pipe_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,12 @@ func TestMessagePipe_DeRegister(t *testing.T) {
8888
err := messagePipe.Register(100, []Plugin{plugin})
8989

9090
require.NoError(t, err)
91-
assert.Len(t, messagePipe.GetPlugins(), 1)
91+
assert.Len(t, messagePipe.Plugins(), 1)
9292

9393
err = messagePipe.DeRegister(ctx, []string{plugin.Info().Name})
9494

9595
require.NoError(t, err)
96-
assert.Empty(t, messagePipe.GetPlugins())
96+
assert.Empty(t, messagePipe.Plugins())
9797
plugin.AssertExpectations(t)
9898
}
9999

internal/collector/containermetricsreceiver/internal/scraper/cpuscraper/internal/cgroup/cpu.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (cs *CPUSource) Collect() (ContainerCPUStats, error) {
7474

7575
// nolint: mnd
7676
func (cs *CPUSource) collectCPUStats() (ContainerCPUStats, error) {
77-
clockTicks, err := getClockTicks()
77+
clockTicks, err := clockTicks()
7878
if err != nil {
7979
return ContainerCPUStats{}, err
8080
}
@@ -107,7 +107,7 @@ func (cs *CPUSource) collectCPUStats() (ContainerCPUStats, error) {
107107

108108
cpuTimes.userUsage = convertUsage(cpuTimes.userUsage)
109109
cpuTimes.systemUsage = convertUsage(cpuTimes.systemUsage)
110-
hostSystemUsage, err := getSystemCPUUsage(clockTicks)
110+
hostSystemUsage, err := systemCPUUsage(clockTicks)
111111
if err != nil {
112112
return ContainerCPUStats{}, err
113113
}
@@ -162,7 +162,7 @@ func (cs *CPUSource) cpuUsageTimes(filePath, userKey, systemKey string) (*Contai
162162
}
163163

164164
// nolint: revive, gocritic
165-
func getSystemCPUUsage(clockTicks int) (float64, error) {
165+
func systemCPUUsage(clockTicks int) (float64, error) {
166166
lines, err := internal.ReadLines(CPUStatsPath)
167167
if err != nil {
168168
return 0, err
@@ -191,7 +191,7 @@ func getSystemCPUUsage(clockTicks int) (float64, error) {
191191
return 0, errors.New("unable to process " + CPUStatsPath + ". No cpu found")
192192
}
193193

194-
func getClockTicks() (int, error) {
194+
func clockTicks() (int, error) {
195195
cmd := exec.Command("getconf", "CLK_TCK")
196196
out := new(bytes.Buffer)
197197
cmd.Stdout = out

internal/collector/containermetricsreceiver/internal/scraper/memoryscraper/internal/cgroup/memory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (ms *MemorySource) VirtualMemoryStatWithContext(ctx context.Context) (*mem.
9494
return &mem.VirtualMemoryStat{}, err
9595
}
9696

97-
memoryStat, err = GetMemoryStat(
97+
memoryStat, err = CalculateMemoryStat(
9898
path.Join(ms.basePath, memStatFile),
9999
memCachedKey,
100100
memSharedKey,
@@ -147,7 +147,7 @@ func MemoryLimitInBytes(ctx context.Context, filePath string) (uint64, error) {
147147
}
148148

149149
// nolint: revive, mnd
150-
func GetMemoryStat(statFile, cachedKey, sharedKey string) (MemoryStat, error) {
150+
func CalculateMemoryStat(statFile, cachedKey, sharedKey string) (MemoryStat, error) {
151151
memoryStat := MemoryStat{}
152152
lines, err := internal.ReadLines(statFile)
153153
if err != nil {

internal/collector/otel_collector_plugin.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func New(conf *config.Config) (*Collector, error) {
9393
}, nil
9494
}
9595

96-
func (oc *Collector) GetState() otelcol.State {
96+
func (oc *Collector) State() otelcol.State {
9797
oc.mu.Lock()
9898
defer oc.mu.Unlock()
9999

@@ -263,6 +263,7 @@ func (oc *Collector) Subscriptions() []string {
263263
}
264264

265265
func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) {
266+
slog.DebugContext(ctx, "OTel collector plugin received nginx config update message")
266267
oc.mu.Lock()
267268
defer oc.mu.Unlock()
268269

@@ -287,6 +288,7 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
287288
}
288289

289290
func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) {
291+
slog.DebugContext(ctx, "OTel collector plugin received resource update message")
290292
oc.mu.Lock()
291293
defer oc.mu.Unlock()
292294

@@ -407,6 +409,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex
407409
nginxReceiverFound, reloadCollector := oc.updateExistingNginxPlusReceiver(nginxConfigContext)
408410

409411
if !nginxReceiverFound && nginxConfigContext.PlusAPI.URL != "" {
412+
slog.DebugContext(ctx, "Adding new NGINX Plus receiver", "url", nginxConfigContext.PlusAPI.URL)
410413
oc.config.Collector.Receivers.NginxPlusReceivers = append(
411414
oc.config.Collector.Receivers.NginxPlusReceivers,
412415
config.NginxPlusReceiver{
@@ -443,6 +446,7 @@ func (oc *Collector) addNginxOssReceiver(ctx context.Context, nginxConfigContext
443446
nginxReceiverFound, reloadCollector := oc.updateExistingNginxOSSReceiver(nginxConfigContext)
444447

445448
if !nginxReceiverFound && nginxConfigContext.StubStatus.URL != "" {
449+
slog.DebugContext(ctx, "Adding new NGINX OSS receiver", "url", nginxConfigContext.StubStatus.URL)
446450
oc.config.Collector.Receivers.NginxReceivers = append(
447451
oc.config.Collector.Receivers.NginxReceivers,
448452
config.NginxReceiver{
@@ -479,6 +483,8 @@ func (oc *Collector) updateExistingNginxPlusReceiver(
479483
oc.config.Collector.Receivers.NginxPlusReceivers[index+1:]...,
480484
)
481485
if nginxConfigContext.PlusAPI.URL != "" {
486+
slog.Debug("Updating existing NGINX Plus receiver", "url",
487+
nginxConfigContext.PlusAPI.URL)
482488
nginxPlusReceiver.PlusAPI.URL = nginxConfigContext.PlusAPI.URL
483489
oc.config.Collector.Receivers.NginxPlusReceivers = append(
484490
oc.config.Collector.Receivers.NginxPlusReceivers,
@@ -510,6 +516,8 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
510516
oc.config.Collector.Receivers.NginxReceivers[index+1:]...,
511517
)
512518
if nginxConfigContext.StubStatus.URL != "" {
519+
slog.Debug("Updating existing NGINX OSS receiver", "url",
520+
nginxConfigContext.StubStatus.URL)
513521
nginxReceiver.StubStatus = config.APIDetails{
514522
URL: nginxConfigContext.StubStatus.URL,
515523
Listen: nginxConfigContext.StubStatus.Listen,
@@ -587,7 +595,7 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig
587595
}
588596

589597
func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool {
590-
listenAddressesToBeDeleted := oc.getConfigDeletedNapReceivers(nginxConfigContext)
598+
listenAddressesToBeDeleted := oc.configDeletedNapReceivers(nginxConfigContext)
591599
if len(listenAddressesToBeDeleted) != 0 {
592600
oc.deleteNapReceivers(listenAddressesToBeDeleted)
593601
return true
@@ -606,7 +614,7 @@ func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bo
606614
oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers
607615
}
608616

609-
func (oc *Collector) getConfigDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
617+
func (oc *Collector) configDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
610618
elements := make(map[string]bool)
611619

612620
for _, tcplogReceiver := range oc.config.Collector.Receivers.TcplogReceivers {

internal/collector/otel_collector_plugin_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,11 @@ func TestCollector_InitAndClose(t *testing.T) {
146146

147147
collector.service = createFakeCollector()
148148

149-
assert.Equal(t, otelcol.StateRunning, collector.GetState())
149+
assert.Equal(t, otelcol.StateRunning, collector.State())
150150

151151
require.NoError(t, collector.Close(ctx), "Close should not return an error")
152152

153-
assert.Equal(t, otelcol.StateClosed, collector.GetState())
153+
assert.Equal(t, otelcol.StateClosed, collector.State())
154154
}
155155

156156
// nolint: revive
@@ -347,7 +347,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
347347
name: "Test 1: Resource update adds resource id attribute",
348348
message: &bus.Message{
349349
Topic: bus.ResourceUpdateTopic,
350-
Data: protos.GetHostResource(),
350+
Data: protos.HostResource(),
351351
},
352352
processors: config.Processors{
353353
Resource: &config.Resource{

internal/command/command_plugin.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,12 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
9898
case bus.DataPlaneResponseTopic:
9999
cp.processDataPlaneResponse(ctx, msg)
100100
default:
101-
slog.DebugContext(ctx, "Command plugin unknown topic", "topic", msg.Topic)
101+
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
102102
}
103103
}
104104

105105
func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) {
106+
slog.DebugContext(ctx, "Command plugin received resource update message")
106107
if resource, ok := msg.Data.(*mpi.Resource); ok {
107108
if !cp.commandService.IsConnected() {
108109
cp.createConnection(ctx, resource)
@@ -138,9 +139,10 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res
138139
}
139140

140141
func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Message) {
142+
slog.DebugContext(ctx, "Command plugin received data plane health message")
141143
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
142144
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
143-
correlationID := logger.GetCorrelationID(ctx)
145+
correlationID := logger.CorrelationID(ctx)
144146
if err != nil {
145147
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
146148
cp.messagePipe.Process(ctx, &bus.Message{
@@ -152,12 +154,13 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me
152154
cp.messagePipe.Process(ctx, &bus.Message{
153155
Topic: bus.DataPlaneResponseTopic,
154156
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
155-
"Successfully sent the health status update", ""),
157+
"Successfully sent health status update", ""),
156158
})
157159
}
158160
}
159161

160162
func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Message) {
163+
slog.DebugContext(ctx, "Command plugin received instance health message")
161164
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
162165
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
163166
if err != nil {
@@ -167,7 +170,10 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
167170
}
168171

169172
func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.Message) {
173+
slog.DebugContext(ctx, "Command plugin received data plane response message")
170174
if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok {
175+
slog.InfoContext(ctx, "Sending data plane response message", "message",
176+
response.GetCommandResponse().GetMessage(), "status", response.GetCommandResponse().GetStatus())
171177
err := cp.commandService.SendDataPlaneResponse(ctx, response)
172178
if err != nil {
173179
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
@@ -176,7 +182,7 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
176182
}
177183

178184
func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
179-
slog.DebugContext(ctx, "Command plugin received connection reset")
185+
slog.DebugContext(ctx, "Command plugin received connection reset message")
180186
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
181187
connectionErr := cp.conn.Close(ctx)
182188
if connectionErr != nil {
@@ -217,12 +223,16 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
217223

218224
switch message.GetRequest().(type) {
219225
case *mpi.ManagementPlaneRequest_ConfigUploadRequest:
226+
slog.InfoContext(ctx, "Received management plane config upload request")
220227
cp.handleConfigUploadRequest(newCtx, message)
221228
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
229+
slog.InfoContext(ctx, "Received management plane config apply request")
222230
cp.handleConfigApplyRequest(newCtx, message)
223231
case *mpi.ManagementPlaneRequest_HealthRequest:
232+
slog.InfoContext(ctx, "Received management plane health request")
224233
cp.handleHealthRequest(newCtx)
225234
case *mpi.ManagementPlaneRequest_ActionRequest:
235+
slog.InfoContext(ctx, "Received management plane action request")
226236
cp.handleAPIActionRequest(newCtx, message)
227237
default:
228238
slog.DebugContext(newCtx, "Management plane request not implemented yet")

0 commit comments

Comments
 (0)