Skip to content

docs: add jetstream consumer defaults design#168

Open
mliu33 wants to merge 12 commits intomainfrom
claude/jetstream-consumer-config-JTIKh
Open

docs: add jetstream consumer defaults design#168
mliu33 wants to merge 12 commits intomainfrom
claude/jetstream-consumer-config-JTIKh

Conversation

@mliu33
Copy link
Copy Markdown
Collaborator

@mliu33 mliu33 commented May 8, 2026

Standardizes durable consumer config across worker services via a
shared pkg/stream helper plus per-service MaxAckPending recommendations.

Summary by CodeRabbit

  • Documentation
    • Added a design spec that standardizes durable JetStream consumer defaults and per-service sizing to improve consistency and reliability.
  • Chores
    • Worker services now derive durable consumer settings from centralized defaults while keeping service-specific pending limits, routing filters, and retry/backoff behaviors.
  • Tests
    • Added unit tests validating the shared defaults and each service’s constructed consumer configuration.

Standardizes durable consumer config across worker services via a
shared pkg/stream helper plus per-service MaxAckPending recommendations.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 8, 2026

📝 Walkthrough

Walkthrough

Adds pkg/stream.DurableConsumerDefaults() and exported default constants; refactors workers to use per-service buildConsumerConfig() helpers derived from those defaults (setting Durable and per-service MaxAckPending), replaces inline consumer literals with helpers, adds unit tests, and adds design/plans/spec docs.

Changes

JetStream Consumer Defaults Design & Implementation

Layer / File(s) Summary
Stream defaults
pkg/stream/consumer.go
Introduce ConsumerSettings and DurableConsumerDefaults() returning a prefilled jetstream.ConsumerConfig; callers must set Durable/FilterSubjects/MaxAckPending.
Docs / Plans / Spec
docs/superpowers/specs/..., docs/superpowers/plans/...
Add design spec and implementation plan documenting baseline defaults, per-service MaxAckPending sizing, tasks per worker, rollout order, tests, and risks.
Per-service helpers & Wiring
broadcast-worker/main.go, inbox-worker/main.go, message-gatekeeper/main.go, message-worker/main.go, notification-worker/main.go, room-worker/main.go, search-sync-worker/main.go
Replace inline consumer literals with unexported buildConsumerConfig helpers that call stream.DurableConsumerDefaults(), then set Durable, MaxAckPending, and preserve service-specific overrides (FilterSubjects, BackOff).
Tests
pkg/stream/consumer_test.go, */consumer_config_test.go
Add unit tests validating DurableConsumerDefaults() propagation and env parsing defaults/overrides, plus per-service tests asserting constructed consumer configs (durable name, MaxAckPending, ack/delivery defaults, filters/backoff where applicable).

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • hmchangw/chat#158: Related to inbox-worker FilterSubjects usage and subject helper introduced earlier.
  • hmchangw/chat#95: Previously added message-worker MaxRedeliver/MaxDeliver logic that this PR replaces/standardizes.
  • hmchangw/chat#64: Related to search-sync-worker consumer/backoff behavior and consumer wiring changes.

Poem

