Skip to content
This repository has been archived by the owner on Mar 18, 2024. It is now read-only.

Commit

Permalink
Fix tests and remove old offset calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshartig committed Jun 5, 2016
1 parent 8348d4c commit a5b7aeb
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 77 deletions.
7 changes: 2 additions & 5 deletions transaction/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,16 @@ func handleIncomingReport(t *tx, rep *lproto.Report) (*lproto.TxMsg_Report, *lpr
//seq starts at 1 so after 1 iteration it'll be at 3
//only the master can terminate a sequence
if numTrips >= config.Iterations {
offset, err := calculateAverageOffset(t.tripTimes, t.offsets)
offset2, latency, _ := calculateAverageOffsetLatency(t.trips)
offset, latency, err := calculateAverageOffsetLatency(t.trips)
fin := &lproto.Fin{}
if err != nil {
fin.Error = err.Error()
llog.Error("error calculating avg offset", kv.Set("err", err))
} else {
fin.Offset = offset
kv["offset"] = offset
kv["offset2"] = offset2
absOff := math.Abs(offset)
absOff2 := math.Abs(offset2)
llog.Info("slave offset", kv, llog.KV{"absOffset": absOff, "absOffset2": absOff2, "latency": latency})
llog.Info("slave offset", kv, llog.KV{"absOffset": absOff, "latency": latency})
if config.Threshold < absOff {
llog.Warn("slave offset is over threshold", kv)
}
Expand Down
47 changes: 4 additions & 43 deletions transaction/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"math"
"sort"
"time"
)

type Trip struct {
Expand All @@ -19,46 +18,8 @@ func (a int64s) Len() int { return len(a) }
func (a int64s) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a int64s) Less(i, j int) bool { return a[i] < a[j] }

//returns the average milliseconds of the durations
func calculateAverageOffset(tripTimes []time.Duration, offsets []time.Duration) (float64, error) {
if len(tripTimes) == 0 {
return 0, errors.New("finalizing transaction with 0 iterations")
}
if len(tripTimes) != len(offsets) {
return 0, errors.New("finalizing transaction with invalid iterations")
}

maxNanosecs := math.MaxFloat64
//first we need to calculate the 80th percentile of the tripTimes
//we only want to keep track of those and discard the others
if len(tripTimes) > 2 {
sortedTimes := make([]float64, len(tripTimes))
for i, v := range tripTimes {
sortedTimes[i] = float64(v.Nanoseconds())
}
sort.Float64s(sortedTimes)
percentIndex := int64(float64(len(sortedTimes)) * 0.8)
maxNanosecs = sortedTimes[percentIndex]
}
var n float64
var totalTimes float64
var totalOffsets float64
count := 0.0
for i, v := range tripTimes {
n = float64(v.Nanoseconds())
//only accept this trip if its less than the max allowed time
if n < maxNanosecs {
totalTimes += n / 1000000
totalOffsets += float64(offsets[i]) / 1000000
count++
}
}
//totalTimes is the total of all the RTTs but offset is only affected 1 way
//so divide RTT by 2 to get one-way time
return (totalOffsets + (totalTimes / 2)) / count, nil
}

//returns the average milliseconds of the durations
// returns the average milliseconds of the durations
// and also includes the average latency in milliseconds
func calculateAverageOffsetLatency(trips []Trip) (float64, float64, error) {
if len(trips) == 0 {
return 0, 0, errors.New("finalizing transaction with 0 trips")
Expand All @@ -74,7 +35,7 @@ func calculateAverageOffsetLatency(trips []Trip) (float64, float64, error) {
}
sort.Sort(sortedTimes)
// make sure we have at most the length of the trips - 1 and at least 3
percentIndex := int64(math.Min(float64(len(trips)) - 1, math.Max(3, (float64(len(sortedTimes)) * 0.6))))
percentIndex := int64(math.Min(float64(len(trips))-1, math.Max(3, (float64(len(sortedTimes))*0.6))))
maxNanosecs = sortedTimes[percentIndex]
}
var latency float64
Expand All @@ -91,7 +52,7 @@ func calculateAverageOffsetLatency(trips []Trip) (float64, float64, error) {
latency = float64(t.RTT) / 2.0
// the Slave will have the opposite offset (since it's subtracting
// its time minus the master's) so we negate it here
// devide by 2 to get the average
// divide by 2 to get the average
offset = ((float64(t.MasterDiff) + latency) - (float64(t.SlaveDiff) + latency)) / 2.0
totalOffsets += float64(offset) / 1e6
totalLatency += latency / 1e6
Expand Down
81 changes: 52 additions & 29 deletions transaction/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,73 @@ package transaction

import (
. "testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestCalcAvg1(t *T) {
d := []time.Duration{
time.Duration(2) * time.Millisecond,
trips := []Trip{
// offset: 10 latency: 1
Trip{
MasterDiff: 9000000,
SlaveDiff: -11000000,
RTT: 2000000,
},
}
o := []time.Duration{
1000000,
}
a, err := calculateAverageOffset(d, o)
o, l, err := calculateAverageOffsetLatency(trips)
assert.Nil(t, err)
assert.Equal(t, float64(2), a)
assert.Equal(t, float64(10), o)
assert.Equal(t, float64(1), l)
}

func TestCalcAvg2(t *T) {
d := []time.Duration{
time.Duration(1) * time.Millisecond,
time.Duration(1) * time.Millisecond,
}
o := []time.Duration{
1000000,
1000000,
trips := []Trip{
// offset: 10 latency: 1
Trip{
MasterDiff: 9000000,
SlaveDiff: -11000000,
RTT: 2000000,
},

// offset: 20 latency: 1
Trip{
MasterDiff: 19000000,
SlaveDiff: -21000000,
RTT: 2000000,
},
}
a, err := calculateAverageOffset(d, o)
o, l, err := calculateAverageOffsetLatency(trips)
assert.Nil(t, err)
assert.Equal(t, float64(1.5), a)
assert.Equal(t, float64(15), o)
assert.Equal(t, float64(1), l)
}

func TestCalcAvg3(t *T) {
//the outlier (2) should be eliminated so this should match TestCalcAvg2
d := []time.Duration{
time.Duration(1) * time.Millisecond,
time.Duration(1) * time.Millisecond,
time.Duration(2) * time.Millisecond,
}
o := []time.Duration{
1000000,
1000000,
9000000,
trips := []Trip{
// offset: 10 latency: 1
Trip{
MasterDiff: 9000000,
SlaveDiff: -11000000,
RTT: 2000000,
},

// offset: 20 latency: 1
Trip{
MasterDiff: 19000000,
SlaveDiff: -21000000,
RTT: 2000000,
},

// offset: 40 latency: 2
// this should be eliminated
Trip{
MasterDiff: 38000000,
SlaveDiff: -32000000,
RTT: 4000000,
},
}
a, err := calculateAverageOffset(d, o)
o, l, err := calculateAverageOffsetLatency(trips)
assert.Nil(t, err)
assert.Equal(t, float64(1.5), a)
assert.Equal(t, float64(15), o)
assert.Equal(t, float64(1), l)
}

0 comments on commit a5b7aeb

Please sign in to comment.