Skip to content

Commit a19eb89

Browse files
committed
api: Add all in-flight requests /reverse_proxy/upstreams (Fixes #7277)
This refactors the initial approach in PR #7281, replacing the UsagePool with a dedicated package-level sync.Map and atomic.Int64 to track in-flight requests without global lock contention. It also introduces a lookup map in the admin API to fix a potential O(n^2) iteration over upstreams, ensuring that draining upstreams are correctly exposed across config reloads without leaking memory. Co-authored-by: Y.Horie <u5.horie@gmail.com> reverseproxy: optimize in-flight tracking and admin API - Replaced sync.RWMutex with sync.Map and atomic.Int64 to avoid lock contention under high RPS. - Introduced a lookup map in the admin API to fix a potential O(n^2) iteration over upstreams.
1 parent 9873752 commit a19eb89

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed

modules/caddyhttp/reverseproxy/admin.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
7373

7474
// Collect the results to respond with
7575
results := []upstreamStatus{}
76+
knownHosts := make(map[string]struct{})
7677

7778
// Iterate over the upstream pool (needs to be fast)
7879
var rangeErr error
@@ -95,6 +96,8 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
9596
return false
9697
}
9798

99+
knownHosts[address] = struct{}{}
100+
98101
results = append(results, upstreamStatus{
99102
Address: address,
100103
NumRequests: upstream.NumRequests(),
@@ -103,7 +106,17 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
103106
return true
104107
})
105108

106-
// If an error happened during the range, return it
109+
currentInFlight := getInFlightRequests()
110+
for address, count := range currentInFlight {
111+
if _, exists := knownHosts[address]; !exists && count > 0 {
112+
results = append(results, upstreamStatus{
113+
Address: address,
114+
NumRequests: int(count),
115+
Fails: 0,
116+
})
117+
}
118+
}
119+
107120
if rangeErr != nil {
108121
return rangeErr
109122
}

modules/caddyhttp/reverseproxy/reverseproxy.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"strconv"
3333
"strings"
3434
"sync"
35+
"sync/atomic"
3536
"time"
3637

3738
"go.uber.org/zap"
@@ -46,6 +47,31 @@ import (
4647
"github.com/caddyserver/caddy/v2/modules/caddyhttp/rewrite"
4748
)
4849

50+
// inFlightRequests uses sync.Map with atomic.Int64 for lock-free updates on the hot path
51+
var inFlightRequests sync.Map
52+
53+
func incInFlightRequest(address string) {
54+
v, _ := inFlightRequests.LoadOrStore(address, new(atomic.Int64))
55+
v.(*atomic.Int64).Add(1)
56+
}
57+
58+
func decInFlightRequest(address string) {
59+
if v, ok := inFlightRequests.Load(address); ok {
60+
if v.(*atomic.Int64).Add(-1) <= 0 {
61+
inFlightRequests.Delete(address)
62+
}
63+
}
64+
}
65+
66+
func getInFlightRequests() map[string]int64 {
67+
copyMap := make(map[string]int64)
68+
inFlightRequests.Range(func(key, value any) bool {
69+
copyMap[key.(string)] = value.(*atomic.Int64).Load()
70+
return true
71+
})
72+
return copyMap
73+
}
74+
4975
func init() {
5076
caddy.RegisterModule(Handler{})
5177
}
@@ -904,8 +930,16 @@ func (h Handler) addForwardedHeaders(req *http.Request) error {
904930
// Go standard library which was used as the foundation.)
905931
func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origReq *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error {
906932
_ = di.Upstream.Host.countRequest(1)
933+
934+
// Increment the in-flight request count
935+
incInFlightRequest(di.Address)
936+
907937
//nolint:errcheck
908-
defer di.Upstream.Host.countRequest(-1)
938+
defer func() {
939+
di.Upstream.Host.countRequest(-1)
940+
// Decrement the in-flight request count
941+
decInFlightRequest(di.Address)
942+
}()
909943

910944
// point the request to this upstream
911945
h.directRequest(req, di)

0 commit comments

Comments
 (0)