Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions syz-cluster/controller/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
app.Errorf("failed to query workflow %q status: %v", session.ID, err)
continue
}
if workflowLog != nil {
if len(workflowLog) > 0 {
err := sp.updateSessionLog(ctx, session, workflowLog)
if err != nil {
app.Errorf("failed to update session log: %v", err)
Expand Down Expand Up @@ -214,19 +214,12 @@ func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID strin
}

func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error {
logURI, err := sp.blobStorage.Write(bytes.NewReader(log), "Session", session.ID, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
if session.LogURI == "" {
path, err := sp.blobStorage.Store(bytes.NewReader(log))
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
session.LogURI = path
} else {
err := sp.blobStorage.Update(session.LogURI, bytes.NewReader(log))
if err != nil {
return fmt.Errorf("failed to update the log %q: %w", session.LogURI, err)
}
}
session.LogURI = logURI
return nil
})
}
51 changes: 18 additions & 33 deletions syz-cluster/pkg/blob/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"context"
"fmt"
"io"
"path"
"regexp"

"github.com/google/syzkaller/pkg/gcs"
"github.com/google/uuid"
)

type gcsDriver struct {
Expand All @@ -29,54 +29,39 @@ func NewGCSClient(ctx context.Context, bucket string) (Storage, error) {
}, nil
}

func (gcs *gcsDriver) Store(source io.Reader) (string, error) {
object := uuid.NewString()
err := gcs.writeObject(object, source)
func (gcs *gcsDriver) Write(source io.Reader, parts ...string) (string, error) {
if len(parts) == 0 {
return "", fmt.Errorf("no identifiers for the object were passed to Write")
}
object := path.Join(gcs.bucket, path.Join(parts...))
w, err := gcs.client.FileWriter(object, "", "")
if err != nil {
return "", err
}
return gcs.objectURI(object), nil
}

func (gcs *gcsDriver) Update(uri string, source io.Reader) error {
object, err := gcs.objectName(uri)
defer w.Close()
_, err = io.Copy(w, source)
if err != nil {
return err
return "", err
}
return gcs.writeObject(object, source)
return "gcs://" + object, nil
}

func (gcs *gcsDriver) Read(uri string) (io.ReadCloser, error) {
object, err := gcs.objectName(uri)
bucket, object, err := gcs.parseURI(uri)
if err != nil {
return nil, err
}
return gcs.client.FileReader(fmt.Sprintf("%s/%s", gcs.bucket, object))
return gcs.client.FileReader(path.Join(bucket, object))
}

var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/([\w-]+)$`)
var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/(.+)$`)

