-
Notifications
You must be signed in to change notification settings - Fork 1.4k
feat: adding kata integration #47816
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
TheRayquaza
wants to merge
3
commits into
main
Choose a base branch
from
ml/kata-integration
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 1 commit
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,334 @@ | ||
| // Unless explicitly stated otherwise all files in this repository are licensed | ||
| // under the Apache License Version 2.0. | ||
| // This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| // Copyright 2016-present Datadog, Inc. | ||
|
|
||
| //go:build linux | ||
|
|
||
| // Package kata implements the kata_containers check. | ||
| package kata | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "fmt" | ||
| "io" | ||
| "maps" | ||
| "net" | ||
| "net/http" | ||
| "os" | ||
| "path/filepath" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| yaml "go.yaml.in/yaml/v2" | ||
|
|
||
| "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" | ||
| tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" | ||
| taggertypes "github.com/DataDog/datadog-agent/comp/core/tagger/types" | ||
| workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" | ||
| "github.com/DataDog/datadog-agent/pkg/aggregator/sender" | ||
| "github.com/DataDog/datadog-agent/pkg/collector/check" | ||
| core "github.com/DataDog/datadog-agent/pkg/collector/corechecks" | ||
| "github.com/DataDog/datadog-agent/pkg/metrics/servicecheck" | ||
| "github.com/DataDog/datadog-agent/pkg/util/option" | ||
| prom "github.com/DataDog/datadog-agent/pkg/util/prometheus" | ||
| ) | ||
|
|
||
const (
	// CheckName is the name of the check
	CheckName = "kata_containers"

	// shimSocket is the socket filename looked for inside each sandbox
	// directory when discovering running shims.
	shimSocket = "shim-monitor.sock"
	// shimDialTimeout bounds the unix-socket dial to a shim.
	shimDialTimeout = 5 * time.Second
	// shimReadTimeout bounds reading the /metrics response from a shim.
	shimReadTimeout = 10 * time.Second
	// defaultScrapeInterval is how often all discovered shims are scraped.
	defaultScrapeInterval = 15 * time.Second
)
|
|
||
// defaultSandboxStoragePaths are the host directories scanned for Kata
// sandbox subdirectories when sandbox_storage_paths is not configured.
var defaultSandboxStoragePaths = []string{"/run/vc/sbs", "/run/kata"}

