Skip to content

Commit

Permalink
Use sync.Pool to cache zstd ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie committed Feb 1, 2024
1 parent 0bea7cd commit 051ae5e
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions pulsar/internal/compression/zstd_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@
package compression

import (
"sync"

"github.com/DataDog/zstd"
log "github.com/sirupsen/logrus"
)

type zstdCGoProvider struct {
ctx zstd.Ctx
ctxPool sync.Pool
level Level
zstdLevel int
}

func newCGoZStdProvider(level Level) Provider {
z := &zstdCGoProvider{
ctx: zstd.NewCtx(),
ctxPool: sync.Pool{New: func() any {
return zstd.NewCtx()
}},
}

switch level {
Expand All @@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int {
}

func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel)
ctx := z.ctxPool.Get().(zstd.Ctx)
defer z.ctxPool.Put(ctx)
out, err := ctx.CompressLevel(dst, src, z.zstdLevel)
if err != nil {
log.WithError(err).Fatal("Failed to compress")
}
Expand All @@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
}

func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
return z.ctx.Decompress(dst, src)
ctx := z.ctxPool.Get().(zstd.Ctx)
defer z.ctxPool.Put(ctx)
return ctx.Decompress(dst, src)
}

func (z *zstdCGoProvider) Close() error {
Expand Down

0 comments on commit 051ae5e

Please sign in to comment.