Skip to content

Commit e1cce38

Browse files
committed
fix(inbox-worker): retry mute/favorite/read on a missing sub like roles
UpdateSubscriptionMute/Favorite/Read silently no-op'd when the subscription didn't exist yet, while UpdateSubscriptionRoles Nak-retries. Because member_added runs on the FIFO membership lane and the field events on the worker pool, a field event frequently wins the race and finds no sub — silently dropping the migrated mute/favorite/unread state. Add the same subscriptionExists disambiguation (extracted as naksIfSubscriptionMissing) to all four: a genuinely missing sub now errors → redelivered until member_added lands, while a stale event the high-water guard rejected stays a silent no-op. Follows the review's "don't silently drop migrated state" principle (the field-state instance ngangwar962 flagged for status but not subscriptions). Tests flipped to assert the new Nak behavior. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
1 parent e1c8151 commit e1cce38

4 files changed

Lines changed: 66 additions & 23 deletions

File tree

inbox-worker/handler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,19 @@ type InboxStore interface {
3737
// UpdateSubscriptionRead sets lastSeenAt and alert on the subscription
3838
// keyed by (roomID, account). Idempotent and order-safe: the write
3939
// only applies when the stored lastSeenAt is missing or strictly
40-
// earlier than the supplied value. Older or duplicate events are
41-
// silent no-ops. Missing-subscription is also a silent no-op.
40+
// earlier than the supplied value. Older or duplicate events are silent no-ops.
41+
// A genuinely missing sub returns an error (Nak) so the event redelivers until member_added lands.
4242
UpdateSubscriptionRead(ctx context.Context, roomID, account string, lastSeenAt time.Time, alert bool) error
4343
UpsertThreadSubscription(ctx context.Context, sub *model.ThreadSubscription) error
4444
// ApplyThreadRead writes ThreadSubscription under a $lt lastSeenAt guard, then the Subscription only if the guard accepted.
4545
ApplyThreadRead(ctx context.Context, roomID, threadRoomID, account string, newThreadUnread []string, alert bool, lastSeenAt time.Time) error
4646
// UpdateSubscriptionMute sets muted by (roomID, account), guarded by
4747
// muteUpdatedAt (the source event's publish time): older/duplicate events
48-
// are silent no-ops. Missing-sub is also a silent no-op for federation races.
48+
// are silent no-ops. A genuinely missing sub returns an error (Nak) so the event redelivers until member_added lands.
4949
UpdateSubscriptionMute(ctx context.Context, roomID, account string, muted bool, muteUpdatedAt time.Time) error
5050
// UpdateSubscriptionFavorite sets favorite by (roomID, account), guarded by
5151
// favoriteUpdatedAt (the source event's publish time): older/duplicate events
52-
// are silent no-ops. Missing-sub is also a silent no-op for federation races.
52+
// are silent no-ops. A genuinely missing sub returns an error (Nak) so the event redelivers until member_added lands.
5353
UpdateSubscriptionFavorite(ctx context.Context, roomID, account string, favorite bool, favoriteUpdatedAt time.Time) error
5454
// UpdateSubscriptionNamesForRoom sets name on every subscription in the room,
5555
// each guarded by its own nameUpdatedAt so an out-of-order rename cannot regress

inbox-worker/handler_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,12 +1686,12 @@ func TestHandler_SubscriptionMuteToggled(t *testing.T) {
16861686
assert.Equal(t, int64(12345), updates[0].updatedAt.UnixMilli())
16871687
}
16881688

1689-
func TestHandler_SubscriptionMuteToggled_MissingSubscriptionNoOp(t *testing.T) {
1689+
func TestHandler_SubscriptionMuteToggled_Forwarded(t *testing.T) {
16901690
store := &stubInboxStore{}
16911691
h := NewHandler(store)
16921692

16931693
payload, err := json.Marshal(model.SubscriptionMuteToggledEvent{
1694-
Account: "ghost", RoomID: "r1", Muted: true, Timestamp: 12345,
1694+
Account: "alice", RoomID: "r1", Muted: true, Timestamp: 12345,
16951695
})
16961696
require.NoError(t, err)
16971697
evt, err := json.Marshal(model.InboxEvent{
@@ -1701,6 +1701,10 @@ func TestHandler_SubscriptionMuteToggled_MissingSubscriptionNoOp(t *testing.T) {
17011701
require.NoError(t, err)
17021702

17031703
require.NoError(t, h.HandleEvent(context.Background(), evt))
1704+
updates := store.getMuteUpdates()
1705+
require.Len(t, updates, 1)
1706+
assert.Equal(t, "alice", updates[0].account)
1707+
assert.True(t, updates[0].muted)
17041708
}
17051709

17061710
func TestHandler_SubscriptionMuteToggled_MalformedPayload(t *testing.T) {
@@ -1751,12 +1755,12 @@ func TestHandler_SubscriptionFavoriteToggled(t *testing.T) {
17511755
assert.Equal(t, int64(12345), updates[0].updatedAt.UnixMilli())
17521756
}
17531757

1754-
func TestHandler_SubscriptionFavoriteToggled_MissingSubscriptionNoOp(t *testing.T) {
1758+
func TestHandler_SubscriptionFavoriteToggled_Forwarded(t *testing.T) {
17551759
store := &stubInboxStore{}
17561760
h := NewHandler(store)
17571761

17581762
payload, err := json.Marshal(model.SubscriptionFavoriteToggledEvent{
1759-
Account: "ghost", RoomID: "r1", Favorite: true, Timestamp: 12345,
1763+
Account: "alice", RoomID: "r1", Favorite: true, Timestamp: 12345,
17601764
})
17611765
require.NoError(t, err)
17621766
evt, err := json.Marshal(model.InboxEvent{
@@ -1766,6 +1770,10 @@ func TestHandler_SubscriptionFavoriteToggled_MissingSubscriptionNoOp(t *testing.
17661770
require.NoError(t, err)
17671771

17681772
require.NoError(t, h.HandleEvent(context.Background(), evt))
1773+
updates := store.getFavoriteUpdates()
1774+
require.Len(t, updates, 1)
1775+
assert.Equal(t, "alice", updates[0].account)
1776+
assert.True(t, updates[0].favorite)
17691777
}
17701778

17711779
func TestHandler_SubscriptionFavoriteToggled_MalformedPayload(t *testing.T) {

inbox-worker/integration_test.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -337,18 +337,33 @@ func TestInbox_UpdateSubscriptionRead_EqualTimestampSkipped(t *testing.T) {
337337
assert.True(t, got.Alert) // unchanged
338338
}
339339

340-
func TestInbox_UpdateSubscriptionRead_MissingSubscriptionNoOp(t *testing.T) {
340+
func TestInbox_UpdateSubscriptionRead_MissingSubscriptionErrors(t *testing.T) {
341341
ctx := context.Background()
342-
db := setupMongo(t)
343-
store := &mongoInboxStore{
344-
subCol: db.Collection("subscriptions"),
345-
roomCol: db.Collection("rooms"),
346-
userCol: db.Collection("users"),
347-
threadSubCol: db.Collection("thread_subscriptions"),
348-
}
342+
store := newGuardStore(setupMongo(t))
343+
344+
// No subscription seeded — a genuinely missing sub must error so the event redelivers until
345+
// member_added lands (field events can race ahead of member_added on the worker pool).
346+
err := store.UpdateSubscriptionRead(ctx, "missing-room", "ghost", time.Now().UTC(), false)
347+
require.Error(t, err)
348+
assert.Contains(t, err.Error(), "subscription not found")
349+
}
349350

350-
now := time.Now().UTC()
351-
require.NoError(t, store.UpdateSubscriptionRead(ctx, "missing-room", "ghost", now, false))
351+
func TestInbox_UpdateSubscriptionMute_MissingSubscriptionErrors(t *testing.T) {
352+
ctx := context.Background()
353+
store := newGuardStore(setupMongo(t))
354+
355+
err := store.UpdateSubscriptionMute(ctx, "missing-room", "ghost", true, time.UnixMilli(100).UTC())
356+
require.Error(t, err)
357+
assert.Contains(t, err.Error(), "subscription not found")
358+
}
359+
360+
func TestInbox_UpdateSubscriptionFavorite_MissingSubscriptionErrors(t *testing.T) {
361+
ctx := context.Background()
362+
store := newGuardStore(setupMongo(t))
363+
364+
err := store.UpdateSubscriptionFavorite(ctx, "missing-room", "ghost", true, time.UnixMilli(100).UTC())
365+
require.Error(t, err)
366+
assert.Contains(t, err.Error(), "subscription not found")
352367
}
353368

354369
func TestInboxWorker_ThreadSubscriptionUpserted_Insert_Integration(t *testing.T) {

inbox-worker/main.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,17 @@ func (s *mongoInboxStore) UpdateSubscriptionRoles(ctx context.Context, account,
105105
if err != nil {
106106
return fmt.Errorf("update subscription roles for %q in room %q: %w", account, roomID, err)
107107
}
108-
if res.MatchedCount > 0 {
109-
return nil
108+
if res.MatchedCount == 0 {
109+
return s.naksIfSubscriptionMissing(ctx, account, roomID)
110110
}
111+
return nil
112+
}
113+
114+
// naksIfSubscriptionMissing disambiguates a MatchedCount==0 guarded subscription write. A genuinely
115+
// missing subscription returns an error (Nak → redelivered until member_added lands, the
116+
// federation/migration race where field events can race ahead of member_added); a stale event the
117+
// high-water guard rejected is a silent no-op (the sub exists with a newer-or-equal value).
118+
func (s *mongoInboxStore) naksIfSubscriptionMissing(ctx context.Context, account, roomID string) error {
111119
exists, err := s.subscriptionExists(ctx, account, roomID)
112120
if err != nil {
113121
return fmt.Errorf("check subscription exists for %q in room %q: %w", account, roomID, err)
@@ -236,9 +244,13 @@ func (s *mongoInboxStore) UpdateSubscriptionMute(ctx context.Context, roomID, ac
236244
},
237245
}
238246
update := bson.M{"$set": bson.M{"muted": muted, "muteUpdatedAt": muteUpdatedAt}}
239-
if _, err := s.subCol.UpdateOne(ctx, filter, update); err != nil {
247+
res, err := s.subCol.UpdateOne(ctx, filter, update)
248+
if err != nil {
240249
return fmt.Errorf("update subscription mute for %q in room %q: %w", account, roomID, err)
241250
}
251+
if res.MatchedCount == 0 {
252+
return s.naksIfSubscriptionMissing(ctx, account, roomID)
253+
}
242254
return nil
243255
}
244256

@@ -256,9 +268,13 @@ func (s *mongoInboxStore) UpdateSubscriptionFavorite(ctx context.Context, roomID
256268
},
257269
}
258270
update := bson.M{"$set": bson.M{"favorite": favorite, "favoriteUpdatedAt": favoriteUpdatedAt}}
259-
if _, err := s.subCol.UpdateOne(ctx, filter, update); err != nil {
271+
res, err := s.subCol.UpdateOne(ctx, filter, update)
272+
if err != nil {
260273
return fmt.Errorf("update subscription favorite for %q in room %q: %w", account, roomID, err)
261274
}
275+
if res.MatchedCount == 0 {
276+
return s.naksIfSubscriptionMissing(ctx, account, roomID)
277+
}
262278
return nil
263279
}
264280

@@ -272,9 +288,13 @@ func (s *mongoInboxStore) UpdateSubscriptionRead(ctx context.Context, roomID, ac
272288
},
273289
}
274290
update := bson.M{"$set": bson.M{"lastSeenAt": lastSeenAt, "alert": alert}}
275-
if _, err := s.subCol.UpdateOne(ctx, filter, update); err != nil {
291+
res, err := s.subCol.UpdateOne(ctx, filter, update)
292+
if err != nil {
276293
return fmt.Errorf("update subscription read for %q in room %q: %w", account, roomID, err)
277294
}
295+
if res.MatchedCount == 0 {
296+
return s.naksIfSubscriptionMissing(ctx, account, roomID)
297+
}
278298
return nil
279299
}
280300

0 commit comments

Comments
 (0)