Skip to content

Commit ff24816

Browse files
committed
syz-cluster: avoid UUIDs in blob store
Make blob store URIs depend on the IDs explicitly passed into the Write() function. In many cases this removes the need to distinguish between overwriting an object that has already been saved and saving it for the first time. Continue to store the object in blob storage first and only then submit the entities to Spanner. This will lead to some wasted space, but we'll add garbage collection at some point.
1 parent 98683f8 commit ff24816

File tree

8 files changed

+58
-106
lines changed

8 files changed

+58
-106
lines changed

syz-cluster/controller/processor.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
138138
app.Errorf("failed to query workflow %q status: %v", session.ID, err)
139139
continue
140140
}
141-
if workflowLog != nil {
141+
if len(workflowLog) > 0 {
142142
err := sp.updateSessionLog(ctx, session, workflowLog)
143143
if err != nil {
144144
app.Errorf("failed to update session log: %v", err)
@@ -214,19 +214,12 @@ func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID strin
214214
}
215215

216216
func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error {
217+
logURI, err := sp.blobStorage.Write(bytes.NewReader(log), "Session", session.ID, "log")
218+
if err != nil {
219+
return fmt.Errorf("failed to save the log: %w", err)
220+
}
217221
return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
218-
if session.LogURI == "" {
219-
path, err := sp.blobStorage.Store(bytes.NewReader(log))
220-
if err != nil {
221-
return fmt.Errorf("failed to save the log: %w", err)
222-
}
223-
session.LogURI = path
224-
} else {
225-
err := sp.blobStorage.Update(session.LogURI, bytes.NewReader(log))
226-
if err != nil {
227-
return fmt.Errorf("failed to update the log %q: %w", session.LogURI, err)
228-
}
229-
}
222+
session.LogURI = logURI
230223
return nil
231224
})
232225
}

syz-cluster/pkg/blob/gcs.go

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
"context"
88
"fmt"
99
"io"
10+
"path"
1011
"regexp"
1112

1213
"github.com/google/syzkaller/pkg/gcs"
13-
"github.com/google/uuid"
1414
)
1515

1616
type gcsDriver struct {
@@ -29,54 +29,39 @@ func NewGCSClient(ctx context.Context, bucket string) (Storage, error) {
2929
}, nil
3030
}
3131

