Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,392 changes: 578 additions & 814 deletions .golangci.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Makefile.tools
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
OAPICODEGEN = github.com/deepmap/oapi-codegen/v2/cmd/oapi-codegen@v2.1.0
LEFTHOOK = github.com/evilmartians/lefthook@v1.6.9
GOLANGCILINT = github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.8
GOLANGCILINT = github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.2.1
PROTOCGENGO = google.golang.org/protobuf/cmd/protoc-gen-go@v1.32.0
GOFUMPT = mvdan.cc/gofumpt@v0.6.0
COUNTERFEITER = github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/nginx/agent/v3

go 1.23.7
go 1.24.0

toolchain go1.23.10
toolchain go1.24.4

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.4-20250130201111-63bb56e20495.1
Expand Down
20 changes: 10 additions & 10 deletions internal/bus/message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ func (p *MessagePipe) IsPluginRegistered(pluginName string) bool {
return isPluginRegistered
}

func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
for index, plugin := range plugins {
if pluginName == plugin.Info().Name {
return index
}
}

return -1
}

func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) error {
if index != -1 {
p.plugins = append(p.plugins[:index], p.plugins[index+1:]...)
Expand Down Expand Up @@ -181,16 +191,6 @@ func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin {
return plugins
}

func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
for index, plugin := range plugins {
if pluginName == plugin.Info().Name {
return index
}
}

return -1
}

func (p *MessagePipe) initPlugins(ctx context.Context) {
for index, plugin := range p.plugins {
err := plugin.Init(ctx, p)
Expand Down
32 changes: 16 additions & 16 deletions internal/collector/logsgzipprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ func (p *logsGzipProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error
return p.nextConsumer.ConsumeLogs(ctx, ld)
}

func (p *logsGzipProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: true,
}
}

func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error {
p.settings.Logger.Info("Starting logs gzip processor")
return nil
}

func (p *logsGzipProcessor) Shutdown(ctx context.Context) error {
p.settings.Logger.Info("Shutting down logs gzip processor")
return nil
}

func (p *logsGzipProcessor) processLogRecords(logRecords plog.LogRecordSlice) error {
var errs error
// Filter out unsupported data types in the log before processing
Expand Down Expand Up @@ -164,19 +180,3 @@ func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {

return buf.Bytes(), nil
}

func (p *logsGzipProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: true,
}
}

func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error {
p.settings.Logger.Info("Starting logs gzip processor")
return nil
}

func (p *logsGzipProcessor) Shutdown(ctx context.Context) error {
p.settings.Logger.Info("Shutting down logs gzip processor")
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func generateLogs(numRecords, recordSize int) plog.Logs {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
for i := 0; i < numRecords; i++ {
for range numRecords {
lr := sl.LogRecords().AppendEmpty()
content, _ := randomString(recordSize)
lr.Body().SetStr(content)
Expand Down Expand Up @@ -64,7 +64,7 @@ func BenchmarkGzipProcessor(b *testing.B) {
logs := generateLogs(bm.numRecords, bm.recordSize)

b.ResetTimer()
for i := 0; i < b.N; i++ {
for range b.N {
_ = p.ConsumeLogs(context.Background(), logs)
}
})
Expand Down
2 changes: 1 addition & 1 deletion internal/collector/nginxossreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func TestType(t *testing.T) {
factory := NewFactory()
ft := factory.Type()
require.EqualValues(t, metadata.Type, ft)
require.Equal(t, metadata.Type, ft)
}

func TestValidConfig(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ func (nls *NginxLogScraper) Shutdown(_ context.Context) error {
return err
}

func (nls *NginxLogScraper) ConsumerCallback(_ context.Context, entries []*entry.Entry) {
nls.mut.Lock()
nls.entries = append(nls.entries, entries...)
nls.mut.Unlock()
}

func (nls *NginxLogScraper) initStanzaPipeline(
operators []operator.Config,
logger *zap.Logger,
Expand All @@ -248,12 +254,6 @@ func (nls *NginxLogScraper) initStanzaPipeline(
return pipe, err
}

func (nls *NginxLogScraper) ConsumerCallback(_ context.Context, entries []*entry.Entry) {
nls.mut.Lock()
nls.entries = append(nls.entries, entries...)
nls.mut.Unlock()
}

func (nls *NginxLogScraper) runConsumer(ctx context.Context) {
nls.logger.Info("Starting NGINX access log receiver's consumer")
defer nls.wg.Done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ import (

const operatorType = "access_log_file_input"

// Config is the configuration of a file input operator
type Config struct {
helper.InputConfig `mapstructure:",squash"`
AccessLogFormat string `mapstructure:"access_log_format"`
fileconsumer.Config `mapstructure:",squash"`
}

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}
Expand All @@ -38,13 +45,6 @@ func NewConfigWithID(operatorID string) *Config {
}
}

// Config is the configuration of a file input operator
type Config struct {
helper.InputConfig `mapstructure:",squash"`
AccessLogFormat string `mapstructure:"access_log_format"`
fileconsumer.Config `mapstructure:",squash"`
}

// Build will build a file input operator from the supplied configuration
// nolint: ireturn
func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewConfig(t *testing.T) {
config := NewConfig()

assert.NotNil(t, config)
assert.Equal(t, "access_log_file_input", config.InputConfig.OperatorID)
assert.Equal(t, "access_log_file_input", config.OperatorID)
}

func TestConfig_Build(t *testing.T) {
Expand Down
120 changes: 60 additions & 60 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,63 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
return nil
}

// Info the plugin.
func (oc *Collector) Info() *bus.Info {
return &bus.Info{
Name: "collector",
}
}

// Close the plugin.
func (oc *Collector) Close(ctx context.Context) error {
slog.InfoContext(ctx, "Closing OTel Collector plugin")

if !oc.stopped {
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
oc.service.Shutdown()
oc.cancel()

settings := *oc.config.Client.Backoff
settings.MaxElapsedTime = maxTimeToWaitForShutdown
err := backoff.WaitUntil(ctx, &settings, func() error {
if oc.service.GetState() == otelcol.StateClosed {
return nil
}

return errors.New("OTel Collector not in a closed state yet")
})

if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
} else {
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
oc.stopped = true
}
}

return nil
}

// Process an incoming Message Bus message in the plugin
func (oc *Collector) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.NginxConfigUpdateTopic:
oc.handleNginxConfigUpdate(ctx, msg)
case bus.ResourceUpdateTopic:
oc.handleResourceUpdate(ctx, msg)
default:
slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic)
}
}

// Subscriptions returns the list of topics the plugin is subscribed to
func (oc *Collector) Subscriptions() []string {
return []string{
bus.ResourceUpdateTopic,
bus.NginxConfigUpdateTopic,
}
}

// Process receivers and log warning for sub-optimal configurations
func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) {
for _, receiver := range receivers {
Expand Down Expand Up @@ -167,7 +224,7 @@ func (oc *Collector) bootup(ctx context.Context) error {

go func() {
if oc.service == nil {
errChan <- fmt.Errorf("unable to start OTel collector: service is nil")
errChan <- errors.New("unable to start OTel collector: service is nil")
return
}

Expand All @@ -184,7 +241,7 @@ func (oc *Collector) bootup(ctx context.Context) error {
return err
default:
if oc.service == nil {
return fmt.Errorf("unable to start otel collector: service is nil")
return errors.New("unable to start otel collector: service is nil")
}

state := oc.service.GetState()
Expand All @@ -205,63 +262,6 @@ func (oc *Collector) bootup(ctx context.Context) error {
}
}

// Info the plugin.
func (oc *Collector) Info() *bus.Info {
return &bus.Info{
Name: "collector",
}
}

// Close the plugin.
func (oc *Collector) Close(ctx context.Context) error {
slog.InfoContext(ctx, "Closing OTel Collector plugin")

if !oc.stopped {
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
oc.service.Shutdown()
oc.cancel()

settings := *oc.config.Client.Backoff
settings.MaxElapsedTime = maxTimeToWaitForShutdown
err := backoff.WaitUntil(ctx, &settings, func() error {
if oc.service.GetState() == otelcol.StateClosed {
return nil
}

return errors.New("OTel Collector not in a closed state yet")
})

if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
} else {
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
oc.stopped = true
}
}

return nil
}

// Process an incoming Message Bus message in the plugin
func (oc *Collector) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.NginxConfigUpdateTopic:
oc.handleNginxConfigUpdate(ctx, msg)
case bus.ResourceUpdateTopic:
oc.handleResourceUpdate(ctx, msg)
default:
slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic)
}
}

// Subscriptions returns the list of topics the plugin is subscribed to
func (oc *Collector) Subscriptions() []string {
return []string{
bus.ResourceUpdateTopic,
bus.NginxConfigUpdateTopic,
}
}

func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "OTel collector plugin received nginx config update message")
oc.mu.Lock()
Expand Down Expand Up @@ -436,7 +436,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex
reloadCollector = true
}
} else {
slog.Debug("NAP logs feature disabled", "enabled_features", oc.config.Features)
slog.DebugContext(ctx, "NAP logs feature disabled", "enabled_features", oc.config.Features)
}

return reloadCollector
Expand Down
5 changes: 2 additions & 3 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"path/filepath"
"testing"

Expand Down Expand Up @@ -252,7 +251,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) {

if len(test.receivers.NginxPlusReceivers) == 1 {
apiDetails := config.APIDetails{
URL: fmt.Sprintf("%s/api", nginxPlusMock.URL),
URL: nginxPlusMock.URL + "/api",
Listen: "",
Location: "",
}
Expand All @@ -270,7 +269,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) {
model.PlusAPI.Location = apiDetails.Location
} else {
apiDetails := config.APIDetails{
URL: fmt.Sprintf("%s/stub_status", nginxPlusMock.URL),
URL: nginxPlusMock.URL + "/stub_status",
Listen: "",
Location: "",
}
Expand Down
Loading