Skip to content

Commit 7a9894e

Browse files
committed
Node: Channel writes without blocking
1 parent 53e9961 commit 7a9894e

37 files changed

+422
-99
lines changed

Diff for: node/cmd/spy/spy.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
123123
return err
124124
}
125125
}
126-
sub.ch <- message{vaaBytes: vaaBytes}
126+
sub.ch <- message{vaaBytes: vaaBytes} //can_block: Don't want to drop incoming VAAs
127127
continue
128128
}
129129

@@ -143,7 +143,7 @@ func (s *spyServer) PublishSignedVAA(vaaBytes []byte) error {
143143
return err
144144
}
145145
}
146-
sub.ch <- message{vaaBytes: vaaBytes}
146+
sub.ch <- message{vaaBytes: vaaBytes} //can_block: Don't want to drop incoming VAAs
147147
}
148148
}
149149

@@ -246,7 +246,7 @@ func newSpyServer(logger *zap.Logger) *spyServer {
246246
func DoWithTimeout(f func() error, d time.Duration) error {
247247
errChan := make(chan error, 1)
248248
go func() {
249-
errChan <- f()
249+
errChan <- f() //can_block: Has timeout below
250250
close(errChan)
251251
}()
252252
t := time.NewTimer(d)

Diff for: node/hack/analyze.go

+297
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"flag"
6+
"fmt"
7+
"go/ast"
8+
"go/parser"
9+
"go/token"
10+
"log"
11+
"os"
12+
"path/filepath"
13+
"strings"
14+
)
15+
16+
// Position represents a range in the source code
17+
type Position struct {
18+
Filename string `json:"filename"`
19+
StartLine int `json:"start_line"`
20+
StartColumn int `json:"start_column"`
21+
EndLine int `json:"end_line"`
22+
EndColumn int `json:"end_column"`
23+
}
24+
25+
func (p Position) String() string {
26+
if p.StartLine == p.EndLine {
27+
return fmt.Sprintf("%s:%d:%d-%d", p.Filename, p.StartLine, p.StartColumn, p.EndColumn)
28+
}
29+
return fmt.Sprintf("%s:%d:%d-%d:%d", p.Filename, p.StartLine, p.StartColumn, p.EndLine, p.EndColumn)
30+
}
31+
32+
type Issue struct {
33+
Pos Position
34+
Message string
35+
Severity string
36+
}
37+
38+
type Analyzer struct {
39+
issues []Issue
40+
fset *token.FileSet
41+
stack parentStack
42+
}
43+
44+
// getPosition converts ast node position information into a Position
45+
func (a *Analyzer) getPosition(start, end token.Pos) Position {
46+
startPos := a.fset.Position(start)
47+
endPos := a.fset.Position(end)
48+
49+
return Position{
50+
Filename: startPos.Filename,
51+
StartLine: startPos.Line,
52+
StartColumn: startPos.Column,
53+
EndLine: endPos.Line,
54+
EndColumn: endPos.Column,
55+
}
56+
}
57+
58+
type parentStack struct {
59+
nodes []ast.Node
60+
}
61+
62+
func (p *parentStack) push(n ast.Node) {
63+
if n == nil {
64+
return
65+
}
66+
p.nodes = append(p.nodes, n)
67+
}
68+
69+
func (p *parentStack) pop() {
70+
if len(p.nodes) > 0 {
71+
p.nodes = p.nodes[:len(p.nodes)-1]
72+
}
73+
}
74+
75+
type OutputFormat string
76+
77+
const (
78+
OutputFormatText OutputFormat = "txt"
79+
OutputFormatJSON OutputFormat = "json"
80+
)
81+
82+
type JSONOutput struct {
83+
Issues []JSONIssue `json:"issues"`
84+
Total int `json:"total"`
85+
}
86+
87+
type JSONIssue struct {
88+
Severity string `json:"severity"`
89+
Message string `json:"message"`
90+
Position Position `json:"position"`
91+
}
92+
93+
func main() {
94+
if err := run(); err != nil {
95+
log.Fatalf("Error: %v", err)
96+
}
97+
}
98+
99+
func run() error {
100+
path := flag.String("path", ".", "Path to file or directory to analyze")
101+
output := flag.String("output", "txt", "Output format: txt or json")
102+
flag.Parse()
103+
104+
if path == nil || output == nil {
105+
return fmt.Errorf("invalid flag values")
106+
}
107+
108+
outputFormat := OutputFormat(*output)
109+
if outputFormat != OutputFormatText && outputFormat != OutputFormatJSON {
110+
return fmt.Errorf("invalid output format: %s. Valid options are: txt, json", *output)
111+
}
112+
113+
analyzer := &Analyzer{
114+
fset: token.NewFileSet(),
115+
}
116+
if analyzer.fset == nil {
117+
return fmt.Errorf("failed to create token.FileSet")
118+
}
119+
120+
if err := analyzer.analyzePath(*path); err != nil {
121+
return fmt.Errorf("error analyzing path: %w", err)
122+
}
123+
124+
if err := printOutput(outputFormat, analyzer.issues); err != nil {
125+
return fmt.Errorf("error printing output: %w", err)
126+
}
127+
128+
return nil
129+
}
130+
131+
func printOutput(format OutputFormat, issues []Issue) error {
132+
switch format {
133+
case OutputFormatJSON:
134+
return printJSON(issues)
135+
case OutputFormatText:
136+
return printText(issues)
137+
default:
138+
return fmt.Errorf("unknown output format: %s", format)
139+
}
140+
}
141+
142+
func printJSON(issues []Issue) error {
143+
output := JSONOutput{
144+
Total: len(issues),
145+
Issues: make([]JSONIssue, len(issues)),
146+
}
147+
148+
for i, issue := range issues {
149+
output.Issues[i] = JSONIssue{
150+
Severity: issue.Severity,
151+
Message: issue.Message,
152+
Position: issue.Pos,
153+
}
154+
}
155+
156+
jsonBytes, err := json.MarshalIndent(output, "", " ")
157+
if err != nil {
158+
return fmt.Errorf("error marshaling JSON: %w", err)
159+
}
160+
161+
_, err = fmt.Println(string(jsonBytes))
162+
return err
163+
}
164+
165+
func printText(issues []Issue) error {
166+
if len(issues) == 0 {
167+
_, err := fmt.Println("No issues found!")
168+
return err
169+
}
170+
171+
_, err := fmt.Printf("Found %d potential issues:\n\n", len(issues))
172+
if err != nil {
173+
return err
174+
}
175+
176+
for _, issue := range issues {
177+
if _, err := fmt.Printf("[%s] %s: %s\n", issue.Severity, issue.Pos, issue.Message); err != nil {
178+
return err
179+
}
180+
}
181+
return nil
182+
}
183+
184+
func (a *Analyzer) analyzePath(path string) error {
185+
fileInfo, err := os.Stat(path)
186+
if err != nil {
187+
return fmt.Errorf("error accessing path: %w", err)
188+
}
189+
190+
if fileInfo.IsDir() {
191+
return filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
192+
if err != nil {
193+
return err
194+
}
195+
if !info.IsDir() && strings.HasSuffix(path, ".go") {
196+
if err := a.analyzeFile(path); err != nil {
197+
return fmt.Errorf("error analyzing file %s: %w", path, err)
198+
}
199+
}
200+
return nil
201+
})
202+
}
203+
204+
return a.analyzeFile(path)
205+
}
206+
207+
func (a *Analyzer) analyzeFile(path string) error {
208+
file, err := parser.ParseFile(a.fset, path, nil, parser.AllErrors)
209+
if err != nil {
210+
return fmt.Errorf("error parsing file: %w", err)
211+
}
212+
213+
if file == nil {
214+
return fmt.Errorf("parsed file is nil")
215+
}
216+
217+
a.analyze(file)
218+
return nil
219+
}
220+
221+
func (a *Analyzer) analyze(file *ast.File) {
222+
if file == nil {
223+
return
224+
}
225+
226+
// Reset the stack for each file
227+
a.stack = parentStack{}
228+
229+
ast.Inspect(file, func(n ast.Node) bool {
230+
if n == nil {
231+
if len(a.stack.nodes) > 0 {
232+
a.stack.pop()
233+
}
234+
return true
235+
}
236+
237+
a.stack.push(n)
238+
239+
switch node := n.(type) {
240+
case *ast.SendStmt:
241+
if node != nil {
242+
a.checkChannelSend(node)
243+
}
244+
case *ast.CallExpr:
245+
if node != nil {
246+
a.checkChannelCreation(node)
247+
}
248+
}
249+
return true
250+
})
251+
}
252+
253+
func (a *Analyzer) checkChannelSend(node *ast.SendStmt) {
254+
// Check if any parent is a select statement
255+
inSelect := false
256+
for _, parent := range a.stack.nodes {
257+
if parent == nil {
258+
continue
259+
}
260+
if _, ok := parent.(*ast.SelectStmt); ok {
261+
inSelect = true
262+
break
263+
}
264+
}
265+
266+
if !inSelect {
267+
a.addIssue(Issue{
268+
Pos: a.getPosition(node.Pos(), node.End()),
269+
Message: "channel send without select statement may block indefinitely",
270+
Severity: "WARNING",
271+
})
272+
}
273+
}
274+
275+
func (a *Analyzer) checkChannelCreation(node *ast.CallExpr) {
276+
fun, ok := node.Fun.(*ast.Ident)
277+
if !ok || fun == nil || fun.Name != "make" {
278+
return
279+
}
280+
281+
if len(node.Args) > 0 {
282+
if chanType, ok := node.Args[0].(*ast.ChanType); ok && chanType != nil {
283+
// Check if buffer size is specified
284+
if len(node.Args) == 1 {
285+
a.addIssue(Issue{
286+
Pos: a.getPosition(node.Pos(), node.End()),
287+
Message: "unbuffered channel creation detected - consider specifying buffer size",
288+
Severity: "INFO",
289+
})
290+
}
291+
}
292+
}
293+
}
294+
295+
func (a *Analyzer) addIssue(issue Issue) {
296+
a.issues = append(a.issues, issue)
297+
}

