Skip to content

Commit 1aee94e

Browse files
committed
Address PR nats-io#7871 review feedback: fix batch compression bugs
Apply feedback from Claude and Codex reviewers: 1. Fix critical bug: compressed length comparison `len(ebuf) < len(buf)` did not account for batch prefix offset. For batched messages where opIndex > 0, the comparison must be against the payload window size `len(buf)-opIndex-1` to avoid data truncation or corruption. 2. Fix heap escape in putCompressBuf: taking `&buf` of a slice parameter causes the slice header to escape to heap on every call. Instead, return the pool pointer from getCompressBuf and pass it back to putCompressBuf directly. 3. Remove dead code: `nbuf[0] = byte(compressedStreamMsgOp)` was never used since the op byte is written into buf, not nbuf. https://claude.ai/code/session_01LU1gqVBKHyaQdkJ6K9Ur35
1 parent fb8f1ae commit 1aee94e

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

server/jetstream_cluster.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9397,19 +9397,20 @@ var compressBufPool = sync.Pool{
93979397
},
93989398
}
93999399

9400-
func getCompressBuf(sz int) []byte {
9400+
func getCompressBuf(sz int) (*[]byte, []byte) {
94019401
bp := compressBufPool.Get().(*[]byte)
94029402
buf := *bp
94039403
if cap(buf) >= sz {
9404-
return buf[:sz]
9404+
return bp, buf[:sz]
94059405
}
94069406
// Return undersized buffer to pool, allocate a new one.
94079407
compressBufPool.Put(bp)
9408-
return make([]byte, sz)
9408+
buf = make([]byte, sz)
9409+
return &buf, buf
94099410
}
94109411

9411-
func putCompressBuf(buf []byte) {
9412-
compressBufPool.Put(&buf)
9412+
func putCompressBuf(bp *[]byte) {
9413+
compressBufPool.Put(bp)
94139414
}
94149415

94159416
// If allowed and contents over the threshold we will compress.
@@ -9468,17 +9469,18 @@ func encodeStreamMsgAllowCompressAndBatch(subject, reply string, hdr, msg []byte
94689469

94699470
// Check if we should compress.
94709471
if shouldCompress {
9471-
nbuf := getCompressBuf(s2.MaxEncodedLen(elen))
9472-
nbuf[0] = byte(compressedStreamMsgOp)
9473-
ebuf := s2.Encode(nbuf[1:], buf[opIndex+1:])
9472+
bp, nbuf := getCompressBuf(s2.MaxEncodedLen(elen))
9473+
ebuf := s2.Encode(nbuf, buf[opIndex+1:])
94749474
// Only pay the cost of decode on the other side if we compressed.
94759475
// S2 will allow us to try without major penalty for non-compressable data.
9476-
if len(ebuf) < len(buf) {
9476+
// Compare against the payload window (excluding batch prefix) to avoid
9477+
// truncation or corruption when opIndex > 0 for batched messages.
9478+
if len(ebuf) < len(buf)-opIndex-1 {
94779479
buf[opIndex] = byte(compressedStreamMsgOp)
94789480
copy(buf[opIndex+1:], ebuf)
94799481
buf = buf[:len(ebuf)+opIndex+1]
94809482
}
9481-
putCompressBuf(nbuf)
9483+
putCompressBuf(bp)
94829484
}
94839485

94849486
return buf

0 commit comments

Comments
 (0)