Skip to content

Commit dd9eae2

Browse files
committed
perf: pool []queryValues slices to reduce per-request allocations
Use bucketed sync.Pool (caps 8/16/32/64/128) to reuse []queryValues slices in executeQuery() and executeBatch(). Slices are returned to the pool immediately after c.exec() serializes the frame, with clear() to release string/[]byte references. Benchmark (8 values, sequential): ~3.9x faster than make(). Benchmark (8 values, parallel): ~7.7x faster than make().
1 parent 0952897 commit dd9eae2

File tree

3 files changed

+244
-2
lines changed

3 files changed

+244
-2
lines changed

conn.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,12 +1537,13 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
15371537
return &Iter{err: fmt.Errorf("gocql: expected %d values send got %d", info.request.actualColCount, len(values))}
15381538
}
15391539

1540-
params.values = make([]queryValues, len(values))
1540+
params.values = getQueryValues(len(values))
15411541
for i := 0; i < len(values); i++ {
15421542
v := &params.values[i]
15431543
value := values[i]
15441544
typ := info.request.columns[i].TypeInfo
15451545
if err := marshalQueryValue(typ, value, v); err != nil {
1546+
putQueryValues(params.values)
15461547
return &Iter{err: err}
15471548
}
15481549
}
@@ -1571,6 +1572,8 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
15711572
}
15721573

15731574
framer, err := c.exec(ctx, frame, qry.trace, qry.GetRequestTimeout())
1575+
// Return pooled queryValues now that the frame has been serialized.
1576+
putQueryValues(params.values)
15741577
if err != nil {
15751578
return &Iter{err: err}
15761579
}
@@ -1737,6 +1740,7 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
17371740
if len(entry.Args) > 0 || entry.binding != nil {
17381741
info, err := c.prepareStatement(batch.Context(), entry.Stmt, batch.trace, batch.GetRequestTimeout())
17391742
if err != nil {
1743+
putBatchQueryValues(req.statements)
17401744
return &Iter{err: err}
17411745
}
17421746

@@ -1751,24 +1755,27 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
17511755
PKeyColumns: info.request.pkeyColumns,
17521756
})
17531757
if err != nil {
1758+
putBatchQueryValues(req.statements)
17541759
return &Iter{err: err}
17551760
}
17561761
}
17571762

17581763
if len(values) != info.request.actualColCount {
1764+
putBatchQueryValues(req.statements)
17591765
return &Iter{err: fmt.Errorf("gocql: batch statement %d expected %d values send got %d", i, info.request.actualColCount, len(values))}
17601766
}
17611767

17621768
b.preparedID = info.id
17631769
stmts[string(info.id)] = entry.Stmt
17641770

1765-
b.values = make([]queryValues, info.request.actualColCount)
1771+
b.values = getQueryValues(info.request.actualColCount)
17661772

17671773
for j := 0; j < info.request.actualColCount; j++ {
17681774
v := &b.values[j]
17691775
value := values[j]
17701776
typ := info.request.columns[j].TypeInfo
17711777
if err := marshalQueryValue(typ, value, v); err != nil {
1778+
putBatchQueryValues(req.statements)
17721779
return &Iter{err: err}
17731780
}
17741781
}
@@ -1789,6 +1796,8 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
17891796

17901797
// TODO: should batch support tracing?
17911798
framer, err := c.exec(batch.Context(), req, batch.trace, batch.GetRequestTimeout())
1799+
// Return pooled queryValues now that the frame has been serialized.
1800+
putBatchQueryValues(req.statements)
17921801
if err != nil {
17931802
return &Iter{err: err}
17941803
}

frame.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"runtime"
3535
"strconv"
3636
"strings"
37+
"sync"
3738
"time"
3839

3940
frm "github.com/gocql/gocql/internal/frame"
@@ -1163,6 +1164,68 @@ type queryValues struct {
11631164
isUnset bool
11641165
}
11651166

