From 70e8732c5d8c3eeb862fab2e57dca701401af4b3 Mon Sep 17 00:00:00 2001 From: Aleksander Mistewicz Date: Tue, 10 Dec 2024 12:19:44 +0100 Subject: [PATCH 1/3] Run Member tests in parallel Introduce port allocator and remove unused MemberNumber. Add UniquePortAlloc config field so that default behavior will stay unchanged. On my local machine it brings down execution time from 5m to 32s. Signed-off-by: Aleksander Mistewicz --- tests/common/member_test.go | 9 ++++ tests/framework/config/cluster.go | 5 +++ tests/framework/e2e/cluster.go | 14 ++++++- tests/framework/e2e/e2e.go | 3 ++ tests/framework/e2e/etcd_process.go | 14 +++++-- tests/framework/e2e/port_alloc.go | 58 ++++++++++++++++++++++++++ tests/framework/integration/cluster.go | 10 ++--- 7 files changed, 101 insertions(+), 12 deletions(-) create mode 100644 tests/framework/e2e/port_alloc.go diff --git a/tests/common/member_test.go b/tests/common/member_test.go index 1efb95039a6..e0e3a3522f9 100644 --- a/tests/common/member_test.go +++ b/tests/common/member_test.go @@ -34,8 +34,11 @@ func TestMemberList(t *testing.T) { for _, tc := range clusterTestCases() { t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + tc.config.UniquePortAlloc = true clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config)) defer clus.Close() cc := testutils.MustClient(clus.Client()) @@ -113,6 +116,8 @@ func TestMemberAdd(t *testing.T) { for _, quorumTc := range quorumTcs { for _, clusterTc := range clusterTestCases() { t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) { + t.Parallel() + ctxTimeout := 10 * time.Second if quorumTc.waitForQuorum { ctxTimeout += etcdserver.HealthInterval @@ -121,6 +126,7 @@ func TestMemberAdd(t *testing.T) { defer cancel() c := clusterTc.config c.StrictReconfigCheck = quorumTc.strictReconfigCheck + c.UniquePortAlloc = true clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c)) defer clus.Close() cc := testutils.MustClient(clus.Client()) @@ -198,10 +204,13 @@ func TestMemberRemove(t *testing.T) { continue } t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second) defer cancel() c := clusterTc.config c.StrictReconfigCheck = quorumTc.strictReconfigCheck + c.UniquePortAlloc = true clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c)) defer clus.Close() // client connects to a specific member which won't be removed from cluster diff --git a/tests/framework/config/cluster.go b/tests/framework/config/cluster.go index 0e6ec561afb..26098068398 100644 --- a/tests/framework/config/cluster.go +++ b/tests/framework/config/cluster.go @@ -36,6 +36,7 @@ type ClusterConfig struct { StrictReconfigCheck bool AuthToken string SnapshotCount uint64 + UniquePortAlloc bool // ClusterContext is used by "e2e" or "integration" to extend the // ClusterConfig. The common test cases shouldn't care about what @@ -88,3 +89,7 @@ func WithSnapshotCount(count uint64) ClusterOption { func WithStrictReconfigCheck(strict bool) ClusterOption { return func(c *ClusterConfig) { c.StrictReconfigCheck = strict } } + +func WithUniquePortAlloc() ClusterOption { + return func(c *ClusterConfig) { c.UniquePortAlloc = true } +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 083dcc7a077..46ff29a7e19 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -518,6 +518,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in peer2Port := port + 3 clientHTTPPort := port + 4 + var allocatedPorts []int + if cfg.BasePort == -1 { + clientPort = uniquePorts.Alloc() + peerPort = uniquePorts.Alloc() + metricsPort = uniquePorts.Alloc() + peer2Port = uniquePorts.Alloc() + clientHTTPPort = uniquePorts.Alloc() + allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort} + } + if cfg.Client.ConnectionType == ClientTLSAndNonTLS { curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)} @@ -639,7 +649,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in } var gofailPort int if cfg.GoFailEnabled { - gofailPort = (i+1)*10000 + 2381 + gofailPort = uniquePorts.Alloc() + allocatedPorts = append(allocatedPorts, gofailPort) envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort) } @@ -662,6 +673,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in GoFailClientTimeout: cfg.GoFailClientTimeout, Proxy: proxyCfg, LazyFSEnabled: cfg.LazyFSEnabled, + AllocatedPorts: allocatedPorts, } } diff --git a/tests/framework/e2e/e2e.go b/tests/framework/e2e/e2e.go index f78df57926e..9db02af9375 100644 --- a/tests/framework/e2e/e2e.go +++ b/tests/framework/e2e/e2e.go @@ -83,6 +83,9 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config. default: t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS) } + if cfg.UniquePortAlloc { + e2eConfig.BasePort = -1 + } epc, err := NewEtcdProcessCluster(ctx, t, WithConfig(e2eConfig)) if err != nil { t.Fatalf("could not start etcd integrationCluster: %s", err) diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 445ea26e94c..23f937df15c 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct { Name string - PeerURL url.URL - ClientURL string - ClientHTTPURL string - MetricsURL string + PeerURL url.URL + ClientURL string + ClientHTTPURL string + MetricsURL string + AllocatedPorts []int InitialToken string InitialCluster string @@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error { ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath)) return os.RemoveAll(ep.cfg.DataDirPath) } + + for _, port := range ep.cfg.AllocatedPorts { + uniquePorts.Free(port) + } + ep.cfg.AllocatedPorts = nil return nil } diff --git a/tests/framework/e2e/port_alloc.go b/tests/framework/e2e/port_alloc.go new file mode 100644 index 00000000000..02e1831edbf --- /dev/null +++ b/tests/framework/e2e/port_alloc.go @@ -0,0 +1,58 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import "sync" + +// uniquePorts is a global instance of testPorts. +var uniquePorts *testPorts + +func init() { + uniquePorts = newTestPorts(11000, 19000) +} + +// testPorts is used to allocate listen ports for etcd instance in tests +// in a safe way for concurrent use (i.e. running tests in parallel). +type testPorts struct { + mux sync.Mutex + unused map[int]bool +} + +// newTestPorts keeps track of unused ports in the specified range. +func newTestPorts(start, end int) *testPorts { + m := make(map[int]bool, end-start) + for i := start; i < end; i++ { + m[i] = true + } + return &testPorts{unused: m} +} + +// Alloc allocates a new port or panics if none is available. +func (pa *testPorts) Alloc() int { + pa.mux.Lock() + defer pa.mux.Unlock() + for port := range pa.unused { + delete(pa.unused, port) + return port + } + panic("all ports are used") +} + +// Free makes port available for allocation through Alloc. +func (pa *testPorts) Free(port int) { + pa.mux.Lock() + defer pa.mux.Unlock() + pa.unused[port] = true +} diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index f78af573f34..8e117f47ecd 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member { } func (c *Cluster) mustNewMember(t testutil.TB) *Member { - memberNumber := c.LastMemberNum + uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum) c.LastMemberNum++ m := MustNewMember(t, MemberConfig{ - Name: fmt.Sprintf("m%v", memberNumber), - MemberNumber: memberNumber, + Name: fmt.Sprintf("m%v", uniqueNumber), AuthToken: c.Cfg.AuthToken, PeerTLS: c.Cfg.PeerTLS, ClientTLS: c.Cfg.ClientTLS, @@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener { type Member struct { config.ServerConfig UniqNumber int - MemberNumber int Port string PeerListeners, ClientListeners []net.Listener GRPCListener net.Listener @@ -591,7 +589,6 @@ type Member struct { type MemberConfig struct { Name string UniqNumber int64 - MemberNumber int PeerTLS *transport.TLSInfo ClientTLS *transport.TLSInfo AuthToken string @@ -624,8 +621,7 @@ type MemberConfig struct { func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { var err error m := &Member{ - MemberNumber: mcfg.MemberNumber, - UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)), + UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)), } peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS) From fa889ccfd30dfe658e0815e12de58b2339f2a046 Mon Sep 17 00:00:00 2001 From: Aleksander Mistewicz Date: Thu, 12 Dec 2024 10:56:46 +0100 Subject: [PATCH 2/3] Fix race condition in tests due to use of globals in configuration Signed-off-by: Aleksander Mistewicz --- server/embed/config.go | 12 ++++++++---- tests/framework/e2e/cluster.go | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/embed/config.go b/server/embed/config.go index 3a382056834..275f63324b8 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -615,6 +615,14 @@ func NewConfig() *Config { } func (cfg *Config) AddFlags(fs *flag.FlagSet) { + cfg.AddFlagsWithoutGlobals(fs) + + // raft connection timeouts + fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") + fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") +} + +func (cfg *Config) AddFlagsWithoutGlobals(fs *flag.FlagSet) { // member fs.StringVar(&cfg.Dir, "data-dir", cfg.Dir, "Path to the data directory.") fs.StringVar(&cfg.WalDir, "wal-dir", cfg.WalDir, "Path to the dedicated wal directory.") @@ -657,10 +665,6 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) { fs.Var(flags.NewUint32Value(cfg.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.") - // raft connection timeouts - fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") - fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") - // clustering fs.Var( flags.NewUniqueURLsWithExceptions(DefaultInitialAdvertisePeerURLs, ""), diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 46ff29a7e19..357375d0648 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -679,7 +679,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in func values(cfg embed.Config) map[string]string { fs := flag.NewFlagSet("etcd", flag.ContinueOnError) - cfg.AddFlags(fs) + cfg.AddFlagsWithoutGlobals(fs) values := map[string]string{} fs.VisitAll(func(f *flag.Flag) { value := f.Value.String() From cfc4e15177190918377dd01b6b5cfd492017d314 Mon Sep 17 00:00:00 2001 From: Aleksander Mistewicz Date: Fri, 3 Jan 2025 13:59:03 +0100 Subject: [PATCH 3/3] Set --parallel to match --cpu flag Otherwise `go test` would execute GOMAXPROCS parallel tests running `--cpu` number of processes (1,2,4) which could starve cpu in some cases. Signed-off-by: Aleksander Mistewicz --- scripts/test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/test.sh b/scripts/test.sh index 75447c80b00..ca8d6f70acf 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -82,6 +82,7 @@ fi COMMON_TEST_FLAGS=("${RACE}") if [[ -n "${CPU:-}" ]]; then COMMON_TEST_FLAGS+=("--cpu=${CPU}") + COMMON_TEST_FLAGS+=("--parallel=${CPU}") fi log_callout "Running with ${COMMON_TEST_FLAGS[*]}"