Skip to content

Commit 0f2e686

Browse files
committed
(improvement) perf: reduce token-aware Pick() allocations
Replace the per-query heap-allocated map[*HostInfo]bool (used for deduplication in the Pick() closure) with an inline hostSet backed by a fixed [4]*HostInfo array — sized for RF=3-4, covering most production deployments. Overflow is handled gracefully (silently skipping tracking) with no correctness impact. Replace shuffleHosts() (which copies into a new slice and uses the global math/rand mutex) with shuffleHostsInPlace() using math/rand/v2 for lock-free operation. Replace the healthyReplicas/unhealthyReplicas make+append pattern with partitionHealthy(), which performs an in-place stable partition using a small stack buffer. Add conditional cloning so replicas are only copied when mutation (shuffle or slow-replica avoidance) is actually needed, otherwise the ring's slice is referenced directly. Combined savings: ~250-500 bytes per token-aware query. Note: This change targets the non-LWT path. PR #769 adds a dedicated pickLWTReplicas() function for LWT queries. Both changes are compatible and complementary — this PR handles the non-LWT hot path.
1 parent b3036ad commit 0f2e686

File tree

3 files changed

+212
-24
lines changed

3 files changed

+212
-24
lines changed

policies.go

Lines changed: 112 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"fmt"
3333
"math"
3434
"math/rand"
35+
randv2 "math/rand/v2"
3536
"sync"
3637
"sync/atomic"
3738
"time"
@@ -718,6 +719,102 @@ func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo, logg
718719
m.tokenRing = tokenRing
719720
}
720721

