Skip to content

Commit 2e6a797

Browse files
authored
Stop appender go funcs on close (#427)
1 parent 8fb665b commit 2e6a797

File tree

4 files changed

+38
-16
lines changed

4 files changed

+38
-16
lines changed

Diff for: pkg/appender/appender.go

+9
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ type MetricsCache struct {
130130

131131
lastError error
132132
performanceReporter *performance.MetricReporter
133+
134+
stopChan chan int
133135
}
134136

135137
func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config.V3ioConfig,
@@ -146,6 +148,7 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config
146148
newCache.metricQueue = NewElasticQueue()
147149
newCache.updatesComplete = make(chan int, 100)
148150
newCache.newUpdates = make(chan int, 1000)
151+
newCache.stopChan = make(chan int, 3)
149152

150153
newCache.NameLabelMap = map[string]bool{}
151154
newCache.performanceReporter = performance.ReporterInstanceFromConfig(cfg)
@@ -285,6 +288,12 @@ func verifyTimeValid(t int64) error {
285288
}
286289
return nil
287290
}
291+
func (mc *MetricsCache) Close() {
292+
//for 3 go funcs
293+
mc.stopChan <- 0
294+
mc.stopChan <- 0
295+
mc.stopChan <- 0
296+
}
288297

289298
func (mc *MetricsCache) WaitForCompletion(timeout time.Duration) (int, error) {
290299
waitChan := make(chan int, 2)

Diff for: pkg/appender/ingest.go

+23-16
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ func (mc *MetricsCache) metricFeed(index int) {
5353

5454
for {
5555
select {
56+
case _ = <-mc.stopChan:
57+
return
5658
case inFlight = <-mc.updatesComplete:
5759
// Handle completion notifications from the update loop
5860
length := mc.metricQueue.Length()
@@ -149,6 +151,8 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
149151
counter := 0
150152
for {
151153
select {
154+
case _ = <-mc.stopChan:
155+
return
152156
case _ = <-mc.newUpdates:
153157
// Handle new metric notifications (from metricFeed)
154158
for mc.updatesInFlight < mc.cfg.Workers*2 { //&& newMetrics > 0{
@@ -347,24 +351,27 @@ func (mc *MetricsCache) nameUpdateRespLoop() {
347351

348352
go func() {
349353
for {
350-
resp := <-mc.nameUpdateChan
351-
// Handle V3IO PutItem in names table
352-
353-
metric, ok := resp.Context.(*MetricState)
354-
if ok {
355-
metric.Lock()
356-
if resp.Error != nil {
357-
// Count errors
358-
mc.performanceReporter.IncrementCounter("UpdateNameError", 1)
359-
360-
mc.logger.ErrorWith("Update-name process failed", "id", resp.ID, "name", metric.name)
361-
} else {
362-
mc.logger.DebugWith("Update-name process response", "id", resp.ID, "name", metric.name)
354+
select {
355+
case _ = <-mc.stopChan:
356+
return
357+
case resp := <-mc.nameUpdateChan:
358+
// Handle V3IO PutItem in names table
359+
metric, ok := resp.Context.(*MetricState)
360+
if ok {
361+
metric.Lock()
362+
if resp.Error != nil {
363+
// Count errors
364+
mc.performanceReporter.IncrementCounter("UpdateNameError", 1)
365+
366+
mc.logger.ErrorWith("Update-name process failed", "id", resp.ID, "name", metric.name)
367+
} else {
368+
mc.logger.DebugWith("Update-name process response", "id", resp.ID, "name", metric.name)
369+
}
370+
metric.Unlock()
363371
}
364-
metric.Unlock()
365-
}
366372

367-
resp.Release()
373+
resp.Release()
374+
}
368375
}
369376
}()
370377
}

Diff for: pkg/tsdb/v3iotsdb.go

+5
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ func (a v3ioAppender) WaitForCompletion(timeout time.Duration) (int, error) {
343343
return a.metricsCache.WaitForCompletion(timeout)
344344
}
345345

346+
func (a v3ioAppender) Close() {
347+
a.metricsCache.Close()
348+
}
349+
346350
// In V3IO, all operations are committed (no client cache)
347351
func (a v3ioAppender) Commit() error { return nil }
348352
func (a v3ioAppender) Rollback() error { return nil }
@@ -354,4 +358,5 @@ type Appender interface {
354358
WaitForCompletion(timeout time.Duration) (int, error)
355359
Commit() error
356360
Rollback() error
361+
Close()
357362
}

Diff for: pkg/tsdbctl/add.go

+1
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ func (ac *addCommandeer) add() error {
181181
return errors.Wrap(err, "Operation timed out.")
182182
}
183183

184+
appender.Close()
184185
ac.rootCommandeer.logger.Info("Done!")
185186
return nil
186187
}

0 commit comments

Comments
 (0)