Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cli/providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,15 @@ func setConnector(provider *plugin.Provider, connector *plugin.Connector, run fu
if cliRes.Asset == nil {
log.Warn().Err(err).Msg("failed to discover assets after processing CLI arguments")
} else {
// Enable staged discovery for all provider connections so that
// providers that support it can split discovery into phases.
for _, conf := range cliRes.Asset.Connections {
if conf.Options == nil {
conf.Options = map[string]string{}
}
conf.Options[plugin.OptionStagedDiscovery] = ""
}

// if we have an asset, we check if the path requires piped content
for _, conf := range cliRes.Asset.Connections {
if conf.Path != "-" {
Expand Down
9 changes: 9 additions & 0 deletions providers-sdk/v1/plugin/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ package plugin

import inventory "go.mondoo.com/mql/v13/providers-sdk/v1/inventory"

const (
// OptionStagedDiscovery is set on an asset's inventory config to opt in
// to staged (two-phase) discovery. When present (any truthy value),
// providers split discovery into stages (e.g. cluster+namespaces first,
// workloads per namespace later). When absent, legacy single-pass
// discovery runs unchanged for backward compatibility.
OptionStagedDiscovery = "staged-discovery"
)

type Connection interface {
ID() uint32

Expand Down
149 changes: 146 additions & 3 deletions providers/k8s/resources/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,42 @@ func (f *NamespaceFilterOpts) skipNamespace(namespace string) bool {
return false
}

// Discover routes to the appropriate discovery path based on whether the client
// has opted in to staged discovery via OPTION_STAGED_DISCOVERY.
// TODO(v15): remove discoverLegacy and OPTION_STAGED_DISCOVERY toggle. Staged
// discovery should be the only path.
func Discover(runtime *plugin.Runtime, features mql.Features) (*inventory.Inventory, error) {
conn := runtime.Connection.(shared.Connection)
invConfig := conn.InventoryConfig()

// Check for staged discovery toggle
if _, ok := invConfig.Options[plugin.OptionStagedDiscovery]; ok {
// If a namespace is already set, we're in stage 2 (workload discovery
// for that namespace). Otherwise it's stage 1 (cluster + namespaces).
if invConfig.Options[shared.OPTION_NAMESPACE] != "" {
return discoverNamespaceStage(runtime, conn, invConfig, features)
}
return discoverClusterStage(runtime, conn, invConfig, features)
}

// Legacy single-pass discovery (no toggle = old client)
return discoverLegacy(runtime, conn, invConfig, features)
}

// discoverLegacy is the original single-pass discovery that discovers the cluster,
// namespaces, and all workloads in a single call. This is the default path for
// clients that do not set OPTION_STAGED_DISCOVERY.
// TODO(v15): remove this function once all clients use staged discovery.
func discoverLegacy(runtime *plugin.Runtime, conn shared.Connection, invConfig *inventory.Config, features mql.Features) (*inventory.Inventory, error) {
in := &inventory.Inventory{Spec: &inventory.InventorySpec{
Assets: []*inventory.Asset{},
}}

if (conn.InventoryConfig().Discover == nil || len(conn.InventoryConfig().Discover.Targets) == 0) && conn.Asset() != nil {
if (invConfig.Discover == nil || len(invConfig.Discover.Targets) == 0) && conn.Asset() != nil {
in.Spec.Assets = append(in.Spec.Assets, conn.Asset())
return in, nil
}

invConfig := conn.InventoryConfig()

res, err := runtime.CreateResource(runtime, "k8s", nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -156,6 +178,127 @@ func Discover(runtime *plugin.Runtime, features mql.Features) (*inventory.Invent
return in, nil
}

// discoverClusterStage is stage 1 of staged discovery: discovers the cluster
// asset and namespaces. Namespace assets are emitted WITH platform IDs (they
// are scannable) and WITH discovery targets. Each namespace's connection config
// is overridden with OPTION_NAMESPACE set, which causes stage 2 to run when
// the client connects to it. No WithParentConnectionId so each namespace gets
// its own resource cache.
func discoverClusterStage(runtime *plugin.Runtime, conn shared.Connection, invConfig *inventory.Config, features mql.Features) (*inventory.Inventory, error) {
in := &inventory.Inventory{Spec: &inventory.InventorySpec{
Assets: []*inventory.Asset{},
}}

if (invConfig.Discover == nil || len(invConfig.Discover.Targets) == 0) && conn.Asset() != nil {
in.Spec.Assets = append(in.Spec.Assets, conn.Asset())
return in, nil
}

nsFilter := setNamespaceFilters(invConfig)

resFilters, err := resourceFilters(invConfig)
if err != nil {
return nil, err
}

// If we can discover the cluster asset, then we use that as root and build all
// platform IDs for the assets based on it. If we cannot discover the cluster, we
// discover the individual namespaces according to the ns filter and then build
// the platform IDs for the assets based on the namespace.
if len(nsFilter.include) == 0 && len(nsFilter.exclude) == 0 {
assetId, err := conn.AssetId()
if err == nil {
root := &inventory.Asset{
PlatformIds: []string{assetId},
Name: conn.Name(),
Platform: conn.Platform(),
Connections: []*inventory.Config{invConfig.Clone(inventory.WithoutDiscovery())}, // pass-in the parent connection config
}
if stringx.ContainsAnyOf(invConfig.Discover.Targets, DiscoveryAuto, DiscoveryAll, DiscoveryClusters) && resFilters.IsEmpty() {
in.Spec.Assets = append(in.Spec.Assets, root)
}
} else {
log.Warn().Err(err).Msg("failed to discover cluster asset")
}
}

// Discover namespaces and emit them as scannable assets with platform IDs
// and discovery targets. Override each namespace's connection config to
// route to stage 2 when the client connects to it later.
nss, err := discoverNamespaces(conn, invConfig, "", nil, nsFilter)
if err != nil {
return nil, err
}

for _, ns := range nss {
// Clone without WithParentConnectionId so each namespace gets its own
// resource cache. With a shared parent cache, the k8s MQL resource would
// be created once (scoped to the first namespace's connection) and reused
// by all other namespaces, returning stale data.
nsConfig := invConfig.Clone() // Clone() copies Options, propagating OPTION_STAGED_DISCOVERY
nsConfig.Options[shared.OPTION_NAMESPACE] = ns.Name

// Override the connection config to route to stage 2, but keep the
// namespace's platform IDs, platform, and labels from discoverNamespaces().
ns.Connections = []*inventory.Config{nsConfig}
in.Spec.Assets = append(in.Spec.Assets, ns)
}

return in, nil
}

// discoverNamespaceStage is stage 2 of staged discovery: discovers workloads
// within a single namespace. It is triggered when the client connects to a
// namespace asset emitted by stage 1.
//
// Only workloads are returned here — the namespace asset itself was already
// emitted by stage 1 with platform IDs and is already known to the client.
func discoverNamespaceStage(runtime *plugin.Runtime, conn shared.Connection, invConfig *inventory.Config, features mql.Features) (*inventory.Inventory, error) {
in := &inventory.Inventory{Spec: &inventory.InventorySpec{
Assets: []*inventory.Asset{},
}}

if invConfig.Discover == nil || len(invConfig.Discover.Targets) == 0 {
return in, nil
}

nsName := invConfig.Options[shared.OPTION_NAMESPACE]

res, err := runtime.CreateResource(runtime, "k8s", nil)
if err != nil {
return nil, err
}
k8s := res.(*mqlK8s)

nsFilter := NamespaceFilterOpts{include: []string{nsName}}

resFilters, err := resourceFilters(invConfig)
if err != nil {
return nil, err
}

// Resolve the namespace's platform ID for use as the ownership root
basePlatformId, err := conn.BasePlatformId()
if err != nil {
return nil, err
}
nsObj, err := conn.Namespace(nsName)
if err != nil {
return nil, fmt.Errorf("failed to get namespace %q: %w", nsName, err)
}
namespacePlatformId := shared.NewNamespacePlatformId(basePlatformId, nsName, string(nsObj.UID))

od := NewPlatformIdOwnershipIndex(namespacePlatformId)

assets, err := discoverAssets(runtime, conn, invConfig, namespacePlatformId, k8s, nsFilter, resFilters, od)
if err != nil {
return nil, err
}

in.Spec.Assets = append(in.Spec.Assets, assets...)
return in, nil
}

func discoverAssets(
runtime *plugin.Runtime,
conn shared.Connection,
Expand Down
Loading