Skip to content

Commit 6fa0562

Browse files
authored
improve batch query (#2246)
* update query to only return values with the latest time Signed-off-by: pxp928 <[email protected]> * update keyvalue backend to match and update backend tests Signed-off-by: pxp928 <[email protected]> * udpate graphql schema description for batch vuln and license query Signed-off-by: pxp928 <[email protected]> * udpate batch query to aggregate on timestamp and return latest values Signed-off-by: pxp928 <[email protected]> * remove debug from queries Signed-off-by: pxp928 <[email protected]> --------- Signed-off-by: pxp928 <[email protected]>
1 parent c571087 commit 6fa0562

File tree

11 files changed

+155
-24
lines changed

11 files changed

+155
-24
lines changed

internal/testing/backend/certifyLegal_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -631,10 +631,10 @@ func TestBatchQueryPkgIDCertifyLegal(t *testing.T) {
631631
Dec: [][]*model.IDorLicenseInput{{{LicenseInput: testdata.L1}}, {{LicenseInput: testdata.L2}}, {{LicenseInput: testdata.L3}}, {{LicenseInput: testdata.L4}}},
632632
Dis: [][]*model.IDorLicenseInput{{{LicenseInput: testdata.L1}}, {{LicenseInput: testdata.L2}}, {}, {}},
633633
Legal: []*model.CertifyLegalInputSpec{
634-
{Justification: "test justification"},
635-
{Justification: "test justification"},
636-
{Justification: "test justification"},
637-
{Justification: "test justification"},
634+
{Justification: "test justification", TimeScanned: testdata.T1},
635+
{Justification: "test justification", TimeScanned: testdata.T1},
636+
{Justification: "test justification", TimeScanned: testdata.T1},
637+
{Justification: "test justification", TimeScanned: testdata.T1},
638638
},
639639
},
640640
},
@@ -644,22 +644,26 @@ func TestBatchQueryPkgIDCertifyLegal(t *testing.T) {
644644
DeclaredLicenses: []*model.License{testdata.L1out},
645645
DiscoveredLicenses: []*model.License{testdata.L1out},
646646
Justification: "test justification",
647+
TimeScanned: testdata.T1,
647648
},
648649
{
649650
Subject: testdata.P2out,
650651
DeclaredLicenses: []*model.License{testdata.L2out},
651652
DiscoveredLicenses: []*model.License{testdata.L2out},
652653
Justification: "test justification",
654+
TimeScanned: testdata.T1,
653655
},
654656
{
655657
Subject: testdata.P3out,
656658
DeclaredLicenses: []*model.License{testdata.L3out},
657659
Justification: "test justification",
660+
TimeScanned: testdata.T1,
658661
},
659662
{
660663
Subject: testdata.P4out,
661664
DeclaredLicenses: []*model.License{testdata.L4out},
662665
Justification: "test justification",
666+
TimeScanned: testdata.T1,
663667
},
664668
},
665669
},

pkg/assembler/backends/ent/backend/search.go

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,41 @@ func (b *EntBackend) BatchQueryPkgIDCertifyVuln(ctx context.Context, pkgIDs []st
310310
queryList = append(queryList, convertedID)
311311
}
312312

313-
var predicates []predicate.CertifyVuln
313+
var cvLatestScan []struct {
314+
PkgID uuid.UUID `json:"package_id"`
315+
VulnID uuid.UUID `json:"vulnerability_id"`
316+
LastScanTimeDB time.Time `json:"max"`
317+
}
318+
319+
var aggPredicates []predicate.CertifyVuln
320+
aggPredicates = append(aggPredicates, certifyvuln.PackageIDIn(queryList...), certifyvuln.VulnerabilityIDNEQ(noVulnID))
321+
322+
// aggregate to find the latest timescanned for certifyVulns for list of packages
323+
err := b.client.CertifyVuln.Query().
324+
Where(certifyvuln.And(aggPredicates...)).
325+
GroupBy(certifyvuln.FieldPackageID, certifyvuln.FieldVulnerabilityID). // Group by Package ID
326+
Aggregate(func(s *sql.Selector) string {
327+
t := sql.Table(certifyvuln.Table)
328+
return sql.As(sql.Max(t.C(certifyvuln.FieldTimeScanned)), "max")
329+
}).
330+
Scan(ctx, &cvLatestScan)
314331

