Skip to content

Commit 25e98c7

Browse files
authored
Merge pull request #524 from v3io/development
Development
2 parents e36c3ea + a313caf commit 25e98c7

File tree

5 files changed

+146
-75
lines changed

5 files changed

+146
-75
lines changed

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.14
44

55
require (
66
github.com/cespare/xxhash v1.1.0
7+
github.com/cpuguy83/go-md2man v1.0.10 // indirect
78
github.com/ghodss/yaml v1.0.0
89
github.com/imdario/mergo v0.3.7
910
github.com/nuclio/logger v0.0.1
@@ -12,6 +13,7 @@ require (
1213
github.com/pkg/errors v0.8.1
1314
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
1415
github.com/spf13/cobra v0.0.3
16+
github.com/spf13/pflag v1.0.5 // indirect
1517
github.com/stretchr/testify v1.4.0
1618
github.com/v3io/frames v0.7.33
1719
github.com/v3io/v3io-go v0.1.9

pkg/appender/appender.go

+21-10
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,32 @@ type MetricState struct {
5959
retryCount uint8
6060
newName bool
6161
isVariant bool
62+
63+
shouldGetState bool
6264
}
6365

6466
// Metric store states
6567
type storeState uint8
6668

6769
const (
68-
storeStateInit storeState = 0
69-
storeStatePreGet storeState = 1 // Need to get state
70-
storeStateGet storeState = 2 // Getting old state from storage
71-
storeStateReady storeState = 3 // Ready to update
72-
storeStateUpdate storeState = 4 // Update/write in progress
70+
storeStateInit storeState = 0
71+
storeStatePreGet storeState = 1 // Need to get state
72+
storeStateGet storeState = 2 // Getting old state from storage
73+
storeStateReady storeState = 3 // Ready to update
74+
storeStateUpdate storeState = 4 // Update/write in progress
75+
storeStateAboutToUpdate storeState = 5 // Like ready state but with updates pending
7376
)
7477

7578
// store is ready to update samples into the DB
7679
func (m *MetricState) isReady() bool {
7780
return m.state == storeStateReady
7881
}
7982

83+
// Indicates whether the metric has no inflight requests and can send new ones
84+
func (m *MetricState) canSendRequests() bool {
85+
return m.state == storeStateReady || m.state == storeStateAboutToUpdate
86+
}
87+
8088
func (m *MetricState) getState() storeState {
8189
return m.state
8290
}
@@ -118,6 +126,9 @@ type MetricsCache struct {
118126
updatesComplete chan int
119127
newUpdates chan int
120128

129+
outstandingUpdates int64
130+
requestsInFlight int64
131+
121132
lastMetric uint64
122133

123134
// TODO: consider switching to synch.Map (https://golang.org/pkg/sync/#Map)
@@ -144,7 +155,6 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config
144155
newCache.asyncAppendChan = make(chan *asyncAppend, channelSize)
145156

146157
newCache.metricQueue = NewElasticQueue()
147-
newCache.updatesComplete = make(chan int, 100)
148158
newCache.newUpdates = make(chan int, 1000)
149159
newCache.stopChan = make(chan int, 3)
150160

@@ -155,10 +165,11 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config
155165
}
156166

157167
type asyncAppend struct {
158-
metric *MetricState
159-
t int64
160-
v interface{}
161-
resp chan int
168+
metric *MetricState
169+
t int64
170+
v interface{}
171+
resp chan int
172+
isCompletion bool
162173
}
163174

164175
func (mc *MetricsCache) Start() error {

pkg/appender/ingest.go

+95-64
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"net/http"
2626
"reflect"
27+
"sync/atomic"
2728
"time"
2829

2930
"github.com/pkg/errors"
@@ -47,39 +48,24 @@ func (mc *MetricsCache) start() error {
4748
func (mc *MetricsCache) metricFeed(index int) {
4849

4950
go func() {
50-
inFlight := 0
51-
gotData := false
5251
potentialCompletion := false
5352
var completeChan chan int
5453

5554
for {
5655
select {
5756
case _ = <-mc.stopChan:
5857
return
59-
case inFlight = <-mc.updatesComplete:
60-
// Handle completion notifications from the update loop
61-
length := mc.metricQueue.Length()
62-
mc.logger.Debug(`Complete update cycle - "in-flight requests"=%d; "metric queue length"=%d\n`, inFlight, length)
63-
64-
// If data was sent and the queue is empty, mark as completion
65-
if length == 0 && gotData {
66-
switch len(mc.asyncAppendChan) {
67-
case 0:
68-
potentialCompletion = true
69-
if completeChan != nil {
70-
completeChan <- 0
71-
}
72-
case 1:
73-
potentialCompletion = true
74-
}
75-
}
7658
case app := <-mc.asyncAppendChan:
7759
newMetrics := 0
7860
dataQueued := 0
7961
numPushed := 0
62+
gotCompletion := false
8063
inLoop:
8164
for i := 0; i <= mc.cfg.BatchSize; i++ {
82-
if app.metric == nil {
65+
// Handle completion notifications from the update loop
66+
if app.isCompletion {
67+
gotCompletion = true
68+
} else if app.metric == nil {
8369
// Handle update completion requests (metric == nil)
8470
completeChan = app.resp
8571
if potentialCompletion {
@@ -88,7 +74,6 @@ func (mc *MetricsCache) metricFeed(index int) {
8874
} else {
8975
potentialCompletion = false
9076
// Handle append requests (Add / AddFast)
91-
gotData = true
9277
metric := app.metric
9378
metric.Lock()
9479

@@ -103,7 +88,7 @@ func (mc *MetricsCache) metricFeed(index int) {
10388
metric.setState(storeStatePreGet)
10489
}
10590
if metric.isReady() {
106-
metric.setState(storeStateUpdate)
91+
metric.setState(storeStateAboutToUpdate)
10792
}
10893

10994
length := mc.metricQueue.Push(metric)
@@ -124,7 +109,22 @@ func (mc *MetricsCache) metricFeed(index int) {
124109
}
125110
// Notify the update loop that there are new metrics to process
126111
if newMetrics > 0 {
112+
atomic.AddInt64(&mc.outstandingUpdates, 1)
127113
mc.newUpdates <- newMetrics
114+
} else if gotCompletion {
115+
inFlight := atomic.LoadInt64(&mc.requestsInFlight)
116+
outstanding := atomic.LoadInt64(&mc.outstandingUpdates)
117+
if outstanding == 0 && inFlight == 0 {
118+
switch len(mc.asyncAppendChan) {
119+
case 0:
120+
potentialCompletion = true
121+
if completeChan != nil {
122+
completeChan <- 0
123+
}
124+
case 1:
125+
potentialCompletion = true
126+
}
127+
}
128128
}
129129

130130
// If we have too much work, stall the queue for some time
@@ -154,7 +154,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
154154
return
155155
case _ = <-mc.newUpdates:
156156
// Handle new metric notifications (from metricFeed)
157-
for mc.updatesInFlight < mc.cfg.Workers*2 { //&& newMetrics > 0{
157+
for mc.updatesInFlight < mc.cfg.Workers*2 {
158158
freeSlots := mc.cfg.Workers*2 - mc.updatesInFlight
159159
metrics := mc.metricQueue.PopN(freeSlots)
160160
for _, metric := range metrics {
@@ -165,9 +165,11 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
165165
}
166166
}
167167

168-
if mc.updatesInFlight == 0 {
169-
mc.logger.Debug("Complete new update cycle - in-flight %d.\n", mc.updatesInFlight)
170-
mc.updatesComplete <- 0
168+
outstandingUpdates := atomic.AddInt64(&mc.outstandingUpdates, -1)
169+
170+
if atomic.LoadInt64(&mc.requestsInFlight) == 0 && outstandingUpdates == 0 {
171+
mc.logger.Debug("Return to feed after processing newUpdates")
172+
mc.asyncAppendChan <- &asyncAppend{isCompletion: true}
171173
}
172174
case resp := <-mc.responseChan:
173175
// Handle V3IO async responses
@@ -188,6 +190,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
188190
if i < mc.cfg.BatchSize {
189191
select {
190192
case resp = <-mc.responseChan:
193+
atomic.AddInt64(&mc.requestsInFlight, -1)
191194
default:
192195
break inLoop
193196
}
@@ -206,10 +209,12 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
206209
}
207210
}
208211

212+
requestsInFlight := atomic.AddInt64(&mc.requestsInFlight, -1)
213+
209214
// Notify the metric feeder when all in-flight tasks are done
210-
if mc.updatesInFlight == 0 {
211-
mc.logger.Debug("Return to feed. Metric queue length: %d", mc.metricQueue.Length())
212-
mc.updatesComplete <- 0
215+
if requestsInFlight == 0 && atomic.LoadInt64(&mc.outstandingUpdates) == 0 {
216+
mc.logger.Debug("Return to feed after processing responseChan")
217+
mc.asyncAppendChan <- &asyncAppend{isCompletion: true}
213218
}
214219
}
215220
}
@@ -223,45 +228,70 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) {
223228
metric.Lock()
224229
defer metric.Unlock()
225230
var sent bool
226-
var err error
227231

228-
if metric.getState() == storeStatePreGet {
229-
sent, err = metric.store.getChunksState(mc, metric)
230-
if err != nil {
231-
// Count errors
232-
mc.performanceReporter.IncrementCounter("GetChunksStateError", 1)
233-
234-
mc.logger.ErrorWith("Failed to get item state", "metric", metric.Lset, "err", err)
235-
setError(mc, metric, err)
236-
} else {
237-
metric.setState(storeStateGet)
232+
// In case we are in pre get state or our data spreads across multiple partitions, get the new state for the current partition
233+
if metric.getState() == storeStatePreGet ||
234+
(metric.canSendRequests() && metric.shouldGetState) {
235+
sent = mc.sendGetMetricState(metric)
236+
if sent {
237+
mc.updatesInFlight++
238238
}
239+
} else if metric.canSendRequests() {
240+
sent = mc.writeChunksAndGetState(metric)
239241

240-
} else {
241-
sent, err = metric.store.writeChunks(mc, metric)
242-
if err != nil {
243-
// Count errors
244-
mc.performanceReporter.IncrementCounter("WriteChunksError", 1)
245-
246-
mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err)
247-
setError(mc, metric, errors.Wrap(err, "Chunk write submit failed."))
248-
} else if sent {
249-
metric.setState(storeStateUpdate)
250-
}
251242
if !sent {
252243
if metric.store.samplesQueueLength() == 0 {
253244
metric.setState(storeStateReady)
254245
} else {
255246
if mc.metricQueue.length() > 0 {
247+
atomic.AddInt64(&mc.outstandingUpdates, 1)
256248
mc.newUpdates <- mc.metricQueue.length()
257249
}
258250
}
259251
}
260252
}
261253

254+
}
255+
256+
func (mc *MetricsCache) sendGetMetricState(metric *MetricState) bool {
257+
// If we are already in a get state, discard
258+
if metric.getState() == storeStateGet {
259+
return false
260+
}
261+
262+
sent, err := metric.store.getChunksState(mc, metric)
263+
if err != nil {
264+
// Count errors
265+
mc.performanceReporter.IncrementCounter("GetChunksStateError", 1)
266+
267+
mc.logger.ErrorWith("Failed to get item state", "metric", metric.Lset, "err", err)
268+
setError(mc, metric, err)
269+
} else {
270+
metric.setState(storeStateGet)
271+
}
272+
273+
return sent
274+
}
275+
276+
func (mc *MetricsCache) writeChunksAndGetState(metric *MetricState) bool {
277+
sent, err := metric.store.writeChunks(mc, metric)
278+
if err != nil {
279+
// Count errors
280+
mc.performanceReporter.IncrementCounter("WriteChunksError", 1)
281+
282+
mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err)
283+
setError(mc, metric, errors.Wrap(err, "Chunk write submit failed."))
284+
} else if sent {
285+
metric.setState(storeStateUpdate)
286+
} else if metric.shouldGetState {
287+
// In case we didn't write any data and the metric state needs to be updated, update it straight away
288+
sent = mc.sendGetMetricState(metric)
289+
}
290+
262291
if sent {
263292
mc.updatesInFlight++
264293
}
294+
return sent
265295
}
266296

267297
// Handle DB responses
@@ -337,24 +367,18 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
337367
metric.setState(storeStateReady)
338368

339369
var sent bool
340-
var err error
341-
342-
if canWrite {
343-
sent, err = metric.store.writeChunks(mc, metric)
344-
if err != nil {
345-
// Count errors
346-
mc.performanceReporter.IncrementCounter("WriteChunksError", 1)
347370

348-
mc.logger.ErrorWith("Submit failed", "metric", metric.Lset, "err", err)
349-
setError(mc, metric, errors.Wrap(err, "Chunk write submit failed."))
350-
} else if sent {
351-
metric.setState(storeStateUpdate)
371+
// In case our data spreads across multiple partitions, get the new state for the current partition
372+
if metric.shouldGetState {
373+
sent = mc.sendGetMetricState(metric)
374+
if sent {
352375
mc.updatesInFlight++
353376
}
354-
377+
} else if canWrite {
378+
sent = mc.writeChunksAndGetState(metric)
355379
} else if metric.store.samplesQueueLength() > 0 {
356380
mc.metricQueue.Push(metric)
357-
metric.setState(storeStateUpdate)
381+
metric.setState(storeStateAboutToUpdate)
358382
}
359383

360384
return sent
@@ -385,6 +409,13 @@ func (mc *MetricsCache) nameUpdateRespLoop() {
385409
}
386410

387411
resp.Release()
412+
413+
requestsInFlight := atomic.AddInt64(&mc.requestsInFlight, -1)
414+
415+
if requestsInFlight == 0 && atomic.LoadInt64(&mc.outstandingUpdates) == 0 {
416+
mc.logger.Debug("Return to feed after processing nameUpdateChan")
417+
mc.asyncAppendChan <- &asyncAppend{isCompletion: true}
418+
}
388419
}
389420
}
390421
}()

0 commit comments

Comments
 (0)