Skip to content

Commit a7e26ad

Browse files
committed
Add an execution.Controller and a local no-op implementation
This doesn't change any existing k6 behavior (see how the tests were not touched), but it adds hooks for distributed execution later on.
1 parent 1fe0563 commit a7e26ad

10 files changed

+181
-25
lines changed

api/v1/group_routes_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/require"
1212

1313
"go.k6.io/k6/execution"
14+
"go.k6.io/k6/execution/local"
1415
"go.k6.io/k6/lib"
1516
"go.k6.io/k6/lib/testutils"
1617
"go.k6.io/k6/lib/testutils/minirunner"
@@ -41,7 +42,7 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
4142
}
4243

4344
func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
44-
execScheduler, err := execution.NewScheduler(testState)
45+
execScheduler, err := execution.NewScheduler(testState, local.NewController())
4546
require.NoError(tb, err)
4647

4748
me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)

api/v1/setup_teardown_routes_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"gopkg.in/guregu/null.v3"
1717

1818
"go.k6.io/k6/execution"
19+
"go.k6.io/k6/execution/local"
1920
"go.k6.io/k6/js"
2021
"go.k6.io/k6/lib"
2122
"go.k6.io/k6/lib/types"
@@ -138,7 +139,7 @@ func TestSetupData(t *testing.T) {
138139
TeardownTimeout: types.NullDurationFrom(5 * time.Second),
139140
}, runner)
140141

141-
execScheduler, err := execution.NewScheduler(testState)
142+
execScheduler, err := execution.NewScheduler(testState, local.NewController())
142143
require.NoError(t, err)
143144
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
144145
require.NoError(t, err)

api/v1/status_routes_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"gopkg.in/guregu/null.v3"
1717

1818
"go.k6.io/k6/execution"
19+
"go.k6.io/k6/execution/local"
1920
"go.k6.io/k6/lib"
2021
"go.k6.io/k6/lib/testutils/minirunner"
2122
"go.k6.io/k6/metrics"
@@ -115,7 +116,7 @@ func TestPatchStatus(t *testing.T) {
115116
require.NoError(t, err)
116117

117118
testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
118-
execScheduler, err := execution.NewScheduler(testState)
119+
execScheduler, err := execution.NewScheduler(testState, local.NewController())
119120
require.NoError(t, err)
120121

121122
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)

cmd/run.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"go.k6.io/k6/errext/exitcodes"
2626
"go.k6.io/k6/event"
2727
"go.k6.io/k6/execution"
28+
"go.k6.io/k6/execution/local"
2829
"go.k6.io/k6/js/common"
2930
"go.k6.io/k6/lib"
3031
"go.k6.io/k6/lib/consts"
@@ -132,7 +133,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
132133

133134
// Create a local execution scheduler wrapping the runner.
134135
logger.Debug("Initializing the execution scheduler...")
135-
execScheduler, err := execution.NewScheduler(testRunState)
136+
execScheduler, err := execution.NewScheduler(testRunState, local.NewController())
136137
if err != nil {
137138
return err
138139
}

execution/controller.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package execution
2+
3+
// Controller implementations are used to control the k6 execution of a test or
4+
// test suite, either locally or in a distributed environment.
5+
type Controller interface {
6+
// GetOrCreateData requests the data chunk with the given ID, if it already
7+
// exists. If it doesn't (i.e. this was the first time this function was
8+
// called with that ID), the given callback is called and its result and
9+
// error are saved for the ID and returned for all other calls with it.
10+
//
11+
// This is an atomic and single-flight function, so any calls to it while the callback is
12+
// being executed the the same ID will wait for the first call to to finish
13+
// and receive its result.
14+
//
15+
// TODO: split apart into `Once()`, `SetData(), `GetData()` and implement
16+
// the GetOrCreateData() behavior in a helper like the ones below?
17+
GetOrCreateData(ID string, callback func() ([]byte, error)) ([]byte, error)
18+
19+
// Signal is used to notify that the current instance has reached the given
20+
// event ID, or that it has had an error.
21+
Signal(eventID string, err error) error
22+
23+
// Subscribe creates a listener for the specified event ID and returns a
24+
// callback that can wait until all other instances have reached it.
25+
Subscribe(eventID string) (wait func() error)
26+
}
27+
28+
// SignalAndWait implements a rendezvous point / barrier, a way for all
29+
// instances to reach the same execution point and wait for each other, before
30+
// they all ~simultaneously continue with the execution.
31+
//
32+
// It subscribes for the given event ID, signals that the current instance has
33+
// reached it without an error, and then waits until all other instances have
34+
// reached it or until there is an error in one of them.
35+
func SignalAndWait(c Controller, eventID string) error {
36+
wait := c.Subscribe(eventID)
37+
38+
if err := c.Signal(eventID, nil); err != nil {
39+
return err
40+
}
41+
return wait()
42+
}
43+
44+
// SignalErrorOrWait is a helper method that either immediately signals the
45+
// given error and returns it, or it signals nominal completion and waits for
46+
// all other instances to do the same (or signal an error themselves).
47+
func SignalErrorOrWait(c Controller, eventID string, err error) error {
48+
if err != nil {
49+
_ = c.Signal(eventID, err)
50+
return err // return the same error we got
51+
}
52+
return SignalAndWait(c, eventID)
53+
}

