@@ -38,11 +38,14 @@ var (
3838 pageResultPool = sync.Pool {New : func () any { return & pageResult {} }}
3939)
4040
41- // returnIfNotNil returns item to pool if it's not nil (helper for deferred cleanup)
42- func returnIfNotNil (item * pageWorkItem ) {
43- if item ! = nil {
44- pageWorkItemPool . Put ( item )
41+ func getPageWorkItem () * pageWorkItem { return pageWorkItemPool . Get ().( * pageWorkItem ) }
42+ func putPageWorkItem (item * pageWorkItem ) {
43+ if item = = nil {
44+ return
4545 }
46+ item .seq = 0
47+ item .uncompressedData = item .uncompressedData [:0 ]
48+ pageWorkItemPool .Put (item )
4649}
4750
4851func getPageResult () * pageResult { return pageResultPool .Get ().(* pageResult ) }
@@ -330,7 +333,7 @@ func (c *PagedWriter) initWorkers() {
330333
331334func (c * PagedWriter ) compressionWorker (ctx context.Context ) error {
332335 processItem := func (item * pageWorkItem ) {
333- defer pageWorkItemPool . Put (item )
336+ defer putPageWorkItem (item )
334337
335338 result := getPageResult ()
336339 result .seq = item .seq
@@ -433,11 +436,11 @@ func (c *PagedWriter) writePage() error {
433436 }
434437
435438 // Async path with parallel workers
436- item := pageWorkItemPool . Get ().( * pageWorkItem )
439+ item := getPageWorkItem ( )
437440 sent := false
438441 defer func () {
439442 if ! sent {
440- pageWorkItemPool . Put (item )
443+ putPageWorkItem (item )
441444 }
442445 }()
443446
@@ -514,6 +517,9 @@ func (c *PagedWriter) Flush() error {
514517 }
515518 defer func () {
516519 c .eg .Wait () //nolint:errcheck // ensure all worker goroutines have exited, even on error
520+ for _ , r := range c .pendingResults {
521+ putPageResult (r )
522+ }
517523 c .resetPage ()
518524 }()
519525
0 commit comments