Skip to content

Commit fe8b891

Browse files
authored
Merge pull request #36 from cultureamp/feat/add-configurable-timeout
feat: add configurable timeout
2 parents b470c2c + 4e86cf6 commit fe8b891

File tree

10 files changed

+223
-37
lines changed

10 files changed

+223
-37
lines changed

README.md

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,23 @@ Add the following lines to your `pipeline.yml`:
88
steps:
99
- plugins:
1010
- cultureamp/ecs-task-runner#v0.0.0:
11-
parameter-name: "/my-cool-service/farm-name"
12-
command: "./run-my-fully-siq-migrations"
11+
parameter-name: "test-parameter"
12+
command: "/bin/migrate"
13+
timeout: 900
1314
```
1415
1516
## Configuration
1617
17-
### `message` (Required, string)
18+
### `parameter-name` (Required, string)
19+
The name or ARN of the parameter in Parameter Store that contains the task definition.
1820

19-
The message to annotate onto the build.
21+
### `command` (Optional, string)
22+
The command to run in the task. When omitted, the task runs the command specified in the parameter.
23+
24+
### `timeout` (Optional, integer)
25+
The maximum time, in seconds, that the plugin will wait for the task to complete. If the task does not complete within this time, the plugin fails; the task itself is not stopped and continues to run in the background.
26+
27+
Default: 2700
2028

2129
## Usage
2230
This plugin is based on an existing pattern in `murmur` where database migrations are run as a task on ECS. To provide additional context for how this plugin is expected to be used, this is the expected pattern:
@@ -35,7 +43,7 @@ This plugin comes with some assumed infrastructure that needs to be deployed bef
3543
- An IAM role for the BK agent to start the task
3644
- A Parameter Store parameter extending the task definition by providing entrypoint overrides and networking configuration
3745
- A log group for the task
38-
- A security group for your service (this can be the [base-infrastructure-for-services](https://github.com/cultureamp/base-infrastructure-for-services) source security group
46+
- A security group for your service (this can be the [base-infrastructure-for-services](https://github.com/cultureamp/base-infrastructure-for-services) source security group)
3947

4048
This can be visualised below:
4149
![The overall flow of this plugin and AWS resources](docs/images/diagram.svg)

plugin.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ configuration:
99
type: string
1010
command:
1111
type: string
12+
timeout:
13+
type: integer
1214
additionalProperties: false
1315
anyOf:
1416
- required:
15-
- parameter-name
17+
- parameter-name

src/aws/ecs.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type EcsClientAPI interface {
1919
DescribeTaskDefinition(ctx context.Context, params *ecs.DescribeTaskDefinitionInput, optFns ...func(*ecs.Options)) (*ecs.DescribeTaskDefinitionOutput, error)
2020
}
2121

22-
type ecsWaiterAPI interface {
22+
type EcsWaiterAPI interface {
2323
WaitForOutput(ctx context.Context, params *ecs.DescribeTasksInput, maxWaitDur time.Duration, optFns ...func(*ecs.TasksStoppedWaiterOptions)) (*ecs.DescribeTasksOutput, error)
2424
}
2525

@@ -56,11 +56,10 @@ func SubmitTask(ctx context.Context, ecsAPI EcsClientAPI, input *TaskRunnerConfi
5656
return *response.Tasks[0].TaskArn, nil
5757
}
5858

59-
func WaitForCompletion(ctx context.Context, waiter ecsWaiterAPI, taskArn string) (*ecs.DescribeTasksOutput, error) {
59+
func WaitForCompletion(ctx context.Context, waiter EcsWaiterAPI, taskArn string, timeOut int) (*ecs.DescribeTasksOutput, error) {
6060
cluster := ClusterFromTaskArn(taskArn)
6161

62-
// TODO: This magic number will be resolved in a future piece of work, not going to refactor this right now
63-
maxWaitDuration := 15 * time.Minute //nolint:mnd
62+
maxWaitDuration := time.Duration(timeOut) * time.Second
6463
result, err := waiter.WaitForOutput(ctx, &ecs.DescribeTasksInput{
6564
Cluster: aws.String(cluster),
6665
Tasks: []string{taskArn},

src/aws/ecs_test.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -388,16 +388,23 @@ func TestFindLogStreamFromTaskNegative(t *testing.T) {
388388
// to allow things to finish in the background. The return value is used only when a task fails, and we push
389389
// this to a log.
390390
func TestWaitForCompletion(t *testing.T) {
391-
mockedWaiter := mockECSWaiter{
392-
mockWaitForOutput: func(context.Context, *ecs.DescribeTasksInput, time.Duration, ...func(*ecs.TasksStoppedWaiterOptions)) (*ecs.DescribeTasksOutput, error) {
393-
return &ecs.DescribeTasksOutput{
394-
Failures: []types.Failure{
395-
{
396-
Arn: aws.String("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/07cc583696bd44e0be450bff7314ddaf"),
397-
Detail: aws.String("task stopped"),
398-
Reason: aws.String("computer is full of beanz"),
399-
},
400-
}}, errors.New("task stopped: computer is full of beanz")
391+
mockedWaiter := map[string]mockECSWaiter{
392+
"beans": {
393+
mockWaitForOutput: func(context.Context, *ecs.DescribeTasksInput, time.Duration, ...func(*ecs.TasksStoppedWaiterOptions)) (*ecs.DescribeTasksOutput, error) {
394+
return &ecs.DescribeTasksOutput{
395+
Failures: []types.Failure{
396+
{
397+
Arn: aws.String("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/07cc583696bd44e0be450bff7314ddaf"),
398+
Detail: aws.String("task stopped"),
399+
Reason: aws.String("computer is full of beanz"),
400+
},
401+
}}, nil
402+
},
403+
},
404+
"slowpoke": {
405+
mockWaitForOutput: func(context.Context, *ecs.DescribeTasksInput, time.Duration, ...func(*ecs.TasksStoppedWaiterOptions)) (*ecs.DescribeTasksOutput, error) {
406+
return nil, errors.New("task timed out: computer still thinking")
407+
},
401408
},
402409
}
403410

@@ -410,35 +417,41 @@ func TestWaitForCompletion(t *testing.T) {
410417
tests := []struct {
411418
name string
412419
input string
413-
waiter ecsWaiterAPI
420+
waiter EcsWaiterAPI
414421
expected expectedReturn
415422
}{
416423
{
417424
name: "given a task ARN, it should return the task details",
418425
input: "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/07cc583696bd44e0be450bff7314ddaf",
419-
waiter: mockedWaiter,
426+
waiter: mockedWaiter["beans"],
420427
expected: expectedReturn{&ecs.DescribeTasksOutput{
421428
Failures: []types.Failure{
422429
{
423430
Arn: aws.String("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/07cc583696bd44e0be450bff7314ddaf"),
424431
Detail: aws.String("task stopped"),
425432
Reason: aws.String("computer is full of beanz"),
426433
},
427-
}}, errors.New("task stopped: computer is full of beanz"),
434+
}}, nil,
428435
},
429436
},
437+
{
438+
name: "given a task that times out, it should return an error",
439+
input: "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/07cc583696bd44e0be450bff7314ddaf",
440+
waiter: mockedWaiter["slowpoke"],
441+
expected: expectedReturn{nil, errors.New("task timed out: computer still thinking")},
442+
},
430443
}
431444

432445
for _, tc := range tests {
433446
t.Run(tc.name, func(t *testing.T) {
434-
result, err := WaitForCompletion(context.TODO(), tc.waiter, tc.input)
435-
t.Logf("result: '%v'", err)
436-
t.Logf("expected: detail: %v, reason: %v", *tc.expected.Failures[0].Detail, *tc.expected.Failures[0].Reason)
437-
438-
// The function is most-useful when the underlying task fails. i.e. no news is good news in a real-world scenario
439-
// So, we will test the failure cases
440-
require.Error(t, err)
441-
assert.Equal(t, tc.expected.Failures[0], result.Failures[0])
447+
result, err := WaitForCompletion(context.TODO(), tc.waiter, tc.input, 15)
448+
t.Logf("name: %s result: '%v'", tc.name, err)
449+
// Errors are only returned when the waiter times out
450+
if err != nil {
451+
require.Equal(t, tc.expected.Error(), err.Error())
452+
} else {
453+
require.Equal(t, tc.expected.Failures, result.Failures)
454+
}
442455
})
443456
}
444457
}

src/buildkite/agent.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ import (
1111
osexec "golang.org/x/sys/execabs"
1212
)
1313

14+
type AgentAPI interface {
15+
Annotate(ctx context.Context, message string, style string, annotationContext string) error
16+
}
17+
1418
type Agent struct {
1519
}
1620

17-
func (a *Agent) Annotate(ctx context.Context, message string, style string, annotationContext string) error {
21+
func (a Agent) Annotate(ctx context.Context, message string, style string, annotationContext string) error {
1822
return execCmd(ctx, "buildkite-agent", &message, "annotate", "--style", style, "--context", annotationContext)
1923
}
2024

src/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"os"
66

7+
awsinternal "github.com/cultureamp/ecs-task-runner-buildkite-plugin/aws"
78
"github.com/cultureamp/ecs-task-runner-buildkite-plugin/buildkite"
89
"github.com/cultureamp/ecs-task-runner-buildkite-plugin/plugin"
910
)
@@ -13,7 +14,7 @@ func main() {
1314
fetcher := plugin.EnvironmentConfigFetcher{}
1415
taskRunnerPlugin := plugin.TaskRunnerPlugin{}
1516

16-
err := taskRunnerPlugin.Run(ctx, fetcher)
17+
err := taskRunnerPlugin.Run(ctx, fetcher, awsinternal.WaitForCompletion)
1718

1819
if err != nil {
1920
buildkite.LogFailuref("plugin execution failed: %s\n", err.Error())

src/plugin/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
type Config struct {
88
ParameterName string `required:"true" split_words:"true"`
99
Command string `required:"false" split_words:"true"`
10+
TimeOut int `default:"2700" split_words:"true"`
1011
}
1112

1213
type EnvironmentConfigFetcher struct {

src/plugin/config_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func TestFailOnMissingRequiredEnvironment(t *testing.T) {
2525
disabledEnvVars: []string{
2626
"BUILDKITE_PLUGIN_ECS_TASK_RUNNER_PARAMETER_NAME",
2727
"BUILDKITE_PLUGIN_ECS_TASK_RUNNER_COMMAND",
28+
"BUILDKITE_PLUGIN_ECS_TASK_RUNNER_TIMEOUT",
2829
},
2930
enabledEnvVars: map[string]string{},
3031
expectedErr: "required key BUILDKITE_PLUGIN_ECS_TASK_RUNNER_PARAMETER_NAME missing value",
@@ -100,18 +101,27 @@ func TestSucceedOnMissingOptionalEnvironment(t *testing.T) {
100101
func TestFetchConfigFromEnvironment(t *testing.T) {
101102
unsetEnv(t, "BUILDKITE_PLUGIN_ECS_TASK_RUNNER_PARAMETER_NAME")
102103
unsetEnv(t, "BUILDKITE_PLUGIN_ECS_TASK_RUNNER_COMMAND")
104+
unsetEnv(t, "BUILDKITE_PLUGIN_ECS_TASK_RUNNER_TIME_OUT")
103105

104106
var config plugin.Config
105107
fetcher := plugin.EnvironmentConfigFetcher{}
106108

107109
t.Setenv("BUILDKITE_PLUGIN_ECS_TASK_RUNNER_PARAMETER_NAME", "test-parameter")
108110
t.Setenv("BUILDKITE_PLUGIN_ECS_TASK_RUNNER_COMMAND", "hello-world")
111+
t.Setenv("BUILDKITE_PLUGIN_ECS_TASK_RUNNER_TIME_OUT", "600")
109112

110113
err := fetcher.Fetch(&config)
111114

112115
require.NoError(t, err, "fetch should not error")
113116
assert.Equal(t, "test-parameter", config.ParameterName, "fetched message should match environment")
114-
assert.Equal(t, "hello-world", config.Command, "fetched message should match environment")
117+
assert.Equal(t, "hello-world", config.Command, "fetched script should match environment")
118+
assert.Equal(t, 600, config.TimeOut, "fetched timeout should match environment")
119+
120+
// test default value
121+
unsetEnv(t, "BUILDKITE_PLUGIN_ECS_TASK_RUNNER_TIME_OUT")
122+
err = fetcher.Fetch(&config)
123+
require.NoError(t, err, "fetch should not error")
124+
assert.Equal(t, 2700, config.TimeOut, "fetched timeout should match environment")
115125
}
116126

117127
func unsetEnv(t *testing.T, key string) {

src/plugin/task-runner.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package plugin
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"strings"
78
"time"
@@ -18,16 +19,19 @@ import (
1819
type TaskRunnerPlugin struct {
1920
}
2021

22+
type WaitForCompletion func(ctx context.Context, waiter awsinternal.EcsWaiterAPI, taskArn string, timeOut int) (*ecs.DescribeTasksOutput, error)
2123
type ConfigFetcher interface {
2224
Fetch(config *Config) error
2325
}
2426

25-
func (trp TaskRunnerPlugin) Run(ctx context.Context, fetcher ConfigFetcher) error {
27+
func (trp TaskRunnerPlugin) Run(ctx context.Context, fetcher ConfigFetcher, waiter WaitForCompletion) error {
2628
var config Config
29+
2730
err := fetcher.Fetch(&config)
2831
if err != nil {
2932
return fmt.Errorf("plugin configuration error: %w", err)
3033
}
34+
buildKiteAgent := buildkite.Agent{}
3135

3236
buildkite.Log("Executing task-runner plugin\n")
3337

@@ -61,10 +65,12 @@ func (trp TaskRunnerPlugin) Run(ctx context.Context, fetcher ConfigFetcher) erro
6165
// TODO: This is currently a magic number. If we want this to be configurable, remove the nolint directive and fix it up
6266
o.MaxDelay = 10 * time.Second //nolint:mnd
6367
})
64-
result, err := awsinternal.WaitForCompletion(ctx, waiterClient, taskArn)
68+
result, err := waiter(ctx, waiterClient, taskArn, config.TimeOut)
69+
err = trp.HandleResults(ctx, result, err, buildKiteAgent, config)
6570
if err != nil {
66-
return fmt.Errorf("failed to wait for task completion: %w\nFailure information: %v", err, result.Failures[0])
71+
return fmt.Errorf("failed to handle task results: %w", err)
6772
}
73+
6874
// In a successful scenario for task completion, we would have a `tasks` slice with a single element
6975
task := result.Tasks[0]
7076
taskLogDetails, err := awsinternal.FindLogStreamFromTask(ctx, ecsClient, task)
@@ -104,3 +110,33 @@ func (trp TaskRunnerPlugin) Run(ctx context.Context, fetcher ConfigFetcher) erro
104110
buildkite.Log("done. \n")
105111
return nil
106112
}
113+
114+
func (trp TaskRunnerPlugin) HandleResults(ctx context.Context, output *ecs.DescribeTasksOutput, err error, bkAgent buildkite.AgentAPI, config Config) error {
115+
if err != nil {
116+
// This comparison is hacky, but is the only way that I could get the wrapped errors surfaced
117+
// from the AWS library to be properly handled. It would be better if this was done using errors.As
118+
if strings.Contains(err.Error(), "exceeded max wait time for TasksStopped waiter") {
119+
err := bkAgent.Annotate(ctx, fmt.Sprintf("Task did not complete successfully within timeout (%d seconds)", config.TimeOut), "error", "ecs-task-runner")
120+
if err != nil {
121+
return fmt.Errorf("failed to annotate buildkite with task timeout failure: %w", err)
122+
}
123+
return errors.New("task did not complete within the time limit")
124+
}
125+
bkerr := bkAgent.Annotate(ctx, fmt.Sprintf("failed to wait for task completion: %v\n", err), "error", "ecs-task-runner")
126+
if bkerr != nil {
127+
return fmt.Errorf("failed to annotate buildkite with task wait failure: %w, annotation error: %w", err, bkerr)
128+
}
129+
} else if len(output.Failures) > 0 {
130+
// There is still a scenario where the task could return failures but this isn't handled by the waiter
131+
// This is due to the waiter only returning errors in scenarios where there are issues querying the task
132+
// or scheduling the task. For a list of the Failures that can be returned in this case, see:
133+
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/api_failures_messages.html
134+
// specifically, under the `DescribeTasks` API.
135+
err := bkAgent.Annotate(ctx, fmt.Sprintf("Task did not complete successfully: %v", output.Failures[0]), "error", "ecs-task-runner")
136+
if err != nil {
137+
return fmt.Errorf("failed to annotate buildkite with task failure: %w", err)
138+
}
139+
return fmt.Errorf("task did not complete successfully: %v", output.Failures[0])
140+
}
141+
return nil
142+
}

0 commit comments

Comments
 (0)