Skip to content

Commit abbbe47

Browse files
authored
Merge pull request #2 from lomik/rollup_until
Rollup until
2 parents f3cc98d + 2231a4e commit abbbe47

File tree

3 files changed

+90
-9
lines changed

3 files changed

+90
-9
lines changed

helper/rollup/rollup.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,18 @@ func (r *Rollup) Match(metric string) *Pattern {
135135
return r.Default
136136
}
137137

138+
func (r *Rollup) Step(metric string, from int32) int32 {
139+
pattern := r.Match(metric)
140+
now := int32(time.Now().Unix())
141+
142+
for i := range pattern.Retention {
143+
if i == len(pattern.Retention)-1 || from > now-pattern.Retention[i+1].Age {
144+
return pattern.Retention[i].Precision
145+
}
146+
}
147+
return pattern.Retention[len(pattern.Retention)-1].Precision
148+
}
149+
138150
func doMetricPrecision(points []point.Point, precision int32, aggr func([]point.Point) float64) []point.Point {
139151
l := len(points)
140152
var i, n int

helper/rollup/rollup_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package rollup
22

33
import (
4+
"fmt"
45
"testing"
6+
"time"
57

68
"github.com/lomik/graphite-clickhouse/helper/point"
79
)
@@ -73,3 +75,63 @@ func TestMetricPrecision(t *testing.T) {
7375
point.AssertListEq(t, test[1], result)
7476
}
7577
}
78+
79+
func TestMetricStep(t *testing.T) {
80+
config := `
81+
<graphite_rollup>
82+
<pattern>
83+
<regexp>^metric\.</regexp>
84+
<function>any</function>
85+
<retention>
86+
<age>0</age>
87+
<precision>1</precision>
88+
</retention>
89+
<retention>
90+
<age>3600</age>
91+
<precision>10</precision>
92+
</retention>
93+
</pattern>
94+
<default>
95+
<function>max</function>
96+
<retention>
97+
<age>0</age>
98+
<precision>60</precision>
99+
</retention>
100+
<retention>
101+
<age>3600</age>
102+
<precision>300</precision>
103+
</retention>
104+
<retention>
105+
<age>86400</age>
106+
<precision>3600</precision>
107+
</retention>
108+
</default>
109+
</graphite_rollup>
110+
`
111+
r, err := ParseXML([]byte(config))
112+
if err != nil {
113+
t.Fatal(err)
114+
}
115+
now := int32(time.Now().Unix())
116+
117+
tests := []struct {
118+
name string
119+
from int32
120+
expectedStep int32
121+
}{
122+
{"metric.foo.first-retention", now - 500, 1},
123+
{"metric.foo.second-retention", now - 3600, 10},
124+
{"foo.bar.default-first-retention", now - 500, 60},
125+
{"foo.bar.default-second-retention", now - 3700, 300},
126+
{"foo.bar.default-last-retention", now - 87000, 3600},
127+
}
128+
129+
for _, test := range tests {
130+
t.Run(fmt.Sprintf("metric=%v (from=now-%v)", test.name, now-test.from), func(t *testing.T) {
131+
step := r.Step(test.name, test.from)
132+
if step != test.expectedStep {
133+
t.Fatalf("metric=%v (from=now-%v), expected step=%v, actual step=%v", test.name, now-test.from, test.expectedStep, step)
134+
}
135+
})
136+
}
137+
}

render/handler.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6969
return
7070
}
7171

72-
dateWhere := fmt.Sprintf(
73-
"(Date >='%s' AND Date <= '%s' AND Time >= %d AND Time <= %d)",
74-
time.Unix(fromTimestamp, 0).Format("2006-01-02"),
75-
time.Unix(untilTimestamp, 0).Format("2006-01-02"),
76-
fromTimestamp,
77-
untilTimestamp,
78-
)
79-
8072
var pathWhere string
8173

74+
maxStep := int32(0)
8275
if find.HasWildcard(target) {
8376
// Search in small index table first
8477
treeWhere := find.MakeWhere(target, true)
@@ -105,6 +98,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
10598
if p == "" {
10699
continue
107100
}
101+
step := h.config.Rollup.Step(p, int32(fromTimestamp))
102+
if step > maxStep {
103+
maxStep = step
104+
}
108105

109106
if !first {
110107
listBuf.Write([]byte{','})
@@ -131,8 +128,18 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
131128
// pathWhere = makeWhere(target, false)
132129
} else {
133130
pathWhere = fmt.Sprintf("Path = '%s'", target)
131+
maxStep = h.config.Rollup.Step(target, int32(fromTimestamp))
134132
}
135133

134+
until := untilTimestamp - untilTimestamp % int64(maxStep) + int64(maxStep) - 1
135+
dateWhere := fmt.Sprintf(
136+
"(Date >='%s' AND Date <= '%s' AND Time >= %d AND Time <= %d)",
137+
time.Unix(fromTimestamp, 0).Format("2006-01-02"),
138+
time.Unix(untilTimestamp, 0).Format("2006-01-02"),
139+
fromTimestamp,
140+
until,
141+
)
142+
136143
// @TODO: change format to RowBinary
137144
query := fmt.Sprintf(
138145
`
@@ -325,7 +332,7 @@ func (h *Handler) ReplyProtobuf(w http.ResponseWriter, r *http.Request, points [
325332

326333
var index int32
327334
// skip points before start
328-
for index = 0; points[index].Time < start; index++ {
335+
for index = 0; index < int32(len(points)) && points[index].Time < start; index++ {
329336
}
330337

331338
for i := int32(0); i < count; i++ {

0 commit comments

Comments
 (0)