// defaultRenameLabels holds the label renames applied by default when
// converting Prometheus labels to tags (see buildSampleTags).
var defaultRenameLabels = map[string]string{"version": "go_version"}
|
|
||
// KataConfig holds the check instance configuration
type KataConfig struct {
	// SandboxStoragePaths are host directories scanned for sandbox
	// subdirectories containing shim monitor sockets.
	SandboxStoragePaths []string `yaml:"sandbox_storage_paths"`
	// RenameLabels maps Prometheus label names to replacement tag names.
	RenameLabels map[string]string `yaml:"rename_labels"`
	// ExcludeLabels lists Prometheus label names dropped from tags.
	ExcludeLabels []string `yaml:"exclude_labels"`
	// Tags are extra tags appended to every metric emitted by the check.
	Tags []string `yaml:"tags"`
}
|
|
||
// KataCheck collects metrics from Kata Containers sandboxes
type KataCheck struct {
	core.CheckBase
	instance   *KataConfig
	tagger     tagger.Component
	store      workloadmeta.Component
	excludeSet map[string]struct{} // built once at Configure time

	// mu guards sandboxContainerID, which is written by the event loop and
	// read while building tags during scrapes.
	mu                 sync.RWMutex
	sandboxContainerID map[string]string // sandboxID -> containerID, updated by workloadmeta events

	// stopOnce/stopCh implement an idempotent Stop for the Run loop.
	stopOnce sync.Once
	stopCh   chan struct{}
}
|
|
||
| // Factory returns a check factory for kata_containers | ||
| func Factory(store workloadmeta.Component, tagger tagger.Component) option.Option[func() check.Check] { | ||
| return option.New(func() check.Check { | ||
| return core.NewLongRunningCheckWrapper(&KataCheck{ | ||
| CheckBase: core.NewCheckBase(CheckName), | ||
| instance: &KataConfig{}, | ||
| tagger: tagger, | ||
| store: store, | ||
| sandboxContainerID: make(map[string]string), | ||
| stopCh: make(chan struct{}), | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
| // Parse parses the KataConfig and sets defaults | ||
| func (c *KataConfig) Parse(data []byte) error { | ||
| c.SandboxStoragePaths = defaultSandboxStoragePaths | ||
| c.RenameLabels = maps.Clone(defaultRenameLabels) | ||
|
|
||
| if err := yaml.Unmarshal(data, c); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if len(c.SandboxStoragePaths) == 0 { | ||
| c.SandboxStoragePaths = defaultSandboxStoragePaths | ||
| } | ||
| if c.RenameLabels == nil { | ||
| c.RenameLabels = maps.Clone(defaultRenameLabels) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Configure parses the check configuration | ||
| func (c *KataCheck) Configure(senderManager sender.SenderManager, _ uint64, config, initConfig integration.Data, source string) error { | ||
| if err := c.CommonConfigure(senderManager, initConfig, config, source); err != nil { | ||
| return err | ||
| } | ||
| if err := c.instance.Parse(config); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| c.excludeSet = make(map[string]struct{}, len(c.instance.ExcludeLabels)) | ||
| for _, l := range c.instance.ExcludeLabels { | ||
| c.excludeSet[l] = struct{}{} | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
// Run is the long-running event loop: it subscribes to workloadmeta container
// events to maintain a sandboxID→containerID cache, and periodically scrapes
// all discovered Kata shim sockets.
func (c *KataCheck) Run() error {
	// Only container entities are relevant: they carry the SandboxID used to
	// map shim sockets back to container tags.
	filter := workloadmeta.NewFilterBuilder().
		AddKind(workloadmeta.KindContainer).
		Build()
	containerEventsCh := c.store.Subscribe(CheckName, workloadmeta.NormalPriority, filter)
	defer c.store.Unsubscribe(containerEventsCh)

	ticker := time.NewTicker(defaultScrapeInterval)
	defer ticker.Stop()

	for {
		select {
		case eventBundle, ok := <-containerEventsCh:
			// A closed channel means workloadmeta shut down; exit cleanly.
			if !ok {
				return nil
			}
			c.processContainerEvents(eventBundle)
		case <-ticker.C:
			c.runScrape()
		case <-c.stopCh:
			// Stop() was called.
			return nil
		}
	}
}
|
|
||
// Stop signals the Run loop to exit.
// stopOnce makes Stop idempotent: closing an already-closed channel would panic.
func (c *KataCheck) Stop() {
	c.stopOnce.Do(func() { close(c.stopCh) })
}
|
|
||
| // processContainerEvents updates the sandboxContainerID cache from a workloadmeta event bundle. | ||
| func (c *KataCheck) processContainerEvents(eventBundle workloadmeta.EventBundle) { | ||
| defer eventBundle.Acknowledge() | ||
|
|
||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
|
|
||
| for _, event := range eventBundle.Events { | ||
| ctr, ok := event.Entity.(*workloadmeta.Container) | ||
| if !ok || ctr.SandboxID == "" { | ||
| continue | ||
TheRayquaza marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| switch event.Type { | ||
| case workloadmeta.EventTypeSet: | ||
| c.sandboxContainerID[ctr.SandboxID] = ctr.ID | ||
TheRayquaza marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| case workloadmeta.EventTypeUnset: | ||
| delete(c.sandboxContainerID, ctr.SandboxID) | ||
| } | ||
| } | ||
| } | ||
|
|
||
// runScrape discovers sandboxes and scrapes each one.
func (c *KataCheck) runScrape() {
	s, err := c.GetSender()
	if err != nil {
		_ = c.Warnf("kata_containers: failed to get sender: %v", err)
		return
	}
	// Commit once after all sandboxes are scraped so metrics flush together.
	defer s.Commit()

	sandboxes := c.discoverSandboxes()

	// Count of currently running shims, tagged with instance-level tags only.
	s.Gauge("kata.running_shim_count", float64(len(sandboxes)), "", c.instance.Tags)

	for sandboxID, socketPath := range sandboxes {
		baseTags := c.buildBaseTags(sandboxID)
		c.scrapeSandbox(s, sandboxID, socketPath, baseTags)
	}
}
|
|
||
| // buildBaseTags returns sandbox_id tag plus any orchestrator tags from the tagger. | ||
| func (c *KataCheck) buildBaseTags(sandboxID string) []string { | ||
| tags := []string{"sandbox_id:" + sandboxID} | ||
|
|
||
| c.mu.RLock() | ||
| containerID, ok := c.sandboxContainerID[sandboxID] | ||
| c.mu.RUnlock() | ||
|
|
||
| if ok { | ||
| entityID := taggertypes.NewEntityID(taggertypes.ContainerID, containerID) | ||
| if taggerTags, err := c.tagger.Tag(entityID, taggertypes.OrchestratorCardinality); err == nil { | ||
| tags = append(tags, taggerTags...) | ||
| } | ||
| } | ||
|
|
||
| return tags | ||
| } | ||
|
|
||
| // discoverSandboxes scans sandbox storage paths and returns a map of sandboxID → socketPath | ||
| func (c *KataCheck) discoverSandboxes() map[string]string { | ||
| sandboxes := make(map[string]string) | ||
|
|
||
| for _, basePath := range c.instance.SandboxStoragePaths { | ||
| if _, err := os.Stat(basePath); os.IsNotExist(err) { | ||
| continue | ||
| } | ||
|
|
||
| entries, err := os.ReadDir(basePath) | ||
| if err != nil { | ||
| _ = c.Warnf("kata_containers: failed to read directory %s: %v", basePath, err) | ||
| continue | ||
| } | ||
|
|
||
| for _, entry := range entries { | ||
| if !entry.IsDir() { | ||
| continue | ||
| } | ||
| socketPath := filepath.Join(basePath, entry.Name(), shimSocket) | ||
| if _, err := os.Stat(socketPath); err == nil { | ||
| sandboxes[entry.Name()] = socketPath | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return sandboxes | ||
| } | ||
|
|
||
// scrapeSandbox scrapes Prometheus metrics from a single sandbox's shim socket.
// It dials the unix socket directly and issues a raw HTTP/1.1 GET.
// https://github.com/kata-containers/kata-containers/blob/main/docs/design/kata-2-0-metrics.md#metrics-architecture
//
// Every failure path emits a critical kata.openmetrics.health service check
// tagged with baseTags; success emits OK at the end.
func (c *KataCheck) scrapeSandbox(s sender.Sender, sandboxID, socketPath string, baseTags []string) {
	conn, err := net.DialTimeout("unix", socketPath, shimDialTimeout)
	if err != nil {
		s.ServiceCheck("kata.openmetrics.health", servicecheck.ServiceCheckCritical, "", baseTags,
			fmt.Sprintf("failed to connect to sandbox %s: %v", sandboxID, err))
		return
	}
	defer conn.Close()
	// One deadline covers both the request write and the response read.
	conn.SetDeadline(time.Now().Add(shimReadTimeout)) //nolint:errcheck

	// HTTP/1.0 so the shim closes the connection after the response,
	// letting io.ReadAll below terminate at EOF.
	fmt.Fprintf(conn, "GET /metrics HTTP/1.0\r\nHost: local\r\n\r\n") //nolint:errcheck

	resp, err := http.ReadResponse(bufio.NewReader(conn), nil)
	if err != nil {
		s.ServiceCheck("kata.openmetrics.health", servicecheck.ServiceCheckCritical, "", baseTags,
			fmt.Sprintf("failed to read response from sandbox %s: %v", sandboxID, err))
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		s.ServiceCheck("kata.openmetrics.health", servicecheck.ServiceCheckCritical, "", baseTags,
			fmt.Sprintf("failed to read body from sandbox %s: %v", sandboxID, err))
		return
	}

	families, err := prom.ParseMetrics(body)
	if err != nil {
		s.ServiceCheck("kata.openmetrics.health", servicecheck.ServiceCheckCritical, "", baseTags,
			fmt.Sprintf("failed to parse metrics from sandbox %s: %v", sandboxID, err))
		return
	}

	for _, family := range families {
		for _, sample := range family.Samples {
			// Prefer the sample's own __name__ label; fall back to the family name.
			rawName := sample.Metric["__name__"]
			if rawName == "" {
				rawName = family.Name
			}
			metricName := formatMetricName(rawName)
			tags := c.buildSampleTags(baseTags, sample.Metric)

			switch strings.ToUpper(family.Type) {
			case "COUNTER":
				// Counters are submitted as rates so resets don't produce spikes.
				s.Rate(metricName, sample.Value, "", tags)
			default: // GAUGE, HISTOGRAM, SUMMARY, UNTYPED
				s.Gauge(metricName, sample.Value, "", tags)
			}
		}
	}

	s.ServiceCheck("kata.openmetrics.health", servicecheck.ServiceCheckOK, "", baseTags, "")
}
|
|
||
// formatMetricName converts a raw Prometheus metric name to a Datadog metric name.
// "kata_hypervisor_fds" -> "kata.hypervisor.fds"
func formatMetricName(rawName string) string {
	// Drop a leading "kata_" (if any) so the prefix is not doubled, then
	// dot-separate the remaining components under the "kata." namespace.
	trimmed := strings.TrimPrefix(rawName, "kata_")
	return "kata." + strings.ReplaceAll(trimmed, "_", ".")
}
|
|
||
| // buildSampleTags builds the full tag list for a metric sample from pre-resolved baseTags. | ||
| func (c *KataCheck) buildSampleTags(baseTags []string, metric prom.Metric) []string { | ||
| tags := make([]string, len(baseTags), len(baseTags)+len(metric)+len(c.instance.Tags)) | ||
| copy(tags, baseTags) | ||
| tags = append(tags, c.instance.Tags...) | ||
|
|
||
| for k, v := range metric { | ||
| if k == "__name__" { | ||
| continue | ||
| } | ||
| if _, excluded := c.excludeSet[k]; excluded { | ||
| continue | ||
| } | ||
| labelName := k | ||
| if renamed, ok := c.instance.RenameLabels[k]; ok { | ||
| labelName = renamed | ||
| } | ||
| tags = append(tags, labelName+":"+v) | ||
| } | ||
|
|
||
| return tags | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`Parse` assigns `defaultSandboxStoragePaths` directly to `c.SandboxStoragePaths`, so YAML unmarshalling can reuse and mutate the shared backing array. If one check instance sets a custom `sandbox_storage_paths`, later instances that rely on defaults can inherit those mutated paths and stop discovering Kata sandboxes in `/run/vc/sbs` / `/run/kata`. This creates cross-instance config leakage and incorrect runtime behavior. Useful? React with 👍 / 👎.