Skip to content

Commit 626c8fc

Browse files
committed
fix bugs
Signed-off-by: Ivan Milchev <[email protected]>
1 parent 8fd55df commit 626c8fc

File tree

11 files changed

+225
-56
lines changed

11 files changed

+225
-56
lines changed

.vscode/launch.json

+4-8
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,10 @@
159159
"program": "${workspaceRoot}/apps/cnquery/cnquery.go",
160160
"cwd": "${workspaceRoot}/",
161161
"args": [
162-
"run",
162+
"scan",
163163
"k8s",
164-
"--namespaces",
165-
"default",
166-
"-c",
167-
"k8s.pods",
168-
"--log-level",
169-
"debug"
164+
"--discover", "namespaces,pods",
165+
"--namespaces", "default"
170166
],
171167
},
172168
{
@@ -195,7 +191,7 @@
195191
"program": "${workspaceRoot}/apps/cnquery/cnquery.go",
196192
"console": "integratedTerminal",
197193
"args": [
198-
"shell", "ssh", "[email protected]",
194+
"shell", "cloudflare"
199195
],
200196
},
201197
{

apps/cnquery/cmd/plugin.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,11 @@ func (c *cnqueryPlugin) RunQuery(conf *run.RunQueryConfig, runtime *providers.Ru
114114
}
115115

116116
ctx := context.Background()
117-
discoveredAssets, err := scan.DiscoverAssets(ctx, conf.Inventory, upstreamConfig, runtime.Recording())
117+
discoveredAssets, err := scan.DiscoverAssets(ctx, &scan.AssetDiscoveryRequest{
118+
Inv: conf.Inventory,
119+
Upstream: upstreamConfig,
120+
Recording: runtime.Recording(),
121+
})
118122
if err != nil {
119123
return err
120124
}

apps/cnquery/cmd/shell.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,11 @@ func StartShell(runtime *providers.Runtime, conf *ShellConfig) error {
104104
// we go through inventory resolution to resolve credentials properly for the passed-in asset
105105
ctx := context.Background()
106106
discoveredAssets, err := scan.DiscoverAssets(ctx,
107-
inventory.New(inventory.WithAssets(conf.Asset)),
108-
conf.UpstreamConfig,
109-
runtime.Recording())
107+
&scan.AssetDiscoveryRequest{
108+
Inv: inventory.New(inventory.WithAssets(conf.Asset)),
109+
Upstream: conf.UpstreamConfig,
110+
Recording: runtime.Recording(),
111+
})
110112
if err != nil {
111113
log.Fatal().Err(err).Msg("could not process assets")
112114
}

explorer/scan/discovery.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,18 @@ func (d *DiscoveredAssets) GetAssetsByPlatformID(platformID string) []*AssetWith
7979
return assets
8080
}
8181

