
Commit 7a106fb

chris-rock and claude authored
🧹 resource watcher improvements (#1369)
* 🧹 Changed the Resource Watcher to use inventory-based K8s API scanning

  This makes sure we properly identify scanned resources as live K8s cluster resources.

* ⭐️ batch scan specific resources instead of all resources by type

  Instead of scanning all resources of changed types, now scans only the specific resources that changed, using cnspec's k8s-resources filter.

  - Added K8sResourceIdentifier to track type, namespace, and name
  - Updated debouncer to collect full resource identifiers
  - Scanner generates inventory with the k8s-resources option for targeted scanning
  - More efficient: scans only changed resources, not all of a type

  Fixes #1366

* 🐛 fix resource type pluralization for ingresses

  Use an explicit mapping between plural and singular resource type names instead of naive string manipulation. This fixes scanning for ingresses (ingresses → ingress, not ingresse) and other irregular plurals.

  - Add resourceTypePluralization map and ToSingular() function
  - Store the plural form in K8sResourceIdentifier.Type
  - Convert to singular only in String() for the cnspec k8s-resources filter
  - Add tests for pluralization logic

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* 🐛 fix inventory namespace options, ToSingular fallback, and test key consistency

  Avoid emitting empty namespace/namespaces-exclude entries in inventory options, return unknown resource types as-is instead of naively stripping a trailing "s", and fix the plural key format in the debouncer test.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* 🐛 fix empty ClusterUID producing trailing hyphen in ManagedBy field

  When ClusterUID is not provided, ManagedBy was set to "mondoo-operator-" (with a trailing hyphen). It now defaults to "mondoo-operator" and only appends the hyphen and UID when one is present. Also extracts IIFEs into plain variables for readability.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* 🧹 remove logger side-effect from ToSingular and modernize watcher.go

  Make ToSingular a pure function by removing the watcherLogger call. Also apply Go modernize lints: use slices.Contains for namespace filtering and replace interface{} with any.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* 🧹 move K8sResourceIdentifier and ToSingular to types.go

  These shared types were defined in scanner.go but used across debouncer.go, watcher.go, and their tests. Moving them to a dedicated types.go improves discoverability.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* 🐛 make cluster UID and integration MRN lookups best-effort

  Previously, a failure to fetch the cluster UID or integration MRN would abort the entire deployment sync. These are optional metadata for asset labeling and should not block the resource watcher from being deployed, e.g. in RBAC-restricted environments.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* 🧹 remove dead fields, fix scan rate limiting, and sort discovery targets

  - Remove the unused `scheme` field from ResourceWatcher and `gvk` from resourceEventHandler (leftovers from the YAML serialization approach)
  - Move the lastScanTime update to after scan completion so the rate-limit interval is measured between scan completions, not scan starts
  - Sort discovery targets for deterministic inventory YAML output

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* 🐛 fix missing integrationMRN/clusterUID args in TestDeployment_WithAnnotations

  The rebase conflict resolution missed updating this test call to include the new integrationMRN and clusterUID parameters.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* ✅ add integration test for resource watcher

  Add TestReconcile_ResourceWatcher to verify the resource watcher detects K8s resource changes and scans them via cnspec. The test enables the resource watcher with short debounce/scan intervals, waits for the deployment to become ready, creates a test deployment to trigger a scan, and polls until assets appear upstream and are scored.

  Also fix the resource watcher deployment to use MondooOperatorImage instead of CnspecImage, since the deployment runs /mondoo-operator (not cnspec).

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
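For readers following along, the pluralization change described above boils down to a lookup table plus a pure conversion function. A minimal sketch based on the commit message: the map contents beyond "ingresses" and the exact filter string String() emits are illustrative assumptions, not the actual cnspec syntax.

package resource_watcher

import "fmt"

// K8sResourceIdentifier tracks a single changed K8s resource. Type stores
// the plural resource type; it is converted to singular only when rendering
// the cnspec k8s-resources filter string.
type K8sResourceIdentifier struct {
	Type      string // plural form, e.g. "ingresses"
	Namespace string // empty for cluster-scoped resources
	Name      string
}

// resourceTypePluralization maps plural resource type names to singular ones,
// avoiding naive suffix stripping ("ingresses" -> "ingress", not "ingresse").
// Entries beyond "ingresses" are assumed for illustration.
var resourceTypePluralization = map[string]string{
	"ingresses":   "ingress",
	"deployments": "deployment",
}

// ToSingular returns the singular form of a plural resource type name.
// Unknown types are returned as-is instead of stripping a trailing "s".
func ToSingular(plural string) string {
	if singular, ok := resourceTypePluralization[plural]; ok {
		return singular
	}
	return plural
}

// String renders the identifier for the k8s-resources filter; the separator
// and field order here are assumptions, not the actual cnspec syntax.
func (r K8sResourceIdentifier) String() string {
	if r.Namespace == "" {
		return fmt.Sprintf("%s:%s", ToSingular(r.Type), r.Name)
	}
	return fmt.Sprintf("%s:%s:%s", ToSingular(r.Type), r.Namespace, r.Name)
}

Storing the plural form and converting only at render time keeps watcher keys consistent with the K8s API while still producing the singular names the filter expects.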
1 parent a00fa19 commit 7a106fb

12 files changed

Lines changed: 568 additions & 213 deletions


cmd/mondoo-operator/resource_watcher/cmd.go

Lines changed: 12 additions & 6 deletions
@@ -56,6 +56,8 @@ func init() {
 	apiProxy := Cmd.Flags().String("api-proxy", "", "HTTP proxy to use for API requests.")
 	timeout := Cmd.Flags().Duration("timeout", 25*time.Minute, "Timeout for scan operations.")
 	annotations := Cmd.Flags().StringToString("annotation", nil, "Annotations to add to scanned assets (can specify multiple, e.g., --annotation env=prod --annotation team=platform).")
+	clusterUID := Cmd.Flags().String("cluster-uid", "", "The unique identifier of the cluster for asset labeling.")
+	integrationMRN := Cmd.Flags().String("integration-mrn", "", "The integration MRN for asset labeling.")
 
 	Cmd.RunE = func(cmd *cobra.Command, args []string) error {
 		log.SetLogger(logger.NewLogger())
@@ -167,22 +169,26 @@ func init() {
 
 		// Create scanner
 		scanner := resource_watcher.NewScanner(resource_watcher.ScannerConfig{
-			ConfigPath:  *configPath,
-			APIProxy:    *apiProxy,
-			Timeout:     *timeout,
-			Annotations: *annotations,
+			ConfigPath:        *configPath,
+			APIProxy:          *apiProxy,
+			Timeout:           *timeout,
+			Annotations:       *annotations,
+			Namespaces:        namespacesList,
+			NamespacesExclude: namespacesExcludeList,
+			ClusterUID:        *clusterUID,
+			IntegrationMRN:    *integrationMRN,
 		})
 
 		// Create debouncer with rate limiting
-		debouncer := resource_watcher.NewDebouncer(*debounceInterval, *minimumScanInterval, scanner.ScanManifestsFunc())
+		debouncer := resource_watcher.NewDebouncer(*debounceInterval, *minimumScanInterval, scanner.ScanResourcesFunc())
 
 		// Create watcher
 		watcher := resource_watcher.NewResourceWatcher(c, debouncer, resource_watcher.WatcherConfig{
 			Namespaces:        namespacesList,
 			NamespacesExclude: namespacesExcludeList,
 			ResourceTypes:     resourceTypesList,
 			WatchAllResources: *watchAllResources,
-		}, scheme)
+		})
 
 		// Start components
 		errChan := make(chan error, 3)
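The two new flags above feed ScannerConfig, where the ClusterUID ends up in the ManagedBy field. The trailing-hyphen fix from the commit message reduces to a conditional append; a minimal sketch with an assumed helper name (the real function and call site are not shown in this diff):

package resource_watcher

// managedByValue computes the ManagedBy field for scanned assets. When no
// cluster UID is provided it returns plain "mondoo-operator" instead of the
// old buggy "mondoo-operator-" with a trailing hyphen.
func managedByValue(clusterUID string) string {
	if clusterUID == "" {
		return "mondoo-operator"
	}
	return "mondoo-operator-" + clusterUID
}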

controllers/resource_watcher/debouncer.go

Lines changed: 24 additions & 53 deletions
@@ -20,8 +20,8 @@ type Debouncer struct {
 	interval    time.Duration
 	minInterval time.Duration // minimum time between scans (rate limit)
 	mu          sync.Mutex
-	pending     map[string][]byte // resource key -> YAML manifest
-	scanFunc    func(ctx context.Context, manifests []byte) error
+	pending     map[string]K8sResourceIdentifier // resource key -> resource identifier
+	scanFunc    func(ctx context.Context, resources []K8sResourceIdentifier) error
 	timer       *time.Timer
 	ctx         context.Context
 	cancel      context.CancelFunc
@@ -31,24 +31,23 @@ type Debouncer struct {
 // NewDebouncer creates a new Debouncer with the given intervals and scan function.
 // interval is the debounce interval (time to wait after last change before scanning).
 // minInterval is the minimum time between scans (rate limit). Set to 0 to disable rate limiting.
-func NewDebouncer(interval, minInterval time.Duration, scanFunc func(ctx context.Context, manifests []byte) error) *Debouncer {
+func NewDebouncer(interval, minInterval time.Duration, scanFunc func(ctx context.Context, resources []K8sResourceIdentifier) error) *Debouncer {
 	return &Debouncer{
 		interval:    interval,
 		minInterval: minInterval,
-		pending:     make(map[string][]byte),
+		pending:     make(map[string]K8sResourceIdentifier),
 		scanFunc:    scanFunc,
 	}
 }
 
-// Add adds a resource to the pending queue. The key should be unique per resource
-// (e.g., "namespace/kind/name"). If a resource with the same key is already pending,
-// it will be replaced with the new manifest.
-func (d *Debouncer) Add(key string, manifest []byte) {
+// Add adds a resource to the pending queue. The key should be unique per resource.
+// resource contains the type, namespace, and name of the K8s resource.
+func (d *Debouncer) Add(key string, resource K8sResourceIdentifier) {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 
-	d.pending[key] = manifest
-	debouncerLogger.V(1).Info("Added resource to debounce queue", "key", key, "queueSize", len(d.pending))
+	d.pending[key] = resource
+	debouncerLogger.V(1).Info("Added resource to debounce queue", "key", key, "resource", resource, "queueSize", len(d.pending))
 
 	// Reset the timer if it exists, or start a new one
 	if d.timer != nil {
@@ -81,8 +80,8 @@ func (d *Debouncer) stop() {
 	debouncerLogger.Info("Debouncer stopped")
 }
 
-// flush processes all pending resources by combining them into a single manifest
-// and calling the scan function. It enforces the minimum interval between scans.
+// flush processes all pending resources and calls the scan function.
+// It enforces the minimum interval between scans.
 func (d *Debouncer) flush() {
 	d.mu.Lock()
 	if len(d.pending) == 0 {
@@ -102,35 +101,36 @@ func (d *Debouncer) flush() {
 		}
 	}
 
-	// Collect all pending manifests
-	manifests := make([][]byte, 0, len(d.pending))
+	// Collect all pending resources
+	resources := make([]K8sResourceIdentifier, 0, len(d.pending))
 	keys := make([]string, 0, len(d.pending))
-	for key, manifest := range d.pending {
-		manifests = append(manifests, manifest)
+	for key, resource := range d.pending {
+		resources = append(resources, resource)
 		keys = append(keys, key)
 	}
 
-	// Clear pending and update last scan time
-	d.pending = make(map[string][]byte)
-	d.lastScanTime = time.Now()
+	// Clear pending
+	d.pending = make(map[string]K8sResourceIdentifier)
 	d.mu.Unlock()
 
-	debouncerLogger.Info("Flushing debounce queue", "resourceCount", len(manifests))
-
-	// Combine all manifests with YAML document separators
-	combined := combineManifests(manifests)
+	debouncerLogger.Info("Flushing debounce queue", "resourceCount", len(resources))
 
 	// Execute scan
 	ctx := d.ctx
 	if ctx == nil {
 		ctx = context.Background()
 	}
 
-	if err := d.scanFunc(ctx, combined); err != nil {
+	if err := d.scanFunc(ctx, resources); err != nil {
 		debouncerLogger.Error(err, "Failed to scan resources", "keys", keys)
 	} else {
 		debouncerLogger.Info("Successfully scanned resources", "keys", keys)
 	}
+
+	// Update last scan time after scan completes
+	d.mu.Lock()
+	d.lastScanTime = time.Now()
+	d.mu.Unlock()
 }
 
 // QueueSize returns the current number of pending resources.
@@ -139,32 +139,3 @@ func (d *Debouncer) QueueSize() int {
 	defer d.mu.Unlock()
 	return len(d.pending)
 }
-
-// combineManifests combines multiple YAML manifests into a single multi-document YAML.
-func combineManifests(manifests [][]byte) []byte {
-	if len(manifests) == 0 {
-		return nil
-	}
-
-	// Calculate total size
-	totalSize := 0
-	separator := []byte("---\n")
-	for _, m := range manifests {
-		totalSize += len(m) + len(separator)
-	}
-
-	// Combine
-	result := make([]byte, 0, totalSize)
-	for i, m := range manifests {
-		if i > 0 {
-			result = append(result, separator...)
-		}
-		result = append(result, m...)
-		// Ensure each manifest ends with a newline
-		if len(m) > 0 && m[len(m)-1] != '\n' {
-			result = append(result, '\n')
-		}
-	}
-
-	return result
-}
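Putting the new signatures together, a hypothetical caller wires the debouncer like this. The import path and resource-key format are assumptions, and the real scan function builds a cnspec inventory with the k8s-resources option rather than printing:

package main

import (
	"context"
	"fmt"
	"time"

	// Import path assumed; adjust to the module's actual path.
	"github.com/mondoohq/mondoo-operator/controllers/resource_watcher"
)

func main() {
	// The scan callback now receives resource identifiers instead of a
	// combined multi-document YAML manifest.
	scanFunc := func(ctx context.Context, resources []resource_watcher.K8sResourceIdentifier) error {
		fmt.Printf("scanning %d changed resources\n", len(resources))
		return nil
	}

	// 2s debounce window; at most one scan per minute. After this change the
	// rate-limit interval is measured between scan completions, not starts.
	d := resource_watcher.NewDebouncer(2*time.Second, time.Minute, scanFunc)

	// Keys stay unique per resource; the identifier keeps the plural type.
	d.Add("default/deployments/nginx", resource_watcher.K8sResourceIdentifier{
		Type:      "deployments",
		Namespace: "default",
		Name:      "nginx",
	})

	// Give the debounce timer time to fire before the program exits.
	time.Sleep(3 * time.Second)
}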
