Skip to content

Commit e722c53

Browse files
committed
Node: Channel writes without blocking
1 parent 4706a9f commit e722c53

36 files changed

+122
-96
lines changed

Diff for: node/cmd/spy/spy.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
123123
return err
124124
}
125125
}
126-
sub.ch <- message{vaaBytes: vaaBytes}
126+
sub.ch <- message{vaaBytes: vaaBytes} //can_block: Don't want to drop incoming VAAs
127127
continue
128128
}
129129

@@ -143,7 +143,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
143143
return err
144144
}
145145
}
146-
sub.ch <- message{vaaBytes: vaaBytes}
146+
sub.ch <- message{vaaBytes: vaaBytes} //can_block: Don't want to drop incoming VAAs
147147
}
148148
}
149149

@@ -246,7 +246,7 @@ func newSpyServer(logger *zap.Logger) *spyServer {
246246
func DoWithTimeout(f func() error, d time.Duration) error {
247247
errChan := make(chan error, 1)
248248
go func() {
249-
errChan <- f()
249+
errChan <- f() //can_block: Has timeout below
250250
close(errChan)
251251
}()
252252
t := time.NewTimer(d)

Diff for: node/hack/evm_test/wstest.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func main() {
7777
case <-ctx.Done():
7878
return
7979
case err := <-headerSubscription.Err():
80-
errC <- fmt.Errorf("block subscription failed: %w", err)
80+
errC <- fmt.Errorf("block subscription failed: %w", err) // can_block: The watcher will exit anyway
8181
return
8282
case block := <-headSink:
8383
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
@@ -114,7 +114,7 @@ func main() {
114114
case <-ctx.Done():
115115
return
116116
case err := <-messageSub.Err():
117-
errC <- fmt.Errorf("message subscription failed: %w", err)
117+
errC <- fmt.Errorf("message subscription failed: %w", err) // can_block: The watcher will exit anyway
118118
return
119119
case ev := <-messageC:
120120
logger.Info("Received a log event from the contract", zap.Any("ev", ev))

Diff for: node/pkg/adminrpc/adminserver.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
801801

802802
vaaInjectionsTotal.Inc()
803803

804-
s.injectC <- &common.MessagePublication{
804+
s.injectC <- &common.MessagePublication{ //can_block: Only blocks this command
805805
TxID: ethcommon.Hash{}.Bytes(),
806806
Timestamp: v.Timestamp,
807807
Nonce: v.Nonce,
@@ -901,7 +901,7 @@ func (s *nodePrivilegedService) fetchMissing(
901901
// Inject into the gossip signed VAA receive path.
902902
// This has the same effect as if the VAA was received from the network
903903
// (verifying signature, storing in local DB...).
904-
s.signedInC <- &gossipv1.SignedVAAWithQuorum{
904+
s.signedInC <- &gossipv1.SignedVAAWithQuorum{ //can_block: Only blocks this command
905905
Vaa: vaaBytes,
906906
}
907907

Diff for: node/pkg/common/channel_utils.go

+21
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,29 @@ package common
22

33
import (
44
"context"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
)
9+
10+
var (
11+
channelWriteDrops = promauto.NewCounterVec(
12+
prometheus.CounterOpts{
13+
Name: "wormhole_channel_write_drops",
14+
Help: "Total number of channel writes that were dropped due to channel overflow",
15+
}, []string{"channel_id"})
516
)
617

18+
// WriteToChannelWithoutBlocking attempts to write the specified event to the specified channel. If the write would block,
19+
// the event is dropped and the `channelWriteDrops` metric is incremented with the specified channel ID as its label.
20+
func WriteToChannelWithoutBlocking[T any](channel chan<- T, evt T, label string) {
21+
select {
22+
case channel <- evt:
23+
default:
24+
channelWriteDrops.WithLabelValues(label).Inc()
25+
}
26+
}
27+
728
// ReadFromChannelWithTimeout reads events from the channel until a timeout occurs or maxCount is reached.
829
func ReadFromChannelWithTimeout[T any](ctx context.Context, ch <-chan T, maxCount int) ([]T, error) {
930
out := make([]T, 0, maxCount)

Diff for: node/pkg/common/channel_utils_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,17 @@ func TestReadFromChannelWithTimeout_TooMuchData(t *testing.T) {
7878
require.Equal(t, 1, len(observations))
7979
assert.Equal(t, 3, observations[0])
8080
}
81+
82+
func TestWriteToChannelWithoutBlocking(t *testing.T) {
83+
myChan := make(chan int, 1)
84+
assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
85+
WriteToChannelWithoutBlocking(myChan, 42, "numbers")
86+
assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
87+
WriteToChannelWithoutBlocking(myChan, 43, "numbers")
88+
assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "numbers"))
89+
WriteToChannelWithoutBlocking(myChan, 44, "numbers")
90+
assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
91+
WriteToChannelWithoutBlocking(myChan, 44, "different_label")
92+
assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "different_label"))
93+
assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
94+
}

Diff for: node/pkg/common/guardianset.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (st *GuardianSetState) SetHeartbeat(addr common.Address, peerId peer.ID, hb
177177

178178
v[peerId] = hb
179179
if st.updateC != nil {
180-
st.updateC <- hb
180+
WriteToChannelWithoutBlocking(st.updateC, hb, "heartbeat")
181181
}
182182
return nil
183183
}

Diff for: node/pkg/governor/governor_monitoring.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ import (
8080
"sort"
8181
"time"
8282

83+
"github.com/certusone/wormhole/node/pkg/common"
8384
"github.com/certusone/wormhole/node/pkg/guardiansigner"
8485
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
8586
publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
@@ -618,7 +619,7 @@ func (gov *ChainGovernor) publishConfig(ctx context.Context, hb *gossipv1.Heartb
618619
panic(err)
619620
}
620621

621-
sendC <- b
622+
common.WriteToChannelWithoutBlocking(sendC, b, "gov_config_gossip_out")
622623
}
623624

624625
func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, guardianSigner guardiansigner.GuardianSigner, ourAddr ethCommon.Address) {
@@ -703,5 +704,5 @@ func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartb
703704
panic(err)
704705
}
705706

706-
sendC <- b
707+
common.WriteToChannelWithoutBlocking(sendC, b, "gov_status_gossip_out")
707708
}

Diff for: node/pkg/governor/governor_prices.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (gov *ChainGovernor) queryCoinGecko(ctx context.Context) error {
158158
for {
159159
select {
160160
case <-ticker.C:
161-
throttle <- 1
161+
throttle <- 1 //can_block: We want this to block for throttling
162162
case <-ctx.Done():
163163
return
164164
}

Diff for: node/pkg/node/node.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ const (
3535
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
3636
inboundBatchObservationBufferSize = 1000
3737

38+
// inboundMessageBufferSize configures the size of the msgC channel used to publish new observations from the watcher to the processor.
39+
// This channel is shared across all the watchers, so it is buffered to avoid stalling the other watchers while the processor is handling an observation from one of them.
40+
inboundMessageBufferSize = 1000
41+
3842
// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
3943
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
4044
// So in the worst case the entire queue can be processed in 2s.
@@ -124,7 +128,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
124128
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
125129
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
126130
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
127-
g.msgC = makeChannelPair[*common.MessagePublication](0)
131+
g.msgC = makeChannelPair[*common.MessagePublication](inboundMessageBufferSize)
128132
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
129133
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
130134
g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)

Diff for: node/pkg/node/options.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
382382
zap.String("txID", msg.TxIDString()),
383383
zap.Time("timestamp", msg.Timestamp))
384384
} else {
385-
g.msgC.writeC <- msg
385+
g.msgC.writeC <- msg //can_block: The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
386386
}
387387
}
388388
}
@@ -408,7 +408,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
408408
zap.Stringer("watcherChainId", chainId),
409409
)
410410
}
411-
g.queryResponseC.writeC <- response
411+
g.queryResponseC.writeC <- response //can_block: This channel is buffered, if it backs up we'll stop processing queries until it clears
412412
}
413413
}
414414
}(chainQueryResponseC[chainId], chainId)