32-
func (gcs *gcsDriver) Store(source io.Reader) (string, error) {
33-
object := uuid.NewString()
34-
err := gcs.writeObject(object, source)
32+
func (gcs *gcsDriver) Write(source io.Reader, parts ...string) (string, error) {
33+
if len(parts) == 0 {
34+
return "", fmt.Errorf("no identifiers for the object were passed to Write")
35+
}
36+
object := fmt.Sprintf("%s/%s", gcs.bucket, path.Join(parts...))
37+
w, err := gcs.client.FileWriter(object, "", "")
3538
if err != nil {
3639
return "", err
3740
}
38-
return gcs.objectURI(object), nil
39-
}
40-
41-
func (gcs *gcsDriver) Update(uri string, source io.Reader) error {
42-
object, err := gcs.objectName(uri)
41+
defer w.Close()
42+
_, err = io.Copy(w, source)
4343
if err != nil {
44-
return err
44+
return "", err
4545
}
46-
return gcs.writeObject(object, source)
46+
return "gcs://" + object, nil
4747
}
4848

4949
func (gcs *gcsDriver) Read(uri string) (io.ReadCloser, error) {
50-
object, err := gcs.objectName(uri)
50+
bucket, object, err := gcs.parseURI(uri)
5151
if err != nil {
5252
return nil, err
5353
}
54-
return gcs.client.FileReader(fmt.Sprintf("%s/%s", gcs.bucket, object))
54+
return gcs.client.FileReader(fmt.Sprintf("%s/%s", bucket, object))
5555
}
5656

5757
var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/([\w-]+)$`)
5858

59-
func (gcs *gcsDriver) objectName(uri string) (string, error) {
59+
func (gcs *gcsDriver) parseURI(uri string) (string, string, error) {
6060
match := gcsObjectRe.FindStringSubmatch(uri)
6161
if len(match) == 0 {
62-
return "", fmt.Errorf("invalid GCS URI")
62+
return "", "", fmt.Errorf("invalid GCS URI")
6363
} else if match[1] != gcs.bucket {
64-
return "", fmt.Errorf("unexpected GCS bucket")
64+
return "", "", fmt.Errorf("unexpected GCS bucket")
6565
}
66-
return match[2], nil
67-
}
68-
69-
func (gcs *gcsDriver) objectURI(object string) string {
70-
return fmt.Sprintf("gcs://%s/%s", gcs.bucket, object)
71-
}
72-
73-
func (gcs *gcsDriver) writeObject(object string, source io.Reader) error {
74-
w, err := gcs.client.FileWriter(fmt.Sprintf("%s/%s", gcs.bucket, object), "", "")
75-
if err != nil {
76-
return err
77-
}
78-
defer w.Close()
79-
80-
_, err = io.Copy(w, source)
81-
return err
66+
return gcs.bucket, match[2], nil
8267
}

syz-cluster/pkg/blob/storage.go

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,21 @@
44
package blob
55

66
import (
7+
"encoding/base64"
78
"fmt"
89
"io"
910
"os"
1011
"path/filepath"
1112
"strings"
12-
13-
"github.com/google/uuid"
1413
)
1514

1615
// Storage is not assumed to be used for particularly large objects (e.g. GB of size),
1716
// but rather for blobs that risk overwhelming Spanner column size limits.
1817
type Storage interface {
19-
// Store returns a URI to use later.
20-
Store(source io.Reader) (string, error)
21-
Update(key string, source io.Reader) error
18+
// Write stores the object uniquely identified by a set of IDs (parts).
19+
// If it already exists, it will be overwritten.
20+
// The first return value is the URI which can be used to later retrieve it with Read.
21+
Write(source io.Reader, parts ...string) (string, error)
2222
Read(uri string) (io.ReadCloser, error)
2323
}
2424

@@ -36,20 +36,19 @@ func NewLocalStorage(baseFolder string) *LocalStorage {
3636

3737
const localStoragePrefix = "local://"
3838

39-
func (ls *LocalStorage) Store(source io.Reader) (string, error) {
40-
name := uuid.NewString()
41-
err := ls.writeFile(name, source)
39+
func (ls *LocalStorage) Write(source io.Reader, parts ...string) (string, error) {
40+
// A whatever approach that can handle arbitrary inputs.
41+
name := base64.StdEncoding.EncodeToString([]byte(filepath.Join(parts...)))
42+
file, err := os.Create(filepath.Join(ls.baseFolder, name))
4243
if err != nil {
4344
return "", err
4445
}
45-
return localStoragePrefix + name, nil
46-
}
47-
48-
func (ls *LocalStorage) Update(uri string, source io.Reader) error {
49-
if !strings.HasPrefix(uri, localStoragePrefix) {
50-
return fmt.Errorf("unsupported URI type")
46+
defer file.Close()
47+
_, err = io.Copy(file, source)
48+
if err != nil {
49+
return "", fmt.Errorf("failed to save data: %w", err)
5150
}
52-
return ls.writeFile(strings.TrimPrefix(uri, localStoragePrefix), source)
51+
return localStoragePrefix + name, nil
5352
}
5453

5554
func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
@@ -61,19 +60,6 @@ func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
6160
return os.Open(path)
6261
}
6362

64-
func (ls *LocalStorage) writeFile(name string, source io.Reader) error {
65-
file, err := os.Create(filepath.Join(ls.baseFolder, name))
66-
if err != nil {
67-
return err
68-
}
69-
defer file.Close()
70-
_, err = io.Copy(file, source)
71-
if err != nil {
72-
return fmt.Errorf("failed to save data: %w", err)
73-
}
74-
return nil
75-
}
76-
7763
func ReadAllBytes(storage Storage, uri string) ([]byte, error) {
7864
if uri == "" {
7965
return nil, nil

syz-cluster/pkg/blob/storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestLocalStorage(t *testing.T) {
1616
var uris []string
1717
for i := 0; i < 2; i++ {
1818
content := fmt.Sprintf("object #%d", i)
19-
uri, err := storage.Store(bytes.NewReader([]byte(content)))
19+
uri, err := storage.Write(bytes.NewReader([]byte(content)), fmt.Sprint(i))
2020
assert.NoError(t, err)
2121
uris = append(uris, uri)
2222
}

syz-cluster/pkg/service/finding.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/google/syzkaller/syz-cluster/pkg/app"
1313
"github.com/google/syzkaller/syz-cluster/pkg/blob"
1414
"github.com/google/syzkaller/syz-cluster/pkg/db"
15+
"github.com/google/uuid"
1516
)
1617

1718
type FindingService struct {
@@ -27,28 +28,27 @@ func NewFindingService(env *app.AppEnvironment) *FindingService {
2728
}
2829

2930
func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error {
30-
var reportURI, logURI string
31+
finding := &db.Finding{
32+
ID: uuid.NewString(),
33+
SessionID: req.SessionID,
34+
TestName: req.TestName,
35+
Title: req.Title,
36+
}
3137
var err error
3238
if len(req.Log) > 0 {
33-
logURI, err = s.blobStorage.Store(bytes.NewReader(req.Log))
39+
finding.LogURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), "Finding", finding.ID, "log")
3440
if err != nil {
3541
return fmt.Errorf("failed to save the log: %w", err)
3642
}
3743
}
3844
if len(req.Report) > 0 {
39-
reportURI, err = s.blobStorage.Store(bytes.NewReader(req.Report))
45+
finding.ReportURI, err = s.blobStorage.Write(bytes.NewReader(req.Report), "Finding", finding.ID, "report")
4046
if err != nil {
4147
return fmt.Errorf("failed to save the report: %w", err)
4248
}
4349
}
4450
// TODO: if it's not actually added, the blob records will be orphaned.
45-
err = s.findingRepo.Save(ctx, &db.Finding{
46-
SessionID: req.SessionID,
47-
TestName: req.TestName,
48-
Title: req.Title,
49-
ReportURI: reportURI,
50-
LogURI: logURI,
51-
})
51+
err = s.findingRepo.Save(ctx, finding)
5252
if err == db.ErrFindingExists {
5353
// It's ok, just ignore.
5454
return nil

syz-cluster/pkg/service/series.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/google/syzkaller/syz-cluster/pkg/app"
1414
"github.com/google/syzkaller/syz-cluster/pkg/blob"
1515
"github.com/google/syzkaller/syz-cluster/pkg/db"
16+
"github.com/google/uuid"
1617
)
1718

1819
// SeriesService is tested in controller/.
@@ -53,6 +54,7 @@ func (s *SeriesService) getSessionSeries(ctx context.Context, sessionID string,
5354

5455
func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*api.UploadSeriesResp, error) {
5556
seriesObj := &db.Series{
57+
ID: uuid.NewString(),
5658
ExtID: series.ExtID,
5759
AuthorEmail: series.AuthorEmail,
5860
Title: series.Title,
@@ -66,7 +68,8 @@ func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*
6668
for _, patch := range series.Patches {
6769
// In case of errors, we will waste some space, but let's ignore it for simplicity.
6870
// Patches are not super big.
69-
uri, err := s.blobStorage.Store(bytes.NewReader(patch.Body))
71+
uri, err := s.blobStorage.Write(bytes.NewReader(patch.Body),
72+
"Series", seriesObj.ID, "Patches", fmt.Sprint(patch.Seq))
7073
if err != nil {
7174
return nil, fmt.Errorf("failed to upload patch body: %w", err)
7275
}

syz-cluster/pkg/service/session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ func (s *SessionService) SkipSession(ctx context.Context, sessionID string, skip
3636
var triageLogURI string
3737
if len(skip.TriageLog) > 0 {
3838
var err error
39-
triageLogURI, err = s.blobStorage.Store(bytes.NewReader(skip.TriageLog))
39+
triageLogURI, err = s.blobStorage.Write(bytes.NewReader(skip.TriageLog), "Session", sessionID, "triage_log")
4040
if err != nil {
41-
return fmt.Errorf("failed to save the log: %w", err)
41+
return fmt.Errorf("failed to save the triage log: %w", err)
4242
}
4343
}
4444
err := s.sessionRepo.Update(ctx, sessionID, func(session *db.Session) error {

syz-cluster/pkg/service/sessiontest.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro
4141
}
4242
logURI := entity.LogURI
4343
if len(req.Log) > 0 {
44-
logURI, err = s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log))
44+
logURI, err = s.blobStorage.Write(bytes.NewReader(req.Log),
45+
"Session", req.SessionID, "Test", req.TestName, "log")
4546
if err != nil {
4647
return fmt.Errorf("failed to save the log: %w", err)
4748
}
@@ -66,27 +67,11 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN
6667
} else if entity == nil {
6768
return fmt.Errorf("the test has not been submitted yet")
6869
}
69-
newArchiveURI, err := s.uploadOrUpdate(ctx, entity.ArtifactsArchiveURI, reader)
70+
archiveURI, err := s.blobStorage.Write(reader, "Session", sessionID, "Test", testName, "artifacts")
7071
if err != nil {
7172
return fmt.Errorf("failed to save the artifacts archive: %w", err)
7273
}
7374
return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) {
74-
test.ArtifactsArchiveURI = newArchiveURI
75+
test.ArtifactsArchiveURI = archiveURI
7576
})
7677
}
77-
78-
func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) {
79-
if uri != "" {
80-
err := s.blobStorage.Update(uri, reader)
81-
if err != nil {
82-
return "", fmt.Errorf("failed to update: %w", err)
83-
}
84-
return uri, nil
85-
}
86-
// TODO: it will leak if we fail to save the entity.
87-
uri, err := s.blobStorage.Store(reader)
88-
if err != nil {
89-
return "", fmt.Errorf("failed to save: %w", err)
90-
}
91-
return uri, nil
92-
}

0 commit comments

Comments
 (0)