315-
predicates = append(predicates, certifyvuln.PackageIDIn(queryList...), certifyvuln.VulnerabilityIDNEQ(noVulnID))
332+
if err != nil {
333+
return nil, fmt.Errorf("failed aggregate certifyVuln based on packageIDs with error: %w", err)
334+
}
335+
336+
var predicates []predicate.CertifyVuln
337+
for _, record := range cvLatestScan {
338+
predicates = append(predicates,
339+
certifyvuln.And(
340+
certifyvuln.VulnerabilityID(record.VulnID),
341+
certifyvuln.PackageID(record.PkgID),
342+
certifyvuln.TimeScannedEQ(record.LastScanTimeDB),
343+
))
344+
}
316345

317346
certVulnConn, err := b.client.CertifyVuln.Query().
318-
Where(certifyvuln.And(predicates...)).
347+
Where(certifyvuln.Or(predicates...)).
319348
WithVulnerability(func(query *ent.VulnerabilityIDQuery) {}).
320349
WithPackage(func(q *ent.PackageVersionQuery) {
321350
q.WithName(func(q *ent.PackageNameQuery) {})
@@ -344,15 +373,46 @@ func (b *EntBackend) BatchQueryPkgIDCertifyLegal(ctx context.Context, pkgIDs []s
344373
queryList = append(queryList, convertedID)
345374
}
346375

376+
var clLatestScan []struct {
377+
PkgID uuid.UUID `json:"package_id"`
378+
DeclaredLicense string `json:"declared_licenses_hash"`
379+
DiscoveredLicense string `json:"discovered_licenses_hash"`
380+
LastScanTimeDB time.Time `json:"max"`
381+
}
382+
383+
var aggPredicates []predicate.CertifyLegal
384+
// aggregate to find the latest timescanned for certifyLegals for list of packages
385+
aggPredicates = append(aggPredicates, certifylegal.PackageIDIn(queryList...), certifylegal.SourceIDIsNil())
386+
err := b.client.CertifyLegal.Query().
387+
Where(certifylegal.And(aggPredicates...)).
388+
GroupBy(certifylegal.FieldPackageID, certifylegal.FieldDeclaredLicensesHash, certifylegal.FieldDiscoveredLicensesHash). // Group by certifylegal ID
389+
Aggregate(func(s *sql.Selector) string {
390+
t := sql.Table(certifylegal.Table)
391+
return sql.As(sql.Max(t.C(certifylegal.FieldTimeScanned)), "max")
392+
}).
393+
Scan(ctx, &clLatestScan)
394+
395+
if err != nil {
396+
return nil, fmt.Errorf("failed aggregate certifylegal based on packageIDs with error: %w", err)
397+
}
398+
347399
var predicates []predicate.CertifyLegal
400+
for _, record := range clLatestScan {
401+
predicates = append(predicates,
402+
certifylegal.And(
403+
certifylegal.PackageID(record.PkgID),
404+
certifylegal.SourceIDIsNil(),
405+
certifylegal.DeclaredLicensesHashEQ(record.DeclaredLicense),
406+
certifylegal.DiscoveredLicensesHashEQ(record.DiscoveredLicense),
407+
certifylegal.TimeScannedEQ(record.LastScanTimeDB),
408+
))
409+
}
348410

349-
predicates = append(predicates, certifylegal.PackageIDIn(queryList...), certifylegal.SourceIDIsNil())
350411
certLegalConn, err := b.client.CertifyLegal.Query().
351-
Where(certifylegal.And(predicates...)).
412+
Where(certifylegal.Or(predicates...)).
352413
WithPackage(func(q *ent.PackageVersionQuery) {
353414
q.WithName(func(q *ent.PackageNameQuery) {})
354415
}).
355-
WithSource(func(q *ent.SourceNameQuery) {}).
356416
WithDeclaredLicenses().
357417
WithDiscoveredLicenses().All(ctx)
358418

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Create index "certifylegal_package_id_declared_licenses_hash_discovered_licen" to table: "certify_legals"
2+
CREATE INDEX "certifylegal_package_id_declared_licenses_hash_discovered_licen" ON "certify_legals" ("package_id", "declared_licenses_hash", "discovered_licenses_hash", "time_scanned") WHERE ((package_id IS NOT NULL) AND (source_id IS NULL));
3+
-- Create index "certifyvuln_vulnerability_id_package_id_time_scanned" to table: "certify_vulns"
4+
CREATE INDEX "certifyvuln_vulnerability_id_package_id_time_scanned" ON "certify_vulns" ("vulnerability_id", "package_id", "time_scanned");

pkg/assembler/backends/ent/migrate/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:Ru5VFYpW/024wBxj0NuPPYqNe+IcDzjNmi/bBoLOQgw=
1+
h1:7U2rKCWB5tpN3SOma2KThbIofWfFpnkN72sc+cXrYX8=
22
20240503123155_baseline.sql h1:oZtbKI8sJj3xQq7ibfvfhFoVl+Oa67CWP7DFrsVLVds=
33
20240626153721_ent_diff.sql h1:FvV1xELikdPbtJk7kxIZn9MhvVVoFLF/2/iT/wM5RkA=
44
20240702195630_ent_diff.sql h1:y8TgeUg35krYVORmC7cN4O96HqOc3mVO9IQ2lYzIzwg=
@@ -10,3 +10,4 @@ h1:Ru5VFYpW/024wBxj0NuPPYqNe+IcDzjNmi/bBoLOQgw=
1010
20240918165345.sql h1:wpfJhr9rJSWWzbTA85rnLppDjGscJVaFpE1uZJXpScY=
1111
20240919142722_ent_diff.sql h1:hcb42aHj5QUwbd7HXsUFnnAzHIckdXfGRDNYa24rns8=
1212
20241017140224_ent_diff.sql h1:BrrQdJnjtZJ9FYOXc5PgEafQ6N3ADdydFPevjdyTqnU=
13+
20241030212025_ent_diff.sql h1:IlCPmPKr+81472GhqF+hris+RX4zaKwBxVC1pCCi8vE=

pkg/assembler/backends/ent/migrate/schema.go

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/assembler/backends/ent/schema/certifylegal.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func (CertifyLegal) Indexes() []ent.Index {
7171
"origin", "collector", "document_ref", "declared_licenses_hash", "discovered_licenses_hash").
7272
Unique().
7373
Annotations(entsql.IndexWhere("package_id IS NOT NULL AND source_id IS NULL")),
74-
index.Fields("package_id").Annotations(entsql.IndexWhere("package_id IS NOT NULL AND source_id IS NULL")), // query when subject is package ID
74+
index.Fields("package_id").Annotations(entsql.IndexWhere("package_id IS NOT NULL AND source_id IS NULL")), // query when subject is package ID
75+
index.Fields("package_id", "declared_licenses_hash", "discovered_licenses_hash", "time_scanned").Annotations(entsql.IndexWhere("package_id IS NOT NULL AND source_id IS NULL")), // index on for batch query
7576
}
7677
}

