diff --git a/lib/executor/constant_arrival_rate_test.go b/lib/executor/constant_arrival_rate_test.go index afdb663d5..dd33f36d9 100644 --- a/lib/executor/constant_arrival_rate_test.go +++ b/lib/executor/constant_arrival_rate_test.go @@ -3,10 +3,10 @@ package executor import ( "context" "fmt" - "runtime" "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/sirupsen/logrus" @@ -49,72 +49,70 @@ func getTestConstantArrivalRateConfig() *ConstantArrivalRateConfig { func TestConstantArrivalRateRunNotEnoughAllocatedVUsWarn(t *testing.T) { t.Parallel() - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - time.Sleep(time.Second) - return nil - }) + synctest.Test(t, func(t *testing.T) { + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + time.Sleep(time.Second) + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestConstantArrivalRateConfig()) - defer test.cancel() - - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - entries := test.logHook.Drain() - require.NotEmpty(t, entries) - for _, entry := range entries { - require.Equal(t, - "Insufficient VUs, reached 20 active VUs and cannot initialize more", - entry.Message) - require.Equal(t, logrus.WarnLevel, entry.Level) - } + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestConstantArrivalRateConfig()) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + entries := test.logHook.Drain() + require.NotEmpty(t, entries) + for _, entry := range entries { + require.Equal(t, + "Insufficient VUs, reached 20 active VUs and cannot initialize more", + entry.Message) + require.Equal(t, logrus.WarnLevel, entry.Level) + } + }) } func TestConstantArrivalRateRunCorrectRate(t *testing.T) { t.Parallel() + synctest.Test(t, func(t *testing.T) { + var count int64 + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + atomic.AddInt64(&count, 1) + return nil + }) - var count int64 - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - atomic.AddInt64(&count, 1) - return nil - }) - - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestConstantArrivalRateConfig()) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestConstantArrivalRateConfig()) + defer test.cancel() - var wg sync.WaitGroup - wg.Go(func() { - // check that we got around the amount of VU iterations as we would expect - var totalCount int64 + go func() { + // check that we got around the amount of VU iterations as we would expect + var totalCount int64 - i := 5 - ticker := time.NewTicker(time.Second) - for range ticker.C { - i-- - if i == 0 { - break + i := 5 + ticker := time.NewTicker(time.Second) + for range ticker.C { + i-- + if i == 0 { + break + } + currentCount := atomic.SwapInt64(&count, 0) + totalCount += currentCount + // We have a relatively relaxed constraint here, but we also check + // the final iteration count exactly below: + assert.InDelta(t, 50, currentCount, 5) } - currentCount := atomic.SwapInt64(&count, 0) - totalCount += currentCount - // We have a relatively relaxed constraint here, but we also check - // the final iteration count exactly below: - assert.InDelta(t, 50, currentCount, 5) - } - - time.Sleep(200 * time.Millisecond) // just in case - - assert.InDelta(t, 250, totalCount+atomic.LoadInt64(&count), 2) + assert.InDelta(t, 250, totalCount+atomic.LoadInt64(&count), 2) + }() + synctest.Wait() + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + synctest.Wait() + require.Empty(t, test.logHook.Drain()) }) - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - wg.Wait() - require.Empty(t, test.logHook.Drain()) } -//nolint:paralleltest // this is flaky if ran with other tests func TestConstantArrivalRateRunCorrectTiming(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skipf("this test is very flaky on the Windows GitHub Action runners...") - } + t.Parallel() + tests := []struct { segment string sequence string @@ -169,59 +167,61 @@ func TestConstantArrivalRateRunCorrectTiming(t *testing.T) { } for _, test := range tests { t.Run(fmt.Sprintf("segment %s sequence %s", test.segment, test.sequence), func(t *testing.T) { - var count int64 - startTime := time.Now() - expectedTimeInt64 := int64(test.start) - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - current := atomic.AddInt64(&count, 1) - - expectedTime := test.start - if current != 1 { - expectedTime = time.Duration(atomic.AddInt64(&expectedTimeInt64, - int64(time.Millisecond)*test.steps[(current-2)%int64(len(test.steps))])) - } - - // FIXME: replace this check with a unit test asserting that the scheduling is correct, - // without depending on the execution time itself - assert.WithinDuration(t, - startTime.Add(expectedTime), - time.Now(), - time.Millisecond*24, - "%d expectedTime %s", current, expectedTime, + t.Parallel() + synctest.Test(t, func(t *testing.T) { + var count int64 + startTime := time.Now() + expectedTimeInt64 := int64(test.start) + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + current := atomic.AddInt64(&count, 1) + + expectedTime := test.start + if current != 1 { + expectedTime = time.Duration(atomic.AddInt64(&expectedTimeInt64, + int64(time.Millisecond)*test.steps[(current-2)%int64(len(test.steps))])) + } + + // Under synctest, time is virtualized so scheduling assertions are deterministic + assert.WithinDuration(t, + startTime.Add(expectedTime), + time.Now(), + time.Millisecond*24, + "%d expectedTime %s", current, expectedTime, + ) + + return nil + }) + + config := getTestConstantArrivalRateConfig() + seconds := 2 + config.Duration.Duration = types.Duration(time.Second * time.Duration(seconds)) + execTest := setupExecutorTest( + t, test.segment, test.sequence, lib.Options{}, runner, config, ) + defer execTest.cancel() - return nil - }) + newET, err := execTest.state.ExecutionTuple.GetNewExecutionTupleFromValue(config.MaxVUs.Int64) + require.NoError(t, err) + rateScaled := newET.ScaleInt64(config.Rate.Int64) - config := getTestConstantArrivalRateConfig() - seconds := 2 - config.Duration.Duration = types.Duration(time.Second * time.Duration(seconds)) - execTest := setupExecutorTest( - t, test.segment, test.sequence, lib.Options{}, runner, config, - ) - defer execTest.cancel() - - newET, err := execTest.state.ExecutionTuple.GetNewExecutionTupleFromValue(config.MaxVUs.Int64) - require.NoError(t, err) - rateScaled := newET.ScaleInt64(config.Rate.Int64) - - var wg sync.WaitGroup - wg.Go(func() { - // check that we got around the amount of VU iterations as we would expect - var currentCount int64 - - for i := range seconds { - time.Sleep(time.Second) - currentCount = atomic.LoadInt64(&count) - assert.InDelta(t, int64(i+1)*rateScaled, currentCount, 3) - } + go func() { + // check that we got around the amount of VU iterations as we would expect + var currentCount int64 + + for i := range seconds { + time.Sleep(time.Second) + currentCount = atomic.LoadInt64(&count) + assert.InDelta(t, int64(i+1)*rateScaled, currentCount, 3) + } + }() + + startTime = time.Now() + engineOut := make(chan metrics.SampleContainer, 1000) + err = execTest.executor.Run(execTest.ctx, engineOut) + synctest.Wait() + require.NoError(t, err) + require.Empty(t, execTest.logHook.Drain()) }) - startTime = time.Now() - engineOut := make(chan metrics.SampleContainer, 1000) - err = execTest.executor.Run(execTest.ctx, engineOut) - wg.Wait() - require.NoError(t, err) - require.Empty(t, execTest.logHook.Drain()) }) } } @@ -236,74 +236,78 @@ func TestArrivalRateCancel(t *testing.T) { for name, config := range testCases { t.Run(name, func(t *testing.T) { t.Parallel() - ch := make(chan struct{}) - errCh := make(chan error, 1) - weAreDoneCh := make(chan struct{}) - - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + synctest.Test(t, func(t *testing.T) { + ch := make(chan struct{}) + errCh := make(chan error, 1) + weAreDoneCh := make(chan struct{}) + + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + select { + case <-ch: + <-ch + default: + } + return nil + }) + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() + + go func() { + engineOut := make(chan metrics.SampleContainer, 1000) + errCh <- test.executor.Run(test.ctx, engineOut) + close(weAreDoneCh) + }() + + time.Sleep(time.Second) + ch <- struct{}{} + test.cancel() + time.Sleep(time.Second) select { - case <-ch: - <-ch + case <-weAreDoneCh: + t.Fatal("Run returned before all VU iterations were finished") default: } - return nil + close(ch) + <-weAreDoneCh + synctest.Wait() + require.NoError(t, <-errCh) + require.Empty(t, test.logHook.Drain()) }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() - - var wg sync.WaitGroup - wg.Go(func() { - engineOut := make(chan metrics.SampleContainer, 1000) - errCh <- test.executor.Run(test.ctx, engineOut) - close(weAreDoneCh) - }) - - time.Sleep(time.Second) - ch <- struct{}{} - test.cancel() - time.Sleep(time.Second) - select { - case <-weAreDoneCh: - t.Fatal("Run returned before all VU iterations were finished") - default: - } - close(ch) - <-weAreDoneCh - wg.Wait() - require.NoError(t, <-errCh) - require.Empty(t, test.logHook.Drain()) }) } } func TestConstantArrivalRateDroppedIterations(t *testing.T) { t.Parallel() - var count int64 - config := &ConstantArrivalRateConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, - TimeUnit: types.NullDurationFrom(time.Second), - Rate: null.IntFrom(10), - Duration: types.NullDurationFrom(950 * time.Millisecond), - PreAllocatedVUs: null.IntFrom(5), - MaxVUs: null.IntFrom(5), - } + synctest.Test(t, func(t *testing.T) { + var count int64 - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - atomic.AddInt64(&count, 1) - <-ctx.Done() - return nil + config := &ConstantArrivalRateConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, + TimeUnit: types.NullDurationFrom(time.Second), + Rate: null.IntFrom(10), + Duration: types.NullDurationFrom(950 * time.Millisecond), + PreAllocatedVUs: null.IntFrom(5), + MaxVUs: null.IntFrom(5), + } + + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + }) + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + logs := test.logHook.Drain() + require.Len(t, logs, 1) + assert.Contains(t, logs[0].Message, "cannot initialize more") + assert.Equal(t, int64(5), count) + assert.Equal(t, float64(5), sumMetricValues(engineOut, metrics.DroppedIterationsName)) }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() - - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - logs := test.logHook.Drain() - require.Len(t, logs, 1) - assert.Contains(t, logs[0].Message, "cannot initialize more") - assert.Equal(t, int64(5), count) - assert.Equal(t, float64(5), sumMetricValues(engineOut, metrics.DroppedIterationsName)) } func TestConstantArrivalRateGlobalIters(t *testing.T) { @@ -330,21 +334,22 @@ func TestConstantArrivalRateGlobalIters(t *testing.T) { for _, tc := range testCases { t.Run(fmt.Sprintf("%s_%s", tc.seq, tc.seg), func(t *testing.T) { t.Parallel() - - gotIters := []uint64{} - var mx sync.Mutex - runner := simpleRunner(func(_ context.Context, state *lib.State) error { - mx.Lock() - gotIters = append(gotIters, state.GetScenarioGlobalVUIter()) - mx.Unlock() - return nil + synctest.Test(t, func(t *testing.T) { + gotIters := []uint64{} + var mx sync.Mutex + runner := simpleRunner(func(_ context.Context, state *lib.State) error { + mx.Lock() + gotIters = append(gotIters, state.GetScenarioGlobalVUIter()) + mx.Unlock() + return nil + }) + test := setupExecutorTest(t, tc.seg, tc.seq, lib.Options{}, runner, config) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 100) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + assert.Equal(t, tc.expIters, gotIters) }) - test := setupExecutorTest(t, tc.seg, tc.seq, lib.Options{}, runner, config) - defer test.cancel() - - engineOut := make(chan metrics.SampleContainer, 100) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.Equal(t, tc.expIters, gotIters) }) } } @@ -352,38 +357,40 @@ func TestConstantArrivalRateGlobalIters(t *testing.T) { func TestConstantArrivalRateActiveVUs(t *testing.T) { t.Parallel() - config := &ConstantArrivalRateConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, - TimeUnit: types.NullDurationFrom(time.Second), - Rate: null.IntFrom(10), - Duration: types.NullDurationFrom(950 * time.Millisecond), - PreAllocatedVUs: null.IntFrom(5), - MaxVUs: null.IntFrom(10), - } + synctest.Test(t, func(t *testing.T) { + config := &ConstantArrivalRateConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, + TimeUnit: types.NullDurationFrom(time.Second), + Rate: null.IntFrom(10), + Duration: types.NullDurationFrom(950 * time.Millisecond), + PreAllocatedVUs: null.IntFrom(5), + MaxVUs: null.IntFrom(10), + } - var ( - running int64 - getCurrActiveVUs func() int64 - runMx sync.Mutex - ) - - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - runMx.Lock() - running++ - assert.Equal(t, running, getCurrActiveVUs()) - runMx.Unlock() - // Block the VU to cause the executor to schedule more - <-ctx.Done() - return nil - }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + var ( + running int64 + getCurrActiveVUs func() int64 + runMx sync.Mutex + ) + + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + runMx.Lock() + running++ + assert.Equal(t, running, getCurrActiveVUs()) + runMx.Unlock() + // Block the VU to cause the executor to schedule more + <-ctx.Done() + return nil + }) + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - getCurrActiveVUs = test.state.GetCurrentlyActiveVUsCount + getCurrActiveVUs = test.state.GetCurrentlyActiveVUsCount - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.GreaterOrEqual(t, running, int64(5)) - assert.LessOrEqual(t, running, int64(10)) + assert.GreaterOrEqual(t, running, int64(5)) + assert.LessOrEqual(t, running, int64(10)) + }) } diff --git a/lib/executor/constant_vus_test.go b/lib/executor/constant_vus_test.go index bd59896e9..c7e836a7b 100644 --- a/lib/executor/constant_vus_test.go +++ b/lib/executor/constant_vus_test.go @@ -4,6 +4,7 @@ import ( "context" "sync" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" @@ -24,31 +25,34 @@ func getTestConstantVUsConfig() ConstantVUsConfig { func TestConstantVUsRun(t *testing.T) { t.Parallel() - var result sync.Map - runner := simpleRunner(func(ctx context.Context, state *lib.State) error { - select { - case <-ctx.Done(): + synctest.Test(t, func(t *testing.T) { + var result sync.Map + + runner := simpleRunner(func(ctx context.Context, state *lib.State) error { + select { + case <-ctx.Done(): + return nil + default: + } + currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) + result.Store(state.VUID, currIter.(uint64)+1) + time.Sleep(210 * time.Millisecond) return nil - default: - } - currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) - result.Store(state.VUID, currIter.(uint64)+1) - time.Sleep(210 * time.Millisecond) - return nil - }) + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestConstantVUsConfig()) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestConstantVUsConfig()) + defer test.cancel() - require.NoError(t, test.executor.Run(test.ctx, nil)) + require.NoError(t, test.executor.Run(test.ctx, nil)) - var totalIters uint64 - result.Range(func(_, value any) bool { - vuIters := value.(uint64) - assert.Equal(t, uint64(5), vuIters) - totalIters += vuIters - return true + var totalIters uint64 + result.Range(func(_, value any) bool { + vuIters := value.(uint64) + assert.Equal(t, uint64(5), vuIters) + totalIters += vuIters + return true + }) + assert.Equal(t, uint64(50), totalIters) }) - assert.Equal(t, uint64(50), totalIters) } diff --git a/lib/executor/execution_test.go b/lib/executor/execution_test.go index 6ac4cdf18..00ada5492 100644 --- a/lib/executor/execution_test.go +++ b/lib/executor/execution_test.go @@ -5,8 +5,8 @@ import ( "fmt" "io" "math/rand" - "sync" "testing" + "testing/synctest" "time" "github.com/sirupsen/logrus" @@ -61,111 +61,117 @@ func TestExecutionStateVUIDs(t *testing.T) { r := rand.New(rand.NewSource(seed)) t.Logf("Random source seeded with %d\n", seed) count := 100 + r.Intn(50) - wg := sync.WaitGroup{} - wg.Add(count) - for range count { - go func() { - es.GetUniqueVUIdentifiers() - wg.Done() - }() - } - wg.Wait() - idl, idg = es.GetUniqueVUIdentifiers() - assert.EqualValues(t, 4+count, idl) - assert.EqualValues(t, (3+count)*int(offsets[0])+int(start+1), idg) + + synctest.Test(t, func(t *testing.T) { + for range count { + go func() { + es.GetUniqueVUIdentifiers() + }() + } + synctest.Wait() + idl, idg = es.GetUniqueVUIdentifiers() + assert.EqualValues(t, 4+count, idl) + assert.EqualValues(t, (3+count)*int(offsets[0])+int(start+1), idg) + }) }) } } func TestExecutionStateGettingVUsWhenNonAreAvailable(t *testing.T) { t.Parallel() - et, err := lib.NewExecutionTuple(nil, nil) - require.NoError(t, err) - es := lib.NewExecutionState(nil, et, 0, 0) - logHook := testutils.NewLogHook(logrus.WarnLevel) - testLog := logrus.New() - testLog.AddHook(logHook) - testLog.SetOutput(io.Discard) - vu, err := es.GetPlannedVU(logrus.NewEntry(testLog), true) - require.Nil(t, vu) - require.Error(t, err) - require.Contains(t, err.Error(), "could not get a VU from the buffer in") - entries := logHook.Drain() - require.Equal(t, lib.MaxRetriesGetPlannedVU, len(entries)) - for _, entry := range entries { - require.Contains(t, entry.Message, "Could not get a VU from the buffer for ") - } + + synctest.Test(t, func(t *testing.T) { + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + es := lib.NewExecutionState(nil, et, 0, 0) + logHook := testutils.NewLogHook(logrus.WarnLevel) + testLog := logrus.New() + testLog.AddHook(logHook) + testLog.SetOutput(io.Discard) + vu, err := es.GetPlannedVU(logrus.NewEntry(testLog), true) + require.Nil(t, vu) + require.Error(t, err) + require.Contains(t, err.Error(), "could not get a VU from the buffer in") + entries := logHook.Drain() + require.Equal(t, lib.MaxRetriesGetPlannedVU, len(entries)) + for _, entry := range entries { + require.Contains(t, entry.Message, "Could not get a VU from the buffer for ") + } + }) } func TestExecutionStateGettingVUs(t *testing.T) { t.Parallel() - logHook := testutils.NewLogHook(logrus.WarnLevel, logrus.DebugLevel) - testLog := logrus.New() - testLog.AddHook(logHook) - testLog.SetOutput(io.Discard) - logEntry := logrus.NewEntry(testLog) - et, err := lib.NewExecutionTuple(nil, nil) - require.NoError(t, err) - es := lib.NewExecutionState(nil, et, 10, 20) - es.SetInitVUFunc(func(_ context.Context, _ *logrus.Entry) (lib.InitializedVU, error) { - return &minirunner.VU{}, nil - }) + synctest.Test(t, func(t *testing.T) { + logHook := testutils.NewLogHook(logrus.WarnLevel, logrus.DebugLevel) + testLog := logrus.New() + testLog.AddHook(logHook) + testLog.SetOutput(io.Discard) + logEntry := logrus.NewEntry(testLog) - var vu lib.InitializedVU - for i := range 10 { - require.EqualValues(t, i, es.GetInitializedVUsCount()) - vu, err = es.InitializeNewVU(context.Background(), logEntry) + et, err := lib.NewExecutionTuple(nil, nil) require.NoError(t, err) - require.EqualValues(t, i+1, es.GetInitializedVUsCount()) - es.ReturnVU(vu, false) - require.EqualValues(t, 0, es.GetCurrentlyActiveVUsCount()) - require.EqualValues(t, i+1, es.GetInitializedVUsCount()) - } + es := lib.NewExecutionState(nil, et, 10, 20) + es.SetInitVUFunc(func(_ context.Context, _ *logrus.Entry) (lib.InitializedVU, error) { + return &minirunner.VU{}, nil + }) - // Test getting initialized VUs is okay :) - for i := range 10 { - require.EqualValues(t, i, es.GetCurrentlyActiveVUsCount()) - vu, err = es.GetPlannedVU(logEntry, true) - require.NoError(t, err) - require.Empty(t, logHook.Drain()) - require.NotNil(t, vu) - require.EqualValues(t, i+1, es.GetCurrentlyActiveVUsCount()) - require.EqualValues(t, 10, es.GetInitializedVUsCount()) - } + var vu lib.InitializedVU + for i := range 10 { + require.EqualValues(t, i, es.GetInitializedVUsCount()) + vu, err = es.InitializeNewVU(context.Background(), logEntry) + require.NoError(t, err) + require.EqualValues(t, i+1, es.GetInitializedVUsCount()) + es.ReturnVU(vu, false) + require.EqualValues(t, 0, es.GetCurrentlyActiveVUsCount()) + require.EqualValues(t, i+1, es.GetInitializedVUsCount()) + } + + // Test getting initialized VUs is okay :) + for i := range 10 { + require.EqualValues(t, i, es.GetCurrentlyActiveVUsCount()) + vu, err = es.GetPlannedVU(logEntry, true) + require.NoError(t, err) + require.Empty(t, logHook.Drain()) + require.NotNil(t, vu) + require.EqualValues(t, i+1, es.GetCurrentlyActiveVUsCount()) + require.EqualValues(t, 10, es.GetInitializedVUsCount()) + } - // Check that getting 1 more planned VU will error out - vu, err = es.GetPlannedVU(logEntry, true) - require.Nil(t, vu) - require.Error(t, err) - require.Contains(t, err.Error(), "could not get a VU from the buffer in") - entries := logHook.Drain() - require.Equal(t, lib.MaxRetriesGetPlannedVU, len(entries)) - for _, entry := range entries { - require.Contains(t, entry.Message, "Could not get a VU from the buffer for ") - } + // Check that getting 1 more planned VU will error out + vu, err = es.GetPlannedVU(logEntry, true) + require.Nil(t, vu) + require.Error(t, err) + require.Contains(t, err.Error(), "could not get a VU from the buffer in") + entries := logHook.Drain() + require.Equal(t, lib.MaxRetriesGetPlannedVU, len(entries)) + for _, entry := range entries { + require.Contains(t, entry.Message, "Could not get a VU from the buffer for ") + } + + // Test getting uninitialized vus will work + for i := range 10 { + require.EqualValues(t, 10+i, es.GetInitializedVUsCount()) + vu, err = es.GetUnplannedVU(context.Background(), logEntry) + require.NoError(t, err) + require.Empty(t, logHook.Drain()) + require.NotNil(t, vu) + require.EqualValues(t, 10+i+1, es.GetInitializedVUsCount()) + require.EqualValues(t, 10, es.GetCurrentlyActiveVUsCount()) + } - // Test getting uninitialized vus will work - for i := range 10 { - require.EqualValues(t, 10+i, es.GetInitializedVUsCount()) + // Check that getting 1 more unplanned VU will error out vu, err = es.GetUnplannedVU(context.Background(), logEntry) - require.NoError(t, err) - require.Empty(t, logHook.Drain()) - require.NotNil(t, vu) - require.EqualValues(t, 10+i+1, es.GetInitializedVUsCount()) - require.EqualValues(t, 10, es.GetCurrentlyActiveVUsCount()) - } - - // Check that getting 1 more unplanned VU will error out - vu, err = es.GetUnplannedVU(context.Background(), logEntry) - require.Nil(t, vu) - require.Error(t, err) - require.Contains(t, err.Error(), "could not get a VU from the buffer in") - entries = logHook.Drain() - require.Equal(t, lib.MaxRetriesGetPlannedVU, len(entries)) - for _, entry := range entries { - require.Contains(t, entry.Message, "Could not get a VU from the buffer for ") - } + require.Nil(t, vu) + require.Error(t, err) + require.Contains(t, err.Error(), "could not get a VU from the buffer in") + entries = logHook.Drain() + require.Equal(t, lib.MaxRetriesGetPlannedVU, len(entries)) + for _, entry := range entries { + require.Contains(t, entry.Message, "Could not get a VU from the buffer for ") + } + }) } func TestMarkStartedPanicsOnSecondRun(t *testing.T) { diff --git a/lib/executor/externally_controlled_test.go b/lib/executor/externally_controlled_test.go index 94d9df733..83b197726 100644 --- a/lib/executor/externally_controlled_test.go +++ b/lib/executor/externally_controlled_test.go @@ -2,9 +2,9 @@ package executor import ( "context" - "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" @@ -28,78 +28,79 @@ func getTestExternallyControlledConfig() ExternallyControlledConfig { func TestExternallyControlledRun(t *testing.T) { t.Parallel() - doneIters := new(uint64) - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - time.Sleep(200 * time.Millisecond) - atomic.AddUint64(doneIters, 1) - return nil - }) + synctest.Test(t, func(t *testing.T) { + doneIters := new(uint64) + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + time.Sleep(200 * time.Millisecond) + atomic.AddUint64(doneIters, 1) + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestExternallyControlledConfig()) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestExternallyControlledConfig()) + defer test.cancel() - var ( - wg sync.WaitGroup - errCh = make(chan error, 1) - doneCh = make(chan struct{}) - ) - wg.Go(func() { - test.state.MarkStarted() - errCh <- test.executor.Run(test.ctx, nil) - test.state.MarkEnded() - close(doneCh) - }) + var ( + errCh = make(chan error, 1) + doneCh = make(chan struct{}) + ) + go func() { + test.state.MarkStarted() + errCh <- test.executor.Run(test.ctx, nil) + test.state.MarkEnded() + close(doneCh) + }() - updateConfig := func(vus, maxVUs int64, errMsg string) { - newConfig := ExternallyControlledConfigParams{ - VUs: null.IntFrom(vus), - MaxVUs: null.IntFrom(maxVUs), - Duration: types.NullDurationFrom(2 * time.Second), - } - err := test.executor.(*ExternallyControlled).UpdateConfig(test.ctx, newConfig) - if errMsg != "" { - assert.EqualError(t, err, errMsg) - } else { - assert.NoError(t, err) + updateConfig := func(vus, maxVUs int64, errMsg string) { + newConfig := ExternallyControlledConfigParams{ + VUs: null.IntFrom(vus), + MaxVUs: null.IntFrom(maxVUs), + Duration: types.NullDurationFrom(2 * time.Second), + } + err := test.executor.(*ExternallyControlled).UpdateConfig(test.ctx, newConfig) + if errMsg != "" { + assert.EqualError(t, err, errMsg) + } else { + assert.NoError(t, err) + } } - } - var resultVUCount [][]int64 - snapshot := func() { - resultVUCount = append(resultVUCount, - []int64{test.state.GetCurrentlyActiveVUsCount(), test.state.GetInitializedVUsCount()}) - } + var resultVUCount [][]int64 + snapshot := func() { + resultVUCount = append(resultVUCount, + []int64{test.state.GetCurrentlyActiveVUsCount(), test.state.GetInitializedVUsCount()}) + } - wg.Go(func() { - snapshotTicker := time.NewTicker(500 * time.Millisecond) - ticks := 0 - for { - select { - case <-snapshotTicker.C: - snapshot() - switch ticks { - case 0, 2: - updateConfig(4, 10, "") - case 1: - updateConfig(8, 20, "") - case 3: - updateConfig(15, 10, - "invalid configuration supplied: the number of active VUs (15)"+ - " must be less than or equal to the number of maxVUs (10)") - updateConfig(-1, 10, - "invalid configuration supplied: the number of VUs can't be negative") + go func() { + snapshotTicker := time.NewTicker(500 * time.Millisecond) + ticks := 0 + for { + select { + case <-snapshotTicker.C: + snapshot() + switch ticks { + case 0, 2: + updateConfig(4, 10, "") + case 1: + updateConfig(8, 20, "") + case 3: + updateConfig(15, 10, + "invalid configuration supplied: the number of active VUs (15)"+ + " must be less than or equal to the number of maxVUs (10)") + updateConfig(-1, 10, + "invalid configuration supplied: the number of VUs can't be negative") + } + ticks++ + case <-doneCh: + snapshotTicker.Stop() + snapshot() + return } - ticks++ - case <-doneCh: - snapshotTicker.Stop() - snapshot() - return } - } - }) + }() - wg.Wait() - require.NoError(t, <-errCh) - assert.InDelta(t, 48, atomic.LoadUint64(doneIters), 2) - assert.Equal(t, [][]int64{{2, 10}, {4, 10}, {8, 20}, {4, 10}, {0, 10}}, resultVUCount) + require.NoError(t, <-errCh) + synctest.Wait() + assert.InDelta(t, 48, atomic.LoadUint64(doneIters), 2) + assert.Equal(t, [][]int64{{2, 10}, {4, 10}, {8, 20}, {4, 10}, {0, 10}}, resultVUCount) + }) } diff --git a/lib/executor/per_vu_iterations_test.go b/lib/executor/per_vu_iterations_test.go index fb91de6b7..c653f6144 100644 --- a/lib/executor/per_vu_iterations_test.go +++ b/lib/executor/per_vu_iterations_test.go @@ -5,6 +5,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" @@ -56,67 +57,73 @@ func TestPerVUIterationsRun(t *testing.T) { // This is the reverse behavior of the SharedIterations executor. func TestPerVUIterationsRunVariableVU(t *testing.T) { t.Parallel() - var ( - result sync.Map - slowVUID = uint64(1) - ) - runner := simpleRunner(func(_ context.Context, state *lib.State) error { - if state.VUID == slowVUID { - time.Sleep(200 * time.Millisecond) - } - currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) - result.Store(state.VUID, currIter.(uint64)+1) - return nil + synctest.Test(t, func(t *testing.T) { + var ( + result sync.Map + slowVUID = uint64(1) + ) + + runner := simpleRunner(func(_ context.Context, state *lib.State) error { + if state.VUID == slowVUID { + time.Sleep(201 * time.Millisecond) + } + currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) + result.Store(state.VUID, currIter.(uint64)+1) + return nil + }) + + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestPerVUIterationsConfig()) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + + val, ok := result.Load(slowVUID) + assert.True(t, ok) + + var totalIters uint64 + result.Range(func(key, value any) bool { + vuIters := value.(uint64) + if key != slowVUID { + assert.Equal(t, uint64(100), vuIters) + } + totalIters += vuIters + return true + }) + + // The slow VU should complete 15 iterations given these timings, + // while the rest should equally complete their assigned 100 iterations. + assert.EqualValues(t, 15, val) + assert.EqualValues(t, 9*100+val.(uint64), totalIters) }) - - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestPerVUIterationsConfig()) - defer test.cancel() - - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - - val, ok := result.Load(slowVUID) - assert.True(t, ok) - - var totalIters uint64 - result.Range(func(key, value any) bool { - vuIters := value.(uint64) - if key != slowVUID { - assert.Equal(t, uint64(100), vuIters) - } - totalIters += vuIters - return true - }) - - // The slow VU should complete 15 iterations given these timings, - // while the rest should equally complete their assigned 100 iterations. - assert.Equal(t, uint64(15), val) - assert.Equal(t, uint64(915), totalIters) } func TestPerVuIterationsEmitDroppedIterations(t *testing.T) { t.Parallel() - var count int64 - config := PerVUIterationsConfig{ - VUs: null.IntFrom(5), - Iterations: null.IntFrom(20), - MaxDuration: types.NullDurationFrom(1 * time.Second), - } + synctest.Test(t, func(t *testing.T) { + var count int64 - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - atomic.AddInt64(&count, 1) - <-ctx.Done() - return nil - }) + config := PerVUIterationsConfig{ + VUs: null.IntFrom(5), + Iterations: null.IntFrom(20), + MaxDuration: types.NullDurationFrom(1 * time.Second), + } - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + }) - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.Empty(t, test.logHook.Drain()) - assert.Equal(t, int64(5), count) - assert.Equal(t, float64(95), sumMetricValues(engineOut, metrics.DroppedIterationsName)) + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + assert.Empty(t, test.logHook.Drain()) + assert.Equal(t, int64(5), count) + assert.Equal(t, float64(95), sumMetricValues(engineOut, metrics.DroppedIterationsName)) + }) } diff --git a/lib/executor/ramping_arrival_rate.go b/lib/executor/ramping_arrival_rate.go index f9147812c..12537bd04 100644 --- a/lib/executor/ramping_arrival_rate.go +++ b/lib/executor/ramping_arrival_rate.go @@ -231,7 +231,7 @@ func (varr *RampingArrivalRate) Init(_ context.Context) error { // The specific implementation here can only go forward and does incorporate // the striping algorithm from the lib.ExecutionTuple for additional speed up but this could // possibly be refactored if need for this arises. -func (varc RampingArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time.Duration) { +func (varc RampingArrivalRateConfig) cal(ctx context.Context, et *lib.ExecutionTuple, ch chan<- time.Duration) { start, offsets, _ := et.GetStripedOffsets() li := -1 // TODO: move this to a utility function, or directly what GetStripedOffsets uses once we see everywhere we will use it @@ -246,7 +246,8 @@ func (varc RampingArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time. doneSoFar, endCount, to, dur float64 from = float64(varc.StartRate.ValueOrZero()) / timeUnit // start .. starts at 0 but the algorithm works with area so we need to start from 1 not 0 - i = float64(start + 1) + i = float64(start + 1) + done = ctx.Done() ) for _, stage := range varc.Stages { @@ -259,12 +260,20 @@ func (varc RampingArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time. // somewhere where it is less in the middle of the equation x := (from*dur - noNegativeSqrt(dur*(from*from*dur+2*(i-doneSoFar)*(to-from)))) / (from - to) - ch <- time.Duration(x) + stageStart + select { + case <-done: + return + case ch <- time.Duration(x) + stageStart: + } } } else { endCount += dur * to for ; i <= endCount; i += float64(next()) { - ch <- time.Duration((i-doneSoFar)/to) + stageStart + select { + case <-done: + return + case ch <- time.Duration((i-doneSoFar)/to) + stageStart: + } } } doneSoFar = endCount @@ -442,7 +451,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics var prevTime time.Duration shownWarning := false metricTags := varr.getMetricTags(nil) - go varr.config.cal(varr.et, ch) + go varr.config.cal(maxDurationCtx, varr.et, ch) for nextTime := range ch { select { case <-regDurationDone: diff --git a/lib/executor/ramping_arrival_rate_test.go b/lib/executor/ramping_arrival_rate_test.go index 8073e45b4..930f40995 100644 --- a/lib/executor/ramping_arrival_rate_test.go +++ b/lib/executor/ramping_arrival_rate_test.go @@ -7,6 +7,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/sirupsen/logrus" @@ -46,210 +47,233 @@ func getTestRampingArrivalRateConfig() *RampingArrivalRateConfig { func TestRampingArrivalRateRunNotEnoughAllocatedVUsWarn(t *testing.T) { t.Parallel() - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - time.Sleep(time.Second) - return nil - }) + synctest.Test(t, func(t *testing.T) { + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + time.Sleep(time.Second) + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestRampingArrivalRateConfig()) - defer test.cancel() - - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - entries := test.logHook.Drain() - require.NotEmpty(t, entries) - for _, entry := range entries { - require.Equal(t, - "Insufficient VUs, reached 20 active VUs and cannot initialize more", - entry.Message) - require.Equal(t, logrus.WarnLevel, entry.Level) - } + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestRampingArrivalRateConfig()) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + entries := test.logHook.Drain() + require.NotEmpty(t, entries) + for _, entry := range entries { + require.Equal(t, + "Insufficient VUs, reached 20 active VUs and cannot initialize more", + entry.Message) + require.Equal(t, logrus.WarnLevel, entry.Level) + } + }) } func TestRampingArrivalRateRunCorrectRate(t *testing.T) { t.Parallel() - var count int64 - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - atomic.AddInt64(&count, 1) - return nil - }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestRampingArrivalRateConfig()) - defer test.cancel() + synctest.Test(t, func(t *testing.T) { + var count int64 + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + atomic.AddInt64(&count, 1) + return nil + }) - var wg sync.WaitGroup - wg.Go(func() { - // check that we got around the amount of VU iterations as we would expect - var currentCount int64 + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestRampingArrivalRateConfig()) + defer test.cancel() - time.Sleep(time.Second) - currentCount = atomic.SwapInt64(&count, 0) - assert.InDelta(t, 10, currentCount, 1) + go func(ctx context.Context) { + // Under synctest, time is virtualized so sampling is deterministic + var currentCount int64 - time.Sleep(time.Second) - currentCount = atomic.SwapInt64(&count, 0) - assert.InDelta(t, 30, currentCount, 2) + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + currentCount = atomic.SwapInt64(&count, 0) + assert.InDelta(t, 10, currentCount, 1) + + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + currentCount = atomic.SwapInt64(&count, 0) + assert.InDelta(t, 30, currentCount, 2) - time.Sleep(time.Second) - currentCount = atomic.SwapInt64(&count, 0) - assert.InDelta(t, 50, currentCount, 3) + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + currentCount = atomic.SwapInt64(&count, 0) + assert.InDelta(t, 50, currentCount, 3) + }(test.ctx) + + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + synctest.Wait() + require.Empty(t, test.logHook.Drain()) }) - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - wg.Wait() - require.Empty(t, test.logHook.Drain()) } func TestRampingArrivalRateRunUnplannedVUs(t *testing.T) { t.Parallel() - config := &RampingArrivalRateConfig{ - TimeUnit: types.NullDurationFrom(time.Second), - Stages: []Stage{ - { - // the minus one makes it so only 9 iterations will be started instead of 10 - // as the 10th happens to be just at the end and sometimes doesn't get executed :( - Duration: types.NullDurationFrom(time.Second*2 - 1), - Target: null.IntFrom(10), + synctest.Test(t, func(t *testing.T) { + config := &RampingArrivalRateConfig{ + TimeUnit: types.NullDurationFrom(time.Second), + Stages: []Stage{ + { + // the minus one makes it so only 9 iterations will be started instead of 10 + // as the 10th happens to be just at the end and sometimes doesn't get executed :( + Duration: types.NullDurationFrom(time.Second*2 - 1), + Target: null.IntFrom(10), + }, }, - }, - PreAllocatedVUs: null.IntFrom(1), - MaxVUs: null.IntFrom(3), - } - - var count int64 - ch := make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations - ch2 := make(chan struct{}) // closed when a second iteration was started on an old VU in order to test it won't start a second unplanned VU in parallel or at all - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - cur := atomic.AddInt64(&count, 1) - switch cur { - case 1: - <-ch // wait to start again - case 2: - <-ch2 // wait to start again + PreAllocatedVUs: null.IntFrom(1), + MaxVUs: null.IntFrom(3), } - return nil - }) + var count int64 + ch := make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations + ch2 := make(chan struct{}) // closed when a second iteration was started on an old VU in order to test it won't start a second unplanned VU in parallel or at all + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + cur := atomic.AddInt64(&count, 1) + switch cur { + case 1: + <-ch // wait to start again + case 2: + <-ch2 // wait to start again + } - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + return nil + }) - engineOut := make(chan metrics.SampleContainer, 1000) - test.state.SetInitVUFunc(func(ctx context.Context, _ *logrus.Entry) (lib.InitializedVU, error) { - cur := atomic.LoadInt64(&count) - require.Equal(t, int64(1), cur) - time.Sleep(time.Second / 2) + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - close(ch) - time.Sleep(time.Millisecond * 150) + engineOut := make(chan metrics.SampleContainer, 1000) + test.state.SetInitVUFunc(func(ctx context.Context, _ *logrus.Entry) (lib.InitializedVU, error) { + cur := atomic.LoadInt64(&count) + require.Equal(t, int64(1), cur) + time.Sleep(time.Second / 2) - cur = atomic.LoadInt64(&count) - require.Equal(t, int64(2), cur) + close(ch) + time.Sleep(time.Millisecond * 150) - time.Sleep(time.Millisecond * 150) - cur = atomic.LoadInt64(&count) - require.Equal(t, int64(2), cur) + cur = atomic.LoadInt64(&count) + require.Equal(t, int64(2), cur) - close(ch2) - time.Sleep(time.Millisecond * 200) - cur = atomic.LoadInt64(&count) - require.NotEqual(t, int64(2), cur) - idl, idg := test.state.GetUniqueVUIdentifiers() - return runner.NewVU(ctx, idl, idg, engineOut) - }) + time.Sleep(time.Millisecond * 150) + cur = atomic.LoadInt64(&count) + require.Equal(t, int64(2), cur) - assert.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.Empty(t, test.logHook.Drain()) + close(ch2) + time.Sleep(time.Millisecond * 200) + cur = atomic.LoadInt64(&count) + require.NotEqual(t, int64(2), cur) + idl, idg := test.state.GetUniqueVUIdentifiers() + return runner.NewVU(ctx, idl, idg, engineOut) + }) - droppedIters := sumMetricValues(engineOut, metrics.DroppedIterationsName) - assert.Equal(t, count+int64(droppedIters), int64(9)) + assert.NoError(t, test.executor.Run(test.ctx, engineOut)) + assert.Empty(t, test.logHook.Drain()) + + droppedIters := sumMetricValues(engineOut, metrics.DroppedIterationsName) + assert.Equal(t, count+int64(droppedIters), int64(9)) + }) } func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) { t.Parallel() - config := &RampingArrivalRateConfig{ - TimeUnit: types.NullDurationFrom(time.Second), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(time.Second * 2), - Target: null.IntFrom(10), + synctest.Test(t, func(t *testing.T) { + config := &RampingArrivalRateConfig{ + TimeUnit: types.NullDurationFrom(time.Second), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(time.Second * 2), + Target: null.IntFrom(10), + }, }, - }, - PreAllocatedVUs: null.IntFrom(1), - MaxVUs: null.IntFrom(3), - } - - var count int64 - ch := make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - cur := atomic.AddInt64(&count, 1) - if cur == 1 { - <-ch // wait to start again + PreAllocatedVUs: null.IntFrom(1), + MaxVUs: null.IntFrom(3), } - return nil - }) + var count int64 + ch := make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + cur := atomic.AddInt64(&count, 1) + if cur == 1 { + <-ch // wait to start again + } - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() - - engineOut := make(chan metrics.SampleContainer, 1000) - test.state.SetInitVUFunc(func(ctx context.Context, _ *logrus.Entry) (lib.InitializedVU, error) { - t.Log("init") - cur := atomic.LoadInt64(&count) - require.Equal(t, int64(1), cur) - time.Sleep(time.Millisecond * 200) - close(ch) - time.Sleep(time.Millisecond * 200) - cur = atomic.LoadInt64(&count) - require.NotEqual(t, int64(1), cur) - - idl, idg := test.state.GetUniqueVUIdentifiers() - return runner.NewVU(ctx, idl, idg, engineOut) - }) + return nil + }) + + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 1000) + test.state.SetInitVUFunc(func(ctx context.Context, _ *logrus.Entry) (lib.InitializedVU, error) { + t.Log("init") + cur := atomic.LoadInt64(&count) + require.Equal(t, int64(1), cur) + time.Sleep(time.Millisecond * 200) + close(ch) + time.Sleep(time.Millisecond * 200) + cur = atomic.LoadInt64(&count) + require.NotEqual(t, int64(1), cur) + + idl, idg := test.state.GetUniqueVUIdentifiers() + return runner.NewVU(ctx, idl, idg, engineOut) + }) - assert.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.Empty(t, test.logHook.Drain()) - assert.Equal(t, int64(0), test.state.GetCurrentlyActiveVUsCount()) - assert.Equal(t, int64(2), test.state.GetInitializedVUsCount()) + assert.NoError(t, test.executor.Run(test.ctx, engineOut)) + assert.Empty(t, test.logHook.Drain()) + assert.Equal(t, int64(0), test.state.GetCurrentlyActiveVUsCount()) + assert.Equal(t, int64(2), test.state.GetInitializedVUsCount()) + }) } func TestRampingArrivalRateRunGracefulStop(t *testing.T) { t.Parallel() - config := &RampingArrivalRateConfig{ - TimeUnit: types.NullDurationFrom(1 * time.Second), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(2 * time.Second), - Target: null.IntFrom(10), + synctest.Test(t, func(t *testing.T) { + config := &RampingArrivalRateConfig{ + TimeUnit: types.NullDurationFrom(1 * time.Second), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(2 * time.Second), + Target: null.IntFrom(10), + }, }, - }, - StartRate: null.IntFrom(10), - PreAllocatedVUs: null.IntFrom(10), - MaxVUs: null.IntFrom(10), - BaseConfig: BaseConfig{ - GracefulStop: types.NullDurationFrom(5 * time.Second), - }, - } + StartRate: null.IntFrom(10), + PreAllocatedVUs: null.IntFrom(10), + MaxVUs: null.IntFrom(10), + BaseConfig: BaseConfig{ + GracefulStop: types.NullDurationFrom(5 * time.Second), + }, + } - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - time.Sleep(5 * time.Second) - return nil - }) + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + time.Sleep(5 * time.Second) + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - engineOut := make(chan metrics.SampleContainer, 1000) - defer close(engineOut) + engineOut := make(chan metrics.SampleContainer, 1000) + defer close(engineOut) - assert.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.Equal(t, int64(0), test.state.GetCurrentlyActiveVUsCount()) - assert.Equal(t, int64(10), test.state.GetInitializedVUsCount()) - assert.Equal(t, uint64(10), test.state.GetFullIterationCount()) + assert.NoError(t, test.executor.Run(test.ctx, engineOut)) + assert.Equal(t, int64(0), test.state.GetCurrentlyActiveVUsCount()) + assert.Equal(t, int64(10), test.state.GetInitializedVUsCount()) + assert.Equal(t, uint64(10), test.state.GetFullIterationCount()) + }) } func BenchmarkRampingArrivalRateRun(b *testing.B) { @@ -413,7 +437,7 @@ func TestRampingArrivalRateCal(t *testing.T) { t.Run(fmt.Sprintf("testNum %d - %s timeunit %s", testNum, et, config.TimeUnit), func(t *testing.T) { t.Parallel() ch := make(chan time.Duration) - go config.cal(et, ch) + go config.cal(t.Context(), et, ch) changes := make([]time.Duration, 0, len(expectedTimes)) for c := range ch { changes = append(changes, c) @@ -453,7 +477,7 @@ func BenchmarkCal(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { ch := make(chan time.Duration, 20) - go config.cal(et, ch) + go config.cal(b.Context(), et, ch) for c := range ch { _ = c } @@ -551,7 +575,7 @@ func TestCompareCalImplementation(t *testing.T) { chRat := make(chan time.Duration, 20) ch := make(chan time.Duration, 20) go config.calRat(et, chRat) - go config.cal(et, ch) + go config.cal(t.Context(), et, ch) count := 0 var diff int for c := range ch { @@ -691,21 +715,23 @@ func TestRampingArrivalRateGlobalIters(t *testing.T) { t.Run(fmt.Sprintf("%s_%s", tc.seq, tc.seg), func(t *testing.T) { t.Parallel() - gotIters := []uint64{} - var mx sync.Mutex - runner := simpleRunner(func(_ context.Context, state *lib.State) error { - mx.Lock() - gotIters = append(gotIters, state.GetScenarioGlobalVUIter()) - mx.Unlock() - return nil + synctest.Test(t, func(t *testing.T) { + gotIters := []uint64{} + var mx sync.Mutex + runner := simpleRunner(func(_ context.Context, state *lib.State) error { + mx.Lock() + gotIters = append(gotIters, state.GetScenarioGlobalVUIter()) + mx.Unlock() + return nil + }) + + test := setupExecutorTest(t, tc.seg, tc.seq, lib.Options{}, runner, config) + defer test.cancel() + + engineOut := make(chan metrics.SampleContainer, 100) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + assert.Equal(t, tc.expIters, gotIters) }) - - test := setupExecutorTest(t, tc.seg, tc.seq, lib.Options{}, runner, config) - defer test.cancel() - - engineOut := make(chan metrics.SampleContainer, 100) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.Equal(t, tc.expIters, gotIters) }) } } @@ -733,45 +759,47 @@ func TestRampingArrivalRateCornerCase(t *testing.T) { func TestRampingArrivalRateActiveVUs(t *testing.T) { t.Parallel() - config := &RampingArrivalRateConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, - TimeUnit: types.NullDurationFrom(time.Second), - StartRate: null.IntFrom(10), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(950 * time.Millisecond), - Target: null.IntFrom(20), + synctest.Test(t, func(t *testing.T) { + config := &RampingArrivalRateConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)}, + TimeUnit: types.NullDurationFrom(time.Second), + StartRate: null.IntFrom(10), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(950 * time.Millisecond), + Target: null.IntFrom(20), + }, }, - }, - PreAllocatedVUs: null.IntFrom(5), - MaxVUs: null.IntFrom(10), - } - - var ( - running int64 - getCurrActiveVUs func() int64 - runMx sync.Mutex - ) + PreAllocatedVUs: null.IntFrom(5), + MaxVUs: null.IntFrom(10), + } - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - runMx.Lock() - running++ - assert.Equal(t, running, getCurrActiveVUs()) - runMx.Unlock() - // Block the VU to cause the executor to schedule more - <-ctx.Done() - return nil - }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + var ( + running int64 + getCurrActiveVUs func() int64 + runMx sync.Mutex + ) + + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + runMx.Lock() + running++ + assert.Equal(t, running, getCurrActiveVUs()) + runMx.Unlock() + // Block the VU to cause the executor to schedule more + <-ctx.Done() + return nil + }) + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - getCurrActiveVUs = test.state.GetCurrentlyActiveVUsCount + getCurrActiveVUs = test.state.GetCurrentlyActiveVUsCount - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.GreaterOrEqual(t, running, int64(5)) - assert.LessOrEqual(t, running, int64(10)) + assert.GreaterOrEqual(t, running, int64(5)) + assert.LessOrEqual(t, running, int64(10)) + }) } func TestRampingArrivalRateActiveVUs_GetExecutionRequirements(t *testing.T) { diff --git a/lib/executor/ramping_vus_test.go b/lib/executor/ramping_vus_test.go index 8ae1d343b..81a1779a1 100644 --- a/lib/executor/ramping_vus_test.go +++ b/lib/executor/ramping_vus_test.go @@ -8,6 +8,7 @@ import ( "math/rand" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" @@ -90,211 +91,219 @@ func TestRampingVUsConfigValidation(t *testing.T) { func TestRampingVUsRun(t *testing.T) { t.Parallel() - config := RampingVUsConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0)}, - GracefulRampDown: types.NullDurationFrom(0), - StartVUs: null.IntFrom(5), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(1 * time.Second), - Target: null.IntFrom(5), + synctest.Test(t, func(t *testing.T) { + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0)}, + GracefulRampDown: types.NullDurationFrom(0), + StartVUs: null.IntFrom(5), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(5), + }, + { + Duration: types.NullDurationFrom(0), + Target: null.IntFrom(3), + }, + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(3), + }, }, - { - Duration: types.NullDurationFrom(0), - Target: null.IntFrom(3), - }, - { - Duration: types.NullDurationFrom(1 * time.Second), - Target: null.IntFrom(3), - }, - }, - } + } - var iterCount int64 + var iterCount int64 - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - // Sleeping for a weird duration somewhat offset from the - // executor ticks to hopefully keep race conditions out of - // our control from failing the test. - time.Sleep(300 * time.Millisecond) - atomic.AddInt64(&iterCount, 1) - return nil - }) + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + // Sleeping for a weird duration somewhat offset from the + // executor ticks to hopefully keep race conditions out of + // our control from failing the test. + time.Sleep(300 * time.Millisecond) + atomic.AddInt64(&iterCount, 1) + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - sampleTimes := []time.Duration{ - 500 * time.Millisecond, - 1000 * time.Millisecond, - 900 * time.Millisecond, - } + sampleTimes := []time.Duration{ + 500 * time.Millisecond, + 1000 * time.Millisecond, + 900 * time.Millisecond, + } - errCh := make(chan error) - go func() { errCh <- test.executor.Run(test.ctx, nil) }() + errCh := make(chan error) + go func() { errCh <- test.executor.Run(test.ctx, nil) }() - result := make([]int64, len(sampleTimes)) - for i, d := range sampleTimes { - time.Sleep(d) - result[i] = test.state.GetCurrentlyActiveVUsCount() - } + result := make([]int64, len(sampleTimes)) + for i, d := range sampleTimes { + time.Sleep(d) + result[i] = test.state.GetCurrentlyActiveVUsCount() + } - require.NoError(t, <-errCh) + require.NoError(t, <-errCh) - assert.Equal(t, []int64{5, 3, 0}, result) - assert.Equal(t, int64(29), atomic.LoadInt64(&iterCount)) + assert.Equal(t, []int64{5, 3, 0}, result) + assert.Equal(t, int64(29), atomic.LoadInt64(&iterCount)) + }) } func TestRampingVUsGracefulStopWaits(t *testing.T) { t.Parallel() - config := RampingVUsConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)}, - StartVUs: null.IntFrom(1), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(1 * time.Second), - Target: null.IntFrom(1), + synctest.Test(t, func(t *testing.T) { + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)}, + StartVUs: null.IntFrom(1), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(1), + }, }, - }, - } + } - var ( - started = make(chan struct{}) // the iteration started - stopped = make(chan struct{}) // the iteration stopped - stop = make(chan struct{}) // the itearation should stop - ) + var ( + started = make(chan struct{}) // the iteration started + stopped = make(chan struct{}) // the iteration stopped + stop = make(chan struct{}) // the itearation should stop + ) - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - close(started) - defer close(stopped) - select { - case <-ctx.Done(): - t.Fatal("The iterations should've ended before the context") - case <-stop: - } - return nil - }) + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + close(started) + defer close(stopped) + select { + case <-ctx.Done(): + t.Fatal("The iterations should've ended before the context") + case <-stop: + } + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - errCh := make(chan error) - go func() { errCh <- test.executor.Run(test.ctx, nil) }() + errCh := make(chan error) + go func() { errCh <- test.executor.Run(test.ctx, nil) }() - <-started - // 500 milliseconds more then the duration and 500 less then the gracefulStop - time.Sleep(time.Millisecond * 1500) - close(stop) - <-stopped + <-started + // 500 milliseconds more then the duration and 500 less then the gracefulStop + time.Sleep(time.Millisecond * 1500) + close(stop) + <-stopped - require.NoError(t, <-errCh) + require.NoError(t, <-errCh) + }) } func TestRampingVUsGracefulStopStops(t *testing.T) { t.Parallel() - config := RampingVUsConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)}, - StartVUs: null.IntFrom(1), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(1 * time.Second), - Target: null.IntFrom(1), + synctest.Test(t, func(t *testing.T) { + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)}, + StartVUs: null.IntFrom(1), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(1), + }, }, - }, - } + } - var ( - started = make(chan struct{}) // the iteration started - stopped = make(chan struct{}) // the iteration stopped - stop = make(chan struct{}) // the itearation should stop - ) + var ( + started = make(chan struct{}) // the iteration started + stopped = make(chan struct{}) // the iteration stopped + stop = make(chan struct{}) // the itearation should stop + ) - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - close(started) - defer close(stopped) - select { - case <-ctx.Done(): - case <-stop: - t.Fatal("The iterations shouldn't have ended before the context") - } - return nil - }) + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + close(started) + defer close(stopped) + select { + case <-ctx.Done(): + case <-stop: + t.Fatal("The iterations shouldn't have ended before the context") + } + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - errCh := make(chan error) - go func() { errCh <- test.executor.Run(test.ctx, nil) }() + errCh := make(chan error) + go func() { errCh <- test.executor.Run(test.ctx, nil) }() - <-started - // 500 milliseconds more then the gracefulStop + duration - time.Sleep(time.Millisecond * 2500) - close(stop) - <-stopped + <-started + // 500 milliseconds more then the gracefulStop + duration + time.Sleep(time.Millisecond * 2500) + close(stop) + <-stopped - require.NoError(t, <-errCh) + require.NoError(t, <-errCh) + }) } func TestRampingVUsGracefulRampDown(t *testing.T) { t.Parallel() - config := RampingVUsConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(5 * time.Second)}, - StartVUs: null.IntFrom(2), - GracefulRampDown: types.NullDurationFrom(5 * time.Second), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(1 * time.Second), - Target: null.IntFrom(2), + synctest.Test(t, func(t *testing.T) { + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(5 * time.Second)}, + StartVUs: null.IntFrom(2), + GracefulRampDown: types.NullDurationFrom(5 * time.Second), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(2), + }, + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(0), + }, }, - { - Duration: types.NullDurationFrom(1 * time.Second), - Target: null.IntFrom(0), - }, - }, - } - - var ( - started = make(chan struct{}) // the iteration started - stopped = make(chan struct{}) // the iteration stopped - stop = make(chan struct{}) // the itearation should stop - ) + } - runner := simpleRunner(func(ctx context.Context, state *lib.State) error { - if state.VUID == 1 { // the first VU will wait here to do stuff - close(started) - defer close(stopped) - select { - case <-ctx.Done(): - t.Fatal("The iterations can't have ended before the context") - case <-stop: + var ( + started = make(chan struct{}) // the iteration started + stopped = make(chan struct{}) // the iteration stopped + stop = make(chan struct{}) // the itearation should stop + ) + + runner := simpleRunner(func(ctx context.Context, state *lib.State) error { + if state.VUID == 1 { // the first VU will wait here to do stuff + close(started) + defer close(stopped) + select { + case <-ctx.Done(): + t.Fatal("The iterations can't have ended before the context") + case <-stop: + } + } else { // all other (1) VUs will just sleep long enough + time.Sleep(2500 * time.Millisecond) } - } else { // all other (1) VUs will just sleep long enough - time.Sleep(2500 * time.Millisecond) - } - return nil - }) + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - errCh := make(chan error) - go func() { errCh <- test.executor.Run(test.ctx, nil) }() + errCh := make(chan error) + go func() { errCh <- test.executor.Run(test.ctx, nil) }() - <-started - // 500 milliseconds more then the gracefulRampDown + duration - time.Sleep(2500 * time.Millisecond) - close(stop) - <-stopped + <-started + // 500 milliseconds more then the gracefulRampDown + duration + time.Sleep(2500 * time.Millisecond) + close(stop) + <-stopped - select { - case err := <-errCh: - require.NoError(t, err) - case <-time.After(time.Second): // way too much time - t.Fatal("Execution should've ended already") - } + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(time.Second): // way too much time + t.Fatal("Execution should've ended already") + } + }) } // This test aims to check whether the ramping VU executor interrupts @@ -311,64 +320,66 @@ func TestRampingVUsGracefulRampDown(t *testing.T) { func TestRampingVUsHandleRemainingVUs(t *testing.T) { t.Parallel() - const ( - maxVus = 2 - vuSleepDuration = 65 * time.Millisecond // Each VU will sleep 65ms - wantVuFinished uint32 = 1 // one VU should finish an iteration - wantVuInterrupted uint32 = 1 // one VU should be interrupted - ) - - cfg := RampingVUsConfig{ - BaseConfig: BaseConfig{ - // Extend the total test duration 50ms more - // - // test duration = sum(stages) + GracefulStop - // - // This could have been 30ms but increased it to 50ms - // to prevent the test to become flaky. - GracefulStop: types.NullDurationFrom(50 * time.Millisecond), - }, - // Wait 30ms more for already started iterations - // (Happens in the 2nd stage below: Graceful rampdown period) - GracefulRampDown: types.NullDurationFrom(30 * time.Millisecond), - // Total test duration is 50ms (excluding the GracefulRampdown period) - Stages: []Stage{ - // Activate 2 VUs in 10ms - { - Duration: types.NullDurationFrom(10 * time.Millisecond), - Target: null.IntFrom(int64(maxVus)), + synctest.Test(t, func(t *testing.T) { + const ( + maxVus = 2 + vuSleepDuration = 65 * time.Millisecond // Each VU will sleep 65ms + wantVuFinished uint32 = 1 // one VU should finish an iteration + wantVuInterrupted uint32 = 1 // one VU should be interrupted + ) + + cfg := RampingVUsConfig{ + BaseConfig: BaseConfig{ + // Extend the total test duration 50ms more + // + // test duration = sum(stages) + GracefulStop + // + // This could have been 30ms but increased it to 50ms + // to prevent the test to become flaky. + GracefulStop: types.NullDurationFrom(50 * time.Millisecond), }, - // Rampdown to 0 VUs in 40ms - { - Duration: types.NullDurationFrom(40 * time.Millisecond), - Target: null.IntFrom(int64(0)), + // Wait 30ms more for already started iterations + // (Happens in the 2nd stage below: Graceful rampdown period) + GracefulRampDown: types.NullDurationFrom(30 * time.Millisecond), + // Total test duration is 50ms (excluding the GracefulRampdown period) + Stages: []Stage{ + // Activate 2 VUs in 10ms + { + Duration: types.NullDurationFrom(10 * time.Millisecond), + Target: null.IntFrom(int64(maxVus)), + }, + // Rampdown to 0 VUs in 40ms + { + Duration: types.NullDurationFrom(40 * time.Millisecond), + Target: null.IntFrom(int64(0)), + }, }, - }, - } - - var ( - gotVuInterrupted uint32 - gotVuFinished uint32 - ) - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - select { - case <-time.After(vuSleepDuration): - atomic.AddUint32(&gotVuFinished, 1) - case <-ctx.Done(): - atomic.AddUint32(&gotVuInterrupted, 1) } - return nil - }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, cfg) - defer test.cancel() + var ( + gotVuInterrupted uint32 + gotVuFinished uint32 + ) + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + select { + case <-time.After(vuSleepDuration): + atomic.AddUint32(&gotVuFinished, 1) + case <-ctx.Done(): + atomic.AddUint32(&gotVuInterrupted, 1) + } + return nil + }) + + test := setupExecutorTest(t, "", "", lib.Options{}, runner, cfg) + defer test.cancel() - // run the executor: this should finish in ~70ms - // sum(stages) + GracefulRampDown - require.NoError(t, test.executor.Run(test.ctx, nil)) + // run the executor: this should finish in ~70ms + // sum(stages) + GracefulRampDown + require.NoError(t, test.executor.Run(test.ctx, nil)) - assert.Equal(t, wantVuInterrupted, atomic.LoadUint32(&gotVuInterrupted)) - assert.Equal(t, wantVuFinished, atomic.LoadUint32(&gotVuFinished)) + assert.Equal(t, wantVuInterrupted, atomic.LoadUint32(&gotVuInterrupted)) + assert.Equal(t, wantVuFinished, atomic.LoadUint32(&gotVuFinished)) + }) } // Ensure there's no wobble of VUs during graceful ramp-down, without segments. @@ -376,67 +387,69 @@ func TestRampingVUsHandleRemainingVUs(t *testing.T) { func TestRampingVUsRampDownNoWobble(t *testing.T) { t.Parallel() - config := RampingVUsConfig{ - BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0)}, - GracefulRampDown: types.NullDurationFrom(1 * time.Second), - StartVUs: null.IntFrom(0), - Stages: []Stage{ - { - Duration: types.NullDurationFrom(3 * time.Second), - Target: null.IntFrom(10), - }, - { - Duration: types.NullDurationFrom(2 * time.Second), - Target: null.IntFrom(0), + synctest.Test(t, func(t *testing.T) { + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0)}, + GracefulRampDown: types.NullDurationFrom(1 * time.Second), + StartVUs: null.IntFrom(0), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(3 * time.Second), + Target: null.IntFrom(10), + }, + { + Duration: types.NullDurationFrom(2 * time.Second), + Target: null.IntFrom(0), + }, }, - }, - } + } - runner := simpleRunner(func(_ context.Context, _ *lib.State) error { - time.Sleep(500 * time.Millisecond) - return nil - }) + runner := simpleRunner(func(_ context.Context, _ *lib.State) error { + time.Sleep(500 * time.Millisecond) + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - sampleTimes := []time.Duration{ - 100 * time.Millisecond, - 3000 * time.Millisecond, - } - const rampDownSampleTime = 50 * time.Millisecond - rampDownSamples := int((config.Stages[len(config.Stages)-1].Duration.TimeDuration() + config.GracefulRampDown.TimeDuration()) / rampDownSampleTime) + sampleTimes := []time.Duration{ + 100 * time.Millisecond, + 3000 * time.Millisecond, + } + const rampDownSampleTime = 50 * time.Millisecond + rampDownSamples := int((config.Stages[len(config.Stages)-1].Duration.TimeDuration() + config.GracefulRampDown.TimeDuration()) / rampDownSampleTime) - errCh := make(chan error) - go func() { errCh <- test.executor.Run(test.ctx, nil) }() + errCh := make(chan error) + go func() { errCh <- test.executor.Run(test.ctx, nil) }() - result := make([]int64, len(sampleTimes)+rampDownSamples) - for i, d := range sampleTimes { - time.Sleep(d) - result[i] = test.state.GetCurrentlyActiveVUsCount() - } + result := make([]int64, len(sampleTimes)+rampDownSamples) + for i, d := range sampleTimes { + time.Sleep(d) + result[i] = test.state.GetCurrentlyActiveVUsCount() + } - // Sample ramp-down at a higher rate - for i := len(sampleTimes); i < rampDownSamples; i++ { - time.Sleep(rampDownSampleTime) - result[i] = test.state.GetCurrentlyActiveVUsCount() - } + // Sample ramp-down at a higher rate + for i := len(sampleTimes); i < rampDownSamples; i++ { + time.Sleep(rampDownSampleTime) + result[i] = test.state.GetCurrentlyActiveVUsCount() + } - require.NoError(t, <-errCh) + require.NoError(t, <-errCh) - // Some baseline checks - assert.Equal(t, int64(0), result[0]) - assert.Equal(t, int64(10), result[1]) - assert.Equal(t, int64(0), result[len(result)-1]) + // Some baseline checks + assert.Equal(t, int64(0), result[0]) + assert.Equal(t, int64(10), result[1]) + assert.Equal(t, int64(0), result[len(result)-1]) - vuChanges := []int64{result[2]} - // Check ramp-down consistency - for i := 3; i < len(result[2:]); i++ { - if result[i] != result[i-1] { - vuChanges = append(vuChanges, result[i]) + vuChanges := []int64{result[2]} + // Check ramp-down consistency + for i := 3; i < len(result[2:]); i++ { + if result[i] != result[i-1] { + vuChanges = append(vuChanges, result[i]) + } } - } - assert.Equal(t, []int64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}, vuChanges) + assert.Equal(t, []int64{10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}, vuChanges) + }) } func TestRampingVUsConfigExecutionPlanExample(t *testing.T) { diff --git a/lib/executor/shared_iterations_test.go b/lib/executor/shared_iterations_test.go index 0c286d815..7d662bf25 100644 --- a/lib/executor/shared_iterations_test.go +++ b/lib/executor/shared_iterations_test.go @@ -7,6 +7,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/stretchr/testify/assert" @@ -47,69 +48,75 @@ func TestSharedIterationsRun(t *testing.T) { // This is the reverse behavior of the PerVUIterations executor. func TestSharedIterationsRunVariableVU(t *testing.T) { t.Parallel() - var ( - result sync.Map - slowVUID uint64 - ) - - runner := simpleRunner(func(_ context.Context, state *lib.State) error { - time.Sleep(10 * time.Millisecond) // small wait to stabilize the test - // Pick one VU randomly and always slow it down. - sid := atomic.LoadUint64(&slowVUID) - if sid == uint64(0) { - atomic.StoreUint64(&slowVUID, state.VUID) - } - if sid == state.VUID { - time.Sleep(200 * time.Millisecond) - } - currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) - result.Store(state.VUID, currIter.(uint64)+1) - return nil - }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestSharedIterationsConfig()) - defer test.cancel() + synctest.Test(t, func(t *testing.T) { + var ( + result sync.Map + slowVUID uint64 + ) + + runner := simpleRunner(func(_ context.Context, state *lib.State) error { + time.Sleep(10 * time.Millisecond) // small wait to stabilize the test + // Pick one VU randomly and always slow it down. + sid := atomic.LoadUint64(&slowVUID) + if sid == uint64(0) { + atomic.StoreUint64(&slowVUID, state.VUID) + } + if sid == state.VUID { + time.Sleep(200 * time.Millisecond) + } + currIter, _ := result.LoadOrStore(state.VUID, uint64(0)) + result.Store(state.VUID, currIter.(uint64)+1) + return nil + }) - require.NoError(t, test.executor.Run(test.ctx, nil)) + test := setupExecutorTest(t, "", "", lib.Options{}, runner, getTestSharedIterationsConfig()) + defer test.cancel() - var totalIters uint64 - result.Range(func(_, value any) bool { - totalIters += value.(uint64) - return true - }) + require.NoError(t, test.executor.Run(test.ctx, nil)) - // The slow VU should complete 2 iterations given these timings, - // while the rest should randomly complete the other 98 iterations. - val, ok := result.Load(slowVUID) - assert.True(t, ok) - assert.Equal(t, uint64(2), val) - assert.Equal(t, uint64(100), totalIters) + var totalIters uint64 + result.Range(func(_, value any) bool { + totalIters += value.(uint64) + return true + }) + + // The slow VU should complete 2 iterations given these timings, + // while the rest should randomly complete the other 98 iterations. + val, ok := result.Load(slowVUID) + assert.True(t, ok) + assert.Equal(t, uint64(2), val) + assert.Equal(t, uint64(100), totalIters) + }) } func TestSharedIterationsEmitDroppedIterations(t *testing.T) { t.Parallel() - var count int64 - runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { - atomic.AddInt64(&count, 1) - <-ctx.Done() - return nil - }) + synctest.Test(t, func(t *testing.T) { + var count int64 - config := &SharedIterationsConfig{ - VUs: null.IntFrom(5), - Iterations: null.IntFrom(100), - MaxDuration: types.NullDurationFrom(1 * time.Second), - } + runner := simpleRunner(func(ctx context.Context, _ *lib.State) error { + atomic.AddInt64(&count, 1) + <-ctx.Done() + return nil + }) - test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) - defer test.cancel() + config := &SharedIterationsConfig{ + VUs: null.IntFrom(5), + Iterations: null.IntFrom(100), + MaxDuration: types.NullDurationFrom(1 * time.Second), + } + + test := setupExecutorTest(t, "", "", lib.Options{}, runner, config) + defer test.cancel() - engineOut := make(chan metrics.SampleContainer, 1000) - require.NoError(t, test.executor.Run(test.ctx, engineOut)) - assert.Empty(t, test.logHook.Drain()) - assert.Equal(t, int64(5), count) - assert.Equal(t, float64(95), sumMetricValues(engineOut, metrics.DroppedIterationsName)) + engineOut := make(chan metrics.SampleContainer, 1000) + require.NoError(t, test.executor.Run(test.ctx, engineOut)) + assert.Empty(t, test.logHook.Drain()) + assert.Equal(t, int64(5), count) + assert.Equal(t, float64(95), sumMetricValues(engineOut, metrics.DroppedIterationsName)) + }) } func TestSharedIterationsGlobalIters(t *testing.T) { diff --git a/lib/executor/vu_handle_test.go b/lib/executor/vu_handle_test.go index 6b76f8253..f9d36fe77 100644 --- a/lib/executor/vu_handle_test.go +++ b/lib/executor/vu_handle_test.go @@ -5,6 +5,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" "github.com/sirupsen/logrus" @@ -225,118 +226,124 @@ func TestVUHandleSimple(t *testing.T) { t.Run("start before gracefulStop finishes", func(t *testing.T) { t.Parallel() - logHook := testutils.NewLogHook(logrus.DebugLevel) - testLog := logrus.New() - testLog.AddHook(logHook) - testLog.SetOutput(testutils.NewTestOutput(t)) - // testLog.Level = logrus.DebugLevel - logEntry := logrus.NewEntry(testLog) - test := &handleVUTest{runner: &minirunner.MiniRunner{}} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vuHandle := newStoppedVUHandle(ctx, test.getVU, test.returnVU, mockNextIterations, &BaseConfig{}, logEntry) - var wg sync.WaitGroup - wg.Go(func() { - vuHandle.runLoopsIfPossible(test.runIter) + synctest.Test(t, func(t *testing.T) { + logHook := testutils.NewLogHook(logrus.DebugLevel) + testLog := logrus.New() + testLog.AddHook(logHook) + testLog.SetOutput(testutils.NewTestOutput(t)) + // testLog.Level = logrus.DebugLevel + logEntry := logrus.NewEntry(testLog) + test := &handleVUTest{runner: &minirunner.MiniRunner{}} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vuHandle := newStoppedVUHandle(ctx, test.getVU, test.returnVU, mockNextIterations, &BaseConfig{}, logEntry) + var wg sync.WaitGroup + wg.Go(func() { + vuHandle.runLoopsIfPossible(test.runIter) + }) + err := vuHandle.start() + require.NoError(t, err) + time.Sleep(time.Millisecond * 50) + vuHandle.gracefulStop() + // time.Sleep(time.Millisecond * 5) // No sleep as we want to always not return the VU + err = vuHandle.start() + require.NoError(t, err) + time.Sleep(time.Millisecond * 1500) + assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) + assert.EqualValues(t, 0, atomic.LoadUint32(&test.returnVUCount)) + assert.EqualValues(t, 0, atomic.LoadInt64(&test.interruptedIter)) + assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) + cancel() + wg.Wait() + time.Sleep(time.Millisecond * 5) + assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) + assert.EqualValues(t, 1, atomic.LoadUint32(&test.returnVUCount)) + assert.EqualValues(t, 1, atomic.LoadInt64(&test.interruptedIter)) + assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) }) - err := vuHandle.start() - require.NoError(t, err) - time.Sleep(time.Millisecond * 50) - vuHandle.gracefulStop() - // time.Sleep(time.Millisecond * 5) // No sleep as we want to always not return the VU - err = vuHandle.start() - require.NoError(t, err) - time.Sleep(time.Millisecond * 1500) - assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) - assert.EqualValues(t, 0, atomic.LoadUint32(&test.returnVUCount)) - assert.EqualValues(t, 0, atomic.LoadInt64(&test.interruptedIter)) - assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) - cancel() - wg.Wait() - time.Sleep(time.Millisecond * 5) - assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) - assert.EqualValues(t, 1, atomic.LoadUint32(&test.returnVUCount)) - assert.EqualValues(t, 1, atomic.LoadInt64(&test.interruptedIter)) - assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) }) t.Run("start after gracefulStop finishes", func(t *testing.T) { t.Parallel() - logHook := testutils.NewLogHook(logrus.DebugLevel) - testLog := logrus.New() - testLog.AddHook(logHook) - testLog.SetOutput(testutils.NewTestOutput(t)) - // testLog.Level = logrus.DebugLevel - logEntry := logrus.NewEntry(testLog) - test := &handleVUTest{runner: &minirunner.MiniRunner{}} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vuHandle := newStoppedVUHandle(ctx, test.getVU, test.returnVU, mockNextIterations, &BaseConfig{}, logEntry) - var wg sync.WaitGroup - wg.Go(func() { - vuHandle.runLoopsIfPossible(test.runIter) + synctest.Test(t, func(t *testing.T) { + logHook := testutils.NewLogHook(logrus.DebugLevel) + testLog := logrus.New() + testLog.AddHook(logHook) + testLog.SetOutput(testutils.NewTestOutput(t)) + // testLog.Level = logrus.DebugLevel + logEntry := logrus.NewEntry(testLog) + test := &handleVUTest{runner: &minirunner.MiniRunner{}} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vuHandle := newStoppedVUHandle(ctx, test.getVU, test.returnVU, mockNextIterations, &BaseConfig{}, logEntry) + var wg sync.WaitGroup + wg.Go(func() { + vuHandle.runLoopsIfPossible(test.runIter) + }) + err := vuHandle.start() + require.NoError(t, err) + time.Sleep(time.Millisecond * 50) + vuHandle.gracefulStop() + time.Sleep(time.Millisecond * 1500) + assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) + assert.EqualValues(t, 1, atomic.LoadUint32(&test.returnVUCount)) + assert.EqualValues(t, 0, atomic.LoadInt64(&test.interruptedIter)) + assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) + err = vuHandle.start() + require.NoError(t, err) + time.Sleep(time.Millisecond * 1500) + cancel() + wg.Wait() + + time.Sleep(time.Millisecond * 50) + assert.EqualValues(t, 2, atomic.LoadUint32(&test.getVUCount)) + assert.EqualValues(t, 2, atomic.LoadUint32(&test.returnVUCount)) + assert.EqualValues(t, 1, atomic.LoadInt64(&test.interruptedIter)) + assert.EqualValues(t, 2, atomic.LoadInt64(&test.fullIterations)) }) - err := vuHandle.start() - require.NoError(t, err) - time.Sleep(time.Millisecond * 50) - vuHandle.gracefulStop() - time.Sleep(time.Millisecond * 1500) - assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) - assert.EqualValues(t, 1, atomic.LoadUint32(&test.returnVUCount)) - assert.EqualValues(t, 0, atomic.LoadInt64(&test.interruptedIter)) - assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) - err = vuHandle.start() - require.NoError(t, err) - time.Sleep(time.Millisecond * 1500) - cancel() - wg.Wait() - - time.Sleep(time.Millisecond * 50) - assert.EqualValues(t, 2, atomic.LoadUint32(&test.getVUCount)) - assert.EqualValues(t, 2, atomic.LoadUint32(&test.returnVUCount)) - assert.EqualValues(t, 1, atomic.LoadInt64(&test.interruptedIter)) - assert.EqualValues(t, 2, atomic.LoadInt64(&test.fullIterations)) }) t.Run("start after hardStop", func(t *testing.T) { t.Parallel() - logHook := testutils.NewLogHook(logrus.DebugLevel) - testLog := logrus.New() - testLog.AddHook(logHook) - testLog.SetOutput(testutils.NewTestOutput(t)) - // testLog.Level = logrus.DebugLevel - logEntry := logrus.NewEntry(testLog) - test := &handleVUTest{runner: &minirunner.MiniRunner{}} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vuHandle := newStoppedVUHandle(ctx, test.getVU, test.returnVU, mockNextIterations, &BaseConfig{}, logEntry) - var wg sync.WaitGroup - wg.Go(func() { - vuHandle.runLoopsIfPossible(test.runIter) + synctest.Test(t, func(t *testing.T) { + logHook := testutils.NewLogHook(logrus.DebugLevel) + testLog := logrus.New() + testLog.AddHook(logHook) + testLog.SetOutput(testutils.NewTestOutput(t)) + // testLog.Level = logrus.DebugLevel + logEntry := logrus.NewEntry(testLog) + test := &handleVUTest{runner: &minirunner.MiniRunner{}} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vuHandle := newStoppedVUHandle(ctx, test.getVU, test.returnVU, mockNextIterations, &BaseConfig{}, logEntry) + var wg sync.WaitGroup + wg.Go(func() { + vuHandle.runLoopsIfPossible(test.runIter) + }) + err := vuHandle.start() + require.NoError(t, err) + time.Sleep(time.Millisecond * 5) + vuHandle.hardStop() + time.Sleep(time.Millisecond * 15) + assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) + assert.EqualValues(t, 1, atomic.LoadUint32(&test.returnVUCount)) + assert.EqualValues(t, 1, atomic.LoadInt64(&test.interruptedIter)) + assert.EqualValues(t, 0, atomic.LoadInt64(&test.fullIterations)) + err = vuHandle.start() + require.NoError(t, err) + time.Sleep(time.Millisecond * 1500) + cancel() + wg.Wait() + + time.Sleep(time.Millisecond * 5) + assert.EqualValues(t, 2, atomic.LoadUint32(&test.getVUCount)) + assert.EqualValues(t, 2, atomic.LoadUint32(&test.returnVUCount)) + assert.EqualValues(t, 2, atomic.LoadInt64(&test.interruptedIter)) + assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) }) - err := vuHandle.start() - require.NoError(t, err) - time.Sleep(time.Millisecond * 5) - vuHandle.hardStop() - time.Sleep(time.Millisecond * 15) - assert.EqualValues(t, 1, atomic.LoadUint32(&test.getVUCount)) - assert.EqualValues(t, 1, atomic.LoadUint32(&test.returnVUCount)) - assert.EqualValues(t, 1, atomic.LoadInt64(&test.interruptedIter)) - assert.EqualValues(t, 0, atomic.LoadInt64(&test.fullIterations)) - err = vuHandle.start() - require.NoError(t, err) - time.Sleep(time.Millisecond * 1500) - cancel() - wg.Wait() - - time.Sleep(time.Millisecond * 5) - assert.EqualValues(t, 2, atomic.LoadUint32(&test.getVUCount)) - assert.EqualValues(t, 2, atomic.LoadUint32(&test.returnVUCount)) - assert.EqualValues(t, 2, atomic.LoadInt64(&test.interruptedIter)) - assert.EqualValues(t, 1, atomic.LoadInt64(&test.fullIterations)) }) }