Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 43 additions & 15 deletions inventory/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package client
import (
"context"
"errors"
"flag"
"io"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -66,18 +67,14 @@ const (
)

const (
// Default values for heartbeat related configurations.
defaultHeatbeatBackoffRetries = 3
defaultHeatbeatBackoffInterval = time.Second * 1
defaultHeartbeatInterval = time.Second * 30
)

var (
heatbeatBackoffRetries = flag.Uint64(
"heatbeatBackoffRetries", defaultHeatbeatBackoffRetries, "The number of retries the heartbeat backoff will attempt")
heatbeatBackoffInterval = flag.Duration(
"heatbeatBackoffInterval", defaultHeatbeatBackoffInterval, "The interval between heartbeat backoff retries")
heartbeatInterval = flag.Duration(
"heartbeatInterval", defaultHeartbeatInterval, "The interval between heartbeats")
defaultHeatbeatBackoffInterval = 1
defaultHeartbeatInterval = 30
// Defines heartbeat related environment variables.
EnvHeatbeatBackoffRetries = "HEARTBEAT_BACKOFF_RETRIES"
EnvHeatbeatBackoffInterval = "HEARTBEAT_BACKOFF_INTERVAL"
EnvHeartbeatInterval = "HEARTBEAT_INTERVAL"
)

type WatchEvents struct {
Expand Down Expand Up @@ -593,18 +590,49 @@ func NewTenantAwareInventoryClient(
return cl, nil
}

// getEnvIntOrDefault retrieves an environment variable as int, or returns the default if not set or invalid.
func getEnvIntOrDefault(envVar string, defaultVal int) int {
valStr := os.Getenv(envVar)
if valStr == "" {
zlog.InfraSec().Warn().Msgf("%s env variable is not set, using default value", envVar)
return defaultVal
}
val, err := strconv.Atoi(valStr)
if err != nil {
zlog.InfraSec().Warn().Msgf("Invalid value for %s env variable: %s, using default value %d",
envVar, valStr, defaultVal,
)
return defaultVal
}
return val
}

// Set up the heartbeat ticker to keep the client connection alive.
func (client *inventoryClient) heartbeat(clientUUID string) error {
heatbeatBackoffRetries := getEnvIntOrDefault(EnvHeatbeatBackoffRetries, defaultHeatbeatBackoffRetries)
heatbeatBackoffInterval := getEnvIntOrDefault(EnvHeatbeatBackoffInterval, defaultHeatbeatBackoffInterval)
heartbeatInterval := getEnvIntOrDefault(EnvHeartbeatInterval, defaultHeartbeatInterval)

heartbeetReq := &inv_v1.HeartbeatRequest{
ClientUuid: clientUUID,
}

ticker := time.NewTicker(*heartbeatInterval)
heartbeatIntervalDuration := time.Duration(heartbeatInterval) * time.Second
heatbeatBackoffIntervalDuration := time.Duration(heatbeatBackoffInterval) * time.Second
heatbeatBackoffRetriesUint64, err := util.IntToUint64(heatbeatBackoffRetries)
if err != nil {
zlog.InfraSec().Warn().Msgf("Invalid value for %s env variable: %s, using default value %d",
EnvHeatbeatBackoffRetries, strconv.Itoa(heatbeatBackoffRetries), defaultHeatbeatBackoffRetries,
)
heatbeatBackoffRetriesUint64 = uint64(defaultHeatbeatBackoffRetries)
}

ticker := time.NewTicker(heartbeatIntervalDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
clientCtxDeadline, cancel := context.WithDeadline(client.streamCtx, time.Now().Add(*heatbeatBackoffInterval))
clientCtxDeadline, cancel := context.WithDeadline(client.streamCtx, time.Now().Add(heatbeatBackoffIntervalDuration))
clientCtx := tenant.AddTenantIDToContext(clientCtxDeadline, clientUUID)

err := backoff.Retry(func() error {
Expand All @@ -616,7 +644,7 @@ func (client *inventoryClient) heartbeat(clientUUID string) error {
return backoff.Permanent(errHearbeat)
}
return errHearbeat
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(*heatbeatBackoffInterval), *heatbeatBackoffRetries))
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(heatbeatBackoffIntervalDuration), heatbeatBackoffRetriesUint64))
if err != nil {
zlog.InfraErr(err).Msgf("failed to heartbeat client UUID: %s", clientUUID)
cancel() // cancel the context to avoid leaking resources
Expand Down
Loading