diff --git a/cli/providers/providers.go b/cli/providers/providers.go index b52745edc3..08cf217374 100644 --- a/cli/providers/providers.go +++ b/cli/providers/providers.go @@ -572,6 +572,15 @@ func setConnector(provider *plugin.Provider, connector *plugin.Connector, run fu if cliRes.Asset == nil { log.Warn().Err(err).Msg("failed to discover assets after processing CLI arguments") } else { + // Enable staged discovery for all provider connections so that + // providers that support it can split discovery into phases. + for _, conf := range cliRes.Asset.Connections { + if conf.Options == nil { + conf.Options = map[string]string{} + } + conf.Options[plugin.OptionStagedDiscovery] = "" + } + // if we have an asset, we check if the path requires piped content for _, conf := range cliRes.Asset.Connections { if conf.Path != "-" { diff --git a/providers-sdk/v1/plugin/connection.go b/providers-sdk/v1/plugin/connection.go index 953946c6cd..260ee84c9c 100644 --- a/providers-sdk/v1/plugin/connection.go +++ b/providers-sdk/v1/plugin/connection.go @@ -5,6 +5,15 @@ package plugin import inventory "go.mondoo.com/mql/v13/providers-sdk/v1/inventory" +const ( + // OptionStagedDiscovery is set on an asset's inventory config to opt in + // to staged (two-phase) discovery. When present (any truthy value), + // providers split discovery into stages (e.g. cluster+namespaces first, + // workloads per namespace later). When absent, legacy single-pass + // discovery runs unchanged for backward compatibility. + OptionStagedDiscovery = "staged-discovery" +) + type Connection interface { ID() uint32 diff --git a/providers/k8s/resources/discovery.go b/providers/k8s/resources/discovery.go index dc5ba47f1b..aef650157a 100644 --- a/providers/k8s/resources/discovery.go +++ b/providers/k8s/resources/discovery.go @@ -82,20 +82,42 @@ func (f *NamespaceFilterOpts) skipNamespace(namespace string) bool { return false } +// Discover routes to the appropriate discovery path based on whether the client +// has opted in to staged discovery via OPTION_STAGED_DISCOVERY. +// TODO(v15): remove discoverLegacy and OPTION_STAGED_DISCOVERY toggle. Staged +// discovery should be the only path. func Discover(runtime *plugin.Runtime, features mql.Features) (*inventory.Inventory, error) { conn := runtime.Connection.(shared.Connection) + invConfig := conn.InventoryConfig() + + // Check for staged discovery toggle + if _, ok := invConfig.Options[plugin.OptionStagedDiscovery]; ok { + // If a namespace is already set, we're in stage 2 (workload discovery + // for that namespace). Otherwise it's stage 1 (cluster + namespaces). + if invConfig.Options[shared.OPTION_NAMESPACE] != "" { + return discoverNamespaceStage(runtime, conn, invConfig, features) + } + return discoverClusterStage(runtime, conn, invConfig, features) + } + + // Legacy single-pass discovery (no toggle = old client) + return discoverLegacy(runtime, conn, invConfig, features) +} +// discoverLegacy is the original single-pass discovery that discovers the cluster, +// namespaces, and all workloads in a single call. This is the default path for +// clients that do not set OPTION_STAGED_DISCOVERY. +// TODO(v15): remove this function once all clients use staged discovery. +func discoverLegacy(runtime *plugin.Runtime, conn shared.Connection, invConfig *inventory.Config, features mql.Features) (*inventory.Inventory, error) { in := &inventory.Inventory{Spec: &inventory.InventorySpec{ Assets: []*inventory.Asset{}, }} - if (conn.InventoryConfig().Discover == nil || len(conn.InventoryConfig().Discover.Targets) == 0) && conn.Asset() != nil { + if (invConfig.Discover == nil || len(invConfig.Discover.Targets) == 0) && conn.Asset() != nil { in.Spec.Assets = append(in.Spec.Assets, conn.Asset()) return in, nil } - invConfig := conn.InventoryConfig() - res, err := runtime.CreateResource(runtime, "k8s", nil) if err != nil { return nil, err @@ -156,6 +178,127 @@ func Discover(runtime *plugin.Runtime, features mql.Features) (*inventory.Invent return in, nil } +// discoverClusterStage is stage 1 of staged discovery: discovers the cluster +// asset and namespaces. Namespace assets are emitted WITH platform IDs (they +// are scannable) and WITH discovery targets. Each namespace's connection config +// is overridden with OPTION_NAMESPACE set, which causes stage 2 to run when +// the client connects to it. No WithParentConnectionId so each namespace gets +// its own resource cache. +func discoverClusterStage(runtime *plugin.Runtime, conn shared.Connection, invConfig *inventory.Config, features mql.Features) (*inventory.Inventory, error) { + in := &inventory.Inventory{Spec: &inventory.InventorySpec{ + Assets: []*inventory.Asset{}, + }} + + if (invConfig.Discover == nil || len(invConfig.Discover.Targets) == 0) && conn.Asset() != nil { + in.Spec.Assets = append(in.Spec.Assets, conn.Asset()) + return in, nil + } + + nsFilter := setNamespaceFilters(invConfig) + + resFilters, err := resourceFilters(invConfig) + if err != nil { + return nil, err + } + + // If we can discover the cluster asset, then we use that as root and build all + // platform IDs for the assets based on it. If we cannot discover the cluster, we + // discover the individual namespaces according to the ns filter and then build + // the platform IDs for the assets based on the namespace. + if len(nsFilter.include) == 0 && len(nsFilter.exclude) == 0 { + assetId, err := conn.AssetId() + if err == nil { + root := &inventory.Asset{ + PlatformIds: []string{assetId}, + Name: conn.Name(), + Platform: conn.Platform(), + Connections: []*inventory.Config{invConfig.Clone(inventory.WithoutDiscovery())}, // pass-in the parent connection config + } + if stringx.ContainsAnyOf(invConfig.Discover.Targets, DiscoveryAuto, DiscoveryAll, DiscoveryClusters) && resFilters.IsEmpty() { + in.Spec.Assets = append(in.Spec.Assets, root) + } + } else { + log.Warn().Err(err).Msg("failed to discover cluster asset") + } + } + + // Discover namespaces and emit them as scannable assets with platform IDs + // and discovery targets. Override each namespace's connection config to + // route to stage 2 when the client connects to it later. + nss, err := discoverNamespaces(conn, invConfig, "", nil, nsFilter) + if err != nil { + return nil, err + } + + for _, ns := range nss { + // Clone without WithParentConnectionId so each namespace gets its own + // resource cache. With a shared parent cache, the k8s MQL resource would + // be created once (scoped to the first namespace's connection) and reused + // by all other namespaces, returning stale data. + nsConfig := invConfig.Clone() // Clone() copies Options, propagating OPTION_STAGED_DISCOVERY + nsConfig.Options[shared.OPTION_NAMESPACE] = ns.Name + + // Override the connection config to route to stage 2, but keep the + // namespace's platform IDs, platform, and labels from discoverNamespaces(). + ns.Connections = []*inventory.Config{nsConfig} + in.Spec.Assets = append(in.Spec.Assets, ns) + } + + return in, nil +} + +// discoverNamespaceStage is stage 2 of staged discovery: discovers workloads +// within a single namespace. It is triggered when the client connects to a +// namespace asset emitted by stage 1. +// +// Only workloads are returned here — the namespace asset itself was already +// emitted by stage 1 with platform IDs and is already known to the client. +func discoverNamespaceStage(runtime *plugin.Runtime, conn shared.Connection, invConfig *inventory.Config, features mql.Features) (*inventory.Inventory, error) { + in := &inventory.Inventory{Spec: &inventory.InventorySpec{ + Assets: []*inventory.Asset{}, + }} + + if invConfig.Discover == nil || len(invConfig.Discover.Targets) == 0 { + return in, nil + } + + nsName := invConfig.Options[shared.OPTION_NAMESPACE] + + res, err := runtime.CreateResource(runtime, "k8s", nil) + if err != nil { + return nil, err + } + k8s := res.(*mqlK8s) + + nsFilter := NamespaceFilterOpts{include: []string{nsName}} + + resFilters, err := resourceFilters(invConfig) + if err != nil { + return nil, err + } + + // Resolve the namespace's platform ID for use as the ownership root + basePlatformId, err := conn.BasePlatformId() + if err != nil { + return nil, err + } + nsObj, err := conn.Namespace(nsName) + if err != nil { + return nil, fmt.Errorf("failed to get namespace %q: %w", nsName, err) + } + namespacePlatformId := shared.NewNamespacePlatformId(basePlatformId, nsName, string(nsObj.UID)) + + od := NewPlatformIdOwnershipIndex(namespacePlatformId) + + assets, err := discoverAssets(runtime, conn, invConfig, namespacePlatformId, k8s, nsFilter, resFilters, od) + if err != nil { + return nil, err + } + + in.Spec.Assets = append(in.Spec.Assets, assets...) + return in, nil +} + func discoverAssets( runtime *plugin.Runtime, conn shared.Connection,