Skip to content

Commit 432f624

Browse files
committed
perf: wire putVectorBuf into connection write path
Return pooled vector buffers to vectorBufPool after the framer copies marshalled bytes in executeQuery and executeBatch. This completes the zero-alloc steady-state cycle for vector marshal operations. In executeQuery, a defer after the marshal loop returns buffers for columns identified as pooled vector types (float32, float64, int32, int64). In executeBatch, vector buffers are collected across all batch statements and returned via a single defer. The vectorBufPoolSubtype helper centralizes the type check to keep the two call sites consistent with the marshal fast paths. Includes unit tests covering vectorBufPoolSubtype classification, single-query and batch pool return simulation, and non-pooled type safety.
1 parent c5fb0b2 commit 432f624

3 files changed

Lines changed: 195 additions & 3 deletions

File tree

conn.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,6 +1574,19 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
15741574
}
15751575
}
15761576

1577+
// Return pooled vector buffers after the framer has copied the
1578+
// marshalled bytes (which happens inside c.exec → buildFrame).
1579+
// The defer runs on every exit path of executeQuery.
1580+
cols := info.request.columns
1581+
vals := params.values
1582+
defer func() {
1583+
for i, col := range cols {
1584+
if vt, ok := col.TypeInfo.(VectorType); ok && vectorBufPoolSubtype(vt) {
1585+
putVectorBuf(vals[i].value)
1586+
}
1587+
}
1588+
}()
1589+
15771590
// if the metadata was not present in the response then we should not skip it
15781591
params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) && len(info.response.columns) != 0
15791592

@@ -1761,6 +1774,10 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
17611774

17621775
hasLwtEntries := false
17631776

1777+
// vectorBufs collects marshalled byte slices from vector fast paths
1778+
// so they can be returned to vectorBufPool after the framer copies them.
1779+
var vectorBufs [][]byte
1780+
17641781
for i := 0; i < n; i++ {
17651782
entry := &batch.Entries[i]
17661783
b := &req.statements[i]
@@ -1802,6 +1819,9 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
18021819
if err := marshalQueryValue(typ, value, v); err != nil {
18031820
return &Iter{err: err}
18041821
}
1822+
if vt, ok := typ.(VectorType); ok && vectorBufPoolSubtype(vt) {
1823+
vectorBufs = append(vectorBufs, v.value)
1824+
}
18051825
}
18061826

18071827
if !hasLwtEntries && info.request.lwt {
@@ -1812,6 +1832,16 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
18121832
}
18131833
}
18141834

1835+
// Return pooled vector buffers after the framer has copied the
1836+
// marshalled bytes (which happens inside c.exec → buildFrame).
1837+
if len(vectorBufs) > 0 {
1838+
defer func() {
1839+
for _, buf := range vectorBufs {
1840+
putVectorBuf(buf)
1841+
}
1842+
}()
1843+
}
1844+
18151845
// The batch is considered to be conditional if even one of the
18161846
// statements is conditional.
18171847
batch.routingInfo.mu.Lock()

