From 21dca11209fc3bb5e11cba1aa27eca354783d713 Mon Sep 17 00:00:00 2001 From: raphaelvrosa Date: Mon, 16 Jun 2025 10:18:31 +0200 Subject: [PATCH 1/2] Moves heartbeat flags to env variables --- inventory/pkg/client/client.go | 58 +++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/inventory/pkg/client/client.go b/inventory/pkg/client/client.go index 62b510483..25bbc0121 100644 --- a/inventory/pkg/client/client.go +++ b/inventory/pkg/client/client.go @@ -6,8 +6,9 @@ package client import ( "context" "errors" - "flag" "io" + "os" + "strconv" "sync" "time" @@ -66,18 +67,14 @@ const ( ) const ( + // Default values for heartbeat related configurations. defaultHeatbeatBackoffRetries = 3 - defaultHeatbeatBackoffInterval = time.Second * 1 - defaultHeartbeatInterval = time.Second * 30 -) - -var ( - heatbeatBackoffRetries = flag.Uint64( - "heatbeatBackoffRetries", defaultHeatbeatBackoffRetries, "The number of retries the heartbeat backoff will attempt") - heatbeatBackoffInterval = flag.Duration( - "heatbeatBackoffInterval", defaultHeatbeatBackoffInterval, "The interval between heartbeat backoff retries") - heartbeatInterval = flag.Duration( - "heartbeatInterval", defaultHeartbeatInterval, "The interval between heartbeats") + defaultHeatbeatBackoffInterval = 1 + defaultHeartbeatInterval = 30 + // Defines heartbeat related environment variables. + EnvHeatbeatBackoffRetries = "HEARTBEAT_BACKOFF_RETRIES" + EnvHeatbeatBackoffInterval = "HEARTBEAT_BACKOFF_INTERVAL" + EnvHeartbeatInterval = "HEARTBEAT_INTERVAL" ) type WatchEvents struct { @@ -593,18 +590,49 @@ func NewTenantAwareInventoryClient( return cl, nil } +// getEnvIntOrDefault retrieves an environment variable as int, or returns the default if not set or invalid. +func getEnvIntOrDefault(envVar string, defaultVal int) int { + valStr := os.Getenv(envVar) + if valStr == "" { + zlog.InfraSec().Warn().Msgf("%s env variable is not set, using default value", envVar) + return defaultVal + } + val, err := strconv.Atoi(valStr) + if err != nil { + zlog.InfraSec().Warn().Msgf("Invalid value for %s env variable: %s, using default value %d", + envVar, valStr, defaultVal, + ) + return defaultVal + } + return val +} + // Set up the heartbeat ticker to keep the client connection alive. func (client *inventoryClient) heartbeat(clientUUID string) error { + heatbeatBackoffRetries := getEnvIntOrDefault(EnvHeatbeatBackoffRetries, defaultHeatbeatBackoffRetries) + heatbeatBackoffInterval := getEnvIntOrDefault(EnvHeatbeatBackoffInterval, defaultHeatbeatBackoffInterval) + heartbeatInterval := getEnvIntOrDefault(EnvHeartbeatInterval, defaultHeartbeatInterval) + heartbeetReq := &inv_v1.HeartbeatRequest{ ClientUuid: clientUUID, } - ticker := time.NewTicker(*heartbeatInterval) + heartbeatIntervalDuration := time.Duration(heartbeatInterval) * time.Second + heatbeatBackoffIntervalDuration := time.Duration(heatbeatBackoffInterval) * time.Second + heatbeatBackoffRetriesUint64, err := util.IntToUint64(heatbeatBackoffRetries) + if err != nil { + zlog.InfraSec().Warn().Msgf("Invalid value for %s env variable: %s, using default value %d", + EnvHeatbeatBackoffRetries, strconv.Itoa(heatbeatBackoffRetries), defaultHeatbeatBackoffRetries, + ) + heatbeatBackoffRetriesUint64 = uint64(defaultHeatbeatBackoffRetries) + } + + ticker := time.NewTicker(heartbeatIntervalDuration) defer ticker.Stop() for { select { case <-ticker.C: - clientCtxDeadline, cancel := context.WithDeadline(client.streamCtx, time.Now().Add(*heatbeatBackoffInterval)) + clientCtxDeadline, cancel := context.WithDeadline(client.streamCtx, time.Now().Add(heatbeatBackoffIntervalDuration)) clientCtx := tenant.AddTenantIDToContext(clientCtxDeadline, clientUUID) err := backoff.Retry(func() error { @@ -616,7 +644,7 @@ func (client *inventoryClient) heartbeat(clientUUID string) error { return backoff.Permanent(errHearbeat) } return errHearbeat - }, backoff.WithMaxRetries(backoff.NewConstantBackOff(*heatbeatBackoffInterval), *heatbeatBackoffRetries)) + }, backoff.WithMaxRetries(backoff.NewConstantBackOff(heatbeatBackoffIntervalDuration), heatbeatBackoffRetriesUint64)) if err != nil { zlog.InfraErr(err).Msgf("failed to heartbeat client UUID: %s", clientUUID) cancel() // cancel the context to avoid leaking resources From a822af107d35b6536f351bc8b1745fc15c45186b Mon Sep 17 00:00:00 2001 From: raphaelvrosa Date: Mon, 16 Jun 2025 10:35:51 +0200 Subject: [PATCH 2/2] Sets client sched cache flags to env variables --- .../client/cache/schedule/schedule_cache.go | 73 +++++++++++++------ 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/inventory/pkg/client/cache/schedule/schedule_cache.go b/inventory/pkg/client/cache/schedule/schedule_cache.go index 44983c711..a481ba7d0 100644 --- a/inventory/pkg/client/cache/schedule/schedule_cache.go +++ b/inventory/pkg/client/cache/schedule/schedule_cache.go @@ -5,9 +5,10 @@ package schedule import ( "context" - "flag" "fmt" + "os" "sort" + "strconv" "sync" "time" @@ -26,33 +27,63 @@ import ( "github.com/open-edge-platform/infra-core/inventory/v2/pkg/util/paginator" ) -// This is a special inventory client, that subscribed to schedulesByCacheKey changes, caches the schedule resources locally, -// and exposes internal Go API to other components in order to fetch schedulesByCacheKey given some primitive filters - -var ( - logC = logging.GetLogger("ScheduleCache") - - PeriodicCacheRefresh = flag.Duration( - "schedCachePeriodicRefresh", defaultPeriodicRefresh, "Periodic refresh timeout of the schedule cache") - inventoryTimeout = flag.Duration( - "schedCacheInvTimeout", defaultInventoryTimeout, "Schedule cache inventory API calls timeout") - listAllInventoryTimeout = flag.Duration( - "schedCacheListAllTimeout", - defaultListAllTimeout, - "Timeout used when listing all schedule resources in the schedule cache from Inventory", - ) -) +var logC = logging.GetLogger("ScheduleCache") const ( - BatchSize = 500 - defaultPeriodicRefresh = 5 * time.Minute - defaultInventoryTimeout = 5 * time.Second - defaultListAllTimeout = time.Minute + BatchSize = 500 + + // Default values for schedule cache timeouts. + defaultPeriodicRefreshDuration = 5 * time.Minute + defaultInventoryTimeoutDuration = 5 * time.Second + defaultListAllTimeoutDuration = time.Minute defaultRegisterMaxElapsedTime = 30 * time.Second // eventsWatcherBufSize is the buffer size for the events channel. eventsWatcherBufSize = 10 + + // Environment variable names for schedule cache timeouts. + EnvSchedCachePeriodicRefresh = "SCHED_CACHE_PERIODIC_REFRESH" + EnvSchedCacheInvTimeout = "SCHED_CACHE_INV_TIMEOUT" + EnvSchedCacheListAllTimeout = "SCHED_CACHE_LIST_ALL_TIMEOUT" +) + +// Helper to parse duration from env as Go duration string (e.g. "5m", "10s"), fallback to seconds if parse fails. +func getEnvGoDurationOrDefault(envVar string, defaultVal time.Duration) time.Duration { + valStr := os.Getenv(envVar) + if valStr == "" { + logC.Warn().Msgf("%s env variable is not set, using default value", envVar) + return defaultVal + } + val, err := time.ParseDuration(valStr) + if err != nil { + // fallback: try parsing as seconds + valInt, err2 := strconv.Atoi(valStr) + if err2 == nil { + return time.Duration(valInt) * time.Second + } + logC.Warn().Msgf("Invalid value for %s env variable: %s, using default value %v", + envVar, valStr, defaultVal, + ) + return defaultVal + } + return val +} + +// Use environment variables for timeouts, fallback to defaults. +var ( + PeriodicCacheRefresh = func() *time.Duration { + d := getEnvGoDurationOrDefault(EnvSchedCachePeriodicRefresh, defaultPeriodicRefreshDuration) + return &d + }() + inventoryTimeout = func() *time.Duration { + d := getEnvGoDurationOrDefault(EnvSchedCacheInvTimeout, defaultInventoryTimeoutDuration) + return &d + }() + listAllInventoryTimeout = func() *time.Duration { + d := getEnvGoDurationOrDefault(EnvSchedCacheListAllTimeout, defaultListAllTimeoutDuration) + return &d + }() ) type cacheKey struct {