execution/local/controller.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Package local implements the execution.Controller interface for local
2+
// (single-machine) k6 execution.
3+
package local
4+
5+
// Controller "controls" local tests. It doesn't actually do anything, it just
6+
// implements the execution.Controller interface with no-op operations. The
7+
// methods don't do anything because local tests have only a single instance.
8+
//
9+
// However, for test suites (https://github.com/grafana/k6/issues/1342) in the
10+
// future, we will probably need to actually implement some of these methods and
11+
// introduce simple synchronization primitives even for a single machine...
12+
type Controller struct{}
13+
14+
// NewController creates a new local execution Controller.
15+
func NewController() *Controller {
16+
return &Controller{}
17+
}
18+
19+
// GetOrCreateData immediately calls the given callback and returns its results.
20+
func (c *Controller) GetOrCreateData(_ string, callback func() ([]byte, error)) ([]byte, error) {
21+
return callback()
22+
}
23+
24+
// Subscribe is a no-op, it doesn't actually wait for anything, because there is
25+
// nothing to wait on - we only have one instance in local tests.
26+
//
27+
// TODO: actually use waitgroups, since this may actually matter for test
28+
// suites, even for local test runs. That's because multiple tests might be
29+
// executed even by a single instance, and if we have complicated flows (e.g.
30+
// "test C is executed only after test A and test B finish"), the easiest way
31+
// would be for different tests in the suite to reuse this Controller API *both*
32+
// local and distributed runs.
33+
func (c *Controller) Subscribe(_ string) func() error {
34+
return func() error {
35+
return nil
36+
}
37+
}
38+
39+
// Signal is a no-op, it doesn't actually do anything for local test runs.
40+
//
41+
// TODO: similar to Wait() above, this might actually be required for
42+
// complex/branching test suites, even during local non-distributed execution.
43+
func (c *Controller) Signal(_ string, _ error) error {
44+
return nil
45+
}

execution/scheduler.go

+57-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
// executors, running setup() and teardown(), and actually starting the
2121
// executors for the different scenarios at the appropriate times.
2222
type Scheduler struct {
23+
controller Controller
24+
2325
initProgress *pb.ProgressBar
2426
executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID)
2527
executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work
@@ -33,7 +35,7 @@ type Scheduler struct {
3335
// initializing it beyond the bare minimum. Specifically, it creates the needed
3436
// executor instances and a lot of state placeholders, but it doesn't initialize
3537
// the executors and it doesn't initialize or run VUs.
36-
func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
38+
func NewScheduler(trs *lib.TestRunState, controller Controller) (*Scheduler, error) {
3739
options := trs.Options
3840
et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence)
3941
if err != nil {
@@ -81,6 +83,7 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
8183
maxDuration: maxDuration,
8284
maxPossibleVUs: maxPossibleVUs,
8385
state: executionState,
86+
controller: controller,
8487
}, nil
8588
}
8689

