Skip to content
Open
11 changes: 11 additions & 0 deletions CHANGELOG-6.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ Versioning](http://semver.org/spec/v2.0.0.html).

## Unreleased

### 2024-03-07

### Added
- Addition of a new watcher configuration to monitor the user updates
- Added the exit mechanism to disconnect agent when user is disabled
- Added a struct in store which will get passed down for userConfigs

### Changed
- The session config to watch over the user updated and the wizzard bus


### Changed
- Upgraded CI Go version to 1.21.3
- Upgraded jwt version to 4.4.3
Expand Down
32 changes: 31 additions & 1 deletion backend/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/sensu/sensu-go/agent"
corev2 "github.com/sensu/core/v2"
"github.com/sensu/sensu-go/agent"
"github.com/sensu/sensu-go/backend/apid/actions"
"github.com/sensu/sensu-go/backend/apid/middlewares"
"github.com/sensu/sensu-go/backend/apid/routers"
Expand Down Expand Up @@ -105,6 +105,7 @@ type Agentd struct {
serveWaitTime time.Duration
ready func()
backendEntity *corev2.Entity
userWatcher <-chan *store.WatchEventUserConfig
}

// Config configures an Agentd.
Expand All @@ -121,6 +122,7 @@ type Config struct {
EtcdClientTLSConfig *tls.Config
Watcher <-chan store.WatchEventEntityConfig
BackendEntity *corev2.Entity
UserWatcher <-chan *store.WatchEventUserConfig
}

// Option is a functional option.
Expand Down Expand Up @@ -149,6 +151,7 @@ func New(c Config, opts ...Option) (*Agentd, error) {
etcdClientTLSConfig: c.EtcdClientTLSConfig,
serveWaitTime: c.ServeWaitTime,
backendEntity: c.BackendEntity,
userWatcher: c.UserWatcher,
}

// prepare server TLS config
Expand Down Expand Up @@ -275,6 +278,7 @@ func (a *Agentd) Start() error {
func (a *Agentd) runWatcher() {
defer func() {
logger.Warn("shutting down entity config watcher")
logger.Warn("shutting down user config watcher")
}()
for {
select {
Expand All @@ -287,10 +291,18 @@ func (a *Agentd) runWatcher() {
if err := a.handleEvent(event); err != nil {
logger.WithError(err).Error("error handling entity config watch event")
}
case userEvent, ok := <-a.userWatcher:
if !ok {
return
}
if err := a.handleUserEvent(userEvent); err != nil {
logger.WithError(err).Error("error handling user config watch event")
}
}
}
}

// adding the config updates to the etcd bus for watcher to consume
func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
if event.Entity == nil {
return errors.New("nil entity received from entity config watcher")
Expand All @@ -308,6 +320,24 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
return nil
}

// adding the UserConfig updates to the etcd bus for the watcher to consume
func (a *Agentd) handleUserEvent(event *store.WatchEventUserConfig) error {
if event.User == nil {
return errors.New("nil entry received from the user config watcher")
}

topic := messaging.UserConfigTopic(event.User.Username)
if err := a.bus.Publish(topic, event); err != nil {
logger.WithField("topic", topic).WithError(err).
Error("unable to publish a user config update to the bus")
return err
}

logger.WithField("topic", topic).
Debug("successfully published an user config update to the bus")
return nil
}

// Stop Agentd.
func (a *Agentd) Stop() error {
a.cancel()
Expand Down
9 changes: 4 additions & 5 deletions backend/agentd/agentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/backend/apid/middlewares"
Expand All @@ -22,6 +17,10 @@ import (
"github.com/sensu/sensu-go/transport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
)

func TestAgentdMiddlewares(t *testing.T) {
Expand Down
111 changes: 104 additions & 7 deletions backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/sensu/sensu-go/agent"
corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/agent"
"github.com/sensu/sensu-go/backend/messaging"
"github.com/sensu/sensu-go/backend/metrics"
"github.com/sensu/sensu-go/backend/ringv2"
Expand Down Expand Up @@ -95,6 +95,8 @@ type Session struct {
marshal agent.MarshalFunc
unmarshal agent.UnmarshalFunc
entityConfig *entityConfig
userConfig *userConfig
user string
mu sync.Mutex
subscriptionsMap map[string]subscription
}
Expand All @@ -111,12 +113,23 @@ type entityConfig struct {
updatesChannel chan interface{}
}

// userConfig is used by a session to subscribe to entity config updates
type userConfig struct {
subscription chan messaging.Subscription
updatesChannel chan interface{}
}

// Receiver returns the channel for incoming entity updates from the entity
// watcher
func (e *entityConfig) Receiver() chan<- interface{} {
return e.updatesChannel
}

// Receiver returns the channel for incoming entity updates from the entity watcher
func (u *userConfig) Receiver() chan<- interface{} {
return u.updatesChannel
}

func newSessionHandler(s *Session) *handler.MessageHandler {
handler := handler.NewMessageHandler()
handler.AddHandler(transport.MessageTypeKeepalive, s.handleKeepalive)
Expand Down Expand Up @@ -149,6 +162,7 @@ type SessionConfig struct {
// with the session has been buried. Necessary when running parallel keepalived
// workers.
BurialReceiver *BurialReceiver
userConfig *userConfig
}

type BurialReceiver struct {
Expand Down Expand Up @@ -193,10 +207,15 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
ringPool: cfg.RingPool,
unmarshal: cfg.Unmarshal,
marshal: cfg.Marshal,
user: cfg.User,
entityConfig: &entityConfig{
subscriptions: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
},
userConfig: &userConfig{
subscription: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
},
}

// Optionally subscribe to burial notifications
Expand All @@ -211,6 +230,13 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
}()
}

if len(cfg.User) > 0 {
_, err := s.bus.Subscribe(messaging.UserConfigTopic(cfg.User), cfg.AgentName, s.userConfig)
if err != nil {
return nil, err
}
}

if err := s.bus.Publish(messaging.TopicKeepalive, makeEntitySwitchBurialEvent(cfg)); err != nil {
return nil, err
}
Expand Down Expand Up @@ -321,6 +347,44 @@ func (s *Session) sender() {
for {
var msg *transport.Message
select {
//---- user -----//

case u := <-s.userConfig.updatesChannel:
watchEvent, ok := u.(*store.WatchEventUserConfig)
if !ok {
logger.Errorf("session received unexoected user struct : %T", u)
continue
}
// Handle the delete/disable event
switch watchEvent.Action {
case store.WatchUpdate:
if watchEvent.Disabled {
logger.Warn("The user associated with the agent is now disabled")
return
}
logger.Println("The update operation has been performed on user")
default:
panic("unhandled default case")
}

if watchEvent.User == nil {
logger.Error("session received nil user in watch event")
}
//
lagger := logger.WithFields(logrus.Fields{
"action": watchEvent.Action.String(),
"user": watchEvent.User.Username,
"namespace": watchEvent.User.GetMetadata().GetNamespace(),
})
lagger.Debug("user update received")

bytes, err := s.marshal(watchEvent.User)
if err != nil {
lagger.WithError(err).Error("session failed to serialize user config")
}
msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)

// ---- entity ----//
case e := <-s.entityConfig.updatesChannel:
watchEvent, ok := e.(*store.WatchEventEntityConfig)
if !ok {
Expand Down Expand Up @@ -448,7 +512,9 @@ func (s *Session) sender() {
// 2. Start receiver
// 3. Start goroutine that waits for context cancellation, and shuts down service.
func (s *Session) Start() (err error) {
defer close(s.userConfig.subscription)
defer close(s.entityConfig.subscriptions)

sessionCounter.WithLabelValues(s.cfg.Namespace).Inc()
s.wg = &sync.WaitGroup{}
s.wg.Add(2)
Expand All @@ -471,22 +537,45 @@ func (s *Session) Start() (err error) {
"agent": s.cfg.AgentName,
"namespace": s.cfg.Namespace,
})

// Subscribe the agent to its entity_config topic
topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
lager.WithField("topic", topic).Debug("subscribing to topic")
// Get a unique name for the agent, which will be used as the consumer of the
// bus, in order to avoid problems with an agent reconnecting before its
// session is ended
agentName := agentUUID(s.cfg.Namespace, s.cfg.AgentName)

// Subscribe the agent to its user_config topic

userTopic := messaging.UserConfigTopic(s.cfg.User)
logger.WithField("topic", userTopic).Debug("subscribing to topic")
userSubscription, usrErr := s.bus.Subscribe(userTopic, agentName, s.userConfig)
if usrErr != nil {
lager.WithError(err).Error("error starting subscription")
return err
}
s.userConfig.subscription <- userSubscription

// Send back this user config to the agent so it uses that rather than it's local config
watchEvent := &store.WatchEventUserConfig{
Action: store.WatchUpdate,
User: &corev2.User{},
}
usrErr = s.bus.Publish(messaging.UserConfigTopic(s.cfg.AgentName), watchEvent)
if usrErr != nil {
lager.WithError(err).Error("error publishing user config")
return err
}

// Subscribe the agent to its entity_config topic

topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
lager.WithField("topic", topic).Debug("subscribing to topic")

// Determine if the entity already exists
subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig)
if err != nil {
lager.WithError(err).Error("error starting subscription")
return err
}
s.entityConfig.subscriptions <- subscription

// Determine if the entity already exists
req := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev3.EntityConfig{}).StoreName())
wrapper, err := s.storev2.Get(req)
if err != nil {
Expand Down Expand Up @@ -577,6 +666,7 @@ func (s *Session) stop() {
logger.WithError(err).Error("error closing session")
}
}()
defer close(s.userConfig.updatesChannel)
defer close(s.entityConfig.updatesChannel)
defer close(s.checkChannel)

Expand All @@ -602,6 +692,13 @@ func (s *Session) stop() {
}
}

// Remove the user config subscription
for sub := range s.userConfig.subscription {
if err := sub.Cancel(); err != nil {
logger.WithError(err).Error("unable to unsubscribe from message bus")
}
}

// Unsubscribe the session from every configured check subscriptions
s.unsubscribe(s.cfg.Subscriptions)
}
Expand Down
Loading