Diff for: node/pkg/node/publicwebRunnable.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,9 @@ func publicwebServiceRunnable(
147147
go func() {
148148
logger.Info("publicweb server listening", zap.String("addr", srv.Addr))
149149
if tlsHostname != "" {
150-
errC <- srv.ServeTLS(listener, "", "")
150+
errC <- srv.ServeTLS(listener, "", "") //can_block: Only does one write
151151
} else {
152-
errC <- srv.Serve(listener)
152+
errC <- srv.Serve(listener) //can_block: Only does one write
153153
}
154154
}()
155155
select {

Diff for: node/pkg/p2p/p2p.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
670670

671671
// Send to local observation request queue (the loopback message is ignored)
672672
if params.obsvReqRecvC != nil {
673-
params.obsvReqRecvC <- msg
673+
common.WriteToChannelWithoutBlocking(params.obsvReqRecvC, msg, "obs_req_int")
674674
}
675675

676676
if controlPubsubTopic == nil {
@@ -695,7 +695,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
695695
for {
696696
envelope, err := controlSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
697697
if err != nil {
698-
errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err)
698+
errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err) // can_block: The runnable will exit anyway
699699
return
700700
}
701701

@@ -830,11 +830,11 @@ func Run(params *RunParams) func(ctx context.Context) error {
830830
}
831831
case *gossipv1.GossipMessage_SignedChainGovernorConfig:
832832
if params.signedGovCfgRecvC != nil {
833-
params.signedGovCfgRecvC <- m.SignedChainGovernorConfig
833+
common.WriteToChannelWithoutBlocking(params.signedGovCfgRecvC, m.SignedChainGovernorConfig, "gov_config_gossip_in")
834834
}
835835
case *gossipv1.GossipMessage_SignedChainGovernorStatus:
836836
if params.signedGovStatusRecvC != nil {
837-
params.signedGovStatusRecvC <- m.SignedChainGovernorStatus
837+
common.WriteToChannelWithoutBlocking(params.signedGovStatusRecvC, m.SignedChainGovernorStatus, "gov_status_gossip_in")
838838
}
839839
default:
840840
p2pMessagesReceived.WithLabelValues("unknown").Inc()
@@ -853,7 +853,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
853853
for {
854854
envelope, err := attestationSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
855855
if err != nil {
856-
errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err)
856+
errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err) // can_block: The runnable will exit anyway
857857
return
858858
}
859859

@@ -911,7 +911,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
911911
for {
912912
envelope, err := vaaSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled
913913
if err != nil {
914-
errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err)
914+
errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err) // can_block: The runnable will exit anyway
915915
return
916916
}
917917

