Skip to content

Commit a87934a

Browse files
authored
Add min-throughput-per-hour option for throughput_stress (#232)
## What was changed Added a `min-throughput-per-hour` option for `throughput_stress` This can be used as criteria to determine whether a scenario succeeds or not. For example, expecting 'x' number of workflows to completed within a certain amount of time. ## Why? Provides the ability to run `throughput_stress` with work-based criteria.
1 parent f664025 commit a87934a

1 file changed

Lines changed: 32 additions & 0 deletions

File tree

scenarios/throughput_stress.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ const (
3636
// SleepActivityJsonFlag is a JSON string that defines the sleep activity's behavior.
3737
// See throughputstress.SleepActivityConfig for details.
3838
SleepActivityJsonFlag = "sleep-activity-json"
39+
// MinThroughputPerHourFlag is the minimum workflow throughput required (workflows/hour).
40+
// Default is 0, meaning disabled. The scenario calculates actual throughput and compares.
41+
MinThroughputPerHourFlag = "min-throughput-per-hour"
3942
)
4043

4144
const (
@@ -47,6 +50,9 @@ type tpsState struct {
4750
CompletedIterations int `json:"completedIterations"`
4851
// LastCompletedIterationAt is the time when the last iteration was completed. Helpful for debugging.
4952
LastCompletedIterationAt time.Time `json:"lastCompletedIterationAt"`
53+
// AccumulatedDuration is the total execution time across all runs (original + resumes).
54+
// This excludes any downtime between runs. Used for accurate throughput calculation.
55+
AccumulatedDuration time.Duration `json:"accumulatedDuration"`
5056
}
5157

5258
type tpsConfig struct {
@@ -58,6 +64,7 @@ type tpsConfig struct {
5864
SkipCleanNamespaceCheck bool
5965
SleepActivities *loadgen.SleepActivityConfig
6066
VisibilityVerificationTimeout time.Duration
67+
MinThroughputPerHour float64
6168
ScenarioRunID string
6269
RngSeed int64
6370
}
@@ -111,11 +118,13 @@ func (t *tpsExecutor) LoadState(loader func(any) error) error {
111118
return nil
112119
}
113120

121+
// Configure initializes tpsConfig. Largely, it reads and validates throughput_stress scenario options
114122
func (t *tpsExecutor) Configure(info loadgen.ScenarioInfo) error {
115123
config := &tpsConfig{
116124
InternalIterTimeout: info.ScenarioOptionDuration(IterTimeoutFlag, cmp.Or(info.Configuration.Duration+1*time.Minute, 1*time.Minute)),
117125
NexusEndpoint: info.ScenarioOptions[NexusEndpointFlag],
118126
SkipCleanNamespaceCheck: info.ScenarioOptionBool(SkipCleanNamespaceCheckFlag, false),
127+
MinThroughputPerHour: info.ScenarioOptionFloat(MinThroughputPerHourFlag, 0),
119128
ScenarioRunID: info.RunID,
120129
}
121130

@@ -174,6 +183,9 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
174183
}
175184
t.runID = info.RunID
176185

186+
// Track start time of current run
187+
currentRunStartTime := time.Now()
188+
177189
// Add search attribute, if it doesn't exist yet, to query for workflows by run ID.
178190
// Running this on resume, too, in case a previous Omes run crashed before it could add the search attribute.
179191
if err := loadgen.InitSearchAttribute(ctx, info, ThroughputStressScenarioIdSearchAttribute); err != nil {
@@ -257,6 +269,8 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
257269

258270
t.lock.Lock()
259271
completedIterations := t.state.CompletedIterations
272+
t.state.AccumulatedDuration += time.Since(currentRunStartTime)
273+
totalDuration := t.state.AccumulatedDuration
260274
t.lock.Unlock()
261275

262276
completedChildWorkflows := completedIterations * t.config.InternalIterations
@@ -299,6 +313,24 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
299313
return err
300314
}
301315

316+
// Post-scenario: check throughput threshold
317+
if t.config.MinThroughputPerHour > 0 {
318+
actualThroughputPerHour := float64(completedWorkflows) / totalDuration.Hours()
319+
320+
if actualThroughputPerHour < t.config.MinThroughputPerHour {
321+
// Calculate how many workflows we expected given the duration
322+
expectedWorkflows := int(totalDuration.Hours() * t.config.MinThroughputPerHour)
323+
324+
return fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+
325+
"(completed %d workflows, expected %d in %v)",
326+
actualThroughputPerHour,
327+
t.config.MinThroughputPerHour,
328+
completedWorkflows,
329+
expectedWorkflows,
330+
totalDuration.Round(time.Second))
331+
}
332+
}
333+
302334
// Post-scenario: ensure there are no failed or terminated workflows for this run.
303335
return loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID)
304336
}

0 commit comments

Comments
 (0)