Skip to content

Commit 93014af

Browse files
committed
chunked fetch all data from graphite_tree in tagger
1 parent 8027c2c commit 93014af

File tree

1 file changed

+57
-33
lines changed

1 file changed

+57
-33
lines changed

tagger/tagger.go

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"github.com/lomik/zapwriter"
2020
)
2121

22+
const SelectChunksCount = 10
23+
2224
func unsafeString(b []byte) string {
2325
return *(*string)(unsafe.Pointer(&b))
2426
}
@@ -99,59 +101,81 @@ func Make(cfg *config.Config) error {
99101

100102
// Read clickhouse
101103
begin("read and parse tree")
102-
var body []byte
104+
// bodies := make([][]byte, 0)
105+
106+
var bodies [][]byte
103107

104108
if cfg.Tags.InputFile != "" {
105-
body, err = ioutil.ReadFile(cfg.Tags.InputFile)
109+
body, err := ioutil.ReadFile(cfg.Tags.InputFile)
106110
if err != nil {
107111
return err
108112
}
113+
bodies = [][]byte{body}
109114
} else {
110-
body, err = clickhouse.Query(
111-
context.WithValue(context.Background(), "logger", logger),
112-
cfg.ClickHouse.Url,
113-
fmt.Sprintf("SELECT Path FROM %s GROUP BY Path HAVING argMax(Deleted, Version)==0 FORMAT RowBinary", cfg.ClickHouse.TreeTable),
114-
cfg.ClickHouse.TreeTimeout.Value(),
115-
)
116-
}
115+
bodies = make([][]byte, SelectChunksCount)
116+
for i := 0; i < SelectChunksCount; i++ {
117+
bodies[i], err = clickhouse.Query(
118+
context.WithValue(context.Background(), "logger", logger),
119+
cfg.ClickHouse.Url,
120+
fmt.Sprintf(
121+
"SELECT Path FROM %s WHERE cityHash64(Path) %% %d == %d GROUP BY Path HAVING argMax(Deleted, Version)==0 FORMAT RowBinary",
122+
cfg.ClickHouse.TreeTable,
123+
SelectChunksCount,
124+
i,
125+
),
126+
cfg.ClickHouse.TreeTimeout.Value(),
127+
)
128+
}
117129

118-
if err != nil {
119-
return err
130+
if err != nil {
131+
return err
132+
}
120133
}
121134

122-
count, err := countMetrics(body)
123-
if err != nil {
124-
return err
135+
var count int
136+
137+
for i := 0; i < len(bodies); i++ {
138+
c, err := countMetrics(bodies[i])
139+
if err != nil {
140+
return err
141+
}
142+
count += c
125143
}
126144

127145
metricList := make([]Metric, count)
128146

129-
var namelen uint64
130-
bodyLen := len(body)
131-
var offset, readBytes int
147+
index := 0
148+
132149
var maxLevel int
133150

134-
for index := 0; ; index++ {
135-
if offset >= bodyLen {
136-
if offset == bodyLen {
137-
break
151+
for i := 0; i < len(bodies); i++ {
152+
body := bodies[i]
153+
var namelen uint64
154+
bodyLen := len(body)
155+
var offset, readBytes int
156+
157+
for ; ; index++ {
158+
if offset >= bodyLen {
159+
if offset == bodyLen {
160+
break
161+
}
162+
return clickhouse.ErrClickHouseResponse
138163
}
139-
return clickhouse.ErrClickHouseResponse
140-
}
141164

142-
namelen, readBytes, err = clickhouse.ReadUvarint(body[offset:])
143-
if err != nil {
144-
return err
145-
}
165+
namelen, readBytes, err = clickhouse.ReadUvarint(body[offset:])
166+
if err != nil {
167+
return err
168+
}
146169

147-
metricList[index].Path = body[offset+readBytes : offset+readBytes+int(namelen)]
148-
metricList[index].Level = pathLevel(metricList[index].Path)
170+
metricList[index].Path = body[offset+readBytes : offset+readBytes+int(namelen)]
171+
metricList[index].Level = pathLevel(metricList[index].Path)
149172

150-
if metricList[index].Level > maxLevel {
151-
maxLevel = metricList[index].Level
152-
}
173+
if metricList[index].Level > maxLevel {
174+
maxLevel = metricList[index].Level
175+
}
153176

154-
offset += readBytes + int(namelen)
177+
offset += readBytes + int(namelen)
178+
}
155179
}
156180
end()
157181

0 commit comments

Comments
 (0)