Skip to content

Commit 838a702

Browse files
authored
Merge pull request #305 from Warp-net/develop
Develop
2 parents 1b37829 + b452214 commit 838a702

114 files changed

Lines changed: 604 additions & 408 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/node/business/handlers/bridge.go

Lines changed: 1 addition & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,6 @@ const pathIsFirstRun = "is-first-run"
4545
// burst of dashboard calls) can spawn, so one connection can't exhaust memory.
4646
const maxInflightDispatches = 32
4747

48-
// logoutGracePeriod is how long the node waits after the last dashboard
49-
// connection drops before logging the owner out. A page reload reconnects well
50-
// within this window (cancelling the logout); closing the browser tab never
51-
// reconnects, so the logout fires and the database is sealed.
52-
const logoutGracePeriod = 5 * time.Second
53-
5448
var upgrader = websocket.Upgrader{
5549
ReadBufferSize: 4096,
5650
WriteBufferSize: 4096,
@@ -60,7 +54,7 @@ var upgrader = websocket.Upgrader{
6054
// sameOrigin permits only the dashboard served from this node: a request whose
6155
// Origin host matches the Host it connects to. A missing Origin (non-browser
6256
// clients, e.g. health probes) is allowed; any other origin is rejected so a
63-
// malicious page can't open a /ws connection and pin the auto-logout open.
57+
// malicious page can't open a /ws connection to this node.
6458
func sameOrigin(r *http.Request) bool {
6559
origin := r.Header.Get("Origin")
6660
if origin == "" {
@@ -88,7 +82,6 @@ type Authenticator interface {
8882
AuthLogin(message event.LoginEvent, psk security.PSK) (event.LoginResponse, error)
8983
AuthLogout()
9084
Reset()
91-
IsAuthenticated() bool
9285
PrivateKey() ed25519.PrivateKey
9386
}
9487

@@ -100,10 +93,6 @@ type BridgeHandler struct {
10093

10194
mx sync.RWMutex
10295
node Node
103-
104-
connMx sync.Mutex
105-
activeConns int
106-
logoutTimer *time.Timer
10796
}
10897

10998
func NewBridgeHandler(
@@ -128,13 +117,6 @@ func (b *BridgeHandler) Handle() http.HandlerFunc {
128117
return
129118
}
130119

131-
// Track the dashboard connection so closing the last browser tab logs
132-
// the owner out (a reload reconnects within the grace period). The
133-
// conn.Close defer is registered last so it runs first on return:
134-
// the socket is fully torn down before the disconnect accounting arms
135-
// the auto-logout timer.
136-
b.connConnected()
137-
defer b.connDisconnected()
138120
defer func() { _ = conn.Close() }()
139121

140122
// Dispatch each message in its own goroutine so a slow libp2p self-stream
@@ -201,53 +183,6 @@ func (b *BridgeHandler) AttachNode(n Node) {
201183
b.mx.Unlock()
202184
}
203185

204-
// connConnected records a new dashboard connection and cancels any pending
205-
// auto-logout (e.g. a page reload reconnecting within the grace period).
206-
func (b *BridgeHandler) connConnected() {
207-
b.connMx.Lock()
208-
defer b.connMx.Unlock()
209-
b.activeConns++
210-
if b.logoutTimer != nil {
211-
b.logoutTimer.Stop()
212-
b.logoutTimer = nil
213-
}
214-
}
215-
216-
// connDisconnected records a dropped dashboard connection. When the last one
217-
// goes away it arms a grace timer that logs the owner out unless a new
218-
// connection arrives first, so closing the browser tab seals the node while a
219-
// reload keeps the session.
220-
func (b *BridgeHandler) connDisconnected() {
221-
b.connMx.Lock()
222-
defer b.connMx.Unlock()
223-
if b.activeConns > 0 {
224-
b.activeConns--
225-
}
226-
if b.activeConns > 0 {
227-
return
228-
}
229-
if b.logoutTimer != nil {
230-
b.logoutTimer.Stop()
231-
}
232-
b.logoutTimer = time.AfterFunc(logoutGracePeriod, b.autoLogout)
233-
}
234-
235-
// autoLogout fires when the dashboard has been gone for the whole grace period.
236-
// It holds connMx across the decision and the logout so a reconnect can't slip
237-
// in between (which would leave an active connection on a logged-out node); the
238-
// reconnect either precedes it (activeConns > 0, bail) or follows it cleanly.
239-
func (b *BridgeHandler) autoLogout() {
240-
b.connMx.Lock()
241-
defer b.connMx.Unlock()
242-
b.logoutTimer = nil
243-
if b.activeConns > 0 || !b.auth.IsAuthenticated() {
244-
return
245-
}
246-
log.Infoln("business: dashboard closed, logging out")
247-
b.auth.AuthLogout() // closes the database; the node keeps running
248-
b.auth.Reset() // clear the auth guard so the next login can re-authenticate
249-
}
250-
251186
func (b *BridgeHandler) dispatch(req event.Message) event.Message {
252187
defer func() {
253188
if r := recover(); r != nil {

core/dht/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import "github.com/Warp-net/warpnet/core/warpnet"
2929
type dhtConfig struct {
3030
store RoutingStorer
3131
addCallbacks, removeCallbacks []func(info warpnet.WarpPeerID)
32-
bootstrapNodes []warpnet.WarpAddrInfo
32+
bootstrapNodes []warpnet.WarpAddrInfo
3333
network string
3434
}
3535
type Option func(*dhtConfig)

core/handler/filter_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ import (
1010
)
1111

1212
type stubFilterRepo struct {
13-
createFn func(userId string, f domain.Filter) (domain.Filter, error)
14-
getFn func(userId, filterId string) (domain.Filter, error)
15-
updateFn func(userId string, f domain.Filter) (domain.Filter, error)
16-
deleteFn func(userId, filterId string) error
17-
listFn func(userId string, limit *uint64, cursor *string) ([]domain.Filter, string, error)
18-
addKwFn func(userId, filterId string, kw domain.FilterKeyword) (domain.FilterKeyword, error)
19-
updateKwFn func(userId string, kw domain.FilterKeyword) (domain.FilterKeyword, error)
20-
deleteKwFn func(userId, keywordId string) error
13+
createFn func(userId string, f domain.Filter) (domain.Filter, error)
14+
getFn func(userId, filterId string) (domain.Filter, error)
15+
updateFn func(userId string, f domain.Filter) (domain.Filter, error)
16+
deleteFn func(userId, filterId string) error
17+
listFn func(userId string, limit *uint64, cursor *string) ([]domain.Filter, string, error)
18+
addKwFn func(userId, filterId string, kw domain.FilterKeyword) (domain.FilterKeyword, error)
19+
updateKwFn func(userId string, kw domain.FilterKeyword) (domain.FilterKeyword, error)
20+
deleteKwFn func(userId, keywordId string) error
2121
}
2222

2323
func (s stubFilterRepo) Create(u string, f domain.Filter) (domain.Filter, error) {

core/handler/like.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func StreamUnlikeHandler(repo LikesStorer, userRepo LikedUserFetcher, streamer L
193193
if err != nil {
194194
return nil, err
195195
}
196-
196+
197197
if unlikedUser.NodeId == ownNodeInfo.ID.String() {
198198
return event.LikesCountResponse{Count: num}, nil
199199
}

core/handler/subscription_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
type stubSubsRepo struct {
12-
subscribeFn func(ownerId, targetId string) error
12+
subscribeFn func(ownerId, targetId string) error
1313
unsubscribeFn func(ownerId, targetId string) error
1414
}
1515

core/handler/user.go

Lines changed: 0 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"strings"
3434
"time"
3535

36-
"github.com/Warp-net/warpnet/core/mastodon"
3736
"github.com/Warp-net/warpnet/core/stream"
3837
"github.com/Warp-net/warpnet/core/warpnet"
3938
"github.com/Warp-net/warpnet/database"
@@ -296,89 +295,6 @@ func refreshUsers(
296295
}
297296
}
298297

299-
func StreamGetWhoToFollowHandler(
300-
authRepo UserAuthStorer,
301-
userRepo UserFetcher,
302-
followRepo UserFollowsCounter,
303-
) warpnet.WarpHandlerFunc {
304-
return func(buf []byte, s warpnet.WarpStream) (any, error) {
305-
var ev event.GetAllUsersEvent
306-
err := json.Unmarshal(buf, &ev)
307-
if err != nil {
308-
return nil, err
309-
}
310-
311-
if ev.UserId == "" {
312-
return nil, errEmptyUserId
313-
}
314-
315-
users, cursor, err := userRepo.WhoToFollow(ev.Limit, ev.Cursor)
316-
if err != nil {
317-
return nil, err
318-
}
319-
320-
followingsLimit := uint64(80) //nolint:mnd // TODO limit?
321-
followings, _, err := followRepo.GetFollowings(authRepo.GetOwner().UserId, &followingsLimit, nil)
322-
if err != nil {
323-
log.Errorf("get who to follow handler: get followers %v", err)
324-
}
325-
326-
followedUsers := map[string]struct{}{}
327-
for _, followingId := range followings {
328-
followedUsers[followingId] = struct{}{}
329-
}
330-
331-
owner := authRepo.GetOwner()
332-
333-
profile, err := userRepo.Get(ev.UserId)
334-
if err != nil {
335-
log.Errorf("get who to follow handler: get user %v", err)
336-
profile = domain.User{
337-
Id: owner.UserId,
338-
Username: owner.Username,
339-
Network: warpnet.WarpnetName,
340-
NodeId: owner.NodeId,
341-
}
342-
}
343-
344-
whotofollow := make([]domain.User, 0, len(users))
345-
latestByNode := make(map[string]int, len(users))
346-
for _, user := range users {
347-
if user.IsOffline {
348-
continue
349-
}
350-
if user.Id == owner.UserId || user.NodeId == owner.NodeId {
351-
continue
352-
}
353-
// if profile from Warpnet - don't show other network recommendations
354-
if profile.Id != owner.UserId && profile.Network != user.Network {
355-
continue
356-
}
357-
if _, ok := followedUsers[user.Id]; ok {
358-
continue
359-
}
360-
361-
if user.NodeId != "" && user.Network != mastodon.Network {
362-
if idx, ok := latestByNode[user.NodeId]; ok {
363-
if user.CreatedAt.After(whotofollow[idx].CreatedAt) {
364-
whotofollow[idx] = user
365-
}
366-
continue
367-
}
368-
}
369-
370-
latestByNode[user.NodeId] = len(whotofollow)
371-
372-
whotofollow = append(whotofollow, user)
373-
}
374-
375-
return event.UsersResponse{
376-
Cursor: cursor,
377-
Users: whotofollow,
378-
}, nil
379-
}
380-
}
381-
382298
func StreamUpdateProfileHandler(authRepo UserAuthStorer, userRepo UserFetcher) warpnet.WarpHandlerFunc {
383299
return func(buf []byte, s warpnet.WarpStream) (any, error) {
384300
var ev event.NewUserEvent

core/handler/user_test.go

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -310,78 +310,6 @@ func TestStreamGetUsersHandler(t *testing.T) {
310310
})
311311
}
312312

313-
func TestStreamGetWhoToFollowHandler(t *testing.T) {
314-
owner := "owner-1"
315-
316-
t.Run("invalid payload", func(t *testing.T) {
317-
h := StreamGetWhoToFollowHandler(stubAuth{owner: domain.Owner{UserId: owner}}, stubUserFetcher{}, stubUserFollowsCounter{})
318-
_, err := h([]byte("{"), nil)
319-
if err == nil {
320-
t.Fatal("expected error")
321-
}
322-
})
323-
324-
t.Run("empty user id", func(t *testing.T) {
325-
h := StreamGetWhoToFollowHandler(stubAuth{owner: domain.Owner{UserId: owner}}, stubUserFetcher{}, stubUserFollowsCounter{})
326-
_, err := h(marshal(t, event.GetAllUsersEvent{}), nil)
327-
if err == nil || err.Error() != "empty user id" {
328-
t.Fatalf("unexpected err: %v", err)
329-
}
330-
})
331-
332-
t.Run("filters out self and already followed", func(t *testing.T) {
333-
h := StreamGetWhoToFollowHandler(
334-
stubAuth{owner: domain.Owner{UserId: owner, NodeId: "node-1", Username: "test"}},
335-
stubUserFetcher{
336-
whoToFollowFn: func(limit *uint64, cursor *string) ([]domain.User, string, error) {
337-
return []domain.User{
338-
{Id: owner, Network: warpnet.WarpnetName},
339-
{Id: "already-followed", Network: warpnet.WarpnetName},
340-
{Id: "new-user", Network: warpnet.WarpnetName},
341-
}, "end", nil
342-
},
343-
getFn: func(userId string) (domain.User, error) {
344-
return domain.User{Id: owner, Network: warpnet.WarpnetName}, nil
345-
},
346-
},
347-
stubUserFollowsCounter{getFollowingsFn: func(userId string, limit *uint64, cursor *string) ([]string, string, error) {
348-
return []string{"already-followed"}, "", nil
349-
}},
350-
)
351-
resp, err := h(marshal(t, event.GetAllUsersEvent{UserId: owner}), nil)
352-
if err != nil {
353-
t.Fatalf("unexpected err: %v", err)
354-
}
355-
r := resp.(event.UsersResponse)
356-
if len(r.Users) != 1 || r.Users[0].Id != "new-user" {
357-
t.Fatalf("expected only new-user, got: %v", r.Users)
358-
}
359-
})
360-
361-
t.Run("repo error", func(t *testing.T) {
362-
repoErr := errors.New("db error")
363-
h := StreamGetWhoToFollowHandler(stubAuth{owner: domain.Owner{UserId: owner}}, stubUserFetcher{whoToFollowFn: func(limit *uint64, cursor *string) ([]domain.User, string, error) {
364-
return nil, "", repoErr
365-
}}, stubUserFollowsCounter{})
366-
_, err := h(marshal(t, event.GetAllUsersEvent{UserId: owner}), nil)
367-
if !errors.Is(err, repoErr) {
368-
t.Fatalf("expected repo error: %v", err)
369-
}
370-
})
371-
372-
t.Run("empty results", func(t *testing.T) {
373-
h := StreamGetWhoToFollowHandler(stubAuth{owner: domain.Owner{UserId: owner}}, stubUserFetcher{}, stubUserFollowsCounter{})
374-
resp, err := h(marshal(t, event.GetAllUsersEvent{UserId: owner}), nil)
375-
if err != nil {
376-
t.Fatalf("unexpected err: %v", err)
377-
}
378-
r := resp.(event.UsersResponse)
379-
if len(r.Users) != 0 {
380-
t.Fatalf("expected empty users: %v", r.Users)
381-
}
382-
})
383-
}
384-
385313
func TestStreamUpdateProfileHandler(t *testing.T) {
386314
owner := "owner-1"
387315

0 commit comments

Comments
 (0)