Skip to content

Commit 1d7b3aa

Browse files
authored
Merge pull request #485 from dinal/v0.9.12-x
IG-15233 fix hang in ingest
2 parents a98b572 + 7258cea commit 1d7b3aa

File tree

1 file changed

+27
-13
lines changed

1 file changed

+27
-13
lines changed

Diff for: pkg/appender/ingest.go

+27-13
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (mc *MetricsCache) metricFeed(index int) {
4949
inFlight := 0
5050
gotData := false
5151
gotCompletion := false
52+
potentialCompletion := false
5253
var completeChan chan int
5354

5455
for {
@@ -61,11 +62,16 @@ func (mc *MetricsCache) metricFeed(index int) {
6162
mc.logger.Debug(`Complete update cycle - "in-flight requests"=%d; "metric queue length"=%d\n`, inFlight, length)
6263

6364
// If data was sent and the queue is empty, mark as completion
64-
if length == 0 && gotData && len(mc.asyncAppendChan) == 0 {
65-
gotCompletion = true
66-
if completeChan != nil {
67-
completeChan <- 0
68-
gotData = false
65+
if length == 0 && gotData {
66+
if len(mc.asyncAppendChan) == 0 {
67+
gotCompletion = true
68+
if completeChan != nil {
69+
completeChan <- 0
70+
gotData = false
71+
potentialCompletion = false
72+
}
73+
} else if len(mc.asyncAppendChan) == 1 {
74+
potentialCompletion = true
6975
}
7076
}
7177
case app := <-mc.asyncAppendChan:
@@ -77,14 +83,17 @@ func (mc *MetricsCache) metricFeed(index int) {
7783
if app.metric == nil {
7884
// Handle update completion requests (metric == nil)
7985
completeChan = app.resp
80-
8186
length := mc.metricQueue.Length()
82-
if gotCompletion && length == 0 && len(mc.asyncAppendChan) == 0 {
83-
completeChan <- 0
84-
gotCompletion = false
85-
gotData = false
87+
if length == 0 && len(mc.asyncAppendChan) == 0 {
88+
if gotCompletion || (potentialCompletion && gotData) {
89+
completeChan <- 0
90+
gotCompletion = false
91+
gotData = false
92+
}
8693
}
94+
potentialCompletion = false
8795
} else {
96+
potentialCompletion = false
8897
// Handle append requests (Add / AddFast)
8998
gotData = true
9099
metric := app.metric
@@ -113,7 +122,6 @@ func (mc *MetricsCache) metricFeed(index int) {
113122
}
114123
metric.Unlock()
115124
}
116-
117125
// Poll if we have more updates (accelerate the outer select)
118126
if i < mc.cfg.BatchSize {
119127
select {
@@ -249,8 +257,14 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) {
249257
} else if sent {
250258
metric.setState(storeStateUpdate)
251259
}
252-
if !sent && metric.store.samplesQueueLength() == 0 {
253-
metric.setState(storeStateReady)
260+
if !sent {
261+
if metric.store.samplesQueueLength() == 0 {
262+
metric.setState(storeStateReady)
263+
} else {
264+
if mc.metricQueue.length() > 0 {
265+
mc.newUpdates <- mc.metricQueue.length()
266+
}
267+
}
254268
}
255269
}
256270

0 commit comments

Comments
 (0)