Skip to content

Commit b4b167e

Browse files
authored
Make the datapoint storage cache safe for concurrent use (#417)
* make the datapoint storage cache safe for concurrent use
1 parent ef13d3c commit b4b167e

4 files changed

Lines changed: 195 additions & 48 deletions

File tree

exporter/collector/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
go.opentelemetry.io/otel v1.6.3
1313
go.opentelemetry.io/otel/sdk v1.6.2
1414
go.opentelemetry.io/otel/trace v1.6.3
15+
go.uber.org/atomic v1.9.0
1516
google.golang.org/api v0.74.0
1617
google.golang.org/genproto v0.0.0-20220405205423-9d709892a2bf
1718
google.golang.org/grpc v1.46.0
@@ -46,7 +47,6 @@ require (
4647
github.com/modern-go/reflect2 v1.0.2 // indirect
4748
github.com/pkg/errors v0.9.1 // indirect
4849
github.com/pmezard/go-difflib v1.0.0 // indirect
49-
go.uber.org/atomic v1.9.0 // indirect
5050
golang.org/x/net v0.0.0-20220325170049-de3da57026de // indirect
5151
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
5252
golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886 // indirect

exporter/collector/internal/datapointstorage/datapointcache.go

Lines changed: 65 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ package datapointstorage
1717
import (
1818
"fmt"
1919
"strings"
20+
"sync"
2021
"time"
2122

2223
"go.opentelemetry.io/collector/pdata/pcommon"
2324
"go.opentelemetry.io/collector/pdata/pmetric"
25+
"go.uber.org/atomic"
2426
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
2527
)
2628

@@ -31,31 +33,35 @@ type Cache struct {
3133
summaryCache map[string]usedSummaryPoint
3234
histogramCache map[string]usedHistogramPoint
3335
exponentialHistogramCache map[string]usedExponentialHistogramPoint
36+
numberLock sync.RWMutex
37+
summaryLock sync.RWMutex
38+
histogramLock sync.RWMutex
39+
exponentialHistogramLock sync.RWMutex
3440
}
3541

3642
type usedNumberPoint struct {
3743
point *pmetric.NumberDataPoint
38-
used bool
44+
used *atomic.Bool
3945
}
4046

4147
type usedSummaryPoint struct {
4248
point *pmetric.SummaryDataPoint
43-
used bool
49+
used *atomic.Bool
4450
}
4551

4652
type usedHistogramPoint struct {
4753
point *pmetric.HistogramDataPoint
48-
used bool
54+
used *atomic.Bool
4955
}
5056

5157
type usedExponentialHistogramPoint struct {
5258
point *pmetric.ExponentialHistogramDataPoint
53-
used bool
59+
used *atomic.Bool
5460
}
5561

5662
// NewCache instantiates a cache and starts background processes
57-
func NewCache(shutdown <-chan struct{}) Cache {
58-
c := Cache{
63+
func NewCache(shutdown <-chan struct{}) *Cache {
64+
c := &Cache{
5965
numberCache: make(map[string]usedNumberPoint),
6066
summaryCache: make(map[string]usedSummaryPoint),
6167
histogramCache: make(map[string]usedHistogramPoint),
@@ -71,118 +77,134 @@ func NewCache(shutdown <-chan struct{}) Cache {
7177

7278
// GetNumberDataPoint retrieves the point associated with the identifier, and whether
7379
// or not it was found
74-
func (c Cache) GetNumberDataPoint(identifier string) (*pmetric.NumberDataPoint, bool) {
80+
func (c *Cache) GetNumberDataPoint(identifier string) (*pmetric.NumberDataPoint, bool) {
81+
c.numberLock.RLock()
82+
defer c.numberLock.RUnlock()
7583
point, found := c.numberCache[identifier]
7684
if found {
77-
point.used = true
78-
c.numberCache[identifier] = point
85+
point.used.Store(true)
7986
}
8087
return point.point, found
8188
}
8289

8390
// SetNumberDataPoint assigns the point to the identifier in the cache
84-
func (c Cache) SetNumberDataPoint(identifier string, point *pmetric.NumberDataPoint) {
85-
c.numberCache[identifier] = usedNumberPoint{point, true}
91+
func (c *Cache) SetNumberDataPoint(identifier string, point *pmetric.NumberDataPoint) {
92+
c.numberLock.Lock()
93+
defer c.numberLock.Unlock()
94+
c.numberCache[identifier] = usedNumberPoint{point, atomic.NewBool(true)}
8695
}
8796

8897
// GetSummaryDataPoint retrieves the point associated with the identifier, and whether
8998
// or not it was found
90-
func (c Cache) GetSummaryDataPoint(identifier string) (*pmetric.SummaryDataPoint, bool) {
99+
func (c *Cache) GetSummaryDataPoint(identifier string) (*pmetric.SummaryDataPoint, bool) {
100+
c.summaryLock.RLock()
101+
defer c.summaryLock.RUnlock()
91102
point, found := c.summaryCache[identifier]
92103
if found {
93-
point.used = true
94-
c.summaryCache[identifier] = point
104+
point.used.Store(true)
95105
}
96106
return point.point, found
97107
}
98108

99109
// SetSummaryDataPoint assigns the point to the identifier in the cache
100-
func (c Cache) SetSummaryDataPoint(identifier string, point *pmetric.SummaryDataPoint) {
101-
c.summaryCache[identifier] = usedSummaryPoint{point, true}
110+
func (c *Cache) SetSummaryDataPoint(identifier string, point *pmetric.SummaryDataPoint) {
111+
c.summaryLock.Lock()
112+
defer c.summaryLock.Unlock()
113+
c.summaryCache[identifier] = usedSummaryPoint{point, atomic.NewBool(true)}
102114
}
103115

104116
// GetHistogramDataPoint retrieves the point associated with the identifier, and whether
105117
// or not it was found
106-
func (c Cache) GetHistogramDataPoint(identifier string) (*pmetric.HistogramDataPoint, bool) {
118+
func (c *Cache) GetHistogramDataPoint(identifier string) (*pmetric.HistogramDataPoint, bool) {
119+
c.histogramLock.RLock()
120+
defer c.histogramLock.RUnlock()
107121
point, found := c.histogramCache[identifier]
108122
if found {
109-
point.used = true
110-
c.histogramCache[identifier] = point
123+
point.used.Store(true)
111124
}
112125
return point.point, found
113126
}
114127

115128
// SetHistogramDataPoint assigns the point to the identifier in the cache
116-
func (c Cache) SetHistogramDataPoint(identifier string, point *pmetric.HistogramDataPoint) {
117-
c.histogramCache[identifier] = usedHistogramPoint{point, true}
129+
func (c *Cache) SetHistogramDataPoint(identifier string, point *pmetric.HistogramDataPoint) {
130+
c.histogramLock.Lock()
131+
defer c.histogramLock.Unlock()
132+
c.histogramCache[identifier] = usedHistogramPoint{point, atomic.NewBool(true)}
118133
}
119134

120135
// GetExponentialHistogramDataPoint retrieves the point associated with the identifier, and whether
121136
// or not it was found
122-
func (c Cache) GetExponentialHistogramDataPoint(identifier string) (*pmetric.ExponentialHistogramDataPoint, bool) {
137+
func (c *Cache) GetExponentialHistogramDataPoint(identifier string) (*pmetric.ExponentialHistogramDataPoint, bool) {
138+
c.exponentialHistogramLock.RLock()
139+
defer c.exponentialHistogramLock.RUnlock()
123140
point, found := c.exponentialHistogramCache[identifier]
124141
if found {
125-
point.used = true
126-
c.exponentialHistogramCache[identifier] = point
142+
point.used.Store(true)
127143
}
128144
return point.point, found
129145
}
130146

131147
// SetExponentialHistogramDataPoint assigns the point to the identifier in the cache
132-
func (c Cache) SetExponentialHistogramDataPoint(identifier string, point *pmetric.ExponentialHistogramDataPoint) {
133-
c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{point, true}
148+
func (c *Cache) SetExponentialHistogramDataPoint(identifier string, point *pmetric.ExponentialHistogramDataPoint) {
149+
c.exponentialHistogramLock.Lock()
150+
defer c.exponentialHistogramLock.Unlock()
151+
c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{point, atomic.NewBool(true)}
134152
}
135153

136154
// gc garbage collects the cache after the ticker ticks
137-
func (c Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool {
155+
func (c *Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool {
138156
select {
139157
case <-shutdown:
140158
return false
141159
case <-tickerCh:
142160
// garbage collect the numberCache
161+
c.numberLock.Lock()
143162
for id, point := range c.numberCache {
144-
if point.used {
145-
// for points that have been used, mark them as unused
146-
point.used = false
147-
c.numberCache[id] = point
163+
// for points that have been used, mark them as unused
164+
if point.used.Load() {
165+
point.used.Store(false)
148166
} else {
149167
// for points that have not been used, delete points
150168
delete(c.numberCache, id)
151169
}
152170
}
171+
c.numberLock.Unlock()
153172
// garbage collect the summaryCache
173+
c.summaryLock.Lock()
154174
for id, point := range c.summaryCache {
155-
if point.used {
156-
// for points that have been used, mark them as unused
157-
point.used = false
158-
c.summaryCache[id] = point
175+
// for points that have been used, mark them as unused
176+
if point.used.Load() {
177+
point.used.Store(false)
159178
} else {
160179
// for points that have not been used, delete points
161180
delete(c.summaryCache, id)
162181
}
163182
}
183+
c.summaryLock.Unlock()
164184
// garbage collect the histogramCache
185+
c.histogramLock.Lock()
165186
for id, point := range c.histogramCache {
166-
if point.used {
167-
// for points that have been used, mark them as unused
168-
point.used = false
169-
c.histogramCache[id] = point
187+
// for points that have been used, mark them as unused
188+
if point.used.Load() {
189+
point.used.Store(false)
170190
} else {
171191
// for points that have not been used, delete points
172192
delete(c.histogramCache, id)
173193
}
174194
}
195+
c.histogramLock.Unlock()
175196
// garbage collect the exponentialHistogramCache
197+
c.exponentialHistogramLock.Lock()
176198
for id, point := range c.exponentialHistogramCache {
177-
if point.used {
178-
// for points that have been used, mark them as unused
179-
point.used = false
180-
c.exponentialHistogramCache[id] = point
199+
// for points that have been used, mark them as unused
200+
if point.used.Load() {
201+
point.used.Store(false)
181202
} else {
182203
// for points that have not been used, delete points
183204
delete(c.exponentialHistogramCache, id)
184205
}
185206
}
207+
c.exponentialHistogramLock.Unlock()
186208
}
187209
return true
188210
}

exporter/collector/internal/datapointstorage/datapointcache_test.go

Lines changed: 127 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package datapointstorage
1616

1717
import (
18+
"sync"
1819
"testing"
1920
"time"
2021

@@ -73,7 +74,7 @@ func TestGC(t *testing.T) {
7374

7475
// bar exists since we just set it
7576
usedPoint, found := c.numberCache["bar"]
76-
assert.True(t, usedPoint.used)
77+
assert.True(t, usedPoint.used.Load())
7778
assert.True(t, found)
7879

7980
// first gc tick marks bar stale
@@ -83,7 +84,7 @@ func TestGC(t *testing.T) {
8384
cont := c.gc(shutdown, fakeTicker)
8485
assert.True(t, cont)
8586
usedPoint, found = c.numberCache["bar"]
86-
assert.False(t, usedPoint.used)
87+
assert.False(t, usedPoint.used.Load())
8788
assert.True(t, found)
8889

8990
// second gc tick removes bar
@@ -130,6 +131,130 @@ func TestGetPreventsGC(t *testing.T) {
130131
assert.True(t, found)
131132
}
132133

134+
func TestConcurrentNumber(t *testing.T) {
135+
c := Cache{
136+
numberCache: make(map[string]usedNumberPoint),
137+
summaryCache: make(map[string]usedSummaryPoint),
138+
histogramCache: make(map[string]usedHistogramPoint),
139+
exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint),
140+
}
141+
setPoint := pmetric.NewNumberDataPoint()
142+
143+
var wg sync.WaitGroup
144+
for i := 0; i < 5; i++ {
145+
wg.Add(1)
146+
go func() {
147+
c.SetNumberDataPoint("bar", &setPoint)
148+
point, found := c.GetNumberDataPoint("bar")
149+
assert.Equal(t, point, &setPoint)
150+
assert.True(t, found)
151+
wg.Done()
152+
}()
153+
}
154+
wg.Add(1)
155+
go func() {
156+
dontShutdown := make(chan struct{})
157+
fakeTick := make(chan time.Time)
158+
go func() { fakeTick <- time.Now() }()
159+
c.gc(dontShutdown, fakeTick)
160+
wg.Done()
161+
}()
162+
wg.Wait()
163+
}
164+
165+
func TestConcurrentSummary(t *testing.T) {
166+
c := Cache{
167+
numberCache: make(map[string]usedNumberPoint),
168+
summaryCache: make(map[string]usedSummaryPoint),
169+
histogramCache: make(map[string]usedHistogramPoint),
170+
exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint),
171+
}
172+
setPoint := pmetric.NewSummaryDataPoint()
173+
174+
var wg sync.WaitGroup
175+
for i := 0; i < 5; i++ {
176+
wg.Add(1)
177+
go func() {
178+
c.SetSummaryDataPoint("bar", &setPoint)
179+
point, found := c.GetSummaryDataPoint("bar")
180+
assert.Equal(t, point, &setPoint)
181+
assert.True(t, found)
182+
wg.Done()
183+
}()
184+
}
185+
wg.Add(1)
186+
go func() {
187+
dontShutdown := make(chan struct{})
188+
fakeTick := make(chan time.Time)
189+
go func() { fakeTick <- time.Now() }()
190+
c.gc(dontShutdown, fakeTick)
191+
wg.Done()
192+
}()
193+
wg.Wait()
194+
}
195+
196+
func TestConcurrentHistogram(t *testing.T) {
197+
c := Cache{
198+
numberCache: make(map[string]usedNumberPoint),
199+
summaryCache: make(map[string]usedSummaryPoint),
200+
histogramCache: make(map[string]usedHistogramPoint),
201+
exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint),
202+
}
203+
setPoint := pmetric.NewHistogramDataPoint()
204+
205+
var wg sync.WaitGroup
206+
for i := 0; i < 5; i++ {
207+
wg.Add(1)
208+
go func() {
209+
c.SetHistogramDataPoint("bar", &setPoint)
210+
point, found := c.GetHistogramDataPoint("bar")
211+
assert.Equal(t, point, &setPoint)
212+
assert.True(t, found)
213+
wg.Done()
214+
}()
215+
}
216+
wg.Add(1)
217+
go func() {
218+
dontShutdown := make(chan struct{})
219+
fakeTick := make(chan time.Time)
220+
go func() { fakeTick <- time.Now() }()
221+
c.gc(dontShutdown, fakeTick)
222+
wg.Done()
223+
}()
224+
wg.Wait()
225+
}
226+
227+
func TestConcurrentExponentialHistogram(t *testing.T) {
228+
c := Cache{
229+
numberCache: make(map[string]usedNumberPoint),
230+
summaryCache: make(map[string]usedSummaryPoint),
231+
histogramCache: make(map[string]usedHistogramPoint),
232+
exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint),
233+
}
234+
setPoint := pmetric.NewExponentialHistogramDataPoint()
235+
236+
var wg sync.WaitGroup
237+
for i := 0; i < 5; i++ {
238+
wg.Add(1)
239+
go func() {
240+
c.SetExponentialHistogramDataPoint("bar", &setPoint)
241+
point, found := c.GetExponentialHistogramDataPoint("bar")
242+
assert.Equal(t, point, &setPoint)
243+
assert.True(t, found)
244+
wg.Done()
245+
}()
246+
}
247+
wg.Add(1)
248+
go func() {
249+
dontShutdown := make(chan struct{})
250+
fakeTick := make(chan time.Time)
251+
go func() { fakeTick <- time.Now() }()
252+
c.gc(dontShutdown, fakeTick)
253+
wg.Done()
254+
}()
255+
wg.Wait()
256+
}
257+
133258
func TestIdentifier(t *testing.T) {
134259
metricWithName := pmetric.NewMetric()
135260
metricWithName.SetName("custom.googleapis.com/test.metric")

0 commit comments

Comments
 (0)