1167+
// queryValuesPool is a set of size-bucketed sync.Pools for []queryValues slices.
1168+
// Buckets: 0→cap 8, 1→cap 16, 2→cap 32, 3→cap 64, 4→cap 128.
1169+
// Slices larger than 128 are not pooled.
1170+
var queryValuesPools [5]sync.Pool
1171+
1172+
// queryValuesBucket returns the pool bucket index for a given count.
1173+
// Returns -1 if the count exceeds the maximum pooled size.
1174+
func queryValuesBucket(n int) int {
1175+
switch {
1176+
case n <= 8:
1177+
return 0
1178+
case n <= 16:
1179+
return 1
1180+
case n <= 32:
1181+
return 2
1182+
case n <= 64:
1183+
return 3
1184+
case n <= 128:
1185+
return 4
1186+
default:
1187+
return -1
1188+
}
1189+
}
1190+
1191+
// getQueryValues returns a []queryValues of length n from the pool.
1192+
// The returned slice elements are zeroed.
1193+
func getQueryValues(n int) []queryValues {
1194+
bucket := queryValuesBucket(n)
1195+
if bucket < 0 {
1196+
return make([]queryValues, n)
1197+
}
1198+
if v := queryValuesPools[bucket].Get(); v != nil {
1199+
s := v.([]queryValues)
1200+
return s[:n]
1201+
}
1202+
// Allocate with the bucket's capacity so future returns fit.
1203+
return make([]queryValues, n, 8<<bucket)
1204+
}
1205+
1206+
// putQueryValues returns a []queryValues slice to the pool.
1207+
// It clears all elements to release references to marshaled byte slices.
1208+
func putQueryValues(s []queryValues) {
1209+
if s == nil {
1210+
return
1211+
}
1212+
bucket := queryValuesBucket(cap(s))
1213+
if bucket < 0 {
1214+
return
1215+
}
1216+
// Clear to release references (name strings, value []byte).
1217+
clear(s[:cap(s)])
1218+
queryValuesPools[bucket].Put(s[:cap(s)])
1219+
}
1220+
1221+
// putBatchQueryValues returns all pooled []queryValues slices from batch statements.
1222+
func putBatchQueryValues(stmts []batchStatment) {
1223+
for i := range stmts {
1224+
putQueryValues(stmts[i].values)
1225+
stmts[i].values = nil
1226+
}
1227+
}
1228+
11661229
type queryParams struct {
11671230
keyspace string
11681231
values []queryValues

frame_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ package gocql
3030
import (
3131
"bytes"
3232
"os"
33+
"runtime"
3334
"testing"
3435

3536
frm "github.com/gocql/gocql/internal/frame"
@@ -162,3 +163,172 @@ func TestParseEventFrame_ClientRoutesChanged(t *testing.T) {
162163
t.Fatalf("HostIDs = %v, want empty", evt.HostIDs)
163164
}
164165
}
166+
167+
// --- queryValues pool tests ---
168+
169+
func TestQueryValuesBucket(t *testing.T) {
170+
tests := []struct {
171+
n int
172+
bucket int
173+
}{
174+
{0, 0}, {1, 0}, {8, 0},
175+
{9, 1}, {16, 1},
176+
{17, 2}, {32, 2},
177+
{33, 3}, {64, 3},
178+
{65, 4}, {128, 4},
179+
{129, -1}, {1000, -1},
180+
}
181+
for _, tt := range tests {
182+
got := queryValuesBucket(tt.n)
183+
if got != tt.bucket {
184+
t.Errorf("queryValuesBucket(%d) = %d, want %d", tt.n, got, tt.bucket)
185+
}
186+
}
187+
}
188+
189+
func TestGetQueryValuesLength(t *testing.T) {
190+
for _, n := range []int{0, 1, 5, 8, 10, 16, 30, 64, 128, 200} {
191+
s := getQueryValues(n)
192+
if len(s) != n {
193+
t.Errorf("getQueryValues(%d): len = %d, want %d", n, len(s), n)
194+
}
195+
// For pooled sizes, capacity should be the bucket size.
196+
bucket := queryValuesBucket(n)
197+
if bucket >= 0 {
198+
wantCap := 8 << bucket
199+
if cap(s) != wantCap {
200+
t.Errorf("getQueryValues(%d): cap = %d, want %d", n, cap(s), wantCap)
201+
}
202+
}
203+
}
204+
}
205+
206+
func TestPutGetQueryValuesClearsReferences(t *testing.T) {
207+
s := getQueryValues(4)
208+
s[0].name = "col1"
209+
s[0].value = []byte("data")
210+
s[1].name = "col2"
211+
s[1].value = []byte("more")
212+
s[2].isUnset = true
213+
214+
putQueryValues(s)
215+
216+
// Get a new slice from the same bucket. It should have zeroed elements.
217+
s2 := getQueryValues(4)
218+
for i := 0; i < 4; i++ {
219+
if s2[i].name != "" {
220+
t.Errorf("element %d: name = %q, want empty", i, s2[i].name)
221+
}
222+
if s2[i].value != nil {
223+
t.Errorf("element %d: value = %v, want nil", i, s2[i].value)
224+
}
225+
if s2[i].isUnset {
226+
t.Errorf("element %d: isUnset = true, want false", i)
227+
}
228+
}
229+
}
230+
231+
func TestPutQueryValuesNilSafe(t *testing.T) {
232+
// Must not panic.
233+
putQueryValues(nil)
234+
}
235+
236+
func TestPutBatchQueryValues(t *testing.T) {
237+
stmts := make([]batchStatment, 3)
238+
stmts[0].values = getQueryValues(5)
239+
stmts[0].values[0].name = "a"
240+
// stmts[1].values is nil (simulates entry without args)
241+
stmts[2].values = getQueryValues(10)
242+
stmts[2].values[0].value = []byte("x")
243+
244+
putBatchQueryValues(stmts)
245+
246+
for i, s := range stmts {
247+
if s.values != nil {
248+
t.Errorf("stmts[%d].values should be nil after putBatchQueryValues", i)
249+
}
250+
}
251+
}
252+
253+
func TestGetQueryValuesOversize(t *testing.T) {
254+
// Slices larger than 128 should not be pooled.
255+
s := getQueryValues(200)
256+
if len(s) != 200 {
257+
t.Errorf("getQueryValues(200): len = %d, want 200", len(s))
258+
}
259+
// Should not panic when returning oversize.
260+
putQueryValues(s)
261+
}
262+
263+
// --- benchmarks ---
264+
265+
// queryValuesSink forces escape analysis to heap-allocate make() in baseline
266+
// benchmarks. Assigned once after the loop to avoid per-iteration cache-line
267+
// effects that would unfairly penalize the baseline.
268+
var queryValuesSink []queryValues
269+
270+
func BenchmarkGetPutQueryValues_8_Seq(b *testing.B) {
271+
for b.Loop() {
272+
s := getQueryValues(8)
273+
s[0].name = "col"
274+
s[0].value = []byte("val")
275+
putQueryValues(s)
276+
}
277+
}
278+
279+
func BenchmarkGetPutQueryValues_8_Parallel(b *testing.B) {
280+
b.RunParallel(func(pb *testing.PB) {
281+
for pb.Next() {
282+
s := getQueryValues(8)
283+
s[0].name = "col"
284+
s[0].value = []byte("val")
285+
putQueryValues(s)
286+
}
287+
})
288+
}
289+
290+
func BenchmarkMakeQueryValues_8_Seq(b *testing.B) {
291+
var s []queryValues
292+
for b.Loop() {
293+
s = make([]queryValues, 8)
294+
s[0].name = "col"
295+
s[0].value = []byte("val")
296+
}
297+
queryValuesSink = s
298+
}
299+
300+
func BenchmarkMakeQueryValues_8_Parallel(b *testing.B) {
301+
b.RunParallel(func(pb *testing.PB) {
302+
var s []queryValues
303+
for pb.Next() {
304+
s = make([]queryValues, 8)
305+
s[0].name = "col"
306+
s[0].value = []byte("val")
307+
}
308+
runtime.KeepAlive(s)
309+
})
310+
}
311+
312+
func BenchmarkPutBatchQueryValues_10x8(b *testing.B) {
313+
for b.Loop() {
314+
stmts := make([]batchStatment, 10)
315+
for i := range stmts {
316+
stmts[i].values = getQueryValues(8)
317+
stmts[i].values[0].name = "col"
318+
}
319+
putBatchQueryValues(stmts)
320+
}
321+
}
322+
323+
func BenchmarkPutBatchQueryValues_10x8_Parallel(b *testing.B) {
324+
b.RunParallel(func(pb *testing.PB) {
325+
for pb.Next() {
326+
stmts := make([]batchStatment, 10)
327+
for i := range stmts {
328+
stmts[i].values = getQueryValues(8)
329+
stmts[i].values[0].name = "col"
330+
}
331+
putBatchQueryValues(stmts)
332+
}
333+
})
334+
}

0 commit comments

Comments
 (0)