Skip to content

Commit 593f0da

Browse files
authored
Merge pull request #129 from proost/feat-support-value-type-receiver-tuple-sketch
feat: support value-receiver type summary
2 parents 103e883 + e59ed43 commit 593f0da

9 files changed

Lines changed: 708 additions & 144 deletions

tuple/hashtable.go

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -49,16 +49,22 @@ type entry[S Summary] struct {
4949
Summary S
5050
}
5151

52-
func (e *entry[S]) reset() {
52+
func (e *entry[S]) reset(newSummary func() S) {
5353
if e.Hash != 0 {
54-
e.Summary.Reset()
54+
if newSummary != nil {
55+
e.Summary = newSummary()
56+
} else {
57+
e.Summary.Reset()
58+
}
59+
60+
e.Hash = 0
5561
}
56-
e.Hash = 0
5762
}
5863

5964
type hashtable[S Summary] struct {
6065
entries []entry[S]
6166
entryLessFunc func(a, b entry[S]) int
67+
newSummary func() S
6268
theta uint64
6369
seed uint64
6470
numEntries uint32
@@ -270,7 +276,7 @@ func (t *hashtable[S]) Reset() {
270276
} else {
271277
// just clear existing entries
272278
for i := range t.entries {
273-
t.entries[i].reset()
279+
t.entries[i].reset(t.newSummary)
274280
}
275281
}
276282

tuple/intersection.go

Lines changed: 35 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ func WithIntersectionSeed(seed uint64) IntersectionOptionFunc {
4242
type Intersection[S Summary] struct {
4343
hashtable *hashtable[S]
4444
policy Policy[S]
45+
applyFunc func(S, S) S
4546
entryLessFunc func(a, b entry[S]) int
4647
isValid bool
4748
}
@@ -72,6 +73,35 @@ func NewIntersection[S Summary](policy Policy[S], opts ...IntersectionOptionFunc
7273
}
7374
}
7475

76+
// NewIntersectionWithSummaryMergeFunc creates a new intersection that uses a function to merge summaries.
77+
// Use it for value-type summaries: Policy.Apply receives a copy and cannot mutate the stored summary, so the value returned by applyFunc(internal, incoming) is stored in its place.
78+
func NewIntersectionWithSummaryMergeFunc[S Summary](
79+
applyFunc func(S, S) S, opts ...IntersectionOptionFunc,
80+
) *Intersection[S] {
81+
options := &intersectionOptions{
82+
seed: theta.DefaultSeed,
83+
}
84+
for _, opt := range opts {
85+
opt(options)
86+
}
87+
88+
return &Intersection[S]{
89+
hashtable: newHashtable[S](
90+
0, 0, theta.ResizeX1, 1.0, theta.MaxTheta, options.seed, false,
91+
),
92+
entryLessFunc: func(a, b entry[S]) int {
93+
if a.Hash < b.Hash {
94+
return -1
95+
} else if a.Hash > b.Hash {
96+
return 1
97+
}
98+
return 0
99+
},
100+
applyFunc: applyFunc,
101+
isValid: false,
102+
}
103+
}
104+
75105
// Update updates the intersection with a given sketch.
76106
func (i *Intersection[S]) Update(sketch Sketch[S]) error {
77107
if i.hashtable.isEmpty {
@@ -149,7 +179,11 @@ func (i *Intersection[S]) Update(sketch Sketch[S]) error {
149179
return errors.New("max matches exceeded, possibly corrupted input sketch")
150180
}
151181

152-
i.policy.Apply(i.hashtable.entries[key].Summary, summary)
182+
if i.applyFunc != nil {
183+
i.hashtable.entries[key].Summary = i.applyFunc(i.hashtable.entries[key].Summary, summary)
184+
} else {
185+
i.policy.Apply(i.hashtable.entries[key].Summary, summary)
186+
}
153187

154188
matchesEntries = append(matchesEntries, i.hashtable.entries[key])
155189
matchCount++

tuple/intersection_test.go

Lines changed: 97 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -899,4 +899,101 @@ func TestIntersectionWithDifferentPolicies(t *testing.T) {
899899
}
900900
assert.Equal(t, int32(303), totalSum) // (100+1) + (200+2)
901901
})
902+
903+
t.Run("Sum With SummaryMergeFunc", func(t *testing.T) {
904+
sketch1, err := NewUpdateSketchWithSummaryUpdateFunc[int32ValueSummary, int32](
905+
newInt32ValueSummary,
906+
func(s int32ValueSummary, v int32) int32ValueSummary {
907+
s.value += v
908+
return s
909+
},
910+
)
911+
assert.NoError(t, err)
912+
sketch1.UpdateInt64(1, 100)
913+
sketch1.UpdateInt64(2, 200)
914+
sketch1.UpdateInt64(3, 300)
915+
916+
sketch2, err := NewUpdateSketchWithSummaryUpdateFunc[int32ValueSummary, int32](
917+
newInt32ValueSummary,
918+
func(s int32ValueSummary, v int32) int32ValueSummary {
919+
s.value += v
920+
return s
921+
},
922+
)
923+
assert.NoError(t, err)
924+
sketch2.UpdateInt64(2, 5)
925+
sketch2.UpdateInt64(3, 15)
926+
sketch2.UpdateInt64(4, 25)
927+
928+
intersection := NewIntersectionWithSummaryMergeFunc[int32ValueSummary](
929+
func(internal, incoming int32ValueSummary) int32ValueSummary {
930+
internal.value += incoming.value
931+
return internal
932+
},
933+
)
934+
err = intersection.Update(sketch1)
935+
assert.NoError(t, err)
936+
err = intersection.Update(sketch2)
937+
assert.NoError(t, err)
938+
939+
result, err := intersection.Result(true)
940+
assert.NoError(t, err)
941+
942+
assert.Equal(t, uint32(2), result.NumRetained())
943+
944+
totalSum := int32(0)
945+
for _, summary := range result.All() {
946+
totalSum += summary.value
947+
}
948+
// Key 2: 200 + 5 = 205, Key 3: 300 + 15 = 315, Total = 520
949+
assert.Equal(t, int32(520), totalSum)
950+
})
951+
952+
t.Run("Exact Mode Half Overlap With SummaryMergeFunc", func(t *testing.T) {
953+
sketch1, err := NewUpdateSketchWithSummaryUpdateFunc[int32ValueSummary, int32](
954+
newInt32ValueSummary,
955+
func(s int32ValueSummary, v int32) int32ValueSummary {
956+
s.value += v
957+
return s
958+
},
959+
)
960+
assert.NoError(t, err)
961+
value := 0
962+
for i := 0; i < 1000; i++ {
963+
sketch1.UpdateInt64(int64(value), 1)
964+
value++
965+
}
966+
967+
sketch2, err := NewUpdateSketchWithSummaryUpdateFunc[int32ValueSummary, int32](
968+
newInt32ValueSummary,
969+
func(s int32ValueSummary, v int32) int32ValueSummary {
970+
s.value += v
971+
return s
972+
},
973+
)
974+
assert.NoError(t, err)
975+
value = 500
976+
for i := 0; i < 1000; i++ {
977+
sketch2.UpdateInt64(int64(value), 1)
978+
value++
979+
}
980+
981+
intersection := NewIntersectionWithSummaryMergeFunc[int32ValueSummary](
982+
func(internal, incoming int32ValueSummary) int32ValueSummary {
983+
internal.value += incoming.value
984+
return internal
985+
},
986+
)
987+
err = intersection.Update(sketch1)
988+
assert.NoError(t, err)
989+
err = intersection.Update(sketch2)
990+
assert.NoError(t, err)
991+
992+
result, err := intersection.Result(true)
993+
assert.NoError(t, err)
994+
995+
assert.False(t, result.IsEmpty())
996+
assert.False(t, result.IsEstimationMode())
997+
assert.Equal(t, 500.0, result.Estimate())
998+
})
902999
}

tuple/sketch_benchmark_test.go

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package tuple
19+
20+
import "testing"
21+
22+
func BenchmarkUpdateSketch_PointerSummary(b *testing.B) {
23+
b.ReportAllocs()
24+
for b.Loop() {
25+
sketch, _ := NewUpdateSketch[*int32Summary, int32](newInt32Summary)
26+
for i := 0; i < 10000; i++ {
27+
sketch.UpdateInt64(int64(i), 1)
28+
}
29+
}
30+
}
31+
32+
func BenchmarkUpdateSketch_ValueSummary(b *testing.B) {
33+
b.ReportAllocs()
34+
for b.Loop() {
35+
sketch, _ := NewUpdateSketchWithSummaryUpdateFunc[int32ValueSummary, int32](
36+
newInt32ValueSummary,
37+
func(s int32ValueSummary, v int32) int32ValueSummary {
38+
s.value += v
39+
return s
40+
},
41+
)
42+
for i := 0; i < 10000; i++ {
43+
sketch.UpdateInt64(int64(i), 1)
44+
}
45+
}
46+
}

tuple/testing.go

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,3 +58,15 @@ func (s *float64Summary) Update(value float64) {
5858
func newFloat64Summary() *float64Summary {
5959
return &float64Summary{}
6060
}
61+
62+
type int32ValueSummary struct {
63+
value int32
64+
}
65+
66+
func (s int32ValueSummary) Reset() {}
67+
func (s int32ValueSummary) Clone() Summary { return s }
68+
func (s int32ValueSummary) Update(value int32) {}
69+
70+
func newInt32ValueSummary() int32ValueSummary {
71+
return int32ValueSummary{}
72+
}

tuple/union.go

Lines changed: 62 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ import (
2929
// Union computes the union of Tuple sketches.
3030
type Union[S Summary] struct {
3131
policy Policy[S]
32+
applyFunc func(S, S) S
3233
hashtable *hashtable[S]
3334
entryLessFunc func(a, b entry[S]) int
3435
theta uint64
@@ -43,6 +44,20 @@ type unionOptions struct {
4344
rf theta.ResizeFactor
4445
}
4546

47+
func (o *unionOptions) Validate() error {
48+
if o.lgK < theta.MinLgK {
49+
return fmt.Errorf("lgK must not be less than %d: %d", theta.MinLgK, o.lgK)
50+
}
51+
if o.lgK > theta.MaxLgK {
52+
return fmt.Errorf("lgK must not be greater than %d: %d", theta.MaxLgK, o.lgK)
53+
}
54+
if o.p <= 0 || o.p > 1 {
55+
return errors.New("sampling probability must be between 0 and 1")
56+
}
57+
58+
return nil
59+
}
60+
4661
type UnionOptionFunc func(*unionOptions)
4762

4863
// WithUnionLgK sets log2(k), where k is a nominal number of entries in the union
@@ -89,14 +104,49 @@ func NewUnion[S Summary](policy Policy[S], opts ...UnionOptionFunc) (*Union[S],
89104
opt(options)
90105
}
91106

92-
if options.lgK < theta.MinLgK {
93-
return nil, fmt.Errorf("lgK must not be less than %d: %d", theta.MinLgK, options.lgK)
107+
if err := options.Validate(); err != nil {
108+
return nil, err
94109
}
95-
if options.lgK > theta.MaxLgK {
96-
return nil, fmt.Errorf("lgK must not be greater than %d: %d", theta.MaxLgK, options.lgK)
110+
111+
options.lgCurSize = startingSubMultiple(options.lgK+1, theta.MinLgK, uint8(options.rf))
112+
options.theta = startingThetaFromP(options.p)
113+
114+
table := newHashtable[S](
115+
options.lgCurSize, options.lgK, options.rf, options.p, options.theta, options.seed, true,
116+
)
117+
118+
return &Union[S]{
119+
hashtable: table,
120+
policy: policy,
121+
entryLessFunc: func(a, b entry[S]) int {
122+
if a.Hash < b.Hash {
123+
return -1
124+
} else if a.Hash > b.Hash {
125+
return 1
126+
}
127+
return 0
128+
},
129+
theta: table.theta,
130+
}, nil
131+
}
132+
133+
// NewUnionWithSummaryMergeFunc creates a new union that uses a function to merge summaries.
134+
// Use it for value-type summaries: Policy.Apply receives a copy and cannot mutate the stored summary, so the value returned by applyFunc(internal, incoming) is stored in its place.
135+
func NewUnionWithSummaryMergeFunc[S Summary](
136+
applyFunc func(S, S) S, opts ...UnionOptionFunc,
137+
) (*Union[S], error) {
138+
options := &unionOptions{
139+
lgK: theta.DefaultLgK,
140+
rf: theta.DefaultResizeFactor,
141+
p: 1.0,
142+
seed: theta.DefaultSeed,
143+
}
144+
for _, opt := range opts {
145+
opt(options)
97146
}
98-
if options.p <= 0 || options.p > 1 {
99-
return nil, errors.New("sampling probability must be between 0 and 1")
147+
148+
if err := options.Validate(); err != nil {
149+
return nil, err
100150
}
101151

102152
options.lgCurSize = startingSubMultiple(options.lgK+1, theta.MinLgK, uint8(options.rf))
@@ -108,7 +158,7 @@ func NewUnion[S Summary](policy Policy[S], opts ...UnionOptionFunc) (*Union[S],
108158

109159
return &Union[S]{
110160
hashtable: table,
111-
policy: policy,
161+
applyFunc: applyFunc,
112162
entryLessFunc: func(a, b entry[S]) int {
113163
if a.Hash < b.Hash {
114164
return -1
@@ -156,7 +206,11 @@ func (u *Union[S]) Update(sketch Sketch[S]) error {
156206
return err
157207
}
158208

159-
u.policy.Apply(u.hashtable.entries[index].Summary, summary)
209+
if u.applyFunc != nil {
210+
u.hashtable.entries[index].Summary = u.applyFunc(u.hashtable.entries[index].Summary, summary)
211+
} else {
212+
u.policy.Apply(u.hashtable.entries[index].Summary, summary)
213+
}
160214
} else {
161215
// For ordered sketches, we can break early
162216
if sketch.IsOrdered() {

0 commit comments

Comments
 (0)