🐰 I hopped through defaults, small and spry,
A helper stitched the configs neat and dry.
Workers now borrow a common brew,
Tests hum softly, docs nod too.
A rabbit cheers: coherent streams — woohoo!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 32.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately summarizes the main objective: introducing and documenting JetStream consumer defaults design across worker services.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch claude/jetstream-consumer-config-JTIKh

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/superpowers/specs/2026-05-08-jetstream-consumer-defaults-design.md`:
- Around line 21-23: Update the sentence that currently claims "No runtime
impact on existing durable consumers" to explicitly qualify that only the
consumer cursor position is preserved by DeliverPolicy, and that runtime
redelivery behavior can change if MaxDeliver is modified; reference the symbols
DeliverPolicy, MaxDeliver and durable consumers and add a short note that
reducing MaxDeliver (e.g., from unlimited to 5) will limit subsequent
redeliveries for existing durables even though their cursor positions are
unchanged so migrations or operator notice may be required.
- Line 183: The statement on line 183 claiming tests don't rely on unlimited
redeliveries is incorrect because several tests create consumers without setting
MaxDeliver (e.g., ConsumerConfig instances in tools/loadgen/integration_test.go,
search-sync-worker/integration_test.go, and inbox_integration_test.go), so
update the doc and/or tests: either modify those tests to set an explicit
MaxDeliver (for example MaxDeliver: 6) and run/record verification that they
pass with that limit, or change the design doc sentence to read that "test
consumers will be updated to use explicit MaxDeliver limits instead of relying
on defaults" and list the affected test files
(tools/loadgen/integration_test.go, search-sync-worker/integration_test.go,
inbox_integration_test.go) so readers know which consumers must be changed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f6c03213-b4a7-403d-91c1-905393724156

📥 Commits

Reviewing files that changed from the base of the PR and between 127d3e0 and c2546e2.

📒 Files selected for processing (1)
  • docs/superpowers/specs/2026-05-08-jetstream-consumer-defaults-design.md

- `AckPolicy` is unchanged.
- `AckWait = 30s` matches the prior NATS default.
- `MaxDeliver = 5` is permissive enough for any test that previously
relied on default unlimited redeliveries (none exist in the codebase).
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Read-only verification: surface potential retry/redelivery dependencies.
set -euo pipefail

echo "== Consumer retry-related config usage =="
rg -n --iglob '*.go' 'MaxDeliver|MaxRedeliver|AckWait|BackOff|CreateOrUpdateConsumer\('

echo
echo "== Deploy/env references to MAX_REDELIVER =="
rg -n --iglob '*.{yml,yaml,tpl,env,sh,md}' 'MAX_REDELIVER|MaxRedeliver'

echo
echo "== Tests with explicit redelivery/retry expectations =="
rg -n --iglob '*_test.go' 'redeliver|retry|MaxDeliver|BackOff|Nak\(|Term\('

Repository: hmchangw/chat

Length of output: 5166


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Test files creating ConsumerConfig without explicit MaxDeliver =="
# Search for ConsumerConfig{} or ConsumerConfig{ blocks in test files to see patterns
rg -A 15 'ConsumerConfig\{' --iglob '*_test.go' | head -100

echo
echo "== Check message-worker main.go MaxRedeliver usage =="
cat -n message-worker/main.go | sed -n '30,125p'

Repository: hmchangw/chat

Length of output: 11630


Line 183's "none exist in the codebase" claim is unsupported—test files create consumers without MaxDeliver.

The design doc claims tests don't rely on unlimited redeliveries defaults, but test code shows otherwise. ConsumerConfig creations in tools/loadgen/integration_test.go, search-sync-worker/integration_test.go, and multiple search-sync-worker/inbox_integration_test.go tests omit the MaxDeliver field, meaning they implicitly rely on JetStream's default unlimited redeliveries behavior during test execution.

Either add verification showing these tests pass with explicit MaxDeliver limits (e.g., MaxDeliver: 6), or revise the claim to "test consumers will be updated to use explicit MaxDeliver limits instead of relying on defaults."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/specs/2026-05-08-jetstream-consumer-defaults-design.md` at
line 183, The statement on line 183 claiming tests don't rely on unlimited
redeliveries is incorrect because several tests create consumers without setting
MaxDeliver (e.g., ConsumerConfig instances in tools/loadgen/integration_test.go,
search-sync-worker/integration_test.go, and inbox_integration_test.go), so
update the doc and/or tests: either modify those tests to set an explicit
MaxDeliver (for example MaxDeliver: 6) and run/record verification that they
pass with that limit, or change the design doc sentence to read that "test
consumers will be updated to use explicit MaxDeliver limits instead of relying
on defaults" and list the affected test files
(tools/loadgen/integration_test.go, search-sync-worker/integration_test.go,
inbox_integration_test.go) so readers know which consumers must be changed.

claude added 11 commits May 8, 2026 16:16
9-task TDD plan to add a pkg/stream helper for durable consumer
defaults and apply per-service MaxAckPending recommendations across
all 7 worker services.
…t reduction

The unified MaxDeliver=5 reduces message-worker's retry budget by 1
attempt (was 6, now 5 total deliveries). This is intentional as part
of standardizing the project-wide default; the prior wording "no
operational behavior change" was inaccurate.
Add stream.ConsumerSettings (AckWait, MaxDeliver, MaxWaiting,
MaxAckPending) and parameterize DurableConsumerDefaults to take it as
input. Each service's Config gains:

    Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"`

