Skip to content

Commit 8b0e031

Browse files
authored
feat: log error and stacktrace when panic in goroutine (#1225)
1 parent 798c9c5 commit 8b0e031

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+113
-0
lines changed

library/filter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/waku-org/go-waku/waku/v2/protocol"
1111
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
1212
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
13+
"github.com/waku-org/go-waku/waku/v2/utils"
1314
)
1415

1516
type filterArgument struct {
@@ -74,6 +75,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m
7475

7576
for _, subscriptionDetails := range subscriptions {
7677
go func(subscriptionDetails *subscription.SubscriptionDetails) {
78+
defer utils.LogOnPanic()
7779
for envelope := range subscriptionDetails.C {
7880
send(instance, "message", toSubscriptionMessage(envelope))
7981
}

library/relay.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/waku-org/go-waku/waku/v2/protocol"
88
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
99
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
10+
"github.com/waku-org/go-waku/waku/v2/utils"
1011
)
1112

1213
// RelayEnoughPeers determines if there are enough peers to publish a message on a topic
@@ -66,6 +67,7 @@ func relaySubscribe(instance *WakuInstance, filterJSON string) error {
6667

6768
for _, sub := range subscriptions {
6869
go func(subscription *relay.Subscription) {
70+
defer utils.LogOnPanic()
6971
for envelope := range subscription.Ch {
7072
send(instance, "message", toSubscriptionMessage(envelope))
7173
}

waku/persistence/store.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
1515
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
1616
"github.com/waku-org/go-waku/waku/v2/timesource"
17+
"github.com/waku-org/go-waku/waku/v2/utils"
1718
"go.uber.org/zap"
1819
"google.golang.org/protobuf/proto"
1920
)
@@ -186,6 +187,7 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e
186187
}
187188

188189
func (d *DBStore) updateMetrics(ctx context.Context) {
190+
defer utils.LogOnPanic()
189191
ticker := time.NewTicker(5 * time.Second)
190192
defer ticker.Stop()
191193
defer d.wg.Done()
@@ -251,6 +253,7 @@ func (d *DBStore) getDeleteOldRowsQuery() string {
251253
}
252254

253255
func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) {
256+
defer utils.LogOnPanic()
254257
defer d.wg.Done()
255258

256259
ticker := time.NewTicker(t)

waku/v2/api/filter/filter.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/waku-org/go-waku/waku/v2/protocol"
1212
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
1313
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
14+
"github.com/waku-org/go-waku/waku/v2/utils"
1415
"go.uber.org/zap"
1516
)
1617

@@ -98,6 +99,7 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
9899
}
99100

100101
func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
102+
defer utils.LogOnPanic()
101103
_, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter)
102104
//Not reading result unless we want to do specific error handling?
103105
if err != nil {
@@ -106,6 +108,7 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
106108
}
107109

108110
func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
111+
defer utils.LogOnPanic()
109112
ticker := time.NewTicker(batchInterval)
110113
defer ticker.Stop()
111114
for {
@@ -213,12 +216,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
213216
for _, subDetails := range subs {
214217
apiSub.subs[subDetails.ID] = subDetails
215218
go func(subDetails *subscription.SubscriptionDetails) {
219+
defer utils.LogOnPanic()
216220
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
217221
for env := range subDetails.C {
218222
apiSub.DataCh <- env
219223
}
220224
}(subDetails)
221225
go func(subDetails *subscription.SubscriptionDetails) {
226+
defer utils.LogOnPanic()
222227
select {
223228
case <-apiSub.ctx.Done():
224229
return

waku/v2/api/filter/filter_manager.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
1414
"github.com/waku-org/go-waku/waku/v2/protocol"
1515
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
16+
"github.com/waku-org/go-waku/waku/v2/utils"
1617
)
1718

1819
// Methods on FilterManager just aggregate filters from application and subscribe to them
@@ -87,6 +88,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
8788
}
8889

8990
func (mgr *FilterManager) startFilterSubLoop() {
91+
defer utils.LogOnPanic()
9092
ticker := time.NewTicker(mgr.filterSubBatchDuration)
9193
defer ticker.Stop()
9294
for {
@@ -157,6 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
157159
}
158160

159161
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
162+
defer utils.LogOnPanic()
160163
ctx, cancel := context.WithCancel(mgr.ctx)
161164
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
162165
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)

waku/v2/api/missing/missing_messages.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
1616
"github.com/waku-org/go-waku/waku/v2/protocol/store"
1717
"github.com/waku-org/go-waku/waku/v2/timesource"
18+
"github.com/waku-org/go-waku/waku/v2/utils"
1819
"go.uber.org/zap"
1920
"google.golang.org/protobuf/proto"
2021
)
@@ -102,6 +103,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
102103
m.C = c
103104

104105
go func() {
106+
defer utils.LogOnPanic()
105107
t := time.NewTicker(m.params.interval)
106108
defer t.Stop()
107109

@@ -123,6 +125,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
123125
default:
124126
semaphore <- struct{}{}
125127
go func(interest criteriaInterest) {
128+
defer utils.LogOnPanic()
126129
m.fetchHistory(c, interest)
127130
<-semaphore
128131
}(interest)
@@ -276,6 +279,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
276279

277280
wg.Add(1)
278281
go func(messageHashes []pb.MessageHash) {
282+
defer utils.LogOnPanic()
279283
defer wg.Wait()
280284

281285
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {

waku/v2/api/publish/message_check.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
1515
"github.com/waku-org/go-waku/waku/v2/protocol/store"
1616
"github.com/waku-org/go-waku/waku/v2/timesource"
17+
"github.com/waku-org/go-waku/waku/v2/utils"
1718
"go.uber.org/zap"
1819
)
1920

@@ -145,6 +146,7 @@ func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) {
145146

146147
// Start checks if the tracked outgoing messages are stored periodically
147148
func (m *MessageSentCheck) Start() {
149+
defer utils.LogOnPanic()
148150
ticker := time.NewTicker(m.hashQueryInterval)
149151
defer ticker.Stop()
150152
for {

waku/v2/api/publish/message_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77

88
"github.com/waku-org/go-waku/waku/v2/protocol"
9+
"github.com/waku-org/go-waku/waku/v2/utils"
910
)
1011

1112
// MessagePriority determines the ordering for the message priority queue
@@ -182,6 +183,7 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
182183
ch := make(chan *protocol.Envelope)
183184

184185
go func() {
186+
defer utils.LogOnPanic()
185187
defer close(ch)
186188

187189
select {

waku/v2/discv5/discover.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
172172
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
173173
d.WaitGroup().Add(1)
174174
go func() {
175+
defer utils.LogOnPanic()
175176
defer d.WaitGroup().Done()
176177
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
177178
}()
@@ -217,6 +218,7 @@ func (d *DiscoveryV5) start() error {
217218
if d.params.autoFindPeers {
218219
d.WaitGroup().Add(1)
219220
go func() {
221+
defer utils.LogOnPanic()
220222
defer d.WaitGroup().Done()
221223
d.runDiscoveryV5Loop(d.Context())
222224
}()

waku/v2/discv5/mock_peer_discoverer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/libp2p/go-libp2p/core/peer"
88
"github.com/waku-org/go-waku/waku/v2/service"
9+
"github.com/waku-org/go-waku/waku/v2/utils"
910
)
1011

1112
// TestPeerDiscoverer is mock peer discoverer for testing
@@ -26,6 +27,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer {
2627
// Subscribe is for subscribing to peer discoverer
2728
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) {
2829
go func() {
30+
defer utils.LogOnPanic()
2931
for {
3032
select {
3133
case <-ctx.Done():

0 commit comments

Comments
 (0)