Skip to content

Commit bb68b00

Browse files
committed
message-worker: stop re-stamping parent thread_room_id on every subsequent reply
The parent message's thread_room_id is immutable once set and is stamped on the first reply (handleFirstThreadReply). The subsequent-reply path re-issued the same UpdateParentMessageThreadRoomID Cassandra write on every reply purely as a crash-window self-heal. Drop it: one fewer Cassandra write per subsequent reply. Trade-off (consistent with the parent-subscription removal): if the first reply created the thread room but crashed before its stamp landed, the parent stays unstamped — a narrow window not worth a write on every reply. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01VzvswJ23JQB1nYyskjBpi9
1 parent 9ce5c8c commit bb68b00

2 files changed

Lines changed: 8 additions & 77 deletions

File tree

message-worker/handler.go

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -415,30 +415,11 @@ func (h *Handler) handleSubsequentThreadReply(ctx context.Context, msg *model.Me
415415
return "", false, fmt.Errorf("update thread room last message: %w", err)
416416
}
417417

418-
// Re-stamp handles redelivery: first attempt may have created the thread room
419-
// but crashed before the stamp landed. IF EXISTS in the store prevents phantom rows.
420-
switch {
421-
case parentFound && msg.ThreadParentMessageCreatedAt != nil:
422-
if err := h.store.UpdateParentMessageThreadRoomID(ctx, msg.ThreadParentMessageID, msg.RoomID, *msg.ThreadParentMessageCreatedAt, existingRoom.ID); err != nil {
423-
return "", false, fmt.Errorf("stamp thread_room_id on parent message: %w", err)
424-
}
425-
case !parentFound:
426-
slog.ErrorContext(ctx, "subsequent thread reply: parent not found in messages_by_id, thread_room_id stamp skipped",
427-
"request_id", natsutil.RequestIDFromContext(ctx),
428-
"replyID", msg.ID,
429-
"parentMessageID", msg.ThreadParentMessageID,
430-
"threadRoomID", existingRoom.ID,
431-
"room_id", msg.RoomID,
432-
)
433-
default: // msg.ThreadParentMessageCreatedAt == nil
434-
slog.ErrorContext(ctx, "subsequent thread reply: ThreadParentMessageCreatedAt is nil, parent thread_room_id stamp skipped",
435-
"request_id", natsutil.RequestIDFromContext(ctx),
436-
"replyID", msg.ID,
437-
"parentMessageID", msg.ThreadParentMessageID,
438-
"threadRoomID", existingRoom.ID,
439-
"room_id", msg.RoomID,
440-
)
441-
}
418+
// The parent message's thread_room_id is stamped once, on the first reply
419+
// (handleFirstThreadReply). It never changes for a thread, so re-stamping it on every
420+
// subsequent reply is a redundant Cassandra write — skipped here. Trade-off: if the
421+
// first reply created the room but crashed before its stamp landed, the parent stays
422+
// unstamped (no per-reply self-heal) — a narrow window not worth a write on every reply.
442423

443424
return existingRoom.ID, replierLastSeenAdvanced, nil
444425
}

message-worker/handler_test.go

Lines changed: 3 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,55 +1000,7 @@ func TestHandler_HandleThreadRoomAndSubscriptions(t *testing.T) {
10001000
wantErr: true,
10011001
},
10021002
{
1003-
name: "subsequent reply — stamps thread_room_id on parent when parentCreatedAt known",
1004-
msg: &model.Message{
1005-
ID: "msg-reply",
1006-
RoomID: "r1",
1007-
UserID: "u-replier",
1008-
UserAccount: "replier",
1009-
CreatedAt: now,
1010-
ThreadParentMessageID: "msg-parent",
1011-
ThreadParentMessageCreatedAt: ptrTime(now.Add(-5 * time.Minute)),
1012-
},
1013-
siteID: "site-a",
1014-
setupMocks: func(store *MockStore, ts *MockThreadStore) {
1015-
ts.EXPECT().EnsureThreadRoom(gomock.Any(), gomock.Any()).Return(&model.ThreadRoom{ID: "tr-existing"}, false, nil)
1016-
store.EXPECT().GetMessageSender(gomock.Any(), "msg-parent").Return(parentSender, nil)
1017-
// Only the replier's subscription is written (combined upsert+lastSeen); the
1018-
// parent author's sub is not re-upserted and no parent owner-site lookup runs.
1019-
ts.EXPECT().UpsertThreadSubscriptionAdvancingLastSeen(gomock.Any(), gomock.Any(), now).Return(nil)
1020-
ts.EXPECT().UpdateThreadRoomLastMessage(gomock.Any(), "tr-existing", "msg-reply", gomock.Any(), now).Return(nil)
1021-
store.EXPECT().UpdateParentMessageThreadRoomID(
1022-
gomock.Any(), "msg-parent", "r1",
1023-
now.Add(-5*time.Minute),
1024-
"tr-existing",
1025-
).Return(nil)
1026-
},
1027-
},
1028-
{
1029-
name: "subsequent reply — UpdateParentMessageThreadRoomID fails — returns error",
1030-
msg: &model.Message{
1031-
ID: "msg-reply",
1032-
RoomID: "r1",
1033-
UserID: "u-replier",
1034-
UserAccount: "replier",
1035-
CreatedAt: now,
1036-
ThreadParentMessageID: "msg-parent",
1037-
ThreadParentMessageCreatedAt: ptrTime(now.Add(-5 * time.Minute)),
1038-
},
1039-
siteID: "site-a",
1040-
setupMocks: func(store *MockStore, ts *MockThreadStore) {
1041-
ts.EXPECT().EnsureThreadRoom(gomock.Any(), gomock.Any()).Return(&model.ThreadRoom{ID: "tr-existing"}, false, nil)
1042-
store.EXPECT().GetMessageSender(gomock.Any(), "msg-parent").Return(parentSender, nil)
1043-
ts.EXPECT().UpsertThreadSubscriptionAdvancingLastSeen(gomock.Any(), gomock.Any(), now).Return(nil)
1044-
ts.EXPECT().UpdateThreadRoomLastMessage(gomock.Any(), "tr-existing", "msg-reply", gomock.Any(), now).Return(nil)
1045-
store.EXPECT().UpdateParentMessageThreadRoomID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
1046-
Return(errors.New("cassandra: write timeout"))
1047-
},
1048-
wantErr: true,
1049-
},
1050-
{
1051-
name: "subsequent reply — parent not found but parentCreatedAt known — skips UpdateParentMessageThreadRoomID",
1003+
name: "subsequent reply — parent not found but parentCreatedAt known — replier sub still written",
10521004
msg: &model.Message{
10531005
ID: "msg-reply",
10541006
RoomID: "r1",
@@ -2058,10 +2010,8 @@ func TestHandler_ProcessMessage_ThreadReplyPublish(t *testing.T) {
20582010
// re-upsert, no parent owner-site lookup, no standalone AdvanceThreadSubscriptionLastSeen.
20592011
ts.EXPECT().UpsertThreadSubscriptionAdvancingLastSeen(gomock.Any(), gomock.Any(), now).Return(nil)
20602012
ts.EXPECT().UpdateThreadRoomLastMessage(gomock.Any(), "tr-1", "msg-reply", gomock.Any(), now).Return(nil)
2061-
// parentFound && ThreadParentMessageCreatedAt != nil → stamps thread_room_id on parent.
2062-
store.EXPECT().UpdateParentMessageThreadRoomID(
2063-
gomock.Any(), "msg-parent", "r1", parentCreatedAt, "tr-1",
2064-
).Return(nil)
2013+
// Subsequent reply: the parent's thread_room_id was stamped on the first reply, so it is
2014+
// NOT re-stamped here (no UpdateParentMessageThreadRoomID call).
20652015
}
20662016

20672017
t.Run("publishes MessageEvent to canonical thread reply subject", func(t *testing.T) {

0 commit comments

Comments
 (0)