Skip to content

Commit 4f21bcd

Browse files
committed
docs(oplog-collections-transformer): tighten comments to the 2-line cap
Comment-only cleanup pass: collapse verbose godoc/WHY blocks to ≤2 lines across the collections-transformer mappers and the migration's added blocks in inbox-worker / pkg/model. No behavior change. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
1 parent 979cb6c commit 4f21bcd

14 files changed

Lines changed: 69 additions & 139 deletions

File tree

data-migration/oplog-collections-transformer/bootstrap.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,8 @@ type streamManager interface {
1616
CreateOrUpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (oteljetstream.Stream, error)
1717
}
1818

19-
// bootstrapStreams is a no-op in production (Enabled=false): this service consumes from
20-
// MIGRATION_OPLOG (owned by the oplog-connector) and publishes to INBOX (owned by inbox-worker),
21-
// so it owns no streams. When Enabled (local dev / integration), it creates only the
22-
// MIGRATION_OPLOG_{siteID} schema (Name+Subjects) so the service can stand up against a fresh
23-
// NATS before the connector runs — createConsumerWithRetry still tolerates the stream's absence.
24-
// INBOX is never created here; inbox-worker owns it. Federation config stays ops/IaC-owned.
19+
// bootstrapStreams is a no-op in production (this service owns no streams). When Enabled
20+
// (dev/integration) it creates only the MIGRATION_OPLOG_{siteID} schema; inbox-worker owns INBOX.
2521
func bootstrapStreams(ctx context.Context, js streamManager, siteID string, enabled bool) error {
2622
if !enabled {
2723
return nil

data-migration/oplog-collections-transformer/classify.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,8 @@ type roomClass struct {
99
Reason string // exclusion reason (for metrics), set only when Excluded
1010
}
1111

12-
// classifyRoom maps a source rocketchat_rooms doc to a destination RoomType, or marks it
13-
// excluded. Inputs are the raw source fields:
14-
//
15-
// t — source room type: c, p, d, l, v
16-
// hasPrid — whether the room has a parent-room id (prid set) → discussion
17-
// hasTeamID — whether the room has a teamId (team room → plain channel; team-ness dropped)
18-
// participantCount — number of members (uids/usernames); for t=d, >2 is a group DM (excluded)
19-
// hasBot — for t=d with 2 participants, whether a participant is a bot (→ botDM)
20-
//
21-
// Rules (spec §4.2):
22-
//
23-
// c|p (no prid) → channel (no public/private distinction)
24-
// p with prid → discussion
25-
// d, 2 participants → dm, or botDM if hasBot
26-
// d, >2 participants → EXCLUDED "group_dm"
27-
// l → EXCLUDED "livechat"
28-
// v → EXCLUDED "voip"
29-
// teamId present → channel (team rooms migrate as plain channel; c/p branch already yields channel)
30-
// anything else → EXCLUDED "unknown_type"
12+
// classifyRoom maps a source room type t (+ prid/teamId/participant/bot signals) to a destination
13+
// RoomType or an exclusion (group_dm/livechat/voip/unknown_type). p+prid→discussion, team→channel. §4.2.
3114
func classifyRoom(t string, hasPrid, hasTeamID bool, hasBot bool, participantCount int) roomClass {
3215
// hasTeamID is accepted for caller clarity and future use; c/p branch already returns channel
3316
// regardless of teamId, so no separate branch is needed here.

data-migration/oplog-collections-transformer/handler.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,11 @@ type inboxPublisher interface {
3636

3737
// targetStore is the new-stack per-site Mongo access the transformer needs.
3838
type targetStore interface {
39-
// UpsertUserIfAbsent inserts the user keyed by account only when absent, leaving an
40-
// existing doc (owned by the company-wide sync) untouched. inserted reports whether a
41-
// new doc was created (false ⇒ already present), so the caller can meter seed outcome.
39+
// UpsertUserIfAbsent inserts the user keyed by account only when absent, leaving an existing
40+
// doc (owned by the company-wide sync) untouched. inserted is false when already present.
4241
UpsertUserIfAbsent(ctx context.Context, u model.User) (inserted bool, err error)
43-
// FindThreadRoom looks up the thread room derived from the given parent message id.
44-
// Returns roomID, threadRoomID, and siteID (the thread room's home site) when found.
45-
// Thread-subs inherit the room's site per spec §6.
42+
// FindThreadRoom resolves the thread room for a parent message id, returning roomID,
43+
// threadRoomID, and the thread room's home siteID (thread-subs inherit the room's site, §6).
4644
FindThreadRoom(ctx context.Context, parentMessageID string) (roomID, threadRoomID, siteID string, found bool, err error)
4745
FindUserID(ctx context.Context, account string) (userID string, found bool, err error)
4846
}
@@ -56,9 +54,8 @@ type handler struct {
5654
usersColl string
5755
pub inboxPublisher
5856
target targetStore
59-
// lookups re-read the current source doc on update events, keyed by source collection
60-
// name — each watched collection re-reads from ITS OWN collection (the connector forwards
61-
// only the delta on update). One SourceLookup per rooms/subs/threadsubs/users collection.
57+
// lookups re-read the current source doc on update events (the connector forwards only the
58+
// delta), keyed by source collection name — one SourceLookup per watched collection.
6259
lookups map[string]migration.SourceLookup
6360
metrics *metrics // nil-safe
6461
now func() int64 // injectable clock, defaults to time.Now().UTC().UnixMilli
@@ -72,9 +69,8 @@ func (h *handler) nowMillis() int64 {
7269
return time.Now().UTC().UnixMilli()
7370
}
7471

75-
// handle dispatches one decoded oplog event by collection. nil = ack+count;
76-
// migration.ErrSkipped = ack-without-counting (deliberate drop, already metered);
77-
// migration.ErrPoison => Term; any other error => Nak (transient).
72+
// handle dispatches one decoded oplog event by collection. nil = ack+count; ErrSkipped =
73+
// ack-without-count (already metered); ErrPoison => Term; any other error => Nak (transient).
7874
//
7975
//nolint:gocritic // ev passed by value: it's the decoded event the consume loop hands off, one per message off the hot path.
8076
func (h *handler) handle(ctx context.Context, ev oplogEvent) error {
@@ -95,10 +91,8 @@ func (h *handler) handle(ctx context.Context, ev oplogEvent) error {
9591
}
9692
}
9793

98-
// resolveDoc returns the full current source doc bytes for the event, or (nil, true, nil)
99-
// when the event should be skipped (delete: doc is gone & un-actionable; update lookup miss).
100-
// insert/replace carry the full doc inline; update re-reads by documentKey._id via the source
101-
// lookup (the connector forwards only the delta on update). delete is always skip.
94+
// resolveDoc returns the full current source doc for the event, or (nil, true, nil) to skip.
95+
// insert/replace carry the doc inline; update re-reads by documentKey._id; delete is always skip.
10296
//
10397
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
10498
func (h *handler) resolveDoc(ctx context.Context, ev oplogEvent) (doc []byte, skip bool, err error) {
@@ -151,9 +145,8 @@ func documentKeyID(documentKey json.RawMessage) (string, error) {
151145
return key.ID, nil
152146
}
153147

154-
// siteIDFromOrigin returns the record's home siteId. federation.origin absent or the literal
155-
// "local" → the deployment's siteID; otherwise the first dotted label of the origin domain
156-
// (e.g. "0030204.tchat-test..." → "0030204").
148+
// siteIDFromOrigin returns the record's home siteId: the deployment's siteID when origin is
149+
// absent or "local", else the first dotted label of the origin domain ("0030204.tchat..." → "0030204").
157150
func siteIDFromOrigin(origin, deploymentSiteID string) string {
158151
if origin == "" || origin == "local" {
159152
return deploymentSiteID

data-migration/oplog-collections-transformer/inboxpublisher.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ type jetstreamPublisher struct {
1818
publish func(ctx context.Context, msg *nats.Msg, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error)
1919
}
2020

21-
// Publish emits one InboxEvent onto the INBOX external lane for the event's type,
22-
// blocking on the pub-ack. The request id flows from ctx into the message headers via
23-
// natsutil.NewMsg so the transformer→inbox-worker hop shares one correlation id.
21+
// Publish emits one InboxEvent onto the INBOX external lane, blocking on the pub-ack. The request
22+
// id flows from ctx into the message headers (natsutil.NewMsg) so transformer→inbox-worker shares it.
2423
//
2524
//nolint:gocritic // model.InboxEvent passed by value: one per migrated record, off the hot path.
2625
func (p *jetstreamPublisher) Publish(ctx context.Context, evt model.InboxEvent) error {

data-migration/oplog-collections-transformer/main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,8 @@ func main() {
192192
)
193193
}
194194

195-
// processOne decodes one event and dispatches it, mapping the outcome to a JetStream disposition:
196-
// Ack on success, Term on poison (never redelivered), Nak-with-delay on transient up to maxDeliver
197-
// — then Termed with a distinct metric instead of JetStream's silent drop (see migration.IsFinalDelivery).
195+
// processOne decodes one event and maps its outcome to a JetStream disposition: Ack on success,
196+
// Term on poison, Nak-with-delay on transient up to maxDeliver, then Term-with-metric (not silent drop).
198197
func processOne(ctx context.Context, h *handler, m jetstream.Msg, mtr *metrics, maxDeliver, deleteMaxDeliver int) {
199198
// Stamp a correlation id once at entry; it flows via ctx into the inbox publish
200199
// (read from ctx through natsutil.NewMsg), so transformer→inbox-worker shares one request_id.

data-migration/oplog-collections-transformer/metrics.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ import (
99
"go.opentelemetry.io/otel/metric"
1010
)
1111

12-
// metrics holds the collections transformer's instruments: processed throughput plus the
13-
// nak/term/exhausted dispositions that flag a stuck or poison-heavy stream, the user-seed
14-
// outcome, and the thread-sub FK resolution misses. Nil-safe so unit tests run without a meter.
12+
// metrics holds the transformer's instruments: processed throughput, nak/term/exhausted
13+
// dispositions, user-seed outcome, and FK resolution misses. Nil-safe (tests run without a meter).
1514
type metrics struct {
1615
processed metric.Int64Counter
1716
naks metric.Int64Counter

data-migration/oplog-collections-transformer/rooms.go

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ import (
1313
"github.com/hmchangw/chat/pkg/natsutil"
1414
)
1515

16-
// sourceRoom is the subset of a rocketchat_rooms doc the mapper consumes. Decoded from the
17-
// connector's relaxed extended JSON via bson.UnmarshalExtJSON, same as the message mapper.
16+
// sourceRoom is the subset of a rocketchat_rooms doc the mapper decodes (relaxed extended JSON).
1817
type sourceRoom struct {
1918
ID string `bson:"_id"`
2019
T string `bson:"t"`
@@ -33,8 +32,7 @@ type sourceRoom struct {
3332
} `bson:"federation"`
3433
}
3534

36-
// updateDescription is the connector's update delta for a room change event. Only the
37-
// changed field *keys* matter for event selection, so the values are decoded as opaque any.
35+
// updateDescription is the connector's update delta; only changed field keys matter, values are opaque.
3836
type updateDescription struct {
3937
UpdatedFields map[string]any `bson:"updatedFields" json:"updatedFields"`
4038
RemovedFields []string `bson:"removedFields" json:"removedFields"`
@@ -56,9 +54,8 @@ func (r *sourceRoom) displayName() string {
5654
return r.Name
5755
}
5856

59-
// handleRoom maps a rocketchat_rooms change event to an inbox InboxEvent (room_sync /
60-
// room_renamed / room_restricted) per spec §4.2 / §4.0. Returns migration.ErrSkipped for
61-
// deletes, excluded room types, and update lookup misses (all metered).
57+
// handleRoom maps a rocketchat_rooms change event to an inbox InboxEvent (§4.2 / §4.0).
58+
// Returns migration.ErrSkipped for deletes, excluded room types, and update lookup misses.
6259
//
6360
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
6461
func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
@@ -94,9 +91,8 @@ func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
9491
return migration.ErrSkipped
9592
}
9693

97-
// Zero-guard: if the source timestamp is absent (zero), fall back to the handler's now()
98-
// so the room doc never carries a year-0001 UpdatedAt, keeping the UpsertRoom high-water-mark
99-
// guard functional for first and subsequent syncs.
94+
// Zero-guard an absent source timestamp with now() so the room doc never carries a year-0001
95+
// UpdatedAt, keeping the UpsertRoom high-water-mark guard functional.
10096
nowMillis := h.nowMillis()
10197
updatedAt := sr.UpdatedAt.UTC()
10298
if updatedAt.IsZero() {
@@ -134,12 +130,8 @@ func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
134130
return nil
135131
}
136132

137-
// roomEvents selects and builds the InboxEvents for the room change.
138-
//
139-
// - insert/replace → [room_sync]
140-
// - update, name/fname changed → [room_renamed, room_sync] (subs + room doc)
141-
// - update, ro changed → [room_restricted, room_sync] (subs + room doc)
142-
// - update, other field → [room_sync]
133+
// roomEvents builds the InboxEvents for a room change: always room_sync, preceded by
134+
// room_renamed (name/fname changed) or room_restricted (ro changed) on the matching update.
143135
//
144136
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
145137
func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEvent, error) {
@@ -182,9 +174,8 @@ func (h *handler) roomSyncEvent(room *model.Room) model.InboxEvent {
182174
}
183175

184176
func (h *handler) roomRenamedEvent(room *model.Room) model.InboxEvent {
185-
// Use the source _updatedAt millis (already zero-guarded in handleRoom) as the
186-
// nameUpdatedAt high-water mark so UpdateSubscriptionNamesForRoom uses the same guard
187-
// timestamp as the companion room_sync.
177+
// Use the source _updatedAt millis (zero-guarded in handleRoom) as the nameUpdatedAt high-water
178+
// mark so UpdateSubscriptionNamesForRoom matches the companion room_sync guard.
188179
return h.inboxEvent(model.InboxRoomRenamed, room.SiteID, mustMarshal(model.RoomRenamedInboxPayload{
189180
RoomID: room.ID,
190181
NewName: room.Name,
@@ -193,9 +184,8 @@ func (h *handler) roomRenamedEvent(room *model.Room) model.InboxEvent {
193184
}
194185

195186
func (h *handler) roomRestrictedEvent(room *model.Room) model.InboxEvent {
196-
// Use the source _updatedAt millis (already zero-guarded in handleRoom) as the
197-
// visibilityUpdatedAt high-water mark so ApplySubscriptionVisibility uses the same
198-
// guard timestamp as the companion room_sync.
187+
// Use the source _updatedAt millis (zero-guarded in handleRoom) as the visibilityUpdatedAt
188+
// high-water mark so ApplySubscriptionVisibility matches the companion room_sync guard.
199189
return h.inboxEvent(model.InboxRoomRestricted, room.SiteID, mustMarshal(model.RoomRestrictedInboxPayload{
200190
RoomID: room.ID,
201191
Restricted: room.Restricted,

data-migration/oplog-collections-transformer/subscriptions.go

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ import (
1414
"github.com/hmchangw/chat/pkg/natsutil"
1515
)
1616

17-
// sourceSubscription is the subset of a rocketchat_subscriptions doc the mapper consumes,
18-
// decoded from the connector's relaxed extended JSON via bson.UnmarshalExtJSON (handles $date).
17+
// sourceSubscription is the subset of a rocketchat_subscriptions doc the mapper decodes (handles $date).
1918
type sourceSubscription struct {
2019
ID string `bson:"_id"`
2120
U struct {
@@ -51,8 +50,7 @@ func (s *sourceSubscription) lastSeenMillis() int64 {
5150
return ls
5251
}
5352

54-
// subUpdateDescription is the connector's update delta for a subscription change event. Only the
55-
// changed field keys matter for event selection, so values are decoded as opaque any.
53+
// subUpdateDescription is the connector's update delta; only changed field keys matter, values are opaque.
5654
type subUpdateDescription struct {
5755
UpdatedFields map[string]any `bson:"updatedFields" json:"updatedFields"`
5856
RemovedFields []string `bson:"removedFields" json:"removedFields"`
@@ -71,18 +69,14 @@ func subChanged(desc subUpdateDescription, field string) bool {
7169
return false
7270
}
7371

74-
// handleSubscription maps a rocketchat_subscriptions change event to inbox InboxEvents per
75-
// spec §4.3 / §4.0. insert/replace emit member_added plus the state events that reproduce the
76-
// source row; update emits the matching event(s) (open toggle drives membership). delete is
77-
// un-actionable (only the source _id) → skip + metric.
72+
// handleSubscription maps a rocketchat_subscriptions change event to inbox InboxEvents (§4.3 / §4.0):
73+
// insert/replace reproduce the source row, update emits the matching event(s), delete maps by _id.
7874
//
7975
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
8076
func (h *handler) handleSubscription(ctx context.Context, ev oplogEvent) error {
8177
if ev.Op == "delete" {
82-
// A genuine leave/remove hard-deletes the source row (distinct from the open:false hide
83-
// handled as a member_removed update below). The delete event carries only the source _id,
84-
// but the destination adopted that _id at member_added time, so emit a subscription_deleted
85-
// keyed by it and let inbox-worker delete the row by _id.
78+
// A genuine leave hard-deletes the source row (vs the open:false hide → member_removed below).
79+
// The destination adopted this _id at member_added time, so delete-by-_id maps back to it.
8680
id, idErr := documentKeyID(ev.DocumentKey)
8781
if idErr != nil {
8882
return idErr
@@ -195,9 +189,8 @@ func (h *handler) publishSubscriptionState(ctx context.Context, ss *sourceSubscr
195189
return h.pub.Publish(ctx, h.readEvent(ss, siteID))
196190
}
197191

198-
// memberAddedEvent builds the member_added InboxEvent. RoomType is classified from the
199-
// subscription's t alone — a subscription can't distinguish discussion/botDM (no prid/teamId/bot
200-
// visibility), so those degrade to channel/dm; roles are overridden by the role_updated below.
192+
// memberAddedEvent builds the member_added InboxEvent. RoomType is classified from t alone
193+
// (a sub can't see prid/teamId/bot, so discussion/botDM degrade to channel/dm); roles via role_updated.
201194
func (h *handler) memberAddedEvent(ss *sourceSubscription, siteID string) model.InboxEvent {
202195
class := classifyRoom(ss.T, false, false, false, 2)
203196
payload := mustMarshal(model.MemberAddEvent{
@@ -215,8 +208,7 @@ func (h *handler) memberAddedEvent(ss *sourceSubscription, siteID string) model.
215208
}
216209

217210
// subscriptionDeletedEvent builds the subscription_deleted InboxEvent for a source hard-delete,
218-
// carrying the source subscription _id (adopted as the destination _id at member_added time).
219-
// Routed to the local inbox-worker like every other migrated row.
211+
// carrying the source _id (adopted as the destination _id at member_added time).
220212
func (h *handler) subscriptionDeletedEvent(subID string) model.InboxEvent {
221213
payload := mustMarshal(model.SubscriptionDeletedEvent{
222214
SubID: subID,
@@ -302,13 +294,12 @@ func mapSubscriptionRoles(roles []string) []model.Role {
302294
return out
303295
}
304296

305-
// mustMarshal JSON-encodes an inner InboxEvent payload. The argument is always a fixed-shape
306-
// model struct of plain scalars/slices, so json.Marshal cannot fail; a marshal error would be a
307-
// programmer error, not a runtime condition, hence the panic.
297+
// mustMarshal JSON-encodes a fixed-shape model payload; json.Marshal cannot fail on these,
298+
// so an error is a programmer error and panics.
308299
func mustMarshal(v any) []byte {
309300
b, err := json.Marshal(v)
310301
if err != nil {
311-
panic(fmt.Sprintf("marshal subscription payload: %v", err))
302+
panic(fmt.Sprintf("marshal inbox payload: %v", err))
312303
}
313304
return b
314305
}

data-migration/oplog-collections-transformer/targetstore.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ func (s *mongoTargetStore) UpsertUserIfAbsent(ctx context.Context, u model.User)
5858
return res.UpsertedCount > 0, nil
5959
}
6060

61-
// FindThreadRoom looks up the thread room derived from parentMessageID, returning the
62-
// owning room id, the thread room id, and the thread room's home site. found=false (no error)
63-
// when no thread room exists yet for that parent message.
61+
// FindThreadRoom resolves the thread room for parentMessageID, returning room id, thread room id,
62+
// and its home site. found=false (no error) when no thread room exists yet for that parent message.
6463
func (s *mongoTargetStore) FindThreadRoom(ctx context.Context, parentMessageID string) (string, string, string, bool, error) {
6564
var tr model.ThreadRoom
6665
err := s.threadRooms.FindOne(ctx, bson.M{"parentMessageId": parentMessageID},

0 commit comments

Comments
 (0)