Skip to content

Commit 00b2fd6

Browse files
authored
Merge pull request #560 from v3io/development
Development
2 parents 6399c38 + 371fc7f commit 00b2fd6

File tree

6 files changed

+40
-36
lines changed

6 files changed

+40
-36
lines changed

.github/workflows/pr.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929

3030
- uses: actions/setup-go@v2
3131
with:
32-
go-version: "^1.14.0"
32+
go-version: 1.14.15
3333

3434
- uses: actions/cache@v2
3535
with:

Makefile

+1-5
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,7 @@ bin:
7171

7272
PHONY: gofmt
7373
gofmt:
74-
ifeq ($(shell gofmt -l .),)
75-
# gofmt OK
76-
else
77-
$(error Please run `go fmt ./...` to format the code)
78-
endif
74+
if [ "$(gofmt -l .)" != "" ]; then echo 'Please run `go fmt ./...` to format the code'; fi
7975

8076
.PHONY: impi
8177
impi:

pkg/appender/appender.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,8 @@ type MetricsCache struct {
115115
asyncAppendChan chan *asyncAppend
116116
updatesInFlight int
117117

118-
metricQueue *ElasticQueue
119-
updatesComplete chan int
120-
newUpdates chan int
118+
metricQueue *ElasticQueue
119+
newUpdates chan int
121120

122121
outstandingUpdates int64
123122
requestsInFlight int64

pkg/appender/ingest.go

+34-22
Original file line numberDiff line numberDiff line change
@@ -227,19 +227,22 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) {
227227
metric.Lock()
228228
defer metric.Unlock()
229229
var sent bool
230+
var err error
230231

231232
// In case we are in pre get state or our data spreads across multiple partitions, get the new state for the current partition
232233
if metric.getState() == storeStatePreGet ||
233234
(metric.canSendRequests() && metric.shouldGetState) {
234-
sent = mc.sendGetMetricState(metric)
235-
if sent {
235+
sent, err = mc.sendGetMetricState(metric)
236+
if err != nil {
237+
metric.setState(storeStateInit)
238+
} else if sent {
236239
mc.updatesInFlight++
237240
}
238241
} else if metric.canSendRequests() {
239-
sent = mc.writeChunksAndGetState(metric)
242+
sent, err = mc.writeChunksAndGetState(metric)
240243

241244
if !sent {
242-
if metric.store.samplesQueueLength() == 0 {
245+
if err != nil || metric.store.samplesQueueLength() == 0 {
243246
metric.setState(storeStateReady)
244247
if metric.store.numNotProcessed == 0 {
245248
mc.cacheMetricMap.ResetMetric(metric.hash)
@@ -255,10 +258,10 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) {
255258

256259
}
257260

258-
func (mc *MetricsCache) sendGetMetricState(metric *MetricState) bool {
261+
func (mc *MetricsCache) sendGetMetricState(metric *MetricState) (bool, error) {
259262
// If we are already in a get state, discard
260263
if metric.getState() == storeStateGet {
261-
return false
264+
return false, nil
262265
}
263266

264267
sent, err := metric.store.getChunksState(mc, metric)
@@ -268,14 +271,14 @@ func (mc *MetricsCache) sendGetMetricState(metric *MetricState) bool {
268271

269272
mc.logger.ErrorWith("Failed to get item state", "metric", metric.Lset, "err", err)
270273
setError(mc, metric, err)
271-
} else {
272-
metric.setState(storeStateGet)
274+
return false, err
273275
}
276+
metric.setState(storeStateGet)
274277

275-
return sent
278+
return sent, nil
276279
}
277280

278-
func (mc *MetricsCache) writeChunksAndGetState(metric *MetricState) bool {
281+
func (mc *MetricsCache) writeChunksAndGetState(metric *MetricState) (bool, error) {
279282
sent, err := metric.store.writeChunks(mc, metric)
280283
if err != nil {
281284
// Count errors
@@ -287,13 +290,13 @@ func (mc *MetricsCache) writeChunksAndGetState(metric *MetricState) bool {
287290
metric.setState(storeStateUpdate)
288291
} else if metric.shouldGetState {
289292
// In case we didn't write any data and the metric state needs to be updated, update it straight away
290-
sent = mc.sendGetMetricState(metric)
293+
sent, err = mc.sendGetMetricState(metric)
291294
}
292295

293296
if sent {
294297
mc.updatesInFlight++
295298
}
296-
return sent
299+
return sent, err
297300
}
298301

299302
// Handle DB responses
@@ -315,6 +318,14 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
315318
reflect.TypeOf(reqInput), "request", reqInput)
316319
}
317320

321+
clear := func() {
322+
resp.Release()
323+
metric.store = newChunkStore(mc.logger, metric.Lset.LabelNames(), metric.store.isAggr())
324+
metric.retryCount = 0
325+
metric.setState(storeStateInit)
326+
mc.cacheMetricMap.ResetMetric(metric.hash)
327+
}
328+
318329
if metric.getState() == storeStateGet {
319330
// Handle Get response, sync metric state with the DB
320331
metric.store.processGetResp(mc, metric, resp)
@@ -328,14 +339,6 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
328339
}
329340
metric.retryCount = 0
330341
} else {
331-
clear := func() {
332-
resp.Release()
333-
metric.store = newChunkStore(mc.logger, metric.Lset.LabelNames(), metric.store.isAggr())
334-
metric.retryCount = 0
335-
metric.setState(storeStateInit)
336-
mc.cacheMetricMap.ResetMetric(metric.hash)
337-
}
338-
339342
// Count errors
340343
mc.performanceReporter.IncrementCounter("ChunkUpdateRetries", 1)
341344

@@ -370,15 +373,24 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
370373
metric.setState(storeStateReady)
371374

372375
var sent bool
376+
var err error
373377

374378
// In case our data spreads across multiple partitions, get the new state for the current partition
375379
if metric.shouldGetState {
376-
sent = mc.sendGetMetricState(metric)
380+
sent, err = mc.sendGetMetricState(metric)
381+
if err != nil {
382+
clear()
383+
return false
384+
}
377385
if sent {
378386
mc.updatesInFlight++
379387
}
380388
} else if canWrite {
381-
sent = mc.writeChunksAndGetState(metric)
389+
sent, err = mc.writeChunksAndGetState(metric)
390+
if err != nil {
391+
clear()
392+
return false
393+
}
382394
} else if metric.store.samplesQueueLength() > 0 {
383395
mc.metricQueue.Push(metric)
384396
metric.setState(storeStateAboutToUpdate)

pkg/appender/store.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen
302302

303303
// Init the partition info and find whether we need to init the metric headers (labels, ..) in the case of a new partition
304304
t0 := cs.pending[0].t
305-
partition, err := mc.partitionMngr.TimeToPart(t0)
305+
var partition *partmgr.DBPartition
306+
partition, err = mc.partitionMngr.TimeToPart(t0)
306307
if err != nil {
307308
hasPendingUpdates = false
308309
return

pkg/partmgr/partmgr.go

-4
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,6 @@ func (p *PartitionManager) ReadAndUpdateSchema() (err error) {
233233
}
234234

235235
schemaFilePath := p.GetSchemaFilePath()
236-
if err != nil {
237-
err = errors.Wrap(err, "Failed to create timer ReadAndUpdateSchemaTimer.")
238-
return
239-
}
240236
schemaInfoResp, err := p.container.GetItemSync(&v3io.GetItemInput{Path: schemaFilePath, AttributeNames: []string{"__mtime_secs", "__mtime_nsecs"}})
241237
if err != nil {
242238
err = errors.Wrapf(err, "Failed to read schema at path '%s'.", schemaFilePath)

0 commit comments

Comments
 (0)