Skip to content

Commit e924e07

Browse files
committed
Add simplified client API
1 parent 27f733c commit e924e07

File tree

3 files changed

+987
-0
lines changed

3 files changed

+987
-0
lines changed

allocation.go

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
// Package turn contains the public API for pion/turn, a toolkit for building TURN clients and servers.
5+
package turn
6+
7+
import (
8+
"context"
9+
"errors"
10+
"fmt"
11+
"net"
12+
"sync"
13+
"time"
14+
15+
"github.com/pion/transport/v4"
16+
"github.com/pion/transport/v4/deadline"
17+
"github.com/pion/transport/v4/packetio"
18+
"github.com/pion/turn/v4/internal/client"
19+
)
20+
21+
const defaultAcceptBacklog = 128
22+
23+
var (
24+
_ transport.Dialer = &turnAllocation{}
25+
_ net.Listener = &turnAllocation{}
26+
_ Allocation = &turnAllocation{}
27+
)
28+
29+
// Typed errors.
30+
var (
31+
ErrAllocationClosed = errors.New("allocation: closed")
32+
errFailedToAlloc = errors.New("allocation: failed to create allocation")
33+
errFailedToDial = errors.New("allocation: failed to dial")
34+
errInvalidNetwork = errors.New("allocation: invalid network")
35+
)
36+
37+
// Allocation represents a TURN allocation that provides bidirectional communication with peers
38+
// through a TURN relay server. It combines dialer and listener semantics: use Dial to establish
39+
// outgoing connections to peers, and Accept to receive incoming connections from peers. The
40+
// allocation is bound to a specific transport protocol (UDP or TCP) specified when created via
41+
// NewAllocation. All connections through this allocation use that protocol.
42+
type Allocation interface {
43+
// Dial establishes a connection to the specified peer address through the TURN relay.
44+
// The network parameter must match the allocation's protocol ("udp" or "tcp").
45+
// Dial automatically creates a TURN permission for the peer, allowing the peer to
46+
// send data back to this allocation's relay address.
47+
Dial(network, address string) (net.Conn, error)
48+
49+
// Accept waits for the next incoming connection from a peer. The peer must have a valid
50+
// permission (created via Dial or CreatePermission) to send data to this allocation's
51+
// relay address.
52+
Accept() (net.Conn, error)
53+
54+
// Addr returns the relay transport address of the allocation.
55+
Addr() net.Addr
56+
57+
// Close releases the TURN allocation and closes all associated connections.
58+
// Any blocked Accept or Dial calls will return ErrAllocationClosed.
59+
Close() error
60+
61+
// CreatePermission installs a TURN permission for the specified peer address,
62+
// allowing that peer to send data to this allocation's relay address.
63+
CreatePermission(addr net.Addr) error
64+
}
65+
66+
type turnAllocation struct {
67+
*Client // TURN client
68+
network string
69+
tcpAlloc *client.TCPAllocation // for TCP allocations
70+
dispatcher *packetDispatcher // for UDP allocations
71+
closeCh chan struct{}
72+
closeOnce sync.Once
73+
mu sync.RWMutex
74+
closed bool
75+
}
76+
77+
// NewAllocation creates a TURN allocation that can dial peers and accept incoming connections.
78+
func NewAllocation(network string, conf *ClientConfig) (Allocation, error) {
79+
turnClient, err := NewClient(conf)
80+
if err != nil {
81+
return nil, fmt.Errorf("%w: %w", errFailedToAlloc, err)
82+
}
83+
84+
if err = turnClient.Listen(); err != nil {
85+
turnClient.Close()
86+
87+
return nil, fmt.Errorf("%w: %w", errFailedToAlloc, err)
88+
}
89+
90+
turnAlloc := &turnAllocation{
91+
Client: turnClient,
92+
network: network,
93+
closeCh: make(chan struct{}),
94+
}
95+
96+
switch network {
97+
case "udp", "udp4", "udp6": //nolint:goconst
98+
pconn, err := turnClient.Allocate()
99+
if err != nil {
100+
return nil, fmt.Errorf("%w: %w", errFailedToAlloc, err)
101+
}
102+
turnAlloc.dispatcher = newPacketDispatcher(pconn, turnAlloc.closeCh)
103+
case "tcp", "tcp4", "tcp6": //nolint:goconst
104+
tcpAlloc, err := turnClient.AllocateTCP()
105+
if err != nil {
106+
return nil, fmt.Errorf("%w: %w", errFailedToAlloc, err)
107+
}
108+
turnAlloc.tcpAlloc = tcpAlloc
109+
default:
110+
return nil, errInvalidNetwork
111+
}
112+
113+
return turnAlloc, nil
114+
}
115+
116+
// Dial establishes a connection to the address through TURN. Network must be "udp", "udp4",
117+
// "udp6" or "tcp", "tcp4", "tcp6".
118+
func (a *turnAllocation) Dial(network, address string) (net.Conn, error) {
119+
a.mu.RLock()
120+
if a.closed {
121+
a.mu.RUnlock()
122+
123+
return nil, ErrAllocationClosed
124+
}
125+
a.mu.RUnlock()
126+
127+
switch a.network {
128+
case "udp", "udp4", "udp6": //nolint:goconst
129+
return a.dialUDP(network, address)
130+
case "tcp", "tcp4", "tcp6": //nolint:goconst
131+
return a.dialTCP(network, address)
132+
default:
133+
return nil, errInvalidNetwork
134+
}
135+
}
136+
137+
func (a *turnAllocation) dialUDP(network, address string) (net.Conn, error) {
138+
if network != a.network {
139+
return nil, errInvalidNetwork
140+
}
141+
142+
peerAddr, err := net.ResolveUDPAddr(network, address)
143+
if err != nil {
144+
return nil, fmt.Errorf("%w: %w", errFailedToDial, err)
145+
}
146+
147+
if err = a.CreatePermission(peerAddr); err != nil {
148+
return nil, fmt.Errorf("%w: %w", errFailedToDial, err)
149+
}
150+
151+
return a.dispatcher.register(peerAddr), nil
152+
}
153+
154+
func (a *turnAllocation) dialTCP(network, address string) (net.Conn, error) {
155+
if network != a.network {
156+
return nil, errInvalidNetwork
157+
}
158+
159+
peerAddr, err := net.ResolveTCPAddr(network, address)
160+
if err != nil {
161+
return nil, fmt.Errorf("%w: %w", errFailedToDial, err)
162+
}
163+
164+
if err = a.CreatePermission(peerAddr); err != nil {
165+
return nil, fmt.Errorf("%w: %w", errFailedToDial, err)
166+
}
167+
168+
return a.tcpAlloc.DialTCP(network, nil, peerAddr)
169+
}
170+
171+
// Accept returns the next incoming connection from a permitted peer.
172+
func (a *turnAllocation) Accept() (net.Conn, error) {
173+
switch a.network {
174+
case "udp", "udp4", "udp6": //nolint:goconst
175+
select {
176+
case <-a.closeCh:
177+
return nil, ErrAllocationClosed
178+
case conn := <-a.dispatcher.acceptCh:
179+
return conn, nil
180+
}
181+
case "tcp", "tcp4", "tcp6": //nolint:goconst
182+
return a.tcpAlloc.Accept()
183+
default:
184+
return nil, errInvalidNetwork
185+
}
186+
}
187+
188+
// Addr returns the transport relay address of the allocation.
189+
func (a *turnAllocation) Addr() net.Addr {
190+
switch a.network {
191+
case "udp", "udp4", "udp6": //nolint:goconst
192+
return a.dispatcher.pconn.LocalAddr()
193+
case "tcp", "tcp4", "tcp6": //nolint:goconst
194+
return a.tcpAlloc.RelayAddr()
195+
default:
196+
return nil
197+
}
198+
}
199+
200+
// Close closes the allocation and all associated connections.
201+
func (a *turnAllocation) Close() error {
202+
var err error
203+
a.closeOnce.Do(func() {
204+
switch a.network {
205+
case "udp", "udp4", "udp6": //nolint:goconst
206+
err = a.dispatcher.pconn.Close()
207+
case "tcp", "tcp4", "tcp6": //nolint:goconst
208+
err = a.tcpAlloc.Close()
209+
default:
210+
err = errInvalidNetwork
211+
}
212+
a.Client.Close()
213+
a.mu.Lock()
214+
a.closed = true
215+
a.mu.Unlock()
216+
close(a.closeCh)
217+
})
218+
219+
return err
220+
}
221+
222+
// CreatePermission admits a peer on the allocation. Note that Dial automatically creates a
223+
// permission for the peer.
224+
func (a *turnAllocation) CreatePermission(addr net.Addr) error {
225+
return a.Client.CreatePermission(addr)
226+
}
227+
228+
const receiveMTU = 8192
229+
230+
// packetDispatcher demultiplexes packets from a single PacketConn to multiple Conns.
231+
type packetDispatcher struct {
232+
pconn net.PacketConn
233+
mu sync.Mutex
234+
conns map[string]*dispatchedConn
235+
acceptCh chan *dispatchedConn
236+
closeCh chan struct{}
237+
}
238+
239+
// newPacketDispatcher creates a dispatcher and starts its read loop.
240+
func newPacketDispatcher(pconn net.PacketConn, closeCh chan struct{}) *packetDispatcher {
241+
d := &packetDispatcher{
242+
pconn: pconn,
243+
conns: make(map[string]*dispatchedConn),
244+
acceptCh: make(chan *dispatchedConn, defaultAcceptBacklog),
245+
closeCh: closeCh,
246+
}
247+
248+
go d.readLoop()
249+
250+
return d
251+
}
252+
253+
func (d *packetDispatcher) readLoop() {
254+
buf := make([]byte, receiveMTU)
255+
for {
256+
n, raddr, err := d.pconn.ReadFrom(buf)
257+
if err != nil {
258+
// Close all conn buffers on read error.
259+
d.mu.Lock()
260+
for _, c := range d.conns {
261+
_ = c.buffer.Close()
262+
}
263+
d.mu.Unlock()
264+
265+
return
266+
}
267+
d.dispatch(raddr, buf[:n])
268+
}
269+
}
270+
271+
func (d *packetDispatcher) dispatch(raddr net.Addr, buf []byte) {
272+
d.mu.Lock()
273+
conn, ok := d.conns[raddr.String()]
274+
if !ok {
275+
// New peer - create conn and send to accept channel.
276+
conn = d.newConn(raddr)
277+
d.conns[raddr.String()] = conn
278+
select {
279+
case d.acceptCh <- conn:
280+
// Sent to accept queue.
281+
default:
282+
// Accept queue full: unreg and drop connection
283+
delete(d.conns, raddr.String())
284+
}
285+
}
286+
d.mu.Unlock()
287+
_, _ = conn.buffer.Write(buf)
288+
}
289+
290+
// register adds a conn for the given peer address (used by Dial).
291+
func (d *packetDispatcher) register(raddr net.Addr) *dispatchedConn {
292+
d.mu.Lock()
293+
defer d.mu.Unlock()
294+
295+
if conn, ok := d.conns[raddr.String()]; ok {
296+
return conn
297+
}
298+
299+
conn := d.newConn(raddr)
300+
d.conns[raddr.String()] = conn
301+
302+
return conn
303+
}
304+
305+
// newConn creates a new dispatchedConn (must be called with lock held).
306+
func (d *packetDispatcher) newConn(raddr net.Addr) *dispatchedConn {
307+
return &dispatchedConn{
308+
dispatcher: d,
309+
rAddr: raddr,
310+
buffer: packetio.NewBuffer(),
311+
writeDeadline: deadline.New(),
312+
}
313+
}
314+
315+
// unregister removes a conn from the dispatcher.
316+
func (d *packetDispatcher) unregister(raddr net.Addr) {
317+
d.mu.Lock()
318+
delete(d.conns, raddr.String())
319+
d.mu.Unlock()
320+
}
321+
322+
// dispatchedConn is a net.Conn bound to a specific peer, receiving via the dispatcher.
323+
type dispatchedConn struct {
324+
dispatcher *packetDispatcher
325+
rAddr net.Addr
326+
buffer *packetio.Buffer
327+
writeDeadline *deadline.Deadline
328+
closeOnce sync.Once
329+
}
330+
331+
// Read reads data from the buffer (dispatched by the read loop).
332+
func (c *dispatchedConn) Read(b []byte) (int, error) {
333+
return c.buffer.Read(b)
334+
}
335+
336+
// Write sends data to the bound peer.
337+
func (c *dispatchedConn) Write(b []byte) (int, error) {
338+
select {
339+
case <-c.writeDeadline.Done():
340+
return 0, context.DeadlineExceeded
341+
default:
342+
}
343+
344+
return c.dispatcher.pconn.WriteTo(b, c.rAddr)
345+
}
346+
347+
// Close unregisters from dispatcher and closes the buffer.
348+
func (c *dispatchedConn) Close() error {
349+
var err error
350+
c.closeOnce.Do(func() {
351+
c.dispatcher.unregister(c.rAddr)
352+
err = c.buffer.Close()
353+
})
354+
355+
return err
356+
}
357+
358+
// LocalAddr returns the local address (relay address).
359+
func (c *dispatchedConn) LocalAddr() net.Addr {
360+
return c.dispatcher.pconn.LocalAddr()
361+
}
362+
363+
// RemoteAddr returns the bound peer address.
364+
func (c *dispatchedConn) RemoteAddr() net.Addr {
365+
return c.rAddr
366+
}
367+
368+
// SetDeadline sets both read and write deadlines.
369+
func (c *dispatchedConn) SetDeadline(t time.Time) error {
370+
c.writeDeadline.Set(t)
371+
372+
return c.buffer.SetReadDeadline(t)
373+
}
374+
375+
// SetReadDeadline sets the read deadline.
376+
func (c *dispatchedConn) SetReadDeadline(t time.Time) error {
377+
return c.buffer.SetReadDeadline(t)
378+
}
379+
380+
// SetWriteDeadline sets the write deadline.
381+
func (c *dispatchedConn) SetWriteDeadline(t time.Time) error {
382+
c.writeDeadline.Set(t)
383+
384+
return nil
385+
}

0 commit comments

Comments
 (0)