Skip to content

Commit 988f391

Browse files
authored
fix: wait for load test to complete (#106)
1 parent b20ed3d commit 988f391

File tree

2 files changed

+144
-53
lines changed

2 files changed

+144
-53
lines changed

workflows/testnet/testnet.go

Lines changed: 63 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -246,64 +246,62 @@ func createWallets(ctx workflow.Context, req messages.TestnetWorkflowRequest, ch
246246
}
247247

248248
func runLoadTest(ctx workflow.Context, req messages.TestnetWorkflowRequest, chainState, providerState []byte,
249-
mnemonics []string, selector workflow.Selector) error {
249+
mnemonics []string, selector workflow.Selector) (workflow.Future, error) {
250250
if req.EthereumLoadTestSpec != nil {
251-
workflow.Go(ctx, func(ctx workflow.Context) {
252-
var loadTestResp messages.RunLoadTestResponse
253-
f := workflow.ExecuteActivity(
254-
workflow.WithStartToCloseTimeout(ctx, loadTestTimeout),
255-
loadTestActivities.RunLoadTest,
256-
messages.RunLoadTestRequest{
257-
ChainState: chainState,
258-
ProviderState: providerState,
259-
LoadTestSpec: *req.EthereumLoadTestSpec,
260-
RunnerType: req.RunnerType,
261-
IsEvmChain: req.IsEvmChain,
262-
Mnemonics: mnemonics,
263-
CatalystVersion: req.CatalystVersion,
264-
},
265-
)
266-
267-
selector.AddFuture(f, func(f workflow.Future) {
268-
activityErr := f.Get(ctx, &loadTestResp)
269-
if activityErr != nil {
270-
workflow.GetLogger(ctx).Error("ethereum load test failed", zap.Error(activityErr))
271-
} else if loadTestResp.Result.Error != "" {
272-
workflow.GetLogger(ctx).Error("ethereum load test reported an error", zap.String("error", loadTestResp.Result.Error))
273-
}
274-
})
275-
251+
var loadTestResp messages.RunLoadTestResponse
252+
f := workflow.ExecuteActivity(
253+
workflow.WithStartToCloseTimeout(ctx, loadTestTimeout),
254+
loadTestActivities.RunLoadTest,
255+
messages.RunLoadTestRequest{
256+
ChainState: chainState,
257+
ProviderState: providerState,
258+
LoadTestSpec: *req.EthereumLoadTestSpec,
259+
RunnerType: req.RunnerType,
260+
IsEvmChain: req.IsEvmChain,
261+
Mnemonics: mnemonics,
262+
CatalystVersion: req.CatalystVersion,
263+
},
264+
)
265+
266+
selector.AddFuture(f, func(f workflow.Future) {
267+
activityErr := f.Get(ctx, &loadTestResp)
268+
if activityErr != nil {
269+
workflow.GetLogger(ctx).Error("ethereum load test failed", zap.Error(activityErr))
270+
} else if loadTestResp.Result.Error != "" {
271+
workflow.GetLogger(ctx).Error("ethereum load test reported an error", zap.String("error", loadTestResp.Result.Error))
272+
}
276273
})
277-
} else if req.CosmosLoadTestSpec != nil {
278-
workflow.Go(ctx, func(ctx workflow.Context) {
279-
var loadTestResp messages.RunLoadTestResponse
280-
f := workflow.ExecuteActivity(
281-
workflow.WithStartToCloseTimeout(ctx, loadTestTimeout),
282-
loadTestActivities.RunLoadTest,
283-
messages.RunLoadTestRequest{
284-
ChainState: chainState,
285-
ProviderState: providerState,
286-
LoadTestSpec: *req.CosmosLoadTestSpec,
287-
RunnerType: req.RunnerType,
288-
IsEvmChain: req.IsEvmChain,
289-
Mnemonics: mnemonics,
290-
CatalystVersion: req.CatalystVersion,
291-
},
292-
)
293-
294-
selector.AddFuture(f, func(f workflow.Future) {
295-
activityErr := f.Get(ctx, &loadTestResp)
296-
if activityErr != nil {
297-
workflow.GetLogger(ctx).Error("cosmos load test failed", zap.Error(activityErr))
298-
} else if loadTestResp.Result.Error != "" {
299-
workflow.GetLogger(ctx).Error("cosmos load test reported an error", zap.String("error", loadTestResp.Result.Error))
300-
}
301-
})
302274

275+
return f, nil
276+
} else if req.CosmosLoadTestSpec != nil {
277+
var loadTestResp messages.RunLoadTestResponse
278+
f := workflow.ExecuteActivity(
279+
workflow.WithStartToCloseTimeout(ctx, loadTestTimeout),
280+
loadTestActivities.RunLoadTest,
281+
messages.RunLoadTestRequest{
282+
ChainState: chainState,
283+
ProviderState: providerState,
284+
LoadTestSpec: *req.CosmosLoadTestSpec,
285+
RunnerType: req.RunnerType,
286+
IsEvmChain: req.IsEvmChain,
287+
Mnemonics: mnemonics,
288+
CatalystVersion: req.CatalystVersion,
289+
},
290+
)
291+
292+
selector.AddFuture(f, func(f workflow.Future) {
293+
activityErr := f.Get(ctx, &loadTestResp)
294+
if activityErr != nil {
295+
workflow.GetLogger(ctx).Error("cosmos load test failed", zap.Error(activityErr))
296+
} else if loadTestResp.Result.Error != "" {
297+
workflow.GetLogger(ctx).Error("cosmos load test reported an error", zap.String("error", loadTestResp.Result.Error))
298+
}
303299
})
300+
301+
return f, nil
304302
}
305303

306-
return nil
304+
return nil, nil
307305
}
308306

309307
func startWorkflow(ctx workflow.Context, req messages.TestnetWorkflowRequest, runName string, buildResult messages.BuildDockerImageResponse, workflowID string) error {
@@ -334,7 +332,7 @@ func startWorkflow(ctx workflow.Context, req messages.TestnetWorkflowRequest, ru
334332

335333
shutdownSelector := workflow.NewSelector(ctx)
336334
// 1. load test selector
337-
err = runLoadTest(ctx, req, chainState, providerState, mnemonics, shutdownSelector)
335+
loadTestFuture, err := runLoadTest(ctx, req, chainState, providerState, mnemonics, shutdownSelector)
338336
if err != nil {
339337
workflow.GetLogger(ctx).Error("load test initiation failed", zap.Error(err))
340338
}
@@ -343,6 +341,18 @@ func startWorkflow(ctx workflow.Context, req messages.TestnetWorkflowRequest, ru
343341

344342
shutdownSelector.Select(ctx)
345343

344+
// If we have a loadtest running and the duration timer expired (not cancelled),
345+
// wait for the loadtest to complete before allowing teardown
346+
if loadTestFuture != nil && !temporal.IsCanceledError(ctx.Err()) {
347+
workflow.GetLogger(ctx).Info("testnet duration expired but loadtest is still running, waiting for completion")
348+
err = loadTestFuture.Get(ctx, nil) // Wait for loadtest to complete
349+
if err != nil {
350+
workflow.GetLogger(ctx).Error("failed to wait for load test future", zap.Error(err))
351+
return err
352+
}
353+
workflow.GetLogger(ctx).Info("loadtest completed, proceeding with teardown")
354+
}
355+
346356
if ctx.Err() != nil && temporal.IsCanceledError(ctx.Err()) {
347357
workflow.GetLogger(ctx).Info("workflow was cancelled, completing gracefully")
348358
return nil

workflows/testnet/testnet_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,87 @@ func (s *TestnetWorkflowTestSuite) Test_TestnetWorkflowLongRunningCancelled() {
553553
s.env.AssertActivityNumberOfCalls(s.T(), "TeardownProvider", 1)
554554
}
555555

556+
func (s *TestnetWorkflowTestSuite) Test_TestnetWorkflowWaitsForLoadTestOnDurationExpiry() {
557+
loadTestCompleted := false
558+
559+
cfg, err := types.ParseWorkerConfig("../../conf/worker.yaml")
560+
if err != nil {
561+
s.T().Fatal(err)
562+
}
563+
testnetActivity := &testnettypes.Activity{
564+
Chains: cfg.Chains,
565+
}
566+
loadTestActivity := &loadtest.Activity{}
567+
builderActivity := &builder.Activity{}
568+
walletCreatorActivities := walletcreator.Activity{}
569+
570+
s.env.RegisterActivity(testnetActivity.CreateProvider)
571+
s.env.RegisterActivity(testnetActivity.TeardownProvider)
572+
s.env.RegisterActivity(testnetActivity.LaunchTestnet)
573+
s.env.RegisterActivity(loadTestActivity.RunLoadTest)
574+
s.env.RegisterActivity(builderActivity.BuildDockerImage)
575+
s.env.RegisterActivity(walletCreatorActivities.CreateWallets)
576+
577+
testnetActivities = testnetActivity
578+
loadTestActivities = loadTestActivity
579+
580+
s.env.OnActivity(loadTestActivity.RunLoadTest, mock.Anything, mock.Anything).Return(
581+
func(ctx context.Context, req messages.RunLoadTestRequest) (messages.RunLoadTestResponse, error) {
582+
time.Sleep(5 * time.Second)
583+
loadTestCompleted = true
584+
return messages.RunLoadTestResponse{
585+
Result: catalysttypes.LoadTestResult{},
586+
}, nil
587+
})
588+
589+
s.env.OnActivity(testnetActivity.TeardownProvider, mock.Anything, mock.Anything).Return(
590+
func(ctx context.Context, req messages.TeardownProviderRequest) (messages.TeardownProviderResponse, error) {
591+
// fail if teardown is called before loadtest completes
592+
if !loadTestCompleted {
593+
s.T().Errorf("TeardownProvider called before LoadTest completed instead of waiting for load test to complete")
594+
}
595+
return messages.TeardownProviderResponse{}, nil
596+
})
597+
598+
s.env.OnActivity(builderActivity.BuildDockerImage, mock.Anything, mock.Anything).Return(
599+
func(ctx context.Context, req messages.BuildDockerImageRequest) (messages.BuildDockerImageResponse, error) {
600+
originalTag := "ghcr.io/cosmos/simapp:v0.50"
601+
newTag := "simapp-v53"
602+
603+
cmd := exec.Command("docker", "pull", originalTag)
604+
output, err := cmd.CombinedOutput()
605+
if err != nil {
606+
return messages.BuildDockerImageResponse{}, err
607+
}
608+
609+
tagCmd := exec.Command("docker", "tag", originalTag, newTag)
610+
tagOutput, err := tagCmd.CombinedOutput()
611+
if err != nil {
612+
return messages.BuildDockerImageResponse{}, err
613+
}
614+
615+
return messages.BuildDockerImageResponse{
616+
FQDNTag: newTag,
617+
Logs: append(output, tagOutput...),
618+
}, nil
619+
})
620+
621+
dockerReq := simappReq
622+
dockerReq.Repo = "cosmos-sdk"
623+
dockerReq.SHA = "acb1d65cdc1e0fc36d93f3c5bb6aaf919a1321e2"
624+
dockerReq.RunnerType = messages.Docker
625+
dockerReq.ChainConfig.Name = "stake"
626+
dockerReq.TestnetDuration = "3s"
627+
dockerReq.LongRunningTestnet = false
628+
629+
s.env.ExecuteWorkflow(Workflow, dockerReq)
630+
631+
s.True(s.env.IsWorkflowCompleted())
632+
s.NoError(s.env.GetWorkflowError())
633+
s.env.AssertActivityNumberOfCalls(s.T(), "RunLoadTest", 1)
634+
s.env.AssertActivityNumberOfCalls(s.T(), "TeardownProvider", 1)
635+
}
636+
556637
func TestTestnetWorkflowTestSuite(t *testing.T) {
557638
suite.Run(t, new(TestnetWorkflowTestSuite))
558639
}

0 commit comments

Comments
 (0)