Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit a036c09

Browse files
authored
Merge pull request #398 from raintank/issue-397
Issue #397
2 parents 3b59d4b + a5a23f9 commit a036c09

File tree

5 files changed

+201
-34
lines changed

5 files changed

+201
-34
lines changed

idx/cassandra/cassandra.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,15 @@ func (c *CasIdx) Stop() {
185185
c.session.Close()
186186
}
187187

188-
func (c *CasIdx) Add(data *schema.MetricData) {
188+
func (c *CasIdx) Add(data *schema.MetricData) error {
189189
existing, err := c.MemoryIdx.Get(data.Id)
190190
inMemory := true
191191
if err != nil {
192192
if err == idx.DefNotFound {
193193
inMemory = false
194194
} else {
195195
log.Error(3, "cassandra-idx Failed to query Memory Index for %s. %s", data.Id, err)
196-
return
196+
return err
197197
}
198198
}
199199
if inMemory {
@@ -204,11 +204,14 @@ func (c *CasIdx) Add(data *schema.MetricData) {
204204
c.MemoryIdx.AddDef(&existing)
205205
c.writeQueue <- writeReq{recvTime: time.Now(), def: &existing}
206206
}
207-
return
207+
return nil
208208
}
209209
def := schema.MetricDefinitionFromMetricData(data)
210-
c.MemoryIdx.AddDef(def)
211-
c.writeQueue <- writeReq{recvTime: time.Now(), def: def}
210+
err = c.MemoryIdx.AddDef(def)
211+
if err == nil {
212+
c.writeQueue <- writeReq{recvTime: time.Now(), def: def}
213+
}
214+
return err
212215
}
213216

214217
func (c *CasIdx) rebuildIndex() {

idx/elasticsearch/elasticsearch.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -214,29 +214,32 @@ func (e *EsIdx) Init(stats met.Backend) error {
214214
return nil
215215
}
216216

217-
func (e *EsIdx) Add(data *schema.MetricData) {
217+
func (e *EsIdx) Add(data *schema.MetricData) error {
218218
existing, err := e.MemoryIdx.Get(data.Id)
219219
inMemory := true
220220
if err != nil {
221221
if err == idx.DefNotFound {
222222
inMemory = false
223223
} else {
224224
log.Error(3, "Failed to query Memory Index for %s. %s", data.Id, err)
225-
return
225+
return err
226226
}
227227
}
228228
if inMemory {
229229
log.Debug("def already seen before. Just updating memory Index")
230230
existing.LastUpdate = data.Time
231231
e.MemoryIdx.AddDef(&existing)
232-
return
232+
return nil
233233
}
234234
def := schema.MetricDefinitionFromMetricData(data)
235-
e.MemoryIdx.AddDef(def)
236-
if err := e.BulkIndexer.Index(esIndex, "metric_index", def.Id, "", "", nil, def); err != nil {
237-
log.Error(3, "Failed to add metricDef to BulkIndexer queue. %s", err)
238-
e.retryBuf.Queue(def.Id)
235+
err = e.MemoryIdx.AddDef(def)
236+
if err == nil {
237+
if err = e.BulkIndexer.Index(esIndex, "metric_index", def.Id, "", "", nil, def); err != nil {
238+
log.Error(3, "Failed to add metricDef to BulkIndexer queue. %s", err)
239+
e.retryBuf.Queue(def.Id)
240+
}
239241
}
242+
return err
240243
}
241244

242245
func (e *EsIdx) bulkSend(buf *bytes.Buffer) error {

idx/idx.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import (
1111
)
1212

1313
var (
14-
DefNotFound = errors.New("MetricDef not found")
14+
DefNotFound = errors.New("MetricDef not found")
15+
BothBranchAndLeaf = errors.New("node can't be both branch and leaf")
16+
BranchUnderLeaf = errors.New("can't add branch under leaf")
1517
)
1618

1719
type Node struct {
@@ -46,7 +48,7 @@ Interface
4648
* Stop():
4749
This will be called when metrictank is shutting down.
4850
49-
* Add(*schema.MetricData):
51+
* Add(*schema.MetricData) error:
5052
Every metric received will result in a call to this method to ensure the
5153
metric has been added to the index.
5254
@@ -83,7 +85,7 @@ Interface
8385
type MetricIndex interface {
8486
Init(met.Backend) error
8587
Stop()
86-
Add(*schema.MetricData)
88+
Add(*schema.MetricData) error
8789
Get(string) (schema.MetricDefinition, error)
8890
Delete(int, string) ([]schema.MetricDefinition, error)
8991
Find(int, string, int64) ([]Node, error)

idx/memory/memory.go

+51-19
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func ConfigSetup() {
3434
}
3535

3636
type Tree struct {
37-
Items map[string]*Node
37+
Items map[string]*Node // key is the full path of the node.
3838
}
3939

4040
type Node struct {
@@ -54,14 +54,16 @@ func (n *Node) String() string {
5454
// Implements the "MetricIndex" interface
5555
type MemoryIdx struct {
5656
sync.RWMutex
57-
DefById map[string]*schema.MetricDefinition
58-
Tree map[int]*Tree
57+
FailedDefs map[string]error
58+
DefById map[string]*schema.MetricDefinition
59+
Tree map[int]*Tree
5960
}
6061

6162
func New() *MemoryIdx {
6263
return &MemoryIdx{
63-
DefById: make(map[string]*schema.MetricDefinition),
64-
Tree: make(map[int]*Tree),
64+
FailedDefs: make(map[string]error),
65+
DefById: make(map[string]*schema.MetricDefinition),
66+
Tree: make(map[int]*Tree),
6567
}
6668
}
6769

@@ -80,22 +82,32 @@ func (m *MemoryIdx) Stop() {
8082
return
8183
}
8284

83-
func (m *MemoryIdx) Add(data *schema.MetricData) {
85+
func (m *MemoryIdx) Add(data *schema.MetricData) error {
8486
pre := time.Now()
8587
m.Lock()
8688
defer m.Unlock()
89+
err, ok := m.FailedDefs[data.Id]
90+
if ok {
91+
// if it failed before, it would fail again.
92+
// there's not much point in doing the work of trying over
93+
// and over again, and flooding the logs with the same failure.
94+
// so just trigger the stats metric as if we tried again
95+
idxFail.Inc(1)
96+
return err
97+
}
8798
existing, ok := m.DefById[data.Id]
8899
if ok {
89100
log.Debug("metricDef with id %s already in index.", data.Id)
90101
existing.LastUpdate = data.Time
91102
idxOk.Inc(1)
92103
idxAddDuration.Value(time.Since(pre))
93-
return
104+
return nil
94105
}
95106

96107
def := schema.MetricDefinitionFromMetricData(data)
97-
m.add(def)
108+
err = m.add(def)
98109
idxAddDuration.Value(time.Since(pre))
110+
return err
99111
}
100112

101113
// Used to rebuild the index from an existing set of metricDefinitions.
@@ -114,7 +126,7 @@ func (m *MemoryIdx) Load(defs []schema.MetricDefinition) {
114126
m.Unlock()
115127
}
116128

117-
func (m *MemoryIdx) AddDef(def *schema.MetricDefinition) {
129+
func (m *MemoryIdx) AddDef(def *schema.MetricDefinition) error {
118130
pre := time.Now()
119131
m.Lock()
120132
defer m.Unlock()
@@ -123,14 +135,14 @@ func (m *MemoryIdx) AddDef(def *schema.MetricDefinition) {
123135
existing.LastUpdate = def.LastUpdate
124136
idxOk.Inc(1)
125137
idxAddDuration.Value(time.Since(pre))
126-
return
138+
return nil
127139
}
128-
m.add(def)
140+
err := m.add(def)
129141
idxAddDuration.Value(time.Since(pre))
142+
return err
130143
}
131144

132-
func (m *MemoryIdx) add(def *schema.MetricDefinition) {
133-
m.DefById[def.Id] = def
145+
func (m *MemoryIdx) add(def *schema.MetricDefinition) error {
134146
path := def.Name
135147
//first check to see if a tree has been created for this OrgId
136148
tree, ok := m.Tree[def.OrgId]
@@ -153,28 +165,36 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) {
153165
if !node.Leaf {
154166
//bad data. A path can't be both a leaf and a branch.
155167
log.Info("memory-idx: Bad data, a path can not be both a leaf and a branch. %d - %s", def.OrgId, path)
168+
m.FailedDefs[def.Id] = idx.BothBranchAndLeaf
156169
idxFail.Inc(1)
157-
return
170+
return idx.BothBranchAndLeaf
158171
}
159172
log.Debug("memory-idx: existing index entry for %s. Adding %s as child", path, def.Id)
160173
node.Children = append(node.Children, def.Id)
174+
m.DefById[def.Id] = def
161175
idxOk.Inc(1)
162-
return
176+
return nil
163177
}
164178
}
165179
// now walk backwards through the node path to find the first branch which exists that
166180
// this path extends.
167181
nodes := strings.Split(path, ".")
168-
startPos := 0
182+
183+
// if we're trying to insert foo.bar.baz.quux then we see if we can insert it under (in this order):
184+
// - foo.bar.baz (if found, startPos is 3)
185+
// - foo.bar (if found, startPos is 2)
186+
// - foo (if found, startPos is 1)
187+
startPos := 0 // the index of the first word that is not part of the prefix
169188
var startNode *Node
170189
if len(nodes) > 1 {
171190
for i := len(nodes) - 1; i > 0; i-- {
172191
branch := strings.Join(nodes[0:i], ".")
173192
if n, ok := tree.Items[branch]; ok {
174193
if n.Leaf {
175194
log.Info("memory-idx: Branches cant be added to a leaf node. %d - %s", def.OrgId, path)
195+
m.FailedDefs[def.Id] = idx.BranchUnderLeaf
176196
idxFail.Inc(1)
177-
return
197+
return idx.BranchUnderLeaf
178198
}
179199
log.Debug("memory-idx: Found branch %s which metricDef %s is a descendant of", branch, path)
180200
startNode = n
@@ -211,8 +231,9 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) {
211231
Path: path,
212232
Children: []string{def.Id},
213233
}
234+
m.DefById[def.Id] = def
214235
idxOk.Inc(1)
215-
return
236+
return nil
216237
}
217238

218239
func (m *MemoryIdx) Get(id string) (schema.MetricDefinition, error) {
@@ -285,6 +306,11 @@ func (m *MemoryIdx) find(orgId int, pattern string) ([]*Node, error) {
285306
}
286307

287308
nodes := strings.Split(pattern, ".")
309+
310+
// pos is the index of the last node we know for sure
311+
// for a query like foo.bar.baz, pos is 2
312+
// for a query like foo.bar.* or foo.bar, pos is 1
313+
// for a query like foo.b*.baz, pos is 0
288314
pos := len(nodes) - 1
289315
for i := 0; i < len(nodes); i++ {
290316
if strings.ContainsAny(nodes[i], "*{}[]?") {
@@ -435,6 +461,12 @@ func (m *MemoryIdx) Delete(orgId int, pattern string) ([]schema.MetricDefinition
435461
if err != nil {
436462
return nil, err
437463
}
464+
465+
// by deleting one or more nodes in the tree, any defs that previously failed may now
466+
// be able to be added. An easy way to support this is just reset this map and give them
467+
// all a chance again
468+
m.FailedDefs = make(map[string]error)
469+
438470
deletedDefs := make([]schema.MetricDefinition, 0)
439471
for _, f := range found {
440472
deleted, err := m.delete(orgId, f)
@@ -469,7 +501,7 @@ func (m *MemoryIdx) delete(orgId int, n *Node) ([]schema.MetricDefinition, error
469501
deletedDefs := make([]schema.MetricDefinition, len(n.Children))
470502
// delete the metricDefs
471503
for i, id := range n.Children {
472-
log.Debug("memory-idx: deleteing %s from index", id)
504+
log.Debug("memory-idx: deleting %s from index", id)
473505
deletedDefs[i] = *m.DefById[id]
474506
delete(m.DefById, id)
475507
}

0 commit comments

Comments
 (0)