Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type ItemSketchHasher[C comparable] interface {
Hash(item C) uint64
}

type ItemSketchSerde[C comparable] interface {
SizeOf(item C) int
type ItemSketchSerde[T any] interface {
SizeOf(item T) int
SizeOfMany(mem []byte, offsetBytes int, numItems int) (int, error)
SerializeManyToSlice(items []C) []byte
SerializeOneToSlice(item C) []byte
DeserializeManyFromSlice(mem []byte, offsetBytes int, numItems int) ([]C, error)
SerializeManyToSlice(items []T) []byte
SerializeOneToSlice(item T) []byte
DeserializeManyFromSlice(mem []byte, offsetBytes int, numItems int) ([]T, error)
}
14 changes: 7 additions & 7 deletions sampling/reservoir_items_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ func (s *ReservoirItemsSketch[T]) forceIncrementItemsSeen(delta int64) error {

// Serialization constants
const (
preambleIntsEmpty = 1
serVer = 2
flagEmpty = 0x04
resizeFactorMask = 0xC0
preambleIntsEmpty = 1
reservoirItemsSketchSerialVersion = 2
flagEmpty = 0x04
resizeFactorMask = 0xC0
)

func resizeFactorBitsFor(rf ResizeFactor) (byte, error) {
Expand Down Expand Up @@ -358,7 +358,7 @@ func (s *ReservoirItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
if s.isEmpty() {
buf := make([]byte, 8)
buf[0] = rfBits | preambleIntsEmpty
buf[1] = serVer
buf[1] = reservoirItemsSketchSerialVersion
buf[2] = byte(internal.FamilyEnum.ReservoirItems.Id)
buf[3] = flagEmpty
binary.LittleEndian.PutUint32(buf[4:], uint32(s.k))
Expand All @@ -375,7 +375,7 @@ func (s *ReservoirItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
buf := make([]byte, preBytes+len(itemsBytes))

buf[0] = rfBits | byte(preLongs)
buf[1] = serVer
buf[1] = reservoirItemsSketchSerialVersion
buf[2] = byte(internal.FamilyEnum.ReservoirItems.Id)
buf[3] = 0
binary.LittleEndian.PutUint32(buf[4:], uint32(s.k))
Expand Down Expand Up @@ -445,7 +445,7 @@ func NewReservoirItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) (

k := int(binary.LittleEndian.Uint32(data[4:]))

if ver != serVer {
if ver != reservoirItemsSketchSerialVersion {
if ver == 1 {
encK := binary.LittleEndian.Uint16(data[4:])
decodedK, err := decodeReservoirSize(encK)
Expand Down
4 changes: 2 additions & 2 deletions sampling/reservoir_items_sketch_serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestReservoirItemsSketchDeserializationErrors(t *testing.T) {
t.Run("BadFamily", func(t *testing.T) {
data := make([]byte, 8)
data[0] = 0xC0 | preambleIntsEmpty
data[1] = serVer
data[1] = reservoirItemsSketchSerialVersion
data[2] = 99 // invalid family ID
data[3] = flagEmpty
binary.LittleEndian.PutUint32(data[4:], 100)
Expand All @@ -388,7 +388,7 @@ func TestReservoirItemsSketchDeserializationErrors(t *testing.T) {
t.Run("BadPreLongs", func(t *testing.T) {
data := make([]byte, 8)
data[0] = 0xC0 | 5 // invalid preamble longs
data[1] = serVer
data[1] = reservoirItemsSketchSerialVersion
data[2] = byte(internal.FamilyEnum.ReservoirItems.Id)
data[3] = flagEmpty
binary.LittleEndian.PutUint32(data[4:], 100)
Expand Down
67 changes: 67 additions & 0 deletions sampling/varopt_items_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"slices"
"strings"

"github.com/apache/datasketches-go/common"
"github.com/apache/datasketches-go/internal"
)

Expand Down Expand Up @@ -81,12 +82,15 @@ type varOptConfig struct {
resizeFactor ResizeFactor
}

// WithResizeFactor sets the resize factor in the VarOpt configuration.
func WithResizeFactor(rf ResizeFactor) VarOptOption {
return func(c *varOptConfig) {
c.resizeFactor = rf
}
}

// NewVarOptItemsSketch creates a new VarOptItemsSketch with a specified maximum capacity `k` and optional configurations.
// It returns an error if `k` is less than 1 or exceeds the maximum allowed value (2^31 - 2).
func NewVarOptItemsSketch[T any](k uint, opts ...VarOptOption) (*VarOptItemsSketch[T], error) {
if k < 1 || k > varOptMaxK {
return nil, errors.New("k must be at least 1 and less than 2^31 - 1")
Expand Down Expand Up @@ -120,6 +124,42 @@ func NewVarOptItemsSketch[T any](k uint, opts ...VarOptOption) (*VarOptItemsSket
}, nil
}

func newVarOptItemsSketchFromState[T any](
k int, rf ResizeFactor, isGadget bool,
) (*VarOptItemsSketch[T], error) {
if k == 0 || k > varOptMaxK {
return nil, errors.New("k must be at least 1 and less than 2^31 - 1")
}

ceilingLgK := math.Log2(float64(common.CeilingPowerOf2(k)))
initialLgSize := startingSubMultiple(int(ceilingLgK), int(rf), minLgArrItems)
currItemsAlloc := adjustedSamplingAllocationSize(k, 1<<initialLgSize)
if currItemsAlloc == k {
currItemsAlloc++
}

data := make([]T, 0, currItemsAlloc)
weights := make([]float64, 0, currItemsAlloc)
var marks []bool
if isGadget {
marks = make([]bool, 0, currItemsAlloc)
}

return &VarOptItemsSketch[T]{
k: k,
h: 0,
m: 0,
r: 0,
n: 0,
totalWeightR: 0.0,
rf: rf,
marks: marks,
data: data,
weights: weights,
numMarksInH: 0,
}, nil
}

// K returns the configured maximum sample size.
func (s *VarOptItemsSketch[T]) K() int { return s.k }

Expand Down Expand Up @@ -740,3 +780,30 @@ func (s *VarOptItemsSketch[T]) EstimateSubsetSum(predicate func(T) bool) (Sample
TotalSketchWeight: weightSumInH + s.totalWeightR,
}, nil
}

// SerializedSizeBytes computes size needed to serialize the current state of the sketch.
func (s *VarOptItemsSketch[T]) SerializedSizeBytes(serde common.ItemSketchSerde[T]) int {
if s.IsEmpty() {
return preambleIntsEmpty << 3
}

numBytes := int(preambleLongsFull << 3)
if s.r == 0 {
numBytes = int(preambleLongsWarmup << 3)
}

numBytes += s.h * 8 // weights.

if s.marks != nil {
numBytes += s.h / 8
if s.h%8 > 0 {
numBytes++
}
}

for sample := range s.All() {
numBytes += serde.SizeOf(sample.Item)
}

return numBytes
}
Loading
Loading