From e19c4491ac7311302fdf307aa00062d33d1a674b Mon Sep 17 00:00:00 2001 From: lani_karrot Date: Wed, 22 Apr 2026 15:13:31 +0900 Subject: [PATCH 1/3] refactor: move consts to sketch --- req/compactor.go | 20 +++++--------------- req/compactor_test.go | 10 ++++++++++ 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/req/compactor.go b/req/compactor.go index 04bed74..df789dd 100644 --- a/req/compactor.go +++ b/req/compactor.go @@ -23,19 +23,13 @@ import ( "fmt" "math" "math/bits" - "math/rand" + "math/rand/v2" "slices" "strings" "github.com/apache/datasketches-go/internal" ) -const ( - initNumberOfSections = 3 - minK = 4 - nomCapMul = 2 -) - type compactResult struct { deltaRetItems int deltaNomSize int @@ -121,7 +115,7 @@ func numberOfTrailingOnes(v int64) int { return bits.TrailingZeros64(uint64(^v)) } -func (c *compactor) Compact(next *compactor, rng *rand.Rand) (compactResult, error) { +func (c *compactor) Compact(next *compactor) (compactResult, error) { startRetItems := c.Count() startNomCap := c.NomCapacity() @@ -141,7 +135,7 @@ func (c *compactor) Compact(next *compactor, rng *rand.Rand) (compactResult, err if (c.state & 1) == 1 { c.coin = !c.coin // if numCompactions odd, flip coin } else { - c.coin = rng.Intn(2) == 1 // random coin flip + c.coin = rand.IntN(2) == 1 // random coin flip } promoteCount, err := c.promoteEvensOrOddsInto(next, compactionStart, compactionEnd, c.coin) @@ -163,10 +157,6 @@ func (c *compactor) Coin() bool { return c.coin } -func (c *compactor) LgWeight() byte { - return c.lgWeight -} - func (c *compactor) NomCapacity() int { return nomCapMul * int(c.numSections) * c.sectionSize } @@ -398,7 +388,7 @@ func (c *compactor) marshalItems() []byte { } // itemsToString returns a formatted string of the items. 
-func (c *compactor) itemsToString(format string, width int) string { +func (c *compactor) itemsToString(width int) string { var sb strings.Builder spaces := " " var start, end int @@ -413,7 +403,7 @@ func (c *compactor) itemsToString(format string, width int) string { cnt := 0 sb.WriteString(spaces) for i := start; i < end; i++ { - str := fmt.Sprintf(format, c.items[i]) + str := fmt.Sprintf("%f", c.items[i]) if i > start { cnt++ if cnt%width == 0 { diff --git a/req/compactor_test.go b/req/compactor_test.go index 8c7df20..87e379f 100644 --- a/req/compactor_test.go +++ b/req/compactor_test.go @@ -409,3 +409,13 @@ func TestNumberOfTrailingOnes(t *testing.T) { }) } } + +func TestCompactorGetters(t *testing.T) { + c := newCompactor(0, true, 12) + assert.False(t, c.Coin()) + assert.Greater(t, c.NumSections(), 0) + assert.Greater(t, c.SectionSize(), 0) + assert.Greater(t, c.SectionSizeFlt(), float32(0)) + assert.True(t, c.IsHighRankAccuracyMode()) + assert.Equal(t, int64(0), c.State()) +} From c282ec4c0fd0b6d8eddf1658dfec5d08a571cdcb Mon Sep 17 00:00:00 2001 From: lani_karrot Date: Wed, 22 Apr 2026 15:13:39 +0900 Subject: [PATCH 2/3] feat: REQ sketch --- req/sketch.go | 790 +++++++++++++++++++++++++++++++++++++ req/sketch_test.go | 955 +++++++++++++++++++++++++++++++++++++++++++++ req/utils.go | 29 ++ 3 files changed, 1774 insertions(+) create mode 100644 req/sketch.go create mode 100644 req/sketch_test.go create mode 100644 req/utils.go diff --git a/req/sketch.go b/req/sketch.go new file mode 100644 index 0000000..3ec4183 --- /dev/null +++ b/req/sketch.go @@ -0,0 +1,790 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package req + +import ( + "errors" + "fmt" + "math" + "strings" + + quantilecommon "github.com/apache/datasketches-go/common/quantiles" +) + +const ( + minK = 4 + defaultK = 12 // 1% @ 95% Confidence + nomCapMul = 2 + initNumberOfSections = 3 + fixRSEFactor = 0.084 +) + +var ( + relRSEFactor = math.Sqrt(0.0512 / initNumberOfSections) //0.1306394529 +) + +var ( + ErrEmpty = errors.New("operation is undefined for an empty sketch") +) + +type SketchOptionFunc func(*Sketch) + +// WithHighRankAccuracyMode sets the high rank accuracy mode for the Sketch. +func WithHighRankAccuracyMode(isHRAMode bool) SketchOptionFunc { + return func(sk *Sketch) { + sk.isHighRankAccuracyMode = isHRAMode + } +} + +// WithK sets the k parameter for the Sketch. +func WithK(k int) SketchOptionFunc { + return func(sk *Sketch) { + sk.k = k + } +} + +type Sketch struct { + k int + isHighRankAccuracyMode bool + n int64 + minItem float32 + maxItem float32 + numRetained int + maxNomSize int // sum of nominal capacities of all compactors. 
+ sortedView *quantilecommon.NumericSortedView[float32] + compactors []*compactor +} + +func NewSketch(options ...SketchOptionFunc) (*Sketch, error) { + sk := &Sketch{ + k: defaultK, + isHighRankAccuracyMode: true, + } + for _, option := range options { + option(sk) + } + if err := sk.validateK(); err != nil { + return nil, err + } + + sk.grow() + + return sk, nil +} + +func (s *Sketch) validateK() error { + if s.k&1 > 0 || s.k < minK || s.k > 1024 { + return fmt.Errorf("must be even and in the range [4, 1024]: %d", s.k) + } + + return nil +} + +func (s *Sketch) grow() { + lgWeight := len(s.compactors) + s.compactors = append(s.compactors, newCompactor(byte(lgWeight), s.isHighRankAccuracyMode, s.k)) + s.maxNomSize = s.computeMaxNomSize() +} + +// computeMaxNomSize Computes a new bound for determining when to compress the sketch +func (s *Sketch) computeMaxNomSize() int { + capacity := 0 + for _, comp := range s.compactors { + capacity += comp.NomCapacity() + } + return capacity +} + +// K returns the k parameter which controls the accuracy of the sketch +// and its memory space usage. +func (s *Sketch) K() int { + return s.k +} + +// CDF is equivalent of NumericSortedView CDF function. +func (s *Sketch) CDF(splitPoints []float32, isInclusive bool) ([]float64, error) { + if s.IsEmpty() { + return nil, ErrEmpty + } + + if err := s.refreshSortedView(); err != nil { + return nil, err + } + + buckets, err := s.sortedView.CDF(splitPoints, isInclusive) + if err != nil { + return nil, err + } + return buckets, nil +} + +// IsEmpty checks if the sketch contains no data and returns true if it is empty, otherwise false. 
+func (s *Sketch) IsEmpty() bool { + return s.n == 0 +} + +func (s *Sketch) refreshSortedView() error { + if s.sortedView == nil { + if s.IsEmpty() { + return ErrEmpty + } + + quantiles := make([]float32, s.numRetained) + cumWeights := make([]int64, s.numRetained) + count := 0 + for _, comp := range s.compactors { + weight := 1 << comp.lgWeight + s.mergeSortIn( + comp, quantiles, cumWeights, int64(weight), count, s.isHighRankAccuracyMode, + ) + + count += comp.Count() + } + + if err := s.accumulateNativeRanks(len(quantiles), cumWeights); err != nil { + return err + } + + s.sortedView = quantilecommon.NewNumericSortedView[float32]( + quantiles, cumWeights, s.n, s.maxItem, s.minItem, + ) + } + + return nil +} + +// specially modified version of mergeSortIn. +// weight is associated weight of input. +// count is number of items inserted. +func (s *Sketch) mergeSortIn( + comp *compactor, + quantiles []float32, + cumWeights []int64, + weight int64, + count int, + isHighRankAccuracyMode bool, +) { + if !comp.sorted { + comp.Sort() + } + + totalLength := count + comp.Count() + i := count - 1 + j := comp.Count() - 1 + h := comp.Count() - 1 + if isHighRankAccuracyMode { + h = comp.Capacity() - 1 + } + for k := totalLength - 1; k >= 0; k-- { + switch { + case i >= 0 && j >= 0: // both valid + if quantiles[i] >= comp.items[h] { + quantiles[k] = quantiles[i] + cumWeights[k] = cumWeights[i] + i-- + } else { + quantiles[k] = comp.items[h] + h-- + j-- + cumWeights[k] = weight + } + case i >= 0: // i is valid + quantiles[k] = quantiles[i] + cumWeights[k] = cumWeights[i] + i-- + case j >= 0: // j is valid + quantiles[k] = comp.items[h] + h-- + j-- + cumWeights[k] = weight + default: + break + } + } +} + +func (s *Sketch) accumulateNativeRanks(quantilesLength int, cumWeights []int64) error { + for i := 1; i < quantilesLength; i++ { + cumWeights[i] += cumWeights[i-1] + } + if s.n > 0 && cumWeights[quantilesLength-1] != s.n { + return errors.New("sum of weight should equal to total 
count") + } + + return nil +} + +// IsHighRankAccuracyMode returns whether the sketch is in high rank accuracy mode. +// If true, the high ranks are prioritized for better accuracy. +// If not, low ranks are prioritized for better accuracy. +func (s *Sketch) IsHighRankAccuracyMode() bool { + return s.isHighRankAccuracyMode +} + +// MaxItem retrieves the maximum item in the sketch. Returns an error if the sketch is empty. +func (s *Sketch) MaxItem() (float32, error) { + if s.IsEmpty() { + return 0, ErrEmpty + } + return s.maxItem, nil +} + +// MinItem retrieves the minimum item in the sketch. Returns an error if the sketch is empty. +func (s *Sketch) MinItem() (float32, error) { + if s.IsEmpty() { + return 0, ErrEmpty + } + return s.minItem, nil +} + +// N returns the total number of items in the sketch. +func (s *Sketch) N() int64 { + return s.n +} + +type searchCriteria struct { + isInclusive bool +} + +// SearchCriteriaOptionFunc defines a function type used to configure or modify a searchCriteria instance. +type SearchCriteriaOptionFunc func(*searchCriteria) + +// WithExclusiveSearch creates a SearchCriteriaOptionFunc that sets the search criteria to be exclusive. +func WithExclusiveSearch() SearchCriteriaOptionFunc { + return func(c *searchCriteria) { + c.isInclusive = false + } +} + +// PMF is the equivalent of the NumericSortedView PMF function. +// the default option is inclusive. +func (s *Sketch) PMF(splitPoints []float32, opts ...SearchCriteriaOptionFunc) ([]float64, error) { + if s.IsEmpty() { + return nil, ErrEmpty + } + + if err := s.refreshSortedView(); err != nil { + return nil, err + } + + options := &searchCriteria{ + isInclusive: true, + } + for _, opt := range opts { + opt(options) + } + + return s.sortedView.PMF(splitPoints, options.isInclusive) +} + +// Quantile gets the approximate quantile of the given normalized rank. +// normRank is the normalized rank in the range [0.0, 1.0]. 
+// If isInclusive is true, rank includes all the quantiles less than or equal to +// the quantile directly corresponding to the given rank. +// If not, rank includes all the quantiles less than +// the quantile directly corresponding to the given rank. +// The default option is inclusive. +func (s *Sketch) Quantile(normRank float64, opts ...SearchCriteriaOptionFunc) (float32, error) { + if s.IsEmpty() { + return 0, ErrEmpty + } + + if err := s.refreshSortedView(); err != nil { + return 0, err + } + + options := &searchCriteria{ + isInclusive: true, + } + for _, opt := range opts { + opt(options) + } + + return s.sortedView.Quantile(normRank, options.isInclusive) +} + +// Quantiles gets quantiles from the given array of normalized ranks. +// ranks is the normalized ranks, each of which must be in the valid interval [0.0, 1.0]. +// The default option is inclusive search. +func (s *Sketch) Quantiles(ranks []float64, opts ...SearchCriteriaOptionFunc) ([]float32, error) { + if s.IsEmpty() { + return nil, ErrEmpty + } + + if err := s.refreshSortedView(); err != nil { + return nil, err + } + + options := &searchCriteria{ + isInclusive: true, + } + for _, opt := range opts { + opt(options) + } + + length := len(ranks) + quantiles := make([]float32, 0, length) + for i := 0; i < length; i++ { + quantile, err := s.sortedView.Quantile(ranks[i], options.isInclusive) + if err != nil { + return nil, err + } + + quantiles = append(quantiles, quantile) + } + return quantiles, nil +} + +type confidenceOptions struct { + numStdDev int +} + +// ConfidenceOptionFunc is a function type that configures confidence interval options. +type ConfidenceOptionFunc func(*confidenceOptions) + +// WithNumStdDev sets the number of standard deviations to use for confidence intervals. 
+func WithNumStdDev(n int) ConfidenceOptionFunc { + return func(opt *confidenceOptions) { + opt.numStdDev = n + } +} + +// QuantileLowerBound returns the lower bound of the quantile confidence interval +// in which the quantile of the given rank exists. +// numStdDev is the number of standard deviations. Must be 1, 2, or 3. default numStdDev is 2. +// When numStdDev is 2, the approximate probability that the true quantile is within +// the confidence interval specified by the upper and lower quantile bounds +// for this sketch is 0.95. +func (s *Sketch) QuantileLowerBound( + rank float64, opts ...ConfidenceOptionFunc, +) (float32, error) { + lb, err := s.RankLowerBound(rank, opts...) + if err != nil { + return 0, err + } + return s.Quantile(lb) +} + +// RankLowerBound returns the approximate lower bound of a rank confidence interval +// which the true rank of the given rank exists. +// rank should be in the 0 to 1.0. +// numStdDev is the number of standard deviations. Must be 1, 2, or 3. default numStdDev is 2. +// When numStdDev is 2, the approximate probability that the true quantile is within +// the confidence interval specified by the upper and lower quantile bounds +// for this sketch is 0.95. +func (s *Sketch) RankLowerBound( + rank float64, opts ...ConfidenceOptionFunc, +) (float64, error) { + options := &confidenceOptions{ + numStdDev: 2, + } + for _, opt := range opts { + opt(options) + } + + return computeRankLowerBound( + s.k, s.numLevels(), rank, options.numStdDev, s.isHighRankAccuracyMode, s.n, + ), nil +} + +// numLevels returns number of levels of compactors in the sketch. +func (s *Sketch) numLevels() int { + return len(s.compactors) +} + +// QuantileUpperBound returns the upper bound of the quantile confidence interval +// in which the quantile of the given rank exists. +// numStdDev is the number of standard deviations. Must be 1, 2, or 3. default numStdDev is 2. 
+// When numStdDev is 2, the approximate probability that the true quantile is within +// the confidence interval specified by the upper and lower quantile bounds +// for this sketch is 0.95. +func (s *Sketch) QuantileUpperBound( + rank float64, opts ...ConfidenceOptionFunc, +) (float32, error) { + ub, err := s.RankUpperBound(rank, opts...) + if err != nil { + return 0, err + } + return s.Quantile(ub) +} + +// RankUpperBound returns the approximate upper bound of the rank confidence interval +// in which the true rank of the given rank exists. +// rank should be in the 0 to 1.0. +// numStdDev is the number of standard deviations. Must be 1, 2, or 3. default numStdDev is 2. +// When numStdDev is 2, the approximate probability that the true quantile is within +// the confidence interval specified by the upper and lower quantile bounds +// for this sketch is 0.95. +func (s *Sketch) RankUpperBound( + rank float64, opts ...ConfidenceOptionFunc, +) (float64, error) { + options := &confidenceOptions{ + numStdDev: 2, + } + for _, opt := range opts { + opt(options) + } + + return computeRankUpperBound( + s.k, s.numLevels(), rank, options.numStdDev, s.isHighRankAccuracyMode, s.n, + ), nil +} + +// Rank returns normalized rank corresponding to the given quantile and search criterion. +// The default option is inclusive search. 
+func (s *Sketch) Rank(quantile float32, opts ...SearchCriteriaOptionFunc) (float64, error) { + if s.IsEmpty() { + return 0, ErrEmpty + } + if math.IsNaN(float64(quantile)) { + return 0, errors.New("quantile must not be NaN") + } + if math.IsInf(float64(quantile), 0) || math.IsInf(float64(quantile), -1) { + return 0, errors.New("quantile must be finite") + } + + if err := s.refreshSortedView(); err != nil { + return 0, err + } + + options := &searchCriteria{ + isInclusive: true, + } + for _, opt := range opts { + opt(options) + } + + return s.sortedView.Rank(quantile, options.isInclusive) +} + +// Ranks returns normalized ranks corresponding to the given quantiles and search criterion. +// The default option is inclusive. +func (s *Sketch) Ranks(quantiles []float32, opts ...SearchCriteriaOptionFunc) ([]float64, error) { + if s.IsEmpty() { + return nil, ErrEmpty + } + + length := len(quantiles) + ranks := make([]float64, 0, length) + for i := 0; i < length; i++ { + rank, err := s.Rank(quantiles[i], opts...) + if err != nil { + return nil, err + } + + ranks = append(ranks, rank) + } + return ranks, nil +} + +// NumRetained returns the number of quantiles retained by the sketch. +func (s *Sketch) NumRetained() int { + return s.numRetained +} + +// IsEstimationMode returns true if the sketch is in estimation mode. +func (s *Sketch) IsEstimationMode() bool { + return s.numLevels() > 1 +} + +// All returns all retained items of the sketch. 
+func (s *Sketch) All() []Item { + if s.numRetained == 0 { + return nil + } + + var ( + itemIndex = 0 + compactorIndex = 0 + items []Item + currentCompactor = s.compactors[0] + ) + for compactorIndex < len(s.compactors) { + quantile := currentCompactor.Item(itemIndex) + weight := int64(1) << compactorIndex + + items = append(items, Item{ + Quantile: quantile, + Weight: weight, + }) + + if itemIndex == currentCompactor.Count()-1 { + compactorIndex++ + if compactorIndex >= len(s.compactors) { + break + } + + currentCompactor = s.compactors[compactorIndex] + itemIndex = 0 + continue + } + + itemIndex++ + } + + return items +} + +// Merge merges another sketch into this one. The other sketch is not modified. +func (s *Sketch) Merge(other *Sketch) error { + if other == nil || other.IsEmpty() { + return nil + } + + if s.isHighRankAccuracyMode != other.isHighRankAccuracyMode { + return errors.New("both sketches must have the same HighRankAccuracy setting") + } + + s.n += other.n + + if math.IsNaN(float64(s.minItem)) || other.minItem < s.minItem { + s.minItem = other.minItem + } + if math.IsNaN(float64(s.maxItem)) || other.maxItem > s.maxItem { + s.maxItem = other.maxItem + } + + // grow until self has at least as many compactors as others. + for i := s.numLevels(); i < other.numLevels(); i++ { + s.grow() + } + + // merge items in all height compactors. + for i := 0; i < other.numLevels(); i++ { + if err := s.compactors[i].Merge(other.compactors[i]); err != nil { + return err + } + } + s.maxNomSize = s.computeMaxNomSize() + s.numRetained = s.computeRetainedItems() + if s.numRetained >= s.maxNomSize { + if err := s.compress(); err != nil { + return err + } + } + + if s.numRetained >= s.maxNomSize { + return fmt.Errorf( + "sketch is in invalid state. retained items should be less than max nominal size. retained: %d, max nominal size: %d", + s.numRetained, s.maxNomSize, + ) + } + + s.sortedView = nil + + return nil +} + +// Reset the sketch to the empty state. 
+func (s *Sketch) Reset() { + s.n = 0 + s.numRetained = 0 + s.maxNomSize = 0 + s.minItem = float32(math.NaN()) + s.maxItem = float32(math.NaN()) + s.sortedView = nil + s.compactors = s.compactors[:0] + s.grow() +} + +// String returns a string representation of the sketch. +func (s *Sketch) String() string { + var result strings.Builder + result.WriteString("**********Relative Error Quantiles Sketch Summary**********") + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" K : %d", s.k)) + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" N : %d", s.n)) + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" Retained Items : %d", s.numRetained)) + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" Min Item : %f", s.minItem)) + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" Max Item : %f", s.maxItem)) + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" Estimation Mode : %v", s.IsEstimationMode())) + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" High Rank Acc : %v", s.isHighRankAccuracyMode)) + result.WriteString("\n") + result.WriteString(fmt.Sprintf(" Levels: : %d", s.numLevels())) + result.WriteString("\n") + result.WriteString("************************End Summary************************") + result.WriteString("\n") + + return result.String() +} + +// Update updates this sketch with the given item. +// NaN are ignored. +func (s *Sketch) Update(item float32) error { + if math.IsNaN(float64(item)) { + return nil + } + + if s.IsEmpty() { + s.minItem = item + s.maxItem = item + } else { + s.minItem = min(item, s.minItem) + s.maxItem = max(item, s.maxItem) + } + + comp := s.compactors[0] + comp.Append(item) + s.numRetained++ + s.n++ + if s.numRetained >= s.maxNomSize { + comp.Sort() + if err := s.compress(); err != nil { + return err + } + } + + s.sortedView = nil + + return nil +} + +// CompactorDetailString returns a string representation of the compactors in the sketch. 
+// Each compactor string is prepended by the compactor lgWeight, +// the current number of retained quantiles of the compactor and +// the current nominal capacity of the compactor. +func (s *Sketch) CompactorDetailString(showAllData bool) string { + var result strings.Builder + result.WriteString("*********Relative Error Quantiles Compactor Detail*********") + result.WriteString("\n") + result.WriteString(fmt.Sprintf("Compactor Detail: Ret Items: %d N: %d", s.numRetained, s.n)) + result.WriteString("\n") + for _, comp := range s.compactors { + result.WriteString(comp.String()) + result.WriteString("\n") + if showAllData { + result.WriteString(comp.itemsToString(20)) + result.WriteString("\n") + } + } + result.WriteString("************************End Detail*************************") + return result.String() +} + +// SortedView returns a sorted view of the data retained by the sketch, or an error if refreshing the view fails. +func (s *Sketch) SortedView() (*quantilecommon.NumericSortedView[float32], error) { + if err := s.refreshSortedView(); err != nil { + return nil, err + } + + return s.sortedView, nil +} + +func (s *Sketch) computeRetainedItems() int { + count := 0 + for _, comp := range s.compactors { + count += comp.Count() + } + return count +} + +func (s *Sketch) compress() error { + for i := 0; i < len(s.compactors); i++ { + comp := s.compactors[i] + retainedItemsInCompactor := comp.Count() + nomCapInCompactor := comp.NomCapacity() + + if retainedItemsInCompactor >= nomCapInCompactor { + if i+1 >= s.numLevels() { // at the top. + s.grow() // add a new level, increase maxNomSize. 
+ } + + result, err := comp.Compact(s.compactors[i+1]) + if err != nil { + return err + } + + s.numRetained += result.deltaRetItems + s.maxNomSize += result.deltaNomSize + } + } + + s.sortedView = nil + + return nil +} + +func computeRankLowerBound( + k int, + levels int, + rank float64, + numStdDev int, + isHighRankAccuracyMode bool, + n int64, +) float64 { + if isExactRank(k, levels, rank, isHighRankAccuracyMode, n) { + return rank + } + + relative := (relRSEFactor / float64(k)) * rank + if isHighRankAccuracyMode { + relative = (relRSEFactor / float64(k)) * (1.0 - rank) + } + fixed := fixRSEFactor / float64(k) + lbRel := rank - (float64(numStdDev) * relative) + lbFix := rank - (float64(numStdDev) * fixed) + return max(lbRel, lbFix) +} + +func isExactRank( + k int, levels int, rank float64, isHighRankAccuracyMode bool, n int64, +) bool { + baseCap := k * initNumberOfSections + if levels == 1 || n <= int64(baseCap) { + return true + } + exactRankThreshold := float64(baseCap) / float64(n) + if isHighRankAccuracyMode { + return rank >= (1.0 - exactRankThreshold) + } + return rank <= exactRankThreshold +} + +func computeRankUpperBound( + k int, + levels int, + rank float64, + numStdDev int, + isHighRankAccuracyMode bool, + n int64, +) float64 { + if isExactRank(k, levels, rank, isHighRankAccuracyMode, n) { + return rank + } + + relative := (relRSEFactor / float64(k)) * rank + if isHighRankAccuracyMode { + relative = (relRSEFactor / float64(k)) * (1.0 - rank) + } + fixed := fixRSEFactor / float64(k) + lbRel := rank + (float64(numStdDev) * relative) + lbFix := rank + (float64(numStdDev) * fixed) + return min(lbRel, lbFix) +} + +// Item represents a quantile value and its associated weight retained by the sketch. 
+type Item struct { + Quantile float32 + Weight int64 +} diff --git a/req/sketch_test.go b/req/sketch_test.go new file mode 100644 index 0000000..5bc111e --- /dev/null +++ b/req/sketch_test.go @@ -0,0 +1,955 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package req + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/datasketches-go/internal/quantiles" +) + +func loadSketch(t *testing.T, k, min, max int, up, hra bool) *Sketch { + t.Helper() + sk, err := NewSketch(WithK(k), WithHighRankAccuracyMode(hra)) + assert.NoError(t, err) + if up { + for i := min; i <= max; i++ { + assert.NoError(t, sk.Update(float32(i))) + } + } else { + for i := max; i >= min; i-- { + assert.NoError(t, sk.Update(float32(i))) + } + } + return sk +} + +func evenlySpacedFloats(min, max float32, n int) []float32 { + if n < 2 { + return []float32{min} + } + result := make([]float32, n) + step := (max - min) / float32(n-1) + for i := 0; i < n; i++ { + result[i] = min + float32(i)*step + } + result[n-1] = max // avoid floating point drift + return result +} + +func evenlySpacedDoubles(min, max float64, n int) []float64 { + if n < 2 { + return []float64{min} + } + result := make([]float64, n) + step := (max - min) / float64(n-1) + for i := 0; i < n; i++ { + result[i] = min + float64(i)*step + } + result[n-1] = max + return result +} + +func TestNewSketch(t *testing.T) { + t.Run("default", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + assert.Equal(t, 12, sk.K()) + }) + + t.Run("with options", func(t *testing.T) { + sk, err := NewSketch(WithK(50), WithHighRankAccuracyMode(true)) + assert.NoError(t, err) + assert.Equal(t, 50, sk.K()) + assert.Equal(t, true, sk.IsHighRankAccuracyMode()) + }) + + t.Run("invalid k", func(t *testing.T) { + _, err := NewSketch(WithK(1)) + assert.ErrorContains(t, err, "must be even and in the range [4, 1024]") + }) +} + +func TestSketchCDF(t *testing.T) { + t.Run("NaN", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + assert.NoError(t, sk.Update(1)) + _, err = sk.CDF([]float32{float32(math.NaN())}, true) + assert.ErrorIs(t, err, quantiles.ErrNanInSplitPoints) + }) + + t.Run("k=20 up HRA", func(t *testing.T) { + sk := 
loadSketch(t, 20, 1, 100, true, true) + spArr := []float32{20, 40, 60, 80} + cdf, err := sk.CDF(spArr, true) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(cdf)) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + spArr := []float32{20, 40, 60, 80} + cdf, err := sk.CDF(spArr, true) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(cdf)) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + spArr := []float32{20, 40, 60, 80} + cdf, err := sk.CDF(spArr, true) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(cdf)) + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + spArr := []float32{20, 40, 60, 80} + cdf, err := sk.CDF(spArr, true) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(cdf)) + }) +} + +func TestSketchQuantile(t *testing.T) { + t.Run("Exceed limit", func(t *testing.T) { + sk := loadSketch(t, 6, 1, 200, true, true) + _, err := sk.Quantile(2.0) + assert.ErrorContains(t, err, "rank must be between 0 and 1 inclusive") + _, err = sk.Quantile(-2.0) + assert.ErrorContains(t, err, "rank must be between 0 and 1 inclusive") + }) + + t.Run("k=20 up HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, true) + rArr := []float64{0, .1, .2, .3, .4, .5, .6, .7, .8, .9, 1.0} + qOut, err := sk.Quantiles(rArr) + assert.NoError(t, err) + assert.Equal(t, len(rArr), len(qOut)) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + rArr := []float64{0, .1, .2, .3, .4, .5, .6, .7, .8, .9, 1.0} + qOut, err := sk.Quantiles(rArr) + assert.NoError(t, err) + assert.Equal(t, len(rArr), len(qOut)) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + rArr := []float64{0, .1, .2, .3, .4, .5, .6, .7, .8, .9, 1.0} + qOut, err := sk.Quantiles(rArr) + assert.NoError(t, err) + assert.Equal(t, 
len(rArr), len(qOut)) + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + rArr := []float64{0, .1, .2, .3, .4, .5, .6, .7, .8, .9, 1.0} + qOut, err := sk.Quantiles(rArr) + assert.NoError(t, err) + assert.Equal(t, len(rArr), len(qOut)) + }) + + t.Run("exclusive", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + for i := 1; i <= 10; i++ { + assert.NoError(t, sk.Update(float32(i))) + } + + expectedExcl := []struct { + rank float64 + q float32 + }{ + {0, 1}, {0.1, 2}, {0.2, 3}, {0.3, 4}, {0.4, 5}, + {0.5, 6}, {0.6, 7}, {0.7, 8}, {0.8, 9}, {0.9, 10}, {1, 10}, + } + for _, tc := range expectedExcl { + q, err := sk.Quantile(tc.rank, WithExclusiveSearch()) + assert.NoError(t, err) + assert.Equal(t, tc.q, q, "exclusive quantile mismatch at rank=%f", tc.rank) + } + }) + + t.Run("inclusive", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + for i := 1; i <= 10; i++ { + assert.NoError(t, sk.Update(float32(i))) + } + + expectedIncl := []struct { + rank float64 + q float32 + }{ + {0, 1}, {0.1, 1}, {0.2, 2}, {0.3, 3}, {0.4, 4}, + {0.5, 5}, {0.6, 6}, {0.7, 7}, {0.8, 8}, {0.9, 9}, {1, 10}, + } + for _, tc := range expectedIncl { + q, err := sk.Quantile(tc.rank) + assert.NoError(t, err) + assert.Equal(t, tc.q, q, "inclusive quantile mismatch at rank=%f", tc.rank) + } + }) + + t.Run("Quantile and Quantiles equivalence", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + for i := 1; i <= 10; i++ { + assert.NoError(t, sk.Update(float32(i))) + } + + ranks := []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1} + + // exclusive + quantilesExcl, err := sk.Quantiles(ranks, WithExclusiveSearch()) + assert.NoError(t, err) + for i, r := range ranks { + q, err := sk.Quantile(r, WithExclusiveSearch()) + assert.NoError(t, err) + assert.Equal(t, q, quantilesExcl[i]) + } + + // inclusive + quantilesIncl, err := sk.Quantiles(ranks) + assert.NoError(t, err) + for i, r := range 
ranks { + q, err := sk.Quantile(r) + assert.NoError(t, err) + assert.Equal(t, q, quantilesIncl[i]) + } + }) +} + +func TestSketchIsEstimationMode(t *testing.T) { + sk := loadSketch(t, 20, 1, 119, true, true) + assert.False(t, sk.IsEstimationMode()) + + lb, err := sk.RankLowerBound(1.0, WithNumStdDev(1)) + assert.NoError(t, err) + assert.Equal(t, 1.0, lb) + + ub, err := sk.RankUpperBound(1.0, WithNumStdDev(1)) + assert.NoError(t, err) + assert.Equal(t, 1.0, ub) + + assert.Equal(t, 120, sk.maxNomSize) + + assert.NoError(t, sk.Update(120)) + assert.True(t, sk.IsEstimationMode()) + + assert.Equal(t, 240, sk.maxNomSize) + + v, err := sk.Quantile(1.0) + assert.NoError(t, err) + assert.Equal(t, float32(120.0), v) + + sv, err := sk.SortedView() + assert.NoError(t, err) + assert.NotNil(t, sv) + + assert.True(t, ComputeRSE(sk.K(), 0.5, false, 120) >= 0) +} + +func TestSketchUpdate(t *testing.T) { + t.Run("NaN", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + assert.NoError(t, sk.Update(float32(math.NaN()))) + assert.True(t, sk.IsEmpty()) + }) +} + +func TestSketchRank(t *testing.T) { + t.Run("NaN", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + assert.NoError(t, sk.Update(1)) + _, err = sk.Rank(float32(math.NaN())) + assert.ErrorContains(t, err, "quantile must not be NaN") + }) + + t.Run("Infinity", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + + err = sk.Update(1) + assert.NoError(t, err) + + _, err = sk.Rank(float32(math.Inf(0))) + assert.ErrorContains(t, err, "quantile must be finite") + + _, err = sk.Rank(float32(math.Inf(-1))) + assert.ErrorContains(t, err, "quantile must be finite") + }) + + t.Run("Duplicated values", func(t *testing.T) { + sk, err := NewSketch(WithK(50), WithHighRankAccuracyMode(false)) + assert.NoError(t, err) + + vArr := []float32{5, 5, 5, 6, 6, 6, 7, 8, 8, 8} + for _, v := range vArr { + assert.NoError(t, sk.Update(v)) + } + + // exclusive ranks + rArrExcl := 
[]float64{0.0, 0.0, 0.0, 0.3, 0.3, 0.3, 0.6, 0.7, 0.7, 0.7} + for i, v := range vArr { + rank, err := sk.Rank(v, WithExclusiveSearch()) + assert.NoError(t, err) + assert.Equal(t, rArrExcl[i], rank, "exclusive rank mismatch at index %d", i) + } + + // inclusive ranks + rArrIncl := []float64{0.3, 0.3, 0.3, 0.6, 0.6, 0.6, 0.7, 1.0, 1.0, 1.0} + for i, v := range vArr { + rank, err := sk.Rank(v) + assert.NoError(t, err) + assert.Equal(t, rArrIncl[i], rank, "inclusive rank mismatch at index %d", i) + } + }) + + t.Run("k=20 up HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, true) + + spArr := evenlySpacedFloats(0, float32(100), 11) + trueRanks := evenlySpacedDoubles(0, 1.0, 11) + for i := 0; i < len(spArr); i++ { + rank, err := sk.Rank(spArr[i]) + assert.NoError(t, err) + assert.InDelta(t, trueRanks[i], rank, 0.01, "rank mismatch at splitPoint %f", spArr[i]) + } + + ranks, err := sk.Ranks(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr), len(ranks)) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + + spArr := evenlySpacedFloats(0, float32(100), 11) + trueRanks := evenlySpacedDoubles(0, 1.0, 11) + for i := 0; i < len(spArr); i++ { + rank, err := sk.Rank(spArr[i]) + assert.NoError(t, err) + assert.InDelta(t, trueRanks[i], rank, 0.01, "rank mismatch at splitPoint %f", spArr[i]) + } + + ranks, err := sk.Ranks(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr), len(ranks)) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + + spArr := evenlySpacedFloats(0, float32(100), 11) + trueRanks := evenlySpacedDoubles(0, 1.0, 11) + for i := 0; i < len(spArr); i++ { + rank, err := sk.Rank(spArr[i]) + assert.NoError(t, err) + assert.InDelta(t, trueRanks[i], rank, 0.01, "rank mismatch at splitPoint %f", spArr[i]) + } + + ranks, err := sk.Ranks(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr), len(ranks)) + }) + + t.Run("k=20 up LRA", 
func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + + spArr := evenlySpacedFloats(0, float32(100), 11) + trueRanks := evenlySpacedDoubles(0, 1.0, 11) + for i := 0; i < len(spArr); i++ { + rank, err := sk.Rank(spArr[i]) + assert.NoError(t, err) + assert.InDelta(t, trueRanks[i], rank, 0.01, "rank mismatch at splitPoint %f", spArr[i]) + } + + ranks, err := sk.Ranks(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr), len(ranks)) + }) + + t.Run("inclusive and exclusive", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + for i := 1; i <= 10; i++ { + assert.NoError(t, sk.Update(float32(i))) + } + assert.False(t, sk.IsEmpty()) + assert.Equal(t, int64(10), sk.N()) + assert.Equal(t, 10, sk.NumRetained()) + + for i := 1; i <= 10; i++ { + rank, err := sk.Rank(float32(i)) + assert.NoError(t, err) + assert.Equal(t, float64(i)/10.0, rank, "inclusive rank mismatch at i=%d", i) + + rankExcl, err := sk.Rank(float32(i), WithExclusiveSearch()) + assert.NoError(t, err) + assert.Equal(t, float64(i-1)/10.0, rankExcl, "exclusive rank mismatch at i=%d", i) + } + }) +} + +func TestSketchMerge(t *testing.T) { + t.Run("HRA mismatch", func(t *testing.T) { + sk1, err := NewSketch(WithHighRankAccuracyMode(true)) + assert.NoError(t, err) + assert.NoError(t, sk1.Update(1)) + + sk2, err := NewSketch(WithHighRankAccuracyMode(false)) + assert.NoError(t, err) + assert.NoError(t, sk2.Update(2)) + + err = sk1.Merge(sk2) + assert.ErrorContains(t, err, "both sketches must have the same HighRankAccuracy setting") + }) + + t.Run("Merge nil", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + assert.NoError(t, sk.Update(1)) + assert.NoError(t, sk.Merge(nil)) + assert.Equal(t, int64(1), sk.N()) + }) + + t.Run("Merge Multiple", func(t *testing.T) { + s1, err := NewSketch(WithK(12)) + assert.NoError(t, err) + for i := 0; i < 40; i++ { + assert.NoError(t, s1.Update(float32(i))) + } + + s2, err := NewSketch(WithK(12)) + assert.NoError(t, 
err) + for i := 0; i < 40; i++ { + assert.NoError(t, s2.Update(float32(i))) + } + + s3, err := NewSketch(WithK(12)) + assert.NoError(t, err) + for i := 0; i < 40; i++ { + assert.NoError(t, s3.Update(float32(i))) + } + + s, err := NewSketch(WithK(12)) + assert.NoError(t, err) + assert.NoError(t, s.Merge(s1)) + assert.NoError(t, s.Merge(s2)) + assert.NoError(t, s.Merge(s3)) + }) + + t.Run("Merge Empty", func(t *testing.T) { + sk1, err := NewSketch() + assert.NoError(t, err) + sk2, err := NewSketch() + assert.NoError(t, err) + + for i := 5; i < 10; i++ { + assert.NoError(t, sk1.Update(float32(i))) + } + assert.NoError(t, sk1.Merge(sk2)) + }) + + t.Run("Merge Overlapped", func(t *testing.T) { + sk1, err := NewSketch() + assert.NoError(t, err) + sk2, err := NewSketch() + assert.NoError(t, err) + + for i := 5; i < 10; i++ { + assert.NoError(t, sk1.Update(float32(i))) + } + + for i := 1; i <= 15; i++ { + assert.NoError(t, sk2.Update(float32(i))) + } + assert.NoError(t, sk1.Merge(sk2)) + assert.Equal(t, int64(20), sk1.N()) + }) + + t.Run("Merge Non-Overlapped", func(t *testing.T) { + sk1, err := NewSketch() + assert.NoError(t, err) + sk2, err := NewSketch() + assert.NoError(t, err) + + for i := 5; i < 10; i++ { + assert.NoError(t, sk1.Update(float32(i))) + } + + for i := 16; i <= 300; i++ { + assert.NoError(t, sk2.Update(float32(i))) + } + assert.NoError(t, sk1.Merge(sk2)) + }) + + t.Run("k=20 up HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, true) + sk2 := loadSketch(t, 20, 1, 100, true, true) + assert.NoError(t, sk.Merge(sk2)) + assert.Equal(t, int64(200), sk.N()) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + sk2 := loadSketch(t, 20, 1, 100, false, false) + assert.NoError(t, sk.Merge(sk2)) + assert.Equal(t, int64(200), sk.N()) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + sk2 := loadSketch(t, 20, 1, 100, false, true) + assert.NoError(t, 
sk.Merge(sk2)) + assert.Equal(t, int64(200), sk.N()) + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + sk2 := loadSketch(t, 20, 1, 100, true, false) + assert.NoError(t, sk.Merge(sk2)) + assert.Equal(t, int64(200), sk.N()) + }) +} + +func TestSketchRankUBLB(t *testing.T) { + tests := []struct { + name string + hra bool + }{ + {"HRA mode", true}, + {"LRA mode", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sk := loadSketch(t, 12, 1, 1000, true, tt.hra) + + rLB, err := sk.RankLowerBound(0.5, WithNumStdDev(1)) + assert.NoError(t, err) + assert.Greater(t, rLB, 0.0) + + if tt.hra { + rLB, err = sk.RankLowerBound(995.0/1000, WithNumStdDev(1)) + } else { + rLB, err = sk.RankLowerBound(5.0/1000, WithNumStdDev(1)) + } + assert.NoError(t, err) + assert.Greater(t, rLB, 0.0) + + rUB, err := sk.RankUpperBound(0.5, WithNumStdDev(1)) + assert.NoError(t, err) + assert.Greater(t, rUB, 0.0) + + if tt.hra { + rUB, err = sk.RankUpperBound(995.0/1000, WithNumStdDev(1)) + } else { + rUB, err = sk.RankUpperBound(5.0/1000, WithNumStdDev(1)) + } + assert.NoError(t, err) + assert.Greater(t, rUB, 0.0) + + _, err = sk.Ranks([]float32{5, 100}) + assert.NoError(t, err) + }) + } +} + +func TestEmptySketch(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + + _, err = sk.Rank(1) + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.Ranks([]float32{1}) + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.Quantile(0.5) + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.Quantiles([]float64{0.5}) + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.PMF([]float32{1}) + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.CDF([]float32{1}, true) + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.MinItem() + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.MaxItem() + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.QuantileLowerBound(0.5) + assert.ErrorIs(t, err, ErrEmpty) + + _, err = sk.QuantileUpperBound(0.5) + 
assert.ErrorIs(t, err, ErrEmpty) + + items := sk.All() + assert.Nil(t, items) + + rUB, err := sk.RankUpperBound(0.5, WithNumStdDev(1)) + assert.NoError(t, err) + assert.Greater(t, rUB, 0.0) +} + +func TestSketchSortedView(t *testing.T) { + t.Run("sketch has two values", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + assert.NoError(t, sk.Update(1)) + assert.NoError(t, sk.Update(2)) + + sv, err := sk.SortedView() + assert.NoError(t, err) + itr := sv.Iterator() + + // first item + assert.True(t, itr.Next()) + + q, err := itr.Quantile() + assert.NoError(t, err) + assert.Equal(t, float32(1), q) + + wt, err := itr.Weight() + assert.NoError(t, err) + assert.Equal(t, int64(1), wt) + + natRankExcl, err := itr.NaturalRankWithCriterion(false) + assert.NoError(t, err) + assert.Equal(t, int64(0), natRankExcl) + + natRankIncl, err := itr.NaturalRankWithCriterion(true) + assert.NoError(t, err) + assert.Equal(t, int64(1), natRankIncl) + + normRankExcl, err := itr.NormalizedRankWithCriterion(false) + assert.NoError(t, err) + assert.Equal(t, 0.0, normRankExcl) + + normRankIncl, err := itr.NormalizedRankWithCriterion(true) + assert.NoError(t, err) + assert.Equal(t, 0.5, normRankIncl) + + // second item + assert.True(t, itr.Next()) + + q, err = itr.Quantile() + assert.NoError(t, err) + assert.Equal(t, float32(2), q) + + wt, err = itr.Weight() + assert.NoError(t, err) + assert.Equal(t, int64(1), wt) + + natRankExcl, err = itr.NaturalRankWithCriterion(false) + assert.NoError(t, err) + assert.Equal(t, int64(1), natRankExcl) + + natRankIncl, err = itr.NaturalRankWithCriterion(true) + assert.NoError(t, err) + assert.Equal(t, int64(2), natRankIncl) + + normRankExcl, err = itr.NormalizedRankWithCriterion(false) + assert.NoError(t, err) + assert.Equal(t, 0.5, normRankExcl) + + normRankIncl, err = itr.NormalizedRankWithCriterion(true) + assert.NoError(t, err) + assert.Equal(t, 1.0, normRankIncl) + }) + + t.Run("k=20 up HRA", func(t *testing.T) { + sk := loadSketch(t, 
20, 1, 100, true, true) + sv, err := sk.SortedView() + assert.NoError(t, err) + + itr := sv.Iterator() + retainedCount := sk.NumRetained() + totalN := sk.N() + count := 0 + var cumWt int64 + for itr.Next() { + w, err := itr.NaturalRankWithCriterion(true) + assert.NoError(t, err) + cumWt = w + count++ + } + assert.Equal(t, totalN, cumWt) + assert.Equal(t, retainedCount, count) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + sv, err := sk.SortedView() + assert.NoError(t, err) + + itr := sv.Iterator() + retainedCount := sk.NumRetained() + totalN := sk.N() + count := 0 + var cumWt int64 + for itr.Next() { + w, err := itr.NaturalRankWithCriterion(true) + assert.NoError(t, err) + cumWt = w + count++ + } + assert.Equal(t, totalN, cumWt) + assert.Equal(t, retainedCount, count) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + sv, err := sk.SortedView() + assert.NoError(t, err) + + itr := sv.Iterator() + retainedCount := sk.NumRetained() + totalN := sk.N() + count := 0 + var cumWt int64 + for itr.Next() { + w, err := itr.NaturalRankWithCriterion(true) + assert.NoError(t, err) + cumWt = w + count++ + } + assert.Equal(t, totalN, cumWt) + assert.Equal(t, retainedCount, count) + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + sv, err := sk.SortedView() + assert.NoError(t, err) + + itr := sv.Iterator() + retainedCount := sk.NumRetained() + totalN := sk.N() + count := 0 + var cumWt int64 + for itr.Next() { + w, err := itr.NaturalRankWithCriterion(true) + assert.NoError(t, err) + cumWt = w + count++ + } + assert.Equal(t, totalN, cumWt) + assert.Equal(t, retainedCount, count) + }) +} + +func TestSketchString(t *testing.T) { + t.Run("k=20 up HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, true) + assert.NotEmpty(t, sk.String()) + assert.NotEmpty(t, sk.CompactorDetailString(false)) + assert.NotEmpty(t, 
sk.CompactorDetailString(true)) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + assert.NotEmpty(t, sk.String()) + assert.NotEmpty(t, sk.CompactorDetailString(false)) + assert.NotEmpty(t, sk.CompactorDetailString(true)) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + assert.NotEmpty(t, sk.String()) + assert.NotEmpty(t, sk.CompactorDetailString(false)) + assert.NotEmpty(t, sk.CompactorDetailString(true)) + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + assert.NotEmpty(t, sk.String()) + assert.NotEmpty(t, sk.CompactorDetailString(false)) + assert.NotEmpty(t, sk.CompactorDetailString(true)) + }) +} + +func TestSketchPMF(t *testing.T) { + t.Run("NaN", func(t *testing.T) { + sk, err := NewSketch() + assert.NoError(t, err) + assert.NoError(t, sk.Update(1)) + _, err = sk.PMF([]float32{float32(math.NaN())}) + assert.Error(t, err) + }) + + t.Run("k=20 up HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, true) + spArr := []float32{20, 40, 60, 80} + pmf, err := sk.PMF(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(pmf)) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + spArr := []float32{20, 40, 60, 80} + pmf, err := sk.PMF(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(pmf)) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + spArr := []float32{20, 40, 60, 80} + pmf, err := sk.PMF(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(pmf)) + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + spArr := []float32{20, 40, 60, 80} + pmf, err := sk.PMF(spArr) + assert.NoError(t, err) + assert.Equal(t, len(spArr)+1, len(pmf)) + }) +} + +func TestSketchIterator(t *testing.T) { + t.Run("k=20 up HRA", func(t 
*testing.T) { + sk := loadSketch(t, 20, 1, 100, true, true) + items := sk.All() + assert.NotEmpty(t, items) + for _, item := range items { + assert.Greater(t, item.Weight, int64(0)) + } + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + items := sk.All() + assert.NotEmpty(t, items) + for _, item := range items { + assert.Greater(t, item.Weight, int64(0)) + } + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + items := sk.All() + assert.NotEmpty(t, items) + for _, item := range items { + assert.Greater(t, item.Weight, int64(0)) + } + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + items := sk.All() + assert.NotEmpty(t, items) + for _, item := range items { + assert.Greater(t, item.Weight, int64(0)) + } + }) +} + +func TestSketchReset(t *testing.T) { + t.Run("k=20 up HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, true) + sk.Reset() + assert.True(t, sk.IsEmpty()) + }) + + t.Run("k=20 down LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, false) + sk.Reset() + assert.True(t, sk.IsEmpty()) + }) + + t.Run("k=20 down HRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, false, true) + sk.Reset() + assert.True(t, sk.IsEmpty()) + }) + + t.Run("k=20 up LRA", func(t *testing.T) { + sk := loadSketch(t, 20, 1, 100, true, false) + sk.Reset() + assert.True(t, sk.IsEmpty()) + }) +} + +func TestSketchMinMaxItem(t *testing.T) { + t.Run("sequential values", func(t *testing.T) { + sk := loadSketch(t, 12, 1, 100, true, true) + minV, err := sk.MinItem() + assert.NoError(t, err) + assert.Equal(t, float32(1), minV) + + maxV, err := sk.MaxItem() + assert.NoError(t, err) + assert.Equal(t, float32(100), maxV) + }) + + t.Run("reverse order", func(t *testing.T) { + sk := loadSketch(t, 12, 1, 100, false, true) + minV, err := sk.MinItem() + assert.NoError(t, err) + assert.Equal(t, float32(1), minV) + + 
maxV, err := sk.MaxItem() + assert.NoError(t, err) + assert.Equal(t, float32(100), maxV) + }) +} + +func TestSketchQuantileBounds(t *testing.T) { + t.Run("QuantileLowerBound", func(t *testing.T) { + sk := loadSketch(t, 12, 1, 1000, true, true) + qlb, err := sk.QuantileLowerBound(0.5, WithNumStdDev(1)) + assert.NoError(t, err) + assert.Greater(t, qlb, float32(0)) + }) + + t.Run("QuantileUpperBound", func(t *testing.T) { + sk := loadSketch(t, 12, 1, 1000, true, true) + qub, err := sk.QuantileUpperBound(0.5, WithNumStdDev(1)) + assert.NoError(t, err) + assert.Greater(t, qub, float32(0)) + }) + + t.Run("bounds ordering", func(t *testing.T) { + sk := loadSketch(t, 12, 1, 1000, true, true) + qlb, err := sk.QuantileLowerBound(0.5, WithNumStdDev(2)) + assert.NoError(t, err) + qub, err := sk.QuantileUpperBound(0.5, WithNumStdDev(2)) + assert.NoError(t, err) + assert.LessOrEqual(t, qlb, qub) + }) +} diff --git a/req/utils.go b/req/utils.go new file mode 100644 index 0000000..7fa9a70 --- /dev/null +++ b/req/utils.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package req + +// ComputeRSE returns an a priori estimate of relative standard error (RSE, expressed as a number in [0,1]).
+// Derived from Lemma 12 in https://arxiv.org/abs/2004.01668v2, but the constant factors were +// adjusted based on empirical measurements. +// k is the sketch's k parameter. +// rank is the normalized rank, in [0,1]. +// isHighRankAccuracyMode is true if the sketch is configured for high rank accuracy. +// n is the total number of items in the stream. +func ComputeRSE(k int, rank float64, isHighRankAccuracyMode bool, n int64) float64 { + return computeRankUpperBound(k, 2, rank, 1, isHighRankAccuracyMode, n) - rank +} From 9f2f201bcb3f9ccc4949b433c9cce2006429d5a6 Mon Sep 17 00:00:00 2001 From: lani_karrot Date: Thu, 23 Apr 2026 11:33:26 +0900 Subject: [PATCH 3/3] perf: align sketch memory layout --- req/sketch.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/req/sketch.go b/req/sketch.go index 3ec4183..8d5c866 100644 --- a/req/sketch.go +++ b/req/sketch.go @@ -59,15 +59,15 @@ func WithK(k int) SketchOptionFunc { } type Sketch struct { - k int - isHighRankAccuracyMode bool n int64 + compactors []*compactor + numRetained int + maxNomSize int minItem float32 maxItem float32 - numRetained int - maxNomSize int // sum of nominal capacities of all compactors. sortedView *quantilecommon.NumericSortedView[float32] - compactors []*compactor + k int + isHighRankAccuracyMode bool } func NewSketch(options ...SketchOptionFunc) (*Sketch, error) {