Skip to content

Commit 3d84ea4

Browse files
committed
Implement read waiter for UDP
1 parent c098d42 commit 3d84ea4

File tree

8 files changed

+160
-110
lines changed

8 files changed

+160
-110
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.20
55
require (
66
github.com/gofrs/uuid/v5 v5.0.0
77
github.com/sagernet/quic-go v0.40.0
8-
github.com/sagernet/sing v0.2.18
8+
github.com/sagernet/sing v0.2.19-0.20231207032540-dbccc28f8194
99
golang.org/x/crypto v0.16.0
1010
golang.org/x/exp v0.0.0-20231127185646-65229373498e
1111
)

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5
2323
github.com/quic-go/qtls-go1-20 v0.4.1/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
2424
github.com/sagernet/quic-go v0.40.0 h1:DvQNPb72lzvNQDe9tcUyHTw8eRv6PLtM2mNYmdlzUMo=
2525
github.com/sagernet/quic-go v0.40.0/go.mod h1:VqtdhlbkeeG5Okhb3eDMb/9o0EoglReHunNT9ukrJAI=
26-
github.com/sagernet/sing v0.2.18 h1:2Ce4dl0pkWft+4914NGXPb8OiQpgA8UHQ9xFOmgvKuY=
27-
github.com/sagernet/sing v0.2.18/go.mod h1:OL6k2F0vHmEzXz2KW19qQzu172FDgSbUSODylighuVo=
26+
github.com/sagernet/sing v0.2.19-0.20231207032540-dbccc28f8194 h1:lphv+waf4VhMIPkOiTewsHaCrBC7Jyrkt/uOKgjLnso=
27+
github.com/sagernet/sing v0.2.19-0.20231207032540-dbccc28f8194/go.mod h1:Ce5LNojQOgOiWhiD8pPD6E9H7e2KgtOe3Zxx4Ou5u80=
2828
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2929
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
3030
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=

hysteria/packet.go

+13-35
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/sagernet/sing/common/cache"
2020
E "github.com/sagernet/sing/common/exceptions"
2121
M "github.com/sagernet/sing/common/metadata"
22+
N "github.com/sagernet/sing/common/network"
2223
"github.com/sagernet/sing/common/rw"
2324
)
2425

@@ -118,17 +119,18 @@ func fragUDPMessage(message *udpMessage, maxPacketSize int) []*udpMessage {
118119
}
119120

120121
type udpPacketConn struct {
121-
ctx context.Context
122-
cancel common.ContextCancelCauseFunc
123-
sessionID uint32
124-
quicConn quic.Connection
125-
data chan *udpMessage
126-
udpMTU int
127-
udpMTUTime time.Time
128-
packetId atomic.Uint32
129-
closeOnce sync.Once
130-
defragger *udpDefragger
131-
onDestroy func()
122+
ctx context.Context
123+
cancel common.ContextCancelCauseFunc
124+
sessionID uint32
125+
quicConn quic.Connection
126+
data chan *udpMessage
127+
udpMTU int
128+
udpMTUTime time.Time
129+
packetId atomic.Uint32
130+
closeOnce sync.Once
131+
defragger *udpDefragger
132+
onDestroy func()
133+
readWaitOptions N.ReadWaitOptions
132134
}
133135

134136
func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy func()) *udpPacketConn {
@@ -143,18 +145,6 @@ func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy f
143145
}
144146
}
145147

146-
func (c *udpPacketConn) ReadPacketThreadSafe() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
147-
select {
148-
case p := <-c.data:
149-
buffer = p.data
150-
destination = M.ParseSocksaddrHostPort(p.host, p.port)
151-
p.release()
152-
return
153-
case <-c.ctx.Done():
154-
return nil, M.Socksaddr{}, io.ErrClosedPipe
155-
}
156-
}
157-
158148
func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
159149
select {
160150
case p := <-c.data:
@@ -167,18 +157,6 @@ func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr,
167157
}
168158
}
169159

