@@ -5,6 +5,7 @@ package router
55
66import (
77 "context"
8+ "fmt"
89 "slices"
910 "strings"
1011 "sync"
@@ -41,6 +42,8 @@ type ScoreBasedRouter struct {
4142 backends map [string ]* backendWrapper
4243 // TODO: sort the groups to leverage binary search.
4344 groups []* Group
45+ // portConflictDetector dispatches listener ports to cluster-scoped backend groups.
46+ portConflictDetector * portConflictDetector
4447 // The routing rule for categorizing backends to groups.
4548 matchType MatchType
4649 observeError error
@@ -74,6 +77,8 @@ func (r *ScoreBasedRouter) Init(ctx context.Context, ob observer.BackendObserver
7477 r .matchType = MatchClientCIDR
7578 case config .MatchProxyCIDRStr :
7679 r .matchType = MatchProxyCIDR
80+ case config .MatchPortStr :
81+ r .matchType = MatchPort
7782 case "" :
7883 default :
7984 r .logger .Error ("unsupported routing rule, use the default rule" , zap .String ("rule" , cfg .Balance .RoutingRule ))
@@ -110,7 +115,10 @@ func (router *ScoreBasedRouter) GetBackendSelector(clientInfo ClientInfo) Backen
110115 return
111116 }
112117 // The group may change from round to round because the backends are updated.
113- group = router .routeToGroup (clientInfo )
118+ group , err = router .routeToGroup (clientInfo )
119+ if err != nil {
120+ return
121+ }
114122 if group == nil {
115123 err = ErrNoBackend
116124 return
@@ -146,14 +154,20 @@ func (router *ScoreBasedRouter) HealthyBackendCount() int {
146154}
147155
148156// called in the lock
149- func (router * ScoreBasedRouter ) routeToGroup (clientInfo ClientInfo ) * Group {
157+ func (router * ScoreBasedRouter ) routeToGroup (clientInfo ClientInfo ) (* Group , error ) {
158+ if router .matchType == MatchPort {
159+ if router .portConflictDetector == nil {
160+ return nil , nil
161+ }
162+ return router .portConflictDetector .groupFor (clientInfo .ListenerPort )
163+ }
150164 // TODO: binary search
151165 for _ , group := range router .groups {
152166 if group .Match (clientInfo ) {
153- return group
167+ return group , nil
154168 }
155169 }
156- return nil
170+ return nil , nil
157171}
158172
159173// RefreshBackend implements Router.GetBackendSelector interface.
@@ -233,7 +247,45 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt
233247 }
234248}
235249
236- // Update the groups after the backend list is updated.
250+ func matchPortValue (clusterName , port string ) string {
251+ if clusterName == "" {
252+ return port
253+ }
254+ return fmt .Sprintf ("%s:%s" , clusterName , port )
255+ }
256+
257+ func (router * ScoreBasedRouter ) backendGroupValues (backend * backendWrapper ) []string {
258+ switch router .matchType {
259+ case MatchClientCIDR , MatchProxyCIDR :
260+ return backend .Cidr ()
261+ case MatchPort :
262+ port := backend .TiProxyPort ()
263+ if port != "" {
264+ return []string {matchPortValue (backend .ClusterName (), port )}
265+ }
266+ }
267+ return nil
268+ }
269+
270+ func (router * ScoreBasedRouter ) rebuildPortConflictDetector () {
271+ if router .matchType != MatchPort {
272+ router .portConflictDetector = nil
273+ return
274+ }
275+ detector := newPortConflictDetector ()
276+ for _ , group := range router .groups {
277+ for _ , value := range group .values {
278+ clusterName , port , ok := strings .Cut (value , ":" )
279+ if ! ok {
280+ port = value
281+ clusterName = ""
282+ }
283+ detector .bind (port , clusterName , group )
284+ }
285+ }
286+ router .portConflictDetector = detector
287+ }
288+
237289// called in the lock.
238290func (router * ScoreBasedRouter ) updateGroups () {
239291 for _ , backend := range router .backends {
@@ -254,6 +306,17 @@ func (router *ScoreBasedRouter) updateGroups() {
254306 }
255307 // If the labels were correctly set, we won't update its group even if the labels change.
256308 if backend .group != nil {
309+ switch router .matchType {
310+ case MatchClientCIDR , MatchProxyCIDR , MatchPort :
311+ values := router .backendGroupValues (backend )
312+ if ! backend .group .EqualValues (values ) {
313+ router .logger .Warn ("backend routing values changed, keep the existing group until it is removed" ,
314+ zap .String ("backend_id" , backend .id ),
315+ zap .String ("addr" , backend .Addr ()),
316+ zap .Strings ("current_values" , values ),
317+ zap .Strings ("group_values" , backend .group .values ))
318+ }
319+ }
257320 continue
258321 }
259322
@@ -267,33 +330,35 @@ func (router *ScoreBasedRouter) updateGroups() {
267330 router .groups = append (router .groups , group )
268331 }
269332 group = router .groups [0 ]
270- case MatchClientCIDR , MatchProxyCIDR :
271- cidrs := backend . Cidr ( )
272- if len (cidrs ) == 0 {
333+ case MatchClientCIDR , MatchProxyCIDR , MatchPort :
334+ values := router . backendGroupValues ( backend )
335+ if len (values ) == 0 {
273336 break
274337 }
275338 for _ , g := range router .groups {
276- if g .Intersect (cidrs ) {
339+ if g .Intersect (values ) {
277340 group = g
278341 break
279342 }
280343 }
281344 if group == nil {
282- g , err := NewGroup (cidrs , router .bpCreator , router .matchType , router .logger )
345+ g , err := NewGroup (values , router .bpCreator , router .matchType , router .logger )
283346 if err == nil {
284347 group = g
285348 router .groups = append (router .groups , group )
286349 }
287350 // maybe too many logs, ignore the error now
288351 }
289352 }
290- if group ! = nil {
291- group . AddBackend ( backend . id , backend )
353+ if group = = nil {
354+ continue
292355 }
356+ group .AddBackend (backend .id , backend )
293357 }
294358 for _ , group := range router .groups {
295359 group .RefreshCidr ()
296360 }
361+ router .rebuildPortConflictDetector ()
297362}
298363
299364func (router * ScoreBasedRouter ) rebalanceLoop (ctx context.Context ) {
0 commit comments