Skip to content

Commit 7bfbdd0

Browse files
committed
feat: implement server data point broadcasting and subscription management
1 parent 7fbe986 commit 7bfbdd0

2 files changed

Lines changed: 76 additions & 56 deletions

File tree

taks/pingJob.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ func (j *PingJob) run() {
152152
continue
153153
}
154154

155+
websocket.GlobalHub.SendToServer(server.IP, map[string]interface{}{
156+
"type": "data_point_rt",
157+
"data": data.ServerDataPoint{
158+
Timestamp: time.Now().Unix(),
159+
PlayerCount: resp.PlayerCount.Online,
160+
Ip: server.IP,
161+
Name: server.Name,
162+
},
163+
})
164+
155165
pingTime := time.Since(start)
156166

157167
// EWMA

websocket/websocket.go

Lines changed: 66 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@ package websocket
22

33
import (
44
"MineTracker/util"
5-
"errors"
5+
"encoding/json"
66
"fmt"
7-
"net"
87
"net/http"
98
"os"
109
"sync"
@@ -22,14 +21,16 @@ var upgrader = websocket.Upgrader{
2221
}
2322

2423
type Hub struct {
25-
clients map[*websocket.Conn]bool
26-
writeMu map[*websocket.Conn]*sync.Mutex
27-
mu sync.RWMutex
24+
clients map[*websocket.Conn]bool
25+
writeMu map[*websocket.Conn]*sync.Mutex
26+
subscriptions map[string]map[*websocket.Conn]bool
27+
mu sync.RWMutex
2828
}
2929

3030
var GlobalHub = &Hub{
31-
clients: make(map[*websocket.Conn]bool),
32-
writeMu: make(map[*websocket.Conn]*sync.Mutex),
31+
clients: make(map[*websocket.Conn]bool),
32+
writeMu: make(map[*websocket.Conn]*sync.Mutex),
33+
subscriptions: make(map[string]map[*websocket.Conn]bool),
3334
}
3435

3536
func (h *Hub) Register(conn *websocket.Conn) {
@@ -42,36 +43,58 @@ func (h *Hub) Register(conn *websocket.Conn) {
4243
func (h *Hub) Unregister(conn *websocket.Conn) {
4344
h.mu.Lock()
4445
defer h.mu.Unlock()
46+
4547
delete(h.clients, conn)
4648
delete(h.writeMu, conn)
49+
50+
for _, subs := range h.subscriptions {
51+
delete(subs, conn)
52+
}
4753
}
4854

49-
func (h *Hub) WriteJSON(conn *websocket.Conn, v interface{}) error {
50-
h.mu.RLock()
51-
m := h.writeMu[conn]
52-
h.mu.RUnlock()
55+
func (h *Hub) Subscribe(conn *websocket.Conn, ip string) {
56+
h.mu.Lock()
57+
defer h.mu.Unlock()
58+
59+
if h.subscriptions[ip] == nil {
60+
h.subscriptions[ip] = make(map[*websocket.Conn]bool)
61+
}
62+
h.subscriptions[ip][conn] = true
63+
}
64+
65+
func (h *Hub) Unsubscribe(conn *websocket.Conn, ip string) {
66+
h.mu.Lock()
67+
defer h.mu.Unlock()
68+
69+
if subs, ok := h.subscriptions[ip]; ok {
70+
delete(subs, conn)
71+
if len(subs) == 0 {
72+
delete(h.subscriptions, ip)
73+
}
74+
}
75+
}
5376

77+
func (h *Hub) writeJSONLocked(conn *websocket.Conn, v interface{}) error {
78+
m := h.writeMu[conn]
5479
if m == nil {
5580
return fmt.Errorf("connection not registered")
5681
}
57-
5882
m.Lock()
5983
defer m.Unlock()
6084
return conn.WriteJSON(v)
6185
}
6286

63-
func (h *Hub) WriteMessage(conn *websocket.Conn, messageType int, data []byte) error {
87+
func (h *Hub) SendToServer(ip string, message interface{}) {
6488
h.mu.RLock()
65-
m := h.writeMu[conn]
89+
conns := h.subscriptions[ip]
6690
h.mu.RUnlock()
6791

68-
if m == nil {
69-
return fmt.Errorf("connection not registered")
92+
for conn := range conns {
93+
if err := h.writeJSONLocked(conn, message); err != nil {
94+
h.Unregister(conn)
95+
_ = conn.Close()
96+
}
7097
}
71-
72-
m.Lock()
73-
defer m.Unlock()
74-
return conn.WriteMessage(messageType, data)
7598
}
7699

77100
func (h *Hub) Broadcast(message interface{}) {
@@ -83,61 +106,48 @@ func (h *Hub) Broadcast(message interface{}) {
83106
h.mu.RUnlock()
84107

85108
for _, conn := range conns {
86-
if err := h.WriteJSON(conn, message); err != nil {
87-
isExpectedClose := websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) ||
88-
errors.Is(err, websocket.ErrCloseSent) || errors.Is(err, websocket.ErrCloseSent) || errors.Is(err, net.ErrClosed)
89-
90-
if isExpectedClose {
91-
h.Unregister(conn)
92-
if cerr := conn.Close(); cerr != nil {
93-
if !(websocket.IsCloseError(cerr, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) ||
94-
errors.Is(cerr, websocket.ErrCloseSent) || errors.Is(cerr, websocket.ErrCloseSent) || errors.Is(cerr, net.ErrClosed)) {
95-
util.Logger.Error().Err(cerr).Msg("Error closing client connection")
96-
}
97-
}
98-
continue
99-
}
109+
if err := h.writeJSONLocked(conn, message); err != nil {
100110
h.Unregister(conn)
101-
if cerr := conn.Close(); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
102-
util.Logger.Error().Err(cerr).Msg("Error closing client connection")
103-
}
111+
_ = conn.Close()
104112
}
105113
}
106114
}
107115

116+
type WSMessage struct {
117+
Type string `json:"type"`
118+
IP string `json:"ip,omitempty"`
119+
}
120+
108121
func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
109122
conn, err := upgrader.Upgrade(w, r, nil)
110123
if err != nil {
111-
util.Logger.Error().Err(err).Msg("Error while upgrading to websocket")
124+
util.Logger.Error().Err(err).Msg("WS upgrade failed")
112125
return
113126
}
114127

115128
GlobalHub.Register(conn)
116-
117-
defer func(conn *websocket.Conn) {
129+
defer func() {
118130
GlobalHub.Unregister(conn)
119-
if err := conn.Close(); err != nil {
120-
if !(websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) || errors.Is(err, net.ErrClosed)) {
121-
util.Logger.Error().Err(err).Msg("Error while closing connection")
122-
}
123-
}
124-
}(conn)
131+
_ = conn.Close()
132+
}()
125133

126134
for {
127-
messageType, message, err := conn.ReadMessage()
135+
_, raw, err := conn.ReadMessage()
128136
if err != nil {
129-
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) || errors.Is(err, net.ErrClosed) {
130-
// Expected close
131-
} else {
132-
util.Logger.Error().Err(err).Msg("Error while reading")
133-
}
134137
break
135138
}
136-
util.Logger.Info().Str("type", string(rune(messageType))).Msg("Got message")
137139

138-
if err = GlobalHub.WriteMessage(conn, messageType, message); err != nil {
139-
util.Logger.Error().Err(err).Msg("Error while writing")
140-
break
140+
var msg WSMessage
141+
if err := json.Unmarshal(raw, &msg); err != nil {
142+
continue
143+
}
144+
145+
switch msg.Type {
146+
case "subscribe_server":
147+
GlobalHub.Subscribe(conn, msg.IP)
148+
149+
case "unsubscribe_server":
150+
GlobalHub.Unsubscribe(conn, msg.IP)
141151
}
142152
}
143153
}

0 commit comments

Comments
 (0)