Skip to content

Commit 425097b

Browse files
committed
Correct various race conditions
1 parent a682e82 commit 425097b

13 files changed

Lines changed: 126 additions & 75 deletions

File tree

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ $(SKYEYE_BIN): generate $(SKYEYE_SOURCES) $(LIBWHISPER_PATH) $(WHISPER_H_PATH)
127127
$(SKYEYE_SCALER_BIN): generate $(SKYEYE_SOURCES)
128128
$(BUILD_VARS) $(GO) build $(BUILD_FLAGS) ./cmd/skyeye-scaler/
129129

130+
.PHONY: run
131+
run:
132+
$(BUILD_VARS) $(GO) run -race $(BUILD_FLAGS) ./cmd/skyeye/ $(ARGS)
133+
130134
.PHONY: test
131135
test: generate
132136
$(BUILD_VARS) $(GO) run gotest.tools/gotestsum -- $(BUILD_FLAGS) ./...

internal/application/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (a *app) Run(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitG
190190
go func() {
191191
defer wg.Done()
192192
log.Info().Msg("streaming telemetry data to radar")
193-
a.tacviewClient.Stream(ctx, a.starts, a.updates, a.fades)
193+
a.tacviewClient.Stream(ctx, wg, a.starts, a.updates, a.fades)
194194
}()
195195

196196
wg.Add(1)

pkg/controller/callbacks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ func (c *controller) handleFaded(location orb.Point, group brevity.Group, coalit
4848
}
4949
}
5050

