Skip to content

Commit 16e1320

Browse files
committed
simple socket encoding, auto reconnect, integration with pubsub
1 parent a3bdd41 commit 16e1320

File tree

12 files changed

+232
-301
lines changed

12 files changed

+232
-301
lines changed

examples/pubsub/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ package main
22

33
import (
44
"fmt"
5+
"time"
6+
57
"github.com/nidorx/chain"
68
"github.com/nidorx/chain/pubsub"
7-
"time"
89
)
910

1011
type MyDispatcher struct {
1112
}
1213

13-
func (d *MyDispatcher) Dispatch(topic string, message any, from string) {
14+
func (d *MyDispatcher) Dispatch(topic string, message []byte, from string) {
1415
println(fmt.Sprintf("New Message. Topic: %s, Content: %s", topic, message))
1516
}
1617

pubsub/compression.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func compressPayload(payload []byte) ([]byte, error) {
2323

2424
// Create a compressed message
2525
buf := bytes.NewBuffer(nil)
26-
buf.WriteByte(byte(messageTypeCompress))
26+
buf.WriteByte(byte(MessageTypeCompress))
2727
buf.Write(buffer.Bytes())
2828
return buf.Bytes(), nil
2929
}

pubsub/crypto.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package pubsub
22

33
import (
44
"bytes"
5+
56
"github.com/nidorx/chain"
67
"github.com/nidorx/chain/crypto"
78
)
89

910
var globalKeyring = chain.NewKeyring("chain.pubsub.keyring.salt", 1000, 32, "sha256")
1011

11-
var aad = append([]byte{byte(messageTypeEncrypt)}, []byte("chain.pubsub.aad")...)
12+
var aad = append([]byte{byte(MessageTypeEncrypt)}, []byte("chain.pubsub.aad")...)
1213

1314
// encryptPayload is used to encrypt a message before sending
1415
func encryptPayload(keyring *crypto.Keyring, payload []byte) ([]byte, error) {
@@ -19,7 +20,7 @@ func encryptPayload(keyring *crypto.Keyring, payload []byte) ([]byte, error) {
1920

2021
// return encrypted cipher text
2122
buf := bytes.NewBuffer(nil)
22-
buf.WriteByte(byte(messageTypeEncrypt))
23+
buf.WriteByte(byte(MessageTypeEncrypt))
2324
buf.Write(encrypted)
2425
return buf.Bytes(), nil
2526
}

pubsub/message.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
package pubsub
22

3-
// messageType is an integer ID of a type of message that can be received on network channels from other members.
4-
type messageType uint8
3+
// MessageType is an integer ID of a type of message that can be received on network channels from other members.
4+
type MessageType uint8
55

66
// The list of available message types.
77
const (
8-
messageTypeCompress messageType = iota
9-
messageTypeEncrypt
10-
messageTypeBroadcast
11-
messageTypeDirectBroadcast
12-
indirectPingMsg
13-
ackRespMsg
14-
suspectMsg
15-
aliveMsg
16-
deadMsg
17-
pushPullMsg
18-
compoundMsg
19-
userMsg
20-
nackRespMsg
21-
errMsg
8+
MessageTypeCompress MessageType = iota
9+
MessageTypeEncrypt
10+
MessageTypeBroadcast
11+
MessageTypeDirectBroadcast
12+
IndirectPingMsg
13+
AckRespMsg
14+
SuspectMsg
15+
AliveMsg
16+
DeadMsg
17+
PushPullMsg
18+
CompoundMsg
19+
UserMsg
20+
NackRespMsg
21+
ErrMsg
2222
)

pubsub/pubsub.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,21 @@ var (
2222
)
2323

2424
type Dispatcher interface {
25-
Dispatch(topic string, message any, from string)
25+
Dispatch(topic string, message []byte, from string)
2626
}
2727

2828
type DispatcherFuncImpl struct {
29-
Dispatcher func(topic string, message any, from string)
29+
Dispatcher func(topic string, message []byte, from string)
3030
}
3131

32-
func (d *DispatcherFuncImpl) Dispatch(topic string, message any, from string) {
32+
func (d *DispatcherFuncImpl) Dispatch(topic string, message []byte, from string) {
3333
if d.Dispatcher == nil {
3434
return
3535
}
3636
d.Dispatcher(topic, message, from)
3737
}
3838

39-
func DispatcherFunc(d func(topic string, message any, from string)) Dispatcher {
39+
func DispatcherFunc(d func(topic string, message []byte, from string)) Dispatcher {
4040
return &DispatcherFuncImpl{Dispatcher: d}
4141
}
4242

@@ -122,7 +122,7 @@ func Broadcast(topic string, message []byte, options ...*Option) (err error) {
122122
msgToSend := message
123123

124124
// [messageType: byte] [from: 20 bytes] [msgToSend: ...]
125-
msgToSend = append(append([]byte{byte(messageTypeBroadcast)}, selfIdBytes...), msgToSend...)
125+
msgToSend = append(append([]byte{byte(MessageTypeBroadcast)}, selfIdBytes...), msgToSend...)
126126

127127
// Check if we have compression enabled
128128
if config.DisableCompression == false {
@@ -178,11 +178,11 @@ func DirectBroadcast(nodeId string, topic string, message []byte, options ...*Op
178178
buf.WriteString(topic)
179179
buf.Write(message)
180180

181-
return broadcastMessage(messageTypeDirectBroadcast, "direct:"+nodeId, buf.Bytes(), options...)
181+
return broadcastMessage(MessageTypeDirectBroadcast, "direct:"+nodeId, buf.Bytes(), options...)
182182
}
183183

184184
// Broadcast broadcasts message on given topic across the whole cluster.
185-
func broadcastMessage(msgType messageType, topic string, message []byte, options ...*Option) (err error) {
185+
func broadcastMessage(msgType MessageType, topic string, message []byte, options ...*Option) (err error) {
186186
var config *AdapterConfig
187187
if config = GetAdapter(topic); config == nil {
188188
return ErrNoAdapter
@@ -210,7 +210,7 @@ func broadcastMessage(msgType messageType, topic string, message []byte, options
210210
msgToSend := buf.Bytes()
211211

212212
// Check if we have compression enabled
213-
if config.DisableCompression == false {
213+
if !config.DisableCompression {
214214
var compressed []byte
215215
if compressed, err = compressPayload(msgToSend); err != nil {
216216
slog.Warn(
@@ -224,7 +224,7 @@ func broadcastMessage(msgType messageType, topic string, message []byte, options
224224
}
225225

226226
// Check if we have encryption enabled
227-
if config.DisableEncryption == false {
227+
if !config.DisableEncryption {
228228
keyring := config.Keyring
229229
if keyring == nil {
230230
keyring = globalKeyring
@@ -245,10 +245,10 @@ func broadcastMessage(msgType messageType, topic string, message []byte, options
245245
func Dispatch(topic string, message []byte) {
246246
if config := GetAdapter(topic); config != nil {
247247
// Read the message type
248-
msgType := messageType(message[0])
248+
msgType := MessageType(message[0])
249249

250250
// Check if the message is encrypted
251-
if msgType == messageTypeEncrypt {
251+
if msgType == MessageTypeEncrypt {
252252
if config.DisableEncryption {
253253
slog.Error(
254254
"[chain.pubsub] remote message is encrypted and encryption is not configured",
@@ -274,9 +274,9 @@ func Dispatch(topic string, message []byte) {
274274
}
275275

276276
// Reset message type and buf
277-
msgType = messageType(plain[0])
277+
msgType = MessageType(plain[0])
278278
message = plain
279-
} else if config.DisableEncryption == false {
279+
} else if !config.DisableEncryption {
280280
slog.Error(
281281
"[chain.pubsub] encryption is configured but remote message is not encrypted",
282282
slog.String("Topic", topic),
@@ -286,7 +286,7 @@ func Dispatch(topic string, message []byte) {
286286
}
287287

288288
// Check if we have a compressed message
289-
if msgType == messageTypeCompress {
289+
if msgType == MessageTypeCompress {
290290
decompressed, err := decompressPayload(message)
291291
if err != nil {
292292
slog.Error(
@@ -299,7 +299,7 @@ func Dispatch(topic string, message []byte) {
299299
}
300300

301301
// Reset message type and buf
302-
msgType = messageType(decompressed[0])
302+
msgType = MessageType(decompressed[0])
303303
message = decompressed
304304
}
305305

@@ -334,7 +334,7 @@ func Dispatch(topic string, message []byte) {
334334
message = message[20:]
335335

336336
// Check if is a direct broadcast
337-
if msgType == messageTypeDirectBroadcast {
337+
if msgType == MessageTypeDirectBroadcast {
338338
if topic != directTopic {
339339
slog.Error(
340340
"[chain.pubsub] invalid topic for remote direct broadcast message",
@@ -379,7 +379,7 @@ func Dispatch(topic string, message []byte) {
379379
}
380380
topic = string(message[:topicNameLen])
381381
message = message[topicNameLen:]
382-
} else if msgType != messageTypeBroadcast {
382+
} else if msgType != MessageTypeBroadcast {
383383
slog.Error(
384384
"[chain.pubsub] invalid remote message type",
385385
slog.String("Topic", topic),
@@ -396,7 +396,7 @@ func Dispatch(topic string, message []byte) {
396396
//
397397
// `topic` - The topic to broadcast to, ie: `"users:123"`
398398
// `message` - The payload of the broadcast
399-
func LocalBroadcast(topic string, message any) {
399+
func LocalBroadcast(topic string, message []byte) {
400400
dispatchMessage(topic, message, selfIdString)
401401
}
402402

@@ -480,7 +480,7 @@ func scheduleUnsubscribe(topic string) {
480480
}
481481

482482
// dispatchMessage deliver the message locally
483-
func dispatchMessage(topic string, message any, from string) {
483+
func dispatchMessage(topic string, message []byte, from string) {
484484
go func() {
485485
if from == "" {
486486
from = selfIdString

pubsub/pubsub_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package pubsub
22

33
import (
4-
"github.com/nidorx/chain"
5-
"github.com/segmentio/ksuid"
64
"reflect"
75
"sync"
86
"testing"
97
"time"
8+
9+
"github.com/nidorx/chain"
10+
"github.com/segmentio/ksuid"
1011
)
1112

1213
var (
@@ -197,7 +198,7 @@ type testDispatcherStruct struct {
197198
mutex sync.Mutex
198199
}
199200

200-
func (d *testDispatcherStruct) Dispatch(topic string, message any, from string) {
201+
func (d *testDispatcherStruct) Dispatch(topic string, message []byte, from string) {
201202
d.mutex.Lock()
202203
defer d.mutex.Unlock()
203204
d.messages = append(d.messages, &testDispatcherMessage{topic, message, from})

socket/channel.go

Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package socket
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"log/slog"
67
"sync"
@@ -144,11 +145,11 @@ func (c *Channel) Leave(topic string, handler LeaveHandler) {
144145

145146
// Broadcast on the pubsub server with the given topic, event and payload.
146147
func (c *Channel) Broadcast(topic string, event string, payload any) (err error) {
147-
broadcast := newMessage(MessageTypeBroadcast, topic, event, payload)
148-
defer deleteMessage(broadcast)
148+
message := newMessage(MessageTypeBroadcast, topic, event, payload)
149+
defer deleteMessage(message)
149150

150151
var bytes []byte
151-
if bytes, err = c.serializer.Encode(broadcast); err != nil {
152+
if bytes, err = c.serializer.Encode(message); err != nil {
152153
return
153154
}
154155
err = pubsub.Broadcast(topic, bytes)
@@ -157,36 +158,59 @@ func (c *Channel) Broadcast(topic string, event string, payload any) (err error)
157158

158159
// LocalBroadcast on the pubsub server with the given topic, event and payload.
159160
func (c *Channel) LocalBroadcast(topic string, event string, payload any) (err error) {
160-
broadcast := newMessage(MessageTypeBroadcast, topic, event, payload)
161-
pubsub.LocalBroadcast(topic, broadcast)
161+
message := newMessage(MessageTypeBroadcast, topic, event, payload)
162+
var bytes []byte
163+
if bytes, err = c.serializer.Encode(message); err != nil {
164+
return
165+
}
166+
pubsub.LocalBroadcast(topic, bytes)
162167
return
163168
}
164169

165-
// Dispatch Hook invoked by pubsub dispatch.
166-
func (c *Channel) Dispatch(topic string, msg any, from string) {
167-
var message *Message
168-
var valid bool
169-
var payload []byte
170-
isByteArray := false
171-
172-
if payload, valid = msg.([]byte); valid {
173-
isByteArray = true
174-
message = newMessageAny()
175-
if _, err := c.serializer.Decode(payload, message); err != nil {
176-
slog.Debug(
177-
"[chain.socket] could not decode serialized data",
178-
slog.Any("Error", err),
179-
slog.Any("Payload", payload),
180-
slog.String("Topic", topic),
170+
// Subscribe ma
171+
func (c *Channel) Subscribe(pubsubTopic, channelTopic, channelEvent string) {
172+
pubsub.Subscribe(pubsubTopic, pubsub.DispatcherFunc(func(topic string, pubsubPayload []byte, from string) {
173+
payload := map[string]any{}
174+
if err := json.Unmarshal(pubsubPayload, &payload); err != nil {
175+
slog.Warn(
176+
"[chain.socket] failed to decode pubsub message",
177+
slog.Any("error", err),
178+
slog.String("from", from),
179+
slog.String("event", channelEvent),
180+
slog.String("pubsubTopic", pubsubTopic),
181+
slog.String("topic", topic),
182+
slog.String("message", string(pubsubPayload)),
181183
)
182-
183-
deleteMessage(message)
184184
return
185185
}
186-
} else if message, valid = msg.(*Message); !valid {
186+
187+
message := newMessage(MessageTypeBroadcast, channelTopic, channelEvent, payload)
188+
c.dispatch(channelTopic, message, from)
189+
}))
190+
}
191+
192+
// Dispatch Hook invoked by pubsub dispatch.
193+
func (c *Channel) Dispatch(topic string, channelMessageEncoded []byte, from string) {
194+
var message = newMessageAny()
195+
196+
if _, err := c.serializer.Decode(channelMessageEncoded, message); err != nil {
197+
slog.Debug(
198+
"[chain.socket] could not decode serialized data",
199+
slog.Any("Error", err),
200+
slog.String("topic", topic),
201+
slog.String("from", from),
202+
)
203+
deleteMessage(message)
187204
return
188205
}
189206

207+
c.dispatch(topic, message, from)
208+
}
209+
210+
func (c *Channel) dispatch(topic string, message *Message, from string) {
211+
212+
defer deleteMessage(message)
213+
190214
// get sockets
191215
c.socketsMutex.RLock()
192216
var sockets []*Socket
@@ -216,15 +240,13 @@ func (c *Channel) Dispatch(topic string, msg any, from string) {
216240

217241
// fastlane (not intercepted, single encode for all sockets)
218242

219-
if !isByteArray {
220-
var err error
221-
if payload, err = c.serializer.Encode(message); err != nil {
222-
return
223-
}
243+
encoded, err := c.serializer.Encode(message)
244+
if err != nil {
245+
return
224246
}
225247

226248
for _, socket := range sockets {
227-
socket.Send(payload)
249+
socket.Send(encoded)
228250
}
229251
}
230252

0 commit comments

Comments
 (0)