170-
func (c *udpPacketConn) WaitReadPacket(newBuffer func() *buf.Buffer) (destination M.Socksaddr, err error) {
171-
select {
172-
case p := <-c.data:
173-
_, err = newBuffer().ReadOnceFrom(p.data)
174-
destination = M.ParseSocksaddrHostPort(p.host, p.port)
175-
p.releaseMessage()
176-
return
177-
case <-c.ctx.Done():
178-
return M.Socksaddr{}, io.ErrClosedPipe
179-
}
180-
}
181-
182160
func (c *udpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
183161
select {
184162
case pkt := <-c.data:

hysteria/packet_wait.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package hysteria
2+
3+
import (
4+
"io"
5+
6+
"github.com/sagernet/sing/common/buf"
7+
M "github.com/sagernet/sing/common/metadata"
8+
N "github.com/sagernet/sing/common/network"
9+
)
10+
11+
func (c *udpPacketConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
12+
c.readWaitOptions = options
13+
return options.NeedHeadroom()
14+
}
15+
16+
func (c *udpPacketConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
17+
select {
18+
case p := <-c.data:
19+
destination = M.ParseSocksaddrHostPort(p.host, p.port)
20+
if c.readWaitOptions.NeedHeadroom() {
21+
buffer = c.readWaitOptions.NewPacketBuffer()
22+
_, err = buffer.Write(p.data.Bytes())
23+
if err != nil {
24+
buffer.Release()
25+
return nil, M.Socksaddr{}, err
26+
}
27+
p.releaseMessage()
28+
c.readWaitOptions.PostReturn(buffer)
29+
} else {
30+
buffer = p.data
31+
p.release()
32+
}
33+
return
34+
case <-c.ctx.Done():
35+
return nil, M.Socksaddr{}, io.ErrClosedPipe
36+
}
37+
}

hysteria2/packet.go

+13-35
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/sagernet/sing/common/buf"
2121
"github.com/sagernet/sing/common/cache"
2222
M "github.com/sagernet/sing/common/metadata"
23+
N "github.com/sagernet/sing/common/network"
2324
)
2425

2526
var udpMessagePool = sync.Pool{
@@ -114,17 +115,18 @@ func fragUDPMessage(message *udpMessage, maxPacketSize int) []*udpMessage {
114115
}
115116

116117
type udpPacketConn struct {
117-
ctx context.Context
118-
cancel common.ContextCancelCauseFunc
119-
sessionID uint32
120-
quicConn quic.Connection
121-
data chan *udpMessage
122-
udpMTU int
123-
udpMTUTime time.Time
124-
packetId atomic.Uint32
125-
closeOnce sync.Once
126-
defragger *udpDefragger
127-
onDestroy func()
118+
ctx context.Context
119+
cancel common.ContextCancelCauseFunc
120+
sessionID uint32
121+
quicConn quic.Connection
122+
data chan *udpMessage
123+
udpMTU int
124+
udpMTUTime time.Time
125+
packetId atomic.Uint32
126+
closeOnce sync.Once
127+
defragger *udpDefragger
128+
onDestroy func()
129+
readWaitOptions N.ReadWaitOptions
128130
}
129131

130132
func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy func()) *udpPacketConn {
@@ -139,18 +141,6 @@ func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, onDestroy f
139141
}
140142
}
141143

142-
func (c *udpPacketConn) ReadPacketThreadSafe() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
143-
select {
144-
case p := <-c.data:
145-
buffer = p.data
146-
destination = M.ParseSocksaddr(p.destination)
147-
p.release()
148-
return
149-
case <-c.ctx.Done():
150-
return nil, M.Socksaddr{}, io.ErrClosedPipe
151-
}
152-
}
153-
154144
func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
155145
select {
156146
case p := <-c.data:
@@ -163,18 +153,6 @@ func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr,
163153
}
164154
}
165155

