Skip to content

Commit fb8ca00

Browse files
committed
save
1 parent 3664a59 commit fb8ca00

File tree

1 file changed

+20
-18
lines changed

1 file changed

+20
-18
lines changed

db/seg/seg_paged_rw.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -322,30 +322,32 @@ func (c *PagedWriter) initWorkers() {
322322

323323
func (c *PagedWriter) compressionWorker() {
324324
defer c.wg.Done()
325+
326+
processItem := func(item *pageWorkItem) {
327+
defer pageWorkItemPool.Put(item)
328+
329+
result := getPageResult()
330+
result.seq = item.seq
331+
result.err = nil
332+
333+
// Compress directly into result.data (no extra copy, each result owns its buffer)
334+
result.data, _ = compress.EncodeZstdIfNeed(result.data[:0], item.uncompressedData, c.compressionEnabled)
335+
336+
// Send result, respecting context cancellation
337+
select {
338+
case c.resultCh <- result:
339+
case <-c.ctx.Done():
340+
putPageResult(result) // return result to pool if context cancelled
341+
}
342+
}
343+
325344
for {
326345
select {
327346
case item, ok := <-c.workCh:
328347
if !ok {
329348
return // channel closed
330349
}
331-
// Ensure work item is returned to pool even if compression panics
332-
func() {
333-
defer pageWorkItemPool.Put(item)
334-
335-
result := getPageResult()
336-
result.seq = item.seq
337-
result.err = nil
338-
339-
// Compress directly into result.data (no extra copy, each result owns its buffer)
340-
result.data, _ = compress.EncodeZstdIfNeed(result.data[:0], item.uncompressedData, c.compressionEnabled)
341-
342-
// Send result, respecting context cancellation
343-
select {
344-
case c.resultCh <- result:
345-
case <-c.ctx.Done():
346-
putPageResult(result) // return result to pool if context cancelled
347-
}
348-
}()
350+
processItem(item)
349351

350352
case <-c.ctx.Done():
351353
return // context cancelled

0 commit comments

Comments
 (0)