pkg/assembler/backends/ent/schema/certifyvuln.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ func (CertifyVuln) Edges() []ent.Edge {
6161
func (CertifyVuln) Indexes() []ent.Index {
6262
return []ent.Index{
6363
index.Fields("db_uri", "db_version", "scanner_uri", "scanner_version", "origin", "collector", "time_scanned", "document_ref").Edges("vulnerability", "package").Unique(),
64-
index.Fields("package_id"), // speed up frequently run queries to check when CV nodes affect certain package IDs
65-
index.Fields("vulnerability_id"), // speed up frequently run queries to check when CV nodes have a vulnerability
64+
index.Fields("package_id"), // speed up frequently run queries to check when CV nodes affect certain package IDs
65+
index.Fields("vulnerability_id"), // speed up frequently run queries to check when CV nodes have a vulnerability
66+
index.Fields("vulnerability_id", "package_id", "time_scanned"), // index on for batch query
6667
}
6768
}

pkg/assembler/backends/keyvalue/search.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/guacsec/guac/internal/testing/ptrfrom"
2727
"github.com/guacsec/guac/pkg/assembler/graphql/model"
28+
"golang.org/x/exp/maps"
2829
)
2930

3031
const guacType string = "guac"
@@ -58,27 +59,73 @@ func (c *demoClient) BatchQueryDepPkgDependency(ctx context.Context, pkgIDs []st
5859
}
5960

