1 change: 1 addition & 0 deletions management/internals/modules/reverseproxy/interface.go
@@ -12,6 +12,7 @@ type Manager interface {
 	CreateService(ctx context.Context, accountID, userID string, service *Service) (*Service, error)
 	UpdateService(ctx context.Context, accountID, userID string, service *Service) (*Service, error)
 	DeleteService(ctx context.Context, accountID, userID, serviceID string) error
+	DeleteAllServices(ctx context.Context, accountID, userID string) error
 	SetCertificateIssuedAt(ctx context.Context, accountID, serviceID string) error
 	SetStatus(ctx context.Context, accountID, serviceID string, status ProxyStatus) error
 	ReloadAllServicesForAccount(ctx context.Context, accountID string) error
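For context, a hypothetical caller (the proxyManager variable and surrounding flow are assumed for illustration; they are not part of this diff) would drive the new method like:

if err := proxyManager.DeleteAllServices(ctx, accountID, userID); err != nil {
	return fmt.Errorf("failed to delete reverse proxy services for account %s: %w", accountID, err)
}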
14 changes: 14 additions & 0 deletions management/internals/modules/reverseproxy/interface_mock.go

This file is generated, so GitHub does not render its diff by default.

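As a rough sketch of the unrendered additions, assuming the file is regenerated with gomock and follows the pattern of the other Manager methods (MockManager and MockManagerMockRecorder are the names such generators typically emit), the new lines would look roughly like:

// DeleteAllServices mocks base method.
func (m *MockManager) DeleteAllServices(ctx context.Context, accountID, userID string) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "DeleteAllServices", ctx, accountID, userID)
	ret0, _ := ret[0].(error)
	return ret0
}

// DeleteAllServices indicates an expected call of DeleteAllServices.
func (mr *MockManagerMockRecorder) DeleteAllServices(ctx, accountID, userID any) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAllServices", reflect.TypeOf((*MockManager)(nil).DeleteAllServices), ctx, accountID, userID)
}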
92 changes: 81 additions & 11 deletions management/internals/modules/reverseproxy/manager/manager.go
@@ -16,6 +16,7 @@ import (
 	"github.com/netbirdio/netbird/management/server/permissions/modules"
 	"github.com/netbirdio/netbird/management/server/permissions/operations"
 	"github.com/netbirdio/netbird/management/server/store"
+	"github.com/netbirdio/netbird/shared/management/proto"
 	"github.com/netbirdio/netbird/shared/management/status"
 )

@@ -150,7 +151,7 @@ func (m *managerImpl) CreateService(ctx context.Context, accountID, userID strin
 		return nil, fmt.Errorf("failed to replace host by lookup for service %s: %w", service.ID, err)
 	}
 
-	m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Create, "", m.proxyGRPCServer.GetOIDCValidationConfig()), service.ProxyCluster)
+	m.sendServiceUpdate(service, reverseproxy.Create, service.ProxyCluster, "")
 
 	m.accountManager.UpdateAccountPeers(ctx, accountID)

@@ -330,19 +331,33 @@ func (m *managerImpl) preserveServiceMetadata(service, existingService *reversep
 }
 
 func (m *managerImpl) sendServiceUpdateNotifications(service *reverseproxy.Service, updateInfo *serviceUpdateInfo) {
-	oidcCfg := m.proxyGRPCServer.GetOIDCValidationConfig()
-
 	switch {
 	case updateInfo.domainChanged && updateInfo.oldCluster != service.ProxyCluster:
-		m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Delete, "", oidcCfg), updateInfo.oldCluster)
-		m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Create, "", oidcCfg), service.ProxyCluster)
+		m.sendServiceUpdate(service, reverseproxy.Delete, updateInfo.oldCluster, "")
+		m.sendServiceUpdate(service, reverseproxy.Create, service.ProxyCluster, "")
 	case !service.Enabled && updateInfo.serviceEnabledChanged:
-		m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Delete, "", oidcCfg), service.ProxyCluster)
+		m.sendServiceUpdate(service, reverseproxy.Delete, service.ProxyCluster, "")
 	case service.Enabled && updateInfo.serviceEnabledChanged:
-		m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Create, "", oidcCfg), service.ProxyCluster)
+		m.sendServiceUpdate(service, reverseproxy.Create, service.ProxyCluster, "")
 	default:
-		m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Update, "", oidcCfg), service.ProxyCluster)
+		m.sendServiceUpdate(service, reverseproxy.Update, service.ProxyCluster, "")
 	}
 }

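+// sendServiceUpdate builds the proto mapping for a single service operation and forwards it to the given proxy cluster.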
+func (m *managerImpl) sendServiceUpdate(service *reverseproxy.Service, operation reverseproxy.Operation, cluster, oldService string) {
+	oidcCfg := m.proxyGRPCServer.GetOIDCValidationConfig()
+	mapping := service.ToProtoMapping(operation, oldService, oidcCfg)
+	m.sendMappingsToCluster([]*proto.ProxyMapping{mapping}, cluster)
+}

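+// sendMappingsToCluster wraps a batch of mappings in one GetMappingUpdateResponse and pushes it to every proxy in the given cluster; empty batches are dropped.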
+func (m *managerImpl) sendMappingsToCluster(mappings []*proto.ProxyMapping, cluster string) {
+	if len(mappings) == 0 {
+		return
+	}
+	update := &proto.GetMappingUpdateResponse{
+		Mapping: mappings,
+	}
+	m.proxyGRPCServer.SendServiceUpdateToCluster(update, cluster)
+}

// validateTargetReferences checks that all target IDs reference existing peers or resources in the account.
@@ -397,7 +412,54 @@ func (m *managerImpl) DeleteService(ctx context.Context, accountID, userID, serv
 
 	m.accountManager.StoreEvent(ctx, userID, serviceID, accountID, activity.ServiceDeleted, service.EventMeta())
 
-	m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Delete, "", m.proxyGRPCServer.GetOIDCValidationConfig()), service.ProxyCluster)
+	m.sendServiceUpdate(service, reverseproxy.Delete, service.ProxyCluster, "")
 
 	m.accountManager.UpdateAccountPeers(ctx, accountID)
 
 	return nil
 }

+func (m *managerImpl) DeleteAllServices(ctx context.Context, accountID, userID string) error {
+	ok, err := m.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Services, operations.Delete)
+	if err != nil {
+		return status.NewPermissionValidationError(err)
+	}
+	if !ok {
+		return status.NewPermissionDeniedError()
+	}
+
+	var services []*reverseproxy.Service
+	err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
+		var err error
+		services, err = transaction.GetServicesByAccountID(ctx, store.LockingStrengthUpdate, accountID)
+		if err != nil {
+			return err
+		}
+
+		for _, service := range services {
+			if err = transaction.DeleteService(ctx, accountID, service.ID); err != nil {
+				return fmt.Errorf("failed to delete service: %w", err)
+			}
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}

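+	// Record events and notify proxies only after the transaction has committed, so rolled-back deletions are never broadcast.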
+	clusterMappings := make(map[string][]*proto.ProxyMapping)
+	oidcCfg := m.proxyGRPCServer.GetOIDCValidationConfig()
+
+	for _, service := range services {
+		m.accountManager.StoreEvent(ctx, userID, service.ID, accountID, activity.ServiceDeleted, service.EventMeta())
+		mapping := service.ToProtoMapping(reverseproxy.Delete, "", oidcCfg)
+		clusterMappings[service.ProxyCluster] = append(clusterMappings[service.ProxyCluster], mapping)
+	}
+
+	for cluster, mappings := range clusterMappings {
+		m.sendMappingsToCluster(mappings, cluster)
+	}
+
+	m.accountManager.UpdateAccountPeers(ctx, accountID)

@@ -452,7 +514,7 @@ func (m *managerImpl) ReloadService(ctx context.Context, accountID, serviceID st
 		return fmt.Errorf("failed to replace host by lookup for service %s: %w", service.ID, err)
 	}
 
-	m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Update, "", m.proxyGRPCServer.GetOIDCValidationConfig()), service.ProxyCluster)
+	m.sendServiceUpdate(service, reverseproxy.Update, service.ProxyCluster, "")
 
 	m.accountManager.UpdateAccountPeers(ctx, accountID)

@@ -465,12 +527,20 @@ func (m *managerImpl) ReloadAllServicesForAccount(ctx context.Context, accountID
 		return fmt.Errorf("failed to get services: %w", err)
 	}
 
+	clusterMappings := make(map[string][]*proto.ProxyMapping)
+	oidcCfg := m.proxyGRPCServer.GetOIDCValidationConfig()
+
 	for _, service := range services {
 		err = m.replaceHostByLookup(ctx, accountID, service)
 		if err != nil {
 			return fmt.Errorf("failed to replace host by lookup for service %s: %w", service.ID, err)
 		}
-		m.proxyGRPCServer.SendServiceUpdateToCluster(service.ToProtoMapping(reverseproxy.Update, "", m.proxyGRPCServer.GetOIDCValidationConfig()), service.ProxyCluster)
+		mapping := service.ToProtoMapping(reverseproxy.Update, "", oidcCfg)
+		clusterMappings[service.ProxyCluster] = append(clusterMappings[service.ProxyCluster], mapping)
 	}
 
+	for cluster, mappings := range clusterMappings {
+		m.sendMappingsToCluster(mappings, cluster)
+	}
+
 	return nil
52 changes: 30 additions & 22 deletions management/internals/shared/grpc/proxy.go
@@ -62,7 +62,7 @@ type ProxyServiceServer struct {
 	clusterProxies sync.Map
 
 	// Channel for broadcasting reverse proxy updates to all proxies
-	updatesChan chan *proto.ProxyMapping
+	updatesChan chan *proto.GetMappingUpdateResponse

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

rg -n "updatesChan" --type=go

🏁 Script executed:

sed -n '60,70p' management/internals/shared/grpc/proxy.go
sed -n '108,118p' management/internals/shared/grpc/proxy.go

Remove the unused updatesChan field: it is dead code.

updatesChan is declared (line 65) and initialized (line 113) in NewProxyServiceServer, but it is never written to or read from anywhere in the codebase. Both SendServiceUpdate and SendServiceUpdateToCluster write directly to each conn.sendChan, and the sender goroutine reads from conn.sendChan. The field should be removed.

♻️ Proposed fix

 type ProxyServiceServer struct {
     proto.UnimplementedProxyServiceServer
     connectedProxies sync.Map
     clusterProxies   sync.Map
-    // Channel for broadcasting reverse proxy updates to all proxies
-    updatesChan chan *proto.GetMappingUpdateResponse
     // Manager for access logs
     accessLogManager accesslogs.Manager

 s := &ProxyServiceServer{
-    updatesChan:       make(chan *proto.GetMappingUpdateResponse, 100),
     accessLogManager:  accessLogMgr,
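
Since both send paths already fan out through each connection's sendChan, removing the field and its initialization changes no runtime behavior.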


// Manager for access logs
accessLogManager accesslogs.Manager
@@ -101,7 +101,7 @@ type proxyConnection struct {
 	proxyID string
 	address string
 	stream proto.ProxyService_GetMappingUpdateServer
-	sendChan chan *proto.ProxyMapping
+	sendChan chan *proto.GetMappingUpdateResponse
 	ctx context.Context
 	cancel context.CancelFunc
 }
@@ -110,7 +110,7 @@ type proxyConnection struct {
 func NewProxyServiceServer(accessLogMgr accesslogs.Manager, tokenStore *OneTimeTokenStore, oidcConfig ProxyOIDCConfig, peersManager peers.Manager, usersManager users.Manager) *ProxyServiceServer {
 	ctx, cancel := context.WithCancel(context.Background())
 	s := &ProxyServiceServer{
-		updatesChan: make(chan *proto.ProxyMapping, 100),
+		updatesChan: make(chan *proto.GetMappingUpdateResponse, 100),
 		accessLogManager: accessLogMgr,
 		oidcConfig: oidcConfig,
 		tokenStore: tokenStore,
@@ -177,7 +177,7 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
 		proxyID: proxyID,
 		address: proxyAddress,
 		stream: stream,
-		sendChan: make(chan *proto.ProxyMapping, 100),
+		sendChan: make(chan *proto.GetMappingUpdateResponse, 100),
 		ctx: connCtx,
 		cancel: cancel,
 	}
@@ -288,7 +288,7 @@ func (s *ProxyServiceServer) sender(conn *proxyConnection, errChan chan<- error)
 	for {
 		select {
 		case msg := <-conn.sendChan:
-			if err := conn.stream.Send(&proto.GetMappingUpdateResponse{Mapping: []*proto.ProxyMapping{msg}}); err != nil {
+			if err := conn.stream.Send(msg); err != nil {
 				errChan <- err
 				return
 			}
@@ -339,7 +339,7 @@ func (s *ProxyServiceServer) SendAccessLog(ctx context.Context, req *proto.SendA
 // Management should call this when services are created/updated/removed.
 // For create/update operations a unique one-time auth token is generated per
 // proxy so that every replica can independently authenticate with management.
-func (s *ProxyServiceServer) SendServiceUpdate(update *proto.ProxyMapping) {
+func (s *ProxyServiceServer) SendServiceUpdate(update *proto.GetMappingUpdateResponse) {
 	log.Debugf("Broadcasting service update to all connected proxy servers")
 	s.connectedProxies.Range(func(key, value interface{}) bool {
 		conn := value.(*proxyConnection)
@@ -349,7 +349,7 @@ func (s *ProxyServiceServer) SendServiceUpdate(update *proto.ProxyMapping) {
 		}
 		select {
 		case conn.sendChan <- msg:
-			log.Debugf("Sent service update with id %s to proxy server %s", update.Id, conn.proxyID)
+			log.Debugf("Sent service update to proxy server %s", conn.proxyID)
 		default:
 			log.Warnf("Failed to send service update to proxy server %s (channel full)", conn.proxyID)
 		}
@@ -418,7 +418,7 @@ func (s *ProxyServiceServer) removeFromCluster(clusterAddr, proxyID string) {
 // If clusterAddr is empty, broadcasts to all connected proxy servers (backward compatibility).
 // For create/update operations a unique one-time auth token is generated per
 // proxy so that every replica can independently authenticate with management.
-func (s *ProxyServiceServer) SendServiceUpdateToCluster(update *proto.ProxyMapping, clusterAddr string) {
+func (s *ProxyServiceServer) SendServiceUpdateToCluster(update *proto.GetMappingUpdateResponse, clusterAddr string) {
 	if clusterAddr == "" {
 		s.SendServiceUpdate(update)
 		return
@@ -441,7 +441,7 @@ func (s *ProxyServiceServer) SendServiceUpdateToCluster(update *proto.ProxyMappi
 	}
 	select {
 	case conn.sendChan <- msg:
-		log.Debugf("Sent service update with id %s to proxy %s in cluster %s", update.Id, proxyID, clusterAddr)
+		log.Debugf("Sent service update to proxy %s in cluster %s", proxyID, clusterAddr)
 	default:
 		log.Warnf("Failed to send service update to proxy %s in cluster %s (channel full)", proxyID, clusterAddr)
 	}
@@ -451,23 +451,31 @@
 }
 
 // perProxyMessage returns a copy of update with a fresh one-time token for
-// create/update operations. For delete operations the original message is
-// returned unchanged because proxies do not need to authenticate for removal.
+// create/update operations. For delete operations the original mapping is
+// used unchanged because proxies do not need to authenticate for removal.
 // Returns nil if token generation fails (the proxy should be skipped).
-func (s *ProxyServiceServer) perProxyMessage(update *proto.ProxyMapping, proxyID string) *proto.ProxyMapping {
-	if update.Type == proto.ProxyMappingUpdateType_UPDATE_TYPE_REMOVED || update.AccountId == "" {
-		return update
-	}
+func (s *ProxyServiceServer) perProxyMessage(update *proto.GetMappingUpdateResponse, proxyID string) *proto.GetMappingUpdateResponse {
+	resp := make([]*proto.ProxyMapping, 0, len(update.Mapping))
+	for _, mapping := range update.Mapping {
+		if mapping.Type == proto.ProxyMappingUpdateType_UPDATE_TYPE_REMOVED {
+			resp = append(resp, mapping)
+			continue
+		}
 
-	token, err := s.tokenStore.GenerateToken(update.AccountId, update.Id, 5*time.Minute)
-	if err != nil {
-		log.Warnf("Failed to generate token for proxy %s: %v", proxyID, err)
-		return nil
-	}
+		token, err := s.tokenStore.GenerateToken(mapping.AccountId, mapping.Id, 5*time.Minute)
+		if err != nil {
+			log.Warnf("Failed to generate token for proxy %s: %v", proxyID, err)
+			return nil
+		}
 
-	msg := shallowCloneMapping(update)
-	msg.AuthToken = token
-	return msg
+		msg := shallowCloneMapping(mapping)
+		msg.AuthToken = token
+		resp = append(resp, msg)
+	}
+
+	return &proto.GetMappingUpdateResponse{
+		Mapping: resp,
+	}
 }
Comment on lines +453 to 475
⚠️ Potential issue | 🟡 Minor

perProxyMessage: a single token-generation failure silently drops the entire batch for that proxy.

The early return nil on line 468 causes SendServiceUpdate/SendServiceUpdateToCluster to skip the proxy entirely, discarding every other mapping in the batch that was processed successfully. Before this PR, each mapping was dispatched in its own GetMappingUpdateResponse, so a failure lost only one service update. With batching (e.g. ReloadAllServicesForAccount now sends all of a cluster's update mappings in one response), a transient token-store error silently loses the whole batch for that proxy. (Delete-only batches such as those from DeleteAllServices are unaffected, since REMOVED mappings skip token generation.)

Consider continuing to build the response with successfully tokenized mappings, skipping only the failed entry, and returning nil only when resp ends up empty:

🛡️ Proposed fix
 func (s *ProxyServiceServer) perProxyMessage(update *proto.GetMappingUpdateResponse, proxyID string) *proto.GetMappingUpdateResponse {
 	resp := make([]*proto.ProxyMapping, 0, len(update.Mapping))
 	for _, mapping := range update.Mapping {
 		if mapping.Type == proto.ProxyMappingUpdateType_UPDATE_TYPE_REMOVED {
 			resp = append(resp, mapping)
 			continue
 		}

 		token, err := s.tokenStore.GenerateToken(mapping.AccountId, mapping.Id, 5*time.Minute)
 		if err != nil {
 			log.Warnf("Failed to generate token for proxy %s mapping %s: %v", proxyID, mapping.Id, err)
-			return nil
+			continue
 		}

 		msg := shallowCloneMapping(mapping)
 		msg.AuthToken = token
 		resp = append(resp, msg)
 	}

+	if len(resp) == 0 {
+		return nil
+	}
 	return &proto.GetMappingUpdateResponse{
 		Mapping: resp,
 	}
 }
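
With this change, a flaky token store degrades to partial delivery instead of dropping every mapping for the proxy, and the callers' existing nil check still skips the proxy only when nothing could be tokenized.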


// shallowCloneMapping creates a shallow copy of a ProxyMapping, reusing the