Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,29 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
}
}

// Return pooled vector buffers after the framer has copied the
// marshalled bytes (which happens inside c.exec → buildFrame).
// Only install the defer when at least one column uses a poolable
// vector buffer to avoid ~50ns defer overhead on non-vector queries.
cols := info.request.columns
vals := params.values
hasPooledVec := false
for _, col := range cols {
if vt, ok := col.TypeInfo.(VectorType); ok && vectorBufPoolSubtype(vt) {
hasPooledVec = true
break
}
}
if hasPooledVec {
defer func() {
for i, col := range cols {
if vt, ok := col.TypeInfo.(VectorType); ok && vectorBufPoolSubtype(vt) {
putVectorBuf(vals[i].value)
}
}
}()
}

// if the metadata was not present in the response then we should not skip it
params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) && len(info.response.columns) != 0

Expand Down Expand Up @@ -1866,6 +1889,10 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {

hasLwtEntries := false

// vectorBufs collects marshalled byte slices from vector fast paths
// so they can be returned to vectorBufPool after the framer copies them.
var vectorBufs [][]byte

for i := 0; i < n; i++ {
entry := &batch.Entries[i]
b := &req.statements[i]
Expand Down Expand Up @@ -1907,6 +1934,9 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
if err := marshalQueryValue(typ, value, v); err != nil {
return &Iter{err: err}
}
if vt, ok := typ.(VectorType); ok && vectorBufPoolSubtype(vt) {
vectorBufs = append(vectorBufs, v.value)
}
}

if !hasLwtEntries && info.request.lwt {
Expand All @@ -1917,6 +1947,16 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
}
}

// Return pooled vector buffers after the framer has copied the
// marshalled bytes (which happens inside c.exec → buildFrame).
if len(vectorBufs) > 0 {
defer func() {
for _, buf := range vectorBufs {
putVectorBuf(buf)
}
}()
}

// The batch is considered to be conditional if even one of the
// statements is conditional.
batch.routingInfo.mu.Lock()
Expand Down
37 changes: 37 additions & 0 deletions helpers_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,40 @@ func BenchmarkRowDataAllocation(b *testing.B) {
})
}
}

// BenchmarkRowDataWithVector measures RowData() performance when the schema
// includes a vector column (common in AI/ML embedding tables). This exercises
// the VectorType.NewWithError() fast path added to avoid the expensive
// goType() → asVectorType() re-parse on every call.
func BenchmarkRowDataWithVector(b *testing.B) {
columns := []ColumnInfo{
{Name: "id", TypeInfo: NativeType{typ: TypeInt, proto: protoVersion4}},
{Name: "embedding", TypeInfo: VectorType{
NativeType: NativeType{proto: protoVersion4, typ: TypeCustom,
custom: apacheCassandraTypePrefix + "VectorType(" + apacheCassandraTypePrefix + "FloatType, 1536)"},
SubType: NativeType{proto: protoVersion4, typ: TypeFloat},
Dimensions: 1536,
}},
{Name: "label", TypeInfo: NativeType{typ: TypeVarchar, proto: protoVersion4}},
}

iter := &Iter{
meta: resultMetadata{
columns: columns,
colCount: len(columns),
actualColCount: len(columns),
},
numRows: 1,
}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
rd, err := iter.RowData()
if err != nil {
b.Fatal(err)
}
_ = rd
}
}
Loading
Loading