func (gcs *gcsDriver) objectName(uri string) (string, error) {
func (gcs *gcsDriver) parseURI(uri string) (string, string, error) {
match := gcsObjectRe.FindStringSubmatch(uri)
if len(match) == 0 {
return "", fmt.Errorf("invalid GCS URI")
return "", "", fmt.Errorf("invalid GCS URI")
} else if match[1] != gcs.bucket {
return "", fmt.Errorf("unexpected GCS bucket")
return "", "", fmt.Errorf("unexpected GCS bucket")
}
return match[2], nil
}

func (gcs *gcsDriver) objectURI(object string) string {
return fmt.Sprintf("gcs://%s/%s", gcs.bucket, object)
}

func (gcs *gcsDriver) writeObject(object string, source io.Reader) error {
w, err := gcs.client.FileWriter(fmt.Sprintf("%s/%s", gcs.bucket, object), "", "")
if err != nil {
return err
}
defer w.Close()

_, err = io.Copy(w, source)
return err
return gcs.bucket, match[2], nil
}
42 changes: 14 additions & 28 deletions syz-cluster/pkg/blob/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@
package blob

import (
"encoding/base64"
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/google/uuid"
)

// Storage is not assumed to be used for partciularly large objects (e.g. GB of size),
// but rather for blobs that risk overwhelming Spanner column size limits.
type Storage interface {
// Store returns a URI to use later.
Store(source io.Reader) (string, error)
Update(key string, source io.Reader) error
// Write stores the object uniquely identified by a set of IDs (parts).
// If it already exists, it will be overwritten.
// The first argument is the URI which can be used to later retrieve it with Read.
Write(source io.Reader, parts ...string) (string, error)
Read(uri string) (io.ReadCloser, error)
}

Expand All @@ -36,20 +36,19 @@ func NewLocalStorage(baseFolder string) *LocalStorage {

const localStoragePrefix = "local://"

func (ls *LocalStorage) Store(source io.Reader) (string, error) {
name := uuid.NewString()
err := ls.writeFile(name, source)
func (ls *LocalStorage) Write(source io.Reader, parts ...string) (string, error) {
// A whatever approach that can handle arbitrary inputs.
name := base64.StdEncoding.EncodeToString([]byte(filepath.Join(parts...)))
file, err := os.Create(filepath.Join(ls.baseFolder, name))
if err != nil {
return "", err
}
return localStoragePrefix + name, nil
}

func (ls *LocalStorage) Update(uri string, source io.Reader) error {
if !strings.HasPrefix(uri, localStoragePrefix) {
return fmt.Errorf("unsupported URI type")
defer file.Close()
_, err = io.Copy(file, source)
if err != nil {
return "", fmt.Errorf("failed to save data: %w", err)
}
return ls.writeFile(strings.TrimPrefix(uri, localStoragePrefix), source)
return localStoragePrefix + name, nil
}

func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
Expand All @@ -61,19 +60,6 @@ func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
return os.Open(path)
}

func (ls *LocalStorage) writeFile(name string, source io.Reader) error {
file, err := os.Create(filepath.Join(ls.baseFolder, name))
if err != nil {
return err
}
defer file.Close()
_, err = io.Copy(file, source)
if err != nil {
return fmt.Errorf("failed to save data: %w", err)
}
return nil
}

func ReadAllBytes(storage Storage, uri string) ([]byte, error) {
if uri == "" {
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion syz-cluster/pkg/blob/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestLocalStorage(t *testing.T) {
var uris []string
for i := 0; i < 2; i++ {
content := fmt.Sprintf("object #%d", i)
uri, err := storage.Store(bytes.NewReader([]byte(content)))
uri, err := storage.Write(bytes.NewReader([]byte(content)), fmt.Sprint(i))
assert.NoError(t, err)
uris = append(uris, uri)
}
Expand Down
20 changes: 10 additions & 10 deletions syz-cluster/pkg/service/finding.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
"github.com/google/uuid"
)

type FindingService struct {
Expand All @@ -27,28 +28,27 @@ func NewFindingService(env *app.AppEnvironment) *FindingService {
}

func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error {
var reportURI, logURI string
finding := &db.Finding{
ID: uuid.NewString(),
SessionID: req.SessionID,
TestName: req.TestName,
Title: req.Title,
}
var err error
if len(req.Log) > 0 {
logURI, err = s.blobStorage.Store(bytes.NewReader(req.Log))
finding.LogURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), "Finding", finding.ID, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
}
if len(req.Report) > 0 {
reportURI, err = s.blobStorage.Store(bytes.NewReader(req.Report))
finding.ReportURI, err = s.blobStorage.Write(bytes.NewReader(req.Report), "Finding", finding.ID, "report")
if err != nil {
return fmt.Errorf("failed to save the report: %w", err)
}
}
// TODO: if it's not actually addded, the blob records will be orphaned.
err = s.findingRepo.Save(ctx, &db.Finding{
SessionID: req.SessionID,
TestName: req.TestName,
Title: req.Title,
ReportURI: reportURI,
LogURI: logURI,
})
err = s.findingRepo.Save(ctx, finding)
if err == db.ErrFindingExists {
// It's ok, just ignore.
return nil
Expand Down
5 changes: 4 additions & 1 deletion syz-cluster/pkg/service/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
"github.com/google/uuid"
)

// SeriesService is tested in controller/.
Expand Down Expand Up @@ -53,6 +54,7 @@ func (s *SeriesService) getSessionSeries(ctx context.Context, sessionID string,

func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*api.UploadSeriesResp, error) {
seriesObj := &db.Series{
ID: uuid.NewString(),
ExtID: series.ExtID,
AuthorEmail: series.AuthorEmail,
Title: series.Title,
Expand All @@ -66,7 +68,8 @@ func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*
for _, patch := range series.Patches {
// In case of errors, we will waste some space, but let's ignore it for simplicity.
// Patches are not super big.
uri, err := s.blobStorage.Store(bytes.NewReader(patch.Body))
uri, err := s.blobStorage.Write(bytes.NewReader(patch.Body),
"Series", seriesObj.ID, "Patches", fmt.Sprint(patch.Seq))
if err != nil {
return nil, fmt.Errorf("failed to upload patch body: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions syz-cluster/pkg/service/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func (s *SessionService) SkipSession(ctx context.Context, sessionID string, skip
var triageLogURI string
if len(skip.TriageLog) > 0 {
var err error
triageLogURI, err = s.blobStorage.Store(bytes.NewReader(skip.TriageLog))
triageLogURI, err = s.blobStorage.Write(bytes.NewReader(skip.TriageLog), "Session", sessionID, "triage_log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
return fmt.Errorf("failed to save the triage log: %w", err)
}
}
err := s.sessionRepo.Update(ctx, sessionID, func(session *db.Session) error {
Expand Down
23 changes: 4 additions & 19 deletions syz-cluster/pkg/service/sessiontest.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro
}
logURI := entity.LogURI
if len(req.Log) > 0 {
logURI, err = s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log))
logURI, err = s.blobStorage.Write(bytes.NewReader(req.Log),
"Session", req.SessionID, "Test", req.TestName, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
Expand All @@ -66,27 +67,11 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN
} else if entity == nil {
return fmt.Errorf("the test has not been submitted yet")
}
newArchiveURI, err := s.uploadOrUpdate(ctx, entity.ArtifactsArchiveURI, reader)
archiveURI, err := s.blobStorage.Write(reader, "Session", sessionID, "Test", testName, "artifacts")
if err != nil {
return fmt.Errorf("failed to save the artifacts archive: %w", err)
}
return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) {
test.ArtifactsArchiveURI = newArchiveURI
test.ArtifactsArchiveURI = archiveURI
})
}

func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) {
if uri != "" {
err := s.blobStorage.Update(uri, reader)
if err != nil {
return "", fmt.Errorf("failed to update: %w", err)
}
return uri, nil
}
// TODO: it will leak if we fail to save the entity.
uri, err := s.blobStorage.Store(reader)
if err != nil {
return "", fmt.Errorf("failed to save: %w", err)
}
return uri, nil
}
Loading