Skip to content

Commit 70e8732

Browse files
committed
Run Member tests in parallel
Introduce port allocator and remove unused MemberNumber. Add UniquePortAlloc config field so that default behavior will stay unchanged. On my local machine it brings down execution time from 5m to 32s. Signed-off-by: Aleksander Mistewicz <[email protected]>
1 parent fce823a commit 70e8732

File tree

7 files changed

+101
-12
lines changed

7 files changed

+101
-12
lines changed

tests/common/member_test.go

+9
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ func TestMemberList(t *testing.T) {
3434

3535
for _, tc := range clusterTestCases() {
3636
t.Run(tc.name, func(t *testing.T) {
37+
t.Parallel()
38+
3739
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
3840
defer cancel()
41+
tc.config.UniquePortAlloc = true
3942
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config))
4043
defer clus.Close()
4144
cc := testutils.MustClient(clus.Client())
@@ -113,6 +116,8 @@ func TestMemberAdd(t *testing.T) {
113116
for _, quorumTc := range quorumTcs {
114117
for _, clusterTc := range clusterTestCases() {
115118
t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
119+
t.Parallel()
120+
116121
ctxTimeout := 10 * time.Second
117122
if quorumTc.waitForQuorum {
118123
ctxTimeout += etcdserver.HealthInterval
@@ -121,6 +126,7 @@ func TestMemberAdd(t *testing.T) {
121126
defer cancel()
122127
c := clusterTc.config
123128
c.StrictReconfigCheck = quorumTc.strictReconfigCheck
129+
c.UniquePortAlloc = true
124130
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
125131
defer clus.Close()
126132
cc := testutils.MustClient(clus.Client())
@@ -198,10 +204,13 @@ func TestMemberRemove(t *testing.T) {
198204
continue
199205
}
200206
t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
207+
t.Parallel()
208+
201209
ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
202210
defer cancel()
203211
c := clusterTc.config
204212
c.StrictReconfigCheck = quorumTc.strictReconfigCheck
213+
c.UniquePortAlloc = true
205214
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
206215
defer clus.Close()
207216
// client connects to a specific member which won't be removed from cluster

tests/framework/config/cluster.go

+5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type ClusterConfig struct {
3636
StrictReconfigCheck bool
3737
AuthToken string
3838
SnapshotCount uint64
39+
UniquePortAlloc bool
3940

4041
// ClusterContext is used by "e2e" or "integration" to extend the
4142
// ClusterConfig. The common test cases shouldn't care about what
@@ -88,3 +89,7 @@ func WithSnapshotCount(count uint64) ClusterOption {
8889
func WithStrictReconfigCheck(strict bool) ClusterOption {
8990
return func(c *ClusterConfig) { c.StrictReconfigCheck = strict }
9091
}
92+
93+
func WithUniquePortAlloc() ClusterOption {
94+
return func(c *ClusterConfig) { c.UniquePortAlloc = true }
95+
}

tests/framework/e2e/cluster.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
518518
peer2Port := port + 3
519519
clientHTTPPort := port + 4
520520

521+
var allocatedPorts []int
522+
if cfg.BasePort == -1 {
523+
clientPort = uniquePorts.Alloc()
524+
peerPort = uniquePorts.Alloc()
525+
metricsPort = uniquePorts.Alloc()
526+
peer2Port = uniquePorts.Alloc()
527+
clientHTTPPort = uniquePorts.Alloc()
528+
allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort}
529+
}
530+
521531
if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
522532
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
523533
curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)}
@@ -639,7 +649,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
639649
}
640650
var gofailPort int
641651
if cfg.GoFailEnabled {
642-
gofailPort = (i+1)*10000 + 2381
652+
gofailPort = uniquePorts.Alloc()
653+
allocatedPorts = append(allocatedPorts, gofailPort)
643654
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
644655
}
645656

@@ -662,6 +673,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
662673
GoFailClientTimeout: cfg.GoFailClientTimeout,
663674
Proxy: proxyCfg,
664675
LazyFSEnabled: cfg.LazyFSEnabled,
676+
AllocatedPorts: allocatedPorts,
665677
}
666678
}
667679

tests/framework/e2e/e2e.go

+3
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config.
8383
default:
8484
t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS)
8585
}
86+
if cfg.UniquePortAlloc {
87+
e2eConfig.BasePort = -1
88+
}
8689
epc, err := NewEtcdProcessCluster(ctx, t, WithConfig(e2eConfig))
8790
if err != nil {
8891
t.Fatalf("could not start etcd integrationCluster: %s", err)

tests/framework/e2e/etcd_process.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct {
8888

8989
Name string
9090

91-
PeerURL url.URL
92-
ClientURL string
93-
ClientHTTPURL string
94-
MetricsURL string
91+
PeerURL url.URL
92+
ClientURL string
93+
ClientHTTPURL string
94+
MetricsURL string
95+
AllocatedPorts []int
9596

9697
InitialToken string
9798
InitialCluster string
@@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error {
248249
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
249250
return os.RemoveAll(ep.cfg.DataDirPath)
250251
}
252+
253+
for _, port := range ep.cfg.AllocatedPorts {
254+
uniquePorts.Free(port)
255+
}
256+
ep.cfg.AllocatedPorts = nil
251257
return nil
252258
}
253259

tests/framework/e2e/port_alloc.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2024 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package e2e
16+
17+
import "sync"
18+
19+
// uniquePorts is a global instance of testPorts.
20+
var uniquePorts *testPorts
21+
22+
func init() {
23+
uniquePorts = newTestPorts(11000, 19000)
24+
}
25+
26+
// testPorts is used to allocate listen ports for etcd instance in tests
27+
// in a safe way for concurrent use (i.e. running tests in parallel).
28+
type testPorts struct {
29+
mux sync.Mutex
30+
unused map[int]bool
31+
}
32+
33+
// newTestPorts keeps track of unused ports in the specified range.
34+
func newTestPorts(start, end int) *testPorts {
35+
m := make(map[int]bool, end-start)
36+
for i := start; i < end; i++ {
37+
m[i] = true
38+
}
39+
return &testPorts{unused: m}
40+
}
41+
42+
// Alloc allocates a new port or panics if none is available.
43+
func (pa *testPorts) Alloc() int {
44+
pa.mux.Lock()
45+
defer pa.mux.Unlock()
46+
for port := range pa.unused {
47+
delete(pa.unused, port)
48+
return port
49+
}
50+
panic("all ports are used")
51+
}
52+
53+
// Free makes port available for allocation through Alloc.
54+
func (pa *testPorts) Free(port int) {
55+
pa.mux.Lock()
56+
defer pa.mux.Unlock()
57+
pa.unused[port] = true
58+
}

tests/framework/integration/cluster.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member {
260260
}
261261

262262
func (c *Cluster) mustNewMember(t testutil.TB) *Member {
263-
memberNumber := c.LastMemberNum
263+
uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum)
264264
c.LastMemberNum++
265265

266266
m := MustNewMember(t,
267267
MemberConfig{
268-
Name: fmt.Sprintf("m%v", memberNumber),
269-
MemberNumber: memberNumber,
268+
Name: fmt.Sprintf("m%v", uniqueNumber),
270269
AuthToken: c.Cfg.AuthToken,
271270
PeerTLS: c.Cfg.PeerTLS,
272271
ClientTLS: c.Cfg.ClientTLS,
@@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
549548
type Member struct {
550549
config.ServerConfig
551550
UniqNumber int
552-
MemberNumber int
553551
Port string
554552
PeerListeners, ClientListeners []net.Listener
555553
GRPCListener net.Listener
@@ -591,7 +589,6 @@ type Member struct {
591589
type MemberConfig struct {
592590
Name string
593591
UniqNumber int64
594-
MemberNumber int
595592
PeerTLS *transport.TLSInfo
596593
ClientTLS *transport.TLSInfo
597594
AuthToken string
@@ -624,8 +621,7 @@ type MemberConfig struct {
624621
func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
625622
var err error
626623
m := &Member{
627-
MemberNumber: mcfg.MemberNumber,
628-
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
624+
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
629625
}
630626

631627
peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)

0 commit comments

Comments
 (0)