51-
func (c *controller) handleRemoved(trackfile trackfiles.Trackfile) {
51+
func (c *controller) handleRemoved(trackfile *trackfiles.Trackfile) {
5252
c.remove(trackfile.Contact.ID)
5353
}

pkg/radar/callbacks.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
type StartedCallback func()
1111

1212
func (s *scope) SetStartedCallback(callback StartedCallback) {
13+
s.callbackLock.Lock()
14+
defer s.callbackLock.Unlock()
1315
s.startedCallback = callback
1416
}
1517

@@ -18,13 +20,17 @@ func (s *scope) SetStartedCallback(callback StartedCallback) {
1820
type FadedCallback func(location orb.Point, group brevity.Group, coalition coalitions.Coalition)
1921

2022
func (s *scope) SetFadedCallback(callback FadedCallback) {
23+
s.callbackLock.Lock()
24+
defer s.callbackLock.Unlock()
2125
s.fadedCallback = callback
2226
}
2327

2428
// RemovedCallback is a callback function that is called when a trackfile is aged out and removed.
2529
// A copy of the trackfile is provided.
26-
type RemovedCallback func(trackfile trackfiles.Trackfile)
30+
type RemovedCallback func(trackfile *trackfiles.Trackfile)
2731

2832
func (s *scope) SetRemovedCallback(callback RemovedCallback) {
33+
s.callbackLock.Lock()
34+
defer s.callbackLock.Unlock()
2935
s.removalCallback = callback
3036
}

pkg/radar/faded.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ func (s *scope) handleFaded(fades []sim.Faded) {
8989
}
9090

9191
// call the faded callback for each group
92+
s.callbackLock.RLock()
93+
defer s.callbackLock.RUnlock()
9294
for _, grp := range groups {
9395
if s.fadedCallback != nil {
9496
s.fadedCallback(grp.point(), &grp, grp.contacts[0].Contact.Coalition)

pkg/radar/radar.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ type scope struct {
133133
startedCallback StartedCallback
134134
fadedCallback FadedCallback
135135
removalCallback RemovedCallback
136+
callbackLock sync.RWMutex
136137
center orb.Point
137138
mandatoryThreatRadius unit.Length
138139
}
@@ -239,7 +240,7 @@ func (s *scope) handleGarbageCollection() {
239240
Stringer("age", s.missionTime.Sub(lastSeen)).
240241
Msg("expired trackfile")
241242
if s.removalCallback != nil {
242-
s.removalCallback(*trackfile)
243+
s.removalCallback(trackfile)
243244
}
244245
}
245246
}

pkg/radar/started.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import "github.com/rs/zerolog/log"
55
func (s *scope) handleStarted() {
66
log.Info().Msg("clearing all trackfiles due to mission (re)start")
77
s.contacts.reset()
8+
s.callbackLock.RLock()
9+
defer s.callbackLock.RUnlock()
810
if s.startedCallback != nil {
911
s.startedCallback()
1012
}

pkg/sim/sim.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,9 @@
1-
// package sim provides an inteface for receiving telemetry data from DCS World
21
package sim
32

43
import (
5-
"context"
6-
"time"
7-
8-
"github.com/dharmab/skyeye/pkg/coalitions"
94
"github.com/dharmab/skyeye/pkg/trackfiles"
10-
"github.com/paulmach/orb"
115
)
126

13-
// Sim is the interface for receiving telemetry data from the flight simulator.
14-
type Sim interface {
15-
// Stream aircraft updates from the sim to the provided channels.
16-
// The first channel receives updates for active aircraft.
17-
// The second channel receives messages when an aircraft disappears.
18-
// This function blocks until the context is cancelled.
19-
Stream(context.Context, chan<- Started, chan<- Updated, chan<- Faded)
20-
// Bullseye returns the coalition's bullseye center.
21-
Bullseye(coalitions.Coalition) (orb.Point, error)
22-
// Time returns the starting time of the mission.
23-
// This is useful for looking up magnetic variation.
24-
Time() time.Time
25-
}
26-
277
// Started is a message sent when a new mission starts.
288
type Started struct {
299
}

pkg/simpleradio/client.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ type client struct {
8686

8787
// lastPing tracks the last time a ping was received. If no pings are received for a period of time, the client will
8888
// attempt to reconnect.
89-
lastPing time.Time
89+
lastPing time.Time
90+
lastPingLock sync.RWMutex
9091
}
9192

9293
func NewClient(config types.ClientConfiguration) (Client, error) {
@@ -165,20 +166,24 @@ func (c *client) autoheal(ctx context.Context) {
165166
case <-ctx.Done():
166167
return
167168
case <-ticker.C:
168-
if time.Since(c.lastPing) > pingInterval*3 {
169-
log.Warn().Msg("stopped receiving traffic from SRS server")
170-
171-
log.Warn().Msg("attempting to reconnect to SRS server")
172-
if reconnectErr := c.reconnect(ctx); reconnectErr != nil {
173-
log.Err(reconnectErr).Msg("failed to reconnect to SRS server")
174-
continue
175-
}
176-
if initErr := c.initialize(); initErr != nil {
177-
log.Err(initErr).Msg("failed to reinitialize SRS client")
178-
continue
169+
func() {
170+
c.lastPingLock.Lock()
171+
defer c.lastPingLock.Unlock()
172+
if time.Since(c.lastPing) > pingInterval*3 {
173+
log.Warn().Msg("stopped receiving traffic from SRS server")
174+
175+
log.Warn().Msg("attempting to reconnect to SRS server")
176+
if reconnectErr := c.reconnect(ctx); reconnectErr != nil {
177+
log.Err(reconnectErr).Msg("failed to reconnect to SRS server")
178+
return
179+
}
180+
if initErr := c.initialize(); initErr != nil {
181+
log.Err(initErr).Msg("failed to reinitialize SRS client")
182+
return
183+
}
184+
c.lastPing = time.Now()
179185
}
180-
c.lastPing = time.Now()
181-
}
186+
}()
182187
}
183188
}
184189
}

pkg/simpleradio/ping.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ func (c *client) receivePings(ctx context.Context, in <-chan []byte) {
5858
log.Debug().Int("bytes", n).Msg("received UDP ping larger than expected")
5959
} else {
6060
log.Trace().Str("GUID", string(b[0:types.GUIDLength])).Msg("received UDP ping")
61-
c.lastPing = time.Now()
61+
t := time.Now()
62+
func() {
63+
c.lastPingLock.Lock()
64+
defer c.lastPingLock.Unlock()
65+
c.lastPing = t
66+
}()
6267
}
6368
case <-ctx.Done():
6469
log.Info().Msg("stopping SRS ping receiver due to context cancellation")

0 commit comments

Comments
 (0)