driven by CONSUMER_ACK_WAIT, CONSUMER_MAX_DELIVER, CONSUMER_MAX_WAITING,
and CONSUMER_MAX_ACK_PENDING. Defaults: 30s, 5, 512, 1000 — applied
uniformly via the envDefault tags on ConsumerSettings.

Architectural invariants (AckPolicy=Explicit, DeliverPolicy=New) remain
hardcoded in the helper. Per-service buildConsumerConfig functions no
longer hardcode MaxAckPending; operators tune per-service values via the
service's deployment env.

DEPLOY-CONFIG DEPENDENCY: prior code shipped per-service MaxAckPending
defaults (gatekeeper/broadcast=1000, message/notification/search-sync=500,
room=200, inbox=100). The new code-level default is 1000 across the
board. To preserve current production behavior, set CONSUMER_MAX_ACK_PENDING
explicitly in the K8s manifests for message-worker (500),
notification-worker (500), search-sync-worker (500), room-worker (200),
and inbox-worker (100).
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
notification-worker/consumer_config_test.go (1)

31-44: ⚡ Quick win

Re-verify architectural invariants in the second subtest.

The "overrides flow through" subtest validates that custom settings propagate correctly but omits assertions for the fixed policies (AckPolicy, DeliverPolicy). Re-checking these invariants would confirm they remain independent of the settings values.

✅ Add invariant assertions for robustness
 	assert.Equal(t, "notification-worker", cc.Durable)
 	assert.Equal(t, 500, cc.MaxAckPending)
+	assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy)
 	assert.Equal(t, 45*time.Second, cc.AckWait)
 	assert.Equal(t, 3, cc.MaxDeliver)
 	assert.Equal(t, 256, cc.MaxWaiting)
+	assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
 })
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@notification-worker/consumer_config_test.go` around lines 31 - 44, The test
"overrides flow through" omits asserting immutable policies; update the subtest
to also assert that buildConsumerConfig(stream.ConsumerSettings{...}) returns cc
with the expected invariant policies (check cc.AckPolicy and cc.DeliverPolicy)
so these remain unchanged when AckWait/MaxDeliver/MaxWaiting/MaxAckPending are
overridden; locate the test and add assertions comparing cc.AckPolicy and
cc.DeliverPolicy to the expected constant values used elsewhere in the codebase.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@notification-worker/consumer_config_test.go`:
- Around line 31-44: The test "overrides flow through" omits asserting immutable
policies; update the subtest to also assert that
buildConsumerConfig(stream.ConsumerSettings{...}) returns cc with the expected
invariant policies (check cc.AckPolicy and cc.DeliverPolicy) so these remain
unchanged when AckWait/MaxDeliver/MaxWaiting/MaxAckPending are overridden;
locate the test and add assertions comparing cc.AckPolicy and cc.DeliverPolicy
to the expected constant values used elsewhere in the codebase.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ca943dd8-a7ed-4dfd-915d-17295df8d070

📥 Commits

Reviewing files that changed from the base of the PR and between 7888f0a and a618e9b.

📒 Files selected for processing (16)
  • broadcast-worker/consumer_config_test.go
  • broadcast-worker/main.go
  • inbox-worker/consumer_config_test.go
  • inbox-worker/main.go
  • message-gatekeeper/consumer_config_test.go
  • message-gatekeeper/main.go
  • message-worker/consumer_config_test.go
  • message-worker/main.go
  • notification-worker/consumer_config_test.go
  • notification-worker/main.go
  • pkg/stream/consumer.go
  • pkg/stream/consumer_test.go
  • room-worker/consumer_config_test.go
  • room-worker/main.go
  • search-sync-worker/consumer_config_test.go
  • search-sync-worker/main.go
🚧 Files skipped from review as they are similar to previous changes (6)
  • room-worker/consumer_config_test.go
  • message-worker/consumer_config_test.go
  • message-gatekeeper/consumer_config_test.go
  • room-worker/main.go
  • search-sync-worker/main.go
  • message-worker/main.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants