Skip to content

Commit 87af275

Browse files
committed
core+event+domain: drop broken parent_id-as-user-id routing in replies
parent_id is a TWEET id, not a user id (it selects which subtree of replies inside a thread to return). The existing StreamGetRepliesHandler treated it as a user id: isOwnTweetReplies := parentId == streamer.NodeInfo().OwnerId parentUser, err := userRepo.Get(parentId) Both were always false / always ErrUserNotFound, so the handler silently fell through to the local cache for every request. Removed the dead remote-routing path; the handler now serves replies straight from the local repo. Empty parent_id is still normalised to root_id (top-level replies of the thread). Proper remote-fetch routing would require a RootUserId field in GetAllRepliesEvent so we know which node to forward to — out of scope here; clients fall back to whatever replies they've already cached from gossip / earlier fetches. Also annotated GetAllRepliesEvent, NewReplyEvent, and domain.Tweet with comments calling out that ParentId is a tweet id, so this mistake doesn't get re-introduced. Tests: dropped the now-impossible "parent user not found / stream offline / remote success" cases (the handler no longer touches a streamer or user repo) and added a "propagates repo error" case.
1 parent fe0380d commit 87af275

7 files changed

Lines changed: 62 additions & 136 deletions

File tree

cmd/node/member/node/member-node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ func (m *MemberNode) replyHandlers(
543543
},
544544
{
545545
event.PUBLIC_GET_REPLIES,
546-
handler.StreamGetRepliesHandler(r.replyRepo, userRepo, m),
546+
handler.StreamGetRepliesHandler(r.replyRepo),
547547
},
548548
}
549549
}

core/handler/reply.go

Lines changed: 27 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ package handler
2929

3030
import (
3131
"errors"
32-
"fmt"
3332
"strings"
3433

3534
"github.com/Warp-net/warpnet/core/stream"
@@ -304,11 +303,26 @@ func StreamDeleteReplyHandler(
304303
}
305304
}
306305

307-
func StreamGetRepliesHandler(
308-
repo ReplyStorer,
309-
userRepo ReplyUserFetcher,
310-
streamer ReplyStreamer,
311-
) warpnet.WarpHandlerFunc {
306+
// StreamGetRepliesHandler answers /public/get/replies requests.
307+
//
308+
// ev.RootId is the root tweet of the thread; ev.ParentId is the parent
309+
// TWEET id selecting which subtree of replies to return (NOT a user id —
310+
// it gets compared against tweet/reply ids in the repo). Clients send
311+
// an empty ParentId for "give me the top-level replies of the thread",
312+
// which we normalise to RootId so the repo lookup matches the first
313+
// tier of replies. Replies are served straight from the local store:
314+
// any reply we know about (because the author's node pushed it to us
315+
// via gossip, or because we cached an earlier fetch) is returned;
316+
// otherwise the response is empty.
317+
//
318+
// Note on routing: this used to try to forward the request to the
319+
// "parent user" by treating ParentId as a user id and looking it up in
320+
// userRepo. That can't work — ParentId is a tweet id — so the lookup
321+
// always returned ErrUserNotFound and we silently fell back to local
322+
// storage anyway. The dead code is removed; proper remote-fetch
323+
// routing would need a RootUserId in GetAllRepliesEvent to identify the
324+
// author of the root tweet, which clients don't currently send.
325+
func StreamGetRepliesHandler(repo ReplyStorer) warpnet.WarpHandlerFunc {
312326
return func(buf []byte, s warpnet.WarpStream) (any, error) {
313327
var ev event.GetAllRepliesEvent
314328
err := json.Unmarshal(buf, &ev)
@@ -320,71 +334,22 @@ func StreamGetRepliesHandler(
320334
}
321335
// Top-level replies on a thread have no parent — clients send an
322336
// empty parent_id in that case. Treat it as the root itself so
323-
// the lookup returns the first-tier replies.
337+
// the repo returns the first-tier replies hanging off RootId.
324338
if ev.ParentId == "" {
325339
ev.ParentId = ev.RootId
326340
}
327341

328342
rootId := strings.TrimPrefix(ev.RootId, domain.RetweetPrefix)
329343
parentId := strings.TrimPrefix(ev.ParentId, domain.RetweetPrefix)
330-
isOwnTweetReplies := parentId == streamer.NodeInfo().OwnerId
331-
332-
if isOwnTweetReplies {
333-
replies, cursor, err := repo.GetRepliesTree(rootId, parentId, ev.Limit, ev.Cursor)
334-
if err != nil {
335-
return nil, err
336-
}
337-
return event.RepliesResponse{
338-
Cursor: cursor,
339-
Replies: replies,
340-
UserId: &parentId,
341-
}, nil
342-
}
343-
344-
parentUser, err := userRepo.Get(parentId)
345-
if errors.Is(err, database.ErrUserNotFound) {
346-
replies, cursor, _ := repo.GetRepliesTree(rootId, parentId, ev.Limit, ev.Cursor)
347-
return event.RepliesResponse{
348-
Cursor: cursor,
349-
Replies: replies,
350-
UserId: &parentId,
351-
}, nil
352-
}
353-
if err != nil {
354-
return nil, err
355-
}
356344

357-
replyDataResp, err := streamer.GenericStream(
358-
parentUser.NodeId,
359-
event.PUBLIC_GET_REPLIES,
360-
ev,
361-
)
362-
if errors.Is(err, warpnet.ErrNodeIsOffline) {
363-
replies, cursor, _ := repo.GetRepliesTree(rootId, parentId, ev.Limit, ev.Cursor)
364-
return event.RepliesResponse{
365-
Cursor: cursor,
366-
Replies: replies,
367-
UserId: &parentId,
368-
}, nil
369-
}
345+
replies, cursor, err := repo.GetRepliesTree(rootId, parentId, ev.Limit, ev.Cursor)
370346
if err != nil {
371347
return nil, err
372348
}
373-
374-
var possibleError event.ResponseError
375-
if _ = json.Unmarshal(replyDataResp, &possibleError); possibleError.Message != "" {
376-
return nil, fmt.Errorf("unmarshal other delete reply error response: %w", possibleError)
377-
}
378-
379-
var repliesResp event.RepliesResponse
380-
if err := json.Unmarshal(replyDataResp, &repliesResp); err != nil {
381-
return nil, err
382-
}
383-
for _, reply := range repliesResp.Replies {
384-
if _, err := repo.AddReply(reply.Reply); err != nil {
385-
log.Errorf("failed to add reply to replies repo: %v", err)
386-
}
387-
}
388-
return repliesResp, nil
349+
return event.RepliesResponse{
350+
Cursor: cursor,
351+
Replies: replies,
352+
UserId: &parentId,
353+
}, nil
389354
}
390355
}

core/handler/reply_test.go

Lines changed: 16 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,11 @@ func TestStreamNewReplyHandler(t *testing.T) {
267267
}
268268

269269
func TestStreamGetRepliesHandler(t *testing.T) {
270-
owner := "owner-1"
271270
rootId := "root-1"
272271
parentId := "parent-1"
273272

274273
t.Run("invalid payload", func(t *testing.T) {
275-
h := StreamGetRepliesHandler(stubReplyRepo{}, stubReplyUserRepo{}, stubStreamer{})
274+
h := StreamGetRepliesHandler(stubReplyRepo{})
276275
_, err := h([]byte("{"), nil)
277276
if err == nil {
278277
t.Fatal("expected error")
@@ -281,17 +280,16 @@ func TestStreamGetRepliesHandler(t *testing.T) {
281280

282281
t.Run("empty parent id defaults to root", func(t *testing.T) {
283282
// Top-level replies on a thread carry no parent_id from the
284-
// client — the handler must fall back to root_id so the
285-
// underlying repo lookup runs against the right key.
283+
// client — the handler must fall back to root_id so the repo
284+
// lookup runs against the first tier of replies hanging off
285+
// the root tweet.
286286
var seenRoot, seenParent string
287287
h := StreamGetRepliesHandler(
288288
stubReplyRepo{getRepliesTreeFn: func(rootID, parentIdArg string, _ *uint64, _ *string) ([]domain.ReplyNode, string, error) {
289289
seenRoot = rootID
290290
seenParent = parentIdArg
291291
return nil, "", nil
292292
}},
293-
stubReplyUserRepo{},
294-
stubStreamer{nodeInfo: warpnet.NodeInfo{OwnerId: rootId}},
295293
)
296294
_, err := h(marshal(t, event.GetAllRepliesEvent{RootId: rootId}), nil)
297295
if err != nil {
@@ -303,18 +301,18 @@ func TestStreamGetRepliesHandler(t *testing.T) {
303301
})
304302

305303
t.Run("empty root id", func(t *testing.T) {
306-
h := StreamGetRepliesHandler(stubReplyRepo{}, stubReplyUserRepo{}, stubStreamer{})
304+
h := StreamGetRepliesHandler(stubReplyRepo{})
307305
_, err := h(marshal(t, event.GetAllRepliesEvent{ParentId: parentId}), nil)
308306
if err == nil || err.Error() != "empty root id" {
309307
t.Fatalf("unexpected err: %v", err)
310308
}
311309
})
312310

313-
t.Run("own tweet replies", func(t *testing.T) {
311+
t.Run("serves replies from local repo", func(t *testing.T) {
314312
replies := []domain.ReplyNode{{Reply: domain.Tweet{Id: "r1", Text: "reply"}}}
315313
h := StreamGetRepliesHandler(stubReplyRepo{getRepliesTreeFn: func(rootID, parentIdArg string, limit *uint64, cursor *string) ([]domain.ReplyNode, string, error) {
316314
return replies, "end", nil
317-
}}, stubReplyUserRepo{}, stubStreamer{nodeInfo: warpnet.NodeInfo{OwnerId: parentId}})
315+
}})
318316
resp, err := h(marshal(t, event.GetAllRepliesEvent{RootId: rootId, ParentId: parentId}), nil)
319317
if err != nil {
320318
t.Fatalf("unexpected err: %v", err)
@@ -323,65 +321,19 @@ func TestStreamGetRepliesHandler(t *testing.T) {
323321
if len(r.Replies) != 1 {
324322
t.Fatalf("expected 1 reply, got %d", len(r.Replies))
325323
}
326-
})
327-
328-
t.Run("parent user not found fallback", func(t *testing.T) {
329-
h := StreamGetRepliesHandler(stubReplyRepo{}, stubReplyUserRepo{getFn: func(userId string) (domain.User, error) {
330-
return domain.User{}, database.ErrUserNotFound
331-
}}, stubStreamer{nodeInfo: warpnet.NodeInfo{OwnerId: owner}})
332-
resp, err := h(marshal(t, event.GetAllRepliesEvent{RootId: rootId, ParentId: parentId}), nil)
333-
if err != nil {
334-
t.Fatalf("unexpected err: %v", err)
324+
if r.Cursor != "end" {
325+
t.Fatalf("expected cursor 'end', got %q", r.Cursor)
335326
}
336-
_ = resp.(event.RepliesResponse)
337327
})
338328

339-
t.Run("stream node offline fallback", func(t *testing.T) {
340-
h := StreamGetRepliesHandler(stubReplyRepo{}, stubReplyUserRepo{}, stubStreamer{
341-
nodeInfo: warpnet.NodeInfo{OwnerId: owner},
342-
genericStreamFn: func(nodeId string, path stream.WarpRoute, data any) ([]byte, error) {
343-
return nil, warpnet.ErrNodeIsOffline
344-
},
345-
})
346-
resp, err := h(marshal(t, event.GetAllRepliesEvent{RootId: rootId, ParentId: parentId}), nil)
347-
if err != nil {
348-
t.Fatalf("unexpected err: %v", err)
349-
}
350-
_ = resp.(event.RepliesResponse)
351-
})
352-
353-
t.Run("stream error", func(t *testing.T) {
354-
streamErr := errors.New("broken")
355-
h := StreamGetRepliesHandler(stubReplyRepo{}, stubReplyUserRepo{}, stubStreamer{
356-
nodeInfo: warpnet.NodeInfo{OwnerId: owner},
357-
genericStreamFn: func(nodeId string, path stream.WarpRoute, data any) ([]byte, error) {
358-
return nil, streamErr
359-
},
360-
})
329+
t.Run("propagates repo error", func(t *testing.T) {
330+
boom := errors.New("db down")
331+
h := StreamGetRepliesHandler(stubReplyRepo{getRepliesTreeFn: func(string, string, *uint64, *string) ([]domain.ReplyNode, string, error) {
332+
return nil, "", boom
333+
}})
361334
_, err := h(marshal(t, event.GetAllRepliesEvent{RootId: rootId, ParentId: parentId}), nil)
362-
if !errors.Is(err, streamErr) {
363-
t.Fatalf("expected stream error: %v", err)
364-
}
365-
})
366-
367-
t.Run("remote successful response", func(t *testing.T) {
368-
remoteResp, _ := json.Marshal(event.RepliesResponse{
369-
Cursor: "end",
370-
Replies: []domain.ReplyNode{{Reply: domain.Tweet{Id: "r1", Text: "remote reply", RootId: rootId, ParentId: &parentId}}},
371-
})
372-
h := StreamGetRepliesHandler(stubReplyRepo{}, stubReplyUserRepo{}, stubStreamer{
373-
nodeInfo: warpnet.NodeInfo{OwnerId: owner},
374-
genericStreamFn: func(nodeId string, path stream.WarpRoute, data any) ([]byte, error) {
375-
return remoteResp, nil
376-
},
377-
})
378-
resp, err := h(marshal(t, event.GetAllRepliesEvent{RootId: rootId, ParentId: parentId}), nil)
379-
if err != nil {
380-
t.Fatalf("unexpected err: %v", err)
381-
}
382-
r := resp.(event.RepliesResponse)
383-
if len(r.Replies) != 1 {
384-
t.Fatalf("expected 1 reply: %v", r)
335+
if !errors.Is(err, boom) {
336+
t.Fatalf("expected db error, got %v", err)
385337
}
386338
})
387339
}

core/handler/self_request_test.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,6 @@ func TestOwnerSelfRequest_NoOutboundStream(t *testing.T) {
6060
ownerTweetUserRepo := stubTweetUserRepo{getFn: func(userId string) (domain.User, error) {
6161
return domain.User{Id: userId, NodeId: ownerNodeID}, nil
6262
}}
63-
ownerReplyUserRepo := stubReplyUserRepo{getFn: func(userId string) (domain.User, error) {
64-
return domain.User{Id: userId, NodeId: ownerNodeID}, nil
65-
}}
6663
ownerLikeUserRepo := stubLikeUserRepo{getFn: func(userId string) (domain.User, error) {
6764
return domain.User{Id: userId, NodeId: ownerNodeID}, nil
6865
}}
@@ -255,11 +252,9 @@ func TestOwnerSelfRequest_NoOutboundStream(t *testing.T) {
255252
})
256253

257254
t.Run("StreamGetRepliesHandler - replies under own tweet", func(t *testing.T) {
258-
streamer := stubStreamer{
259-
nodeInfo: ownerInfo,
260-
genericStreamFn: failOnStream(t),
261-
}
262-
h := StreamGetRepliesHandler(stubReplyRepo{}, ownerReplyUserRepo, streamer)
255+
// Replies handler is fully local now; no streamer/user repo
256+
// involved (see reply.go for why).
257+
h := StreamGetRepliesHandler(stubReplyRepo{})
263258
if _, err := h(marshal(t, event.GetAllRepliesEvent{RootId: rootID, ParentId: owner}), nil); err != nil {
264259
t.Fatalf("unexpected err: %v", err)
265260
}

domain/warpnet.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ type ReplyNode struct {
144144
const RetweetPrefix = "RT:"
145145

146146
// Tweet defines model for Tweet.
147+
//
148+
// ParentId is the parent TWEET id (not a user id) for replies; nil for
149+
// top-level tweets and for replies that hang directly off RootId.
147150
type Tweet struct {
148151
CreatedAt time.Time `json:"created_at"`
149152
UpdatedAt *time.Time `json:"updated_at,omitempty"`

event/event.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ type GetAllMessagesEvent struct {
134134
}
135135

136136
// GetAllRepliesEvent defines model for GetAllRepliesEvent.
137+
//
138+
// ParentId is the parent TWEET id (not a user id) — it selects which
139+
// subtree of replies inside RootId to return. Empty means "top-level
140+
// replies of the thread"; the handler treats that as ParentId = RootId.
141+
// RootId is the root tweet of the thread.
137142
type GetAllRepliesEvent struct {
138143
Cursor *string `json:"cursor,omitempty"`
139144
Limit *uint64 `json:"limit,omitempty"`
@@ -284,6 +289,12 @@ type NewMessageEvent = domain.ChatMessage
284289
type NewMessageResponse = domain.ChatMessage
285290

286291
// NewReplyEvent defines model for NewReplyEvent.
292+
//
293+
// ParentId is the parent TWEET id this reply is attached to (nil/empty
294+
// means the reply hangs directly off RootId). ParentUserId is the user
295+
// id of the parent tweet's author — that's the routing key the server
296+
// uses to forward the request to the right node when the parent tweet
297+
// lives on a remote peer.
287298
type NewReplyEvent struct {
288299
CreatedAt time.Time `json:"created_at"`
289300
Id domain.ID `json:"id"`

version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.7.17
1+
0.7.18

0 commit comments

Comments
 (0)