Skip to content

Commit 95d4007

Browse files
authored
Implement timeout for whole metrics send function (#1400)
* Implement timeout for whole metrics send function

  I saw occasional slowness from the mist GetState function. The mist client has a 1 minute timeout, and we need a much shorter timeout here because the loop runs every 5 seconds.
* Fix build
* Fix build
* Use context.WithTimeout
* Reduce mist client timeout for catabalancer
* Review comment: add comment
1 parent 0ce0de8 commit 95d4007

File tree

3 files changed

+69
-60
lines changed

3 files changed

+69
-60
lines changed

balancer/catabalancer/catalyst_balancer.go

+53-48
Original file line number | Diff line number | Diff line change
@@ -411,70 +411,75 @@ func (c *CataBalancer) MistUtilLoadSource(ctx context.Context, streamID, lat, lo
411411
return "", fmt.Errorf("catabalancer no node found for ingest stream: %s stale: false", streamID)
412412
}
413413

414-
var updateNodeStatsEvery = 5 * time.Second
414+
var StatsUpdateInterval = 5 * time.Second
415+
var StatsUpdateTimeout = StatsUpdateInterval - 500*time.Millisecond // have the timeout sit within the update interval so we don't miss sending updates
415416

416417
func isStale(timestamp time.Time, stale time.Duration) bool {
417418
return time.Since(timestamp) >= stale
418419
}
419420

420421
func StartMetricSending(nodeName string, latitude float64, longitude float64, mist clients.MistAPIClient, nodeStatsDB *sql.DB) {
421-
ticker := time.NewTicker(updateNodeStatsEvery)
422+
ticker := time.NewTicker(StatsUpdateInterval)
422423
go func() {
423424
for range ticker.C {
424-
start := time.Now()
425-
sysusage, err := GetSystemUsage()
426-
if err != nil {
427-
log.LogNoRequestID("catabalancer failed to get sys usage", "err", err)
428-
continue
429-
}
425+
sendMetrics(nodeName, latitude, longitude, mist, nodeStatsDB)
426+
}
427+
}()
428+
}
430429

431-
event := NodeUpdateEvent{
432-
Resource: "nodeUpdate",
433-
NodeID: nodeName,
434-
NodeMetrics: NodeMetrics{
435-
CPUUsagePercentage: sysusage.CPUUsagePercentage,
436-
RAMUsagePercentage: sysusage.RAMUsagePercentage,
437-
BandwidthUsagePercentage: sysusage.BWUsagePercentage,
438-
LoadAvg: sysusage.LoadAvg.Load5Min,
439-
GeoLatitude: latitude,
440-
GeoLongitude: longitude,
441-
Timestamp: time.Now(),
442-
},
443-
}
430+
func sendMetrics(nodeName string, latitude float64, longitude float64, mist clients.MistAPIClient, nodeStatsDB *sql.DB) {
431+
start := time.Now()
432+
sysusage, err := GetSystemUsage()
433+
if err != nil {
434+
log.LogNoRequestID("catabalancer failed to get sys usage", "err", err)
435+
return
436+
}
444437

445-
if mist != nil {
446-
mistState, err := mist.GetState()
447-
if err != nil {
448-
log.LogNoRequestID("catabalancer failed to get mist state", "err", err)
449-
continue
450-
}
451-
452-
var nonIngestStreams, ingestStreams []string
453-
for streamID := range mistState.ActiveStreams {
454-
if mistState.IsIngestStream(streamID) {
455-
ingestStreams = append(ingestStreams, streamID)
456-
} else {
457-
nonIngestStreams = append(nonIngestStreams, streamID)
458-
}
459-
}
460-
event.SetStreams(nonIngestStreams, ingestStreams)
461-
}
438+
event := NodeUpdateEvent{
439+
Resource: "nodeUpdate",
440+
NodeID: nodeName,
441+
NodeMetrics: NodeMetrics{
442+
CPUUsagePercentage: sysusage.CPUUsagePercentage,
443+
RAMUsagePercentage: sysusage.RAMUsagePercentage,
444+
BandwidthUsagePercentage: sysusage.BWUsagePercentage,
445+
LoadAvg: sysusage.LoadAvg.Load5Min,
446+
GeoLatitude: latitude,
447+
GeoLongitude: longitude,
448+
Timestamp: time.Now(),
449+
},
450+
}
462451

463-
payload, err := json.Marshal(event)
464-
if err != nil {
465-
log.LogNoRequestID("catabalancer failed to marhsal node update", "err", err)
466-
continue
467-
}
468-
sendMetrics(nodeStatsDB, nodeName, payload)
452+
if mist != nil {
453+
mistState, err := mist.GetState()
454+
if err != nil {
455+
log.LogNoRequestID("catabalancer failed to get mist state", "err", err)
456+
return
457+
}
469458

470-
metrics.Metrics.CatabalancerSendMetricDurationSec.Observe(time.Since(start).Seconds())
459+
var nonIngestStreams, ingestStreams []string
460+
for streamID := range mistState.ActiveStreams {
461+
if mistState.IsIngestStream(streamID) {
462+
ingestStreams = append(ingestStreams, streamID)
463+
} else {
464+
nonIngestStreams = append(nonIngestStreams, streamID)
465+
}
471466
}
472-
}()
467+
event.SetStreams(nonIngestStreams, ingestStreams)
468+
}
469+
470+
payload, err := json.Marshal(event)
471+
if err != nil {
472+
log.LogNoRequestID("catabalancer failed to marhsal node update", "err", err)
473+
return
474+
}
475+
sendMetricsToDB(nodeStatsDB, nodeName, payload)
476+
477+
metrics.Metrics.CatabalancerSendMetricDurationSec.Observe(time.Since(start).Seconds())
473478
}
474479

