Skip to content

Commit 1d91f1a

Browse files
committed
refactor(oplog-collections-transformer): simplify cleanups from /simplify pass
- targetstore: add explicit projections to FindThreadRoom/FindUserID (CLAUDE.md §MongoDB "always project precisely"; both run per thread-sub event) - inboxpublisher: drop the dead `siteID` field (Publish routes on evt.DestSiteID) - rooms: use mustMarshal in the event builders like the other mappers, dropping the (InboxEvent, error) plumbing and collapsing roomEvents' error threading - users: stamp one nowMillis() per publishUserStatus event instead of two - threadsubs: fix stale (threadRoomId, userId) comment to userAccount Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
1 parent d8b05d5 commit 1d91f1a

8 files changed

Lines changed: 27 additions & 62 deletions

File tree

data-migration/oplog-collections-transformer/inboxpublisher.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
// jetstreamPublisher publishes InboxEvents into the local INBOX stream.
1717
type jetstreamPublisher struct {
18-
siteID string
1918
publish func(ctx context.Context, msg *nats.Msg, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error)
2019
}
2120

data-migration/oplog-collections-transformer/inboxpublisher_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ func TestJetstreamPublisher_Publish(t *testing.T) {
2121
t.Run("publishes event to correct subject with correct data", func(t *testing.T) {
2222
var captured *nats.Msg
2323
pub := &jetstreamPublisher{
24-
siteID: site,
2524
publish: func(_ context.Context, msg *nats.Msg, _ ...jetstream.PublishOpt) (*jetstream.PubAck, error) {
2625
captured = msg
2726
return &jetstream.PubAck{}, nil
@@ -55,7 +54,6 @@ func TestJetstreamPublisher_Publish(t *testing.T) {
5554
t.Run("publish error propagates as wrapped error", func(t *testing.T) {
5655
publishErr := errors.New("nats down")
5756
pub := &jetstreamPublisher{
58-
siteID: site,
5957
publish: func(_ context.Context, _ *nats.Msg, _ ...jetstream.PublishOpt) (*jetstream.PubAck, error) {
6058
return nil, publishErr
6159
},

data-migration/oplog-collections-transformer/integration_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func TestInboxPublisher_RoundTrip(t *testing.T) {
153153
})
154154
require.NoError(t, err)
155155

156-
pub := &jetstreamPublisher{siteID: site, publish: js.PublishMsg}
156+
pub := &jetstreamPublisher{publish: js.PublishMsg}
157157

158158
evt := model.InboxEvent{
159159
Type: "room_sync",
@@ -248,7 +248,7 @@ func TestEndToEnd_UserInsert(t *testing.T) {
248248
subsColl: "rocketchat_subscriptions",
249249
threadSubsColl: "tsmc_thread_subscriptions",
250250
usersColl: "users",
251-
pub: &jetstreamPublisher{siteID: site, publish: js.PublishMsg},
251+
pub: &jetstreamPublisher{publish: js.PublishMsg},
252252
target: tgtStore,
253253
lookups: map[string]migration.SourceLookup{"users": lookup},
254254
now: func() int64 { return 1700000000000 },
@@ -317,7 +317,7 @@ func TestEndToEnd_RoomInsert_PublishesRoomSync(t *testing.T) {
317317
subsColl: "rocketchat_subscriptions",
318318
threadSubsColl: "tsmc_thread_subscriptions",
319319
usersColl: "users",
320-
pub: &jetstreamPublisher{siteID: site, publish: js.PublishMsg},
320+
pub: &jetstreamPublisher{publish: js.PublishMsg},
321321
target: tgtStore,
322322
lookups: map[string]migration.SourceLookup{"rocketchat_rooms": lookup},
323323
now: func() int64 { return 1700000000000 },
@@ -419,7 +419,7 @@ func TestEndToEnd_ThreadSub_NakThenResolve(t *testing.T) {
419419
subsColl: "rocketchat_subscriptions",
420420
threadSubsColl: "tsmc_thread_subscriptions",
421421
usersColl: "users",
422-
pub: &jetstreamPublisher{siteID: site, publish: js.PublishMsg},
422+
pub: &jetstreamPublisher{publish: js.PublishMsg},
423423
target: tgtStore,
424424
lookups: map[string]migration.SourceLookup{"tsmc_thread_subscriptions": lookup},
425425
now: func() int64 { return 1700000000000 },

data-migration/oplog-collections-transformer/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func main() {
136136
subsColl: cfg.SubscriptionsCollection,
137137
threadSubsColl: cfg.ThreadSubsCollection,
138138
usersColl: cfg.UsersCollection,
139-
pub: &jetstreamPublisher{siteID: cfg.SiteID, publish: js.PublishMsg},
139+
pub: &jetstreamPublisher{publish: js.PublishMsg},
140140
target: target,
141141
lookups: lookups,
142142
metrics: m,

data-migration/oplog-collections-transformer/rooms.go

Lines changed: 12 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
76
"log/slog"
87
"time"
@@ -145,11 +144,7 @@ func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
145144
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
146145
func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEvent, error) {
147146
if ev.Op != "update" {
148-
evt, err := h.roomSyncEvent(room)
149-
if err != nil {
150-
return nil, err
151-
}
152-
return []model.InboxEvent{evt}, nil
147+
return []model.InboxEvent{h.roomSyncEvent(room)}, nil
153148
}
154149

155150
var desc updateDescription
@@ -161,31 +156,11 @@ func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEven
161156

162157
switch {
163158
case changed(desc, "name") || changed(desc, "fname"):
164-
renamed, err := h.roomRenamedEvent(room)
165-
if err != nil {
166-
return nil, err
167-
}
168-
sync, err := h.roomSyncEvent(room)
169-
if err != nil {
170-
return nil, err
171-
}
172-
return []model.InboxEvent{renamed, sync}, nil
159+
return []model.InboxEvent{h.roomRenamedEvent(room), h.roomSyncEvent(room)}, nil
173160
case changed(desc, "ro"):
174-
restricted, err := h.roomRestrictedEvent(room)
175-
if err != nil {
176-
return nil, err
177-
}
178-
sync, err := h.roomSyncEvent(room)
179-
if err != nil {
180-
return nil, err
181-
}
182-
return []model.InboxEvent{restricted, sync}, nil
161+
return []model.InboxEvent{h.roomRestrictedEvent(room), h.roomSyncEvent(room)}, nil
183162
default:
184-
evt, err := h.roomSyncEvent(room)
185-
if err != nil {
186-
return nil, err
187-
}
188-
return []model.InboxEvent{evt}, nil
163+
return []model.InboxEvent{h.roomSyncEvent(room)}, nil
189164
}
190165
}
191166

@@ -202,44 +177,32 @@ func changed(desc updateDescription, field string) bool {
202177
return false
203178
}
204179

205-
func (h *handler) roomSyncEvent(room *model.Room) (model.InboxEvent, error) {
206-
payload, err := json.Marshal(room)
207-
if err != nil {
208-
return model.InboxEvent{}, fmt.Errorf("marshal room_sync payload: %w", err)
209-
}
210-
return h.inboxEvent(model.InboxEventType("room_sync"), room.SiteID, payload), nil
180+
func (h *handler) roomSyncEvent(room *model.Room) model.InboxEvent {
181+
return h.inboxEvent(model.InboxEventType("room_sync"), room.SiteID, mustMarshal(room))
211182
}
212183

213-
func (h *handler) roomRenamedEvent(room *model.Room) (model.InboxEvent, error) {
184+
func (h *handler) roomRenamedEvent(room *model.Room) model.InboxEvent {
214185
// Use the source _updatedAt millis (already zero-guarded in handleRoom) as the
215186
// nameUpdatedAt high-water mark so UpdateSubscriptionNamesForRoom uses the same guard
216187
// timestamp as the companion room_sync.
217-
payload, err := json.Marshal(model.RoomRenamedInboxPayload{
188+
return h.inboxEvent(model.InboxRoomRenamed, room.SiteID, mustMarshal(model.RoomRenamedInboxPayload{
218189
RoomID: room.ID,
219190
NewName: room.Name,
220191
Timestamp: room.UpdatedAt.UnixMilli(),
221-
})
222-
if err != nil {
223-
return model.InboxEvent{}, fmt.Errorf("marshal room_renamed payload: %w", err)
224-
}
225-
return h.inboxEvent(model.InboxRoomRenamed, room.SiteID, payload), nil
192+
}))
226193
}
227194

228-
func (h *handler) roomRestrictedEvent(room *model.Room) (model.InboxEvent, error) {
195+
func (h *handler) roomRestrictedEvent(room *model.Room) model.InboxEvent {
229196
// Use the source _updatedAt millis (already zero-guarded in handleRoom) as the
230197
// visibilityUpdatedAt high-water mark so ApplySubscriptionVisibility uses the same
231198
// guard timestamp as the companion room_sync.
232-
payload, err := json.Marshal(model.RoomRestrictedInboxPayload{
199+
return h.inboxEvent(model.InboxRoomRestricted, room.SiteID, mustMarshal(model.RoomRestrictedInboxPayload{
233200
RoomID: room.ID,
234201
Restricted: room.Restricted,
235202
ExternalAccess: room.ExternalAccess,
236203
OwnerAccount: "",
237204
Timestamp: room.UpdatedAt.UnixMilli(),
238-
})
239-
if err != nil {
240-
return model.InboxEvent{}, fmt.Errorf("marshal room_restricted payload: %w", err)
241-
}
242-
return h.inboxEvent(model.InboxRoomRestricted, room.SiteID, payload), nil
205+
}))
243206
}
244207

245208
// inboxEvent wraps an inner payload in the local-INBOX InboxEvent envelope. SiteID is the

data-migration/oplog-collections-transformer/targetstore.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ func (s *mongoTargetStore) UpsertUserIfAbsent(ctx context.Context, u model.User)
6363
// when no thread room exists yet for that parent message.
6464
func (s *mongoTargetStore) FindThreadRoom(ctx context.Context, parentMessageID string) (string, string, string, bool, error) {
6565
var tr model.ThreadRoom
66-
err := s.threadRooms.FindOne(ctx, bson.M{"parentMessageId": parentMessageID}).Decode(&tr)
66+
err := s.threadRooms.FindOne(ctx, bson.M{"parentMessageId": parentMessageID},
67+
options.FindOne().SetProjection(bson.M{"roomId": 1, "siteId": 1}),
68+
).Decode(&tr)
6769
if errors.Is(err, mongo.ErrNoDocuments) {
6870
return "", "", "", false, nil
6971
}
@@ -77,7 +79,9 @@ func (s *mongoTargetStore) FindThreadRoom(ctx context.Context, parentMessageID s
7779
// when the account has not been seeded yet.
7880
func (s *mongoTargetStore) FindUserID(ctx context.Context, account string) (string, bool, error) {
7981
var u model.User
80-
err := s.users.FindOne(ctx, bson.M{"account": account}).Decode(&u)
82+
err := s.users.FindOne(ctx, bson.M{"account": account},
83+
options.FindOne().SetProjection(bson.M{"_id": 1}),
84+
).Decode(&u)
8185
if errors.Is(err, mongo.ErrNoDocuments) {
8286
return "", false, nil
8387
}

data-migration/oplog-collections-transformer/threadsubs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (h *handler) handleThreadSub(ctx context.Context, ev oplogEvent) error {
4242

4343
if ev.Op == "delete" {
4444
// Un-actionable: the event carries only the source _id; destination keys by
45-
// (threadRoomId, userId) which live in the deleted doc. Also no inbox removal
45+
// (threadRoomId, userAccount) which live in the deleted doc. Also no inbox removal
4646
// handler for thread-sub unfollows (spec §4.4 / D2).
4747
slog.Debug("skip thread_sub delete (un-actionable, no inbox removal handler)",
4848
"eventId", ev.EventID, "request_id", reqID)

data-migration/oplog-collections-transformer/users.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,11 @@ func (h *handler) handleUser(ctx context.Context, ev oplogEvent) error {
111111
// not the RocketChat source. A publish failure Naks the whole event; the re-fan is
112112
// idempotent (status is last-write-wins).
113113
func (h *handler) publishUserStatus(ctx context.Context, account, statusText string) error {
114+
now := h.nowMillis()
114115
payload := mustMarshal(model.UserStatusUpdated{
115116
Account: account,
116117
StatusText: statusText,
117-
Timestamp: h.nowMillis(),
118+
Timestamp: now,
118119
})
119120
for _, dest := range h.allSiteIDs {
120121
if dest == "" {
@@ -125,7 +126,7 @@ func (h *handler) publishUserStatus(ctx context.Context, account, statusText str
125126
SiteID: h.siteID,
126127
DestSiteID: dest,
127128
Payload: payload,
128-
Timestamp: h.nowMillis(),
129+
Timestamp: now,
129130
}
130131
if err := h.pub.Publish(ctx, evt); err != nil {
131132
return fmt.Errorf("publish user_status_updated to %q: %w", dest, err)

0 commit comments

Comments
 (0)