Diff for: node/hack/evm_test/wstest.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func main() {
7777
case <-ctx.Done():
7878
return
7979
case err := <-headerSubscription.Err():
80-
errC <- fmt.Errorf("block subscription failed: %w", err)
80+
errC <- fmt.Errorf("block subscription failed: %w", err) //can_block: Only does one write
8181
return
8282
case block := <-headSink:
8383
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
@@ -114,7 +114,7 @@ func main() {
114114
case <-ctx.Done():
115115
return
116116
case err := <-messageSub.Err():
117-
errC <- fmt.Errorf("message subscription failed: %w", err)
117+
errC <- fmt.Errorf("message subscription failed: %w", err) //can_block: Only does one write
118118
return
119119
case ev := <-messageC:
120120
logger.Info("Received a log event from the contract", zap.Any("ev", ev))

Diff for: node/pkg/adminrpc/adminserver.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
797797

798798
vaaInjectionsTotal.Inc()
799799

800-
s.injectC <- &common.MessagePublication{
800+
s.injectC <- &common.MessagePublication{ //can_block: Only blocks this command
801801
TxID: ethcommon.Hash{}.Bytes(),
802802
Timestamp: v.Timestamp,
803803
Nonce: v.Nonce,
@@ -897,7 +897,7 @@ func (s *nodePrivilegedService) fetchMissing(
897897
// Inject into the gossip signed VAA receive path.
898898
// This has the same effect as if the VAA was received from the network
899899
// (verifying signature, storing in local DB...).
900-
s.signedInC <- &gossipv1.SignedVAAWithQuorum{
900+
s.signedInC <- &gossipv1.SignedVAAWithQuorum{ //can_block: Only blocks this command
901901
Vaa: vaaBytes,
902902
}
903903

Diff for: node/pkg/common/channel_utils.go

+21
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,29 @@ package common
22

33
import (
44
"context"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
)
9+
10+
var (
11+
channelWriteDrops = promauto.NewCounterVec(
12+
prometheus.CounterOpts{
13+
Name: "wormhole_channel_write_drops",
14+
Help: "Total number of channel writes that were dropped due to channel overflow",
15+
}, []string{"channel_id"})
516
)
617

18+
// WriteToChannelWithoutBlocking attempts to write the specified event to the specified channel. If the write would block,
19+
// it increments the `channelWriteDrops` metric with the specified channel ID.
20+
func WriteToChannelWithoutBlocking[T any](channel chan<- T, evt T, label string) {
21+
select {
22+
case channel <- evt:
23+
default:
24+
channelWriteDrops.WithLabelValues(label).Inc()
25+
}
26+
}
27+
728
// ReadFromChannelWithTimeout reads events from the channel until a timeout occurs or the max maxCount is reached.
829
func ReadFromChannelWithTimeout[T any](ctx context.Context, ch <-chan T, maxCount int) ([]T, error) {
930
out := make([]T, 0, maxCount)

0 commit comments

Comments
 (0)