Commit 25e17fd

syz-cluster/controller: make parallelization configurable
Configure the number of patch series processed in parallel via an env variable.
1 parent 881832d · commit 25e17fd

3 files changed: +29 -19 lines changed

syz-cluster/controller/processor.go

Lines changed: 23 additions & 15 deletions
@@ -8,6 +8,8 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"os"
+	"strconv"
 	"sync"
 	"time"
 
@@ -19,31 +21,37 @@ import (
 )
 
 type SeriesProcessor struct {
-	blobStorage    blob.Storage
-	seriesRepo     *db.SeriesRepository
-	sessionRepo    *db.SessionRepository
-	workflows      workflow.Service
-	dbPollInterval time.Duration
+	blobStorage     blob.Storage
+	seriesRepo      *db.SeriesRepository
+	sessionRepo     *db.SessionRepository
+	workflows       workflow.Service
+	dbPollInterval  time.Duration
+	parallelWorkers int
 }
 
 func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor {
 	workflows, err := workflow.NewArgoService()
 	if err != nil {
 		app.Fatalf("failed to initialize workflows: %v", err)
 	}
+	parallelWorkers := 1
+	if val := os.Getenv("PARALLEL_WORKERS"); val != "" {
+		var err error
+		parallelWorkers, err = strconv.Atoi(val)
+		if err != nil || parallelWorkers < 1 {
+			app.Fatalf("invalid PARALLEL_WORKERS value")
+		}
+	}
 	return &SeriesProcessor{
-		blobStorage:    env.BlobStorage,
-		seriesRepo:     db.NewSeriesRepository(env.Spanner),
-		sessionRepo:    db.NewSessionRepository(env.Spanner),
-		dbPollInterval: time.Minute,
-		workflows:      workflows,
+		blobStorage:     env.BlobStorage,
+		seriesRepo:      db.NewSeriesRepository(env.Spanner),
+		sessionRepo:     db.NewSessionRepository(env.Spanner),
+		dbPollInterval:  time.Minute,
+		workflows:       workflows,
+		parallelWorkers: parallelWorkers,
 	}
 }
 
-// Do not run more than this number of sessions in parallel.
-// TODO: it'd be different for dev and prod, make it configurable.
-const parallelWorkers = 1
-
 func (sp *SeriesProcessor) Loop(ctx context.Context) error {
 	var wg sync.WaitGroup
 	defer wg.Wait()
@@ -107,7 +115,7 @@ func (sp *SeriesProcessor) seriesRunner(ctx context.Context, ch <-chan *db.Sessi
 	var eg errgroup.Group
 	defer eg.Wait()
 
-	eg.SetLimit(parallelWorkers)
+	eg.SetLimit(sp.parallelWorkers)
 	for {
 		var session *db.Session
 		select {
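How the new limit takes effect: seriesRunner schedules each session through an errgroup.Group, and SetLimit caps how many of the group's goroutines may run at once, so sp.parallelWorkers directly bounds the number of series processed concurrently. Below is a minimal, self-contained sketch of that errgroup pattern, not syzkaller code; the sleep is just a stand-in for running a workflow.

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	parallelWorkers := 2 // stand-in for the value parsed from PARALLEL_WORKERS

	var eg errgroup.Group
	// Allow at most parallelWorkers tasks to run at the same time.
	eg.SetLimit(parallelWorkers)

	for i := 0; i < 5; i++ {
		i := i
		eg.Go(func() error {
			fmt.Printf("processing series %d\n", i)
			time.Sleep(100 * time.Millisecond) // stand-in for running a workflow
			return nil
		})
	}
	// Wait returns the first non-nil error from the tasks, if any.
	if err := eg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

With SetLimit(2), the third eg.Go call blocks until one of the running tasks finishes, which is how the controller keeps at most parallelWorkers sessions in flight.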

syz-cluster/controller/processor_test.go

Lines changed: 5 additions & 4 deletions
@@ -142,9 +142,10 @@ func newMockedWorkflows() *mockedWorkflows {
 func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProcessor, context.Context) {
 	client, ctx := db.NewTransientDB(t)
 	return &SeriesProcessor{
-		seriesRepo:     db.NewSeriesRepository(client),
-		sessionRepo:    db.NewSessionRepository(client),
-		workflows:      workflows,
-		dbPollInterval: time.Second / 10,
+		seriesRepo:      db.NewSeriesRepository(client),
+		sessionRepo:     db.NewSessionRepository(client),
+		workflows:       workflows,
+		dbPollInterval:  time.Second / 10,
+		parallelWorkers: 2,
 	}, ctx
 }

syz-cluster/overlays/dev/global-config.yaml

Lines changed: 1 addition & 0 deletions
@@ -9,3 +9,4 @@ data:
   SPANNER_EMULATOR_HOST: "cloud-spanner-emulator:9010"
   SPANNER_DATABASE_URI: "projects/my-project/instances/my-instance/databases/db"
   LOCAL_BLOB_STORAGE_PATH: "/blob-storage"
+  PARALLEL_WORKERS: "1" # Process only one series at a time.
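The contract for this variable, as implemented above: leaving it unset defaults to a single worker, and anything that is not a positive integer aborts startup. For illustration only, a standalone sketch of the same rule; the helper name parallelWorkersFromEnv is hypothetical, and the real code is inlined in NewSeriesProcessor and calls app.Fatalf instead of returning an error.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// parallelWorkersFromEnv mirrors the controller's rule: default to 1 when
// PARALLEL_WORKERS is unset, reject anything that is not a positive integer.
func parallelWorkersFromEnv() (int, error) {
	val := os.Getenv("PARALLEL_WORKERS")
	if val == "" {
		return 1, nil
	}
	n, err := strconv.Atoi(val)
	if err != nil || n < 1 {
		return 0, fmt.Errorf("invalid PARALLEL_WORKERS value %q", val)
	}
	return n, nil
}

func main() {
	n, err := parallelWorkersFromEnv()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("parallel workers:", n)
}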
