Commit 5eeb42b

refactor(streams): inbox-worker owns INBOX schema; remove search-sync-worker scaffolding
inbox-worker is now the sole owner of INBOX_{siteID} schema bootstrap. Its bootstrapStreams helper passes Name + Subjects from pkg/stream.Inbox, matching the canonical schema. Federation (Sources + SubjectTransforms) remains owned by ops/IaC and never appears in app code.

search-sync-worker stops bootstrapping INBOX. The dead scaffolding inboxBootstrapStreamConfig (originally added with PR #109 and flagged in its own doc comment as a temporary stand-in until inbox-worker migrated) is deleted along with its dedicated unit-test file. The RemoteSiteIDs config field is removed since its only consumer was the deleted function. The service's bootstrap loop now skips INBOX while continuing to create consumers for INBOX-based collections (spotlight, user-room).

CLAUDE.md gains an explicit ownership rule under "JetStream Streams" documenting that app code owns Name + Subjects and ops/IaC owns Sources + SubjectTransforms.

Tests: inbox-worker bootstrap test now asserts Subjects matches pkg/stream.Inbox(siteID).Subjects and Sources is empty (regression guard against re-introducing federation in app code).

https://claude.ai/code/session_015Cu3UPeWDU4DaJwP7JZtvc
1 parent 4827f20 commit 5eeb42b

6 files changed

Lines changed: 43 additions & 134 deletions

CLAUDE.md

Lines changed: 1 addition & 0 deletions
@@ -216,6 +216,7 @@ All commands are wrapped in the root Makefile. Always use `make` targets — nev
 - `OUTBOX_{siteID}` — Cross-site outbound events
 - `INBOX_{siteID}` — Cross-site inbound events (sourced from remote OUTBOX)
 - **Stream bootstrap is opt-in.** Services that consume from or publish to a stream MUST NOT create it in production — streams are owned by ops/IaC. Each such service's `config` includes `Bootstrap bootstrapConfig` (env prefix `BOOTSTRAP_`) with a single `Enabled` field tagged `env:"STREAMS" envDefault:"false"`. The service's `bootstrap.go` defines a `bootstrapStreams(ctx, js, siteID, enabled) error` helper that no-ops when `Enabled=false`. Local `deploy/docker-compose.yml` sets `BOOTSTRAP_STREAMS=true` so any service can stand up against a fresh NATS in dev. New services that interact with JetStream MUST follow this convention.
+- **Stream bootstrap ownership.** When a service does bootstrap a stream in dev, the helper sets ONLY the stream's schema — `Name + Subjects` from `pkg/stream.<Stream>(siteID)`. Federation config (`Sources` + `SubjectTransforms` for cross-site sourcing) is owned by ops/IaC and MUST NOT appear in any service's `bootstrap.go`. INBOX has a single owning service (`inbox-worker`); other services that consume from INBOX (e.g., `search-sync-worker`) skip it in their bootstrap loop and rely on `inbox-worker` to create the stream.
 
 ### MongoDB
 - Never use ORMs (no GORM, no ent) — use native drivers directly
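
The opt-in convention described in the CLAUDE.md bullets above can be sketched with the standard library alone. This is a minimal illustration, not the repository's code: the services declare the flag through struct tags (`env:"STREAMS" envDefault:"false"` under the `BOOTSTRAP_` prefix), whereas the hypothetical `bootstrapEnabled` helper below reads `BOOTSTRAP_STREAMS` directly, and the injected `lookup` parameter exists only to make the sketch easy to exercise.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// bootstrapEnabled reports whether BOOTSTRAP_STREAMS holds a true value.
// An unset or empty variable yields false, mirroring envDefault:"false".
// The lookup parameter is a hypothetical seam for testing; in main it is
// os.LookupEnv.
func bootstrapEnabled(lookup func(string) (string, bool)) (bool, error) {
	raw, ok := lookup("BOOTSTRAP_STREAMS")
	if !ok || raw == "" {
		return false, nil
	}
	enabled, err := strconv.ParseBool(raw)
	if err != nil {
		return false, fmt.Errorf("parse BOOTSTRAP_STREAMS: %w", err)
	}
	return enabled, nil
}

func main() {
	enabled, err := bootstrapEnabled(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// In production the variable is left unset, so this reports false and
	// stream creation stays with ops/IaC.
	fmt.Println("bootstrap streams:", enabled)
}
```

Defaulting to false keeps the production path safe by omission: a deployment manifest has to opt in explicitly before any service attempts to create a stream.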

inbox-worker/bootstrap.go

Lines changed: 9 additions & 9 deletions
@@ -30,24 +30,24 @@ type streamCreator interface {
 	CreateOrUpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (oteljetstream.Stream, error)
 }
 
-// bootstrapStreams creates the JetStream streams this service consumes from.
-// No-op when enabled is false (the production path) — streams are owned by
-// ops/IaC. In dev/integration the local docker-compose sets
+// bootstrapStreams creates the JetStream INBOX stream this service consumes
+// from. No-op when enabled is false (the production path) — streams are owned
+// by ops/IaC there. In dev/integration the local docker-compose sets
 // BOOTSTRAP_STREAMS=true so a developer can stand the service up in isolation
 // against a fresh NATS instance.
 //
-// Note: Subjects is intentionally omitted from the StreamConfig. In production
-// the INBOX stream's Sources and SubjectTransforms (cross-site OUTBOX→INBOX
-// sourcing) are configured by ops/IaC. This helper only creates the stream
-// with its Name so the consumer can be attached; the sourcing config is
-// managed externally and must not be overwritten here.
+// Ownership rule: this helper sets only the stream schema (Name + Subjects)
+// from pkg/stream.Inbox. Federation config (Sources + SubjectTransforms for
+// cross-site OUTBOX→INBOX sourcing) belongs to ops/IaC and is layered on in
+// production. App code never sets it.
 func bootstrapStreams(ctx context.Context, js streamCreator, siteID string, enabled bool) error {
 	if !enabled {
 		return nil
 	}
 	inboxCfg := stream.Inbox(siteID)
 	if _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
-		Name: inboxCfg.Name,
+		Name:     inboxCfg.Name,
+		Subjects: inboxCfg.Subjects,
 	}); err != nil {
 		return fmt.Errorf("create INBOX stream: %w", err)
 	}
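
To see the ownership rule in isolation, the helper above can be reduced to a dependency-free sketch. Everything here is a stand-in: `streamConfig` is a hypothetical three-field substitute for `jetstream.StreamConfig`, `inboxSchema` plays the role of `pkg/stream.Inbox`, and the subject pattern `chat.inbox.{siteID}.>` is an assumption chosen for illustration (only the `INBOX_{siteID}` name is confirmed by the commit). The `recorder` type is likewise hypothetical.

```go
package main

import "fmt"

// streamConfig is a hypothetical stand-in for jetstream.StreamConfig,
// reduced to the three fields this commit discusses.
type streamConfig struct {
	Name     string
	Subjects []string
	Sources  []string // federation; never set by app code
}

type streamCreator interface {
	CreateOrUpdateStream(cfg streamConfig) error
}

// inboxSchema stands in for pkg/stream.Inbox. The subject pattern is an
// assumption made for this sketch.
func inboxSchema(siteID string) streamConfig {
	return streamConfig{
		Name:     "INBOX_" + siteID,
		Subjects: []string{"chat.inbox." + siteID + ".>"},
	}
}

// bootstrapStreams copies only Name and Subjects into the config it sends,
// and does nothing at all when enabled is false.
func bootstrapStreams(js streamCreator, siteID string, enabled bool) error {
	if !enabled {
		return nil
	}
	schema := inboxSchema(siteID)
	cfg := streamConfig{Name: schema.Name, Subjects: schema.Subjects}
	if err := js.CreateOrUpdateStream(cfg); err != nil {
		return fmt.Errorf("create INBOX stream: %w", err)
	}
	return nil
}

// recorder is a hypothetical fake that remembers every config it receives.
type recorder struct{ created []streamConfig }

func (r *recorder) CreateOrUpdateStream(cfg streamConfig) error {
	r.created = append(r.created, cfg)
	return nil
}

func main() {
	rec := &recorder{}
	_ = bootstrapStreams(rec, "test", false) // production path: no call made
	_ = bootstrapStreams(rec, "test", true)  // dev path: schema only
	fmt.Println(len(rec.created), rec.created[0].Name, len(rec.created[0].Sources))
}
```

Building a fresh config from two named fields, rather than passing the schema value through, means a future field added to the schema type cannot leak into the bootstrap call unnoticed.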

inbox-worker/bootstrap_test.go

Lines changed: 11 additions & 6 deletions
@@ -10,6 +10,8 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/Marz32onE/instrumentation-go/otel-nats/oteljetstream"
+
+	"github.com/hmchangw/chat/pkg/stream"
 )
 
 type fakeStreamCreator struct {
@@ -42,7 +44,7 @@ func TestBootstrapStreams(t *testing.T) {
 			wantCreated: nil,
 		},
 		{
-			name: "enabled - creates INBOX",
+			name: "enabled - creates INBOX with Name and Subjects",
 			enabled: true,
 			wantCreated: []string{"INBOX_test"},
 		},
@@ -66,13 +68,16 @@ func TestBootstrapStreams(t *testing.T) {
 			}
 			require.NoError(t, err)
 			require.Len(t, fake.created, len(tc.wantCreated))
+			wantSubjects := stream.Inbox("test").Subjects
 			for i, wantName := range tc.wantCreated {
 				assert.Equal(t, wantName, fake.created[i].Name)
-				// The INBOX stream is created Name-only; ops/IaC owns
-				// cross-site Sources + SubjectTransforms. Lock that
-				// invariant in the test so a future "fix" that adds
-				// Subjects fails loudly here.
-				assert.Empty(t, fake.created[i].Subjects, "INBOX bootstrap must not set Subjects")
+				// App owns the schema (Name + Subjects). Federation
+				// (Sources + SubjectTransforms) belongs to ops/IaC and
+				// must not appear here.
+				assert.Equal(t, wantSubjects, fake.created[i].Subjects,
+					"INBOX bootstrap must set Subjects from pkg/stream.Inbox")
+				assert.Empty(t, fake.created[i].Sources,
+					"federation Sources are owned by ops/IaC and must not be set in app code")
 			}
 		})
 	}

search-sync-worker/inbox_stream.go

Lines changed: 3 additions & 44 deletions
@@ -11,47 +11,6 @@ import (
 	"github.com/hmchangw/chat/pkg/subject"
 )
 
-// inboxBootstrapStreamConfig returns the INBOX stream config with cross-site
-// Sources + SubjectTransforms layered on top of the canonical baseline from
-// pkg/stream.Inbox. Only used from the bootstrap path in main.go when
-// BOOTSTRAP_STREAMS=true. In production, inbox-worker owns INBOX creation
-// and search-sync-worker never calls this.
-//
-// Federation mechanics: for each remote site we add a StreamSource pointing
-// at `OUTBOX_{remote}` with a SubjectTransform whose `Source` field acts
-// as both the filter and the rewrite source — `outbox.{remote}.to.{siteID}.>`
-// is matched against the upstream OUTBOX and rewritten to
-// `chat.inbox.{siteID}.aggregate.>` on ingest. (NATS JetStream forbids
-// setting `FilterSubject` and `SubjectTransforms` together on the same
-// source — they are mutually exclusive options; the transform's `Source`
-// covers the filter responsibility.) This lets consumers tell local events
-// apart from federated ones by the presence of the `aggregate` segment.
-//
-// This helper stays local to search-sync-worker because it's bootstrap-only
-// — inbox-worker will own an equivalent construction (as a proper feature,
-// not a test toggle) when it migrates in its own PR.
-//
-// Requires NATS Server 2.10+ for SubjectTransforms support.
-func inboxBootstrapStreamConfig(siteID string, remoteSiteIDs []string) jetstream.StreamConfig {
-	baseline := stream.Inbox(siteID)
-	destPattern := fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID)
-	sources := make([]*jetstream.StreamSource, 0, len(remoteSiteIDs))
-	for _, remote := range remoteSiteIDs {
-		sourcePattern := fmt.Sprintf("outbox.%s.to.%s.>", remote, siteID)
-		sources = append(sources, &jetstream.StreamSource{
-			Name: fmt.Sprintf("OUTBOX_%s", remote),
-			SubjectTransforms: []jetstream.SubjectTransformConfig{
-				{Source: sourcePattern, Destination: destPattern},
-			},
-		})
-	}
-	return jetstream.StreamConfig{
-		Name: baseline.Name,
-		Subjects: baseline.Subjects,
-		Sources: sources,
-	}
-}
-
 // inboxMemberCollection is the shared base for collections that index
 // subscription lifecycle events (member_added, member_removed) off the
 // INBOX stream. It centralizes stream config and subject filters so
@@ -60,9 +19,9 @@ func inboxBootstrapStreamConfig(siteID string, remoteSiteIDs []string) jetstream
 //
 // The stream name + local subject pattern come straight from pkg/stream.Inbox
 // so there's one canonical definition for every consumer of INBOX.
-// Cross-site federation (Sources + SubjectTransforms) is a deployment
-// concern owned by whichever service creates the INBOX stream and is
-// layered on separately — see inboxBootstrapStreamConfig.
+// inbox-worker owns INBOX schema bootstrap; cross-site federation (Sources
+// + SubjectTransforms) is owned by ops/IaC. search-sync-worker is a pure
+// consumer of INBOX.
 type inboxMemberCollection struct{}
 
 func (b *inboxMemberCollection) StreamConfig(siteID string) jetstream.StreamConfig {
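
The deleted doc comment describes a federation mapping that now lives entirely in ops/IaC. For readers who need to reason about the subjects a consumer will see, the mapping can be written out as plain data. This is an illustrative sketch only and is not meant to be reintroduced into any service: `federationTransform`, `federationTransforms`, and `isFederated` are hypothetical names, and the patterns are copied from the removed function.

```go
package main

import (
	"fmt"
	"strings"
)

// federationTransform is a hypothetical plain-data view of one source entry:
// which upstream stream is read, which subjects are matched, and what they
// are rewritten to on ingest.
type federationTransform struct {
	Upstream    string
	Source      string
	Destination string
}

// federationTransforms builds one entry per remote site using the patterns
// from the removed inboxBootstrapStreamConfig.
func federationTransforms(siteID string, remotes []string) []federationTransform {
	dest := fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID)
	out := make([]federationTransform, 0, len(remotes))
	for _, remote := range remotes {
		out = append(out, federationTransform{
			Upstream:    fmt.Sprintf("OUTBOX_%s", remote),
			Source:      fmt.Sprintf("outbox.%s.to.%s.>", remote, siteID),
			Destination: dest,
		})
	}
	return out
}

// isFederated reports whether an INBOX subject carries the aggregate
// segment, which marks an event that arrived from another site.
func isFederated(siteID, subject string) bool {
	return strings.HasPrefix(subject, "chat.inbox."+siteID+".aggregate.")
}

func main() {
	for _, tr := range federationTransforms("a", []string{"b", "c"}) {
		fmt.Printf("%s: %s -> %s\n", tr.Upstream, tr.Source, tr.Destination)
	}
	fmt.Println(isFederated("a", "chat.inbox.a.aggregate.member_added"))
}
```

Because the transform's source pattern doubles as the filter, each remote contributes exactly one pattern pair, and every federated event lands under the same `aggregate` prefix regardless of which site sent it.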

search-sync-worker/inbox_stream_test.go

Lines changed: 0 additions & 47 deletions
This file was deleted.

search-sync-worker/main.go

Lines changed: 19 additions & 28 deletions
@@ -21,24 +21,22 @@ import (
 
 // bootstrapConfig groups every field that is ONLY meaningful when the worker
 // is being stood up in dev or integration tests without its normal upstream
-// services. In production none of these fields should be set — streams are
-// owned by their publisher services (message-gatekeeper for
-// MESSAGES_CANONICAL, inbox-worker for INBOX) and search-sync-worker only
-// manages its own durable consumers.
+// services. In production Enabled must remain false — streams are owned by
+// their publisher services (message-gatekeeper for MESSAGES_CANONICAL,
+// inbox-worker for INBOX) and search-sync-worker only manages its own
+// durable consumers.
+//
+// search-sync-worker NEVER bootstraps INBOX, even when Enabled=true; that
+// stream's schema is owned by inbox-worker and its federation by ops/IaC.
 //
 // Env vars in this group are all prefixed `BOOTSTRAP_` so they're easy to
 // spot in deployment manifests and obvious to grep.
 type bootstrapConfig struct {
 	// Enabled (BOOTSTRAP_STREAMS) toggles whether the worker calls
 	// CreateOrUpdateStream at startup for each collection's stream. Leave
-	// false in production.
+	// false in production. INBOX is intentionally excluded from this loop
+	// — inbox-worker owns INBOX schema bootstrap.
 	Enabled bool `env:"STREAMS" envDefault:"false"`
-	// RemoteSiteIDs (BOOTSTRAP_REMOTE_SITE_IDS) lists the other sites whose
-	// OUTBOX streams should be sourced into this site's INBOX when the
-	// worker is creating it itself. Used to build the cross-site Sources +
-	// SubjectTransforms config during bootstrap. Only consulted when
-	// Enabled is true; unused in production.
-	RemoteSiteIDs []string `env:"REMOTE_SITE_IDS" envSeparator:","`
 }
 
 type config struct {
@@ -165,29 +163,22 @@ func main() {
 	// we don't redundantly call CreateOrUpdateStream per collection.
 	createdStreams := make(map[string]struct{}, len(collections))
 
-	// Canonical INBOX stream name, used below to decide when to layer on
-	// cross-site Sources + SubjectTransforms during bootstrap.
+	// INBOX is owned by inbox-worker — see the skip in the loop below.
 	inboxName := stream.Inbox(cfg.SiteID).Name
 
 	for _, coll := range collections {
 		streamCfg := coll.StreamConfig(cfg.SiteID)
-		if cfg.Bootstrap.Enabled {
-			bootstrapCfg := streamCfg
-			// The INBOX stream is the only one that needs cross-site Sources
-			// + SubjectTransforms. Collections return a minimal baseline
-			// (name + local subjects from pkg/stream.Inbox) and the
-			// bootstrap path layers on the federation config here, keeping
-			// the cross-site topology out of the Collection type entirely.
-			if streamCfg.Name == inboxName {
-				bootstrapCfg = inboxBootstrapStreamConfig(cfg.SiteID, cfg.Bootstrap.RemoteSiteIDs)
-			}
-			if _, alreadyCreated := createdStreams[bootstrapCfg.Name]; !alreadyCreated {
-				if _, err := js.CreateOrUpdateStream(ctx, bootstrapCfg); err != nil {
-					slog.Error("create stream failed", "stream", bootstrapCfg.Name, "error", err)
+		// Skip INBOX bootstrap — inbox-worker owns its schema, ops/IaC
+		// owns its federation. Consumer creation still runs for
+		// INBOX-based collections (spotlight, user-room).
+		if cfg.Bootstrap.Enabled && streamCfg.Name != inboxName {
+			if _, alreadyCreated := createdStreams[streamCfg.Name]; !alreadyCreated {
+				if _, err := js.CreateOrUpdateStream(ctx, streamCfg); err != nil {
+					slog.Error("create stream failed", "stream", streamCfg.Name, "error", err)
 					os.Exit(1)
 				}
-				createdStreams[bootstrapCfg.Name] = struct{}{}
-				slog.Info("stream bootstrapped", "stream", bootstrapCfg.Name)
+				createdStreams[streamCfg.Name] = struct{}{}
+				slog.Info("stream bootstrapped", "stream", streamCfg.Name)
 			}
 		}
 
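
The selection logic of the revised loop (bootstrap only when enabled, never INBOX, each stream at most once) can be pulled out as a pure function for illustration. The helper name `streamsToBootstrap` is hypothetical and does not exist in the commit, and the `MESSAGES_CANONICAL_test` stream name assumes a `{siteID}` suffix that the diff does not show. Consumer creation, which the real loop still performs for every collection including the INBOX-based ones, is deliberately left out.

```go
package main

import "fmt"

// streamsToBootstrap returns, in first-seen order, the stream names that a
// non-owning service would create: none when bootstrap is disabled, never
// the INBOX stream, and no name more than once.
func streamsToBootstrap(streamNames []string, inboxName string, enabled bool) []string {
	if !enabled {
		return nil
	}
	seen := make(map[string]struct{}, len(streamNames))
	var out []string
	for _, name := range streamNames {
		if name == inboxName {
			continue // owned by inbox-worker
		}
		if _, dup := seen[name]; dup {
			continue // several collections can share one stream
		}
		seen[name] = struct{}{}
		out = append(out, name)
	}
	return out
}

func main() {
	// One stream name per collection; two collections share INBOX.
	names := []string{"MESSAGES_CANONICAL_test", "INBOX_test", "INBOX_test"}
	fmt.Println(streamsToBootstrap(names, "INBOX_test", true)) // prints [MESSAGES_CANONICAL_test]
}
```

One consequence worth keeping in mind in dev: since this service no longer creates INBOX, its INBOX consumers can only attach once `inbox-worker` (or some other owner) has created the stream.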
