Skip to content

Commit a0340da

Browse files
authored
Merge pull request #313 from mrpalide/feat/add-smux-smlib
Add `smux` lib
2 parents 5ea8f30 + 58e5aea commit a0340da

File tree

21 files changed

+2111
-67
lines changed

21 files changed

+2111
-67
lines changed

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ require (
2222
github.com/sirupsen/logrus v1.9.3
2323
github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6
2424
github.com/skycoin/skycoin v0.28.1-0.20250823221707-c533551dfabd //DO NOT MODIFY OR UPDATE v0.28.1-0.20241105130348-39b49a2d0a7f
25-
github.com/skycoin/skywire v1.3.31-0.20250724153549-ec7ca3554d42
25+
github.com/skycoin/skywire v1.3.31-0.20250810155428-30d83a379b39
2626
github.com/spf13/cobra v1.9.1
2727
github.com/stretchr/testify v1.10.0
2828
golang.org/x/net v0.43.0
2929
golang.org/x/sys v0.35.0
3030
golang.org/x/term v0.34.0
3131
)
3232

33+
require github.com/xtaci/smux v1.5.34
34+
3335
require (
3436
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
3537
github.com/bytedance/sonic v1.14.0 // indirect

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0
118118
github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o=
119119
github.com/skycoin/skycoin v0.28.1-0.20250823221707-c533551dfabd h1:yKo1t3+P78TcCZvWqEJDV7DAB162C3qVHDKLjB8b2hA=
120120
github.com/skycoin/skycoin v0.28.1-0.20250823221707-c533551dfabd/go.mod h1:9w5J+CJ7fWwkmpttrQ2SFksiSPc0t0DtwsCdXLdl4Qg=
121-
github.com/skycoin/skywire v1.3.31-0.20250724153549-ec7ca3554d42 h1:9Hr/ht404g8fDo80Bw9YIPwu0IuDKrG3mRkZeH6y/Vc=
122-
github.com/skycoin/skywire v1.3.31-0.20250724153549-ec7ca3554d42/go.mod h1:JnR5EJHpryaFFILpPpFJybtUT+0+2/aQxSxgjKvsPZs=
121+
github.com/skycoin/skywire v1.3.31-0.20250810155428-30d83a379b39 h1:6+YIdW2rrU9ZDCvigTP9j/oUGcWgEzPPNFM0yo7Z2F0=
122+
github.com/skycoin/skywire v1.3.31-0.20250810155428-30d83a379b39/go.mod h1:8fUvhqqo54SR0lMlUpGX/qnXSjKZEQw6TFYsXzwBAqg=
123123
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
124124
github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo=
125125
github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0=
@@ -145,6 +145,8 @@ github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G
145145
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
146146
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
147147
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
148+
github.com/xtaci/smux v1.5.34 h1:OUA9JaDFHJDT8ZT3ebwLWPAgEfE6sWo2LaTy3anXqwg=
149+
github.com/xtaci/smux v1.5.34/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY=
148150
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
149151
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
150152
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=

pkg/disc/entry.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ type Entry struct {
123123

124124
// Signature for proving authenticity of an Entry.
125125
Signature string `json:"signature,omitempty"`
126+
127+
// Protocol is the lib that use for multiplexing.
128+
Protocol string `json:"protocol,omitempty"`
126129
}
127130

128131
func (e *Entry) String() string {

pkg/dmsg/client.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010
"sync"
1111
"time"
1212

13+
"github.com/hashicorp/yamux"
1314
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher"
1415
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging"
1516
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/netutil"
17+
"github.com/xtaci/smux"
1618
"golang.org/x/net/proxy"
1719

1820
"github.com/skycoin/dmsg/pkg/disc"
@@ -47,6 +49,7 @@ type Config struct {
4749
Callbacks *ClientCallbacks
4850
ClientType string
4951
ConnectedServersType string
52+
Protocol string
5053
}
5154

5255
// Ensure ensures all config values are set.
@@ -156,6 +159,8 @@ func (ce *Client) Serve(ctx context.Context) {
156159

157160
updateEntryLoopOnce := new(sync.Once)
158161

162+
needInitialPost := true
163+
159164
for {
160165
if isClosed(ce.done) {
161166
return
@@ -208,6 +213,18 @@ func (ce *Client) Serve(ctx context.Context) {
208213
rand.Shuffle(len(entries), func(i, j int) {
209214
entries[i], entries[j] = entries[j], entries[i]
210215
})
216+
217+
if needInitialPost {
218+
// use this for put protocol type of client to disc, for dicision part of dmsg-server
219+
err = ce.initilizeClientEntry(cancellabelCtx, ce.conf.ClientType, ce.conf.Protocol)
220+
if err != nil {
221+
ce.log.WithError(err).Warn("Initial post entry failed")
222+
} else {
223+
ce.log.WithError(err).Info("Initial post entry successed")
224+
}
225+
needInitialPost = false
226+
}
227+
211228
for n, entry := range entries {
212229
if isClosed(ce.done) {
213230
return
@@ -490,7 +507,7 @@ func (ce *Client) EnsureSession(ctx context.Context, entry *disc.Entry) error {
490507
ce.log.WithField("remote_pk", entry.Static).Debug("Session already exists...")
491508
return nil
492509
}
493-
510+
entry.Protocol = ce.conf.Protocol
494511
// Dial session.
495512
_, err := ce.dialSession(ctx, entry)
496513
return err
@@ -537,6 +554,19 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client
537554
if err != nil {
538555
return ClientSession{}, err
539556
}
557+
if entry.Protocol == "smux" {
558+
dSes.sm.smux, err = smux.Client(conn, smux.DefaultConfig())
559+
if err != nil {
560+
return ClientSession{}, err
561+
}
562+
ce.log.Infof("smux stream session initial for %s", dSes.RemotePK().String())
563+
} else {
564+
dSes.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig())
565+
if err != nil {
566+
return ClientSession{}, err
567+
}
568+
ce.log.Infof("yamux stream session initial for %s", dSes.RemotePK().String())
569+
}
540570

541571
if !ce.setSession(ctx, dSes.SessionCommon) {
542572
_ = dSes.Close() //nolint:errcheck

pkg/dmsg/entity_common.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,29 @@ func (c *EntityCommon) updateServerEntryLoop(ctx context.Context, addr string, m
225225
}
226226
}
227227

228+
func (c *EntityCommon) initilizeClientEntry(ctx context.Context, clientType string, protocol string) (err error) {
229+
// Record last update on success.
230+
defer func() {
231+
if err == nil {
232+
c.recordUpdate()
233+
}
234+
}()
235+
236+
srvPKs := make([]cipher.PubKey, 0, len(c.sessions))
237+
238+
_, err = c.dc.Entry(ctx, c.pk)
239+
if err != nil {
240+
entry := disc.NewClientEntry(c.pk, 0, srvPKs)
241+
entry.ClientType = clientType
242+
entry.Protocol = protocol
243+
if err := entry.Sign(c.sk); err != nil {
244+
return err
245+
}
246+
return c.dc.PostEntry(ctx, entry)
247+
}
248+
return nil
249+
}
250+
228251
func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{}, clientType string) (err error) {
229252
if isClosed(done) {
230253
return nil
@@ -295,6 +318,17 @@ func (c *EntityCommon) updateClientEntryLoop(ctx context.Context, done chan stru
295318
}
296319
}
297320

321+
func (c *EntityCommon) entryProtocol(ctx context.Context, pk cipher.PubKey) string {
322+
entry, err := c.dc.Entry(ctx, pk)
323+
if err != nil {
324+
c.log.WithField("entry", entry).WithError(err).Warn("Entry not found, so return empty as protocol.\n")
325+
return ""
326+
}
327+
328+
c.log.WithField("entry", entry).Debug("Entry's protocol fetch.\n")
329+
return entry.Protocol
330+
}
331+
298332
func (c *EntityCommon) delEntry(ctx context.Context) (err error) {
299333

300334
entry, err := c.dc.Entry(ctx, c.pk)

pkg/dmsg/server.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"sync"
88
"time"
99

10+
"github.com/hashicorp/yamux"
1011
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher"
1112
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging"
1213
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/netutil"
14+
"github.com/xtaci/smux"
1315

1416
"github.com/skycoin/dmsg/internal/servermetrics"
1517
"github.com/skycoin/dmsg/pkg/disc"
@@ -214,7 +216,6 @@ func (s *Server) handleSession(conn net.Conn) {
214216
}
215217
return
216218
}
217-
218219
log = log.WithField("remote_pk", dSes.RemotePK())
219220
log.Info("Started session.")
220221

@@ -223,6 +224,27 @@ func (s *Server) handleSession(conn net.Conn) {
223224
awaitDone(ctx, s.done)
224225
log.WithError(dSes.Close()).Info("Stopped session.")
225226
}()
227+
// detect visor protocol for dmsg
228+
protocol := s.entryProtocol(ctx, dSes.RemotePK())
229+
230+
// based on protocol, create smux or yamux stream session
231+
if protocol == "smux" {
232+
dSes.sm.smux, err = smux.Server(conn, smux.DefaultConfig())
233+
if err != nil {
234+
cancel()
235+
return
236+
}
237+
dSes.sm.addr = dSes.sm.smux.RemoteAddr()
238+
log.Infof("smux stream session initial for %s", dSes.RemotePK().String())
239+
} else {
240+
dSes.sm.yamux, err = yamux.Server(conn, yamux.DefaultConfig())
241+
if err != nil {
242+
cancel()
243+
return
244+
}
245+
dSes.sm.addr = dSes.sm.yamux.RemoteAddr()
246+
log.Infof("yamux stream session initial for %s", dSes.RemotePK().String())
247+
}
226248

227249
if s.setSession(ctx, dSes.SessionCommon) {
228250
dSes.Serve()

pkg/dmsg/server_session.go

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/hashicorp/yamux"
1010
"github.com/sirupsen/logrus"
1111
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/netutil"
12+
"github.com/xtaci/smux"
1213

1314
"github.com/skycoin/dmsg/internal/servermetrics"
1415
"github.com/skycoin/dmsg/pkg/noise"
@@ -44,30 +45,54 @@ func (ss *ServerSession) Close() error {
4445
func (ss *ServerSession) Serve() {
4546
ss.m.RecordSession(servermetrics.DeltaConnect) // record successful connection
4647
defer ss.m.RecordSession(servermetrics.DeltaDisconnect) // record disconnection
47-
48-
for {
49-
yStr, err := ss.ys.AcceptStream()
50-
if err != nil {
51-
switch err {
52-
case yamux.ErrSessionShutdown, io.EOF:
53-
ss.log.WithError(err).Info("Stopping session...")
54-
default:
55-
ss.log.WithError(err).Warn("Failed to accept stream, stopping session...")
48+
if ss.sm.smux != nil {
49+
for {
50+
sStr, err := ss.sm.smux.AcceptStream()
51+
if err != nil {
52+
switch err {
53+
case io.EOF:
54+
ss.log.WithError(err).Info("Stopping session...")
55+
default:
56+
ss.log.WithError(err).Warn("Failed to accept stream, stopping session...")
57+
}
58+
return
5659
}
57-
return
60+
61+
log := ss.log.WithField("smux_id", sStr.ID())
62+
log.Info("Initiating stream.")
63+
64+
go func(sStr *smux.Stream) {
65+
err := ss.serveStream(log, sStr, ss.sm.addr)
66+
log.WithError(err).Info("Stopped stream.")
67+
}(sStr)
5868
}
69+
} else {
70+
for {
71+
yStr, err := ss.sm.yamux.AcceptStream()
72+
if err != nil {
73+
switch err {
74+
case yamux.ErrSessionShutdown, io.EOF:
75+
ss.log.WithError(err).Info("Stopping session...")
76+
default:
77+
ss.log.WithError(err).Warn("Failed to accept stream, stopping session...")
78+
}
79+
return
80+
}
5981

60-
log := ss.log.WithField("yamux_id", yStr.StreamID())
61-
log.Info("Initiating stream.")
82+
log := ss.log.WithField("yamux_id", yStr.StreamID())
83+
log.Info("Initiating stream.")
6284

63-
go func(yStr *yamux.Stream) {
64-
err := ss.serveStream(log, yStr)
65-
log.WithError(err).Info("Stopped stream.")
66-
}(yStr)
85+
go func(yStr *yamux.Stream) {
86+
err := ss.serveStream(log, yStr, ss.sm.addr)
87+
log.WithError(err).Info("Stopped stream.")
88+
}(yStr)
89+
}
6790
}
6891
}
6992

70-
func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr *yamux.Stream) error {
93+
// struct
94+
95+
func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCloser, addr net.Addr) error {
7196
readRequest := func() (StreamRequest, error) {
7297
obj, err := ss.readObject(yStr)
7398
if err != nil {
@@ -102,7 +127,7 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr *yamux.Stream)
102127
if req.IPinfo && req.DstAddr.PK == ss.entity.LocalPK() {
103128
log.Debug("Received IP stream request.")
104129

105-
ip, err := addrToIP(yStr.RemoteAddr())
130+
ip, err := addrToIP(addr)
106131
if err != nil {
107132
ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream
108133
return err
@@ -164,22 +189,27 @@ func addrToIP(addr net.Addr) (net.IP, error) {
164189
}
165190
}
166191

167-
func (ss *ServerSession) forwardRequest(req StreamRequest) (yStr *yamux.Stream, respObj SignedObject, err error) {
192+
func (ss *ServerSession) forwardRequest(req StreamRequest) (mStr io.ReadWriteCloser, respObj SignedObject, err error) {
168193
defer func() {
169-
if err != nil && yStr != nil {
194+
if err != nil && mStr != nil {
170195
ss.log.
171-
WithError(yStr.Close()).
196+
WithError(mStr.Close()).
172197
Debugf("After forwardRequest failed, the yamux stream is closed.")
173198
}
174199
}()
175-
176-
if yStr, err = ss.ys.OpenStream(); err != nil {
177-
return nil, nil, err
200+
if ss.sm.smux != nil {
201+
if mStr, err = ss.sm.smux.OpenStream(); err != nil {
202+
return nil, nil, err
203+
}
204+
} else {
205+
if mStr, err = ss.sm.yamux.OpenStream(); err != nil {
206+
return nil, nil, err
207+
}
178208
}
179-
if err = ss.writeObject(yStr, req.raw); err != nil {
209+
if err = ss.writeObject(mStr, req.raw); err != nil {
180210
return nil, nil, err
181211
}
182-
if respObj, err = ss.readObject(yStr); err != nil {
212+
if respObj, err = ss.readObject(mStr); err != nil {
183213
return nil, nil, err
184214
}
185215
var resp StreamResponse
@@ -189,5 +219,5 @@ func (ss *ServerSession) forwardRequest(req StreamRequest) (yStr *yamux.Stream,
189219
if err = resp.Verify(req); err != nil {
190220
return nil, nil, err
191221
}
192-
return yStr, respObj, nil
222+
return mStr, respObj, nil
193223
}

0 commit comments

Comments
 (0)