Skip to content

Commit fb71920

Browse files
committed
Fix race condition when we init lambda
There's a couple of problems here: 1. We subscribe a telemetry endpoint before the service is ready. If we receive a flush to that endpoint, then we try to perform a flush and get an NPE. 2. The access to the flusher is unprotected across multiple goroutines. We just use an RWMutex for this, because it's rarely accessed and doesn't need anything fancy. 3. There's a race condition in the test because the nop write is (was) async.
1 parent 71c6190 commit fb71920

File tree

5 files changed

+44
-17
lines changed

5 files changed

+44
-17
lines changed

internal/awslambda/extension/manager.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,9 @@ func (m *manager) Run(parent context.Context) error {
104104
return err
105105
}
106106

107-
// Start telemetry server
108107
ctx, cancel := context.WithCancel(parent)
109108
defer cancel()
110109

111-
if m.telemetryEnabled() {
112-
wg.StartWithContext(ctx, func(c context.Context) {
113-
chErrs <- m.telemetryServer.Start(c)
114-
})
115-
116-
if err := m.subscribeToTelemetry(ctx); err != nil {
117-
m.log.WithError(err).Error("error subscribing to lambda telemetry endpoint")
118-
return err
119-
}
120-
}
121-
122110
// Start gostatsd server
123111
wg.StartWithContext(ctx, func(c context.Context) {
124112
err := m.server.Run(c)
@@ -130,6 +118,18 @@ func (m *manager) Run(parent context.Context) error {
130118
}
131119
})
132120

121+
// Start telemetry server
122+
if m.telemetryEnabled() {
123+
wg.StartWithContext(ctx, func(c context.Context) {
124+
chErrs <- m.telemetryServer.Start(c)
125+
})
126+
127+
if err := m.subscribeToTelemetry(ctx); err != nil {
128+
m.log.WithError(err).Error("error subscribing to lambda telemetry endpoint")
129+
return err
130+
}
131+
}
132+
133133
select {
134134
case err := <-chErrs:
135135
// In the event that the lambda finished early before

internal/flush/flush_coordinator.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package flush
22

3+
import (
4+
"sync"
5+
)
6+
37
type Coordinator interface {
48
Flushable
59
NotifyFlush()
@@ -14,13 +18,20 @@ type Flushable interface {
1418
var _ Coordinator = (*coordinator)(nil)
1519

1620
type coordinator struct {
17-
t Flushable
1821
flushChan chan struct{}
22+
23+
mu sync.RWMutex
24+
t Flushable
1925
}
2026

2127
func NewFlushCoordinator() Coordinator {
28+
// TODO: Because of the overall architecture, we don't have the Flusher
29+
// at the time we create the Coordinator. It would be good to
30+
// re-architect the world for proper dependency injection, however
31+
// that is a non-trivial chunk of work. So for now, it's just TODO.
2232
return &coordinator{
2333
flushChan: make(chan struct{}, 1),
34+
t: noopFlusher{},
2435
}
2536
}
2637

@@ -29,14 +40,24 @@ func (fm *coordinator) NotifyFlush() {
2940
}
3041

3142
func (fm *coordinator) Flush() {
32-
fm.t.Flush()
43+
if t := fm.getTarget(); t != nil {
44+
t.Flush()
45+
}
3346
}
3447

3548
func (fm *coordinator) WaitForFlush() {
3649
<-fm.flushChan
3750
}
3851

52+
func (fm *coordinator) getTarget() Flushable {
53+
fm.mu.RLock()
54+
defer fm.mu.RUnlock()
55+
return fm.t
56+
}
57+
3958
func (fm *coordinator) RegisterFlushable(t Flushable) {
59+
fm.mu.Lock()
60+
defer fm.mu.Unlock()
4061
fm.t = t
4162
}
4263

@@ -53,3 +74,7 @@ func (n *noopFlushCoordinator) NotifyFlush() {}
5374
func (n *noopFlushCoordinator) RegisterFlushable(Flushable) {}
5475

5576
func (n *noopFlushCoordinator) WaitForFlush() {}
77+
78+
type noopFlusher struct{}
79+
80+
func (noopFlusher) Flush() {}

pkg/statsd/handler_http_forwarder_v2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func (hfh *HttpForwarderHandlerV2) sendNop(ctx context.Context) {
308308

309309
func (hfh *HttpForwarderHandlerV2) Run(ctx context.Context) {
310310
var wg wait.Group
311-
wg.StartWithContext(ctx, hfh.sendNop)
311+
hfh.sendNop(ctx)
312312
wg.Start(func() {
313313
for metricMaps := range hfh.consolidatedMetrics {
314314
hfh.acquireMergingSem()

pkg/statsd/handler_http_forwarder_v2_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,9 @@ func TestManualFlush(t *testing.T) {
494494

495495
wg.Wait()
496496

497-
assert.Equal(t, uint64(2), atomic.LoadUint64(&ts.called), "Handler must have been called")
497+
// First is for the "nop" (see HttpForwarderHandlerV2.Run + HttpForwarderHandlerV2.sendNop)
498+
// Second is for the actual flush.
499+
assert.Equal(t, uint64(1+1), atomic.LoadUint64(&ts.called), "Handler must have been called")
498500
assert.EqualValues(t, 1, atomic.LoadUint64(&ts.pineappleCount))
499501
assert.EqualValues(t, 1, atomic.LoadUint64(&ts.derpCount))
500502
assert.EqualValues(t, 10, atomic.LoadInt64(&ts.derpValue))

pkg/web/http_receiver_v2_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestForwardingEndToEndV2(t *testing.T) {
4545
ctx = clock.Context(ctx, mockClock)
4646

4747
ch := &channeledHandler{
48-
chMaps: make(chan *gostatsd.MetricMap),
48+
chMaps: make(chan *gostatsd.MetricMap, 1),
4949
}
5050

5151
hs, err := web.NewHttpServer(

0 commit comments

Comments
 (0)