Skip to content

Commit 9d75010

Browse files
authored
Merge branch 'main' into dependabot/cargo/loadgen/kitchen-sink-gen/anstream-0.6.20
2 parents 2009280 + 72c0ed3 commit 9d75010

83 files changed

Lines changed: 10170 additions & 3331 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/all-docker-images.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
name: Build all language docker images
2+
3+
permissions:
4+
contents: read
5+
26
on:
37
workflow_call:
48
# TODO: Can eventually support repo refs too rather than just versions if/when we need that.

.github/workflows/ci.yml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
name: Continuous Integration
22

3+
permissions:
4+
contents: read
5+
actions: write
6+
checks: write
7+
38
env:
4-
MISE_VERSION: v2025.8.1
9+
MISE_VERSION: v2025.12.1
10+
MISE_NODE_VERIFY: no
511

612
on: # rebuild any PRs and main branch changes
713
pull_request:

.github/workflows/docker-images.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
name: Build docker images
2+
3+
permissions:
4+
contents: read
5+
26
on:
37
workflow_call:
48
inputs:

cmd/cli/cleanup_scenario.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ type scenarioCleaner struct {
4949
func (c *scenarioCleaner) addCLIFlags(fs *pflag.FlagSet) {
5050
c.scenario.AddCLIFlags(fs)
5151
fs.DurationVar(&c.pollInterval, "poll-interval", time.Second, "Interval for polling completion of job")
52-
c.clientOptions.AddCLIFlags(fs)
53-
c.metricsOptions.AddCLIFlags(fs, "")
54-
c.loggingOptions.AddCLIFlags(fs)
52+
fs.AddFlagSet(c.clientOptions.FlagSet())
53+
fs.AddFlagSet(c.metricsOptions.FlagSet(""))
54+
fs.AddFlagSet(c.loggingOptions.FlagSet())
5555
}
5656

5757
func (c *scenarioCleaner) run(ctx context.Context) error {
@@ -62,8 +62,8 @@ func (c *scenarioCleaner) run(ctx context.Context) error {
6262
} else if c.scenario.RunID == "" {
6363
return fmt.Errorf("run ID not found")
6464
}
65-
metrics := c.metricsOptions.MustCreateMetrics(c.logger)
66-
defer metrics.Shutdown(ctx)
65+
metrics := c.metricsOptions.MustCreateMetrics(ctx, c.logger)
66+
defer metrics.Shutdown(ctx, c.logger)
6767
client := c.clientOptions.MustDial(metrics, c.logger)
6868
defer client.Close()
6969
taskQueue := loadgen.TaskQueueForRun(c.scenario.RunID)

cmd/cli/prepare_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type workerBuilder struct {
4242
func (b *workerBuilder) addCLIFlags(fs *pflag.FlagSet) {
4343
fs.StringVar(&b.DirName, "dir-name", "", "Directory name for prepared worker")
4444
b.SdkOptions.AddCLIFlags(fs)
45-
b.loggingOptions.AddCLIFlags(fs)
45+
fs.AddFlagSet(b.loggingOptions.FlagSet())
4646
}
4747

4848
func (b *workerBuilder) preRun() {

cmd/cli/run_scenario.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,17 @@ type scenarioRunConfig struct {
5757
timeout time.Duration
5858
doNotRegisterSearchAttributes bool
5959
ignoreAlreadyStarted bool
60+
exportHistoriesDir string
61+
exportHistoriesFilter string
6062
}
6163

6264
func (r *scenarioRunner) addCLIFlags(fs *pflag.FlagSet) {
6365
r.scenario.AddCLIFlags(fs)
6466
r.scenarioRunConfig.addCLIFlags(fs)
6567
fs.DurationVar(&r.connectTimeout, "connect-timeout", 0, "Duration to try to connect to server before failing")
66-
r.clientOptions.AddCLIFlags(fs)
67-
r.metricsOptions.AddCLIFlags(fs, "")
68-
r.loggingOptions.AddCLIFlags(fs)
68+
fs.AddFlagSet(r.clientOptions.FlagSet())
69+
fs.AddFlagSet(r.metricsOptions.FlagSet(""))
70+
fs.AddFlagSet(r.loggingOptions.FlagSet())
6971
}
7072

7173
func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
@@ -84,6 +86,8 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
8486
"If the search attributes are not registed by the scenario they must be registered through some other method")
8587
fs.BoolVar(&r.ignoreAlreadyStarted, "ignore-already-started", false,
8688
"Ignore if a workflow with the same ID already exists. A Scenario may choose to override this behavior.")
89+
fs.StringVar(&r.exportHistoriesDir, "export-histories-dir", "", "Export workflow histories to this directory")
90+
fs.StringVar(&r.exportHistoriesFilter, "export-histories-filter", "all", "Filter which workflows are exported by execution status (options: 'failed', 'terminated', 'failed,terminated', 'all'). Default is 'all'")
8791
}
8892

8993
func (r *scenarioRunner) preRun() {
@@ -121,8 +125,8 @@ func (r *scenarioRunner) run(ctx context.Context) error {
121125
scenarioOptions[key] = value
122126
}
123127

124-
metrics := r.metricsOptions.MustCreateMetrics(r.logger)
125-
defer metrics.Shutdown(ctx)
128+
metrics := r.metricsOptions.MustCreateMetrics(ctx, r.logger)
129+
defer metrics.Shutdown(ctx, r.logger)
126130
start := time.Now()
127131
var client client.Client
128132
var err error
@@ -145,9 +149,16 @@ func (r *scenarioRunner) run(ctx context.Context) error {
145149
return fmt.Errorf("failed to get root directory: %w", err)
146150
}
147151

152+
// Generate a random execution ID to ensure no two executions with the same RunID collide
153+
executionID, err := generateExecutionID()
154+
if err != nil {
155+
return fmt.Errorf("failed to generate execution ID: %w", err)
156+
}
157+
148158
scenarioInfo := loadgen.ScenarioInfo{
149159
ScenarioName: r.scenario.Scenario,
150160
RunID: r.scenario.RunID,
161+
ExecutionID: executionID,
151162
Logger: r.logger,
152163
MetricsHandler: metrics.NewHandler(),
153164
Client: client,
@@ -164,11 +175,19 @@ func (r *scenarioRunner) run(ctx context.Context) error {
164175
ScenarioOptions: scenarioOptions,
165176
Namespace: r.clientOptions.Namespace,
166177
RootPath: repoDir,
178+
ExportOptions: loadgen.ExportOptions{
179+
ExportHistoriesDir: r.exportHistoriesDir,
180+
ExportHistoriesFilter: r.exportHistoriesFilter,
181+
},
167182
}
168183
executor := scenario.ExecutorFn()
169184
err = executor.Run(ctx, scenarioInfo)
170185
if err != nil {
171186
return fmt.Errorf("failed scenario: %w", err)
172187
}
188+
err = loadgen.ExportWorkflowHistories(ctx, scenarioInfo)
189+
if err != nil {
190+
scenarioInfo.Logger.Errorf("Error exporting workflow histories:\n %v", err)
191+
}
173192
return nil
174193
}

cmd/cli/run_scenario_with_worker.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type workerWithScenarioRunner struct {
4141
func (r *workerWithScenarioRunner) addCLIFlags(fs *pflag.FlagSet) {
4242
r.workerRunner.addCLIFlags(fs)
4343
r.scenarioRunConfig.addCLIFlags(fs)
44-
r.metricsOptions.AddCLIFlags(fs, "")
44+
fs.AddFlagSet(r.metricsOptions.FlagSet(""))
4545
}
4646

4747
func (r *workerWithScenarioRunner) preRun() {
@@ -71,19 +71,11 @@ func (r *workerWithScenarioRunner) run(ctx context.Context) error {
7171

7272
// Run scenario
7373
scenarioRunner := scenarioRunner{
74-
logger: r.Logger,
75-
scenario: r.ScenarioID,
76-
scenarioRunConfig: scenarioRunConfig{
77-
iterations: r.iterations,
78-
duration: r.duration,
79-
maxConcurrent: r.maxConcurrent,
80-
maxIterationsPerSecond: r.maxIterationsPerSecond,
81-
scenarioOptions: r.scenarioOptions,
82-
timeout: r.timeout,
83-
doNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,
84-
},
85-
clientOptions: r.ClientOptions,
86-
metricsOptions: r.metricsOptions,
74+
logger: r.Logger,
75+
scenario: r.ScenarioID,
76+
scenarioRunConfig: r.scenarioRunConfig,
77+
clientOptions: r.ClientOptions,
78+
metricsOptions: r.metricsOptions,
8779
}
8880
scenarioErr := scenarioRunner.run(ctx)
8981
cancel()

cmd/cli/run_worker.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ func (r *workerRunner) addCLIFlags(fs *pflag.FlagSet) {
5151
fs.StringVar(&r.EmbeddedServerAddress, "embedded-server-address", "", "Address to bind local embedded server to")
5252
fs.IntVar(&r.TaskQueueIndexSuffixStart, "task-queue-suffix-index-start", 0, "Inclusive start for task queue suffix range")
5353
fs.IntVar(&r.TaskQueueIndexSuffixEnd, "task-queue-suffix-index-end", 0, "Inclusive end for task queue suffix range")
54-
r.ClientOptions.AddCLIFlags(fs)
55-
r.MetricsOptions.AddCLIFlags(fs, "worker-")
56-
r.WorkerOptions.AddCLIFlags(fs, "worker-")
54+
fs.AddFlagSet(r.ClientOptions.FlagSet())
55+
fs.AddFlagSet(r.MetricsOptions.FlagSet("worker-"))
56+
fs.AddFlagSet(r.WorkerOptions.FlagSet())
5757
}
5858

5959
func (r *workerRunner) preRun() {

cmd/cli/util.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package cli
22

33
import (
4+
"crypto/rand"
5+
"encoding/hex"
46
"fmt"
57
"path/filepath"
68
"runtime"
@@ -16,3 +18,13 @@ func getRepoDir() (string, error) {
1618
repoDir := filepath.Dir(cmdDir) // project root
1719
return repoDir, nil
1820
}
21+
22+
// generateExecutionID generates a random execution ID to uniquely identify this particular
23+
// execution of a scenario. This ensures no two executions with the same RunID collide.
24+
func generateExecutionID() (string, error) {
25+
bytes := make([]byte, 8) // 8 bytes = 16 hex characters
26+
if _, err := rand.Read(bytes); err != nil {
27+
return "", err
28+
}
29+
return hex.EncodeToString(bytes), nil
30+
}

cmd/cli/util_nonwindows.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

0 commit comments

Comments
 (0)