166-
func (c *udpPacketConn) WaitReadPacket(newBuffer func() *buf.Buffer) (destination M.Socksaddr, err error) {
167-
select {
168-
case p := <-c.data:
169-
_, err = newBuffer().ReadOnceFrom(p.data)
170-
destination = M.ParseSocksaddr(p.destination)
171-
p.releaseMessage()
172-
return
173-
case <-c.ctx.Done():
174-
return M.Socksaddr{}, io.ErrClosedPipe
175-
}
176-
}
177-
178156
func (c *udpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
179157
select {
180158
case pkt := <-c.data:

hysteria2/packet_wait.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package hysteria2
2+
3+
import (
4+
"io"
5+
6+
"github.com/sagernet/sing/common/buf"
7+
M "github.com/sagernet/sing/common/metadata"
8+
N "github.com/sagernet/sing/common/network"
9+
)
10+
11+
func (c *udpPacketConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
12+
c.readWaitOptions = options
13+
return options.NeedHeadroom()
14+
}
15+
16+
func (c *udpPacketConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
17+
select {
18+
case p := <-c.data:
19+
destination = M.ParseSocksaddr(p.destination)
20+
if c.readWaitOptions.NeedHeadroom() {
21+
buffer = c.readWaitOptions.NewPacketBuffer()
22+
_, err = buffer.Write(p.data.Bytes())
23+
if err != nil {
24+
buffer.Release()
25+
return nil, M.Socksaddr{}, err
26+
}
27+
p.releaseMessage()
28+
c.readWaitOptions.PostReturn(buffer)
29+
} else {
30+
buffer = p.data
31+
p.release()
32+
}
33+
return
34+
case <-c.ctx.Done():
35+
return nil, M.Socksaddr{}, io.ErrClosedPipe
36+
}
37+
}

tuic/packet.go

+20-37
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/sagernet/sing/common/cache"
2020
E "github.com/sagernet/sing/common/exceptions"
2121
M "github.com/sagernet/sing/common/metadata"
22+
N "github.com/sagernet/sing/common/network"
2223
)
2324

2425
var udpMessagePool = sync.Pool{
@@ -114,20 +115,26 @@ func fragUDPMessage(message *udpMessage, maxPacketSize int) []*udpMessage {
114115
return fragments
115116
}
116117

118+
var (
119+
_ N.NetPacketConn = (*udpPacketConn)(nil)
120+
_ N.PacketReadWaiter = (*udpPacketConn)(nil)
121+
)
122+
117123
type udpPacketConn struct {
118-
ctx context.Context
119-
cancel common.ContextCancelCauseFunc
120-
sessionID uint16
121-
quicConn quic.Connection
122-
data chan *udpMessage
123-
udpStream bool
124-
udpMTU int
125-
udpMTUTime time.Time
126-
packetId atomic.Uint32
127-
closeOnce sync.Once
128-
isServer bool
129-
defragger *udpDefragger
130-
onDestroy func()
124+
ctx context.Context
125+
cancel common.ContextCancelCauseFunc
126+
sessionID uint16
127+
quicConn quic.Connection
128+
data chan *udpMessage
129+
udpStream bool
130+
udpMTU int
131+
udpMTUTime time.Time
132+
packetId atomic.Uint32
133+
closeOnce sync.Once
134+
isServer bool
135+
defragger *udpDefragger
136+
onDestroy func()
137+
readWaitOptions N.ReadWaitOptions
131138
}
132139

133140
func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, udpStream bool, isServer bool, onDestroy func()) *udpPacketConn {
@@ -144,18 +151,6 @@ func newUDPPacketConn(ctx context.Context, quicConn quic.Connection, udpStream b
144151
}
145152
}
146153

147-
func (c *udpPacketConn) ReadPacketThreadSafe() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
148-
select {
149-
case p := <-c.data:
150-
buffer = p.data
151-
destination = p.destination
152-
p.release()
153-
return
154-
case <-c.ctx.Done():
155-
return nil, M.Socksaddr{}, io.ErrClosedPipe
156-
}
157-
}
158-
159154
func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr, err error) {
160155
select {
161156
case p := <-c.data:
@@ -168,18 +163,6 @@ func (c *udpPacketConn) ReadPacket(buffer *buf.Buffer) (destination M.Socksaddr,
168163
}
169164
}
170165

171-
func (c *udpPacketConn) WaitReadPacket(newBuffer func() *buf.Buffer) (destination M.Socksaddr, err error) {
172-
select {
173-
case p := <-c.data:
174-
_, err = newBuffer().ReadOnceFrom(p.data)
175-
destination = p.destination
176-
p.releaseMessage()
177-
return
178-
case <-c.ctx.Done():
179-
return M.Socksaddr{}, io.ErrClosedPipe
180-
}
181-
}
182-
183166
func (c *udpPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
184167
select {
185168
case pkt := <-c.data:

tuic/packet_wait.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package tuic
2+
3+
import (
4+
"io"
5+
6+
"github.com/sagernet/sing/common/buf"
7+
M "github.com/sagernet/sing/common/metadata"
8+
N "github.com/sagernet/sing/common/network"
9+
)
10+
11+
func (c *udpPacketConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
12+
c.readWaitOptions = options
13+
return options.NeedHeadroom()
14+
}
15+
16+
func (c *udpPacketConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
17+
select {
18+
case p := <-c.data:
19+
destination = p.destination
20+
if c.readWaitOptions.NeedHeadroom() {
21+
buffer = c.readWaitOptions.NewPacketBuffer()
22+
_, err = buffer.Write(p.data.Bytes())
23+
if err != nil {
24+
buffer.Release()
25+
return nil, M.Socksaddr{}, err
26+
}
27+
p.releaseMessage()
28+
c.readWaitOptions.PostReturn(buffer)
29+
} else {
30+
buffer = p.data
31+
p.release()
32+
}
33+
return
34+
case <-c.ctx.Done():
35+
return nil, M.Socksaddr{}, io.ErrClosedPipe
36+
}
37+
}

0 commit comments

Comments
 (0)