@@ -227,19 +227,22 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) {
227227 metric .Lock ()
228228 defer metric .Unlock ()
229229 var sent bool
230+ var err error
230231
231232 // In case we are in pre get state or our data spreads across multiple partitions, get the new state for the current partition
232233 if metric .getState () == storeStatePreGet ||
233234 (metric .canSendRequests () && metric .shouldGetState ) {
234- sent = mc .sendGetMetricState (metric )
235- if sent {
235+ sent , err = mc .sendGetMetricState (metric )
236+ if err != nil {
237+ metric .setState (storeStateInit )
238+ } else if sent {
236239 mc .updatesInFlight ++
237240 }
238241 } else if metric .canSendRequests () {
239- sent = mc .writeChunksAndGetState (metric )
242+ sent , err = mc .writeChunksAndGetState (metric )
240243
241244 if ! sent {
242- if metric .store .samplesQueueLength () == 0 {
245+ if err != nil || metric .store .samplesQueueLength () == 0 {
243246 metric .setState (storeStateReady )
244247 if metric .store .numNotProcessed == 0 {
245248 mc .cacheMetricMap .ResetMetric (metric .hash )
@@ -255,10 +258,10 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) {
255258
256259}
257260
258- func (mc * MetricsCache ) sendGetMetricState (metric * MetricState ) bool {
261+ func (mc * MetricsCache ) sendGetMetricState (metric * MetricState ) ( bool , error ) {
259262 // If we are already in a get state, discard
260263 if metric .getState () == storeStateGet {
261- return false
264+ return false , nil
262265 }
263266
264267 sent , err := metric .store .getChunksState (mc , metric )
@@ -268,14 +271,14 @@ func (mc *MetricsCache) sendGetMetricState(metric *MetricState) bool {
268271
269272 mc .logger .ErrorWith ("Failed to get item state" , "metric" , metric .Lset , "err" , err )
270273 setError (mc , metric , err )
271- } else {
272- metric .setState (storeStateGet )
274+ return false , err
273275 }
276+ metric .setState (storeStateGet )
274277
275- return sent
278+ return sent , nil
276279}
277280
278- func (mc * MetricsCache ) writeChunksAndGetState (metric * MetricState ) bool {
281+ func (mc * MetricsCache ) writeChunksAndGetState (metric * MetricState ) ( bool , error ) {
279282 sent , err := metric .store .writeChunks (mc , metric )
280283 if err != nil {
281284 // Count errors
@@ -287,13 +290,13 @@ func (mc *MetricsCache) writeChunksAndGetState(metric *MetricState) bool {
287290 metric .setState (storeStateUpdate )
288291 } else if metric .shouldGetState {
289292 // In case we didn't write any data and the metric state needs to be updated, update it straight away
290- sent = mc .sendGetMetricState (metric )
293+ sent , err = mc .sendGetMetricState (metric )
291294 }
292295
293296 if sent {
294297 mc .updatesInFlight ++
295298 }
296- return sent
299+ return sent , err
297300}
298301
299302// Handle DB responses
@@ -315,6 +318,14 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
315318 reflect .TypeOf (reqInput ), "request" , reqInput )
316319 }
317320
321+ clear := func () {
322+ resp .Release ()
323+ metric .store = newChunkStore (mc .logger , metric .Lset .LabelNames (), metric .store .isAggr ())
324+ metric .retryCount = 0
325+ metric .setState (storeStateInit )
326+ mc .cacheMetricMap .ResetMetric (metric .hash )
327+ }
328+
318329 if metric .getState () == storeStateGet {
319330 // Handle Get response, sync metric state with the DB
320331 metric .store .processGetResp (mc , metric , resp )
@@ -328,14 +339,6 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
328339 }
329340 metric .retryCount = 0
330341 } else {
331- clear := func () {
332- resp .Release ()
333- metric .store = newChunkStore (mc .logger , metric .Lset .LabelNames (), metric .store .isAggr ())
334- metric .retryCount = 0
335- metric .setState (storeStateInit )
336- mc .cacheMetricMap .ResetMetric (metric .hash )
337- }
338-
339342 // Count errors
340343 mc .performanceReporter .IncrementCounter ("ChunkUpdateRetries" , 1 )
341344
@@ -370,15 +373,24 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
370373 metric .setState (storeStateReady )
371374
372375 var sent bool
376+ var err error
373377
374378 // In case our data spreads across multiple partitions, get the new state for the current partition
375379 if metric .shouldGetState {
376- sent = mc .sendGetMetricState (metric )
380+ sent , err = mc .sendGetMetricState (metric )
381+ if err != nil {
382+ clear ()
383+ return false
384+ }
377385 if sent {
378386 mc .updatesInFlight ++
379387 }
380388 } else if canWrite {
381- sent = mc .writeChunksAndGetState (metric )
389+ sent , err = mc .writeChunksAndGetState (metric )
390+ if err != nil {
391+ clear ()
392+ return false
393+ }
382394 } else if metric .store .samplesQueueLength () > 0 {
383395 mc .metricQueue .Push (metric )
384396 metric .setState (storeStateAboutToUpdate )
0 commit comments