From a5b7aebaba80b834ec3aaf3661bb9173d8832863 Mon Sep 17 00:00:00 2001 From: James Hartig Date: Sat, 4 Jun 2016 22:08:17 -0400 Subject: [PATCH] Fix tests and remove old offset calculation --- transaction/incoming.go | 7 +--- transaction/util.go | 47 ++--------------------- transaction/util_test.go | 81 ++++++++++++++++++++++++++-------------- 3 files changed, 58 insertions(+), 77 deletions(-) diff --git a/transaction/incoming.go b/transaction/incoming.go index 07b981d..2eed292 100644 --- a/transaction/incoming.go +++ b/transaction/incoming.go @@ -171,8 +171,7 @@ func handleIncomingReport(t *tx, rep *lproto.Report) (*lproto.TxMsg_Report, *lpr //seq starts at 1 so after 1 iteration it'll be at 3 //only the master can terminate a sequence if numTrips >= config.Iterations { - offset, err := calculateAverageOffset(t.tripTimes, t.offsets) - offset2, latency, _ := calculateAverageOffsetLatency(t.trips) + offset, latency, err := calculateAverageOffsetLatency(t.trips) fin := &lproto.Fin{} if err != nil { fin.Error = err.Error() @@ -180,10 +179,8 @@ func handleIncomingReport(t *tx, rep *lproto.Report) (*lproto.TxMsg_Report, *lpr } else { fin.Offset = offset kv["offset"] = offset - kv["offset2"] = offset2 absOff := math.Abs(offset) - absOff2 := math.Abs(offset2) - llog.Info("slave offset", kv, llog.KV{"absOffset": absOff, "absOffset2": absOff2, "latency": latency}) + llog.Info("slave offset", kv, llog.KV{"absOffset": absOff, "latency": latency}) if config.Threshold < absOff { llog.Warn("slave offset is over threshold", kv) } diff --git a/transaction/util.go b/transaction/util.go index 9df4187..3c85abd 100644 --- a/transaction/util.go +++ b/transaction/util.go @@ -4,7 +4,6 @@ import ( "errors" "math" "sort" - "time" ) type Trip struct { @@ -19,46 +18,8 @@ func (a int64s) Len() int { return len(a) } func (a int64s) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a int64s) Less(i, j int) bool { return a[i] < a[j] } -//returns the average milliseconds of the durations -func calculateAverageOffset(tripTimes []time.Duration, offsets []time.Duration) (float64, error) { - if len(tripTimes) == 0 { - return 0, errors.New("finalizing transaction with 0 iterations") - } - if len(tripTimes) != len(offsets) { - return 0, errors.New("finalizing transaction with invalid iterations") - } - - maxNanosecs := math.MaxFloat64 - //first we need to calculate the 80th percentile of the tripTimes - //we only want to keep track of those and discard the others - if len(tripTimes) > 2 { - sortedTimes := make([]float64, len(tripTimes)) - for i, v := range tripTimes { - sortedTimes[i] = float64(v.Nanoseconds()) - } - sort.Float64s(sortedTimes) - percentIndex := int64(float64(len(sortedTimes)) * 0.8) - maxNanosecs = sortedTimes[percentIndex] - } - var n float64 - var totalTimes float64 - var totalOffsets float64 - count := 0.0 - for i, v := range tripTimes { - n = float64(v.Nanoseconds()) - //only accept this trip if its less than the max allowed time - if n < maxNanosecs { - totalTimes += n / 1000000 - totalOffsets += float64(offsets[i]) / 1000000 - count++ - } - } - //totalTimes is the total of all the RTTs but offset is only affected 1 way - //so divide RTT by 2 to get one-way time - return (totalOffsets + (totalTimes / 2)) / count, nil -} - -//returns the average milliseconds of the durations +// returns the average milliseconds of the durations +// and also includes the average latency in milliseconds func calculateAverageOffsetLatency(trips []Trip) (float64, float64, error) { if len(trips) == 0 { return 0, 0, errors.New("finalizing transaction with 0 trips") @@ -74,7 +35,7 @@ func calculateAverageOffsetLatency(trips []Trip) (float64, float64, error) { } sort.Sort(sortedTimes) // make sure we have at most the length of the trips - 1 and at least 3 - percentIndex := int64(math.Min(float64(len(trips)) - 1, math.Max(3, (float64(len(sortedTimes)) * 0.6)))) + percentIndex := int64(math.Min(float64(len(trips))-1, math.Max(3, (float64(len(sortedTimes))*0.6)))) maxNanosecs = sortedTimes[percentIndex] } var latency float64 @@ -91,7 +52,7 @@ func calculateAverageOffsetLatency(trips []Trip) (float64, float64, error) { latency = float64(t.RTT) / 2.0 // the Slave will have the opposite offset (since it's subtracting // its time minus the master's) so we negate it here - // devide by 2 to get the average + // divide by 2 to get the average offset = ((float64(t.MasterDiff) + latency) - (float64(t.SlaveDiff) + latency)) / 2.0 totalOffsets += float64(offset) / 1e6 totalLatency += latency / 1e6 diff --git a/transaction/util_test.go b/transaction/util_test.go index 82fb0b4..436256b 100644 --- a/transaction/util_test.go +++ b/transaction/util_test.go @@ -2,50 +2,73 @@ package transaction import ( . "testing" - "time" "github.com/stretchr/testify/assert" ) func TestCalcAvg1(t *T) { - d := []time.Duration{ - time.Duration(2) * time.Millisecond, + trips := []Trip{ + // offset: 10 latency: 1 + Trip{ + MasterDiff: 9000000, + SlaveDiff: -11000000, + RTT: 2000000, + }, } - o := []time.Duration{ - 1000000, - } - a, err := calculateAverageOffset(d, o) + o, l, err := calculateAverageOffsetLatency(trips) assert.Nil(t, err) - assert.Equal(t, float64(2), a) + assert.Equal(t, float64(10), o) + assert.Equal(t, float64(1), l) } func TestCalcAvg2(t *T) { - d := []time.Duration{ - time.Duration(1) * time.Millisecond, - time.Duration(1) * time.Millisecond, - } - o := []time.Duration{ - 1000000, - 1000000, + trips := []Trip{ + // offset: 10 latency: 1 + Trip{ + MasterDiff: 9000000, + SlaveDiff: -11000000, + RTT: 2000000, + }, + + // offset: 20 latency: 1 + Trip{ + MasterDiff: 19000000, + SlaveDiff: -21000000, + RTT: 2000000, + }, } - a, err := calculateAverageOffset(d, o) + o, l, err := calculateAverageOffsetLatency(trips) assert.Nil(t, err) - assert.Equal(t, float64(1.5), a) + assert.Equal(t, float64(15), o) + assert.Equal(t, float64(1), l) } func TestCalcAvg3(t *T) { - //the outlier (2) should be eliminated so this should match TestCalcAvg2 - d := []time.Duration{ - time.Duration(1) * time.Millisecond, - time.Duration(1) * time.Millisecond, - time.Duration(2) * time.Millisecond, - } - o := []time.Duration{ - 1000000, - 1000000, - 9000000, + trips := []Trip{ + // offset: 10 latency: 1 + Trip{ + MasterDiff: 9000000, + SlaveDiff: -11000000, + RTT: 2000000, + }, + + // offset: 20 latency: 1 + Trip{ + MasterDiff: 19000000, + SlaveDiff: -21000000, + RTT: 2000000, + }, + + // offset: 40 latency: 2 + // this should be eliminated + Trip{ + MasterDiff: 38000000, + SlaveDiff: -32000000, + RTT: 4000000, + }, } - a, err := calculateAverageOffset(d, o) + o, l, err := calculateAverageOffsetLatency(trips) assert.Nil(t, err) - assert.Equal(t, float64(1.5), a) + assert.Equal(t, float64(15), o) + assert.Equal(t, float64(1), l) }