diff --git a/scripts/test.sh b/scripts/test.sh index 75447c80b00a..ca8d6f70acf4 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -82,6 +82,7 @@ fi COMMON_TEST_FLAGS=("${RACE}") if [[ -n "${CPU:-}" ]]; then COMMON_TEST_FLAGS+=("--cpu=${CPU}") + COMMON_TEST_FLAGS+=("--parallel=${CPU}") fi log_callout "Running with ${COMMON_TEST_FLAGS[*]}" diff --git a/server/embed/config.go b/server/embed/config.go index 3a3820568342..275f63324b84 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -615,6 +615,14 @@ func NewConfig() *Config { } func (cfg *Config) AddFlags(fs *flag.FlagSet) { + cfg.AddFlagsWithoutGlobals(fs) + + // raft connection timeouts + fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") + fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") +} + +func (cfg *Config) AddFlagsWithoutGlobals(fs *flag.FlagSet) { // member fs.StringVar(&cfg.Dir, "data-dir", cfg.Dir, "Path to the data directory.") fs.StringVar(&cfg.WalDir, "wal-dir", cfg.WalDir, "Path to the dedicated wal directory.") @@ -657,10 +665,6 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) { fs.Var(flags.NewUint32Value(cfg.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.") - // raft connection timeouts - fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") - fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") - // clustering fs.Var( flags.NewUniqueURLsWithExceptions(DefaultInitialAdvertisePeerURLs, ""), diff --git a/tests/common/member_test.go b/tests/common/member_test.go index 1efb95039a66..e0e3a3522f96 100644 --- a/tests/common/member_test.go +++ b/tests/common/member_test.go @@ -34,8 +34,11 @@ func TestMemberList(t *testing.T) { for _, tc := range clusterTestCases() { t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + tc.config.UniquePortAlloc = true clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config)) defer clus.Close() cc := testutils.MustClient(clus.Client()) @@ -113,6 +116,8 @@ func TestMemberAdd(t *testing.T) { for _, quorumTc := range quorumTcs { for _, clusterTc := range clusterTestCases() { t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) { + t.Parallel() + ctxTimeout := 10 * time.Second if quorumTc.waitForQuorum { ctxTimeout += etcdserver.HealthInterval @@ -121,6 +126,7 @@ func TestMemberAdd(t *testing.T) { defer cancel() c := clusterTc.config c.StrictReconfigCheck = quorumTc.strictReconfigCheck + c.UniquePortAlloc = true clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c)) defer clus.Close() cc := testutils.MustClient(clus.Client()) @@ -198,10 +204,13 @@ func TestMemberRemove(t *testing.T) { continue } t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second) defer cancel() c := clusterTc.config c.StrictReconfigCheck = quorumTc.strictReconfigCheck + c.UniquePortAlloc = true clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c)) defer clus.Close() // client connects to a specific member which won't be removed from cluster diff --git a/tests/framework/config/cluster.go b/tests/framework/config/cluster.go index 0e6ec561afb9..260980683989 100644 --- a/tests/framework/config/cluster.go +++ b/tests/framework/config/cluster.go @@ -36,6 +36,7 @@ type ClusterConfig struct { StrictReconfigCheck bool AuthToken string SnapshotCount uint64 + UniquePortAlloc bool // ClusterContext is used by "e2e" or "integration" to extend the // ClusterConfig. The common test cases shouldn't care about what @@ -88,3 +89,7 @@ func WithSnapshotCount(count uint64) ClusterOption { func WithStrictReconfigCheck(strict bool) ClusterOption { return func(c *ClusterConfig) { c.StrictReconfigCheck = strict } } + +func WithUniquePortAlloc() ClusterOption { + return func(c *ClusterConfig) { c.UniquePortAlloc = true } +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 083dcc7a077f..357375d06487 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -518,6 +518,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in peer2Port := port + 3 clientHTTPPort := port + 4 + var allocatedPorts []int + if cfg.BasePort == -1 { + clientPort = uniquePorts.Alloc() + peerPort = uniquePorts.Alloc() + metricsPort = uniquePorts.Alloc() + peer2Port = uniquePorts.Alloc() + clientHTTPPort = uniquePorts.Alloc() + allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort} + } + if cfg.Client.ConnectionType == ClientTLSAndNonTLS { curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)} @@ -639,7 +649,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in } var gofailPort int if cfg.GoFailEnabled { - gofailPort = (i+1)*10000 + 2381 + gofailPort = uniquePorts.Alloc() + allocatedPorts = append(allocatedPorts, gofailPort) envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort) } @@ -662,12 +673,13 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in GoFailClientTimeout: cfg.GoFailClientTimeout, Proxy: proxyCfg, LazyFSEnabled: cfg.LazyFSEnabled, + AllocatedPorts: allocatedPorts, } } func values(cfg embed.Config) map[string]string { fs := flag.NewFlagSet("etcd", flag.ContinueOnError) - cfg.AddFlags(fs) + cfg.AddFlagsWithoutGlobals(fs) values := map[string]string{} fs.VisitAll(func(f *flag.Flag) { value := f.Value.String() diff --git a/tests/framework/e2e/e2e.go b/tests/framework/e2e/e2e.go index f78df57926ea..9db02af93755 100644 --- a/tests/framework/e2e/e2e.go +++ b/tests/framework/e2e/e2e.go @@ -83,6 +83,9 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config. default: t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS) } + if cfg.UniquePortAlloc { + e2eConfig.BasePort = -1 + } epc, err := NewEtcdProcessCluster(ctx, t, WithConfig(e2eConfig)) if err != nil { t.Fatalf("could not start etcd integrationCluster: %s", err) diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 445ea26e94cc..23f937df15c8 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct { Name string - PeerURL url.URL - ClientURL string - ClientHTTPURL string - MetricsURL string + PeerURL url.URL + ClientURL string + ClientHTTPURL string + MetricsURL string + AllocatedPorts []int InitialToken string InitialCluster string @@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error { ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath)) return os.RemoveAll(ep.cfg.DataDirPath) } + + for _, port := range ep.cfg.AllocatedPorts { + uniquePorts.Free(port) + } + ep.cfg.AllocatedPorts = nil return nil } diff --git a/tests/framework/e2e/port_alloc.go b/tests/framework/e2e/port_alloc.go new file mode 100644 index 000000000000..02e1831edbf8 --- /dev/null +++ b/tests/framework/e2e/port_alloc.go @@ -0,0 +1,58 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import "sync" + +// uniquePorts is a global instance of testPorts. +var uniquePorts *testPorts + +func init() { + uniquePorts = newTestPorts(11000, 19000) +} + +// testPorts is used to allocate listen ports for etcd instance in tests +// in a safe way for concurrent use (i.e. running tests in parallel). +type testPorts struct { + mux sync.Mutex + unused map[int]bool +} + +// newTestPorts keeps track of unused ports in the specified range. +func newTestPorts(start, end int) *testPorts { + m := make(map[int]bool, end-start) + for i := start; i < end; i++ { + m[i] = true + } + return &testPorts{unused: m} +} + +// Alloc allocates a new port or panics if none is available. +func (pa *testPorts) Alloc() int { + pa.mux.Lock() + defer pa.mux.Unlock() + for port := range pa.unused { + delete(pa.unused, port) + return port + } + panic("all ports are used") +} + +// Free makes port available for allocation through Alloc. +func (pa *testPorts) Free(port int) { + pa.mux.Lock() + defer pa.mux.Unlock() + pa.unused[port] = true +} diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index f78af573f348..8e117f47ecd1 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member { } func (c *Cluster) mustNewMember(t testutil.TB) *Member { - memberNumber := c.LastMemberNum + uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum) c.LastMemberNum++ m := MustNewMember(t, MemberConfig{ - Name: fmt.Sprintf("m%v", memberNumber), - MemberNumber: memberNumber, + Name: fmt.Sprintf("m%v", uniqueNumber), AuthToken: c.Cfg.AuthToken, PeerTLS: c.Cfg.PeerTLS, ClientTLS: c.Cfg.ClientTLS, @@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener { type Member struct { config.ServerConfig UniqNumber int - MemberNumber int Port string PeerListeners, ClientListeners []net.Listener GRPCListener net.Listener @@ -591,7 +589,6 @@ type Member struct { type MemberConfig struct { Name string UniqNumber int64 - MemberNumber int PeerTLS *transport.TLSInfo ClientTLS *transport.TLSInfo AuthToken string @@ -624,8 +621,7 @@ type MemberConfig struct { func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { var err error m := &Member{ - MemberNumber: mcfg.MemberNumber, - UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)), + UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)), } peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)