Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions structure/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package structure // import "github.com/lightstep/go-expohisto/structure"

import (
"encoding/binary"
"fmt"
"math"

"github.com/lightstep/go-expohisto/mapping"
"github.com/lightstep/go-expohisto/mapping/exponent"
Expand Down Expand Up @@ -138,6 +140,12 @@ type (
countAt(pos uint32) uint64
// reset resets all buckets to zero count.
reset()
// appendBinary appends binary representation to a
// given slice.
appendBinary([]byte) []byte
// unmarshalBinary decodes the binary representation into
// the backing bucket.
unmarshalBinary([]byte) int
}

// highLow is used to establish the maximum range of bucket
Expand Down Expand Up @@ -165,6 +173,70 @@ func (h *Histogram[N]) Init(cfg Config) {
h.mapping = m
}

// AppendBinary implements the binary appender interface for aggregation.Histogram.
func (h *Histogram[N]) AppendBinary(buf []byte) []byte {
buf = binary.BigEndian.AppendUint32(buf, uint32(h.maxSize))
buf = binary.BigEndian.AppendUint64(buf, h.count)
buf = binary.BigEndian.AppendUint64(buf, h.zeroCount)
switch any(h.sum).(type) {
case int64:
buf = binary.BigEndian.AppendUint64(buf, uint64(h.sum))
buf = binary.BigEndian.AppendUint64(buf, uint64(h.min))
buf = binary.BigEndian.AppendUint64(buf, uint64(h.max))
case float64:
buf = binary.BigEndian.AppendUint64(buf, math.Float64bits(float64(h.sum)))
buf = binary.BigEndian.AppendUint64(buf, math.Float64bits(float64(h.min)))
buf = binary.BigEndian.AppendUint64(buf, math.Float64bits(float64(h.max)))
}
buf = binary.BigEndian.AppendUint32(buf, uint32(h.mapping.Scale()))
buf = h.positive.AppendBinary(buf)
buf = h.negative.AppendBinary(buf)
return buf
}

func (h *Histogram[N]) UnmarshalBinary(buf []byte) (err error) {
h.maxSize = int32(binary.BigEndian.Uint32(buf[:4]))
buf = buf[4:]

h.count = binary.BigEndian.Uint64(buf[:8])
buf = buf[8:]

h.zeroCount = binary.BigEndian.Uint64(buf[:8])
buf = buf[8:]

switch any(h.sum).(type) {
case int64:
h.sum = N(binary.BigEndian.Uint64(buf[:8]))
buf = buf[8:]

h.min = N(binary.BigEndian.Uint64(buf[:8]))
buf = buf[8:]

h.max = N(binary.BigEndian.Uint64(buf[:8]))
buf = buf[8:]
case float64:
h.sum = N(math.Float64frombits(binary.BigEndian.Uint64(buf[:8])))
buf = buf[8:]

h.min = N(math.Float64frombits(binary.BigEndian.Uint64(buf[:8])))
buf = buf[8:]

h.max = N(math.Float64frombits(binary.BigEndian.Uint64(buf[:8])))
buf = buf[8:]
}
scale := int32(binary.BigEndian.Uint32(buf[:4]))
buf = buf[4:]

h.mapping, err = newMapping(int32(scale))
if err != nil {
return fmt.Errorf("failed to create mapping for histogram: %w", err)
}

offset := h.positive.UnmarshalBinary(buf)
h.negative.UnmarshalBinary(buf[offset:])
return nil
}

// Sum implements aggregation.Histogram.
func (h *Histogram[N]) Sum() N {
return h.sum
Expand Down Expand Up @@ -209,6 +281,67 @@ func (h *Histogram[N]) Negative() *Buckets {
return &h.negative
}

func (b *Buckets) AppendBinary(buf []byte) []byte {
buf = binary.BigEndian.AppendUint32(buf, uint32(b.indexBase))
buf = binary.BigEndian.AppendUint32(buf, uint32(b.indexStart))
buf = binary.BigEndian.AppendUint32(buf, uint32(b.indexEnd))
// Append backing array width type
var widthType uint8
switch b.backing.(type) {
case *bucketsVarwidth[uint8]:
widthType = 1
case *bucketsVarwidth[uint16]:
widthType = 2
case *bucketsVarwidth[uint32]:
widthType = 3
case *bucketsVarwidth[uint64]:
widthType = 4
}
buf = append(buf, widthType)
if b.backing != nil {
buf = b.backing.appendBinary(buf)
}
return buf
}

func (b *Buckets) UnmarshalBinary(buf []byte) (offset int) {
origLen := len(buf)
b.indexBase = int32(binary.BigEndian.Uint32(buf[:4]))
buf = buf[4:]

b.indexStart = int32(binary.BigEndian.Uint32(buf[:4]))
buf = buf[4:]

b.indexEnd = int32(binary.BigEndian.Uint32(buf[:4]))
buf = buf[4:]
// Append backing array width type
widthType := uint8(buf[0])
buf = buf[1:]
switch widthType {
case 0:
// backing is nil
return origLen - len(buf)
case 1:
b.backing = &bucketsVarwidth[uint8]{
counts: []uint8{},
}
case 2:
b.backing = &bucketsVarwidth[uint16]{
counts: []uint16{},
}
case 3:
b.backing = &bucketsVarwidth[uint32]{
counts: []uint32{},
}
case 4:
b.backing = &bucketsVarwidth[uint64]{
counts: []uint64{},
}
}
buf = buf[b.backing.unmarshalBinary(buf):]
return origLen - len(buf)
}

// Offset implements aggregation.Bucket.
func (b *Buckets) Offset() int32 {
return b.indexStart
Expand Down Expand Up @@ -690,6 +823,32 @@ func (b *bucketsVarwidth[N]) tryIncrement(bucketIndex int32, incr uint64) bool {
return false
}

func (b *bucketsVarwidth[N]) appendBinary(buf []byte) []byte {
if b == nil {
return buf
}
buf = binary.BigEndian.AppendUint32(buf, uint32(b.size()))
for _, c := range b.counts {
buf = binary.BigEndian.AppendUint64(buf, uint64(c))
}
return buf
}

func (b *bucketsVarwidth[N]) unmarshalBinary(buf []byte) int {
if len(buf) == 0 {
return 0
}
originalLen := len(buf)
countsLen := int32(binary.BigEndian.Uint32(buf[:4]))
buf = buf[4:]
b.counts = make([]N, countsLen)
for i := range b.counts {
b.counts[i] = N(binary.BigEndian.Uint64(buf[:8]))
buf = buf[8:]
}
return originalLen - len(buf)
}

func widenBuckets[From, To bucketsCount](in *bucketsVarwidth[From]) *bucketsVarwidth[To] {
tmp := make([]To, len(in.counts))
for i := range in.counts {
Expand Down
17 changes: 17 additions & 0 deletions structure/exponential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,23 @@ func TestAscendingSequence(t *testing.T) {
}
}

func TestCodec(t *testing.T) {
src := rand.NewSource(8584838584)
rnd := rand.New(src)
expected := NewFloat64(NewConfig(WithMaxSize(1024)))

totalCount := 512 + rand.Intn(512)
for i := 0; i < totalCount; i++ {
expected.Update(rnd.ExpFloat64())
expected.Update(-1 * rnd.ExpFloat64())
}

buf := expected.AppendBinary(nil)
actual := NewFloat64(NewConfig())
require.NoError(t, actual.UnmarshalBinary(buf))
requireEqual(t, expected, actual)
}

func testAscendingSequence(t *testing.T, maxSize, offset, initScale int32) {
for step := maxSize; step < 4*maxSize; step++ {
agg := NewFloat64(NewConfig(WithMaxSize(maxSize)))
Expand Down