diff --git a/Dockerfile b/Dockerfile index 6e95c5307..011d786c2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,4 +17,5 @@ RUN mkdir -p /etc/netclient/config COPY --from=builder /app/netmaker . COPY --from=builder /app/config config EXPOSE 8081 +EXPOSE 6060 ENTRYPOINT ["./netmaker"] diff --git a/compose/docker-compose.pro.yml b/compose/docker-compose.pro.yml index c267b2e85..797eeaf52 100644 --- a/compose/docker-compose.pro.yml +++ b/compose/docker-compose.pro.yml @@ -4,6 +4,7 @@ services: container_name: prometheus image: prom/prometheus:latest environment: + - METRICS_USERNAME=${METRICS_USERNAME:-netmaker} - METRICS_SECRET=${METRICS_SECRET} - NETMAKER_METRICS_TARGET=${NETMAKER_METRICS_TARGET:-http://netmaker-exporter:8085} - PROMETHEUS_RETENTION_TIME=${PROMETHEUS_RETENTION_TIME:-15d} @@ -15,18 +16,19 @@ services: command: - -c - | - cp /etc/prometheus/prometheus.yml.tmpl /tmp/prometheus.yml + cp /etc/prometheus/prometheus.yml.tmpl /etc/prometheus/prometheus.rendered.yml url="$${NETMAKER_METRICS_TARGET}" if echo "$${url}" | grep -q '://'; then scheme="$${url%%://*}"; target="$${url#*://}" else scheme="https"; target="$${url}" fi - sed -i "s~NETMAKER_METRICS_TARGET~$${target}~g" /tmp/prometheus.yml - sed -i "s~__SCHEME__~$${scheme}~g" /tmp/prometheus.yml - sed -i "s~METRICS_SECRET~$${METRICS_SECRET}~g" /tmp/prometheus.yml + sed -i "s~NETMAKER_METRICS_TARGET~$${target}~g" /etc/prometheus/prometheus.rendered.yml + sed -i "s~__SCHEME__~$${scheme}~g" /etc/prometheus/prometheus.rendered.yml + sed -i "s~METRICS_USERNAME~$${METRICS_USERNAME}~g" /etc/prometheus/prometheus.rendered.yml + sed -i "s~METRICS_SECRET~$${METRICS_SECRET}~g" /etc/prometheus/prometheus.rendered.yml exec /bin/prometheus \ - --config.file=/tmp/prometheus.yml \ + --config.file=/etc/prometheus/prometheus.rendered.yml \ --storage.tsdb.path=/prometheus \ --storage.tsdb.retention.time=$${PROMETHEUS_RETENTION_TIME:-15d} \ --storage.tsdb.retention.size=$${PROMETHEUS_RETENTION_SIZE:-0} diff --git a/config/config.go b/config/config.go index 538ff78f3..c3611d6ad 100644 --- a/config/config.go +++ b/config/config.go @@ -47,7 +47,7 @@ type ServerConfig struct { MasterKey string `yaml:"masterkey"` DNSKey string `yaml:"dnskey"` AllowedOrigin string `yaml:"allowedorigin"` - NodeID string `yaml:"nodeid"` + HostName string `yaml:"host_name"` RestBackend string `yaml:"restbackend"` MessageQueueBackend string `yaml:"messagequeuebackend"` DNSMode string `yaml:"dnsmode"` diff --git a/controllers/gateway.go b/controllers/gateway.go index e7c903c4f..8e3a28fc9 100644 --- a/controllers/gateway.go +++ b/controllers/gateway.go @@ -104,40 +104,14 @@ func createGateway(w http.ResponseWriter, r *http.Request) { logic.UpsertHost(host) } } - for _, relayedNodeID := range relayNode.RelayedNodes { - relayedNode, err := logic.GetNodeByID(relayedNodeID) - if err == nil { - if relayedNode.FailedOverBy != uuid.Nil { - go logic.ResetFailedOverPeer(&relayedNode) - } - if len(relayedNode.AutoRelayedPeers) > 0 { - go logic.ResetAutoRelayedPeer(&relayedNode) - } - } - } if len(req.InetNodeClientIDs) > 0 { logic.SetInternetGw(&node, req.InetNodeReq) - if servercfg.IsPro { - if _, exists := logic.FailOverExists(node.Network); exists { - go func() { - logic.ResetFailedOverPeer(&node) - mq.PublishPeerUpdate(false) - }() - } - - go func() { - logic.ResetAutoRelayedPeer(&node) - mq.PublishPeerUpdate(false) - }() - - } if node.IsGw && node.IngressDNS == "" { node.IngressDNS = "1.1.1.1" } logic.UpsertNode(&node) } - logger.Log( 1, r.Header.Get("user"), @@ -168,6 +142,18 @@ 
func createGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(apiNode) go func() { + for _, relayedNodeID := range relayNode.RelayedNodes { + relayedNode, err := logic.GetNodeByID(relayedNodeID) + if err == nil { + if relayedNode.FailedOverBy != uuid.Nil { + logic.ResetFailedOverPeer(&relayedNode) + } + if len(relayedNode.AutoRelayedPeers) > 0 { + logic.ResetAutoRelayedPeer(&relayedNode) + } + } + } + logic.ResetAutoRelayedPeer(&node) if err := mq.NodeUpdate(&node); err != nil { slog.Error("error publishing node update to node", "node", node.ID, "error", err) } @@ -352,12 +338,7 @@ func assignGw(w http.ResponseWriter, r *http.Request) { autoAssignGw = false } if autoAssignGw { - if node.FailedOverBy != uuid.Nil { - go logic.ResetFailedOverPeer(&node) - } - if len(node.AutoRelayedPeers) > 0 { - go logic.ResetAutoRelayedPeer(&node) - } + if node.RelayedBy != "" { gatewayNode, err := logic.GetNodeByID(node.RelayedBy) if err == nil { @@ -384,6 +365,12 @@ func assignGw(w http.ResponseWriter, r *http.Request) { logic.UpsertNode(&node) logic.GetNodeStatus(&node, false) go func() { + if node.FailedOverBy != uuid.Nil { + logic.ResetFailedOverPeer(&node) + } + if len(node.AutoRelayedPeers) > 0 { + logic.ResetAutoRelayedPeer(&node) + } if err := mq.NodeUpdate(&node); err != nil { slog.Error("error publishing node update to node", "node", node.ID, "error", err) } @@ -408,18 +395,16 @@ func assignGw(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("node %s is not a gateway", nodeid), "badrequest")) return } - - if node.FailedOverBy != uuid.Nil { - go logic.ResetFailedOverPeer(&node) - } - if len(node.AutoRelayedPeers) > 0 { - go logic.ResetAutoRelayedPeer(&node) - } newNodes := []string{node.ID.String()} newNodes = append(newNodes, gatewayNode.RelayedNodes...) 
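A note on the shape of the gateway handlers above: the API response is written first and the MQ fan-out (node update, failover/auto-relay resets) is deferred to a goroutine, so slow publishes never hold the HTTP request open. A minimal sketch of that respond-first pattern, with notifyPeers standing in for the mq calls (all names here are illustrative, not from the patch):

package main

import (
	"encoding/json"
	"log/slog"
	"net/http"
)

type apiNode struct {
	ID string `json:"id"`
}

// notifyPeers is a stand-in for the deferred MQ work (mq.NodeUpdate, peer resets).
func notifyPeers(id string) error { return nil }

func createGatewayHandler(w http.ResponseWriter, r *http.Request) {
	node := apiNode{ID: "example-node"}

	// Reply to the caller immediately.
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(node)

	// Heavy follow-up work runs asynchronously, once per request.
	go func() {
		if err := notifyPeers(node.ID); err != nil {
			slog.Error("error publishing node update", "node", node.ID, "error", err)
		}
	}()
}

func main() {
	http.HandleFunc("/gateway", createGatewayHandler)
	_ = http.ListenAndServe(":8081", nil)
}
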
newNodes = logic.UniqueStrings(newNodes) logic.UpdateRelayNodes(gatewayNode.ID.String(), gatewayNode.RelayedNodes, newNodes) + node, err = logic.GetNodeByID(node.ID.String()) + if err != nil { + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return + } host := &schema.Host{ ID: node.HostID, } @@ -453,6 +438,13 @@ func assignGw(w http.ResponseWriter, r *http.Request) { apiNode := node.ConvertToAPINode() go func() { + + if node.FailedOverBy != uuid.Nil { + logic.ResetFailedOverPeer(&node) + } + if len(node.AutoRelayedPeers) > 0 { + logic.ResetAutoRelayedPeer(&node) + } if err := mq.NodeUpdate(&node); err != nil { slog.Error("error publishing node update to node", "node", node.ID, "error", err) } diff --git a/controllers/hosts.go b/controllers/hosts.go index 819fe0964..69110d6f6 100644 --- a/controllers/hosts.go +++ b/controllers/hosts.go @@ -301,43 +301,34 @@ func pull(w http.ResponseWriter, r *http.Request) { return } - sendPeerUpdate := false - for _, nodeID := range host.Nodes { - node, err := logic.GetNodeByID(nodeID) - if err != nil { - //slog.Error("failed to get node:", "id", node.ID, "error", err) - continue - } - if r.URL.Query().Get("reset_failovered") == "true" { + resetFailovered := r.URL.Query().Get("reset_failovered") == "true" + if resetFailovered { + for _, nodeID := range host.Nodes { + node, err := logic.GetNodeByID(nodeID) + if err != nil { + continue + } logic.ResetFailedOverPeer(&node) logic.ResetAutoRelayedPeer(&node) - sendPeerUpdate = true - } - } - if sendPeerUpdate { - if err := mq.PublishPeerUpdate(false); err != nil { - logger.Log(0, "fail to publish peer update: ", err.Error()) } - } - allNodes, err := logic.GetAllNodes() - if err != nil { - logger.Log(0, "failed to get nodes: ", hostIDStr) - logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) - return - } - hPU, err := logic.GetPeerUpdateForHost("", host, allNodes, nil, nil) - if err != nil { - logger.Log(0, "could not pull peers for host", hostIDStr, err.Error()) - logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) - return + go mq.PublishPeerUpdate(false) } - portChanged := logic.CheckHostPorts(host) - if portChanged { - // Save the port change to database immediately to prevent conflicts - if err := logic.UpsertHost(host); err != nil { - slog.Error("failed to save host port change", "host", host.Name, "error", err) + hPU, ok := logic.GetCachedHostPeerUpdate(hostID.String()) + if !ok || resetFailovered { + allNodes, err := logic.GetAllNodes() + if err != nil { + logger.Log(0, "failed to get nodes: ", hostID.String()) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return + } + hPU, err = logic.GetPeerUpdateForHost("", host, allNodes, nil, nil) + if err != nil { + logger.Log(0, "could not pull peers for host", hostID.String(), err.Error()) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return } + logic.StoreHostPeerUpdate(hostID.String(), hPU) } response := models.HostPull{ @@ -515,10 +506,6 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) { switch hostUpdate.Action { case models.CheckIn: sendPeerUpdate = mq.HandleHostCheckin(&hostUpdate.Host, currentHost) - changed := logic.CheckHostPorts(currentHost) - if changed { - mq.HostUpdate(&models.HostUpdate{Action: models.UpdateHost, Host: *currentHost}) - } case models.UpdateHost: if hostUpdate.Host.PublicKey != currentHost.PublicKey { //remove old peer entry diff --git a/controllers/node.go b/controllers/node.go index e16730496..d7d936ec0 
100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -666,15 +666,19 @@ func updateNode(w http.ResponseWriter, r *http.Request) { }, Origin: schema.Dashboard, }) + ipChanged := currentNode.Address.String() != newNode.Address.String() || + currentNode.Address6.String() != newNode.Address6.String() w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(apiNode) go func(aclUpdate, relayupdate bool, newNode *models.Node) { if err := mq.NodeUpdate(newNode); err != nil { slog.Error("error publishing node update to node", "node", newNode.ID, "error", err) } - // if !newNode.Connected { - // mq.HostUpdate(&models.HostUpdate{Host: *host, Action: models.SignalPull}) - // } + if ipChanged { + if err := mq.HostUpdate(&models.HostUpdate{Action: models.RequestPull, Host: *host}); err != nil { + slog.Error("error sending sync pull to host on ip change", "host", host.ID, "error", err) + } + } allNodes, err := logic.GetAllNodes() if err == nil { mq.PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false, nil) diff --git a/database/database.go b/database/database.go index 1af0a40b9..874b5c485 100644 --- a/database/database.go +++ b/database/database.go @@ -2,7 +2,6 @@ package database import ( "errors" - "sync" "time" "github.com/gravitl/netmaker/logger" @@ -98,8 +97,6 @@ const ( isConnected = "isconnected" ) -var dbMutex sync.RWMutex - var Tables = []string{ NODES_TABLE_NAME, CERTS_TABLE_NAME, @@ -176,8 +173,6 @@ func CreateTable(tableName string) error { // Insert - inserts object into db func Insert(key string, value string, tableName string) error { - dbMutex.Lock() - defer dbMutex.Unlock() if key != "" && value != "" { return getCurrentDB()[INSERT].(func(string, string, string) error)(key, value, tableName) } else { @@ -187,37 +182,25 @@ func Insert(key string, value string, tableName string) error { // DeleteRecord - deletes a record from db func DeleteRecord(tableName string, key string) error { - dbMutex.Lock() - defer dbMutex.Unlock() return getCurrentDB()[DELETE].(func(string, string) error)(tableName, key) } // DeleteAllRecords - removes a table and remakes func DeleteAllRecords(tableName string) error { - dbMutex.Lock() - defer dbMutex.Unlock() err := getCurrentDB()[DELETE_ALL].(func(string) error)(tableName) if err != nil { return err } - err = CreateTable(tableName) - if err != nil { - return err - } - return nil + return CreateTable(tableName) } // FetchRecord - fetches a single record by key func FetchRecord(tableName string, key string) (string, error) { - dbMutex.RLock() - defer dbMutex.RUnlock() return getCurrentDB()[FETCH_ONE].(func(string, string) (string, error))(tableName, key) } // FetchRecords - fetches all records in given table func FetchRecords(tableName string) (map[string]string, error) { - dbMutex.RLock() - defer dbMutex.RUnlock() return getCurrentDB()[FETCH_ALL].(func(string) (map[string]string, error))(tableName) } diff --git a/db/sqlite.go b/db/sqlite.go index 048233092..6e3df11f2 100644 --- a/db/sqlite.go +++ b/db/sqlite.go @@ -49,7 +49,8 @@ func (s *sqliteConnector) connect() (*gorm.DB, error) { } } - db, err := gorm.Open(sqlite.Open(dbFilePath), &gorm.Config{ + dsn := dbFilePath + "?_journal_mode=WAL&_busy_timeout=5000" + db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { @@ -61,6 +62,7 @@ func (s *sqliteConnector) connect() (*gorm.DB, error) { return nil, err } + sqlDB.SetMaxOpenConns(1) sqlDB.SetMaxIdleConns(1) return db, nil diff --git a/go.mod b/go.mod index acf02f792..a08a11daf 100644 
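The db/sqlite.go hunk above enables WAL journaling, a 5-second busy timeout, and a single open connection so concurrent writers queue instead of failing with "database is locked". The same idea expressed with the plain database/sql API and the mattn/go-sqlite3 driver (the patch itself goes through GORM, but the DSN parameters behave the same way):

package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // registers the "sqlite3" driver
)

func main() {
	// WAL lets readers proceed while a write is in flight; _busy_timeout makes a
	// blocked writer wait up to 5s instead of returning SQLITE_BUSY immediately.
	dsn := "data/netmaker.db?_journal_mode=WAL&_busy_timeout=5000"
	db, err := sql.Open("sqlite3", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A single connection also serialises writes inside the process.
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)

	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)`); err != nil {
		log.Fatal(err)
	}
}
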
--- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/google/go-cmp v0.7.0 github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e github.com/guumaster/tablewriter v0.0.10 + github.com/klauspost/compress v1.18.3 github.com/matryer/is v1.4.1 github.com/okta/okta-sdk-golang/v5 v5.0.6 github.com/pquerna/otp v1.5.0 @@ -91,7 +92,6 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/kelseyhightower/envconfig v1.4.0 // indirect - github.com/klauspost/compress v1.18.3 // indirect github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect diff --git a/logic/extpeers.go b/logic/extpeers.go index 065cb803a..dae78c556 100644 --- a/logic/extpeers.go +++ b/logic/extpeers.go @@ -420,13 +420,11 @@ func SaveExtClient(extclient *models.ExtClient) error { } if servercfg.CacheEnabled() { storeExtClientInCache(key, *extclient) - if _, ok := allocatedIpMap[extclient.Network]; ok { - if extclient.Address != "" { - AddIpToAllocatedIpMap(extclient.Network, net.ParseIP(extclient.Address)) - } - if extclient.Address6 != "" { - AddIpToAllocatedIpMap(extclient.Network, net.ParseIP(extclient.Address6)) - } + if extclient.Address != "" { + AddIpToAllocatedIpMap(extclient.Network, net.ParseIP(extclient.Address)) + } + if extclient.Address6 != "" { + AddIpToAllocatedIpMap(extclient.Network, net.ParseIP(extclient.Address6)) } } diff --git a/logic/hooks.go b/logic/hooks.go new file mode 100644 index 000000000..831b05463 --- /dev/null +++ b/logic/hooks.go @@ -0,0 +1,14 @@ +package logic + +type ServerSyncType string + +const ( + SyncTypeSettings ServerSyncType = "settings" + SyncTypePeerUpdate ServerSyncType = "peerupdate" + SyncTypeIDPSync ServerSyncType = "idpsync" +) + +// PublishServerSync is set by the mq package at startup to broadcast +// sync signals to peer servers in HA mode. The callback avoids a +// circular import (logic -> mq). +var PublishServerSync func(syncType ServerSyncType) diff --git a/logic/networks.go b/logic/networks.go index a8bb755f1..88a50dfd2 100644 --- a/logic/networks.go +++ b/logic/networks.go @@ -98,7 +98,9 @@ func AddIpToAllocatedIpMap(networkName string, ip net.IP) { return } networkCacheMutex.Lock() - allocatedIpMap[networkName][ip.String()] = ip + if m, ok := allocatedIpMap[networkName]; ok { + m[ip.String()] = ip + } networkCacheMutex.Unlock() } @@ -107,7 +109,9 @@ func RemoveIpFromAllocatedIpMap(networkName string, ip string) { return } networkCacheMutex.Lock() - delete(allocatedIpMap[networkName], ip) + if m, ok := allocatedIpMap[networkName]; ok { + delete(m, ip) + } networkCacheMutex.Unlock() } @@ -354,9 +358,11 @@ func UniqueAddressCache(networkName string, reverse bool) (net.IP, error) { newAddrs = net4.LastAddress() } + networkCacheMutex.RLock() ipAllocated := allocatedIpMap[networkName] for { if _, ok := ipAllocated[newAddrs.String()]; !ok { + networkCacheMutex.RUnlock() return newAddrs, nil } if reverse { @@ -368,6 +374,7 @@ func UniqueAddressCache(networkName string, reverse bool) (net.IP, error) { break } } + networkCacheMutex.RUnlock() return add, errors.New("ERROR: No unique addresses available. 
Check network subnet") } @@ -542,9 +549,11 @@ func UniqueAddress6Cache(networkName string, reverse bool) (net.IP, error) { return add, err } + networkCacheMutex.RLock() ipAllocated := allocatedIpMap[networkName] for { if _, ok := ipAllocated[newAddrs.String()]; !ok { + networkCacheMutex.RUnlock() return newAddrs, nil } if reverse { @@ -556,6 +565,7 @@ func UniqueAddress6Cache(networkName string, reverse bool) (net.IP, error) { break } } + networkCacheMutex.RUnlock() return add, errors.New("ERROR: No unique IPv6 addresses available. Check network subnet") } diff --git a/logic/nodes.go b/logic/nodes.go index 55db78991..bf36fb879 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -160,23 +160,56 @@ func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node return nodes } -// UpdateNodeCheckin - updates the checkin time of a node +var ( + pendingCheckins = make(map[string]models.Node) + pendingCheckinsMu sync.Mutex +) + +// UpdateNodeCheckin - buffers the checkin timestamp in memory when caching is enabled. +// The actual DB write is deferred to FlushNodeCheckins (every 30s). +// When caching is disabled (HA mode), writes directly to the DB. func UpdateNodeCheckin(node *models.Node) error { node.SetLastCheckIn() + node.EgressDetails = models.EgressDetails{} + if servercfg.CacheEnabled() { + pendingCheckinsMu.Lock() + pendingCheckins[node.ID.String()] = *node + pendingCheckinsMu.Unlock() + storeNodeInCache(*node) + storeNodeInNetworkCache(*node, node.Network) + return nil + } data, err := json.Marshal(node) if err != nil { return err } - node.EgressDetails = models.EgressDetails{} - err = database.Insert(node.ID.String(), string(data), database.NODES_TABLE_NAME) - if err != nil { - return err + return database.Insert(node.ID.String(), string(data), database.NODES_TABLE_NAME) +} + +// FlushNodeCheckins - writes all buffered check-in updates to the DB in one batch. +// Called periodically (e.g., every 30s) to avoid per-checkin write lock contention. 
+func FlushNodeCheckins() { + pendingCheckinsMu.Lock() + batch := pendingCheckins + pendingCheckins = make(map[string]models.Node) + pendingCheckinsMu.Unlock() + if len(batch) == 0 { + return } - if servercfg.CacheEnabled() { - storeNodeInCache(*node) - storeNodeInNetworkCache(*node, node.Network) + var failed int + for id, node := range batch { + data, err := json.Marshal(node) + if err != nil { + failed++ + continue + } + if err := database.Insert(id, string(data), database.NODES_TABLE_NAME); err != nil { + failed++ + } + } + if failed > 0 { + slog.Error("FlushNodeCheckins: failed to persist checkins", "failed", failed, "total", len(batch)) } - return nil } // UpsertNode - updates node in the DB @@ -238,15 +271,13 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error { if servercfg.CacheEnabled() { storeNodeInCache(*newNode) storeNodeInNetworkCache(*newNode, newNode.Network) - if _, ok := allocatedIpMap[newNode.Network]; ok { - if newNode.Address.IP != nil && !newNode.Address.IP.Equal(currentNode.Address.IP) { - AddIpToAllocatedIpMap(newNode.Network, newNode.Address.IP) - RemoveIpFromAllocatedIpMap(currentNode.Network, currentNode.Address.IP.String()) - } - if newNode.Address6.IP != nil && !newNode.Address6.IP.Equal(currentNode.Address6.IP) { - AddIpToAllocatedIpMap(newNode.Network, newNode.Address6.IP) - RemoveIpFromAllocatedIpMap(currentNode.Network, currentNode.Address6.IP.String()) - } + if newNode.Address.IP != nil && !newNode.Address.IP.Equal(currentNode.Address.IP) { + AddIpToAllocatedIpMap(newNode.Network, newNode.Address.IP) + RemoveIpFromAllocatedIpMap(currentNode.Network, currentNode.Address.IP.String()) + } + if newNode.Address6.IP != nil && !newNode.Address6.IP.Equal(currentNode.Address6.IP) { + AddIpToAllocatedIpMap(newNode.Network, newNode.Address6.IP) + RemoveIpFromAllocatedIpMap(currentNode.Network, currentNode.Address6.IP.String()) } } return nil @@ -765,13 +796,11 @@ func createNode(node *models.Node) error { if servercfg.CacheEnabled() { storeNodeInCache(*node) storeNodeInNetworkCache(*node, node.Network) - if _, ok := allocatedIpMap[node.Network]; ok { - if node.Address.IP != nil { - AddIpToAllocatedIpMap(node.Network, node.Address.IP) - } - if node.Address6.IP != nil { - AddIpToAllocatedIpMap(node.Network, node.Address6.IP) - } + if node.Address.IP != nil { + AddIpToAllocatedIpMap(node.Network, node.Address.IP) + } + if node.Address6.IP != nil { + AddIpToAllocatedIpMap(node.Network, node.Address6.IP) } } diff --git a/logic/peers.go b/logic/peers.go index d4883b9e4..ce8486895 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/netip" + "sync" "time" "github.com/google/uuid" @@ -63,16 +64,112 @@ var ( } ) -// GetHostPeerInfo - fetches required peer info per network +var ( + hostPeerInfoCache map[string]models.HostPeerInfo + hostPeerInfoCacheMu sync.RWMutex +) + +var ( + hostPeerUpdateCache map[string]models.HostPeerUpdate + hostPeerUpdateCacheMu sync.RWMutex +) + +// InvalidateHostPeerCaches clears both hostPeerInfoCache and +// hostPeerUpdateCache so they are rebuilt on next access or refresh. +func InvalidateHostPeerCaches() { + hostPeerInfoCacheMu.Lock() + hostPeerInfoCache = nil + hostPeerInfoCacheMu.Unlock() + + hostPeerUpdateCacheMu.Lock() + hostPeerUpdateCache = nil + hostPeerUpdateCacheMu.Unlock() +} + +// StoreHostPeerUpdate - caches a computed HostPeerUpdate for a host. +// Called as a side-effect of PublishSingleHostPeerUpdate during broadcast. 
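The pull handler in controllers/hosts.go consumes this cache in a read-through fashion: try GetCachedHostPeerUpdate, and only on a miss compute the update and store it back. A generic sketch of that get-or-compute shape (types simplified, not the netmaker ones):

package main

import (
	"fmt"
	"sync"
)

type peerUpdate struct{ Peers []string }

var (
	cache   = map[string]peerUpdate{}
	cacheMu sync.RWMutex
)

func getOrCompute(hostID string, compute func() (peerUpdate, error)) (peerUpdate, error) {
	cacheMu.RLock()
	pu, ok := cache[hostID]
	cacheMu.RUnlock()
	if ok {
		return pu, nil // served from cache, no recomputation
	}
	pu, err := compute() // miss: do the expensive computation once
	if err != nil {
		return peerUpdate{}, err
	}
	cacheMu.Lock()
	cache[hostID] = pu
	cacheMu.Unlock()
	return pu, nil
}

func main() {
	pu, _ := getOrCompute("host-1", func() (peerUpdate, error) {
		return peerUpdate{Peers: []string{"peer-a", "peer-b"}}, nil
	})
	fmt.Println(pu.Peers)
}
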
+func StoreHostPeerUpdate(hostID string, peerUpdate models.HostPeerUpdate) { + hostPeerUpdateCacheMu.Lock() + if hostPeerUpdateCache == nil { + hostPeerUpdateCache = make(map[string]models.HostPeerUpdate) + } + hostPeerUpdateCache[hostID] = peerUpdate + hostPeerUpdateCacheMu.Unlock() +} + +// GetCachedHostPeerUpdate - returns a cached HostPeerUpdate if available. +func GetCachedHostPeerUpdate(hostID string) (models.HostPeerUpdate, bool) { + hostPeerUpdateCacheMu.RLock() + defer hostPeerUpdateCacheMu.RUnlock() + if hostPeerUpdateCache == nil { + return models.HostPeerUpdate{}, false + } + hpu, ok := hostPeerUpdateCache[hostID] + return hpu, ok +} + +// GetHostPeerInfo - returns cached peer info for a host. +// Falls back to on-demand computation if the cache is not yet populated. func GetHostPeerInfo(host *schema.Host) (models.HostPeerInfo, error) { - peerInfo := models.HostPeerInfo{ - NetworkPeerIDs: make(map[schema.NetworkID]models.PeerMap), + hostID := host.ID.String() + hostPeerInfoCacheMu.RLock() + if hostPeerInfoCache != nil { + if info, ok := hostPeerInfoCache[hostID]; ok { + hostPeerInfoCacheMu.RUnlock() + return info, nil + } + } + hostPeerInfoCacheMu.RUnlock() + return computeHostPeerInfo(host, nil, models.ServerConfig{}) +} + +// RefreshHostPeerInfoCache - batch pre-computes peer info for all hosts +// and stores the results in the cache. Returns the fetched hosts and +// nodes so callers can reuse them without redundant DB queries. +func RefreshHostPeerInfoCache() ([]schema.Host, []models.Node) { + hosts, err := (&schema.Host{}).ListAll(db.WithContext(context.TODO())) + if err != nil { + slog.Error("failed to refresh host peer info cache", "error", err) + return nil, nil } allNodes, err := GetAllNodes() if err != nil { - return peerInfo, err + slog.Error("failed to refresh host peer info cache", "error", err) + return nil, nil } serverInfo := GetServerInfo() + + newCache := make(map[string]models.HostPeerInfo, len(hosts)) + for i := range hosts { + info, err := computeHostPeerInfo(&hosts[i], allNodes, serverInfo) + if err != nil { + continue + } + newCache[hosts[i].ID.String()] = info + } + + hostPeerInfoCacheMu.Lock() + hostPeerInfoCache = newCache + hostPeerInfoCacheMu.Unlock() + return hosts, allNodes +} + +// computeHostPeerInfo - computes peer info for a single host. +// If allNodes is nil or serverInfo is zero-value, fetches them fresh. 
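RefreshHostPeerInfoCache above uses a rebuild-and-swap strategy: the replacement map is built without holding the lock, then installed in one short critical section, so readers never observe a half-populated cache. In miniature (illustrative names only):

package main

import (
	"fmt"
	"sync"
)

var (
	infoCache map[string]string
	infoMu    sync.RWMutex
)

func refresh(hostIDs []string, compute func(id string) string) {
	// Build the replacement map outside the lock; this is the slow part.
	next := make(map[string]string, len(hostIDs))
	for _, id := range hostIDs {
		next[id] = compute(id)
	}
	// The swap is O(1), so readers are blocked only briefly.
	infoMu.Lock()
	infoCache = next
	infoMu.Unlock()
}

func lookup(id string) (string, bool) {
	infoMu.RLock()
	defer infoMu.RUnlock()
	v, ok := infoCache[id]
	return v, ok
}

func main() {
	refresh([]string{"h1", "h2"}, func(id string) string { return "peer-info-for-" + id })
	fmt.Println(lookup("h1"))
}
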
+func computeHostPeerInfo(host *schema.Host, allNodes []models.Node, serverInfo models.ServerConfig) (models.HostPeerInfo, error) { + peerInfo := models.HostPeerInfo{ + NetworkPeerIDs: make(map[schema.NetworkID]models.PeerMap), + } + var err error + if allNodes == nil { + allNodes, err = GetAllNodes() + if err != nil { + return peerInfo, err + } + } + if serverInfo.Server == "" { + serverInfo = GetServerInfo() + } for _, nodeID := range host.Nodes { nodeID := nodeID node, err := GetNodeByID(nodeID) @@ -90,7 +187,6 @@ func GetHostPeerInfo(host *schema.Host) (models.HostPeerInfo, error) { for _, peer := range currentPeers { peer := peer if peer.ID.String() == node.ID.String() { - // skip yourself continue } diff --git a/logic/settings.go b/logic/settings.go index 1f5f1f54b..0e51d5d2b 100644 --- a/logic/settings.go +++ b/logic/settings.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "context" @@ -32,6 +33,8 @@ var ( var ServerSettingsDBKey = "server_cfg" var SettingsMutex = &sync.RWMutex{} +var serverSettingsCache atomic.Value + var defaultUserSettings = models.UserSettings{ TextSize: "16", Theme: models.Dark, @@ -39,17 +42,37 @@ var defaultUserSettings = models.UserSettings{ } func GetServerSettings() (s models.ServerSettings) { + if cached, ok := serverSettingsCache.Load().(*models.ServerSettings); ok && cached != nil { + return *cached + } + s, err := getServerSettingsFromDB() + if err == nil { + serverSettingsCache.Store(&s) + } + return +} + +// InvalidateServerSettingsCache clears the in-memory settings cache so +// the next GetServerSettings call re-reads from the database. +func InvalidateServerSettingsCache() { + serverSettingsCache.Store((*models.ServerSettings)(nil)) +} + +func getServerSettingsFromDB() (models.ServerSettings, error) { + var s models.ServerSettings data, err := database.FetchRecord(database.SERVER_SETTINGS, ServerSettingsDBKey) if err != nil { - return + return s, err } - json.Unmarshal([]byte(data), &s) - return + if err := json.Unmarshal([]byte(data), &s); err != nil { + return s, err + } + return s, nil } func UpsertServerSettings(s models.ServerSettings) error { - // get curr settings - currSettings := GetServerSettings() + // get curr settings from DB directly (not cache) for accurate comparison + currSettings, _ := getServerSettingsFromDB() if s.ClientSecret == Mask() { s.ClientSecret = currSettings.ClientSecret } @@ -87,6 +110,10 @@ func UpsertServerSettings(s models.ServerSettings) error { if err != nil { return err } + serverSettingsCache.Store(&s) + if PublishServerSync != nil { + PublishServerSync(SyncTypeSettings) + } return nil } @@ -219,7 +246,7 @@ func GetServerConfig() config.ServerConfig { cfg.DNSKey = "(hidden)" cfg.AllowedOrigin = servercfg.GetAllowedOrigin() cfg.RestBackend = "off" - cfg.NodeID = servercfg.GetNodeID() + cfg.HostName = servercfg.GetHostName() cfg.BrokerType = servercfg.GetBrokerType() cfg.EmqxRestEndpoint = servercfg.GetEmqxRestEndpoint() if settings.NetclientAutoUpdate { diff --git a/main.go b/main.go index 8d1ba8b4c..8e99a6c4a 100644 --- a/main.go +++ b/main.go @@ -88,9 +88,12 @@ func setupConfig(absoluteConfigPath string) { } func startHooks(ctx context.Context, wg *sync.WaitGroup) { - err := logic.TimerCheckpoint() - if err != nil { - logger.Log(1, "Timer error occurred: ", err.Error()) + // Only run timer checkpoint (telemetry) on master pod + if servercfg.IsMasterPod() { + err := logic.TimerCheckpoint() + if err != nil { + logger.Log(1, "Timer error occurred: ", err.Error()) + } } 
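The settings cache introduced in logic/settings.go above relies on atomic.Value holding a *models.ServerSettings; invalidation stores a typed nil pointer, which the reader filters out with the cached != nil check. The same mechanics in isolation, with a trimmed-down settings type:

package main

import (
	"fmt"
	"sync/atomic"
)

type settings struct{ Theme string }

var cache atomic.Value // holds *settings

func get(load func() settings) settings {
	if cached, ok := cache.Load().(*settings); ok && cached != nil {
		return *cached // fast path: no DB read
	}
	s := load()
	cache.Store(&s)
	return s
}

// invalidate stores a typed nil so the next get() falls through to load().
// Storing an untyped nil would panic; the concrete type must stay consistent.
func invalidate() { cache.Store((*settings)(nil)) }

func main() {
	fmt.Println(get(func() settings { return settings{Theme: "dark"} }))  // loads from DB
	fmt.Println(get(func() settings { return settings{Theme: "light"} })) // cached: still dark
	invalidate()
	fmt.Println(get(func() settings { return settings{Theme: "light"} })) // reloads
}
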
logic.EnterpriseCheck(ctx, wg) } @@ -102,8 +105,13 @@ func initialize() { // Client Mode Prereq Check logger.Log(0, "warning: MASTER_KEY not set, this could make account recovery difficult") } - if servercfg.GetNodeID() == "" { - logger.FatalLog("error: must set NODE_ID, currently blank") + // Log master/worker mode for K8s HA setup + if servercfg.IsHA() { + if servercfg.IsMasterPod() { + logger.Log(0, "HA mode: running as MASTER pod - will run migrations and singleton operations") + } else { + logger.Log(0, "HA mode: running as WORKER pod - skipping migrations and singleton operations") + } } // initialize sql schema db. @@ -134,7 +142,10 @@ func initialize() { // Client Mode Prereq Check _, _ = logic.GetAllEnrollmentKeys() _ = logic.CleanExpiredSSOStates() - migrate.Run() + // Only run migrations on master pod to avoid conflicts in HA setup + if servercfg.IsMasterPod() { + migrate.Run() + } logic.SetJWTSecret() logic.InitialiseRoles() @@ -195,7 +206,10 @@ func startControllers(wg *sync.WaitGroup, ctx context.Context) { wg.Add(1) go logic.StartHookManager(ctx, wg) - logic.InitNetworkHooks() + // Only run network cleanup hooks on master pod + if servercfg.IsMasterPod() { + logic.InitNetworkHooks() + } logic.AddSSOStateCleanupHook() } @@ -206,8 +220,12 @@ func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) { go mq.Keepalive(ctx) go func() { - go logic.ManageZombies(ctx) - go logic.DeleteExpiredNodes(ctx) + // Only run zombie management and expired node deletion on master pod + // to avoid duplicate operations in HA setup + if servercfg.IsMasterPod() { + go logic.ManageZombies(ctx) + go logic.DeleteExpiredNodes(ctx) + } for nodeUpdate := range logic.DeleteNodesCh { if nodeUpdate == nil { continue diff --git a/mq/mq.go b/mq/mq.go index 948a8d224..2f56c66af 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -73,29 +73,38 @@ func SetupMQTT(fatal bool) { setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts) logger.Log(0, "Mq Client Connecting with Random ID: ", opts.ClientID) opts.SetOnConnectHandler(func(client mqtt.Client) { + // Only master pod subscribes to incoming client messages in HA setup + // This prevents duplicate message processing across multiple pods + // Worker pods can still publish messages but won't process incoming ones serverName := servercfg.GetServer() - if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - logger.Log(0, "node update subscription failed") - } - if token := client.Subscribe(fmt.Sprintf("host/serverupdate/%s/#", serverName), 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - logger.Log(0, "host update subscription failed") - } - if token := client.Subscribe(fmt.Sprintf("signal/%s/#", serverName), 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - logger.Log(0, "node client subscription failed") + if servercfg.IsMasterPod() { + if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + logger.Log(0, "node update subscription failed") + } + if token := client.Subscribe(fmt.Sprintf("host/serverupdate/%s/#", serverName), 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + logger.Log(0, "host update subscription failed") + } + if token := 
client.Subscribe(fmt.Sprintf("signal/%s/#", serverName), 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + logger.Log(0, "node client subscription failed") + } + if token := client.Subscribe(fmt.Sprintf("metrics/%s/#", serverName), 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + logger.Log(0, "node metrics subscription failed") + } + logger.Log(0, "MQ subscriptions established (master pod)") + } else { + logger.Log(0, "MQ publish-only mode (worker pod)") } - if token := client.Subscribe(fmt.Sprintf("metrics/%s/#", serverName), 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - logger.Log(0, "node metrics subscription failed") + if servercfg.IsHA() { + if token := client.Subscribe(fmt.Sprintf("serversync/%s", serverName), 0, mqtt.MessageHandler(handleServerSync)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + logger.Log(0, "server sync subscription failed") + } } opts.SetOrderMatters(false) opts.SetResumeSubs(true) }) opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { - slog.Warn("detected broker connection lost", "err", e.Error()) - c.Disconnect(250) - slog.Info("re-initiating MQ connection") - SetupMQTT(false) - + slog.Warn("detected broker connection lost, auto-reconnect will retry", "err", e.Error()) }) mqclient = mqtt.NewClient(opts) tperiod := time.Now().Add(10 * time.Second) @@ -121,17 +130,48 @@ func SetupMQTT(fatal bool) { } time.Sleep(2 * time.Second) } + InitServerSync() } +const CHECKIN_FLUSH_INTERVAL = 30 + // Keepalive -- periodically pings all nodes to let them know server is still alive and doing well func Keepalive(ctx context.Context) { + warmPeerCaches() + StartPeerUpdateWorker(ctx) go PublishPeerUpdate(true) - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second * KEEPALIVE_TIMEOUT): - sendPeers() + metricsExportInterval := servercfg.GetMetricIntervalInMinutes() + if metricsExportInterval < time.Minute { + metricsExportInterval = time.Minute * 10 + } + metricsTicker := time.NewTicker(metricsExportInterval) + defer metricsTicker.Stop() + if servercfg.CacheEnabled() { + checkinTicker := time.NewTicker(CHECKIN_FLUSH_INTERVAL * time.Second) + defer checkinTicker.Stop() + for { + select { + case <-ctx.Done(): + logic.FlushNodeCheckins() + return + case <-time.After(time.Second * KEEPALIVE_TIMEOUT): + sendPeers() + case <-checkinTicker.C: + logic.FlushNodeCheckins() + case <-metricsTicker.C: + PushAllMetricsToExporter() + } + } + } else { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second * KEEPALIVE_TIMEOUT): + sendPeers() + case <-metricsTicker.C: + PushAllMetricsToExporter() + } } } } diff --git a/mq/publishers.go b/mq/publishers.go index f01007984..b68fc4de4 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -1,14 +1,19 @@ package mq import ( + "bytes" "context" "encoding/json" "errors" "fmt" + "net/http" + "os" "sync" + "sync/atomic" "time" "github.com/google/uuid" + "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/db" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" @@ -18,11 +23,106 @@ import ( "golang.org/x/exp/slog" ) -// PublishPeerUpdate --- determines and publishes a peer update to all the hosts +var ( + peerUpdateSignal = make(chan struct{}, 1) + peerUpdateReplace atomic.Bool +) + +const ( + peerUpdateDebounce = 500 * time.Millisecond + 
peerUpdateMaxWait = 3 * time.Second + maxConcurrentPublishes = 5 +) + +var metricsHTTPClient = &http.Client{Timeout: 30 * time.Second} + +// PublishPeerUpdate --- queues a peer update that will be coalesced with other +// rapid-fire updates via a debounce window (500ms) capped by a max-wait (3s). func PublishPeerUpdate(replacePeers bool) error { if !servercfg.IsMessageQueueBackend() { return nil } + if replacePeers { + peerUpdateReplace.Store(true) + } + select { + case peerUpdateSignal <- struct{}{}: + default: + } + return nil +} + +// StartPeerUpdateWorker --- runs a background goroutine that coalesces peer +// update signals using a resettable debounce timer capped by an absolute +// max-wait deadline. This ensures rapid-fire PublishPeerUpdate calls result +// in a single broadcast, while guaranteeing peers never wait longer than +// peerUpdateMaxWait from the first signal. +func StartPeerUpdateWorker(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-peerUpdateSignal: + maxWait := time.After(peerUpdateMaxWait) + debounce := time.After(peerUpdateDebounce) + wait: + for { + select { + case <-ctx.Done(): + return + case <-maxWait: + break wait + case <-debounce: + break wait + case <-peerUpdateSignal: + debounce = time.After(peerUpdateDebounce) + } + } + replacePeers := peerUpdateReplace.Swap(false) + drain: + for { + select { + case <-peerUpdateSignal: + default: + break drain + } + } + logic.RefreshHostPeerInfoCache() + if err := publishPeerUpdateImmediate(replacePeers); err != nil { + slog.Error("error publishing peer update", "error", err) + } else { + publishServerSync(logic.SyncTypePeerUpdate) + } + } + } + }() +} + +// warmPeerCaches pre-computes HostPeerInfo and HostPeerUpdate caches so that +// pull requests arriving before the first debounced broadcast are served +// instantly from cache instead of triggering expensive on-demand computation. 
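The worker above combines two timers: a debounce that restarts on every new signal and a max-wait that caps the total delay. Stripped of the netmaker specifics, the select loop looks roughly like this (a sketch, not the patch's code):

package main

import (
	"fmt"
	"time"
)

// coalesce waits for a burst of signals to go quiet: it returns once quietFor
// passes with no new signal, or once maxWait has elapsed overall.
func coalesce(signals <-chan struct{}, quietFor, maxWait time.Duration) {
	deadline := time.After(maxWait)
	quiet := time.After(quietFor)
	for {
		select {
		case <-deadline:
			return // cap reached: fire even if signals keep arriving
		case <-quiet:
			return // no signal for quietFor: safe to fire
		case <-signals:
			quiet = time.After(quietFor) // another burst: restart the quiet window
		}
	}
}

func main() {
	signals := make(chan struct{}, 1)
	go func() {
		for i := 0; i < 3; i++ {
			time.Sleep(100 * time.Millisecond)
			select {
			case signals <- struct{}{}:
			default:
			}
		}
	}()
	start := time.Now()
	coalesce(signals, 500*time.Millisecond, 3*time.Second)
	fmt.Println("single broadcast after", time.Since(start).Round(time.Millisecond))
}
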
+func warmPeerCaches() { + hosts, allNodes := logic.RefreshHostPeerInfoCache() + if hosts == nil || allNodes == nil { + return + } + for i := range hosts { + peerUpdate, err := logic.GetPeerUpdateForHost("", &hosts[i], allNodes, nil, nil) + if err != nil { + slog.Error("warmPeerCaches: failed to compute peer update", "host", hosts[i].ID, "error", err) + continue + } + logic.StoreHostPeerUpdate(hosts[i].ID.String(), peerUpdate) + } + slog.Info("peer update caches warmed", "hosts", len(hosts)) +} + +// publishPeerUpdateImmediate --- determines and publishes a peer update to all the hosts +func publishPeerUpdateImmediate(replacePeers bool) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } if logic.GetManageDNS() { sendDNSSync() @@ -38,11 +138,15 @@ func PublishPeerUpdate(replacePeers bool) error { return err } + sem := make(chan struct{}, maxConcurrentPublishes) + var wg sync.WaitGroup for _, host := range hosts { host := host - time.Sleep(5 * time.Millisecond) + sem <- struct{}{} + wg.Add(1) go func(host schema.Host) { - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil { + defer func() { <-sem; wg.Done() }() + if err := PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil { id := host.Name if host.ID != uuid.Nil { id = host.ID.String() @@ -51,6 +155,7 @@ func PublishPeerUpdate(replacePeers bool) error { } }(host) } + wg.Wait() return nil } @@ -130,6 +235,9 @@ func PublishSingleHostPeerUpdate(host *schema.Host, allNodes []models.Node, dele EndpointDetection: peerUpdate.ServerConfig.EndpointDetection, } peerUpdate.ReplacePeers = replacePeers + if deletedNode == nil && len(deletedClients) == 0 { + logic.StoreHostPeerUpdate(host.ID.String(), peerUpdate) + } data, err := json.Marshal(&peerUpdate) if err != nil { return err @@ -219,25 +327,66 @@ func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool) { } -func PushMetricsToExporter(metrics models.Metrics) error { - logger.Log(2, "----> Pushing metrics to exporter") - data, err := json.Marshal(metrics) +// PushAllMetricsToExporter fetches all node metrics from the database +// and POSTs them as a batch to the exporter's HTTP API. +// Called periodically by a ticker instead of on every individual metrics MQTT message. +func PushAllMetricsToExporter() { + if !servercfg.IsMetricsExporter() { + return + } + baseURL := servercfg.GetMetricsExporterURL() + healthResp, err := metricsHTTPClient.Get(baseURL + "/api/health") if err != nil { - return errors.New("failed to marshal metrics: " + err.Error()) + slog.Warn("metrics export: exporter not reachable, skipping", "error", err) + return } - if mqclient == nil || !mqclient.IsConnectionOpen() { - return errors.New("cannot publish ... 
mqclient not connected") + healthResp.Body.Close() + if healthResp.StatusCode != http.StatusOK { + slog.Warn("metrics export: exporter unhealthy, skipping", "status", healthResp.StatusCode) + return } - if token := mqclient.Publish(fmt.Sprintf("metrics_exporter/%s", servercfg.GetServer()), 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { - var err error - if token.Error() == nil { - err = errors.New("connection timeout") - } else { - err = token.Error() + records, err := database.FetchRecords(database.METRICS_TABLE_NAME) + if err != nil { + slog.Error("metrics export: failed to fetch records", "error", err) + return + } + batch := make([]models.Metrics, 0, len(records)) + for _, data := range records { + var m models.Metrics + if err := json.Unmarshal([]byte(data), &m); err != nil { + continue } - return err + batch = append(batch, m) + } + if len(batch) == 0 { + return + } + body, err := json.Marshal(batch) + if err != nil { + slog.Error("metrics export: failed to marshal batch", "error", err) + return + } + url := baseURL + "/api/v1/metrics" + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + slog.Error("metrics export: failed to create request", "error", err) + return + } + req.Header.Set("Content-Type", "application/json") + metricsUser := os.Getenv("METRICS_USERNAME") + if metricsUser == "" { + metricsUser = "netmaker" + } + req.SetBasicAuth(metricsUser, os.Getenv("METRICS_SECRET")) + resp, err := metricsHTTPClient.Do(req) + if err != nil { + slog.Error("metrics export: POST failed", "url", url, "error", err) + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + slog.Error("metrics export: unexpected status", "url", url, "status", resp.StatusCode) } - return nil } // sendPeers - retrieve networks, send peer ports to all peers @@ -246,9 +395,12 @@ func sendPeers() { if peer_force_send == 5 { servercfg.SetHost() peer_force_send = 0 - err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed.. - if err != nil { - logger.Log(3, "error occurred on timer,", err.Error()) + // Only run timer checkpoint on master pod in HA setup + if servercfg.IsMasterPod() { + err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed.. + if err != nil { + logger.Log(3, "error occurred on timer,", err.Error()) + } } } } diff --git a/mq/serversync.go b/mq/serversync.go new file mode 100644 index 000000000..e3f2e06fe --- /dev/null +++ b/mq/serversync.go @@ -0,0 +1,70 @@ +package mq + +import ( + "encoding/json" + "fmt" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/servercfg" + "golang.org/x/exp/slog" +) + +type serverSyncMessage struct { + Sender string `json:"sender"` + SyncType logic.ServerSyncType `json:"sync_type"` +} + +// InitServerSync wires up the logic.PublishServerSync hook so that +// mutations in the logic package can broadcast sync signals +// without importing mq (avoiding circular imports). 
+func InitServerSync() { + logic.PublishServerSync = publishServerSync +} + +func publishServerSync(syncType logic.ServerSyncType) { + if mqclient == nil || !mqclient.IsConnectionOpen() { + return + } + msg := serverSyncMessage{ + Sender: servercfg.GetHostName(), + SyncType: syncType, + } + data, err := json.Marshal(msg) + if err != nil { + slog.Error("serversync: failed to marshal message", "error", err) + return + } + topic := fmt.Sprintf("serversync/%s", servercfg.GetServer()) + token := mqclient.Publish(topic, 0, false, data) + if !token.WaitTimeout(MQ_TIMEOUT * time.Second) { + slog.Warn("serversync: publish timed out", "topic", topic) + } else if token.Error() != nil { + slog.Error("serversync: publish failed", "topic", topic, "error", token.Error()) + } +} + +func handleServerSync(_ mqtt.Client, msg mqtt.Message) { + var syncMsg serverSyncMessage + if err := json.Unmarshal(msg.Payload(), &syncMsg); err != nil { + slog.Error("serversync: failed to parse message", "error", err) + return + } + if syncMsg.Sender == servercfg.GetHostName() { + return + } + slog.Info("serversync: received sync", "from", syncMsg.Sender, "type", syncMsg.SyncType) + + switch syncMsg.SyncType { + case logic.SyncTypeSettings: + logic.InvalidateServerSettingsCache() + case logic.SyncTypePeerUpdate: + logic.InvalidateHostPeerCaches() + go warmPeerCaches() + case logic.SyncTypeIDPSync: + if servercfg.IsMasterPod() { + logic.ResetIDPSyncHook() + } + } +} diff --git a/mq/util.go b/mq/util.go index 7625e386f..9d9b7191a 100644 --- a/mq/util.go +++ b/mq/util.go @@ -2,7 +2,6 @@ package mq import ( "bytes" - "compress/gzip" "context" "crypto/aes" "crypto/cipher" @@ -12,6 +11,7 @@ import ( "io" "math" "strings" + "sync" "time" "github.com/gravitl/netmaker/db" @@ -19,6 +19,7 @@ import ( "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/schema" + "github.com/klauspost/compress/gzip" "golang.org/x/exp/slog" ) @@ -75,14 +76,41 @@ func BatchItems[T any](items []T, batchSize int) [][]T { return batches } +var gzipWriterPool = sync.Pool{ + New: func() any { + w, _ := gzip.NewWriterLevel(io.Discard, gzip.BestSpeed) + return w + }, +} + +var bufferPool = sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, +} + func compressPayload(data []byte) ([]byte, error) { - var buf bytes.Buffer - zw := gzip.NewWriter(&buf) + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufferPool.Put(buf) + + zw := gzipWriterPool.Get().(*gzip.Writer) + zw.Reset(buf) + defer func() { + zw.Reset(io.Discard) + gzipWriterPool.Put(zw) + }() + if _, err := zw.Write(data); err != nil { return nil, err } - zw.Close() - return buf.Bytes(), nil + if err := zw.Close(); err != nil { + return nil, err + } + + result := make([]byte, buf.Len()) + copy(result, buf.Bytes()) + return result, nil } func encryptAESGCM(key, plaintext []byte) ([]byte, error) { // Create AES block cipher @@ -161,19 +189,35 @@ func publish(host *schema.Host, dest string, msg []byte) error { } } - if mqclient == nil || !mqclient.IsConnectionOpen() { - return errors.New("cannot publish ... mqclient not connected") - } + for attempt := 0; attempt < 2; attempt++ { + if mqclient == nil || !mqclient.IsConnectionOpen() { + ok := false + for i := 0; i < 5; i++ { + time.Sleep(time.Second) + if mqclient != nil && mqclient.IsConnectionOpen() { + ok = true + break + } + } + if !ok { + return errors.New("cannot publish ... 
mqclient not connected") + } + } - if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { - var err error - if token.Error() == nil { - err = errors.New("connection timeout") - } else { + token := mqclient.Publish(dest, 0, true, encrypted) + if token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() == nil { + return nil + } + if attempt == 0 { + slog.Warn("publish failed, retrying after reconnect", "dest", dest) + time.Sleep(2 * time.Second) + continue + } + if token.Error() != nil { slog.Error("publish to mq error", "error", token.Error().Error()) - err = token.Error() + return token.Error() } - return err + return errors.New("connection timeout") } return nil } diff --git a/pro/auth/sync.go b/pro/auth/sync.go index 3020c34e0..842eca5f0 100644 --- a/pro/auth/sync.go +++ b/pro/auth/sync.go @@ -30,6 +30,13 @@ var ( ) func ResetIDPSyncHook() { + if !servercfg.IsMasterPod() { + if servercfg.IsHA() && logic.PublishServerSync != nil { + logic.PublishServerSync(logic.SyncTypeIDPSync) + } + return + } + if cancelSyncHook != nil { cancelSyncHook() hookStopWg.Wait() diff --git a/pro/initialize.go b/pro/initialize.go index bdcd50f83..ba7686501 100644 --- a/pro/initialize.go +++ b/pro/initialize.go @@ -88,6 +88,7 @@ func InitPro() { logger.Log(0, "proceeding with Paid Tier license") logic.SetFreeTierForTelemetry(false) // == End License Handling == + // License validation runs on all pods to avoid audit issues AddLicenseHooks() } else { logger.Log(0, "starting trial license hook") @@ -107,29 +108,37 @@ func InitPro() { if servercfg.CacheEnabled() { proLogic.InitAutoRelayCache() } - auth.ResetIDPSyncHook() - email.Init() - go proLogic.EventWatcher() - logic.GetMetricsMonitor().Start() - proLogic.AddPostureCheckHook() - // Register JIT expiry hook with email notifications - addJitExpiryHookWithEmail() - if proLogic.GetFeatureFlags().EnableFlowLogs && logic.GetServerSettings().EnableFlowLogs { - err := ch.Initialize() - if err != nil { - logger.FatalLog("error connecting to clickhouse:", err.Error()) - } - proLogic.StartFlowCleanupLoop() + // Only run singleton operations on master pod in HA setup + // These include IDP sync, posture checks, JIT expiry, and flow cleanup + if servercfg.IsMasterPod() { + auth.ResetIDPSyncHook() + proLogic.AddPostureCheckHook() + // Register JIT expiry hook with email notifications + addJitExpiryHookWithEmail() + + if proLogic.GetFeatureFlags().EnableFlowLogs && logic.GetServerSettings().EnableFlowLogs { + err := ch.Initialize() + if err != nil { + logger.FatalLog("error connecting to clickhouse:", err.Error()) + } - wg.Add(1) - go func(ctx context.Context, wg *sync.WaitGroup) { - <-ctx.Done() - proLogic.StopFlowCleanupLoop() - ch.Close() - wg.Done() - }(ctx, wg) + proLogic.StartFlowCleanupLoop() + + wg.Add(1) + go func(ctx context.Context, wg *sync.WaitGroup) { + <-ctx.Done() + proLogic.StopFlowCleanupLoop() + ch.Close() + wg.Done() + }(ctx, wg) + } } + + // These can run on all pods + email.Init() + go proLogic.EventWatcher() + logic.GetMetricsMonitor().Start() }) logic.ResetFailOver = proLogic.ResetFailOver logic.ResetFailedOverPeer = proLogic.ResetFailedOverPeer diff --git a/pro/logic/auto_relay.go b/pro/logic/auto_relay.go index fdeb98e83..58968173a 100644 --- a/pro/logic/auto_relay.go +++ b/pro/logic/auto_relay.go @@ -163,6 +163,9 @@ func DoesAutoRelayExist(network string) (autoRelayNodes []models.Node) { // ResetAutoRelayedPeer - removes auto relayed over node from network peers func 
ResetAutoRelayedPeer(autoRelayedNode *models.Node) error { + if len(autoRelayedNode.AutoRelayedPeers) == 0 { + return nil + } nodes, err := logic.GetNetworkNodes(autoRelayedNode.Network) if err != nil { return err @@ -172,11 +175,15 @@ func ResetAutoRelayedPeer(autoRelayedNode *models.Node) error { if err != nil { return err } + nodeIDStr := autoRelayedNode.ID.String() for _, node := range nodes { if node.AutoRelayedPeers == nil || node.ID == autoRelayedNode.ID { continue } - delete(node.AutoRelayedPeers, autoRelayedNode.ID.String()) + if _, exists := node.AutoRelayedPeers[nodeIDStr]; !exists { + continue + } + delete(node.AutoRelayedPeers, nodeIDStr) logic.UpsertNode(&node) } return nil diff --git a/pro/logic/failover.go b/pro/logic/failover.go index c24bb8938..5c995d526 100644 --- a/pro/logic/failover.go +++ b/pro/logic/failover.go @@ -160,6 +160,9 @@ func FailOverExists(network string) (failOverNode models.Node, exists bool) { // ResetFailedOverPeer - removes failed over node from network peers func ResetFailedOverPeer(failedOveredNode *models.Node) error { + if failedOveredNode.FailedOverBy == uuid.Nil && len(failedOveredNode.FailOverPeers) == 0 { + return nil + } nodes, err := logic.GetNetworkNodes(failedOveredNode.Network) if err != nil { return err @@ -170,11 +173,15 @@ func ResetFailedOverPeer(failedOveredNode *models.Node) error { if err != nil { return err } + nodeIDStr := failedOveredNode.ID.String() for _, node := range nodes { if node.FailOverPeers == nil || node.ID == failedOveredNode.ID { continue } - delete(node.FailOverPeers, failedOveredNode.ID.String()) + if _, exists := node.FailOverPeers[nodeIDStr]; !exists { + continue + } + delete(node.FailOverPeers, nodeIDStr) logic.UpsertNode(&node) } return nil diff --git a/pro/logic/metrics.go b/pro/logic/metrics.go index 3b34046cc..cbef5f2ae 100644 --- a/pro/logic/metrics.go +++ b/pro/logic/metrics.go @@ -133,11 +133,6 @@ func MQUpdateMetricsFallBack(nodeid string, newMetrics models.Metrics) { slog.Error("failed to update node metrics", "id", nodeid, "error", err) return } - if servercfg.IsMetricsExporter() { - if err := mq.PushMetricsToExporter(newMetrics); err != nil { - slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err) - } - } slog.Debug("updated node metrics", "id", nodeid) } @@ -168,11 +163,6 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) { slog.Error("failed to update node metrics", "id", id, "error", err) return } - if servercfg.IsMetricsExporter() { - if err := mq.PushMetricsToExporter(newMetrics); err != nil { - slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err) - } - } slog.Debug("updated node metrics", "id", id) } diff --git a/scripts/netmaker.default.env b/scripts/netmaker.default.env index 8d4ea4816..71bfb5808 100644 --- a/scripts/netmaker.default.env +++ b/scripts/netmaker.default.env @@ -17,11 +17,13 @@ NETMAKER_TENANT_ID= LICENSE_KEY= SERVER_IMAGE_TAG= UI_IMAGE_TAG= -# used for HA - identifies this server vs other servers -NODE_ID=netmaker-server-1 +# used for HA - identifies this server vs other servers if unset uses host_name +HOST_NAME= METRICS_EXPORTER=off #metrics exporter secret METRICS_SECRET= +#metrics exporter user +METRICS_USERNAME=netmaker # Enables DNS Mode, meaning all nodes will set hosts file for private dns settings DNS_MODE=on # Enable auto update of netclient ? 
ENUM:- enabled,disabled | default=enabled diff --git a/scripts/nm-quick.sh b/scripts/nm-quick.sh index dce23ae3c..dcb9e5ce8 100755 --- a/scripts/nm-quick.sh +++ b/scripts/nm-quick.sh @@ -30,12 +30,14 @@ unset IMAGE_TAG unset NETMAKER_BASE_DOMAIN unset UPGRADE_FLAG unset COLLECT_PRO_VARS +INSTALL_MONITORING="${INSTALL_MONITORING:-off}" # usage - displays usage instructions usage() { echo "nm-quick.sh v$NM_QUICK_VERSION" echo "usage: ./nm-quick.sh [-c]" echo " -c if specified, will install netmaker community version" echo " -p if specified, will install netmaker pro version" + echo " -m if specified, will install netmaker pro with monitoring stack (Prometheus, Grafana, Exporter)" echo " -u if specified, will upgrade netmaker to pro version" echo " -d if specified, will downgrade netmaker to community version" exit 1 @@ -254,7 +256,7 @@ save_config() { ( if [ "$INSTALL_TYPE" = "pro" ]; then save_config_item NETMAKER_TENANT_ID "$NETMAKER_TENANT_ID" save_config_item LICENSE_KEY "$LICENSE_KEY" - save_config_item METRICS_EXPORTER "on" + save_config_item METRICS_EXPORTER "$INSTALL_MONITORING" save_config_item SERVER_IMAGE_TAG "$IMAGE_TAG-ee" else save_config_item METRICS_EXPORTER "off" @@ -564,7 +566,7 @@ set_install_vars() { echo " api.$NETMAKER_BASE_DOMAIN" echo " broker.$NETMAKER_BASE_DOMAIN" - if [ "$INSTALL_TYPE" = "pro" ]; then + if [ "$INSTALL_MONITORING" = "on" ]; then echo " prometheus.$NETMAKER_BASE_DOMAIN" echo " netmaker-exporter.$NETMAKER_BASE_DOMAIN" echo " grafana.$NETMAKER_BASE_DOMAIN" @@ -642,22 +644,35 @@ install_netmaker() { local COMPOSE_OVERRIDE_URL="$BASE_URL/compose/docker-compose.pro.yml" wget -qO "$INSTALL_DIR"/docker-compose.override.yml $COMPOSE_OVERRIDE_URL local CADDY_URL="$BASE_URL/docker/Caddyfile-pro" - # download Grafana assets (dashboards, datasource, config) - mkdir -p "$INSTALL_DIR/grafana" - local GRAFANA_BASE="https://downloads.netmaker.io/assests/grafana" - wget -qO "$INSTALL_DIR/grafana/dashboard-config.yaml" "$GRAFANA_BASE/dashboard-config.yaml" - wget -qO "$INSTALL_DIR/grafana/dashboard.json" "$GRAFANA_BASE/dashboard.json" - wget -qO "$INSTALL_DIR/grafana/datasource.yaml" "$GRAFANA_BASE/datasource.yaml" - wget -qO "$INSTALL_DIR/grafana/grafana.ini" "$GRAFANA_BASE/grafana.ini" - # download Prometheus config - mkdir -p "$INSTALL_DIR/prometheus" - wget -qO "$INSTALL_DIR/prometheus/prometheus.yml" "https://downloads.netmaker.io/assests/prometheus/prometheus.yml" + if [ "$INSTALL_MONITORING" = "on" ]; then + # download Grafana assets (dashboards, datasource, config) + mkdir -p "$INSTALL_DIR/grafana" + local GRAFANA_BASE="https://downloads.netmaker.io/assests/grafana" + wget -qO "$INSTALL_DIR/grafana/dashboard-config.yaml" "$GRAFANA_BASE/dashboard-config.yaml" + wget -qO "$INSTALL_DIR/grafana/dashboard.json" "$GRAFANA_BASE/dashboard.json" + wget -qO "$INSTALL_DIR/grafana/datasource.yaml" "$GRAFANA_BASE/datasource.yaml" + wget -qO "$INSTALL_DIR/grafana/grafana.ini" "$GRAFANA_BASE/grafana.ini" + # download Prometheus config + mkdir -p "$INSTALL_DIR/prometheus" + wget -qO "$INSTALL_DIR/prometheus/prometheus.yml" "https://downloads.netmaker.io/assests/prometheus/prometheus.yml" + else + # strip monitoring services and volumes from the override file + yq -i 'del(.services.prometheus)' "$INSTALL_DIR/docker-compose.override.yml" + yq -i 'del(.services.grafana)' "$INSTALL_DIR/docker-compose.override.yml" + yq -i 'del(.services.netmaker-exporter)' "$INSTALL_DIR/docker-compose.override.yml" + yq -i 'del(.volumes.prometheus_data)' "$INSTALL_DIR/docker-compose.override.yml" 
+ yq -i 'del(.volumes.grafana_data)' "$INSTALL_DIR/docker-compose.override.yml" + fi elif [ -a "$INSTALL_DIR"/docker-compose.override.yml ]; then rm -f "$INSTALL_DIR"/docker-compose.override.yml fi wget -qO "$INSTALL_DIR"/docker-compose.yml $COMPOSE_URL wget -qO "$INSTALL_DIR"/Caddyfile "$CADDY_URL" + if [ "$INSTALL_TYPE" = "pro" ] && [ "$INSTALL_MONITORING" != "on" ]; then + sed -i '/# Netmaker Exporter/,/^}/d' "$INSTALL_DIR/Caddyfile" + sed -i '/# Grafana/,/^}/d' "$INSTALL_DIR/Caddyfile" + fi wget -qO "$INSTALL_DIR"/netmaker.default.env "$BASE_URL/scripts/netmaker.default.env" wget -qO "$INSTALL_DIR"/mosquitto.conf "$BASE_URL/docker/mosquitto.conf" wget -qO "$INSTALL_DIR"/wait.sh "$BASE_URL/docker/wait.sh" @@ -751,6 +766,12 @@ print_success() { echo "Visit dashboard.$NETMAKER_BASE_DOMAIN to log in" echo "" echo "Installation files are located in: $INSTALL_DIR" + if [ "$INSTALL_TYPE" = "pro" ] && [ "$INSTALL_MONITORING" != "on" ]; then + echo "" + echo "To add the monitoring stack (Prometheus, Grafana, Exporter), re-run:" + echo " sudo ./nm-quick.sh -m" + echo "NOTE: The monitoring stack requires at least 2 GB of RAM." + fi echo "-----------------------------------------------------------------" echo "-----------------------------------------------------------------" } @@ -852,6 +873,69 @@ downgrade () { install_netmaker } +add_monitoring() { + print_logo + echo "Adding monitoring stack (Prometheus, Grafana, Exporter)..." + echo "NOTE: The monitoring stack requires at least 2 GB of RAM." + echo "" + + INSTALL_MONITORING="on" + INSTALL_TYPE="pro" + + mkdir -p "$INSTALL_DIR" + CONFIG_PATH="$INSTALL_DIR/$CONFIG_FILE" + if [ ! -f "$CONFIG_PATH" ] && [ -f "$SCRIPT_DIR/$CONFIG_FILE" ]; then + cp "$SCRIPT_DIR/$CONFIG_FILE" "$CONFIG_PATH" + fi + + unset IMAGE_TAG + unset BUILD_TAG + IMAGE_TAG=$UI_IMAGE_TAG + semver=$(chsv_check_version_ex "$UI_IMAGE_TAG") + if [[ ! "$semver" ]]; then + BUILD_TAG=$LATEST + else + BUILD_TAG=$UI_IMAGE_TAG + fi + + local BASE_URL="https://raw.githubusercontent.com/gravitl/netmaker/$BRANCH" + + # re-download the full pro override (with monitoring services intact) + wget -qO "$INSTALL_DIR"/docker-compose.override.yml "$BASE_URL/compose/docker-compose.pro.yml" + + # download Caddyfile-pro (with monitoring reverse-proxy entries) + wget -qO "$INSTALL_DIR"/Caddyfile "$BASE_URL/docker/Caddyfile-pro" + + # download Grafana assets + mkdir -p "$INSTALL_DIR/grafana" + local GRAFANA_BASE="https://downloads.netmaker.io/assests/grafana" + wget -qO "$INSTALL_DIR/grafana/dashboard-config.yaml" "$GRAFANA_BASE/dashboard-config.yaml" + wget -qO "$INSTALL_DIR/grafana/dashboard.json" "$GRAFANA_BASE/dashboard.json" + wget -qO "$INSTALL_DIR/grafana/datasource.yaml" "$GRAFANA_BASE/datasource.yaml" + wget -qO "$INSTALL_DIR/grafana/grafana.ini" "$GRAFANA_BASE/grafana.ini" + + # download Prometheus config + mkdir -p "$INSTALL_DIR/prometheus" + wget -qO "$INSTALL_DIR/prometheus/prometheus.yml" "https://downloads.netmaker.io/assests/prometheus/prometheus.yml" + + # update METRICS_EXPORTER in config + save_config_item METRICS_EXPORTER "on" + + # restart services + stop_services + echo "Starting containers..." + cd "${INSTALL_DIR}" + docker compose up -d --force-recreate + cd - + + echo "" + echo "-----------------------------------------------------------------" + echo "Monitoring stack has been added successfully." 
+ echo " Grafana: https://grafana.${NM_DOMAIN}" + echo " Exporter: https://netmaker-exporter.${NM_DOMAIN}" + echo "-----------------------------------------------------------------" +} + function chsv_check_version() { if [[ $1 =~ ^(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)(-((0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*))*))?(\+([0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*))?$ ]]; then echo "$1" @@ -884,7 +968,7 @@ main (){ fi INSTALL_TYPE="ce" - while getopts :cudpv flag; do + while getopts :cudpmv flag; do case "${flag}" in c) INSTALL_TYPE="ce" @@ -907,6 +991,16 @@ main (){ INSTALL_TYPE="pro" COLLECT_PRO_VARS="true" ;; + m) + if [ -f "$INSTALL_DIR/$CONFIG_FILE" ] || [ -f "$SCRIPT_DIR/$CONFIG_FILE" ]; then + add_monitoring + exit 0 + fi + echo "installing pro version with monitoring stack..." + INSTALL_TYPE="pro" + INSTALL_MONITORING="on" + COLLECT_PRO_VARS="true" + ;; v) usage exit 0 diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 72af99261..60fde09f6 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -302,6 +302,14 @@ func IsMetricsExporter() bool { return export } +// GetMetricsExporterURL returns the base URL of the netmaker-exporter HTTP API. +func GetMetricsExporterURL() string { + if v := os.Getenv("NETMAKER_METRICS_TARGET"); v != "" { + return v + } + return "http://netmaker-exporter:8085" +} + // IsMessageQueueBackend - checks if message queue is on or off func IsMessageQueueBackend() bool { ismessagequeue := true @@ -477,15 +485,15 @@ func GetSQLConn() string { return sqlconn } -// GetNodeID - gets the node id -func GetNodeID() string { +// GetHostName - gets the host name +func GetHostName() string { var id string var err error // id = getMacAddr() - if os.Getenv("NODE_ID") != "" { - id = os.Getenv("NODE_ID") - } else if config.Config.Server.NodeID != "" { - id = config.Config.Server.NodeID + if os.Getenv("HOST_NAME") != "" { + id = os.Getenv("HOST_NAME") + } else if config.Config.Server.HostName != "" { + id = config.Config.Server.HostName } else { id, err = os.Hostname() if err != nil { @@ -496,7 +504,7 @@ func GetNodeID() string { } func SetNodeID(id string) { - config.Config.Server.NodeID = id + config.Config.Server.HostName = id } // GetAuthProviderInfo = gets the oauth provider info @@ -802,3 +810,54 @@ func GetAllowedEmailDomains() string { func GetNmBaseDomain() string { return os.Getenv("NM_DOMAIN") } + +// IsHA - returns true if running in High Availability mode (multiple replicas) +func IsHA() bool { + return os.Getenv("IS_HA") == "true" +} + +// IsMasterPod - returns true if this pod should run singleton operations +// In K8s StatefulSet HA mode, the 0th pod (hostname ending with -0) is the master. +// This ensures migrations, IDP sync, and other singleton operations only run on one pod. +// Can be overridden with IS_MASTER_POD env var for manual control. +// +// TODO: Future Enhancement - Implement Leader Election for True HA Failover +// Current limitation: If pod-0 goes down, singleton operations and MQ subscriptions +// are unavailable until pod-0 recovers. 
+// are unavailable until pod-0 recovers. For automatic failover, implement:
+//
+// Option 1: K8s Lease-based Leader Election
+//   - Use client-go's leaderelection package
+//   - Any pod can become leader if current leader fails
+//   - Requires K8s ServiceAccount with lease permissions
+//
+// Option 2: Database-based Distributed Lock
+//   - Use database (PostgreSQL/SQLite) advisory locks
+//   - Leader heartbeats to maintain lock, others acquire on timeout
+//   - Works in non-K8s environments too
+//
+// Option 3: etcd/Consul-based Leader Election
+//   - Use distributed consensus systems
+//   - Most robust but adds infrastructure dependency
+//
+// Implementation would replace the static pod-0 check with dynamic leader status.
+func IsMasterPod() bool {
+ // Allow manual override via environment variable
+ if override := os.Getenv("IS_MASTER_POD"); override != "" {
+ return override == "true"
+ }
+
+ // If not in HA mode, default to true (single instance deployment)
+ if !IsHA() {
+ return true
+ }
+
+ // StatefulSet pods are named <statefulset-name>-<ordinal>.
+ // Compare the last hyphen-delimited segment to "0" to avoid
+ // false positives (e.g. "netmaker-10" matching a naive HasSuffix "-0").
+ hostname, err := os.Hostname()
+ if err != nil {
+ return true
+ }
+ parts := strings.Split(hostname, "-")
+ return parts[len(parts)-1] == "0"
+}
diff --git a/utils/utils.go b/utils/utils.go
index a9bd19c78..10539418f 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -54,12 +54,12 @@ func TraceCaller() {
// Skip 1 frame to get the caller of this function
pc, file, line, ok := runtime.Caller(2)
if !ok {
- slog.Debug("Unable to get caller information")
+ slog.Error("Unable to get caller information")
return
}
tracePc, _, _, ok := runtime.Caller(1)
if !ok {
- slog.Debug("Unable to get caller information")
+ slog.Error("Unable to get caller information")
return
}
traceFuncName := runtime.FuncForPC(tracePc).Name()
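
Note on the IsMasterPod TODO above: Option 1 (K8s Lease-based Leader Election) can be sketched with client-go's leaderelection package. The snippet below is a minimal, illustrative take on that option and is not part of this diff; the function name runWithLeaderElection, the lease name "netmaker-leader", and the "netmaker" namespace are hypothetical placeholders, as is how it would be wired into the server's startup path.

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLeaderElection blocks while competing for a Lease and runs onLead
// only while this pod holds it, i.e. while it is the effective "master" pod.
func runWithLeaderElection(ctx context.Context, onLead func(context.Context)) error {
	cfg, err := rest.InClusterConfig() // only works when running inside the cluster
	if err != nil {
		return err
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}
	id, _ := os.Hostname() // pod name doubles as a unique identity

	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "netmaker-leader", Namespace: "netmaker"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   15 * time.Second, // how long a held lease stays valid
		RenewDeadline:   10 * time.Second, // leader must renew within this window
		RetryPeriod:     2 * time.Second,  // how often non-leaders retry acquisition
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: onLead, // start singleton work (migrations, IDP sync, MQ subscriptions)
			OnStoppedLeading: func() {
				// stop singleton work; another pod acquires the lease and takes over
			},
		},
	})
	return nil
}

Under this shape, OnStartedLeading would start the work currently gated by IsMasterPod and OnStoppedLeading would stop it, removing the dependency on pod-0 staying up; the pods would also need a ServiceAccount bound to a Role allowing get/create/update on coordination.k8s.io Leases, as the TODO notes.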