6061
func (c *demoClient) BatchQueryPkgIDCertifyVuln(ctx context.Context, pkgIDs []string) ([]*model.CertifyVuln, error) {
61-
var collectedCertVulns []*model.CertifyVuln
62+
pkgCVs := make(map[string][]*model.CertifyVuln)
6263
for _, pkgID := range pkgIDs {
6364
certVuln, err := c.CertifyVuln(ctx, &model.CertifyVulnSpec{Package: &model.PkgSpec{ID: &pkgID}})
6465
if err != nil {
6566
return nil, fmt.Errorf("failed to query CertifyVuln for pkgID: %s, with error: %w", pkgID, err)
6667
}
67-
collectedCertVulns = append(collectedCertVulns, certVuln...)
68+
pkgCVs[pkgID] = append(pkgCVs[pkgID], certVuln...)
6869
}
69-
return collectedCertVulns, nil
70+
71+
deduplicatedPkgCVs := make(map[string][]*model.CertifyVuln)
72+
for _, certVulns := range pkgCVs {
73+
pkgID := certVulns[0].Package.Namespaces[0].Names[0].Versions[0].ID
74+
cvsByVulnID := make(map[string]*model.CertifyVuln)
75+
for _, cv := range certVulns {
76+
cv := cv
77+
vulnID := cv.Vulnerability.VulnerabilityIDs[0].VulnerabilityID
78+
if existing, ok := cvsByVulnID[vulnID]; ok {
79+
if existing.Metadata.TimeScanned.After(cv.Metadata.TimeScanned) {
80+
continue
81+
}
82+
}
83+
cvsByVulnID[vulnID] = cv
84+
}
85+
deduplicatedPkgCVs[pkgID] = append(deduplicatedPkgCVs[pkgID], maps.Values(cvsByVulnID)...)
86+
}
87+
88+
var filteredCertVulns []*model.CertifyVuln
89+
for _, certVulns := range deduplicatedPkgCVs {
90+
filteredCertVulns = append(filteredCertVulns, certVulns...)
91+
}
92+
93+
return filteredCertVulns, nil
7094
}
7195

7296
func (c *demoClient) BatchQueryPkgIDCertifyLegal(ctx context.Context, pkgIDs []string) ([]*model.CertifyLegal, error) {
73-
var collectedCertLegal []*model.CertifyLegal
97+
pkgCLs := make(map[string][]*model.CertifyLegal)
7498
for _, pkgID := range pkgIDs {
7599
certLegal, err := c.CertifyLegal(ctx, &model.CertifyLegalSpec{Subject: &model.PackageOrSourceSpec{Package: &model.PkgSpec{ID: &pkgID}}})
76100
if err != nil {
77101
return nil, fmt.Errorf("failed to query CertifyLegal for pkgID: %s, with error: %w", pkgID, err)
78102
}
79-
collectedCertLegal = append(collectedCertLegal, certLegal...)
103+
pkgCLs[pkgID] = append(pkgCLs[pkgID], certLegal...)
80104
}
81-
return collectedCertLegal, nil
105+
106+
deduplicatedPkgCLs := make(map[string]*model.CertifyLegal)
107+
for _, certLegals := range pkgCLs {
108+
if pkg, ok := certLegals[0].Subject.(*model.Package); ok {
109+
var latest time.Time
110+
pkgID := pkg.Namespaces[0].Names[0].Versions[0].ID
111+
for _, cl := range certLegals {
112+
if cl.TimeScanned.After(latest) {
113+
latestcl := cl
114+
latest = cl.TimeScanned
115+
deduplicatedPkgCLs[pkgID] = latestcl
116+
}
117+
}
118+
} else {
119+
continue
120+
}
121+
}
122+
123+
var filteredCertLegals []*model.CertifyLegal
124+
for _, certLegal := range deduplicatedPkgCLs {
125+
filteredCertLegals = append(filteredCertLegals, certLegal)
126+
}
127+
128+
return filteredCertLegals, nil
82129
}
83130

84131
func (c *demoClient) FindSoftware(ctx context.Context, searchText string) ([]model.PackageSourceOrArtifact, error) {

pkg/assembler/graphql/generated/root_.generated.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/assembler/graphql/schema/certifyLegal.graphql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ extend type Query {
128128
CertifyLegal(certifyLegalSpec: CertifyLegalSpec!): [CertifyLegal!]!
129129
"Returns a paginated results via CertifyLegalConnection"
130130
CertifyLegalList(certifyLegalSpec: CertifyLegalSpec!, after: ID, first: Int): CertifyLegalConnection
131-
"Batch queries via pkgVersion IDs to find all CertifyLegal"
131+
"Batch queries via pkgVersion IDs to find all CertifyLegal (latest timestamp)"
132132
BatchQueryPkgIDCertifyLegal(pkgIDs: [ID!]!): [CertifyLegal!]!
133133
}
134134

0 commit comments

Comments
 (0)