Skip to content

Commit ddb63d5

Browse files
Increase timeout period for collecting metrics (#755)
* Added parallel calls for getting NGINX Plus stats to speed up the metrics retrieval process
1 parent 594da08 commit ddb63d5

File tree

10 files changed

+868
-27
lines changed

10 files changed

+868
-27
lines changed

src/core/metrics/sources/nginx_plus.go

Lines changed: 357 additions & 3 deletions
Large diffs are not rendered by default.

src/core/metrics/sources/nginx_plus_test.go

Lines changed: 139 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ const (
5454
currentPeer2UpstreamHeaderTime = 80
5555
currentPeer1UpstreamResponseTime = 100
5656
currentPeer2UpstreamResponseTime = 80
57-
currentUpstreamResponseTime = 100
58-
currentUpstreamConnectTime = 80
5957
currentUpstreamFirstByteTime = 50
6058
previousUpstreamHeaderTime = 98
6159
previousUpstreamResponseTime = 98
@@ -97,14 +95,28 @@ const (
9795
workerProcessID = 12345
9896
)
9997

100-
type FakeNginxPlus struct {
101-
*NginxPlus
102-
}
103-
104-
// Collect is fake collector that hard codes a stats struct response to avoid dependency on external NGINX Plus api
105-
func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
106-
defer wg.Done()
107-
stats := plusclient.Stats{
98+
var (
99+
availableZones = []string{"server_zones", "upstreams", "limit_conns", "zone_sync"}
100+
stats = plusclient.Stats{
101+
StreamZoneSync: &plusclient.StreamZoneSync{
102+
Zones: map[string]plusclient.SyncZone{
103+
"0": {
104+
RecordsPending: 1,
105+
RecordsTotal: 2,
106+
},
107+
"1": {
108+
RecordsPending: 3,
109+
RecordsTotal: 4,
110+
},
111+
},
112+
Status: plusclient.StreamZoneSyncStatus{
113+
BytesIn: 1,
114+
MsgsIn: 2,
115+
MsgsOut: 3,
116+
BytesOut: 4,
117+
NodesOnline: 5,
118+
},
119+
},
108120
HTTPRequests: plusclient.HTTPRequests{
109121
Total: currentHTTPRequestTotal,
110122
Current: currentHTTPRequestCurrent,
@@ -387,8 +399,7 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<
387399
},
388400
},
389401
}
390-
391-
prevStats := plusclient.Stats{
402+
prevStats = plusclient.Stats{
392403
HTTPRequests: plusclient.HTTPRequests{
393404
Total: previousHTTPRequestTotal,
394405
Current: previousHTTPRequestCurrent,
@@ -547,6 +558,15 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<
547558
},
548559
},
549560
}
561+
)
562+
563+
type FakeNginxPlus struct {
564+
*NginxPlus
565+
}
566+
567+
// Collect is a fake collector that hard-codes a stats struct response to avoid a dependency on the external NGINX Plus API
568+
func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) {
569+
defer wg.Done()
550570

551571
f.baseDimensions.NginxType = "plus"
552572
f.baseDimensions.PublishedAPI = f.plusAPI
@@ -556,6 +576,113 @@ func (f *FakeNginxPlus) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<
556576
f.sendMetrics(ctx, m, f.collectMetrics(&stats, &prevStats)...)
557577
}
558578

579+
var _ Client = (*MockClient)(nil)
580+
581+
type MockClient struct{}
582+
583+
func (m *MockClient) GetAvailableEndpoints() ([]string, error) {
584+
return []string{"stream"}, nil
585+
}
586+
587+
func (m *MockClient) GetAvailableStreamEndpoints() ([]string, error) {
588+
return availableZones, nil
589+
}
590+
591+
func (m *MockClient) GetStreamServerZones() (*plusclient.StreamServerZones, error) {
592+
return &stats.StreamServerZones, nil
593+
}
594+
595+
func (m *MockClient) GetStreamUpstreams() (*plusclient.StreamUpstreams, error) {
596+
return &stats.StreamUpstreams, nil
597+
}
598+
599+
func (m *MockClient) GetStreamConnectionsLimit() (*plusclient.StreamLimitConnections, error) {
600+
return &stats.StreamLimitConnections, nil
601+
}
602+
603+
func (m *MockClient) GetStreamZoneSync() (*plusclient.StreamZoneSync, error) {
604+
return &plusclient.StreamZoneSync{
605+
Zones: stats.StreamZoneSync.Zones,
606+
Status: stats.StreamZoneSync.Status,
607+
}, nil
608+
}
609+
610+
func (m *MockClient) GetNginxInfo() (*plusclient.NginxInfo, error) {
611+
return &stats.NginxInfo, nil
612+
}
613+
614+
func (m *MockClient) GetCaches() (*plusclient.Caches, error) {
615+
return &stats.Caches, nil
616+
}
617+
618+
func (m *MockClient) GetProcesses() (*plusclient.Processes, error) {
619+
return &stats.Processes, nil
620+
}
621+
622+
func (m *MockClient) GetSlabs() (*plusclient.Slabs, error) {
623+
return &stats.Slabs, nil
624+
}
625+
626+
func (m *MockClient) GetConnections() (*plusclient.Connections, error) {
627+
return &stats.Connections, nil
628+
}
629+
630+
func (m *MockClient) GetHTTPRequests() (*plusclient.HTTPRequests, error) {
631+
return &stats.HTTPRequests, nil
632+
}
633+
634+
func (m *MockClient) GetSSL() (*plusclient.SSL, error) {
635+
return &stats.SSL, nil
636+
}
637+
638+
func (m *MockClient) GetServerZones() (*plusclient.ServerZones, error) {
639+
return &stats.ServerZones, nil
640+
}
641+
642+
func (m *MockClient) GetUpstreams() (*plusclient.Upstreams, error) {
643+
return &stats.Upstreams, nil
644+
}
645+
646+
func (m *MockClient) GetLocationZones() (*plusclient.LocationZones, error) {
647+
return &stats.LocationZones, nil
648+
}
649+
650+
func (m *MockClient) GetResolvers() (*plusclient.Resolvers, error) {
651+
return &stats.Resolvers, nil
652+
}
653+
654+
func (m *MockClient) GetHTTPLimitReqs() (*plusclient.HTTPLimitRequests, error) {
655+
return &stats.HTTPLimitRequests, nil
656+
}
657+
658+
func (m *MockClient) GetHTTPConnectionsLimit() (*plusclient.HTTPLimitConnections, error) {
659+
return &stats.HTTPLimitConnections, nil
660+
}
661+
662+
func (m *MockClient) GetWorkers() ([]*plusclient.Workers, error) {
663+
return stats.Workers, nil
664+
}
665+
666+
func TestGetStats(t *testing.T) {
667+
client := &MockClient{}
668+
669+
source := NewNginxPlus(nil, "", "", "", 9)
670+
671+
tests := []struct {
672+
stats plusclient.Stats
673+
}{
674+
{
675+
stats: stats,
676+
},
677+
}
678+
679+
for _, test := range tests {
680+
resultStats, err := source.getStats(client)
681+
require.NoError(t, err)
682+
assert.Equal(t, test.stats, *resultStats)
683+
}
684+
}
685+
559686
func TestNginxPlusUpdate(t *testing.T) {
560687
nginxPlus := NewNginxPlus(&metrics.CommonDim{}, "test", PlusNamespace, "http://localhost:8080/api", 6)
561688

src/core/nginx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (n *NginxBinaryType) GetNginxDetailsFromProcess(nginxProcess *Process) *pro
218218
if urlsLength == 0 || nginxStatus == "" {
219219
stubStatusApiUrl, err := sdk.GetStubStatusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives)
220220
if err != nil {
221-
log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure aStub Status API to get NGINX OSS metrics: %v", err)
221+
log.Tracef("Unable to get Stub Status API URL from the configuration: NGINX OSS metrics will be unavailable for this system. please configure a Stub Status API to get NGINX OSS metrics: %v", err)
222222
}
223223

224224
nginxPlusApiUrl, err := sdk.GetNginxPlusApiUrl(nginxDetailsFacade.ConfPath, n.config.IgnoreDirectives)

src/plugins/metrics.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,8 @@ func (m *Metrics) metricsGoroutine() {
211211
}
212212

213213
func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) {
214-
// setups a collect duration of half-time of the poll interval
215-
ctx, cancel := context.WithTimeout(m.ctx, m.interval/2)
214+
// set the timeout to one millisecond less than the collection interval
215+
ctx, cancel := context.WithTimeout(m.ctx, (m.interval - 1*time.Millisecond))
216216
defer cancel()
217217
// locks the m.collectors to make sure it doesn't get deleted in the middle
218218
// of collection, as we will delete the old one if config changes.
@@ -232,7 +232,9 @@ func (m *Metrics) collectStats() (stats []*metrics.StatsEntityWrapper) {
232232
// drain the buf, since our sources/collectors are all done, we can rely on buffer length
233233
select {
234234
case <-ctx.Done():
235-
err := m.ctx.Err()
235+
log.Debugf("context done in %s collectStats", time.Since(start))
236+
237+
err := ctx.Err()
236238
if err != nil {
237239
log.Errorf("error in done context collectStats %v", err)
238240
}

src/plugins/metrics_throlling.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W
154154
return
155155
case <-r.ticker.C:
156156
reports := r.getAggregatedReports()
157+
log.Debugf("metricsThrottle: metricsReportGoroutine, got %d reports to send", len(reports))
157158
if len(reports) > 0 {
158159
r.messagePipeline.Process(core.NewMessage(core.CommMetrics, reports))
159160
}

test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)