Skip to content

Commit 0ba2f30

Browse files
committed
Node: Channel writes without blocking
1 parent 53e9961 commit 0ba2f30

36 files changed

+125
-99
lines changed

node/cmd/spy/spy.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
123123
return err
124124
}
125125
}
126-
sub.ch <- message{vaaBytes: vaaBytes}
126+
sub.ch <- message{vaaBytes: vaaBytes} //can_block: Don't want to drop incoming VAAs
127127
continue
128128
}
129129

@@ -143,7 +143,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
143143
return err
144144
}
145145
}
146-
sub.ch <- message{vaaBytes: vaaBytes}
146+
sub.ch <- message{vaaBytes: vaaBytes} //can_block: Don't want to drop incoming VAAs
147147
}
148148
}
149149

@@ -246,7 +246,7 @@ func newSpyServer(logger *zap.Logger) *spyServer {
246246
func DoWithTimeout(f func() error, d time.Duration) error {
247247
errChan := make(chan error, 1)
248248
go func() {
249-
errChan <- f()
249+
errChan <- f() //can_block: Has timeout below
250250
close(errChan)
251251
}()
252252
t := time.NewTimer(d)

node/hack/evm_test/wstest.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func main() {
7777
case <-ctx.Done():
7878
return
7979
case err := <-headerSubscription.Err():
80-
errC <- fmt.Errorf("block subscription failed: %w", err)
80+
errC <- fmt.Errorf("block subscription failed: %w", err) //can_block: Only does one write
8181
return
8282
case block := <-headSink:
8383
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
@@ -114,7 +114,7 @@ func main() {
114114
case <-ctx.Done():
115115
return
116116
case err := <-messageSub.Err():
117-
errC <- fmt.Errorf("message subscription failed: %w", err)
117+
errC <- fmt.Errorf("message subscription failed: %w", err) //can_block: Only does one write
118118
return
119119
case ev := <-messageC:
120120
logger.Info("Received a log event from the contract", zap.Any("ev", ev))

node/pkg/adminrpc/adminserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
797797

798798
vaaInjectionsTotal.Inc()
799799

800-
s.injectC <- &common.MessagePublication{
800+
s.injectC <- &common.MessagePublication{ //can_block: Only blocks this command
801801
TxID: ethcommon.Hash{}.Bytes(),
802802
Timestamp: v.Timestamp,
803803
Nonce: v.Nonce,
@@ -897,7 +897,7 @@ func (s *nodePrivilegedService) fetchMissing(
897897
// Inject into the gossip signed VAA receive path.
898898
// This has the same effect as if the VAA was received from the network
899899
// (verifying signature, storing in local DB...).
900-
s.signedInC <- &gossipv1.SignedVAAWithQuorum{
900+
s.signedInC <- &gossipv1.SignedVAAWithQuorum{ //can_block: Only blocks this command
901901
Vaa: vaaBytes,
902902
}
903903

node/pkg/common/channel_utils.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,29 @@ package common
22

33
import (
44
"context"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
)
9+
10+
var (
11+
channelWriteDrops = promauto.NewCounterVec(
12+
prometheus.CounterOpts{
13+
Name: "wormhole_channel_write_drops",
14+
Help: "Total number of channel writes that were dropped due to channel overflow",
15+
}, []string{"channel_id"})
516
)
617

18+
// WriteToChannelWithoutBlocking attempts to write the specified event to the specified channel. If the write would block,
19+
// it increments the `channelWriteDrops` metric with the specified channel ID.
20+
func WriteToChannelWithoutBlocking[T any](channel chan<- T, evt T, label string) {
21+
select {
22+
case channel <- evt:
23+
default:
24+
channelWriteDrops.WithLabelValues(label).Inc()
25+
}
26+
}
27+
728
// ReadFromChannelWithTimeout reads events from the channel until a timeout occurs or the max maxCount is reached.
829
func ReadFromChannelWithTimeout[T any](ctx context.Context, ch <-chan T, maxCount int) ([]T, error) {
930
out := make([]T, 0, maxCount)

node/pkg/common/channel_utils_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,17 @@ func TestReadFromChannelWithTimeout_TooMuchData(t *testing.T) {
7878
require.Equal(t, 1, len(observations))
7979
assert.Equal(t, 3, observations[0])
8080
}
81+
82+
func TestWriteToChannelWithoutBlocking(t *testing.T) {
83+
myChan := make(chan int, 1)
84+
assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
85+
WriteToChannelWithoutBlocking(myChan, 42, "numbers")
86+
assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
87+
WriteToChannelWithoutBlocking(myChan, 43, "numbers")
88+
assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "numbers"))
89+
WriteToChannelWithoutBlocking(myChan, 44, "numbers")
90+
assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
91+
WriteToChannelWithoutBlocking(myChan, 44, "different_label")
92+
assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "different_label"))
93+
assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
94+
}

node/pkg/common/guardianset.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (st *GuardianSetState) SetHeartbeat(addr common.Address, peerId peer.ID, hb
177177

178178
v[peerId] = hb
179179
if st.updateC != nil {
180-
st.updateC <- hb
180+
WriteToChannelWithoutBlocking(st.updateC, hb, "heartbeat")
181181
}
182182
return nil
183183
}

node/pkg/governor/governor_monitoring.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ import (
8080
"sort"
8181
"time"
8282

83+
"github.com/certusone/wormhole/node/pkg/common"
8384
"github.com/certusone/wormhole/node/pkg/guardiansigner"
8485
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
8586
publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
@@ -618,7 +619,7 @@ func (gov *ChainGovernor) publishConfig(ctx context.Context, hb *gossipv1.Heartb
618619
panic(err)
619620
}
620621

621-
sendC <- b
622+
common.WriteToChannelWithoutBlocking(sendC, b, "gov_config_gossip_out")
622623
}
623624

624625
func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, guardianSigner guardiansigner.GuardianSigner, ourAddr ethCommon.Address) {
@@ -703,5 +704,5 @@ func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartb
703704
panic(err)
704705
}
705706

706-
sendC <- b
707+
common.WriteToChannelWithoutBlocking(sendC, b, "gov_status_gossip_out")
707708
}

node/pkg/governor/governor_prices.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (gov *ChainGovernor) queryCoinGecko(ctx context.Context) error {
158158
for {
159159
select {
160160
case <-ticker.C:
161-
throttle <- 1
161+
throttle <- 1 //can_block: We want this to block for throttling
162162
case <-ctx.Done():
163163
return
164164
}

node/pkg/node/node.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ const (
3434
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
3535
inboundBatchObservationBufferSize = 1000
3636

37+
// inboundMessageBufferSize configures the size of the msgC channel used to publish new observations from the watcher to the processor.
38+
// This channel is shared across all the watchers so we don't want to hang up other watchers while the processor is handling an observation from one.
39+
inboundMessageBufferSize = 1000
40+
3741
// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
3842
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
3943
// So in the worst case the entire queue can be processed in 2s.
@@ -122,7 +126,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
122126
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
123127
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
124128
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
125-
g.msgC = makeChannelPair[*common.MessagePublication](0)
129+
g.msgC = makeChannelPair[*common.MessagePublication](inboundMessageBufferSize)
126130
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
127131
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
128132
g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)

node/pkg/node/options.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
382382
zap.String("txID", msg.TxIDString()),
383383
zap.Time("timestamp", msg.Timestamp))
384384
} else {
385-
g.msgC.writeC <- msg
385+
g.msgC.writeC <- msg //can_block: The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
386386
}
387387
}
388388
}
@@ -408,7 +408,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
408408
zap.Stringer("watcherChainId", chainId),
409409
)
410410
}
411-
g.queryResponseC.writeC <- response
411+
g.queryResponseC.writeC <- response //can_block: This channel is buffered, if it backs up we'll stop processing queries until it clears
412412
}
413413
}
414414
}(chainQueryResponseC[chainId], chainId)

0 commit comments

Comments
 (0)