Skip to content

Commit f95de1f

Browse files
authored
Merge pull request #35 from Ehco1996/deadline
perf: opt tcp/udp relay
2 parents cb858a9 + 8f823af commit f95de1f

File tree

12 files changed

+509
-33
lines changed

12 files changed

+509
-33
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,4 @@ iperf3 -c 0.0.0.0 -p 1234 -u -b 1G --length 1024
131131
| iperf | raw | relay(raw) | relay(ws) |relay(wss) | relay(mwss)|
132132
| ---- | ---- | ---- | ---- | ---- | ---- |
133133
| tcp | 62.6 Gbits/sec | 23.9 Gbits/sec | 14.65 Gbits/sec | 4.22 Gbits/sec | 2.43 Gbits/sec |
134-
| udp | 2.2 Gbits/sec | 2.2 Gbits/sec | 直接转发 | 直接转发 | 直接转发 |
134+
| udp | 14.5 Gbits/sec | 3.3 Gbits/sec | 直接转发 | 直接转发 | 直接转发 |

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/gobwas/pool v0.2.1 // indirect
88
github.com/gobwas/ws v1.0.4
99
github.com/gorilla/mux v1.8.0
10+
github.com/prometheus/client_golang v1.9.0
1011
github.com/urfave/cli/v2 v2.3.0
1112
github.com/xtaci/smux v2.0.1+incompatible
1213
go.uber.org/zap v1.16.0

go.sum

+366-4
Large diffs are not rendered by default.

