Skip to content

Commit b61d4b8

Browse files
authored
Merge pull request apache#149 from proost/feat-varoptitems-encodingdecoding
feat: var opt items encoding & decoding
2 parents 97efcf1 + fdd21ce commit b61d4b8

39 files changed

Lines changed: 1170 additions & 14 deletions

common/types.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ type ItemSketchHasher[C comparable] interface {
2323
Hash(item C) uint64
2424
}
2525

26-
type ItemSketchSerde[C comparable] interface {
27-
SizeOf(item C) int
26+
// ItemSketchSerde is the set of methods a sketch uses to size, serialize and
// deserialize items of type T. The type parameter is constrained only by
// `any`, so item types do not have to be comparable.
//
// SizeOf's result is added to a byte count by
// VarOptItemsSketch.SerializedSizeBytes, i.e. it is treated as the
// serialized size of one item in bytes.
type ItemSketchSerde[T any] interface {
27+
SizeOf(item T) int
2828
SizeOfMany(mem []byte, offsetBytes int, numItems int) (int, error)
29-
SerializeManyToSlice(items []C) []byte
30-
SerializeOneToSlice(item C) []byte
31-
DeserializeManyFromSlice(mem []byte, offsetBytes int, numItems int) ([]C, error)
29+
SerializeManyToSlice(items []T) []byte
30+
SerializeOneToSlice(item T) []byte
31+
DeserializeManyFromSlice(mem []byte, offsetBytes int, numItems int) ([]T, error)
3232
}

sampling/reservoir_items_sketch.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,10 @@ func (s *ReservoirItemsSketch[T]) forceIncrementItemsSeen(delta int64) error {
308308

309309
// Serialization constants
310310
// Constants used when reading and writing the serialized sketch image:
//   - preambleIntsEmpty is OR'd with the resize-factor bits into byte 0 of an
//     empty sketch's image, and is shifted left by 3 to obtain the byte size
//     of an empty VarOptItemsSketch.
//   - reservoirItemsSketchSerialVersion is written to byte 1 by ToSlice and
//     compared against the stored version during deserialization.
//   - flagEmpty is written to byte 3 of an empty sketch's image.
//   - resizeFactorMask (0xC0) covers the top two bits of a byte; presumably
//     the resize-factor bits of byte 0 — TODO confirm against its use site.
const (
311-
preambleIntsEmpty = 1
312-
serVer = 2
313-
flagEmpty = 0x04
314-
resizeFactorMask = 0xC0
311+
preambleIntsEmpty = 1
312+
reservoirItemsSketchSerialVersion = 2
313+
flagEmpty = 0x04
314+
resizeFactorMask = 0xC0
315315
)
316316

317317
func resizeFactorBitsFor(rf ResizeFactor) (byte, error) {
@@ -358,7 +358,7 @@ func (s *ReservoirItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
358358
if s.isEmpty() {
359359
buf := make([]byte, 8)
360360
buf[0] = rfBits | preambleIntsEmpty
361-
buf[1] = serVer
361+
buf[1] = reservoirItemsSketchSerialVersion
362362
buf[2] = byte(internal.FamilyEnum.ReservoirItems.Id)
363363
buf[3] = flagEmpty
364364
binary.LittleEndian.PutUint32(buf[4:], uint32(s.k))
@@ -375,7 +375,7 @@ func (s *ReservoirItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
375375
buf := make([]byte, preBytes+len(itemsBytes))
376376

377377
buf[0] = rfBits | byte(preLongs)
378-
buf[1] = serVer
378+
buf[1] = reservoirItemsSketchSerialVersion
379379
buf[2] = byte(internal.FamilyEnum.ReservoirItems.Id)
380380
buf[3] = 0
381381
binary.LittleEndian.PutUint32(buf[4:], uint32(s.k))
@@ -445,7 +445,7 @@ func NewReservoirItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) (
445445

446446
k := int(binary.LittleEndian.Uint32(data[4:]))
447447

448-
if ver != serVer {
448+
if ver != reservoirItemsSketchSerialVersion {
449449
if ver == 1 {
450450
encK := binary.LittleEndian.Uint16(data[4:])
451451
decodedK, err := decodeReservoirSize(encK)

sampling/reservoir_items_sketch_serialization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ func TestReservoirItemsSketchDeserializationErrors(t *testing.T) {
375375
t.Run("BadFamily", func(t *testing.T) {
376376
data := make([]byte, 8)
377377
data[0] = 0xC0 | preambleIntsEmpty
378-
data[1] = serVer
378+
data[1] = reservoirItemsSketchSerialVersion
379379
data[2] = 99 // invalid family ID
380380
data[3] = flagEmpty
381381
binary.LittleEndian.PutUint32(data[4:], 100)
@@ -388,7 +388,7 @@ func TestReservoirItemsSketchDeserializationErrors(t *testing.T) {
388388
t.Run("BadPreLongs", func(t *testing.T) {
389389
data := make([]byte, 8)
390390
data[0] = 0xC0 | 5 // invalid preamble longs
391-
data[1] = serVer
391+
data[1] = reservoirItemsSketchSerialVersion
392392
data[2] = byte(internal.FamilyEnum.ReservoirItems.Id)
393393
data[3] = flagEmpty
394394
binary.LittleEndian.PutUint32(data[4:], 100)

sampling/varopt_items_sketch.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"slices"
2727
"strings"
2828

29+
"github.com/apache/datasketches-go/common"
2930
"github.com/apache/datasketches-go/internal"
3031
)
3132

@@ -81,12 +82,15 @@ type varOptConfig struct {
8182
resizeFactor ResizeFactor
8283
}
8384

85+
// WithResizeFactor sets the resize factor in the VarOpt configuration.
8486
func WithResizeFactor(rf ResizeFactor) VarOptOption {
8587
return func(c *varOptConfig) {
8688
c.resizeFactor = rf
8789
}
8890
}
8991

92+
// NewVarOptItemsSketch creates a new VarOptItemsSketch with a specified maximum capacity `k` and optional configurations.
93+
// It returns an error if `k` is less than 1 or exceeds the maximum allowed value (2^31 - 2).
9094
func NewVarOptItemsSketch[T any](k uint, opts ...VarOptOption) (*VarOptItemsSketch[T], error) {
9195
if k < 1 || k > varOptMaxK {
9296
return nil, errors.New("k must be at least 1 and less than 2^31 - 1")
@@ -120,6 +124,42 @@ func NewVarOptItemsSketch[T any](k uint, opts ...VarOptOption) (*VarOptItemsSket
120124
}, nil
121125
}
122126

127+
func newVarOptItemsSketchFromState[T any](
128+
k int, rf ResizeFactor, isGadget bool,
129+
) (*VarOptItemsSketch[T], error) {
130+
if k == 0 || k > varOptMaxK {
131+
return nil, errors.New("k must be at least 1 and less than 2^31 - 1")
132+
}
133+
134+
ceilingLgK := math.Log2(float64(common.CeilingPowerOf2(k)))
135+
initialLgSize := startingSubMultiple(int(ceilingLgK), int(rf), minLgArrItems)
136+
currItemsAlloc := adjustedSamplingAllocationSize(k, 1<<initialLgSize)
137+
if currItemsAlloc == k {
138+
currItemsAlloc++
139+
}
140+
141+
data := make([]T, 0, currItemsAlloc)
142+
weights := make([]float64, 0, currItemsAlloc)
143+
var marks []bool
144+
if isGadget {
145+
marks = make([]bool, 0, currItemsAlloc)
146+
}
147+
148+
return &VarOptItemsSketch[T]{
149+
k: k,
150+
h: 0,
151+
m: 0,
152+
r: 0,
153+
n: 0,
154+
totalWeightR: 0.0,
155+
rf: rf,
156+
marks: marks,
157+
data: data,
158+
weights: weights,
159+
numMarksInH: 0,
160+
}, nil
161+
}
162+
123163
// K returns the configured maximum sample size.
124164
func (s *VarOptItemsSketch[T]) K() int { return s.k }
125165

@@ -740,3 +780,30 @@ func (s *VarOptItemsSketch[T]) EstimateSubsetSum(predicate func(T) bool) (Sample
740780
TotalSketchWeight: weightSumInH + s.totalWeightR,
741781
}, nil
742782
}
783+
784+
// SerializedSizeBytes computes size needed to serialize the current state of the sketch.
785+
func (s *VarOptItemsSketch[T]) SerializedSizeBytes(serde common.ItemSketchSerde[T]) int {
786+
if s.IsEmpty() {
787+
return preambleIntsEmpty << 3
788+
}
789+
790+
numBytes := int(preambleLongsFull << 3)
791+
if s.r == 0 {
792+
numBytes = int(preambleLongsWarmup << 3)
793+
}
794+
795+
numBytes += s.h * 8 // weights.
796+
797+
if s.marks != nil {
798+
numBytes += s.h / 8
799+
if s.h%8 > 0 {
800+
numBytes++
801+
}
802+
}
803+
804+
for sample := range s.All() {
805+
numBytes += serde.SizeOf(sample.Item)
806+
}
807+
808+
return numBytes
809+
}

0 commit comments

Comments
 (0)