Skip to content

Commit 9577191

Browse files
committed
refactor(lazyClientConn): Use synctest friendly once func
1 parent 41b0995 commit 9577191

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

Diff for: lazyClient.go

+33-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package multistream
33
import (
44
"fmt"
55
"io"
6-
"sync"
76
)
87

98
// NewMSSelect returns a new Multistream which is able to perform
@@ -12,6 +11,9 @@ func NewMSSelect[T StringLike](c io.ReadWriteCloser, proto T) LazyConn {
1211
return &lazyClientConn[T]{
1312
protos: []T{ProtocolID, proto},
1413
con: c,
14+
15+
rhandshakeOnce: newOnceFunc(),
16+
whandshakeOnce: newOnceFunc(),
1517
}
1618
}
1719

@@ -25,6 +27,34 @@ func NewMultistream[T StringLike](c io.ReadWriteCloser, proto T) LazyConn {
2527
}
2628
}
2729

30+
// onceFunc is a sync.Once that can be used by synctest.
31+
// For the Multistream, it is a bit better than sync.Once because it doesn't
32+
// spin when acquiring the lock.
33+
type onceFunc struct {
34+
sem chan struct{}
35+
}
36+
37+
func newOnceFunc() *onceFunc {
38+
o := onceFunc{
39+
sem: make(chan struct{}, 1),
40+
}
41+
o.sem <- struct{}{}
42+
return &o
43+
}
44+
45+
func (o *onceFunc) Do(f func()) {
46+
// We only ever pull a single value from the channel. But we want to block
47+
// Do until the first call to Do has completed. The first call will close
48+
// the channel, so by checking if it's closed we know we don't need to do
49+
// anything.
50+
_, ok := <-o.sem
51+
if !ok {
52+
return
53+
}
54+
defer close(o.sem)
55+
f()
56+
}
57+
2858
// lazyClientConn is a ReadWriteCloser adapter that lazily negotiates a protocol
2959
// using multistream-select on first use.
3060
//
@@ -33,11 +63,11 @@ func NewMultistream[T StringLike](c io.ReadWriteCloser, proto T) LazyConn {
3363
// See: https://github.com/multiformats/go-multistream/issues/20
3464
type lazyClientConn[T StringLike] struct {
3565
// Used to ensure we only trigger the write half of the handshake once.
36-
rhandshakeOnce sync.Once
66+
rhandshakeOnce *onceFunc
3767
rerr error
3868

3969
// Used to ensure we only trigger the read half of the handshake once.
40-
whandshakeOnce sync.Once
70+
whandshakeOnce *onceFunc
4171
werr error
4272

4373
// The sequence of protocols to negotiate.

Diff for: multistream_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net"
1010
"sort"
1111
"strings"
12+
"sync"
1213
"sync/atomic"
1314
"testing"
1415
"time"
@@ -938,3 +939,27 @@ func TestComparableErrors(t *testing.T) {
938939
t.Fatalf("Should be read as ErrNotSupported")
939940
}
940941
}
942+
943+
func TestOnceFunc(t *testing.T) {
944+
o := newOnceFunc()
945+
start := make(chan struct{})
946+
var runCount int
947+
var wg sync.WaitGroup
948+
const workers = 3
949+
wg.Add(workers)
950+
for range workers {
951+
go func() {
952+
defer wg.Done()
953+
<-start
954+
o.Do(func() { runCount++ })
955+
if runCount != 1 {
956+
t.Errorf("Do returned before func was run")
957+
}
958+
}()
959+
}
960+
close(start)
961+
wg.Wait()
962+
if runCount != 1 {
963+
t.Fatalf("should have run only once")
964+
}
965+
}

0 commit comments

Comments
 (0)