Skip to content

Commit 021bae7

Browse files
author
var-nan
committed
TWA code refactor
1 parent ae2268f commit 021bae7

1 file changed

Lines changed: 29 additions & 39 deletions

File tree

src/types/redis_timeseries.cc

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -101,33 +101,23 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
101101
}
102102
return 0;
103103
};
104-
/// Computes area of polygon from start of the current bucket to the first sample of the current span.
105-
/// Total Area = Area of bottom rectangle + Area of above triangle.
106-
auto front_area = [](uint64_t bucket_left, const TSSample &prev, const TSSample &curr) {
107-
auto x = static_cast<double>(bucket_left - prev.ts); // Distance from
108-
auto y = static_cast<double>(curr.ts - prev.ts);
109-
auto z = curr.v - prev.v;
110-
auto triangle_area = (z * (y - (x * x) / y)) / 2;
111-
auto rect_area = static_cast<double>(y - x) * prev.v;
112-
return triangle_area + rect_area;
113-
};
114-
/// Computes area of polygon from the last sample of the current span to the end of current bucket.
115-
/// Total Area = Area of bottom rectangle + Area of above triangle.
116-
auto end_area = [](uint64_t bucket_right, const TSSample &curr, const TSSample &next) {
117-
auto x = static_cast<double>(bucket_right - curr.ts);
118-
auto y = static_cast<double>(next.ts - curr.ts);
119-
auto z = next.v - curr.v;
120-
auto rect_area = x * curr.v;
121-
auto triangle_area = (x * x * z) / (2 * y);
122-
return triangle_area + rect_area;
104+
// Linear interpolation.
105+
auto interpolate_sample = [](const TSSample &left_nb, uint64_t ts, const TSSample &right_nb) {
106+
auto y_diff = right_nb.v - left_nb.v;
107+
auto x_diff = static_cast<double>(right_nb.ts - left_nb.ts);
108+
auto x_diff_prime = static_cast<double>(ts - left_nb.ts);
109+
auto y_diff_prime = (x_diff_prime * y_diff) / x_diff;
110+
TSSample sample;
111+
sample.ts = ts;
112+
sample.v = y_diff_prime + left_nb.v;
113+
return sample;
123114
};
124115
// Computes the TWA of empty bucket from its neighbor samples.
125-
auto empty_bucket_twa = [&front_area](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right,
126-
const TSSample &right_nb) {
127-
// Area of empty bucket = Area from left_nb to bucket_right - Area from left_nb to bucket_left
128-
auto f_area = front_area(bucket_left, left_nb, right_nb);
129-
auto s_area = front_area(bucket_right, left_nb, right_nb);
130-
return (f_area - s_area) / static_cast<double>(bucket_right - bucket_left);
116+
auto empty_bucket_twa = [&interpolate_sample](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right,
117+
const TSSample &right_nb) {
118+
auto left = interpolate_sample(left_nb, bucket_left, right_nb);
119+
auto right = interpolate_sample(left_nb, bucket_right, right_nb);
120+
return Reducer::Area(std::array<TSSample, 2>{left, right}) / static_cast<double>(bucket_right - bucket_left);
131121
};
132122

133123
// Retrieve prev_sample and next_sample from samples when TWA aggregation.
@@ -151,8 +141,6 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
151141
res = std::move(samples);
152142
return res;
153143
}
154-
// Both prev and next should be available. Total range should be in between the prev and next samples.
155-
assert(prev_sample.ts <= option.start_ts && option.end_ts <= next_sample.ts);
156144

157145
uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / option.aggregator.bucket_duration;
158146
res.reserve(n_buckets_estimate + 1);
@@ -172,11 +160,7 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
172160
TSSample sample;
173161
sample.ts = bucket_left;
174162
if (bucket_left == option.end_ts) { // Calculate last sample.
175-
double y_diff = next_sample.v - prev_sample.v;
176-
auto x_diff = static_cast<double>(next_sample.ts - prev_sample.ts);
177-
auto x_prime_diff = static_cast<double>(option.end_ts - prev_sample.ts);
178-
double y_prime_diff = (x_prime_diff * y_diff) / x_diff;
179-
sample.v = y_prime_diff + prev_sample.v;
163+
sample.v = interpolate_sample(prev_sample, option.end_ts, next_sample).v;
180164
} else {
181165
sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample);
182166
}
@@ -205,9 +189,7 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
205189
TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : prev_sample;
206190
TSSample next = (i != (spans.size() - 1)) ? spans[non_empty_right_bucket_idx(i)].front() : next_sample;
207191
neighbors.emplace_back(prev, next);
208-
assert(spans[i].empty() ||
209-
(neighbors[i].first.ts <= spans[i].front().ts && spans[i].back().ts <= neighbors[i].second.ts));
210-
} // Should follow: neighbors[i].first <= span[i].front() <= span[i].back() <= neighbors[i].second;
192+
}
211193

212194
uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts);
213195
for (size_t i = 0; i < spans.size(); i++) {
@@ -255,12 +237,20 @@ std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> sample
255237
bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left);
256238
bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts);
257239
double area = 0;
258-
area += front_available ? front_area(bucket_left, neighbors[i].first, spans[i].front()) : 0.0;
259-
area += back_available ? end_area(bucket_right, spans[i].back(), neighbors[i].second) : 0.0;
240+
uint64_t l = spans[i].front().ts;
241+
uint64_t r = spans[i].back().ts;
242+
if (front_available) {
243+
TSSample left_sample = interpolate_sample(neighbors[i].first, bucket_left, spans[i].front());
244+
area += Reducer::Area(std::array<TSSample, 2>{left_sample, spans[i].front()});
245+
l = bucket_left;
246+
}
247+
if (back_available) {
248+
TSSample right_sample = interpolate_sample(spans[i].back(), bucket_right, neighbors[i].second);
249+
area += Reducer::Area(std::array<TSSample, 2>{spans[i].back(), right_sample});
250+
r = bucket_right;
251+
}
260252
// Edge case: If single bucket and it contains only one element.
261253
area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0;
262-
uint64_t l = front_available ? bucket_left : spans[i].front().ts;
263-
uint64_t r = back_available ? bucket_right : spans[i].back().ts;
264254
sample.v = (sample.v + area) / std::max(static_cast<double>(r - l), 1.0);
265255
}
266256
} else {

0 commit comments

Comments
 (0)