475-
func sendMetrics(nodeStatsDB *sql.DB, nodeName string, payload []byte) {
480+
func sendMetricsToDB(nodeStatsDB *sql.DB, nodeName string, payload []byte) {
476481
start := time.Now()
477-
queryContext, cancel := context.WithTimeout(context.Background(), updateNodeStatsEvery)
482+
queryContext, cancel := context.WithTimeout(context.Background(), StatsUpdateTimeout)
478483
defer cancel()
479484
insertStatement := `insert into "node_stats"(
480485
"node_id",

clients/mist_client.go

+14-11
Original file line number | Diff line number | Diff line change
@@ -50,14 +50,21 @@ type MistClient struct {
5050
HttpReqUrl string
5151
configMu sync.Mutex
5252
cache *cache.Cache
53+
httpClient *http.Client
5354
}
5455

55-
func NewMistAPIClient(user, password, host string, port int) MistAPIClient {
56+
const MistClientTimeout = 1 * time.Minute
57+
58+
func NewMistAPIClient(user, password, host string, port int, timeout time.Duration) MistAPIClient {
59+
if timeout == 0 {
60+
timeout = MistClientTimeout
61+
}
5662
mist := &MistClient{
57-
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
58-
Username: user,
59-
Password: password,
60-
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
63+
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
64+
Username: user,
65+
Password: password,
66+
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
67+
httpClient: newRetryableClient(&http.Client{Timeout: timeout}),
6168
}
6269
return mist
6370
}
@@ -238,10 +245,6 @@ type MistPushStats struct {
238245
Tracks []int `json:"tracks"`
239246
}
240247

241-
const MIST_CLIENT_TIMEOUT = 1 * time.Minute
242-
243-
var mistRetryableClient = newRetryableClient(&http.Client{Timeout: MIST_CLIENT_TIMEOUT})
244-
245248
func (mc *MistClient) AddStream(streamName, sourceUrl string) error {
246249
c := commandAddStream(streamName, sourceUrl)
247250
return wrapErr(validateAddStream(mc.sendCommand(c)), streamName)
@@ -405,7 +408,7 @@ func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) {
405408
return "", err
406409
}
407410
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
408-
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mistRetryableClient, req)
411+
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mc.httpClient, req)
409412
if err != nil {
410413
return "", err
411414
}
@@ -437,7 +440,7 @@ func (mc *MistClient) sendHttpRequest(streamName string) (string, error) {
437440
return "", err
438441
}
439442
req.Header.Add("Content-Type", "application/json")
440-
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mistRetryableClient, req)
443+
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mc.httpClient, req)
441444
if err != nil {
442445
return "", err
443446
}

main.go

+2-1
Original file line number | Diff line number | Diff line change
@@ -227,7 +227,7 @@ func main() {
227227
}
228228

229229
if cli.MistEnabled {
230-
mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort)
230+
mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort, 0)
231231
}
232232

233233
catabalancerEnabled := balancer.CombinedBalancerEnabled(cli.CataBalancer)
@@ -267,6 +267,7 @@ func main() {
267267

268268
if catabalancerEnabled && nodeStatsDB != nil {
269269
if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes
270+
mist := clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort, catabalancer.StatsUpdateTimeout)
270271
catabalancer.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, mist, nodeStatsDB)
271272
}
272273
}

0 commit comments

Comments
 (0)