internal/constant/constant.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const (
77

88
MaxMWSSStreamCnt = 10
99
DialTimeOut = 3 * time.Second
10-
MaxConKeepAlive = 10 * time.Minute
10+
MaxConKeepAlive = 3 * time.Second
1111

1212
Listen_RAW = "raw"
1313
Listen_WS = "ws"

internal/relay/relay.go

+4-10
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,7 @@ func (r *Relay) RunLocalTCPServer() error {
9999
for {
100100
c, err := lis.AcceptTCP()
101101
if err != nil {
102-
logger.Logger.Infof("accept tcp con error: %s", err)
103-
return err
104-
}
105-
if err := c.SetDeadline(time.Now().Add(constant.MaxConKeepAlive)); err != nil {
106-
logger.Logger.Infof("set max deadline err: %s", err)
107-
return err
102+
logger.Logger.Fatal("accept tcp conn error: %s", err)
108103
}
109104
go func(c *net.TCPConn) {
110105
if err := r.TP.HandleTCPConn(c); err != nil {
@@ -124,13 +119,12 @@ func (r *Relay) RunLocalUDPServer() error {
124119
}
125120
defer lis.Close()
126121

127-
buffers := transporter.NewBufferPool(constant.BUFFER_SIZE)
128-
buf := buffers.Get().([]byte)
129-
defer buffers.Put(buf)
122+
buf := transporter.InboundBufferPool.Get().([]byte)
123+
defer transporter.InboundBufferPool.Put(buf)
130124
for {
131125
n, addr, err := lis.ReadFromUDP(buf)
132126
if err != nil {
133-
return err
127+
logger.Logger.Fatal("listen udp conn error: %s", err)
134128
}
135129
bc := r.TP.GetOrCreateBufferCh(addr)
136130
bc.Ch <- buf[0:n]

internal/transporter/buffer.go

+49-8
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ import (
88
"syscall"
99

1010
"github.com/Ehco1996/ehco/internal/constant"
11+
"github.com/Ehco1996/ehco/internal/web"
1112
)
1213

1314
// 全局pool
14-
var inboundBufferPool, outboundBufferPool *sync.Pool
15+
var InboundBufferPool, OutboundBufferPool *sync.Pool
1516

1617
func init() {
17-
inboundBufferPool = NewBufferPool(constant.BUFFER_SIZE)
18-
outboundBufferPool = NewBufferPool(constant.BUFFER_SIZE)
18+
InboundBufferPool = NewBufferPool(constant.BUFFER_SIZE)
19+
OutboundBufferPool = NewBufferPool(constant.BUFFER_SIZE)
1920
}
2021

2122
func NewBufferPool(size int) *sync.Pool {
@@ -24,22 +25,62 @@ func NewBufferPool(size int) *sync.Pool {
2425
}}
2526
}
2627

27-
func copyBuffer(dst io.Writer, src io.Reader, bufferPool *sync.Pool) error {
28+
// NOTE adapeted from io.CopyBuffer
29+
func copyBuffer(dst io.Writer, src io.Reader, bufferPool *sync.Pool) (written int64, err error) {
2830
buf := bufferPool.Get().([]byte)
2931
defer bufferPool.Put(buf)
30-
_, err := io.CopyBuffer(dst, src, buf)
31-
return err
32+
33+
// If the reader has a WriteTo method, use it to do the copy.
34+
// Avoids an allocation and a copy.
35+
if wt, ok := src.(io.WriterTo); ok {
36+
written, err = wt.WriteTo(dst)
37+
web.NetWorkTransmitBytes.Add(float64(written))
38+
return
39+
}
40+
// Similarly, if the writer has a ReadFrom method, use it to do the copy.
41+
if rt, ok := dst.(io.ReaderFrom); ok {
42+
written, err = rt.ReadFrom(src)
43+
web.NetWorkTransmitBytes.Add(float64(written))
44+
return
45+
}
46+
for {
47+
nr, er := src.Read(buf)
48+
if nr > 0 {
49+
nw, ew := dst.Write(buf[0:nr])
50+
if nw > 0 {
51+
written += int64(nw)
52+
}
53+
if ew != nil {
54+
err = ew
55+
break
56+
}
57+
if nr != nw {
58+
err = io.ErrShortWrite
59+
break
60+
}
61+
}
62+
if er != nil {
63+
if er != io.EOF {
64+
err = er
65+
}
66+
break
67+
}
68+
}
69+
web.NetWorkTransmitBytes.Add(float64(written * 2))
70+
return
3271
}
3372

3473
// NOTE must call setdeadline before use this func or may goroutine leak
3574
func transport(rw1, rw2 io.ReadWriter) error {
3675
errc := make(chan error, 1)
3776
go func() {
38-
errc <- copyBuffer(rw1, rw2, inboundBufferPool)
77+
_, err := copyBuffer(rw1, rw2, InboundBufferPool)
78+
errc <- err
3979
}()
4080

4181
go func() {
42-
errc <- copyBuffer(rw2, rw1, outboundBufferPool)
82+
_, err := copyBuffer(rw2, rw1, OutboundBufferPool)
83+
errc <- err
4384
}()
4485
err := <-errc
4586
// NOTE 我们不关心operror 比如 eof/reset/broken pipe

internal/transporter/mwss.go

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"net"
55

66
"github.com/Ehco1996/ehco/internal/logger"
7+
"github.com/Ehco1996/ehco/internal/web"
78
)
89

910
type Mwss struct {
@@ -21,6 +22,8 @@ func (s *Mwss) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
2122

2223
func (s *Mwss) HandleTCPConn(c *net.TCPConn) error {
2324
defer c.Close()
25+
web.CurTCPNum.Inc()
26+
defer web.CurTCPNum.Dec()
2427

2528
node := s.raw.TCPNodes.PickMin()
2629
defer s.raw.TCPNodes.DeferPick(node)

internal/transporter/raw.go

+26-9
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ import (
44
"net"
55
"net/http"
66
"sync"
7+
"time"
78

9+
"github.com/Ehco1996/ehco/internal/constant"
810
"github.com/Ehco1996/ehco/internal/lb"
911
"github.com/Ehco1996/ehco/internal/logger"
12+
"github.com/Ehco1996/ehco/internal/web"
1013
"github.com/gobwas/ws"
1114
)
1215

@@ -28,6 +31,8 @@ func (raw *Raw) GetOrCreateBufferCh(uaddr *net.UDPAddr) *BufferCh {
2831
}
2932

3033
func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
34+
web.CurUDPNum.Inc()
35+
defer web.CurUDPNum.Dec()
3136

3237
bc := raw.GetOrCreateBufferCh(uaddr)
3338
node := raw.UDPNodes.PickMin()
@@ -39,10 +44,8 @@ func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
3944
raw.UDPNodes.OnError(node)
4045
return
4146
}
42-
4347
defer func() {
4448
rc.Close()
45-
close(bc.Ch)
4649
delete(raw.UDPBufferChMap, uaddr.String())
4750
}()
4851

@@ -52,34 +55,45 @@ func (raw *Raw) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
5255
wg.Add(1)
5356

5457
go func() {
55-
buf := outboundBufferPool.Get().([]byte)
58+
buf := OutboundBufferPool.Get().([]byte)
59+
wt := 0
5660
for {
5761
i, err := rc.Read(buf)
5862
if err != nil {
5963
logger.Logger.Info(err)
6064
break
6165
}
66+
rc.SetReadDeadline(time.Now().Add(constant.MaxConKeepAlive))
6267
if _, err := local.WriteToUDP(buf[0:i], uaddr); err != nil {
6368
logger.Logger.Info(err)
6469
break
6570
}
71+
wt += i
6672
}
67-
outboundBufferPool.Put(buf)
73+
web.NetWorkTransmitBytes.Add(float64(wt * 2))
74+
OutboundBufferPool.Put(buf)
6875
wg.Done()
76+
close(bc.Ch)
6977
}()
7078

79+
wt := 0
7180
for b := range bc.Ch {
81+
wt += len(b)
82+
rc.SetReadDeadline(time.Now().Add(constant.MaxConKeepAlive))
7283
if _, err := rc.Write(b); err != nil {
7384
logger.Logger.Info(err)
85+
close(bc.Ch)
7486
break
7587
}
7688
}
89+
web.NetWorkTransmitBytes.Add(float64(wt * 2))
7790
wg.Wait()
7891
}
7992

8093
func (raw *Raw) HandleTCPConn(c *net.TCPConn) error {
8194
defer c.Close()
82-
95+
web.CurTCPNum.Inc()
96+
defer web.CurTCPNum.Dec()
8397
node := raw.TCPNodes.PickMin()
8498
defer raw.TCPNodes.DeferPick(node)
8599

@@ -89,18 +103,19 @@ func (raw *Raw) HandleTCPConn(c *net.TCPConn) error {
89103
return err
90104
}
91105
logger.Logger.Infof("[raw] HandleTCPConn from %s to %s", c.LocalAddr().String(), node.Remote)
92-
93106
defer rc.Close()
107+
94108
return transport(c, rc)
95109
}
96110

97111
func (raw *Raw) HandleWsRequset(w http.ResponseWriter, req *http.Request) {
112+
web.CurTCPNum.Inc()
113+
defer web.CurTCPNum.Dec()
98114
wsc, _, _, err := ws.UpgradeHTTP(req, w)
99115
if err != nil {
100116
return
101117
}
102118
defer wsc.Close()
103-
104119
node := raw.TCPNodes.PickMin()
105120
defer raw.TCPNodes.DeferPick(node)
106121

@@ -119,12 +134,13 @@ func (raw *Raw) HandleWsRequset(w http.ResponseWriter, req *http.Request) {
119134
}
120135

121136
func (raw *Raw) HandleWssRequset(w http.ResponseWriter, req *http.Request) {
137+
web.CurTCPNum.Inc()
138+
defer web.CurTCPNum.Dec()
122139
wsc, _, _, err := ws.UpgradeHTTP(req, w)
123140
if err != nil {
124141
return
125142
}
126143
defer wsc.Close()
127-
128144
node := raw.TCPNodes.PickMin()
129145
defer raw.TCPNodes.DeferPick(node)
130146

@@ -143,8 +159,9 @@ func (raw *Raw) HandleWssRequset(w http.ResponseWriter, req *http.Request) {
143159
}
144160

145161
func (raw *Raw) HandleMWssRequset(c net.Conn) {
162+
web.CurTCPNum.Inc()
163+
defer web.CurTCPNum.Dec()
146164
defer c.Close()
147-
148165
node := raw.TCPNodes.PickMin()
149166
defer raw.TCPNodes.DeferPick(node)
150167

internal/transporter/ws.go

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net"
66

77
"github.com/Ehco1996/ehco/internal/logger"
8+
"github.com/Ehco1996/ehco/internal/web"
89
"github.com/gobwas/ws"
910
)
1011

@@ -22,6 +23,8 @@ func (s *Ws) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
2223

2324
func (s *Ws) HandleTCPConn(c *net.TCPConn) error {
2425
defer c.Close()
26+
web.CurTCPNum.Inc()
27+
defer web.CurTCPNum.Dec()
2528

2629
node := s.raw.TCPNodes.PickMin()
2730
defer s.raw.TCPNodes.DeferPick(node)

internal/transporter/wss.go

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/Ehco1996/ehco/internal/logger"
88
"github.com/Ehco1996/ehco/internal/tls"
9+
"github.com/Ehco1996/ehco/internal/web"
910
"github.com/gobwas/ws"
1011
)
1112

@@ -23,6 +24,8 @@ func (s *Wss) HandleUDPConn(uaddr *net.UDPAddr, local *net.UDPConn) {
2324

2425
func (s *Wss) HandleTCPConn(c *net.TCPConn) error {
2526
defer c.Close()
27+
web.CurTCPNum.Inc()
28+
defer web.CurTCPNum.Dec()
2629

2730
node := s.raw.TCPNodes.PickMin()
2831
defer s.raw.TCPNodes.DeferPick(node)

internal/web/metrics.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package web
2+
3+
import (
4+
"os"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
var (
10+
Hostname, _ = os.Hostname()
11+
12+
CurTCPNum = prometheus.NewGauge(prometheus.GaugeOpts{
13+
Help: "当前tcp链接数",
14+
Name: "current_tcp_num",
15+
ConstLabels: map[string]string{
16+
"hostname": Hostname,
17+
},
18+
})
19+
20+
CurUDPNum = prometheus.NewGauge(prometheus.GaugeOpts{
21+
Help: "当前udp链接数",
22+
Name: "current_udp_num",
23+
ConstLabels: map[string]string{
24+
"hostname": Hostname,
25+
},
26+
})
27+
28+
NetWorkTransmitBytes = prometheus.NewCounter(prometheus.CounterOpts{
29+
Name: "network_transmit_bytes",
30+
Help: "传输流量总量bytes",
31+
ConstLabels: map[string]string{
32+
"hostname": Hostname,
33+
},
34+
})
35+
)
36+
37+
func init() {
38+
39+
prometheus.MustRegister(CurTCPNum)
40+
prometheus.MustRegister(CurUDPNum)
41+
prometheus.MustRegister(NetWorkTransmitBytes)
42+
43+
}

internal/web/server.go

+9
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/Ehco1996/ehco/internal/constant"
1010
"github.com/Ehco1996/ehco/internal/logger"
1111
"github.com/gorilla/mux"
12+
"github.com/prometheus/client_golang/prometheus/promhttp"
1213
)
1314

1415
func Index(w http.ResponseWriter, r *http.Request) {
@@ -21,16 +22,24 @@ func AttachProfiler(router *mux.Router) {
2122
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
2223
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
2324
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
25+
26+
// Manually add support for paths linked to by index page at /debug/pprof/
27+
router.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
28+
router.Handle("/debug/pprof/heap", pprof.Handler("heap"))
29+
router.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
30+
router.Handle("/debug/pprof/block", pprof.Handler("block"))
2431
}
2532

2633
func StartWebServer(port string) {
2734
addr := "0.0.0.0:" + port
2835
logger.Logger.Infof("Start Web Server at http://%s/", addr)
2936
r := mux.NewRouter()
3037
AttachProfiler(r)
38+
r.Handle("/metrics/", promhttp.Handler())
3139
r.HandleFunc("/",
3240
func(w http.ResponseWriter, r *http.Request) {
3341
fmt.Fprintf(w, constant.IndexHTMLTMPL)
3442
})
43+
3544
logger.Logger.Fatal(http.ListenAndServe(addr, r))
3645
}

0 commit comments

Comments
 (0)