Diff for: node/pkg/processor/broadcast.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
ethCommon "github.com/ethereum/go-ethereum/common"
88
"google.golang.org/protobuf/proto"
99

10+
"github.com/certusone/wormhole/node/pkg/common"
1011
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
1112
"github.com/wormhole-foundation/wormhole/sdk/vaa"
1213
)
@@ -75,7 +76,7 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
7576
}
7677

7778
// Broadcast the signed VAA.
78-
p.gossipVaaSendC <- msg
79+
common.WriteToChannelWithoutBlocking(p.gossipVaaSendC, msg, "vaa_broadcast")
7980
signedVAAsBroadcast.Inc()
8081

8182
if p.gatewayRelayer != nil {

Diff for: node/pkg/processor/cleanup.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,11 @@ func (p *Processor) handleCleanup(ctx context.Context) {
230230
}
231231
if s.ourMsg != nil {
232232
// This is the case for immediately published messages (as well as anything still pending from before the cutover).
233-
p.gossipAttestationSendC <- s.ourMsg
233+
select {
234+
case p.gossipAttestationSendC <- s.ourMsg:
235+
default:
236+
batchObservationChannelOverflow.WithLabelValues("gossipResend").Inc()
237+
}
234238
} else {
235239
p.postObservationToBatch(s.ourObs)
236240
}

Diff for: node/pkg/supervisor/supervisor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func New(ctx context.Context, logger *zap.Logger, rootRunnable Runnable, opts ..
110110

111111
go sup.processor(ctx)
112112

113-
sup.pReq <- &processorRequest{
113+
sup.pReq <- &processorRequest{ //can_block: Only does one write
114114
schedule: &processorRequestSchedule{dn: "root"},
115115
}
116116

Diff for: node/pkg/supervisor/supervisor_node.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (n *node) runGroup(runnables map[string]Runnable) error {
238238
// Schedule execution of group members.
239239
go func() {
240240
for name := range runnables {
241-
n.sup.pReq <- &processorRequest{
241+
n.sup.pReq <- &processorRequest{ //should_this_block: ???
242242
schedule: &processorRequestSchedule{
243243
dn: dns[name],
244244
},

Diff for: node/pkg/supervisor/supervisor_processor.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (s *supervisor) processSchedule(r *processorRequestSchedule) {
134134
if !s.propagatePanic {
135135
defer func() {
136136
if rec := recover(); rec != nil {
137-
s.pReq <- &processorRequest{
137+
s.pReq <- &processorRequest{ //should_this_block: ???
138138
died: &processorRequestDied{
139139
dn: r.dn,
140140
err: fmt.Errorf("panic: %v, stacktrace: %s", rec, string(debug.Stack())),
@@ -146,7 +146,7 @@ func (s *supervisor) processSchedule(r *processorRequestSchedule) {
146146

147147
res := n.runnable(n.ctx)
148148

149-
s.pReq <- &processorRequest{
149+
s.pReq <- &processorRequest{ //should_this_block: ???
150150
died: &processorRequestDied{
151151
dn: r.dn,
152152
err: res,
@@ -382,7 +382,7 @@ func (s *supervisor) processGC() {
382382
// Reschedule node runnable to run after backoff.
383383
go func(n *node, bo time.Duration) {
384384
time.Sleep(bo)
385-
s.pReq <- &processorRequest{
385+
s.pReq <- &processorRequest{ //should_this_block: ???
386386
schedule: &processorRequestSchedule{dn: n.dn()},
387387
}
388388
}(n, bo)

Diff for: node/pkg/supervisor/supervisor_support.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func GRPCServer(srv *grpc.Server, lis net.Listener, graceful bool) Runnable {
1919
Signal(ctx, SignalHealthy)
2020
errC := make(chan error)
2121
go func() {
22-
errC <- srv.Serve(lis)
22+
errC <- srv.Serve(lis) //should_this_block: Only does one write
2323
}()
2424
select {
2525
case <-ctx.Done():

Diff for: node/pkg/supervisor/supervisor_testhelpers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
// This is used in tests only.
1111
func (s *supervisor) waitSettle(ctx context.Context) error {
1212
waiter := make(chan struct{})
13-
s.pReq <- &processorRequest{
13+
s.pReq <- &processorRequest{ //should_this_block: ???
1414
waitSettled: &processorRequestWaitSettled{
1515
waiter: waiter,
1616
},

Diff for: node/pkg/telemetry/loki.go

+2-21
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/blendle/zapdriver"
14+
"github.com/certusone/wormhole/node/pkg/common"
1415
"go.uber.org/zap"
1516
"go.uber.org/zap/zapcore"
1617
"google.golang.org/api/option"
@@ -23,7 +24,6 @@ import (
2324
"github.com/grafana/loki/pkg/logproto"
2425
lokiflag "github.com/grafana/loki/pkg/util/flagext"
2526
"github.com/prometheus/client_golang/prometheus"
26-
"github.com/prometheus/client_golang/prometheus/promauto"
2727
"github.com/prometheus/common/config"
2828
"github.com/prometheus/common/model"
2929
)
@@ -40,20 +40,6 @@ type ExternalLoggerLoki struct {
4040
localLogger *zap.Logger
4141
}
4242

43-
var (
44-
lokiMessagesSent = promauto.NewCounter(
45-
prometheus.CounterOpts{
46-
Name: "wormhole_loki_messages_sent",
47-
Help: "Total number of log messages posted to Loki",
48-
})
49-
50-
lokiMessagesDropped = promauto.NewCounter(
51-
prometheus.CounterOpts{
52-
Name: "wormhole_loki_messages_dropped",
53-
Help: "Total number of log messages dropped while posting to Loki",
54-
})
55-
)
56-
5743
func (logger *ExternalLoggerLoki) log(time time.Time, message json.RawMessage, level zapcore.Level) {
5844
lokiLabels := logger.labels[level]
5945

@@ -71,12 +57,7 @@ func (logger *ExternalLoggerLoki) log(time time.Time, message json.RawMessage, l
7157
Labels: lokiLabels,
7258
}
7359

74-
select {
75-
case logger.c.Chan() <- entry:
76-
lokiMessagesSent.Inc()
77-
default:
78-
lokiMessagesDropped.Inc()
79-
}
60+
common.WriteToChannelWithoutBlocking(logger.c.Chan(), entry, "loki")
8061

8162
// A fatal error exits, which can cause us to lose messages. Flush everything.
8263
if level == zapcore.FatalLevel {

0 commit comments

Comments
 (0)