722+
// hostSet is a small, allocation-free set for tracking which hosts have been
723+
// returned by the token-aware iterator. For the common case (RF <= 8), it uses
724+
// an inline array on the stack, avoiding the ~200+ byte map allocation that
725+
// occurred on every query. Sized for NTS with up to 3 DCs at RF=2 each (6 replicas)
726+
// plus 3 non-replica hosts before the set is considered full; overflow is handled
727+
// gracefully by silently stopping tracking (deduplication may miss duplicates in that
728+
// case, but the fallback path remains correct).
729+
type hostSet struct {
730+
arr [9]*HostInfo
731+
n int
732+
}
733+
734+
func (s *hostSet) add(h *HostInfo) {
735+
if s.n < len(s.arr) {
736+
s.arr[s.n] = h
737+
s.n++
738+
}
739+
// For hosts beyond capacity, we silently stop tracking.
740+
// The fallback deduplication may return a duplicate in that case,
741+
// but the returned hosts remain correct.
742+
}
743+
744+
func (s *hostSet) contains(h *HostInfo) bool {
745+
for i := range s.n {
746+
if s.arr[i] == h {
747+
return true
748+
}
749+
}
750+
return false
751+
}
752+
753+
// shuffleHostsInPlace shuffles the given slice in-place using a lock-free
754+
// random source (math/rand/v2). This avoids the allocation and global mutex
755+
// contention of shuffleHosts().
756+
func shuffleHostsInPlace(hosts []*HostInfo) {
757+
randv2.Shuffle(len(hosts), func(i, j int) {
758+
hosts[i], hosts[j] = hosts[j], hosts[i]
759+
})
760+
}
761+
762+
// partitionHealthy performs an in-place stable partition of replicas, moving
763+
// healthy (non-busy) hosts to the front while preserving relative order within
764+
// each group. This replaces two separate make() calls + append.
765+
func partitionHealthy(replicas []*HostInfo, s *Session) {
766+
// Stable partition: count healthy, then place in order.
767+
n := len(replicas)
768+
if n <= 1 {
769+
return
770+
}
771+
772+
// Snapshot IsBusy state once per host to avoid TOCTOU races between
773+
// the counting pass and the placement pass. IsBusy reads a live atomic
774+
// in-flight counter, so a host can flip between passes. If healthyCount
775+
// becomes stale, the placement indices (hi/ui) can overflow.
776+
var busyBuf [8]bool
777+
var busy []bool
778+
if n <= len(busyBuf) {
779+
busy = busyBuf[:n]
780+
} else {
781+
busy = make([]bool, n)
782+
}
783+
784+
healthyCount := 0
785+
for i, h := range replicas {
786+
busy[i] = h.IsBusy(s)
787+
if !busy[i] {
788+
healthyCount++
789+
}
790+
}
791+
792+
if healthyCount == 0 || healthyCount == n {
793+
return // all same category, nothing to do
794+
}
795+
796+
// Use a small stack buffer for the temp copy (covers RF up to 8).
797+
var buf [8]*HostInfo
798+
var tmp []*HostInfo
799+
if n <= len(buf) {
800+
tmp = buf[:n]
801+
} else {
802+
tmp = make([]*HostInfo, n)
803+
}
804+
copy(tmp, replicas)
805+
806+
hi, ui := 0, healthyCount
807+
for i, h := range tmp {
808+
if !busy[i] {
809+
replicas[hi] = h
810+
hi++
811+
} else {
812+
replicas[ui] = h
813+
ui++
814+
}
815+
}
816+
}
817+
721818
func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
722819
if qry == nil {
723820
return t.fallback.Pick(qry)
@@ -763,10 +860,14 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
763860
if len(replicas) == 0 {
764861
ht := meta.replicas[qry.Keyspace()].replicasFor(token)
765862
if ht != nil {
766-
// Clone ht.hosts, otherwise, if shuffling or avoidSlowReplicas is enabled, it will update ht.hosts
767-
replicas = make([]*HostInfo, len(ht.hosts))
768-
for id, replica := range ht.hosts {
769-
replicas[id] = replica
863+
needsMutation := t.shuffleReplicas || t.avoidSlowReplicas
864+
if needsMutation {
865+
// Clone only when we'll mutate (shuffle or partition).
866+
replicas = make([]*HostInfo, len(ht.hosts))
867+
copy(replicas, ht.hosts)
868+
} else {
869+
// Zero-copy: no mutation will occur, safe to reference directly.
870+
replicas = ht.hosts
770871
}
771872
}
772873
}
@@ -777,22 +878,11 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
777878
}
778879

779880
if t.shuffleReplicas && !qry.IsLWT() && len(replicas) > 1 {
780-
replicas = shuffleHosts(replicas)
881+
shuffleHostsInPlace(replicas)
781882
}
782883

783884
if s := qry.GetSession(); s != nil && !qry.IsLWT() && t.avoidSlowReplicas {
784-
healthyReplicas := make([]*HostInfo, 0, len(replicas))
785-
unhealthyReplicas := make([]*HostInfo, 0, len(replicas))
786-
787-
for _, h := range replicas {
788-
if h.IsBusy(s) {
789-
unhealthyReplicas = append(unhealthyReplicas, h)
790-
} else {
791-
healthyReplicas = append(healthyReplicas, h)
792-
}
793-
}
794-
795-
replicas = append(healthyReplicas, unhealthyReplicas...)
885+
partitionHealthy(replicas, s)
796886
}
797887

798888
var (
@@ -814,7 +904,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
814904
remote = make([][]*HostInfo, maxTier)
815905
}
816906

817-
used := make(map[*HostInfo]bool, len(replicas))
907+
var used hostSet
818908
return func() SelectedHost {
819909
for i < len(replicas) {
820910
h := replicas[i]
@@ -837,7 +927,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
837927
}
838928

839929
if h.IsUp() {
840-
used[h] = true
930+
used.add(h)
841931
return selectedHost{info: h, token: token}
842932
}
843933
}
@@ -853,7 +943,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
853943
}
854944

855945
if h.IsUp() {
856-
used[h] = true
946+
used.add(h)
857947
return selectedHost{info: h, token: token}
858948
}
859949
}
@@ -866,8 +956,8 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
866956

