Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions internal/bus/busfakes/fake_message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (p *FakeMessagePipe) DeRegister(ctx context.Context, pluginNames []string)
plugins = p.findPlugins(pluginNames, plugins)

for _, plugin := range plugins {
index := p.GetIndex(plugin.Info().Name, p.plugins)
index := p.Index(plugin.Info().Name, p.plugins)
p.unsubscribePlugin(ctx, index, plugin)
}

return nil
}

func (p *FakeMessagePipe) GetIndex(pluginName string, plugins []bus.Plugin) int {
func (p *FakeMessagePipe) Index(pluginName string, plugins []bus.Plugin) int {
for index, plugin := range plugins {
if pluginName == plugin.Info().Name {
return index
Expand Down Expand Up @@ -85,14 +85,14 @@ func (p *FakeMessagePipe) Process(_ context.Context, msgs ...*bus.Message) {
p.messages = append(p.messages, msgs...)
}

func (p *FakeMessagePipe) GetMessages() []*bus.Message {
func (p *FakeMessagePipe) Messages() []*bus.Message {
p.messagesLock.Lock()
defer p.messagesLock.Unlock()

return p.messages
}

func (p *FakeMessagePipe) GetProcessedMessages() []*bus.Message {
func (p *FakeMessagePipe) ProcessedMessages() []*bus.Message {
return p.processedMessages
}

Expand Down Expand Up @@ -127,14 +127,14 @@ func (p *FakeMessagePipe) RunWithoutInit(ctx context.Context) {
}
}

func (p *FakeMessagePipe) GetPlugins() []bus.Plugin {
func (p *FakeMessagePipe) Plugins() []bus.Plugin {
return p.plugins
}

func (p *FakeMessagePipe) IsPluginRegistered(pluginName string) bool {
pluginAlreadyRegistered := false

for _, plugin := range p.GetPlugins() {
for _, plugin := range p.Plugins() {
if plugin.Info().Name == pluginName {
pluginAlreadyRegistered = true
}
Expand Down
10 changes: 5 additions & 5 deletions internal/bus/message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type (
DeRegister(ctx context.Context, plugins []string) error
Process(ctx context.Context, messages ...*Message)
Run(ctx context.Context)
GetPlugins() []Plugin
Plugins() []Plugin
IsPluginRegistered(pluginName string) bool
}

Expand Down Expand Up @@ -93,7 +93,7 @@ func (p *MessagePipe) DeRegister(ctx context.Context, pluginNames []string) erro
plugins := p.findPlugins(pluginNames)

for _, plugin := range plugins {
index := p.GetIndex(plugin.Info().Name, p.plugins)
index := p.Index(plugin.Info().Name, p.plugins)

err := p.unsubscribePlugin(ctx, index, plugin)
if err != nil {
Expand Down Expand Up @@ -131,14 +131,14 @@ func (p *MessagePipe) Run(ctx context.Context) {
}
}

func (p *MessagePipe) GetPlugins() []Plugin {
func (p *MessagePipe) Plugins() []Plugin {
return p.plugins
}

func (p *MessagePipe) IsPluginRegistered(pluginName string) bool {
isPluginRegistered := false

for _, plugin := range p.GetPlugins() {
for _, plugin := range p.Plugins() {
if plugin.Info().Name == pluginName {
isPluginRegistered = true
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin {
return plugins
}

func (p *MessagePipe) GetIndex(pluginName string, plugins []Plugin) int {
func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
for index, plugin := range plugins {
if pluginName == plugin.Info().Name {
return index
Expand Down
4 changes: 2 additions & 2 deletions internal/bus/message_pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ func TestMessagePipe_DeRegister(t *testing.T) {
err := messagePipe.Register(100, []Plugin{plugin})

require.NoError(t, err)
assert.Len(t, messagePipe.GetPlugins(), 1)
assert.Len(t, messagePipe.Plugins(), 1)

err = messagePipe.DeRegister(ctx, []string{plugin.Info().Name})

require.NoError(t, err)
assert.Empty(t, messagePipe.GetPlugins())
assert.Empty(t, messagePipe.Plugins())
plugin.AssertExpectations(t)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cs *CPUSource) Collect() (ContainerCPUStats, error) {

// nolint: mnd
func (cs *CPUSource) collectCPUStats() (ContainerCPUStats, error) {
clockTicks, err := getClockTicks()
clockTicks, err := clockTicks()
if err != nil {
return ContainerCPUStats{}, err
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (cs *CPUSource) collectCPUStats() (ContainerCPUStats, error) {

cpuTimes.userUsage = convertUsage(cpuTimes.userUsage)
cpuTimes.systemUsage = convertUsage(cpuTimes.systemUsage)
hostSystemUsage, err := getSystemCPUUsage(clockTicks)
hostSystemUsage, err := systemCPUUsage(clockTicks)
if err != nil {
return ContainerCPUStats{}, err
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func (cs *CPUSource) cpuUsageTimes(filePath, userKey, systemKey string) (*Contai
}

// nolint: revive, gocritic
func getSystemCPUUsage(clockTicks int) (float64, error) {
func systemCPUUsage(clockTicks int) (float64, error) {
lines, err := internal.ReadLines(CPUStatsPath)
if err != nil {
return 0, err
Expand Down Expand Up @@ -191,7 +191,7 @@ func getSystemCPUUsage(clockTicks int) (float64, error) {
return 0, errors.New("unable to process " + CPUStatsPath + ". No cpu found")
}

func getClockTicks() (int, error) {
func clockTicks() (int, error) {
cmd := exec.Command("getconf", "CLK_TCK")
out := new(bytes.Buffer)
cmd.Stdout = out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (ms *MemorySource) VirtualMemoryStatWithContext(ctx context.Context) (*mem.
return &mem.VirtualMemoryStat{}, err
}

memoryStat, err = GetMemoryStat(
memoryStat, err = CalculateMemoryStat(
path.Join(ms.basePath, memStatFile),
memCachedKey,
memSharedKey,
Expand Down Expand Up @@ -147,7 +147,7 @@ func MemoryLimitInBytes(ctx context.Context, filePath string) (uint64, error) {
}

// nolint: revive, mnd
func GetMemoryStat(statFile, cachedKey, sharedKey string) (MemoryStat, error) {
func CalculateMemoryStat(statFile, cachedKey, sharedKey string) (MemoryStat, error) {
memoryStat := MemoryStat{}
lines, err := internal.ReadLines(statFile)
if err != nil {
Expand Down
14 changes: 11 additions & 3 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func New(conf *config.Config) (*Collector, error) {
}, nil
}

func (oc *Collector) GetState() otelcol.State {
func (oc *Collector) State() otelcol.State {
oc.mu.Lock()
defer oc.mu.Unlock()

Expand Down Expand Up @@ -263,6 +263,7 @@ func (oc *Collector) Subscriptions() []string {
}

func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "OTel collector plugin received nginx config update message")
oc.mu.Lock()
defer oc.mu.Unlock()

Expand All @@ -287,6 +288,7 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
}

func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "OTel collector plugin received resource update message")
oc.mu.Lock()
defer oc.mu.Unlock()

Expand Down Expand Up @@ -407,6 +409,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex
nginxReceiverFound, reloadCollector := oc.updateExistingNginxPlusReceiver(nginxConfigContext)

if !nginxReceiverFound && nginxConfigContext.PlusAPI.URL != "" {
slog.DebugContext(ctx, "Adding new NGINX Plus receiver", "url", nginxConfigContext.PlusAPI.URL)
oc.config.Collector.Receivers.NginxPlusReceivers = append(
oc.config.Collector.Receivers.NginxPlusReceivers,
config.NginxPlusReceiver{
Expand Down Expand Up @@ -443,6 +446,7 @@ func (oc *Collector) addNginxOssReceiver(ctx context.Context, nginxConfigContext
nginxReceiverFound, reloadCollector := oc.updateExistingNginxOSSReceiver(nginxConfigContext)

if !nginxReceiverFound && nginxConfigContext.StubStatus.URL != "" {
slog.DebugContext(ctx, "Adding new NGINX OSS receiver", "url", nginxConfigContext.StubStatus.URL)
oc.config.Collector.Receivers.NginxReceivers = append(
oc.config.Collector.Receivers.NginxReceivers,
config.NginxReceiver{
Expand Down Expand Up @@ -479,6 +483,8 @@ func (oc *Collector) updateExistingNginxPlusReceiver(
oc.config.Collector.Receivers.NginxPlusReceivers[index+1:]...,
)
if nginxConfigContext.PlusAPI.URL != "" {
slog.Debug("Updating existing NGINX Plus receiver", "url",
nginxConfigContext.PlusAPI.URL)
nginxPlusReceiver.PlusAPI.URL = nginxConfigContext.PlusAPI.URL
oc.config.Collector.Receivers.NginxPlusReceivers = append(
oc.config.Collector.Receivers.NginxPlusReceivers,
Expand Down Expand Up @@ -510,6 +516,8 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
oc.config.Collector.Receivers.NginxReceivers[index+1:]...,
)
if nginxConfigContext.StubStatus.URL != "" {
slog.Debug("Updating existing NGINX OSS receiver", "url",
nginxConfigContext.StubStatus.URL)
nginxReceiver.StubStatus = config.APIDetails{
URL: nginxConfigContext.StubStatus.URL,
Listen: nginxConfigContext.StubStatus.Listen,
Expand Down Expand Up @@ -587,7 +595,7 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig
}

func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool {
listenAddressesToBeDeleted := oc.getConfigDeletedNapReceivers(nginxConfigContext)
listenAddressesToBeDeleted := oc.configDeletedNapReceivers(nginxConfigContext)
if len(listenAddressesToBeDeleted) != 0 {
oc.deleteNapReceivers(listenAddressesToBeDeleted)
return true
Expand All @@ -606,7 +614,7 @@ func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bo
oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers
}

func (oc *Collector) getConfigDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
func (oc *Collector) configDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
elements := make(map[string]bool)

for _, tcplogReceiver := range oc.config.Collector.Receivers.TcplogReceivers {
Expand Down
6 changes: 3 additions & 3 deletions internal/collector/otel_collector_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func TestCollector_InitAndClose(t *testing.T) {

collector.service = createFakeCollector()

assert.Equal(t, otelcol.StateRunning, collector.GetState())
assert.Equal(t, otelcol.StateRunning, collector.State())

require.NoError(t, collector.Close(ctx), "Close should not return an error")

assert.Equal(t, otelcol.StateClosed, collector.GetState())
assert.Equal(t, otelcol.StateClosed, collector.State())
}

// nolint: revive
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
name: "Test 1: Resource update adds resource id attribute",
message: &bus.Message{
Topic: bus.ResourceUpdateTopic,
Data: protos.GetHostResource(),
Data: protos.HostResource(),
},
processors: config.Processors{
Resource: &config.Resource{
Expand Down
18 changes: 14 additions & 4 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
case bus.DataPlaneResponseTopic:
cp.processDataPlaneResponse(ctx, msg)
default:
slog.DebugContext(ctx, "Command plugin unknown topic", "topic", msg.Topic)
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
}
}

func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received resource update message")
if resource, ok := msg.Data.(*mpi.Resource); ok {
if !cp.commandService.IsConnected() {
cp.createConnection(ctx, resource)
Expand Down Expand Up @@ -138,9 +139,10 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res
}

func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received data plane health message")
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
correlationID := logger.GetCorrelationID(ctx)
correlationID := logger.CorrelationID(ctx)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
cp.messagePipe.Process(ctx, &bus.Message{
Expand All @@ -152,12 +154,13 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me
cp.messagePipe.Process(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully sent the health status update", ""),
"Successfully sent health status update", ""),
})
}
}

func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received instance health message")
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
if err != nil {
Expand All @@ -167,7 +170,10 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
}

func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received data plane response message")
if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok {
slog.InfoContext(ctx, "Sending data plane response message", "message",
response.GetCommandResponse().GetMessage(), "status", response.GetCommandResponse().GetStatus())
err := cp.commandService.SendDataPlaneResponse(ctx, response)
if err != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
Expand All @@ -176,7 +182,7 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
}

func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received connection reset")
slog.DebugContext(ctx, "Command plugin received connection reset message")
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
connectionErr := cp.conn.Close(ctx)
if connectionErr != nil {
Expand Down Expand Up @@ -217,12 +223,16 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {

switch message.GetRequest().(type) {
case *mpi.ManagementPlaneRequest_ConfigUploadRequest:
slog.InfoContext(ctx, "Received management plane config upload request")
cp.handleConfigUploadRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
slog.InfoContext(ctx, "Received management plane config apply request")
cp.handleConfigApplyRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_HealthRequest:
slog.InfoContext(ctx, "Received management plane health request")
cp.handleHealthRequest(newCtx)
case *mpi.ManagementPlaneRequest_ActionRequest:
slog.InfoContext(ctx, "Received management plane action request")
cp.handleAPIActionRequest(newCtx, message)
default:
slog.DebugContext(newCtx, "Management plane request not implemented yet")
Expand Down
16 changes: 8 additions & 8 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ func TestCommandPlugin_createConnection(t *testing.T) {

assert.Eventually(
t,
func() bool { return len(messagePipe.GetMessages()) == 1 },
func() bool { return len(messagePipe.Messages()) == 1 },
2*time.Second,
10*time.Millisecond,
)

messages := messagePipe.GetMessages()
messages := messagePipe.Messages()
assert.Len(t, messages, 1)
assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic)
}
Expand All @@ -125,21 +125,21 @@ func TestCommandPlugin_Process(t *testing.T) {

commandPlugin.commandService = fakeCommandService

commandPlugin.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: protos.GetHostResource()})
commandPlugin.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: protos.HostResource()})
require.Equal(t, 1, fakeCommandService.CreateConnectionCallCount())

commandPlugin.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: protos.GetHostResource()})
commandPlugin.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: protos.HostResource()})
require.Equal(t, 1, fakeCommandService.UpdateDataPlaneStatusCallCount())

commandPlugin.Process(ctx, &bus.Message{Topic: bus.InstanceHealthTopic, Data: protos.GetInstanceHealths()})
commandPlugin.Process(ctx, &bus.Message{Topic: bus.InstanceHealthTopic, Data: protos.InstanceHealths()})
require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount())

commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: protos.OKDataPlaneResponse()})
require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount())

commandPlugin.Process(ctx, &bus.Message{
Topic: bus.DataPlaneHealthResponseTopic,
Data: protos.GetHealthyInstanceHealth(),
Data: protos.HealthyInstanceHealth(),
})
require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount())
require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount())
Expand Down Expand Up @@ -230,12 +230,12 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {

assert.Eventually(
t,
func() bool { return len(messagePipe.GetMessages()) == 1 },
func() bool { return len(messagePipe.Messages()) == 1 },
2*time.Second,
10*time.Millisecond,
)

messages := messagePipe.GetMessages()
messages := messagePipe.Messages()
assert.Len(tt, messages, 1)
assert.Equal(tt, test.expectedTopic.Topic, messages[0].Topic)

Expand Down
Loading