Skip to content

Commit 0f694a3

Browse files
authored
Add batch querying for clearly defined to reduce ingestion time (#2088)
* change CD certifier to batch query Signed-off-by: pxp928 <[email protected]> * rebase ingestor scanner to use certifier Signed-off-by: pxp928 <[email protected]> * update unit tests and change ent to not index on attribution for clearlyDefined Signed-off-by: pxp928 <[email protected]> * add query rate limit per service for certifier Signed-off-by: pxp928 <[email protected]> * add query limit for license scanner on ingestion Signed-off-by: pxp928 <[email protected]> * fix and order unit test for certifiers Signed-off-by: pxp928 <[email protected]> * add comments to functions Signed-off-by: pxp928 <[email protected]> * fix unit tests Signed-off-by: pxp928 <[email protected]> * add re-try for clearly defined Signed-off-by: pxp928 <[email protected]> * add constant query size for CD and OSV Signed-off-by: pxp928 <[email protected]> --------- Signed-off-by: pxp928 <[email protected]>
1 parent 7fe8848 commit 0f694a3

File tree

28 files changed

+947
-15804
lines changed

28 files changed

+947
-15804
lines changed

cmd/guaccollect/cmd/license.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,17 @@ import (
2626
"github.com/guacsec/guac/pkg/certifier"
2727
"github.com/guacsec/guac/pkg/certifier/certify"
2828
"github.com/guacsec/guac/pkg/certifier/clearlydefined"
29+
"github.com/guacsec/guac/pkg/certifier/components/root_package"
2930
"github.com/guacsec/guac/pkg/cli"
3031
"github.com/guacsec/guac/pkg/logging"
3132
"github.com/spf13/cobra"
3233
"github.com/spf13/viper"
3334
)
3435

36+
const (
37+
cdQuerySize = 248
38+
)
39+
3540
type cdOptions struct {
3641
graphqlEndpoint string
3742
headerFile string
@@ -98,7 +103,7 @@ you have access to read and write to the respective blob store.`,
98103
httpClient := http.Client{Transport: transport}
99104
gqlclient := graphql.NewClient(opts.graphqlEndpoint, &httpClient)
100105

101-
packageQueryFunc, err := getPackageQuery(gqlclient, opts.batchSize, opts.addedLatency)
106+
packageQueryFunc, err := getCDPackageQuery(gqlclient, opts.batchSize, opts.addedLatency)
102107
if err != nil {
103108
logger.Errorf("error: %v", err)
104109
os.Exit(1)
@@ -108,6 +113,13 @@ you have access to read and write to the respective blob store.`,
108113
},
109114
}
110115

