feat(loadgen): implement load test harness for messaging workers #117
Merged
36 commits
- c470c55 docs: add load test messaging workers design (claude)
- 70502fd docs: add load test messaging workers implementation plan (claude)
- 182ef25 feat(loadgen): scaffold main.go with subcommand dispatch (claude)
- b6e9ac1 feat(loadgen): add Preset type and four built-in presets (claude)
- 354afa0 test(loadgen): guard preset lookup ok in uniform/realistic shape tests (claude)
- 2ae8310 feat(loadgen): deterministic fixture generation from (preset, seed) (claude)
- 7cd9a80 test(loadgen): drop unused default branch in realistic room-type switch (claude)
- 3df2cd6 fix(loadgen): address gocritic/errcheck findings in preset.go (claude)
- 4641d98 refactor(loadgen): pass Preset by pointer; revert lint config bump (claude)
- b53be6b test(loadgen): cover pickMembers padding and sampleWithoutReplacement… (claude)
- 9a9b6bc feat(loadgen): Seed and Teardown mongo collections from fixtures (claude)
- 3787437 feat(loadgen): Prometheus registry with loadgen collectors (claude)
- 68e48b3 feat(loadgen): collector correlates publishes with replies and broadc… (claude)
- 3d483b8 fix(loadgen): close race in Collector samples; add coverage tests (claude)
- c10f31b feat(loadgen): percentiles, summary printer, CSV export, exit code (claude)
- 9ef41ef test(loadgen): drop redundant nolint; _test.go is already excluded fr… (claude)
- a5d86e5 feat(loadgen): open-loop generator with injected publisher (claude)
- 7e79a79 fix(loadgen): clear Collector orphans on publish failure; tighten tests (claude)
- 2bad977 feat(loadgen): JetStream consumer-lag sampler (claude)
- 9c8d962 fix(loadgen): warn (not debug) on consumer poll errors; document Snap… (claude)
- 021a409 feat(loadgen): wire seed/run/teardown subcommands in main.go (claude)
- eac94f2 fix(loadgen): skip byReqID in canonical mode to avoid false missing-r… (claude)
- b4ea921 feat(loadgen): docker-compose harness, Dockerfile, grafana dashboard (claude)
- feb4c19 fix(loadgen): drop NATS scrape job (port 8222 serves JSON, not Promet… (claude)
- d3b1e54 feat(loadgen): scoped Makefile for harness (claude)
- 6084ba7 test(loadgen): integration test for end-to-end wiring (claude)
- dd19404 docs(loadgen): add operator README (claude)
- 69c0eab test(loadgen): add unit tests for main helpers and sampler Snapshot (claude)
- 57d9f93 fix(loadgen): address final review — indexes, canonical rate, DM broa… (claude)
- 1905810 refactor(loadgen): simplify pass — pre-compute content, unify handler… (claude)
- eb8eea8 fix(loadgen): split sent counter into warmup/measured phases for clea… (claude)
- fdde0d0 fix(loadgen): index users.account so broadcast-worker enrichment isn'… (claude)
- 54acee8 perf(loadgen): dispatch publishes to worker pool; add opt-in pprof (claude)
- 45ff2ad Merge branch 'main' into claude/load-test-messaging-workers-tDKZn (hmchangw)
- 8a9e64d fix: group to channel (hmchangw)
- 6ef91ee fix linting (hmchangw)
docs/superpowers/plans/2026-04-21-load-test-messaging-workers.md
2,780 changes: 2,780 additions & 0 deletions
Large diffs are not rendered by default.
docs/superpowers/specs/2026-04-21-load-test-messaging-workers-design.md
620 changes: 620 additions & 0 deletions
Large diffs are not rendered by default.
@@ -0,0 +1,59 @@

````markdown
# loadgen

Capacity-baseline load generator for the single-site messaging pipeline
(`message-gatekeeper` → `MESSAGES_CANONICAL` → `message-worker` +
`broadcast-worker`). Single Go binary with three subcommands.

## Quick start

```
make -C tools/loadgen/deploy up
make -C tools/loadgen/deploy seed PRESET=medium
make -C tools/loadgen/deploy run PRESET=medium RATE=500 DURATION=60s
```

For live dashboards:

```
make -C tools/loadgen/deploy run-dashboards PRESET=medium
# Grafana at http://localhost:3000 (anonymous admin)
```

Tear down:

```
make -C tools/loadgen/deploy down
```

## Presets

| preset      | users  | rooms | notes                                                  |
|-------------|--------|-------|--------------------------------------------------------|
| `small`     | 10     | 5     | uniform, 200-byte content                              |
| `medium`    | 1 000  | 100   | uniform, 200-byte content                              |
| `large`     | 10 000 | 1 000 | uniform, 200-byte content                              |
| `realistic` | 1 000  | 100   | Zipf senders, mixed room sizes, 50–2000 bytes, mentions|

## Subcommands

- `loadgen seed --preset=<name> [--seed=42]` — idempotently populate
  MongoDB with deterministic fixtures.
- `loadgen run --preset=<name> [flags]` — open-loop publish at `--rate`
  msgs/sec for `--duration`, print a summary at the end. Flags:
  `--seed`, `--warmup`, `--inject=frontdoor|canonical`, `--csv=<path>`.
- `loadgen teardown` — drop the three seeded collections.

## Reading the summary

- `final_pending == 0` on both durables, zero errors → the pipeline is
  sustaining your target rate.
- `final_pending` climbing, or error counts > 0 → over capacity or a
  regression upstream of the worker.

## Non-goals

- Not a CI regression gate. Invoked manually.
- Not an auth benchmark. Uses shared `backend.creds`.
- Not a cross-site benchmark. Single-site only.
- Not an absolute-number tool. Numbers vary by host — compare within one
  machine across changes, don't compare across machines.
````
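The README describes `loadgen run` as an open-loop publisher: messages go out at `--rate` for `--duration` whether or not earlier ones have been answered. The sketch below illustrates that idea only. It is not the PR's generator; `sendOffsets` and `runOpenLoop` are hypothetical names, and the injected `publish` callback stands in for whatever publisher the harness actually uses.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sendOffsets returns the planned send time of every message, relative to the
// start of the run. Hypothetical helper: the schedule depends only on the
// target rate and duration, never on how fast the system under test responds.
func sendOffsets(ratePerSec int, duration time.Duration) []time.Duration {
	if ratePerSec <= 0 || duration <= 0 {
		return nil
	}
	interval := time.Second / time.Duration(ratePerSec)
	if interval <= 0 {
		return nil // rate too high to express as a nanosecond interval
	}
	offsets := make([]time.Duration, int(duration/interval))
	for i := range offsets {
		offsets[i] = time.Duration(i) * interval
	}
	return offsets
}

// runOpenLoop fires publish(seq) at each planned offset. Every publish runs in
// its own goroutine, so a slow publish cannot push back the next send.
func runOpenLoop(offsets []time.Duration, publish func(seq int)) {
	start := time.Now()
	var wg sync.WaitGroup
	for i, off := range offsets {
		// Sleep returns immediately if we are already past the target time.
		time.Sleep(time.Until(start.Add(off)))
		wg.Add(1)
		go func(seq int) {
			defer wg.Done()
			publish(seq)
		}(i)
	}
	wg.Wait()
}

func main() {
	var (
		mu        sync.Mutex
		published int
	)
	// Stand-in publisher (assumption): counts calls instead of talking to NATS.
	publish := func(seq int) {
		mu.Lock()
		published++
		mu.Unlock()
	}
	runOpenLoop(sendOffsets(50, 200*time.Millisecond), publish)
	fmt.Println("published:", published)
}
```

A closed-loop generator would instead wait for each reply before sending the next message, which hides queueing delay once the pipeline falls behind. That is the usual reason to prefer an open-loop schedule for capacity baselines.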
@@ -0,0 +1,155 @@

```go
package main

import (
	"sort"
	"sync"
	"time"
)

type publishEntry struct {
	publishedAt time.Time
}

// sample pairs a latency with its publish timestamp so warmup can discard by time.
type sample struct {
	publishedAt time.Time
	latency     time.Duration
}

// Collector correlates publishes with replies (E1) and broadcasts (E2).
type Collector struct {
	m       *Metrics
	preset  string
	mu      sync.Mutex
	byReqID map[string]publishEntry
	byMsgID map[string]publishEntry
	e1      []sample
	e2      []sample
}

// NewCollector returns a ready-to-use Collector.
func NewCollector(m *Metrics, preset string) *Collector {
	return &Collector{
		m: m, preset: preset,
		byReqID: make(map[string]publishEntry),
		byMsgID: make(map[string]publishEntry),
	}
}

// RecordPublish stores the publish time under both correlation keys.
func (c *Collector) RecordPublish(requestID, messageID string, t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byReqID[requestID] = publishEntry{publishedAt: t}
	c.byMsgID[messageID] = publishEntry{publishedAt: t}
}

// RecordReply consumes one pending publish keyed by requestID.
func (c *Collector) RecordReply(requestID string, at time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.byReqID[requestID]
	if !ok {
		return
	}
	delete(c.byReqID, requestID)
	d := at.Sub(e.publishedAt)
	c.e1 = append(c.e1, sample{publishedAt: e.publishedAt, latency: d})
	c.m.E1Latency.WithLabelValues(c.preset).Observe(d.Seconds())
}

// RecordPublishBroadcastOnly stores only the message-ID correlation, for
// injection modes that bypass the gatekeeper (no reply is expected).
func (c *Collector) RecordPublishBroadcastOnly(messageID string, t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byMsgID[messageID] = publishEntry{publishedAt: t}
}

// RecordPublishFailed removes entries previously stored by RecordPublish.
// Use when the publish itself failed (message never reached NATS) so the
// orphans do not inflate Finalize's missing-reply / missing-broadcast counts.
func (c *Collector) RecordPublishFailed(requestID, messageID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.byReqID, requestID)
	delete(c.byMsgID, messageID)
}

// RecordBroadcast consumes one pending publish keyed by messageID.
func (c *Collector) RecordBroadcast(messageID string, at time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.byMsgID[messageID]
	if !ok {
		return
	}
	delete(c.byMsgID, messageID)
	d := at.Sub(e.publishedAt)
	c.e2 = append(c.e2, sample{publishedAt: e.publishedAt, latency: d})
	c.m.E2Latency.WithLabelValues(c.preset).Observe(d.Seconds())
}

// DiscardBefore drops any samples whose publish time is before cutoff (warmup).
func (c *Collector) DiscardBefore(cutoff time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.e1 = filterAtOrAfter(c.e1, cutoff)
	c.e2 = filterAtOrAfter(c.e2, cutoff)
}

func filterAtOrAfter(in []sample, cutoff time.Time) []sample {
	out := in[:0]
	for i := range in {
		if !in[i].publishedAt.Before(cutoff) {
			out = append(out, in[i])
		}
	}
	return out
}

// Finalize returns the count of unmatched publishes as missing replies and broadcasts.
func (c *Collector) Finalize() (missingReplies int, missingBroadcasts int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.byReqID), len(c.byMsgID)
}

// E1Count returns the number of matched E1 samples.
func (c *Collector) E1Count() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.e1)
}

// E2Count returns the number of matched E2 samples.
func (c *Collector) E2Count() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.e2)
}

// E1Samples returns a sorted copy of E1 latencies for tests/reporting.
func (c *Collector) E1Samples() []time.Duration {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.snapshotLatenciesLocked(c.e1)
}

// E2Samples returns a sorted copy of E2 latencies for tests/reporting.
func (c *Collector) E2Samples() []time.Duration {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.snapshotLatenciesLocked(c.e2)
}

// snapshotLatenciesLocked copies and sorts latencies from in.
// Callers must hold c.mu before calling this method.
func (c *Collector) snapshotLatenciesLocked(in []sample) []time.Duration {
	out := make([]time.Duration, len(in))
	for i := range in {
		out[i] = in[i].latency
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}
```
🛠️ Refactor suggestion | 🟠 Major
Add unit tests for the new exported helpers.
As per coding guidelines: "Every exported function in `pkg/` must have corresponding test cases." Please add tests for `UserResponseWildcard`, `RoomEventWildcard`, and `UserRoomEventWildcard` in `pkg/subject/subject_test.go` asserting the exact returned strings.