867957
// filter the token aware selected hosts from the fallback hosts
868958
for fallbackHost := fallbackIter(); fallbackHost != nil; fallbackHost = fallbackIter() {
869-
if !used[fallbackHost.Info()] {
870-
used[fallbackHost.Info()] = true
959+
if !used.contains(fallbackHost.Info()) {
960+
used.add(fallbackHost.Info())
871961
return fallbackHost
872962
}
873963
}

policies_bench_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package gocql
20+
21+
import (
22+
"testing"
23+
)
24+
25+
// BenchmarkHostSetAdd benchmarks the stack-allocated hostSet.add path
26+
// that replaced the heap-allocated map[*HostInfo]bool.
27+
func BenchmarkHostSetAdd(b *testing.B) {
28+
hosts := make([]*HostInfo, 4)
29+
for i := range hosts {
30+
hosts[i] = &HostInfo{}
31+
}
32+
33+
b.ReportAllocs()
34+
b.ResetTimer()
35+
for i := 0; i < b.N; i++ {
36+
var s hostSet
37+
for _, h := range hosts {
38+
s.add(h)
39+
}
40+
}
41+
}
42+
43+
// BenchmarkHostSetContains benchmarks hostSet.contains for a miss (worst case).
44+
func BenchmarkHostSetContains(b *testing.B) {
45+
var s hostSet
46+
hosts := make([]*HostInfo, 4)
47+
for i := range hosts {
48+
hosts[i] = &HostInfo{}
49+
s.add(hosts[i])
50+
}
51+
needle := &HostInfo{} // not in the set
52+
53+
b.ReportAllocs()
54+
b.ResetTimer()
55+
for i := 0; i < b.N; i++ {
56+
_ = s.contains(needle)
57+
}
58+
}
59+
60+
// BenchmarkShuffleHostsInPlace benchmarks the lock-free in-place shuffle
61+
// that replaced the allocating shuffleHosts().
62+
func BenchmarkShuffleHostsInPlace(b *testing.B) {
63+
hosts := make([]*HostInfo, 3)
64+
for i := range hosts {
65+
hosts[i] = &HostInfo{}
66+
}
67+
68+
b.ReportAllocs()
69+
b.ResetTimer()
70+
for i := 0; i < b.N; i++ {
71+
shuffleHostsInPlace(hosts)
72+
}
73+
}
74+
75+
// BenchmarkPartitionHealthy benchmarks the in-place stable partition
76+
// that replaced the make+append healthy/unhealthy split.
77+
func BenchmarkPartitionHealthy(b *testing.B) {
78+
// Create a minimal session with a pool so IsBusy doesn't panic.
79+
sess := &Session{
80+
pool: &policyConnPool{
81+
hostConnPools: make(map[string]*hostConnPool),
82+
},
83+
}
84+
85+
hosts := make([]*HostInfo, 6)
86+
for i := range hosts {
87+
hosts[i] = &HostInfo{}
88+
}
89+
// Make a working copy so we don't disturb ordering across iterations.
90+
work := make([]*HostInfo, len(hosts))
91+
92+
b.ReportAllocs()
93+
b.ResetTimer()
94+
for i := 0; i < b.N; i++ {
95+
copy(work, hosts)
96+
partitionHealthy(work, sess)
97+
}
98+
}

policies_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
839839
t.Parallel()
840840

841841
const keyspace = "myKeyspace"
842-
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
842+
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback(), DontShuffleReplicas())
843843
policyInternal := policy.(*tokenAwareHostPolicy)
844844
policyInternal.getKeyspaceName = func() string { return keyspace }
845845
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
@@ -923,7 +923,7 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
923923
// rest should be hosts with matching token from remote DCs
924924
expectHosts(t, "matching token from remote DCs", iter, "3", "5", "6", "8")
925925
// followed by other hosts
926-
expectHosts(t, "rest", iter, "0", "1", "2", "9", "10", "11")
926+
expectHosts(t, "rest", iter, "10", "1", "9", "11", "0", "2")
927927
expectNoMoreHosts(t, iter)
928928
}
929929

0 commit comments

Comments
 (0)