Skip to content

Commit da514e9

Browse files
committed
socket cluster and local load balancer
1 parent 062a48c commit da514e9

File tree

20 files changed

+1471
-483
lines changed

20 files changed

+1471
-483
lines changed

examples/socket-chat/public/app.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import * as chain from '/chain.js'
22

3-
const socket = new chain.Socket('/socket')
3+
const socket = new chain.Socket({
4+
getNodes: async () => { return ['/socket'] }
5+
})
46
socket.connect()
57

68
const channel = socket.channel("chat:lobby", { param1: 'foo' })

examples/socket-cluster/main.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"embed"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"log"
10+
"log/slog"
11+
"net/http"
12+
"strings"
13+
"time"
14+
15+
"math/rand"
16+
17+
"github.com/nidorx/chain"
18+
"github.com/nidorx/chain/pubsub"
19+
"github.com/nidorx/chain/socket"
20+
)
21+
22+
var (
23+
//go:embed public
24+
staticFs embed.FS
25+
staticDir = "public"
26+
lastPortUsed = 8080
27+
cluster = map[int]*http.Server{}
28+
)
29+
30+
func main() {
31+
32+
// Used by session
33+
if err := chain.SetSecretKeyBase("ZcbD0D29eYsGq89QjirJbPkw7Qxwxboy"); err != nil {
34+
panic(err)
35+
}
36+
37+
initPublisher()
38+
39+
router := chain.New()
40+
router.GET("/*", createStaticFileHandler())
41+
router.GET("/node", listNodeHandler)
42+
router.POST("/node", addNodeHandler)
43+
router.DELETE("/node", deleteNodeHandler)
44+
socket.ClientJsHandler(router, "/") // "/chain.js"
45+
46+
port := fmt.Sprintf("%d", lastPortUsed)
47+
log.Printf("Listening on :%s...\n", port)
48+
49+
if err := http.ListenAndServe(fmt.Sprintf(":%s", port), router); err != nil {
50+
log.Fatalf("ListenAndServe: %v", err)
51+
}
52+
}
53+
54+
func initPublisher() {
55+
// send message to all nodes, using pubsub (local)
56+
go func() {
57+
addNodeHandler(nil) // first socket server
58+
addNodeHandler(nil) // second socket server
59+
60+
ticker := time.NewTicker(500 * time.Millisecond)
61+
i := 1
62+
for range ticker.C {
63+
i++
64+
if bytes, err := json.Marshal(map[string]any{"name": "Server", "body": fmt.Sprintf("msg %d", i)}); err != nil {
65+
return
66+
} else {
67+
pubsub.Broadcast("chat:lobby", bytes)
68+
}
69+
}
70+
}()
71+
}
72+
73+
func deleteNodeHandler(ctx *chain.Context) {
74+
if len(cluster) > 0 {
75+
for {
76+
toRemove := 0
77+
for port := range cluster {
78+
if rand.Intn(100) > 50 {
79+
toRemove = port
80+
break
81+
}
82+
}
83+
if toRemove > 0 {
84+
sctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
85+
if err := cluster[toRemove].Shutdown(sctx); err != nil {
86+
cluster[toRemove].Close()
87+
}
88+
delete(cluster, toRemove)
89+
break
90+
}
91+
}
92+
}
93+
ctx.OK()
94+
}
95+
96+
func listNodeHandler(ctx *chain.Context) {
97+
var nodes []int
98+
for port, _ := range cluster {
99+
nodes = append(nodes, port)
100+
}
101+
ctx.Json(nodes)
102+
}
103+
104+
func addNodeHandler(ctx *chain.Context) {
105+
lastPortUsed++
106+
port := fmt.Sprintf("%d", lastPortUsed)
107+
108+
channel := socket.NewChannel("chat:*", func(channel *socket.Channel) {
109+
110+
channel.Join("chat:lobby", func(payload any, skt *socket.Socket) (reply any, err error) {
111+
slog.Info(
112+
skt.Topic()+" join",
113+
slog.String("server", port),
114+
slog.Any("socket", skt.Id()),
115+
slog.Any("payload", payload),
116+
)
117+
return
118+
})
119+
120+
channel.Leave("chat:lobby", func(skt *socket.Socket, reason socket.LeaveReason) {
121+
slog.Info(
122+
skt.Topic()+" leave",
123+
slog.String("server", port),
124+
slog.Any("socket", skt.Id()),
125+
slog.Any("reason", reason),
126+
)
127+
})
128+
129+
// automatically send messages to connected clients (from pubsub)
130+
channel.Subscribe("chat:*", "message")
131+
})
132+
133+
handler := &socket.Handler{
134+
Channels: []*socket.Channel{channel},
135+
OnConfig: func(handler *socket.Handler, router *chain.Router, endpoint string) error {
136+
return nil
137+
},
138+
OnConnect: func(info *socket.Session) error {
139+
return nil
140+
},
141+
Transports: []socket.Transport{&socket.TransportSSE{
142+
Cors: &socket.CorsConfig{
143+
MaxAge: 12 * time.Hour,
144+
AllowAllOrigins: false,
145+
AllowCredentials: true,
146+
AllowPrivateNetwork: false,
147+
AllowOrigins: []string{"*"},
148+
AllowMethods: []string{"GET", "POST", "OPTIONS"},
149+
AllowHeaders: []string{"Origin", "Content-Length", "Content-Type"},
150+
ExposeHeaders: []string{},
151+
},
152+
}},
153+
}
154+
155+
// configure socket server
156+
router := chain.New()
157+
router.Configure("/socket", handler)
158+
159+
server := &http.Server{
160+
Addr: fmt.Sprintf(":%s", port),
161+
Handler: router,
162+
}
163+
164+
go func() {
165+
log.Printf("Socket listening on :%s...\n", port)
166+
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
167+
log.Fatalf("Socket HTTP server error: %v", err)
168+
}
169+
}()
170+
171+
cluster[lastPortUsed] = server
172+
173+
if ctx != nil {
174+
ctx.OK()
175+
}
176+
}
177+
178+
func createStaticFileHandler() http.Handler {
179+
h := http.FileServer(http.FS(staticFs))
180+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
181+
if r.URL.Path == "/" {
182+
r.URL.Path = fmt.Sprintf("/%s/", staticDir)
183+
} else {
184+
b := strings.Split(r.URL.Path, "/")[0]
185+
if b != staticDir {
186+
r.URL.Path = fmt.Sprintf("/%s%s", staticDir, r.URL.Path)
187+
}
188+
}
189+
h.ServeHTTP(w, r)
190+
})
191+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
body {
2+
font-family: 'Open Sans', sans-serif;
3+
background: #D5B3DE;
4+
font-size: 1.0em;
5+
line-height: 1.4em;
6+
overflow-x: hidden;
7+
}
8+
9+
h2 {
10+
margin: 5px 0;
11+
}
12+
13+
.container {
14+
width: 75%;
15+
height: 100%;
16+
padding: 20px;
17+
background-color: #fff;
18+
top: 100px;
19+
bottom: 0;
20+
left: 0;
21+
right: 0;
22+
margin: auto;
23+
}
24+
25+
#chat-box {
26+
border: 1px #000 solid;
27+
padding: 0 0 0 5px;
28+
min-height: 225px;
29+
max-height: 225px;
30+
overflow: scroll;
31+
font-size: 12px;
32+
font-family: monospace;
33+
line-height: 14px;
34+
}
35+
36+
button {
37+
margin-right: 2px;
38+
}
39+
40+
#server-list button.preferred {
41+
color: green;
42+
font-weight: bold;
43+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import * as chain from '/chain.js';
2+
3+
let priority = 8081;
4+
let servers = [8081];
5+
6+
const socket = new chain.Socket({
7+
transport: [
8+
{
9+
name: "SSE",
10+
cors: true,
11+
}
12+
],
13+
getNodesInterval: 5,
14+
getNodes: async () => {
15+
return fetch('/node')
16+
.then(res => res.json())
17+
.then((res) => {
18+
let host = window.location.host.split(':')[0];
19+
20+
servers = res || [];
21+
22+
if (!servers.includes(priority)) {
23+
priority = servers[0];
24+
}
25+
26+
servers.sort((a, b) => {
27+
if (a == priority) {
28+
return -1;
29+
}
30+
if (b == priority) {
31+
return 1;
32+
}
33+
return a - b;
34+
});
35+
36+
setTimeout(updateServerListButton, 50);
37+
38+
return servers.map((port) => {
39+
return `http://${host}:${port}/socket`
40+
});
41+
});
42+
}
43+
})
44+
socket.connect()
45+
46+
socket.channel("chat:lobby", { param1: 'foo' })
47+
.on('ok', () => chain.log('Join', "success"))
48+
.on('error', err => chain.log('Join', "errored", err))
49+
.on('timeout', () => chain.log('Join', "timed out "))
50+
.on('message', onMessage)
51+
.join();
52+
53+
document.getElementById('node-add').onclick = () => {
54+
fetch('/node', { method: 'post' }).then(res => { });
55+
};
56+
57+
document.getElementById('node-del').onclick = () => {
58+
fetch('/node', { method: 'delete' }).then(res => { });
59+
};
60+
61+
const chatBox = document.getElementById('chat-box');
62+
63+
function onMessage(message, ref, joinRef) {
64+
let div = document.createElement('div')
65+
div.classList.add('c1');
66+
div.innerHTML = JSON.stringify(message);
67+
chatBox.insertBefore(div, chatBox.firstChild);
68+
69+
if (chatBox.childNodes.length > 50) {
70+
chatBox.removeChild(chatBox.lastChild);
71+
}
72+
}
73+
74+
function updateServerListButton() {
75+
let container = document.getElementById('server-list');
76+
container.innerHTML = '';
77+
78+
servers.forEach((port) => {
79+
let button = document.createElement('button')
80+
button.innerHTML = `:${port}`;
81+
button.onclick = (e) => {
82+
e.preventDefault();
83+
e.stopPropagation();
84+
priority = port;
85+
updateServerListButton();
86+
};
87+
88+
if (priority == port) {
89+
button.classList.add('preferred');
90+
}
91+
92+
container.appendChild(button);
93+
})
94+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="utf-8">
5+
<meta http-equiv="X-UA-Compatible" content="IE=edge">
6+
<meta name="viewport" content="width=device-width, initial-scale=1">
7+
8+
<title>Chain Chat</title>
9+
<link rel="stylesheet" href="/app.css">
10+
</head>
11+
<body>
12+
<div class="container">
13+
<main role="main">
14+
<h2>Chain Chat</h2>
15+
<div id="chat-box"></div>
16+
17+
<h2>Cluster</h2>
18+
<div>
19+
<button id="node-add">add node</button>
20+
<button id="node-del">remove node</button>
21+
<div id="server-list"></div>
22+
</div>
23+
</main>
24+
</div>
25+
26+
<script src="/app.js" type="module"></script>
27+
</body>
28+
</html>

socket/channel.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ const (
2323
)
2424

2525
var (
26-
ErrJoinCrashed = errors.New("join crashed")
27-
ErrUnmatchedTopic = errors.New("unmatched topic")
26+
ErrJoinCrashed = errors.New("join crashed")
27+
ErrUnmatchedTopic = errors.New("unmatched topic")
28+
ErrJoinWildcardTopic = errors.New("joining topics with wildcard is not allowed")
2829
)
2930

3031
// JoinHandler invoked when the client joins a channel (event:_join, `js: channel.join()`).

0 commit comments

Comments
 (0)