Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 52 additions & 21 deletions inventory/pkg/client/cache/schedule/schedule_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ package schedule

import (
"context"
"flag"
"fmt"
"os"
"sort"
"strconv"
"sync"
"time"

Expand All @@ -26,33 +27,63 @@ import (
"github.com/open-edge-platform/infra-core/inventory/v2/pkg/util/paginator"
)

// This is a special inventory client that subscribes to schedulesByCacheKey changes, caches the schedule resources locally,
// and exposes an internal Go API to other components in order to fetch schedulesByCacheKey given some primitive filters.

var (
logC = logging.GetLogger("ScheduleCache")

PeriodicCacheRefresh = flag.Duration(
"schedCachePeriodicRefresh", defaultPeriodicRefresh, "Periodic refresh timeout of the schedule cache")
inventoryTimeout = flag.Duration(
"schedCacheInvTimeout", defaultInventoryTimeout, "Schedule cache inventory API calls timeout")
listAllInventoryTimeout = flag.Duration(
"schedCacheListAllTimeout",
defaultListAllTimeout,
"Timeout used when listing all schedule resources in the schedule cache from Inventory",
)
)
var logC = logging.GetLogger("ScheduleCache")

const (
BatchSize = 500
defaultPeriodicRefresh = 5 * time.Minute
defaultInventoryTimeout = 5 * time.Second
defaultListAllTimeout = time.Minute
BatchSize = 500

// Default values for schedule cache timeouts.
defaultPeriodicRefreshDuration = 5 * time.Minute
defaultInventoryTimeoutDuration = 5 * time.Second
defaultListAllTimeoutDuration = time.Minute

defaultRegisterMaxElapsedTime = 30 * time.Second

// eventsWatcherBufSize is the buffer size for the events channel.
eventsWatcherBufSize = 10

// Environment variable names for schedule cache timeouts.
EnvSchedCachePeriodicRefresh = "SCHED_CACHE_PERIODIC_REFRESH"
EnvSchedCacheInvTimeout = "SCHED_CACHE_INV_TIMEOUT"
EnvSchedCacheListAllTimeout = "SCHED_CACHE_LIST_ALL_TIMEOUT"
)

// getEnvGoDurationOrDefault reads the environment variable envVar and returns
// its value as a time.Duration.
//
// The value is first parsed as a Go duration string (e.g. "5m", "10s"). If that
// fails, it is parsed as a plain integer and interpreted as a number of seconds.
//
// defaultVal is returned, and a warning is logged, when:
//   - the variable is unset or empty;
//   - the value cannot be parsed in either form;
//   - the value is zero or negative (the result configures a timeout or a
//     refresh period, for which a non-positive duration is not meaningful);
//   - the value is an integer number of seconds too large to be represented
//     as a time.Duration (the multiplication would silently overflow).
func getEnvGoDurationOrDefault(envVar string, defaultVal time.Duration) time.Duration {
	// Largest number of whole seconds that still fits in a time.Duration.
	const maxWholeSeconds = time.Duration(1<<63-1) / time.Second

	valStr := os.Getenv(envVar)
	if valStr == "" {
		logC.Warn().Msgf("%s env variable is not set, using default value", envVar)
		return defaultVal
	}

	val, err := time.ParseDuration(valStr)
	if err != nil {
		// Fallback: a bare integer is taken as a number of seconds. Anything
		// that is not a positive, representable integer leaves val at zero so
		// that it is rejected by the check below.
		val = 0
		secs, convErr := strconv.Atoi(valStr)
		if convErr == nil && secs > 0 && time.Duration(secs) <= maxWholeSeconds {
			val = time.Duration(secs) * time.Second
		}
	}

	if val <= 0 {
		logC.Warn().Msgf("Invalid value for %s env variable: %s, using default value %v",
			envVar, valStr, defaultVal,
		)
		return defaultVal
	}
	return val
}

// Use environment variables for timeouts, fallback to defaults.
var (
PeriodicCacheRefresh = func() *time.Duration {
d := getEnvGoDurationOrDefault(EnvSchedCachePeriodicRefresh, defaultPeriodicRefreshDuration)
return &d
}()
inventoryTimeout = func() *time.Duration {
d := getEnvGoDurationOrDefault(EnvSchedCacheInvTimeout, defaultInventoryTimeoutDuration)
return &d
}()
listAllInventoryTimeout = func() *time.Duration {
d := getEnvGoDurationOrDefault(EnvSchedCacheListAllTimeout, defaultListAllTimeoutDuration)
return &d
}()
)

type cacheKey struct {
Expand Down
58 changes: 43 additions & 15 deletions inventory/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package client
import (
"context"
"errors"
"flag"
"io"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -66,18 +67,14 @@ const (
)

const (
// Default values for heartbeat related configurations.
defaultHeatbeatBackoffRetries = 3
defaultHeatbeatBackoffInterval = time.Second * 1
defaultHeartbeatInterval = time.Second * 30
)

var (
heatbeatBackoffRetries = flag.Uint64(
"heatbeatBackoffRetries", defaultHeatbeatBackoffRetries, "The number of retries the heartbeat backoff will attempt")
heatbeatBackoffInterval = flag.Duration(
"heatbeatBackoffInterval", defaultHeatbeatBackoffInterval, "The interval between heartbeat backoff retries")
heartbeatInterval = flag.Duration(
"heartbeatInterval", defaultHeartbeatInterval, "The interval between heartbeats")
defaultHeatbeatBackoffInterval = 1
defaultHeartbeatInterval = 30
// Defines heartbeat related environment variables.
EnvHeatbeatBackoffRetries = "HEARTBEAT_BACKOFF_RETRIES"
EnvHeatbeatBackoffInterval = "HEARTBEAT_BACKOFF_INTERVAL"
EnvHeartbeatInterval = "HEARTBEAT_INTERVAL"
)

type WatchEvents struct {
Expand Down Expand Up @@ -593,18 +590,49 @@ func NewTenantAwareInventoryClient(
return cl, nil
}

// getEnvIntOrDefault returns the integer stored in the environment variable
// envVar. When the variable is unset or empty, or its content is not a valid
// base-10 integer, a warning is logged and defaultVal is returned instead.
func getEnvIntOrDefault(envVar string, defaultVal int) int {
	raw := os.Getenv(envVar)
	if raw == "" {
		zlog.InfraSec().Warn().Msgf("%s env variable is not set, using default value", envVar)
		return defaultVal
	}

	if parsed, convErr := strconv.Atoi(raw); convErr == nil {
		return parsed
	}

	zlog.InfraSec().Warn().Msgf("Invalid value for %s env variable: %s, using default value %d",
		envVar, raw, defaultVal,
	)
	return defaultVal
}

// Set up the heartbeat ticker to keep the client connection alive.
func (client *inventoryClient) heartbeat(clientUUID string) error {
heatbeatBackoffRetries := getEnvIntOrDefault(EnvHeatbeatBackoffRetries, defaultHeatbeatBackoffRetries)
heatbeatBackoffInterval := getEnvIntOrDefault(EnvHeatbeatBackoffInterval, defaultHeatbeatBackoffInterval)
heartbeatInterval := getEnvIntOrDefault(EnvHeartbeatInterval, defaultHeartbeatInterval)

heartbeetReq := &inv_v1.HeartbeatRequest{
ClientUuid: clientUUID,
}

ticker := time.NewTicker(*heartbeatInterval)
heartbeatIntervalDuration := time.Duration(heartbeatInterval) * time.Second
heatbeatBackoffIntervalDuration := time.Duration(heatbeatBackoffInterval) * time.Second
heatbeatBackoffRetriesUint64, err := util.IntToUint64(heatbeatBackoffRetries)
if err != nil {
zlog.InfraSec().Warn().Msgf("Invalid value for %s env variable: %s, using default value %d",
EnvHeatbeatBackoffRetries, strconv.Itoa(heatbeatBackoffRetries), defaultHeatbeatBackoffRetries,
)
heatbeatBackoffRetriesUint64 = uint64(defaultHeatbeatBackoffRetries)
}

ticker := time.NewTicker(heartbeatIntervalDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
clientCtxDeadline, cancel := context.WithDeadline(client.streamCtx, time.Now().Add(*heatbeatBackoffInterval))
clientCtxDeadline, cancel := context.WithDeadline(client.streamCtx, time.Now().Add(heatbeatBackoffIntervalDuration))
clientCtx := tenant.AddTenantIDToContext(clientCtxDeadline, clientUUID)

err := backoff.Retry(func() error {
Expand All @@ -616,7 +644,7 @@ func (client *inventoryClient) heartbeat(clientUUID string) error {
return backoff.Permanent(errHearbeat)
}
return errHearbeat
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(*heatbeatBackoffInterval), *heatbeatBackoffRetries))
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(heatbeatBackoffIntervalDuration), heatbeatBackoffRetriesUint64))
if err != nil {
zlog.InfraErr(err).Msgf("failed to heartbeat client UUID: %s", clientUUID)
cancel() // cancel the context to avoid leaking resources
Expand Down
Loading