Skip to content

Commit b8a6867

Browse files
authored
feat(collector): add http timing metrics and page_size to run_ai input (#2621)
1 parent de8ae9f commit b8a6867

File tree

4 files changed

+53
-7
lines changed

4 files changed

+53
-7
lines changed

collector/benthos/input/run_ai.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package input
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"sync"
78
"time"
89

@@ -24,6 +25,7 @@ const (
2425
fieldMetrics = "metrics"
2526
fieldSchedule = "schedule"
2627
fieldMetricsOffset = "metrics_offset"
28+
fieldPageSize = "page_size"
2729
fieldHTTPConfig = "http"
2830
fieldHTTPTimeout = "timeout"
2931
fieldHTTPRetryCount = "retry_count"
@@ -45,6 +47,9 @@ func runAIInputConfig() *service.ConfigSpec {
4547
service.NewStringEnumField(fieldResourceType, resourceTypes...).
4648
Default("workload").
4749
Description("Run AI resource to collect metrics from."),
50+
service.NewIntField(fieldPageSize).
51+
Description("Run AI page size.").
52+
Default(500),
4853
service.NewStringListField(fieldMetrics).
4954
Description("Run AI metrics to collect.").
5055
Default(lo.Map([]runai.MetricType{
@@ -90,9 +95,10 @@ input:
9095
url: "${RUNAI_URL:}"
9196
app_id: "${RUNAI_APP_ID:}"
9297
app_secret: "${RUNAI_APP_SECRET:}"
93-
schedule: "${RUNAI_SCRAPE_SCHEDULE:*/30 * * * * *}"
94-
metrics_scrape_offset: "${RUNAI_METRICS_SCRAPE_OFFSET:30s}"
95-
resource_type: "${RUNAI_RESOURCE_TYPE:workload}"
98+
schedule: "*/30 * * * * *"
99+
metrics_scrape_offset: "30s"
100+
resource_type: "workload"
101+
page_size: 500
96102
metrics:
97103
- CPU_LIMIT_CORES
98104
- CPU_MEMORY_LIMIT_BYTES
@@ -116,7 +122,15 @@ input:
116122

117123
func init() {
118124
err := service.RegisterBatchInput("run_ai", runAIInputConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
119-
return newRunAIInput(conf, mgr.Logger())
125+
httpMetrics := mgr.Metrics().NewTimer("run_ai_http_request", "url")
126+
in, err := newRunAIInput(conf, mgr.Logger(), httpMetrics)
127+
if err != nil {
128+
return nil, err
129+
}
130+
131+
in.timingMetrics = httpMetrics
132+
133+
return in, nil
120134
})
121135
if err != nil {
122136
panic(err)
@@ -136,9 +150,11 @@ type runAIInput struct {
136150
scheduler gocron.Scheduler
137151
store map[time.Time][]runai.ResourceWithMetrics
138152
mu sync.Mutex
153+
154+
timingMetrics *service.MetricTimer
139155
}
140156

141-
func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIInput, error) {
157+
func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger, httpMetrics *service.MetricTimer) (*runAIInput, error) {
142158
url, err := conf.FieldString(fieldURL)
143159
if err != nil {
144160
return nil, err
@@ -174,6 +190,15 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
174190
return nil, err
175191
}
176192

193+
pageSize, err := conf.FieldInt(fieldPageSize)
194+
if err != nil {
195+
return nil, err
196+
}
197+
198+
if pageSize < 100 || pageSize > 500 {
199+
return nil, errors.New("page size must be between 100 and 500")
200+
}
201+
177202
var interval time.Duration
178203
{
179204
// Create a cron scheduler
@@ -219,6 +244,8 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
219244
RetryCount: retryCount,
220245
RetryWaitTime: retryWaitTime,
221246
RetryMaxWaitTime: retryMaxWaitTime,
247+
TimingMetrics: httpMetrics,
248+
PageSize: pageSize,
222249
})
223250
if err != nil {
224251
return nil, err

collector/benthos/input/runai/pods.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (s *Service) ListAllPods(ctx context.Context) ([]Pod, error) {
116116

117117
for {
118118
resp, err := s.ListPods(ctx, ListPodsParams{
119-
Limit: 100,
119+
Limit: s.pageSize,
120120
Offset: len(pods),
121121
})
122122
if err != nil {

collector/benthos/input/runai/service.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package runai
22

33
import (
44
"net/http"
5+
"regexp"
56
"time"
67

78
"github.com/go-resty/resty/v2"
@@ -14,20 +15,24 @@ type Service struct {
1415
appID string
1516
appSecret string
1617
token string
18+
pageSize int
1719
}
1820

1921
type HTTPRequestConfig struct {
2022
Timeout time.Duration
2123
RetryWaitTime time.Duration
2224
RetryMaxWaitTime time.Duration
2325
RetryCount int
26+
PageSize int
27+
TimingMetrics *service.MetricTimer
2428
}
2529

2630
func NewService(baseURL, appID, appSecret string, logger *service.Logger, requestConfig HTTPRequestConfig) (*Service, error) {
2731
service := &Service{
2832
logger: logger,
2933
appID: appID,
3034
appSecret: appSecret,
35+
pageSize: requestConfig.PageSize,
3136
}
3237

3338
client := resty.New().
@@ -59,6 +64,20 @@ func NewService(baseURL, appID, appSecret string, logger *service.Logger, reques
5964
service.SetToken("")
6065
}
6166

67+
if requestConfig.TimingMetrics != nil {
68+
path := response.Request.RawRequest.URL.Path
69+
if matched, err := regexp.MatchString("/api/v1/workloads/[0-9a-f-]+/pods/[0-9a-f-]+/metrics", path); err == nil && matched {
70+
path = "/api/v1/workloads/:workloadID/pods/:podID/metrics"
71+
} else if matched, err := regexp.MatchString("/api/v1/workloads/[0-9a-f-]+/metrics", path); err == nil && matched {
72+
path = "/api/v1/workloads/:workloadID/metrics"
73+
} else if matched, err := regexp.MatchString("/api/v1/pods", path); err == nil && matched {
74+
path = "/api/v1/pods"
75+
} else if matched, err := regexp.MatchString("/api/v1/workloads", path); err == nil && matched {
76+
path = "/api/v1/workloads"
77+
}
78+
requestConfig.TimingMetrics.Timing(response.Time().Nanoseconds(), path)
79+
}
80+
6281
return nil
6382
})
6483

collector/benthos/input/runai/workloads.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (s *Service) ListAllWorkloads(ctx context.Context) ([]Workload, error) {
136136

137137
for {
138138
resp, err := s.ListWorkloads(ctx, ListWorkloadParams{
139-
Limit: 100,
139+
Limit: s.pageSize,
140140
Offset: len(workloads),
141141
})
142142
if err != nil {

0 commit comments

Comments
 (0)