
Commit 9410b42

perf: wire putVectorBuf into connection write path
Return pooled vector buffers to vectorBufPool after the framer copies marshalled bytes in executeQuery and executeBatch. This completes the zero-alloc steady-state cycle for vector marshal operations.

In executeQuery, a defer after the marshal loop returns buffers for columns identified as pooled vector types (float32, float64, int32, int64). In executeBatch, vector buffers are collected across all batch statements and returned via a single defer. The vectorBufPoolSubtype helper centralizes the type check to keep the two call sites consistent with the marshal fast paths.

Includes unit tests covering vectorBufPoolSubtype classification, single-query and batch pool return simulation, and non-pooled type safety.
1 parent c5fb0b2 commit 9410b42

3 files changed

Lines changed: 237 additions & 27 deletions
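The lifecycle this commit completes (marshal into a pooled buffer, let the framer copy it into the outgoing frame, then hand the buffer back) can be sketched in isolation. The following is a minimal illustration rather than the driver's code: the pool helpers mirror getVectorBuf/putVectorBuf from marshal.go, while the framer copy is stood in for by a plain append and main is purely hypothetical.

package main

import (
	"encoding/binary"
	"fmt"
	"math"
	"sync"
)

// vectorBufPool, getVectorBuf and putVectorBuf mirror the helpers in
// marshal.go; everything outside them is a simplified stand-in.
var vectorBufPool = sync.Pool{}

func getVectorBuf(size int) []byte {
	if v := vectorBufPool.Get(); v != nil {
		if buf, ok := v.([]byte); ok && cap(buf) >= size {
			return buf[:size]
		}
	}
	return make([]byte, size)
}

func putVectorBuf(buf []byte) {
	vectorBufPool.Put(buf) //nolint:staticcheck // []byte boxing is acceptable for reuse
}

func main() {
	vec := []float32{1.5, 2.5, 3.5}

	// Marshal fast path: borrow a pooled buffer and encode big-endian floats.
	buf := getVectorBuf(len(vec) * 4)
	for i, v := range vec {
		binary.BigEndian.PutUint32(buf[i*4:i*4+4], math.Float32bits(v))
	}

	// The framer copies the marshalled bytes into the outgoing frame.
	frame := append([]byte(nil), buf...)

	// After the copy, the write path returns the buffer to the pool;
	// this is what the new defers in executeQuery and executeBatch do.
	putVectorBuf(buf)

	fmt.Printf("frame holds %d bytes\n", len(frame))
}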


conn.go

Lines changed: 40 additions & 0 deletions
@@ -1574,6 +1574,29 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
 		}
 	}

+	// Return pooled vector buffers after the framer has copied the
+	// marshalled bytes (which happens inside c.exec → buildFrame).
+	// Only install the defer when at least one column uses a poolable
+	// vector buffer to avoid ~50ns defer overhead on non-vector queries.
+	cols := info.request.columns
+	vals := params.values
+	hasPooledVec := false
+	for _, col := range cols {
+		if vt, ok := col.TypeInfo.(VectorType); ok && vectorBufPoolSubtype(vt) {
+			hasPooledVec = true
+			break
+		}
+	}
+	if hasPooledVec {
+		defer func() {
+			for i, col := range cols {
+				if vt, ok := col.TypeInfo.(VectorType); ok && vectorBufPoolSubtype(vt) {
+					putVectorBuf(vals[i].value)
+				}
+			}
+		}()
+	}
+
 	// if the metadata was not present in the response then we should not skip it
 	params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) && len(info.response.columns) != 0

@@ -1761,6 +1784,10 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {

 	hasLwtEntries := false

+	// vectorBufs collects marshalled byte slices from vector fast paths
+	// so they can be returned to vectorBufPool after the framer copies them.
+	var vectorBufs [][]byte
+
 	for i := 0; i < n; i++ {
 		entry := &batch.Entries[i]
 		b := &req.statements[i]
@@ -1802,6 +1829,9 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
 			if err := marshalQueryValue(typ, value, v); err != nil {
 				return &Iter{err: err}
 			}
+			if vt, ok := typ.(VectorType); ok && vectorBufPoolSubtype(vt) {
+				vectorBufs = append(vectorBufs, v.value)
+			}
 		}

 		if !hasLwtEntries && info.request.lwt {
@@ -1812,6 +1842,16 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
 		}
 	}

+	// Return pooled vector buffers after the framer has copied the
+	// marshalled bytes (which happens inside c.exec → buildFrame).
+	if len(vectorBufs) > 0 {
+		defer func() {
+			for _, buf := range vectorBufs {
+				putVectorBuf(buf)
+			}
+		}()
+	}
+
 	// The batch is considered to be conditional if even one of the
 	// statements is conditional.
 	batch.routingInfo.mu.Lock()

marshal.go

Lines changed: 47 additions & 27 deletions
@@ -1013,9 +1013,10 @@ func unmarshalVector(info VectorType, data []byte, value interface{}) error {
 // vectorBufPool pools []byte buffers used by vector marshal fast paths.
 // Buffers are returned to the pool by putVectorBuf after the framer copies them.
 //
-// NOTE: putVectorBuf is currently exercised only by benchmarks/tests.
-// Wiring it into the connection write path (so production callers return
-// buffers after the framer copies them) is planned for a follow-up change.
+// vectorBufPool is used by the marshal fast paths (marshalVectorFloat32 etc.)
+// to allocate temporary byte buffers. After the framer copies the marshalled
+// bytes via writeBytes (append into framer.buf), the caller returns the buffer
+// to the pool via putVectorBuf.
 var vectorBufPool = sync.Pool{}

 func getVectorBuf(size int) []byte {
@@ -1044,6 +1045,17 @@ func putVectorBuf(buf []byte) {
 	vectorBufPool.Put(buf) //nolint:staticcheck // SA6002: []byte is a value type; boxing cost is acceptable for pool reuse
 }

+// vectorBufPoolSubtype returns true if the given VectorType's SubType uses a
+// marshal fast path that allocates from vectorBufPool via getVectorBuf.
+func vectorBufPoolSubtype(vt VectorType) bool {
+	switch vt.SubType.Type() {
+	case TypeFloat, TypeDouble, TypeInt, TypeBigInt:
+		return true
+	default:
+		return false
+	}
+}
+
 // vectorByteSize computes dim * elemBytes and returns an error if the result
 // would overflow int. This guards against corrupt or adversarial schema metadata
 // on 32-bit platforms where int is 32 bits.
@@ -1072,10 +1084,12 @@ func marshalVectorFloat32(dim int, vec []float32) ([]byte, error) {
 		return nil, marshalErrorf("%v", err)
 	}
 	buf := getVectorBuf(size)
-	off := 0
-	for _, v := range vec {
-		binary.BigEndian.PutUint32(buf[off:off+4], math.Float32bits(v))
-		off += 4
+	if dim == 0 {
+		return buf, nil
+	}
+	_ = buf[dim*4-1] // BCE hint
+	for i, v := range vec {
+		binary.BigEndian.PutUint32(buf[i*4:i*4+4], math.Float32bits(v))
 	}
 	return buf, nil
 }
@@ -1097,10 +1111,12 @@ func marshalVectorFloat64(dim int, vec []float64) ([]byte, error) {
 		return nil, marshalErrorf("%v", err)
 	}
 	buf := getVectorBuf(size)
-	off := 0
-	for _, v := range vec {
-		binary.BigEndian.PutUint64(buf[off:off+8], math.Float64bits(v))
-		off += 8
+	if dim == 0 {
+		return buf, nil
+	}
+	_ = buf[dim*8-1] // BCE hint
+	for i, v := range vec {
+		binary.BigEndian.PutUint64(buf[i*8:i*8+8], math.Float64bits(v))
 	}
 	return buf, nil
 }
@@ -1136,9 +1152,9 @@ func unmarshalVectorFloat32(dim int, data []byte, dst *[]float32) error {
 	} else {
 		vec = make([]float32, dim)
 	}
+	_ = data[dim*4-1] // BCE hint: compiler can prove data[i*4:i*4+4] is in-bounds
 	for i := range vec {
-		vec[i] = math.Float32frombits(binary.BigEndian.Uint32(data[:4]))
-		data = data[4:]
+		vec[i] = math.Float32frombits(binary.BigEndian.Uint32(data[i*4 : i*4+4]))
 	}
 	*dst = vec
 	return nil
@@ -1175,9 +1191,9 @@ func unmarshalVectorFloat64(dim int, data []byte, dst *[]float64) error {
 	} else {
 		vec = make([]float64, dim)
 	}
+	_ = data[dim*8-1] // BCE hint: compiler can prove data[i*8:i*8+8] is in-bounds
 	for i := range vec {
-		vec[i] = math.Float64frombits(binary.BigEndian.Uint64(data[:8]))
-		data = data[8:]
+		vec[i] = math.Float64frombits(binary.BigEndian.Uint64(data[i*8 : i*8+8]))
 	}
 	*dst = vec
 	return nil
@@ -1200,10 +1216,12 @@ func marshalVectorInt32(dim int, vec []int32) ([]byte, error) {
 		return nil, marshalErrorf("%v", err)
 	}
 	buf := getVectorBuf(size)
-	off := 0
-	for _, v := range vec {
-		binary.BigEndian.PutUint32(buf[off:off+4], uint32(v))
-		off += 4
+	if dim == 0 {
+		return buf, nil
+	}
+	_ = buf[dim*4-1] // BCE hint
+	for i, v := range vec {
+		binary.BigEndian.PutUint32(buf[i*4:i*4+4], uint32(v))
 	}
 	return buf, nil
 }
@@ -1225,10 +1243,12 @@ func marshalVectorInt64(dim int, vec []int64) ([]byte, error) {
 		return nil, marshalErrorf("%v", err)
 	}
 	buf := getVectorBuf(size)
-	off := 0
-	for _, v := range vec {
-		binary.BigEndian.PutUint64(buf[off:off+8], uint64(v))
-		off += 8
+	if dim == 0 {
+		return buf, nil
+	}
+	_ = buf[dim*8-1] // BCE hint
+	for i, v := range vec {
+		binary.BigEndian.PutUint64(buf[i*8:i*8+8], uint64(v))
 	}
 	return buf, nil
 }
@@ -1264,9 +1284,9 @@ func unmarshalVectorInt32(dim int, data []byte, dst *[]int32) error {
 	} else {
 		vec = make([]int32, dim)
 	}
+	_ = data[dim*4-1] // BCE hint: compiler can prove data[i*4:i*4+4] is in-bounds
 	for i := range vec {
-		vec[i] = int32(binary.BigEndian.Uint32(data[:4]))
-		data = data[4:]
+		vec[i] = int32(binary.BigEndian.Uint32(data[i*4 : i*4+4]))
 	}
 	*dst = vec
 	return nil
@@ -1303,9 +1323,9 @@ func unmarshalVectorInt64(dim int, data []byte, dst *[]int64) error {
 	} else {
 		vec = make([]int64, dim)
 	}
+	_ = data[dim*8-1] // BCE hint: compiler can prove data[i*8:i*8+8] is in-bounds
 	for i := range vec {
-		vec[i] = int64(binary.BigEndian.Uint64(data[:8]))
-		data = data[8:]
+		vec[i] = int64(binary.BigEndian.Uint64(data[i*8 : i*8+8]))
 	}
 	*dst = vec
 	return nil

marshal_vector_test.go

Lines changed: 150 additions & 0 deletions
@@ -1491,3 +1491,153 @@ func TestGetVectorBuf_NonPositiveSize(t *testing.T) {
 		}
 	}
 }
+
+// TestVectorBufPoolSubtype verifies that vectorBufPoolSubtype correctly
+// identifies which vector subtypes use the pooled fast path.
+func TestVectorBufPoolSubtype(t *testing.T) {
+	tests := []struct {
+		name   string
+		vt     VectorType
+		expect bool
+	}{
+		{"float32", makeFloat32VectorType(3), true},
+		{"float64", makeFloat64VectorType(3), true},
+		{"int32", makeInt32VectorType(3), true},
+		{"int64", makeInt64VectorType(3), true},
+		{"uuid", makeUUIDVectorType(3), false},
+		{"varchar", VectorType{
+			SubType:    NativeType{proto: protoVersion4, typ: TypeVarchar},
+			Dimensions: 3,
+		}, false},
+		{"boolean", VectorType{
+			SubType:    NativeType{proto: protoVersion4, typ: TypeBoolean},
+			Dimensions: 3,
+		}, false},
+		{"timestamp", VectorType{
+			SubType:    NativeType{proto: protoVersion4, typ: TypeTimestamp},
+			Dimensions: 3,
+		}, false},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := vectorBufPoolSubtype(tt.vt)
+			if got != tt.expect {
+				t.Errorf("vectorBufPoolSubtype(%s) = %v, want %v", tt.name, got, tt.expect)
+			}
+		})
+	}
+}
+
+// TestVectorBufPoolReturnSimulation simulates the buffer lifecycle used in
+// executeQuery and executeBatch: marshal a vector value via marshalQueryValue,
+// then return the buffer to the pool via putVectorBuf. Verifies that the
+// returned buffer is reused by a subsequent getVectorBuf call.
+func TestVectorBufPoolReturnSimulation(t *testing.T) {
+	const dim = 128
+	vt := makeFloat32VectorType(dim)
+
+	// Create test data.
+	vec := make([]float32, dim)
+	for i := range vec {
+		vec[i] = float32(i)
+	}
+
+	// Marshal like marshalQueryValue does.
+	data, err := Marshal(vt, vec)
+	if err != nil {
+		t.Fatalf("Marshal: %v", err)
+	}
+	expectedSize := dim * 4
+	if len(data) != expectedSize {
+		t.Fatalf("marshalled size = %d, want %d", len(data), expectedSize)
+	}
+
+	// Remember the backing array pointer before returning to pool.
+	origPtr := &data[:1][0]
+
+	// Simulate what the defer in executeQuery/executeBatch does.
+	if vectorBufPoolSubtype(vt) {
+		putVectorBuf(data)
+	}
+
+	// Get a buffer of the same size — should reuse the pooled one.
+	reused := getVectorBuf(expectedSize)
+	if len(reused) != expectedSize {
+		t.Fatalf("getVectorBuf(%d) returned len %d", expectedSize, len(reused))
+	}
+	reusedPtr := &reused[:1][0]
+	if origPtr != reusedPtr {
+		t.Error("expected pooled buffer to be reused, but got a different allocation")
+	}
+
+	// Clean up.
+	putVectorBuf(reused)
+}
+
+// TestVectorBufPoolReturnNonPooledType verifies that putVectorBuf is a no-op
+// for vector types that don't use the fast path (e.g. UUID vectors), and
+// that vectorBufPoolSubtype correctly excludes them.
+func TestVectorBufPoolReturnNonPooledType(t *testing.T) {
+	const dim = 4
+	vt := makeUUIDVectorType(dim)
+
+	if vectorBufPoolSubtype(vt) {
+		t.Fatal("vectorBufPoolSubtype should be false for UUID vectors")
+	}
+
+	// The UUID marshal path does not use getVectorBuf, so putting its
+	// result back should be skipped by the type check. Verify no panic.
+	data, err := Marshal(vt, []UUID{
+		{0x01}, {0x02}, {0x03}, {0x04},
+	})
+	if err != nil {
+		t.Fatalf("Marshal UUID vector: %v", err)
+	}
+	// Even if we mistakenly call putVectorBuf, it should not panic.
+	putVectorBuf(data)
+}
+
+// TestVectorBufPoolBatchSimulation simulates the batch buffer lifecycle:
+// multiple statements with vector columns get their buffers collected and
+// returned after the framer copies them.
+func TestVectorBufPoolBatchSimulation(t *testing.T) {
+	const dim = 64
+
+	types := []struct {
+		vt  VectorType
+		val interface{}
+	}{
+		{makeFloat32VectorType(dim), make([]float32, dim)},
+		{makeFloat64VectorType(dim), make([]float64, dim)},
+		{makeInt32VectorType(dim), make([]int32, dim)},
+		{makeInt64VectorType(dim), make([]int64, dim)},
+	}
+
+	// Simulate batch: marshal all, collect buffers.
+	var vectorBufs [][]byte
+	for _, tt := range types {
+		data, err := Marshal(tt.vt, tt.val)
+		if err != nil {
+			t.Fatalf("Marshal %v: %v", tt.vt.SubType.Type(), err)
+		}
+		if vectorBufPoolSubtype(tt.vt) {
+			vectorBufs = append(vectorBufs, data)
+		}
+	}
+
+	if len(vectorBufs) != 4 {
+		t.Fatalf("expected 4 pooled buffers, got %d", len(vectorBufs))
+	}
+
+	// Return all to pool (like the defer in executeBatch).
+	for _, buf := range vectorBufs {
+		putVectorBuf(buf)
+	}
+
+	// Verify at least one can be reused.
+	reused := getVectorBuf(dim * 4) // float32 size
+	if reused == nil {
+		t.Fatal("expected to get a buffer from pool after returning batch buffers")
+	}
+	putVectorBuf(reused)
+}