marshal.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,9 +1013,10 @@ func unmarshalVector(info VectorType, data []byte, value interface{}) error {
10131013
// vectorBufPool pools []byte buffers used by vector marshal fast paths.
10141014
// Buffers are returned to the pool by putVectorBuf after the framer copies them.
10151015
//
1016-
// NOTE: putVectorBuf is currently exercised only by benchmarks/tests.
1017-
// Wiring it into the connection write path (so production callers return
1018-
// buffers after the framer copies them) is planned for a follow-up change.
1016+
// vectorBufPool is used by the marshal fast paths (marshalVectorFloat32 etc.)
1017+
// to allocate temporary byte buffers. After the framer copies the marshalled
1018+
// bytes via writeBytes (append into framer.buf), the caller returns the buffer
1019+
// to the pool via putVectorBuf.
10191020
var vectorBufPool = sync.Pool{}
10201021

10211022
func getVectorBuf(size int) []byte {
@@ -1044,6 +1045,17 @@ func putVectorBuf(buf []byte) {
10441045
vectorBufPool.Put(buf) //nolint:staticcheck // SA6002: []byte is a value type; boxing cost is acceptable for pool reuse
10451046
}
10461047

1048+
// vectorBufPoolSubtype returns true if the given VectorType's SubType uses a
1049+
// marshal fast path that allocates from vectorBufPool via getVectorBuf.
1050+
func vectorBufPoolSubtype(vt VectorType) bool {
1051+
switch vt.SubType.Type() {
1052+
case TypeFloat, TypeDouble, TypeInt, TypeBigInt:
1053+
return true
1054+
default:
1055+
return false
1056+
}
1057+
}
1058+
10471059
// vectorByteSize computes dim * elemBytes and returns an error if the result
10481060
// would overflow int. This guards against corrupt or adversarial schema metadata
10491061
// on 32-bit platforms where int is 32 bits.

marshal_vector_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,3 +1491,153 @@ func TestGetVectorBuf_NonPositiveSize(t *testing.T) {
14911491
}
14921492
}
14931493
}
1494+
1495+
// TestVectorBufPoolSubtype verifies that vectorBufPoolSubtype correctly
1496+
// identifies which vector subtypes use the pooled fast path.
1497+
func TestVectorBufPoolSubtype(t *testing.T) {
1498+
tests := []struct {
1499+
name string
1500+
vt VectorType
1501+
expect bool
1502+
}{
1503+
{"float32", makeFloat32VectorType(3), true},
1504+
{"float64", makeFloat64VectorType(3), true},
1505+
{"int32", makeInt32VectorType(3), true},
1506+
{"int64", makeInt64VectorType(3), true},
1507+
{"uuid", makeUUIDVectorType(3), false},
1508+
{"varchar", VectorType{
1509+
SubType: NativeType{proto: protoVersion4, typ: TypeVarchar},
1510+
Dimensions: 3,
1511+
}, false},
1512+
{"boolean", VectorType{
1513+
SubType: NativeType{proto: protoVersion4, typ: TypeBoolean},
1514+
Dimensions: 3,
1515+
}, false},
1516+
{"timestamp", VectorType{
1517+
SubType: NativeType{proto: protoVersion4, typ: TypeTimestamp},
1518+
Dimensions: 3,
1519+
}, false},
1520+
}
1521+
for _, tt := range tests {
1522+
t.Run(tt.name, func(t *testing.T) {
1523+
got := vectorBufPoolSubtype(tt.vt)
1524+
if got != tt.expect {
1525+
t.Errorf("vectorBufPoolSubtype(%s) = %v, want %v", tt.name, got, tt.expect)
1526+
}
1527+
})
1528+
}
1529+
}
1530+
1531+
// TestVectorBufPoolReturnSimulation simulates the buffer lifecycle used in
1532+
// executeQuery and executeBatch: marshal a vector value via marshalQueryValue,
1533+
// then return the buffer to the pool via putVectorBuf. Verifies that the
1534+
// returned buffer is reused by a subsequent getVectorBuf call.
1535+
func TestVectorBufPoolReturnSimulation(t *testing.T) {
1536+
const dim = 128
1537+
vt := makeFloat32VectorType(dim)
1538+
1539+
// Create test data.
1540+
vec := make([]float32, dim)
1541+
for i := range vec {
1542+
vec[i] = float32(i)
1543+
}
1544+
1545+
// Marshal like marshalQueryValue does.
1546+
data, err := Marshal(vt, vec)
1547+
if err != nil {
1548+
t.Fatalf("Marshal: %v", err)
1549+
}
1550+
expectedSize := dim * 4
1551+
if len(data) != expectedSize {
1552+
t.Fatalf("marshalled size = %d, want %d", len(data), expectedSize)
1553+
}
1554+
1555+
// Remember the backing array pointer before returning to pool.
1556+
origPtr := &data[:1][0]
1557+
1558+
// Simulate what the defer in executeQuery/executeBatch does.
1559+
if vectorBufPoolSubtype(vt) {
1560+
putVectorBuf(data)
1561+
}
1562+
1563+
// Get a buffer of the same size — should reuse the pooled one.
1564+
reused := getVectorBuf(expectedSize)
1565+
if len(reused) != expectedSize {
1566+
t.Fatalf("getVectorBuf(%d) returned len %d", expectedSize, len(reused))
1567+
}
1568+
reusedPtr := &reused[:1][0]
1569+
if origPtr != reusedPtr {
1570+
t.Error("expected pooled buffer to be reused, but got a different allocation")
1571+
}
1572+
1573+
// Clean up.
1574+
putVectorBuf(reused)
1575+
}
1576+
1577+
// TestVectorBufPoolReturnNonPooledType verifies that putVectorBuf is a no-op
1578+
// for vector types that don't use the fast path (e.g. UUID vectors), and
1579+
// that vectorBufPoolSubtype correctly excludes them.
1580+
func TestVectorBufPoolReturnNonPooledType(t *testing.T) {
1581+
const dim = 4
1582+
vt := makeUUIDVectorType(dim)
1583+
1584+
if vectorBufPoolSubtype(vt) {
1585+
t.Fatal("vectorBufPoolSubtype should be false for UUID vectors")
1586+
}
1587+
1588+
// The UUID marshal path does not use getVectorBuf, so putting its
1589+
// result back should be skipped by the type check. Verify no panic.
1590+
data, err := Marshal(vt, []UUID{
1591+
{0x01}, {0x02}, {0x03}, {0x04},
1592+
})
1593+
if err != nil {
1594+
t.Fatalf("Marshal UUID vector: %v", err)
1595+
}
1596+
// Even if we mistakenly call putVectorBuf, it should not panic.
1597+
putVectorBuf(data)
1598+
}
1599+
1600+
// TestVectorBufPoolBatchSimulation simulates the batch buffer lifecycle:
1601+
// multiple statements with vector columns get their buffers collected and
1602+
// returned after the framer copies them.
1603+
func TestVectorBufPoolBatchSimulation(t *testing.T) {
1604+
const dim = 64
1605+
1606+
types := []struct {
1607+
vt VectorType
1608+
val interface{}
1609+
}{
1610+
{makeFloat32VectorType(dim), make([]float32, dim)},
1611+
{makeFloat64VectorType(dim), make([]float64, dim)},
1612+
{makeInt32VectorType(dim), make([]int32, dim)},
1613+
{makeInt64VectorType(dim), make([]int64, dim)},
1614+
}
1615+
1616+
// Simulate batch: marshal all, collect buffers.
1617+
var vectorBufs [][]byte
1618+
for _, tt := range types {
1619+
data, err := Marshal(tt.vt, tt.val)
1620+
if err != nil {
1621+
t.Fatalf("Marshal %v: %v", tt.vt.SubType.Type(), err)
1622+
}
1623+
if vectorBufPoolSubtype(tt.vt) {
1624+
vectorBufs = append(vectorBufs, data)
1625+
}
1626+
}
1627+
1628+
if len(vectorBufs) != 4 {
1629+
t.Fatalf("expected 4 pooled buffers, got %d", len(vectorBufs))
1630+
}
1631+
1632+
// Return all to pool (like the defer in executeBatch).
1633+
for _, buf := range vectorBufs {
1634+
putVectorBuf(buf)
1635+
}
1636+
1637+
// Verify at least one can be reused.
1638+
reused := getVectorBuf(dim * 4) // float32 size
1639+
if reused == nil {
1640+
t.Fatal("expected to get a buffer from pool after returning batch buffers")
1641+
}
1642+
putVectorBuf(reused)
1643+
}

0 commit comments

Comments
 (0)