Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/core/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func (env *EnvironmentType) IsContainer() bool {
res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) {
for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} {
if _, err := os.Stat(filename); err == nil {
log.Debugf("Is a container because (%s) exists", filename)
return true, nil
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/core/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
pluginsRegistered := []string{}
extensionPluginsRegistered := []string{}

for _, plugin := range p.plugins {
for _, plugin := range plugins {
for _, subscription := range plugin.Subscriptions() {
p.regMu.Lock()
err := p.bus.Subscribe(subscription, plugin.Process)
Expand All @@ -83,7 +83,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
pluginsRegistered = append(pluginsRegistered, *plugin.Info().name)
}

for _, plugin := range p.extensionPlugins {
for _, plugin := range extensionPlugins {
for _, subscription := range plugin.Subscriptions() {
p.regMu.Lock()
err := p.bus.Subscribe(subscription, plugin.Process)
Expand All @@ -94,6 +94,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
}
extensionPluginsRegistered = append(extensionPluginsRegistered, *plugin.Info().name)
}

log.Infof("The following core plugins have been registered: %q", pluginsRegistered)
log.Infof("The following extension plugins have been registered: %q", extensionPluginsRegistered)

Expand Down Expand Up @@ -127,6 +128,8 @@ func (p *MessagePipe) DeRegister(pluginNames []string) error {
return err
}
}

plugin = nil
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/plugins/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ func NewCommander(cmdr client.Commander, config *config.Config) *Commander {
}

func (c *Commander) Init(pipeline core.MessagePipeInterface) {
log.Info("Commander initializing")
c.pipeline = pipeline
c.ctx = pipeline.Context()
log.Info("Commander initializing")
go c.dispatchLoop()
}

func (c *Commander) Close() {
log.Info("Commander is wrapping up")
log.Info("Commander is closed")
}

func (c *Commander) Info() *core.Info {
Expand Down Expand Up @@ -158,7 +159,6 @@ func (c *Commander) dispatchLoop() {
case *proto.Command_AgentConnectRequest, *proto.Command_AgentConnectResponse:
topic = core.AgentConnected
case *proto.Command_AgentConfigRequest, *proto.Command_AgentConfig:
log.Debugf("agent config %T command data type received and ignored", cmd.GetData())
topic = core.AgentConfig
case *proto.Command_CmdStatus:
data := cmd.GetData().(*proto.Command_CmdStatus)
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/extensions"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
"github.com/nginx/agent/sdk/v2/agent/events"
Expand Down Expand Up @@ -36,15 +37,15 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E

if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil {
corePlugins = append(corePlugins,
NewMetricsSender(reporter),
NewMetricsSender(reporter, atomic.NewBool(false)),
)
}

corePlugins = append(corePlugins,
NewConfigReader(loadedConfig),
NewNginx(commander, binary, env, loadedConfig, processes),
NewExtensions(loadedConfig, env),
NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta),
NewFeatures(commander, reporter, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta),
)

if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) {
Expand Down
14 changes: 12 additions & 2 deletions src/plugins/config_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewConfigReader(config *config.Config) *ConfigReader {
}

func (r *ConfigReader) Init(pipeline core.MessagePipeInterface) {
log.Info("ConfigReader initializing")
r.messagePipeline = pipeline
}

Expand All @@ -46,6 +47,7 @@ func (r *ConfigReader) Info() *core.Info {

func (r *ConfigReader) Close() {
log.Info("ConfigReader is wrapping up")
log.Info("ConfigReader is closed")
}

func (r *ConfigReader) Process(msg *core.Message) {
Expand Down Expand Up @@ -73,8 +75,10 @@ func (r *ConfigReader) Process(msg *core.Message) {
// Update the agent config on disk
switch commandData := cmd.Data.(type) {
case *proto.Command_AgentConfig:
log.Debugf("Config reader: AgentConfig message recevied: %v, topic: %v", commandData, msg.Topic())
r.updateAgentConfig(commandData.AgentConfig)
case *proto.Command_AgentConnectResponse:
log.Debugf("Config reader: AgentConnectResponse message recevied: %v, topic: %v", commandData, msg.Topic())
r.updateAgentConfig(commandData.AgentConnectResponse.AgentConfig)
}
}
Expand Down Expand Up @@ -152,7 +156,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig)
}

if synchronizeFeatures {
log.Info("Agent config features changed, synchronizing features")
r.synchronizeFeatures(payloadAgentConfig)
r.config.Features = payloadAgentConfig.Details.Features
}

r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig))
Expand All @@ -164,6 +170,7 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {
r.detailsMu.RLock()
for _, feature := range r.config.Features {
if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync {
log.Debugf("Deregistering the feature %s", feature)
r.deRegisterPlugin(feature)
}
}
Expand All @@ -177,12 +184,15 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {

func (r *ConfigReader) deRegisterPlugin(data string) {
if data == agent_config.FeatureFileWatcher {

err := r.messagePipeline.DeRegister([]string{agent_config.FeatureFileWatcher, agent_config.FeatureFileWatcherThrottle})
if err != nil {
log.Warnf("Error De-registering %v Plugin: %v", data, err)
}

} else if data == agent_config.FeatureMetrics {
err := r.messagePipeline.DeRegister([]string{agent_config.FeatureMetrics, agent_config.FeatureMetricsThrottle, agent_config.FeatureMetricsSender})
if err != nil {
log.Warnf("Error De-registering %v Plugin: %v", data, err)
}
} else {
err := r.messagePipeline.DeRegister([]string{data})
if err != nil {
Expand Down
48 changes: 28 additions & 20 deletions src/plugins/dataplane_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ import (
)

type DataPlaneStatus struct {
messagePipeline core.MessagePipeInterface
ctx context.Context
sendStatus chan bool
healthTicker *time.Ticker
interval time.Duration
meta *proto.Metadata
binary core.NginxBinary
env core.Environment
version string
tags *[]string
configDirs string
lastSendDetails time.Time
envHostInfo *proto.HostInfo
reportInterval time.Duration
softwareDetails map[string]*proto.DataplaneSoftwareDetails
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
softwareDetailsMutex sync.RWMutex
structMu sync.RWMutex
processes []*core.Process
messagePipeline core.MessagePipeInterface
ctx context.Context
sendStatus chan bool
healthTicker *time.Ticker
interval time.Duration
meta *proto.Metadata
binary core.NginxBinary
env core.Environment
version string
tags *[]string
configDirs string
lastSendDetails time.Time
envHostInfo *proto.HostInfo
reportInterval time.Duration
softwareDetails map[string]*proto.DataplaneSoftwareDetails
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
nginxConfigActivityStatusesMutex sync.RWMutex
softwareDetailsMutex sync.RWMutex
structMu sync.RWMutex
processes []*core.Process
}

const (
Expand Down Expand Up @@ -81,12 +82,15 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) {

func (dps *DataPlaneStatus) Close() {
log.Info("DataPlaneStatus is wrapping up")
dps.nginxConfigActivityStatusesMutex.Lock()
dps.nginxConfigActivityStatuses = nil
dps.nginxConfigActivityStatusesMutex.Unlock()
dps.softwareDetailsMutex.Lock()
dps.softwareDetails = nil
dps.softwareDetailsMutex.Unlock()
dps.healthTicker.Stop()
dps.sendStatus <- true
log.Info("DataPlaneStatus is closed")
}

func (dps *DataPlaneStatus) Info() *core.Info {
Expand Down Expand Up @@ -144,8 +148,10 @@ func (dps *DataPlaneStatus) Subscriptions() []string {

func (dps *DataPlaneStatus) updateNginxConfigActivityStatuses(newAgentActivityStatus *proto.AgentActivityStatus) {
log.Tracef("DataplaneStatus: Updating nginxConfigActivityStatuses with %v", newAgentActivityStatus)
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); ok {
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); dps.nginxConfigActivityStatuses != nil && ok {
dps.nginxConfigActivityStatusesMutex.Lock()
dps.nginxConfigActivityStatuses[newAgentActivityStatus.GetNginxConfigStatus().GetNginxId()] = newAgentActivityStatus
dps.nginxConfigActivityStatusesMutex.Unlock()
}
}

Expand Down Expand Up @@ -184,6 +190,8 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface)
func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus {
forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails)

dps.nginxConfigActivityStatusesMutex.Lock()
defer dps.nginxConfigActivityStatusesMutex.Unlock()
agentActivityStatuses := []*proto.AgentActivityStatus{}
for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses {
agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus)
Expand Down
1 change: 1 addition & 0 deletions src/plugins/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (a *Events) Init(pipeline core.MessagePipeInterface) {

func (a *Events) Close() {
log.Info("Events is wrapping up")
log.Info("Events is closed")
}

func (a *Events) Process(msg *core.Message) {
Expand Down
1 change: 1 addition & 0 deletions src/plugins/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (e *Extensions) Init(pipeline core.MessagePipeInterface) {

func (e *Extensions) Close() {
log.Info("Extensions is wrapping up")
log.Info("Extensions is closed")
}

func (e *Extensions) Process(msg *core.Message) {
Expand Down
Loading
Loading