116+
func getCDPackageQuery(client graphql.Client, batchSize int, addedLatency *time.Duration) (func() certifier.QueryComponents, error) {
117+
return func() certifier.QueryComponents {
118+
packageQuery := root_package.NewPackageQuery(client, batchSize, cdQuerySize, addedLatency)
119+
return packageQuery
120+
}, nil
121+
}
122+
111123
func validateCDFlags(
112124
graphqlEndpoint,
113125
headerFile,

cmd/guaccollect/cmd/osv.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ import (
4141
"github.com/spf13/viper"
4242
)
4343

44+
const (
45+
osvQuerySize = 999
46+
)
47+
4448
type osvOptions struct {
4549
graphqlEndpoint string
4650
headerFile string
@@ -107,7 +111,7 @@ you have access to read and write to the respective blob store.`,
107111
httpClient := http.Client{Transport: transport}
108112
gqlclient := graphql.NewClient(opts.graphqlEndpoint, &httpClient)
109113

110-
packageQueryFunc, err := getPackageQuery(gqlclient, opts.batchSize, opts.addedLatency)
114+
packageQueryFunc, err := getOSVPackageQuery(gqlclient, opts.batchSize, opts.addedLatency)
111115
if err != nil {
112116
logger.Errorf("error: %v", err)
113117
os.Exit(1)
@@ -164,9 +168,9 @@ func getCertifierPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub
164168
}, nil
165169
}
166170

167-
func getPackageQuery(client graphql.Client, batchSize int, addedLatency *time.Duration) (func() certifier.QueryComponents, error) {
171+
func getOSVPackageQuery(client graphql.Client, batchSize int, addedLatency *time.Duration) (func() certifier.QueryComponents, error) {
168172
return func() certifier.QueryComponents {
169-
packageQuery := root_package.NewPackageQuery(client, batchSize, addedLatency)
173+
packageQuery := root_package.NewPackageQuery(client, batchSize, osvQuerySize, addedLatency)
170174
return packageQuery
171175
}, nil
172176
}

cmd/guacone/cmd/license.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ import (
4040
"github.com/spf13/viper"
4141
)
4242

43+
const (
44+
cdQuerySize = 248
45+
)
46+
4347
type cdOptions struct {
4448
graphqlEndpoint string
4549
headerFile string
@@ -96,7 +100,7 @@ var cdCmd = &cobra.Command{
96100

97101
httpClient := http.Client{Transport: transport}
98102
gqlclient := graphql.NewClient(opts.graphqlEndpoint, &httpClient)
99-
packageQuery := root_package.NewPackageQuery(gqlclient, opts.batchSize, opts.addedLatency)
103+
packageQuery := root_package.NewPackageQuery(gqlclient, opts.batchSize, cdQuerySize, opts.addedLatency)
100104

101105
totalNum := 0
102106
docChan := make(chan *processor.Document)
@@ -176,12 +180,10 @@ var cdCmd = &cobra.Command{
176180

177181
// Collect
178182
errHandler := func(err error) bool {
179-
if err == nil {
180-
logger.Info("certifier ended gracefully")
181-
return true
183+
if err != nil {
184+
logger.Errorf("certifier ended with error: %v", err)
185+
atomic.StoreInt32(&gotErr, 1)
182186
}
183-
logger.Errorf("certifier ended with error: %v", err)
184-
atomic.StoreInt32(&gotErr, 1)
185187
// process documents already captures
186188
return true
187189
}

cmd/guacone/cmd/osv.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ import (
4040
"github.com/spf13/viper"
4141
)
4242

43+
const (
44+
osvQuerySize = 999
45+
)
46+
4347
type osvOptions struct {
4448
graphqlEndpoint string
4549
headerFile string
@@ -96,7 +100,7 @@ var osvCmd = &cobra.Command{
96100

97101
httpClient := http.Client{Transport: transport}
98102
gqlclient := graphql.NewClient(opts.graphqlEndpoint, &httpClient)
99-
packageQuery := root_package.NewPackageQuery(gqlclient, opts.batchSize, opts.addedLatency)
103+
packageQuery := root_package.NewPackageQuery(gqlclient, opts.batchSize, osvQuerySize, opts.addedLatency)
100104

101105
totalNum := 0
102106
docChan := make(chan *processor.Document)

internal/testing/cmd/pubsub_test/cmd/osv.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func getCertifierPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub
9292

9393
func getPackageQuery(client graphql.Client) (func() certifier.QueryComponents, error) {
9494
return func() certifier.QueryComponents {
95-
packageQuery := root_package.NewPackageQuery(client, 60000, nil)
95+
packageQuery := root_package.NewPackageQuery(client, 60000, 999, nil)
9696
return packageQuery
9797
}, nil
9898
}

internal/testing/dochelper/dochelper.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package dochelper
1717

1818
import (
19+
"errors"
1920
"fmt"
2021
"reflect"
2122
"time"
@@ -141,6 +142,23 @@ func DocNode(v *processor.Document, children ...*processor.DocumentNode) *proces
141142
}
142143
}
143144

145+
type minimalDocument struct {
146+
Subject []struct {
147+
URI string `json:"uri"`
148+
} `json:"subject"`
149+
}
150+
151+
func ExtractURI(blob []byte) (string, error) {
152+
var doc minimalDocument
153+
if err := json.Unmarshal(blob, &doc); err != nil {
154+
return "", err
155+
}
156+
if len(doc.Subject) == 0 {
157+
return "", errors.New("no subject found in document")
158+
}
159+
return doc.Subject[0].URI, nil
160+
}
161+
144162
func DocEqualWithTimestamp(gotDoc, wantDoc *processor.Document) (bool, error) {
145163
var testTime = time.Unix(1597826280, 0)
146164

0 commit comments

Comments
 (0)