-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathupstream.go
More file actions
executable file
·349 lines (296 loc) · 11.1 KB
/
upstream.go
File metadata and controls
executable file
·349 lines (296 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/*
File: upstream.go
Version: 2.39.0 (Split)
Updated: 11-May-2026 14:18 CEST
Description:
Core data structures, contexts, and lightweight hot-path utilities for
external DNS upstream providers.
Logic has been heavily modularized into:
- upstream.go (Core Data Structures & Contexts)
- upstream_parser.go (Initialization & Routing Setup)
- upstream_exchange.go (Protocol Multiplexer)
- upstream_ddr.go (Security & SVCB Discovery)
- upstream_net.go (Low-level dials and stream bindings)
- upstream_race.go (Parallel execution strategies)
Changes:
2.39.0 - [SECURITY/FIX] Resolved a severe EDNS0 Padding regression. Replaced the
conditional padding appendage with an unconditional modulo ceiling, guaranteeing
all encrypted upstream exchanges perfectly align to 128-byte block boundaries natively.
Neutralizes packet-length fingerprinting vectors completely.
*/
package main
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/netip"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/miekg/dns"
"github.com/quic-go/quic-go"
"golang.org/x/sync/singleflight"
)
// Package-level timing limits and the health cut-off for upstream servers.
const (
// streamTimeout is a fixed 5-second bound.
// NOTE(review): presumably the per-operation deadline for TCP/DoT stream
// exchanges — it is not read in this file; confirm against the dial/exchange code.
streamTimeout = 5 * time.Second
// streamIdleMax is a fixed 30-second bound.
// NOTE(review): presumably the maximum idle age of a pooled stream connection
// (compare streamConnEntry.idleAt) — confirm against the pool code.
streamIdleMax = 30 * time.Second
// doqIdleMax is a fixed 25-second bound.
// NOTE(review): presumably the maximum idle age of a pooled QUIC connection
// (compare doqConnEntry.idleAt) — confirm against the DoQ code.
doqIdleMax = 25 * time.Second
// healthThreshold is the consecutive-failure count at which
// Upstream.isHealthy starts reporting false (it checks consFails < healthThreshold).
healthThreshold = int32(3)
)
// upstreamStagger is set once at startup from cfg.Server.UpstreamStaggerMs.
// NOTE(review): it is not read in this file; presumably it is the delay between
// staggered parallel upstream attempts — confirm in the racing code.
var upstreamStagger time.Duration
// upstreamTimeout is the per-exchange deadline. Set once at startup from
// cfg.Server.UpstreamTimeoutMs. A value <= 0 means no deadline (the upstream's
// own TCP/TLS timeout applies): newUpstreamCtx() only installs a timeout when
// this value is strictly positive.
var upstreamTimeout time.Duration
// streamConnEntry is the element type stored in Upstream.streamConns: one
// pooled stream (TCP/DoT) connection together with a timestamp.
type streamConnEntry struct {
// conn is the pooled DNS connection.
conn *dns.Conn
// idleAt is a timestamp attached to the pooled connection.
// NOTE(review): presumably the moment the connection was returned to the
// pool, compared against streamIdleMax — confirm against the pool code.
idleAt time.Time
}
// doqConnEntry holds a pooled QUIC connection; it is the value type of
// Upstream.doqConns.
// dialAddr is the actual IP:port that was dialled — preserved for logging.
type doqConnEntry struct {
conn *quic.Conn // pointer to quic.Conn struct (quic-go v0.40+ removed interfaces)
dialAddr string
// idleAt is a timestamp attached to the pooled connection.
// NOTE(review): presumably the moment the connection last became idle,
// compared against doqIdleMax — confirm against the DoQ code.
idleAt time.Time
}
// UpstreamGroup is a container that oversees multiple upstream servers and enforces
// specific routing and load-balancing algorithms against them.
//
// None of the selection logic lives in this file; the field notes below that
// go beyond the declared types are assumptions to be confirmed against the
// parser and exchange code.
type UpstreamGroup struct {
// Name identifies the group.
Name string
// Strategy names the selection algorithm.
// NOTE(review): presumably includes a round-robin mode, given rrCount — confirm.
Strategy string
Preference string // e.g., "fastest", "ordered"
Mode string // e.g., "loose", "strict"
// IgnoreQnameLabels is a routing flag; its effect is not visible in this file.
IgnoreQnameLabels bool
// ECSAction, ECSV4Mask and ECSV6Mask mirror the same-named Upstream fields.
// NOTE(review): presumably group-level defaults copied onto each server at
// parse time — confirm.
ECSAction string
ECSV4Mask int
ECSV6Mask int
// Servers lists the member upstreams.
Servers []*Upstream
rrCount atomic.Uint64 // Used for round-robin strategy
}
// Upstream represents a single parsed and initialised upstream DNS target.
//
// The struct embeds mutexes and atomic values, so it must not be copied after
// first use; it is referenced through *Upstream throughout this file.
type Upstream struct {
// Proto is the transport identifier. This file recognises "udp", "tcp",
// "dot", "doq", "doh" and "doh3" (see parseHostPort and getUpstreamURL).
Proto string
// RawURL is the upstream URL used as the base of the log string built by
// getUpstreamURL; it may contain the "{client-name}" placeholder.
RawURL string
BootstrapIPs []string // per-URL bootstrap IPs — always take precedence
// hasClientNameTemplate enables the "{client-name}" substitution performed
// by getUpstreamURL.
hasClientNameTemplate bool
// ECS parameters, consumed by prepareForwardQuery.
// ECSAction: "pass" forwards the client's own ECS option, "add" synthesises
// one from the client address, and any other value (including "") sends no
// ECS option.
ECSAction string
// ECSV4Mask is the prefix length (bits) applied to IPv4 client addresses
// when ECSAction is "add".
ECSV4Mask int
// ECSV6Mask is the prefix length (bits) applied to IPv6 client addresses
// when ECSAction is "add".
ECSV6Mask int
// useGET controls the HTTP method used for DoH/DoH3 outbound queries.
// false (default) = POST; true = GET (RFC 8484 §4.1, cacheable, ?dns=<b64>).
// Set once at ParseUpstream time via the +get URL modifier; never changes.
useGET bool
// ECHConfigList holds the Encrypted Client Hello configuration data.
// Used by outbound dialers to encrypt SNI during the TLS handshake.
// A non-empty list is also what makes getUpstreamURL consider the "+ECH"
// log marker.
ECHConfigList []byte
// dialHost: static hostname used for TLS SNI — may contain {client-name},
// which is substituted with the real client name at query time.
// dialAddrs: pre-computed IP:port dial targets, always ready after
// ParseUpstream returns.
dialHost string
dialAddrs []string
// udpClient is the client used for plain-DNS exchanges.
// NOTE(review): presumably only populated for the "udp" transport — confirm.
udpClient *dns.Client
// [PERF] streamConns keeps up to N idle TCP/DoT connections per target,
// avoiding repeated TLS handshakes during concurrent cache misses.
// NOTE(review): the map key is presumably the dial address, and streamMu
// presumably guards the map — confirm against the pool code.
streamConns map[string][]*streamConnEntry
streamMu sync.Mutex
baseTLSConf *tls.Config
baseTLSConfNoECH *tls.Config // Fallback config isolated strictly without ECH bindings
h2Client *http.Client
h2ClientNoECH *http.Client // Fallback client isolated strictly without ECH bindings
h3Client *http.Client
h3ClientNoECH *http.Client // Fallback client isolated strictly without ECH bindings
// Atomic flags coordinating the dynamic HTTP/3 upgrade sequence based on Alt-Svc headers.
// h3Upgraded is read by getUpstreamURL to report "doh3" instead of "doh".
// NOTE(review): h3Cooldown is presumably a UnixNano deadline like echCooldown,
// and h3Fails a failure counter — confirm against the exchange code.
h3Upgraded atomic.Bool
h3Cooldown atomic.Int64
h3Fails atomic.Int32
// echCooldown is latched for backoff when ECH fails, to prevent
// continuously negotiating failing ECH payloads on strict networks.
// It is compared against time.Now().UnixNano() in getUpstreamURL, i.e. it
// holds a Unix-nanosecond timestamp before which ECH counts as cooling down.
echCooldown atomic.Int64
echFails atomic.Int32
// doqConns pools QUIC connections.
// NOTE(review): presumably keyed by dial address and guarded by doqMu, with
// doqDialGroup collapsing concurrent dials to the same target — confirm.
doqConns map[string]*doqConnEntry
doqMu sync.Mutex
doqDialGroup singleflight.Group
// consFails counts consecutive failures. Reset to 0 on any success
// (recordSuccess), incremented by recordFailure, read by isHealthy.
consFails atomic.Int32
// emaRTT tracks the Exponential Moving Average of response latency in nanoseconds.
// Used by the "fastest" strategy to favor top performers.
emaRTT atomic.Int64
// doqNo0RTT is latched to true on DOQ_PROTOCOL_ERROR (RFC 9250 §10.5).
// All subsequent dials use Allow0RTT: false. Never reset.
doqNo0RTT atomic.Bool
}
// recordSuccess clears the consecutive-failure counter, which makes the
// upstream report healthy again.
func (u *Upstream) recordSuccess() {
	u.consFails.Store(0)
}
// recordFailure adds one to the consecutive-failure counter.
func (u *Upstream) recordFailure() {
	u.consFails.Add(1)
}
// isHealthy reports whether the upstream has accumulated fewer than
// healthThreshold consecutive failures.
func (u *Upstream) isHealthy() bool {
	failures := u.consFails.Load()
	return failures < healthThreshold
}
// ---------------------------------------------------------------------------
// newUpstreamCtx — per-exchange context factory
// ---------------------------------------------------------------------------
// newUpstreamCtx builds the context used for a single upstream exchange and
// returns it together with its cancel function.
//
//   - upstreamTimeout > 0: the context carries that deadline
//     (context.WithTimeout), so a slow or unreachable upstream cannot hold a
//     goroutine indefinitely.
//   - otherwise: the context is cancel-only (context.WithCancel); how long the
//     exchange may take is left to the transport's own timeouts.
//
// The context is always rooted at context.Background(). Callers must invoke
// the returned cancel function (typically via defer) to release resources.
func newUpstreamCtx() (context.Context, context.CancelFunc) {
	root := context.Background()
	if limit := upstreamTimeout; limit > 0 {
		return context.WithTimeout(root, limit)
	}
	return context.WithCancel(root)
}
// ---------------------------------------------------------------------------
// Common Lightweight Utilities
// ---------------------------------------------------------------------------
// prepareForwardQuery builds a minimal, sanitised DNS query for upstream forwarding.
//
// The returned message is a fresh *dns.Msg: it gets a new random ID, copies
// only the RD and CD header bits and the question section from req, and
// carries exactly one OPT record (UDP size 4096, DO bit mirrored from the
// client's OPT if present). No other client EDNS0 options are forwarded.
//
// EDNS0 Client Subnet handling is driven by u.ECSAction:
//   - "pass": the client's own ECS option (if any) is forwarded unchanged.
//   - "add":  an ECS option is synthesised from clientAddr, truncated to
//     u.ECSV4Mask / u.ECSV6Mask bits. IPv4-mapped IPv6 addresses are unmapped
//     first so they are reported as family 1 (IPv4). The configured mask is
//     clamped to [0, address bit length] before use.
//   - anything else, an empty action, or a nil u: no ECS option is sent.
//
// When encrypted is true an EDNS0 Padding option is appended so that the
// message length lands on a 128-byte boundary.
//
// req must be non-nil. u may be nil.
func prepareForwardQuery(req *dns.Msg, encrypted bool, u *Upstream, clientAddr netip.Addr) *dns.Msg {
	fwd := &dns.Msg{
		MsgHdr: dns.MsgHdr{
			Id:               dns.Id(),
			RecursionDesired: req.RecursionDesired,
			CheckingDisabled: req.CheckingDisabled,
		},
	}
	fwd.Question = make([]dns.Question, len(req.Question))
	copy(fwd.Question, req.Question)

	opt := &dns.OPT{
		Hdr: dns.RR_Header{Name: ".", Rrtype: dns.TypeOPT, Class: dns.ClassINET},
	}
	opt.SetUDPSize(4096)

	var clientECS *dns.EDNS0_SUBNET
	if clientOpt := req.IsEdns0(); clientOpt != nil {
		if clientOpt.Do() {
			opt.SetDo()
		}
		// Remember the first ECS option the client sent; it is only used
		// when the configured action is "pass".
		for _, o := range clientOpt.Option {
			if ecs, ok := o.(*dns.EDNS0_SUBNET); ok {
				clientECS = ecs
				break
			}
		}
	}

	// Default to stripping ECS unless the upstream explicitly says otherwise.
	action := "remove"
	if u != nil && u.ECSAction != "" {
		action = u.ECSAction
	}

	switch {
	case action == "pass" && clientECS != nil:
		opt.Option = append(opt.Option, clientECS)

	case action == "add" && clientAddr.IsValid():
		// action can only be "add" when u is non-nil (see above), so the
		// u.ECSV*Mask reads below are safe.
		//
		// Unmap turns ::ffff:a.b.c.d into a.b.c.d so that IPv4 clients seen
		// through a dual-stack listener are announced as family 1 with the
		// IPv4 mask rather than as a meaningless IPv6 prefix.
		addr := clientAddr.Unmap()

		family, bits := uint16(2), u.ECSV6Mask
		if addr.Is4() {
			family, bits = 1, u.ECSV4Mask
		}

		// Clamp while still an int: converting to uint8 first would wrap
		// out-of-range values (e.g. -1 -> 255 -> full address disclosed).
		if bits < 0 {
			bits = 0
		}
		if limit := addr.BitLen(); bits > limit {
			bits = limit
		}

		// With bits clamped to the address length Prefix cannot fail; the
		// check only guards against emitting a malformed option.
		if prefix, err := addr.Prefix(bits); err == nil {
			opt.Option = append(opt.Option, &dns.EDNS0_SUBNET{
				Code:          dns.EDNS0SUBNET,
				Family:        family,
				SourceNetmask: uint8(bits),
				SourceScope:   0,
				Address:       net.IP(prefix.Masked().Addr().AsSlice()),
			})
		}
	}

	fwd.Extra = []dns.RR{opt}

	if !encrypted {
		return fwd
	}

	// Always append an EDNS0 Padding option on encrypted transports.
	// The option header costs 4 bytes (code + length); padLen is chosen in
	// the range 1..128 so that msgLen + 4 + padLen is a multiple of 128.
	// This relies on fwd.Len() matching the length of the message as it is
	// later packed (i.e. no name compression is applied at pack time).
	msgLen := fwd.Len()
	padLen := 128 - ((msgLen + 4) % 128)
	opt.Option = append(opt.Option, &dns.EDNS0_PADDING{
		Padding: make([]byte, padLen),
	})

	return fwd
}
// formatUpstreamLog renders the "proto://URL (IP)" string used in log lines.
//
// A leading "https://" is removed from displayURL. The dial address is then
// folded in as follows:
//   - dialAddr empty or identical to the URL: "proto://URL"
//   - dialAddr equal to the URL plus a well-known port (:53, :853, :443):
//     "proto://dialAddr"
//   - anything else: "proto://URL (dialAddr)"
//
// Protocol modifiers like "+ECH" are expected to be part of proto already;
// this function does not add them.
func formatUpstreamLog(proto, displayURL, dialAddr string) string {
	name := strings.TrimPrefix(displayURL, "https://")

	switch dialAddr {
	case "", name:
		return proto + "://" + name
	case name + ":53", name + ":853", name + ":443":
		return proto + "://" + dialAddr
	}
	return fmt.Sprintf("%s://%s (%s)", proto, name, dialAddr)
}
// getUpstreamURL renders an upstream as "proto://target" for log output,
// reflecting runtime state:
//
//   - "{client-name}" in RawURL is replaced by clientName when the upstream is
//     flagged as templated and clientName is non-empty.
//   - a leading "https://" is dropped from the target.
//   - "doh" is shown as "doh3" once the upstream has been upgraded and
//     cfg.Server.UpgradeDoH3 is enabled.
//   - "+ECH" is appended to the protocol when the upstream has an ECH config,
//     except while cfg.Server.UseUpstreamECH is "try" and the ECH cooldown
//     timestamp still lies in the future.
func getUpstreamURL(up *Upstream, clientName string) string {
	target := up.RawURL
	if clientName != "" && up.hasClientNameTemplate {
		target = strings.ReplaceAll(target, "{client-name}", clientName)
	}
	target = strings.TrimPrefix(target, "https://")

	scheme := up.Proto
	if scheme == "doh" && cfg.Server.UpgradeDoH3 && up.h3Upgraded.Load() {
		scheme = "doh3"
	}

	if len(up.ECHConfigList) > 0 {
		// In "try" mode a failed ECH attempt latches a cooldown; while it is
		// active the marker is omitted.
		coolingDown := cfg.Server.UseUpstreamECH == "try" &&
			time.Now().UnixNano() < up.echCooldown.Load()
		if !coolingDown {
			scheme += "+ECH"
		}
	}

	return scheme + "://" + target
}
// parseHostPort extracts the host and port from a raw upstream URL string.
//
// rawURL may carry a scheme ("tls://", "https://", ...) and a trailing path;
// both are removed before the host:port part is examined. When no port is
// present, or the port is empty ("host:"), the protocol's default is used:
// 53 for "udp"/"tcp", 853 for "dot"/"doq", 443 for "doh"/"doh3". For any
// other proto the default port is the empty string.
//
// The returned host never carries IPv6 brackets: "[2001:db8::1]:853" and
// "[2001:db8::1]" both yield "2001:db8::1". An unbracketed IPv6 literal or a
// bare hostname is returned unchanged with the default port.
func parseHostPort(rawURL, proto string) (host, port string) {
	var defaultPort string
	switch proto {
	case "udp", "tcp":
		defaultPort = "53"
	case "dot", "doq":
		defaultPort = "853"
	case "doh", "doh3":
		defaultPort = "443"
	}

	// Drop the scheme first so its colon is not mistaken for a port separator.
	clean := rawURL
	if idx := strings.Index(clean, "://"); idx >= 0 {
		clean = clean[idx+3:]
	}
	// Drop any HTTP path so only the authority (host[:port]) remains.
	if idx := strings.IndexByte(clean, '/'); idx >= 0 {
		clean = clean[:idx]
	}

	h, p, err := net.SplitHostPort(clean)
	if err != nil {
		// No usable port: bare hostname, unbracketed IPv6 literal, or a
		// bracketed IPv6 literal without a port. Strip the brackets in the
		// last case so the result matches what SplitHostPort returns on
		// success and can be fed to net.JoinHostPort safely.
		if n := len(clean); n >= 2 && clean[0] == '[' && clean[n-1] == ']' {
			clean = clean[1 : n-1]
		}
		return clean, defaultPort
	}
	if p == "" {
		// "host:" parses successfully with an empty port; treat as unset.
		p = defaultPort
	}
	return h, p
}