@@ -380,6 +383,13 @@ func (e *Scheduler) Init(
380383
) (stopVUEmission func(), initErr error) {
381384
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")
382385

386+
if err := SignalAndWait(e.controller, "scheduler-init-start"); err != nil {
387+
return nil, err
388+
}
389+
defer func() {
390+
initErr = SignalErrorOrWait(e.controller, "scheduler-init-done", initErr)
391+
}()
392+
383393
execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
384394
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
385395
stopVUEmission = func() {
@@ -405,16 +415,20 @@ func (e *Scheduler) Init(
405415
// Run the Scheduler, funneling all generated metric samples through the supplied
406416
// out channel.
407417
//
408-
//nolint:funlen
418+
//nolint:funlen, gocognit
409419
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) {
410420
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")
411421

422+
if err := SignalAndWait(e.controller, "scheduler-run-start"); err != nil {
423+
return err
424+
}
412425
defer func() {
413426
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
414427
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr)
415428
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
416429
runErr = interruptErr
417430
}
431+
runErr = SignalErrorOrWait(e.controller, "scheduler-run-done", runErr)
418432
}()
419433

420434
e.initProgress.Modify(pb.WithConstLeft("Run"))
@@ -430,6 +444,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
430444
}
431445
}
432446

447+
if err := SignalAndWait(e.controller, "test-ready-to-run-setup"); err != nil {
448+
return err
449+
}
450+
433451
e.initProgress.Modify(pb.WithConstProgress(1, "Starting test..."))
434452
e.state.MarkStarted()
435453
defer e.state.MarkEnded()
@@ -449,11 +467,27 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
449467
if !e.state.Test.Options.NoSetup.Bool {
450468
e.state.SetExecutionStatus(lib.ExecutionStatusSetup)
451469
e.initProgress.Modify(pb.WithConstProgress(1, "setup()"))
452-
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
453-
logger.WithField("error", err).Debug("setup() aborted by error")
470+
actuallyRanSetup := false
471+
data, err := e.controller.GetOrCreateData("setup", func() ([]byte, error) {
472+
actuallyRanSetup = true
473+
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
474+
logger.WithField("error", err).Debug("setup() aborted by error")
475+
return nil, err
476+
}
477+
return e.state.Test.Runner.GetSetupData(), nil
478+
})
479+
if err != nil {
454480
return err
455481
}
482+
if !actuallyRanSetup {
483+
e.state.Test.Runner.SetSetupData(data)
484+
}
485+
}
486+
487+
if err := SignalAndWait(e.controller, "setup-done"); err != nil {
488+
return err
456489
}
490+
457491
e.initProgress.Modify(pb.WithHijack(e.getRunStats))
458492

459493
// Start all executors at their particular startTime in a separate goroutine...
@@ -469,6 +503,8 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
469503
// Wait for all executors to finish
470504
var firstErr error
471505
for range e.executors {
506+
// TODO: add logic to abort the test early if there was an error from
507+
// the controller (e.g. some other instance for this test died)
472508
err := <-runResults
473509
if err != nil && firstErr == nil {
474510
logger.WithError(err).Debug("Executor returned with an error, cancelling test run...")
@@ -477,19 +513,34 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
477513
}
478514
}
479515

516+
if err := SignalAndWait(e.controller, "execution-done"); err != nil {
517+
return err
518+
}
519+
480520
// Run teardown() after all executors are done, if it's not disabled
481521
if !e.state.Test.Options.NoTeardown.Bool {
482522
e.state.SetExecutionStatus(lib.ExecutionStatusTeardown)
483523
e.initProgress.Modify(pb.WithConstProgress(1, "teardown()"))
484524

485525
// We run teardown() with the global context, so it isn't interrupted by
486526
// thresholds or test.abort() or even Ctrl+C (unless used twice).
487-
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
488-
logger.WithField("error", err).Debug("teardown() aborted by error")
527+
// TODO: add a `sync.Once` equivalent?
528+
_, err := e.controller.GetOrCreateData("teardown", func() ([]byte, error) {
529+
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
530+
logger.WithField("error", err).Debug("teardown() aborted by error")
531+
return nil, err
532+
}
533+
return nil, nil
534+
})
535+
if err != nil {
489536
return err
490537
}
491538
}
492539

540+
if err := SignalAndWait(e.controller, "teardown-done"); err != nil {
541+
return err
542+
}
543+
493544
return firstErr
494545
}
495546

0 commit comments

Comments
 (0)