Skip to content

Commit d742455

Browse files
authored
feat(collector): extend kubernetes collector (#2387)
1 parent 5b22b69 commit d742455

File tree

2 files changed

+105
-35
lines changed

2 files changed

+105
-35
lines changed

collector/benthos/input/kubernetes.go

Lines changed: 74 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@ func kubernetesResourcesInputConfig() *service.ConfigSpec {
2525
return service.NewConfigSpec().
2626
Beta().
2727
Categories("Services").
28-
Summary("List pods in Kubernetes.").
28+
Summary("List resources in Kubernetes.").
2929
Fields(
3030
service.NewStringListField("namespaces").
31-
Description("List of namespaces to list pods from."),
31+
Description("List of namespaces to list resources from."),
32+
service.NewStringEnumField("resource_type", "pod", "node", "persistentvolume", "persistentvolumeclaim").
33+
Description("Type of resource to list.").
34+
Default("pod"),
3235
service.NewStringField("label_selector").
3336
Description("Label selector applied to each list operation.").
3437
Optional(),
@@ -46,6 +49,7 @@ func init() {
4649

4750
type kubernetesResourcesInput struct {
4851
namespaces []string
52+
resourceType string
4953
labelSelector labels.Selector
5054
logger *service.Logger
5155

@@ -63,6 +67,11 @@ func newKubernetesResourcesInput(conf *service.ParsedConfig, logger *service.Log
6367
return nil, err
6468
}
6569

70+
resourceType, err := conf.FieldString("resource_type")
71+
if err != nil {
72+
return nil, err
73+
}
74+
6675
// Normalize the namespaces to lowercase and deduplicate.
6776
namespaces = lo.Uniq(lo.Map(namespaces, func(s string, _ int) string { return strings.ToLower(s) }))
6877

@@ -116,6 +125,7 @@ func newKubernetesResourcesInput(conf *service.ParsedConfig, logger *service.Log
116125
return &kubernetesResourcesInput{
117126
namespaces: namespaces,
118127
labelSelector: selector,
128+
resourceType: resourceType,
119129
manager: mgr,
120130
client: client,
121131
logger: logger,
@@ -153,36 +163,84 @@ func (in *kubernetesResourcesInput) ReadBatch(ctx context.Context) (service.Mess
153163

154164
// Iterate over each namespace and list resources.
155165
for _, ns := range in.namespaces {
156-
podList := &corev1.PodList{}
157166
opts := []client.ListOption{client.InNamespace(ns)}
158167
if in.labelSelector != nil {
159168
opts = append(opts, client.MatchingLabelsSelector{
160169
Selector: in.labelSelector,
161170
})
162171
}
163172

164-
if err := in.client.List(ctx, podList, opts...); err != nil {
165-
return nil, nil, err
166-
}
173+
switch in.resourceType {
174+
case "pod":
175+
podList := &corev1.PodList{}
176+
if err := in.client.List(ctx, podList, opts...); err != nil {
177+
return nil, nil, err
178+
}
179+
180+
for _, pod := range podList.Items {
181+
if !lo.EveryBy(pod.Status.ContainerStatuses, func(cs corev1.ContainerStatus) bool {
182+
return cs.State.Running != nil
183+
}) {
184+
continue
185+
}
186+
187+
encoded, err := json.Marshal(pod)
188+
if err != nil {
189+
return nil, nil, err
190+
}
191+
192+
in.logger.Debugf("adding pod %s to batch", pod.Name)
193+
batch = append(batch, service.NewMessage(encoded))
194+
}
195+
case "node":
196+
nodeList := &corev1.NodeList{}
197+
if err := in.client.List(ctx, nodeList, opts...); err != nil {
198+
return nil, nil, err
199+
}
167200

168-
for _, pod := range podList.Items {
169-
if !lo.EveryBy(pod.Status.ContainerStatuses, func(cs corev1.ContainerStatus) bool {
170-
return cs.State.Running != nil
171-
}) {
172-
continue
201+
for _, node := range nodeList.Items {
202+
encoded, err := json.Marshal(node)
203+
if err != nil {
204+
return nil, nil, err
205+
}
206+
207+
in.logger.Debugf("adding node %s to batch", node.Name)
208+
batch = append(batch, service.NewMessage(encoded))
209+
}
210+
case "persistentvolume":
211+
persistentVolumeList := &corev1.PersistentVolumeList{}
212+
if err := in.client.List(ctx, persistentVolumeList, opts...); err != nil {
213+
return nil, nil, err
173214
}
174215

175-
encoded, err := json.Marshal(pod)
176-
if err != nil {
216+
for _, persistentVolume := range persistentVolumeList.Items {
217+
encoded, err := json.Marshal(persistentVolume)
218+
if err != nil {
219+
return nil, nil, err
220+
}
221+
222+
in.logger.Debugf("adding persistent volume %s to batch", persistentVolume.Name)
223+
batch = append(batch, service.NewMessage(encoded))
224+
}
225+
case "persistentvolumeclaim":
226+
persistentVolumeClaimList := &corev1.PersistentVolumeClaimList{}
227+
if err := in.client.List(ctx, persistentVolumeClaimList, opts...); err != nil {
177228
return nil, nil, err
178229
}
179230

180-
in.logger.Debugf("adding pod %s to batch", pod.Name)
181-
batch = append(batch, service.NewMessage(encoded))
231+
for _, persistentVolumeClaim := range persistentVolumeClaimList.Items {
232+
encoded, err := json.Marshal(persistentVolumeClaim)
233+
if err != nil {
234+
return nil, nil, err
235+
}
236+
237+
in.logger.Debugf("adding persistent volume claim %s to batch", persistentVolumeClaim.Name)
238+
batch = append(batch, service.NewMessage(encoded))
239+
}
182240
}
183241
}
184242

185-
in.logger.Debugf("batch size: %d", len(batch))
243+
in.logger.Debugf("batch size of %s: %d", in.resourceType, len(batch))
186244

187245
return batch, func(context.Context, error) error {
188246
// A nack (when err is non-nil) is handled automatically when we

collector/benthos/input/run_ai.go

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const (
2323
fieldResourceType = "resource_type"
2424
fieldMetrics = "metrics"
2525
fieldSchedule = "schedule"
26+
fieldMetricsScrapeOffset = "metrics_scrape_offset"
2627
fieldHTTPConfig = "http"
2728
fieldHTTPTimeout = "timeout"
2829
fieldHTTPRetryCount = "retry_count"
@@ -66,6 +67,9 @@ func runAIInputConfig() *service.ConfigSpec {
6667
Description("The cron expression to use for the scrape job.").
6768
Examples("*/30 * * * * *", "@every 30s").
6869
Default("*/30 * * * * *"),
70+
service.NewDurationField(fieldMetricsScrapeOffset).
71+
Description("Indicates how far back in time the scraping window should start to account for delays in metric availability.").
72+
Default("0s"),
6973
service.NewObjectField(fieldHTTPConfig,
7074
service.NewDurationField(fieldHTTPTimeout).
7175
Description("Request timeout.").
@@ -87,6 +91,7 @@ input:
8791
app_id: "${RUNAI_APP_ID:}"
8892
app_secret: "${RUNAI_APP_SECRET:}"
8993
schedule: "${RUNAI_SCRAPE_SCHEDULE:*/30 * * * * *}"
94+
metrics_scrape_offset: "${RUNAI_METRICS_SCRAPE_OFFSET:30s}"
9095
resource_type: "${RUNAI_RESOURCE_TYPE:workload}"
9196
metrics:
9297
- CPU_LIMIT_CORES
@@ -121,15 +126,16 @@ func init() {
121126
var _ service.BatchInput = (*runAIInput)(nil)
122127

123128
type runAIInput struct {
124-
logger *service.Logger
125-
service *runai.Service
126-
resourceType string
127-
metrics []runai.MetricType
128-
interval time.Duration
129-
schedule string
130-
scheduler gocron.Scheduler
131-
store map[time.Time][]runai.ResourceWithMetrics
132-
mu sync.Mutex
129+
logger *service.Logger
130+
service *runai.Service
131+
resourceType string
132+
metrics []runai.MetricType
133+
interval time.Duration
134+
schedule string
135+
metricsScrapeOffset time.Duration
136+
scheduler gocron.Scheduler
137+
store map[time.Time][]runai.ResourceWithMetrics
138+
mu sync.Mutex
133139
}
134140

135141
func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIInput, error) {
@@ -163,6 +169,11 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
163169
return nil, err
164170
}
165171

172+
metricsScrapeOffset, err := conf.FieldDuration(fieldMetricsScrapeOffset)
173+
if err != nil {
174+
return nil, err
175+
}
176+
166177
var interval time.Duration
167178
{
168179
// Create a cron scheduler
@@ -219,12 +230,13 @@ func newRunAIInput(conf *service.ParsedConfig, logger *service.Logger) (*runAIIn
219230
}
220231

221232
return &runAIInput{
222-
logger: logger,
223-
service: service,
224-
resourceType: resourceType,
225-
interval: interval,
226-
schedule: schedule,
227-
scheduler: scheduler,
233+
logger: logger,
234+
service: service,
235+
resourceType: resourceType,
236+
interval: interval,
237+
schedule: schedule,
238+
metricsScrapeOffset: metricsScrapeOffset,
239+
scheduler: scheduler,
228240
metrics: lo.Map(metrics, func(metric string, _ int) runai.MetricType {
229241
return runai.MetricType(metric)
230242
}),
@@ -241,8 +253,8 @@ func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
241253
case "workload":
242254
workloadsWithMetrics, err := in.service.GetAllWorkloadWithMetrics(ctx, runai.MeasurementParams{
243255
MetricType: in.metrics,
244-
StartTime: t.Add(-in.interval),
245-
EndTime: t,
256+
StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset),
257+
EndTime: t.Add(-in.metricsScrapeOffset),
246258
})
247259
if err != nil {
248260
return err
@@ -256,8 +268,8 @@ func (in *runAIInput) scrape(ctx context.Context, t time.Time) error {
256268
case "pod":
257269
podsWithMetrics, err := in.service.GetAllPodWithMetrics(ctx, runai.MeasurementParams{
258270
MetricType: in.metrics,
259-
StartTime: t.Add(-in.interval),
260-
EndTime: t,
271+
StartTime: t.Add(-in.interval).Add(-in.metricsScrapeOffset),
272+
EndTime: t.Add(-in.metricsScrapeOffset),
261273
})
262274
if err != nil {
263275
return err

0 commit comments

Comments
 (0)