82+
type AssetDiscoveryRequest struct {
83+
Inv *inventory.Inventory
84+
Upstream *upstream.UpstreamConfig
85+
Recording llx.Recording
86+
// DiscoveredAssets *DiscoveredAssets
87+
SkipRoot bool
88+
Depth uint
89+
}
90+
8291
// DiscoverAssets discovers assets from the given inventory and upstream configuration. Returns only unique assets
83-
func DiscoverAssets(ctx context.Context, inv *inventory.Inventory, upstream *upstream.UpstreamConfig, recording llx.Recording) (*DiscoveredAssets, error) {
84-
im, err := manager.NewManager(manager.WithInventory(inv, providers.DefaultRuntime()))
92+
func DiscoverAssets(ctx context.Context, req *AssetDiscoveryRequest) (*DiscoveredAssets, error) {
93+
im, err := manager.NewManager(manager.WithInventory(req.Inv, providers.DefaultRuntime()))
8594
if err != nil {
8695
return nil, errors.New("failed to resolve inventory for connection: " + err.Error())
8796
}
@@ -96,7 +105,7 @@ func DiscoverAssets(ctx context.Context, inv *inventory.Inventory, upstream *ups
96105
// CI/CD scan and we need to apply the runtime labels to the assets
97106
if runtimeEnv != nil &&
98107
runtimeEnv.IsAutomatedEnv() &&
99-
inv.Spec.Assets[0].Category == inventory.AssetCategory_CATEGORY_CICD {
108+
req.Inv.Spec.Assets[0].Category == inventory.AssetCategory_CATEGORY_CICD {
100109
runtimeLabels = runtimeEnv.Labels()
101110
}
102111

@@ -110,13 +119,18 @@ func DiscoverAssets(ctx context.Context, inv *inventory.Inventory, upstream *ups
110119
}
111120

112121
// create runtime for root asset
113-
rootAssetWithRuntime, err := createRuntimeForAsset(resolvedRootAsset, upstream, recording)
122+
rootAssetWithRuntime, err := createRuntimeForAsset(resolvedRootAsset, req.Upstream, req.Recording)
114123
if err != nil {
115124
log.Error().Err(err).Str("asset", resolvedRootAsset.Name).Msg("unable to create runtime for asset")
116125
discoveredAssets.AddError(resolvedRootAsset, err)
117126
continue
118127
}
119128

129+
// If the root asset is nil, then we observed an asset we have already discovered
130+
if rootAssetWithRuntime == nil {
131+
continue
132+
}
133+
120134
resolvedRootAsset = rootAssetWithRuntime.Asset // to ensure we get all the information the connect call gave us
121135

122136
// Make sure the root runtime is closed at the end of the loop if needed. This will close the runtimes for all
@@ -145,7 +159,7 @@ func DiscoverAssets(ctx context.Context, inv *inventory.Inventory, upstream *ups
145159
}
146160

147161
// for all discovered assets, we apply mondoo-specific labels and annotations that come from the root asset
148-
discoverAssets(rootAssetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
162+
discoverAssets(rootAssetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, req.Upstream, req.Recording)
149163
}
150164

151165
// if there is exactly one asset, assure that the --asset-name is used
@@ -204,7 +218,9 @@ func discoverAssets(rootAssetWithRuntime *AssetWithRuntime, resolvedRootAsset *i
204218
// If the asset has been already added, we should close its runtime
205219
if !discoveredAssets.Add(resolvedAsset, assetWithRuntime.Runtime) {
206220
assetWithRuntime.Runtime.Close()
221+
continue
207222
}
223+
208224
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)
209225
} else {
210226
discoverAssets(assetWithRuntime, resolvedRootAsset, discoveredAssets, runtimeLabels, upstream, recording)

explorer/scan/discovery_test.go

+40-8
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ func TestDiscoverAssets(t *testing.T) {
142142

143143
t.Run("normal", func(t *testing.T) {
144144
inv := getInventory()
145-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
145+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
146+
Inv: inv,
147+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
148+
Recording: recording.Null{},
149+
})
146150
require.NoError(t, err)
147151
assert.Len(t, discoveredAssets.Assets, 3)
148152
assert.Len(t, discoveredAssets.Errors, 0)
@@ -158,7 +162,11 @@ func TestDiscoverAssets(t *testing.T) {
158162
t.Run("with duplicate root assets", func(t *testing.T) {
159163
inv := getInventory()
160164
inv.Spec.Assets = append(inv.Spec.Assets, inv.Spec.Assets[0])
161-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
165+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
166+
Inv: inv,
167+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
168+
Recording: recording.Null{},
169+
})
162170
require.NoError(t, err)
163171

164172
// Make sure no duplicates are returned
@@ -173,7 +181,11 @@ func TestDiscoverAssets(t *testing.T) {
173181
t.Run("with duplicate discovered assets", func(t *testing.T) {
174182
inv := getInventory()
175183
inv.Spec.Assets[0].Connections[0].Options["path"] = "./testdata/3pods_with_duplicate.yaml"
176-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
184+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
185+
Inv: inv,
186+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
187+
Recording: recording.Null{},
188+
})
177189
require.NoError(t, err)
178190

179191
// Make sure no duplicates are returned
@@ -191,7 +203,11 @@ func TestDiscoverAssets(t *testing.T) {
191203
"key1": "value1",
192204
"key2": "value2",
193205
}
194-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
206+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
207+
Inv: inv,
208+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
209+
Recording: recording.Null{},
210+
})
195211
require.NoError(t, err)
196212

197213
for _, asset := range discoveredAssets.Assets {
@@ -209,7 +225,11 @@ func TestDiscoverAssets(t *testing.T) {
209225
t.Run("copy root asset managedBy", func(t *testing.T) {
210226
inv := getInventory()
211227
inv.Spec.Assets[0].ManagedBy = "managed-by-test"
212-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
228+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
229+
Inv: inv,
230+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
231+
Recording: recording.Null{},
232+
})
213233
require.NoError(t, err)
214234

215235
for _, asset := range discoveredAssets.Assets {
@@ -234,7 +254,11 @@ func TestDiscoverAssets(t *testing.T) {
234254
}()
235255
inv.Spec.Assets[0].Category = inventory.AssetCategory_CATEGORY_CICD
236256
require.NoError(t, os.Setenv("GITHUB_ACTION", "go-test"))
237-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
257+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
258+
Inv: inv,
259+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
260+
Recording: recording.Null{},
261+
})
238262
require.NoError(t, err)
239263

240264
for _, asset := range discoveredAssets.Assets {
@@ -261,7 +285,11 @@ func TestDiscoverAssets(t *testing.T) {
261285
}()
262286
inv.Spec.Assets[0].Category = inventory.AssetCategory_CATEGORY_CICD
263287
require.NoError(t, os.Setenv("GITHUB_ACTION", "go-test"))
264-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
288+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
289+
Inv: inv,
290+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
291+
Recording: recording.Null{},
292+
})
265293
require.NoError(t, err)
266294

267295
for _, asset := range discoveredAssets.Assets {
@@ -278,7 +306,11 @@ func TestDiscoverAssets(t *testing.T) {
278306
inv := getInventory()
279307
inv.Spec.Assets[0].Connections[0].Type = "local"
280308

281-
discoveredAssets, err := DiscoverAssets(context.Background(), inv, nil, recording.Null{})
309+
discoveredAssets, err := DiscoverAssets(context.Background(), &AssetDiscoveryRequest{
310+
Inv: inv,
311+
DiscoveredAssets: &DiscoveredAssets{platformIds: map[string]struct{}{}},
312+
Recording: recording.Null{},
313+
})
282314
require.NoError(t, err)
283315
assert.Len(t, discoveredAssets.Assets, 1)
284316

explorer/scan/local_scanner.go

+69-25
Original file line numberDiff line numberDiff line change
@@ -208,23 +208,6 @@ func CreateProgressBar(discoveredAssets *DiscoveredAssets, disableProgressBar bo
208208
func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *upstream.UpstreamConfig) (*explorer.ReportCollection, error) {
209209
log.Info().Msgf("discover related assets for %d asset(s)", len(job.Inventory.Spec.Assets))
210210

211-
discoveredAssets, err := DiscoverAssets(ctx, job.Inventory, upstream, s.recording)
212-
if err != nil {
213-
return nil, err
214-
}
215-
216-
// For each discovered asset, we initialize a new runtime and connect to it.
217-
// Within this process, we set up a catch-all deferred function, that shuts
218-
// down all runtimes, in case we exit early.
219-
defer func() {
220-
for _, asset := range discoveredAssets.Assets {
221-
// we can call close multiple times and it will only execute once
222-
if asset.Runtime != nil {
223-
asset.Runtime.Close()
224-
}
225-
}
226-
}()
227-
228211
// plan scan jobs
229212
reporter := NewAggregateReporter()
230213
if job.Bundle == nil && upstream != nil && upstream.Creds != nil {
@@ -246,18 +229,79 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
246229
reporter.AddBundle(bundle)
247230
}
248231

232+
processedAssets := map[string]struct{}{}
233+
if err := s.loopyLoop(ctx, job, processedAssets, reporter, upstream); err != nil {
234+
return nil, err
235+
}
236+
237+
return reporter.Reports(), nil
238+
}
239+
240+
func (s *LocalScanner) loopyLoop(ctx context.Context, job *Job, processedAssets map[string]struct{}, reporter *AggregateReporter, upstream *upstream.UpstreamConfig) error {
241+
discoveryReq := &AssetDiscoveryRequest{
242+
Inv: job.Inventory,
243+
Upstream: upstream,
244+
Recording: s.recording,
245+
}
246+
discoveredAssets, err := DiscoverAssets(ctx, discoveryReq)
247+
if err != nil {
248+
return err
249+
}
250+
251+
newAssets := &DiscoveredAssets{platformIds: processedAssets}
252+
for _, asset := range discoveredAssets.Assets {
253+
newAssets.Add(asset.Asset, asset.Runtime)
254+
}
255+
256+
if len(newAssets.Assets) == 0 {
257+
return nil
258+
}
259+
260+
if err := s.scanAssets(ctx, newAssets, job, reporter, upstream); err != nil {
261+
return err
262+
}
263+
for pid := range discoveredAssets.platformIds {
264+
processedAssets[pid] = struct{}{}
265+
}
266+
267+
for _, asset := range newAssets.Assets {
268+
err := s.loopyLoop(ctx, &Job{
269+
Inventory: &inventory.Inventory{
270+
Spec: &inventory.InventorySpec{
271+
Assets: []*inventory.Asset{asset.Asset},
272+
},
273+
},
274+
Bundle: job.Bundle,
275+
DoRecord: job.DoRecord,
276+
QueryPackFilters: job.QueryPackFilters,
277+
Props: job.Props,
278+
}, processedAssets, reporter, upstream)
279+
280+
// Close the runtime for the asset once we are done with it
281+
if asset.Runtime != nil {
282+
asset.Runtime.Close()
283+
}
284+
if err != nil {
285+
return err
286+
}
287+
}
288+
289+
return nil
290+
}
291+
292+
func (s *LocalScanner) scanAssets(ctx context.Context, discoveredAssets *DiscoveredAssets, job *Job, reporter *AggregateReporter, upstream *upstream.UpstreamConfig) error {
249293
// if we had asset errors we want to place them into the reporter
250294
for i := range discoveredAssets.Errors {
251295
reporter.AddScanError(discoveredAssets.Errors[i].Asset, discoveredAssets.Errors[i].Err)
252296
}
253297

254298
if len(discoveredAssets.Assets) == 0 {
255-
return reporter.Reports(), nil
299+
return nil
256300
}
257301

258302
multiprogress, err := CreateProgressBar(discoveredAssets, s.disableProgressBar)
259303
if err != nil {
260-
return nil, err
304+
return err
261305
}
262306
// start the progress bar
263307
scanGroups := sync.WaitGroup{}
@@ -274,13 +318,13 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
274318
if upstream != nil && upstream.ApiEndpoint != "" && !upstream.Incognito {
275319
client, err := upstream.InitClient(ctx)
276320
if err != nil {
277-
return nil, err
321+
return err
278322
}
279323
spaceMrn = client.SpaceMrn
280324

281325
services, err = explorer.NewRemoteServices(client.ApiEndpoint, client.Plugins, client.HttpClient)
282326
if err != nil {
283-
return nil, err
327+
return err
284328
}
285329
}
286330

@@ -305,7 +349,7 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
305349
List: assetsToSync,
306350
})
307351
if err != nil {
308-
return nil, err
352+
return err
309353
}
310354
log.Debug().Int("assets", len(resp.Details)).Msg("got assets details")
311355
platformAssetMapping := make(map[string]*explorer.SynchronizeAssetsRespAssetDetail)
@@ -332,7 +376,7 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
332376
randID := "//" + explorer.SERVICE_NAME + "/" + explorer.MRN_RESOURCE_ASSET + "/" + ksuid.New().String()
333377
x, err := mrn.NewMRN(randID)
334378
if err != nil {
335-
return nil, multierr.Wrap(err, "failed to generate a random asset MRN")
379+
return multierr.Wrap(err, "failed to generate a random asset MRN")
336380
}
337381
asset.Mrn = x.String()
338382
}
@@ -342,7 +386,7 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
342386
// if a bundle was provided check that it matches the filter, bundles can also be downloaded
343387
// later therefore we do not want to stop execution here
344388
if job.Bundle != nil && job.Bundle.FilterQueryPacks(job.QueryPackFilters) {
345-
return nil, errors.New("all available packs filtered out. nothing to do")
389+
return errors.New("all available packs filtered out. nothing to do")
346390
}
347391

348392
wg := sync.WaitGroup{}
@@ -395,7 +439,7 @@ func (s *LocalScanner) distributeJob(job *Job, ctx context.Context, upstream *up
395439
wg.Wait()
396440
}
397441
scanGroups.Wait()
398-
return reporter.Reports(), nil
442+
return nil
399443
}
400444

401445
func HandleDelayedDiscovery(ctx context.Context, asset *inventory.Asset, runtime *providers.Runtime, services *explorer.Services, spaceMrn string) (*inventory.Asset, error) {

0 commit comments

Comments
 (0)