Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions cmd/mondoo-operator/resource_watcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func init() {
apiProxy := Cmd.Flags().String("api-proxy", "", "HTTP proxy to use for API requests.")
timeout := Cmd.Flags().Duration("timeout", 25*time.Minute, "Timeout for scan operations.")
annotations := Cmd.Flags().StringToString("annotation", nil, "Annotations to add to scanned assets (can specify multiple, e.g., --annotation env=prod --annotation team=platform).")
clusterUID := Cmd.Flags().String("cluster-uid", "", "The unique identifier of the cluster for asset labeling.")
integrationMRN := Cmd.Flags().String("integration-mrn", "", "The integration MRN for asset labeling.")

Cmd.RunE = func(cmd *cobra.Command, args []string) error {
log.SetLogger(logger.NewLogger())
Expand Down Expand Up @@ -167,22 +169,26 @@ func init() {

// Create scanner
scanner := resource_watcher.NewScanner(resource_watcher.ScannerConfig{
ConfigPath: *configPath,
APIProxy: *apiProxy,
Timeout: *timeout,
Annotations: *annotations,
ConfigPath: *configPath,
APIProxy: *apiProxy,
Timeout: *timeout,
Annotations: *annotations,
Namespaces: namespacesList,
NamespacesExclude: namespacesExcludeList,
ClusterUID: *clusterUID,
IntegrationMRN: *integrationMRN,
})

// Create debouncer with rate limiting
debouncer := resource_watcher.NewDebouncer(*debounceInterval, *minimumScanInterval, scanner.ScanManifestsFunc())
debouncer := resource_watcher.NewDebouncer(*debounceInterval, *minimumScanInterval, scanner.ScanResourcesFunc())

// Create watcher
watcher := resource_watcher.NewResourceWatcher(c, debouncer, resource_watcher.WatcherConfig{
Namespaces: namespacesList,
NamespacesExclude: namespacesExcludeList,
ResourceTypes: resourceTypesList,
WatchAllResources: *watchAllResources,
}, scheme)
})

// Start components
errChan := make(chan error, 3)
Expand Down
77 changes: 24 additions & 53 deletions controllers/resource_watcher/debouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Debouncer struct {
interval time.Duration
minInterval time.Duration // minimum time between scans (rate limit)
mu sync.Mutex
pending map[string][]byte // resource key -> YAML manifest
scanFunc func(ctx context.Context, manifests []byte) error
pending map[string]K8sResourceIdentifier // resource key -> resource identifier
scanFunc func(ctx context.Context, resources []K8sResourceIdentifier) error
timer *time.Timer
ctx context.Context
cancel context.CancelFunc
Expand All @@ -31,24 +31,23 @@ type Debouncer struct {
// NewDebouncer creates a new Debouncer with the given intervals and scan function.
// interval is the debounce interval (time to wait after last change before scanning).
// minInterval is the minimum time between scans (rate limit). Set to 0 to disable rate limiting.
func NewDebouncer(interval, minInterval time.Duration, scanFunc func(ctx context.Context, manifests []byte) error) *Debouncer {
func NewDebouncer(interval, minInterval time.Duration, scanFunc func(ctx context.Context, resources []K8sResourceIdentifier) error) *Debouncer {
return &Debouncer{
interval: interval,
minInterval: minInterval,
pending: make(map[string][]byte),
pending: make(map[string]K8sResourceIdentifier),
scanFunc: scanFunc,
}
}

// Add adds a resource to the pending queue. The key should be unique per resource
// (e.g., "namespace/kind/name"). If a resource with the same key is already pending,
// it will be replaced with the new manifest.
func (d *Debouncer) Add(key string, manifest []byte) {
// Add adds a resource to the pending queue. The key should be unique per resource.
// resource contains the type, namespace, and name of the K8s resource.
func (d *Debouncer) Add(key string, resource K8sResourceIdentifier) {
d.mu.Lock()
defer d.mu.Unlock()

d.pending[key] = manifest
debouncerLogger.V(1).Info("Added resource to debounce queue", "key", key, "queueSize", len(d.pending))
d.pending[key] = resource
debouncerLogger.V(1).Info("Added resource to debounce queue", "key", key, "resource", resource, "queueSize", len(d.pending))

// Reset the timer if it exists, or start a new one
if d.timer != nil {
Expand Down Expand Up @@ -81,8 +80,8 @@ func (d *Debouncer) stop() {
debouncerLogger.Info("Debouncer stopped")
}

// flush processes all pending resources by combining them into a single manifest
// and calling the scan function. It enforces the minimum interval between scans.
// flush processes all pending resources and calls the scan function.
// It enforces the minimum interval between scans.
func (d *Debouncer) flush() {
d.mu.Lock()
if len(d.pending) == 0 {
Expand All @@ -102,35 +101,36 @@ func (d *Debouncer) flush() {
}
}

// Collect all pending manifests
manifests := make([][]byte, 0, len(d.pending))
// Collect all pending resources
resources := make([]K8sResourceIdentifier, 0, len(d.pending))
keys := make([]string, 0, len(d.pending))
for key, manifest := range d.pending {
manifests = append(manifests, manifest)
for key, resource := range d.pending {
resources = append(resources, resource)
keys = append(keys, key)
}

// Clear pending and update last scan time
d.pending = make(map[string][]byte)
d.lastScanTime = time.Now()
// Clear pending
d.pending = make(map[string]K8sResourceIdentifier)
d.mu.Unlock()

debouncerLogger.Info("Flushing debounce queue", "resourceCount", len(manifests))

// Combine all manifests with YAML document separators
combined := combineManifests(manifests)
debouncerLogger.Info("Flushing debounce queue", "resourceCount", len(resources))

// Execute scan
ctx := d.ctx
if ctx == nil {
ctx = context.Background()
}

if err := d.scanFunc(ctx, combined); err != nil {
if err := d.scanFunc(ctx, resources); err != nil {
debouncerLogger.Error(err, "Failed to scan resources", "keys", keys)
} else {
debouncerLogger.Info("Successfully scanned resources", "keys", keys)
}

// Update last scan time after scan completes
d.mu.Lock()
d.lastScanTime = time.Now()
d.mu.Unlock()
}

// QueueSize returns the current number of pending resources.
Expand All @@ -139,32 +139,3 @@ func (d *Debouncer) QueueSize() int {
defer d.mu.Unlock()
return len(d.pending)
}

// combineManifests combines multiple YAML manifests into a single multi-document YAML.
func combineManifests(manifests [][]byte) []byte {
if len(manifests) == 0 {
return nil
}

// Calculate total size
totalSize := 0
separator := []byte("---\n")
for _, m := range manifests {
totalSize += len(m) + len(separator)
}

// Combine
result := make([]byte, 0, totalSize)
for i, m := range manifests {
if i > 0 {
result = append(result, separator...)
}
result = append(result, m...)
// Ensure each manifest ends with a newline
if len(m) > 0 && m[len(m)-1] != '\n' {
result = append(result, '\n')
}
}

return result
}
Loading
Loading