Skip to content

Commit 9388590

Browse files
sendqueeryAllison Maheu
authored and
Allison Maheu
committed
Add safety for rate limited invocations (#7)
* Add MaxThrottleDelay to allow for slower retries * Add nil check for invocations that are rate limited (fixes #3 for real) * Minor syntax/naming cleanup
1 parent d86705a commit 9388590

File tree

5 files changed

+108
-61
lines changed

5 files changed

+108
-61
lines changed

aws/config/config.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package config
22

33
import (
4+
"time"
5+
46
"github.com/aws/aws-sdk-go/aws"
57
"github.com/aws/aws-sdk-go/aws/client"
68
"github.com/disneystreaming/go-ssmhelpers/util/httpx"
@@ -16,7 +18,8 @@ func NewDefaultConfig() *aws.Config {
1618
return &aws.Config{
1719
HTTPClient: httpx.NewDefaultClient(),
1820
Retryer: &client.DefaultRetryer{
19-
NumMaxRetries: 10,
21+
NumMaxRetries: 10,
22+
MaxThrottleDelay: 1500 * time.Millisecond,
2023
},
2124
}
2225
}
@@ -27,7 +30,8 @@ func NewDefaultConfigWithRegion(region string) *aws.Config {
2730
Region: aws.String(region),
2831
HTTPClient: httpx.NewDefaultClient(),
2932
Retryer: &client.DefaultRetryer{
30-
NumMaxRetries: 10,
33+
NumMaxRetries: 10,
34+
MaxThrottleDelay: 1500 * time.Millisecond,
3135
},
3236
}
3337
}

go.sum

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
github.com/aws/aws-sdk-go v1.27.3 h1:CBWC7Yot0U6OU/uosUmq7tKJVBTq6HrhgW1Vjpt9SMw=
2+
github.com/aws/aws-sdk-go v1.27.3/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
3+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
5+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
7+
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
8+
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
9+
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
10+
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
11+
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
12+
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
13+
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
14+
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
15+
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
16+
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
17+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
18+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
19+
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
20+
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
21+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
22+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
23+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
24+
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
25+
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
26+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
27+
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8=
28+
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
29+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
30+
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
31+
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
32+
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
33+
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
34+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
35+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
36+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
37+
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
38+
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

ssm/helpers.go

+2-10
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package ssm
22

33
import (
4-
"sync"
5-
64
"github.com/aws/aws-sdk-go/service/ec2"
75
"github.com/aws/aws-sdk-go/service/ssm"
86
"github.com/hashicorp/go-multierror"
@@ -59,13 +57,10 @@ func addInstanceInfo(instanceID *string, tags []ec2helpers.InstanceTags, instanc
5957
func RunInvocations(sp *session.Pool, sess *ssm.SSM, instances []*ssm.InstanceInformation, params *invocation.RunShellScriptParameters, dryRun bool, resultsPool *invocation.ResultSafe) (err error) {
6058
var commandOutput invocation.CommandOutputSafe
6159
var invError error
62-
var wg sync.WaitGroup
6360

6461
scoChan := make(chan *ssm.SendCommandOutput)
6562
errChan := make(chan error)
6663

67-
wg.Add(len(instances))
68-
6964
/*
7065
In a standard deployment, SSM allows us run commands on a maximum of
7166
up to 50 instances simultaneously.
@@ -81,8 +76,6 @@ func RunInvocations(sp *session.Pool, sess *ssm.SSM, instances []*ssm.InstanceIn
8176
*/
8277

8378
for _, instance := range instances {
84-
defer wg.Done()
85-
8679
go invocation.RunSSMCommand(sess, params, dryRun, scoChan, errChan, *instance.InstanceId)
8780
output, err := <-scoChan, <-errChan
8881

@@ -95,17 +88,16 @@ func RunInvocations(sp *session.Pool, sess *ssm.SSM, instances []*ssm.InstanceIn
9588
}
9689
}
9790

98-
wg.Wait()
9991
// Fetch the results of our invocation for all provided instances
100-
invocationtatus, err := invocation.GetCommandInvocationResult(sess, commandOutput.Output...)
92+
invocationStatus, err := invocation.GetCommandInvocationResult(sess, commandOutput.Output...)
10193
if err != nil {
10294
// If we somehow throw an error here, something has gone screwy with our invocation or the target instance
10395
// See the docs on ssm.GetCommandInvocation() for error details
10496
invError = multierror.Append(invError, err)
10597
}
10698

10799
// Iterate through all retrieved invocation results to add some extra context
108-
addInvocationResults(invocationtatus, resultsPool, sp)
100+
addInvocationResults(invocationStatus, resultsPool, sp)
109101
return invError
110102
}
111103

ssm/invocation/runner.go

+50-49
Original file line numberDiff line numberDiff line change
@@ -52,55 +52,56 @@ func GetCommandInvocationResult(context ssmiface.SSMAPI, jobs ...*ssm.SendComman
5252

5353
// Concurrently iterate through all items in []instanceIDs and get the invocation status
5454
for _, v := range jobs {
55-
wg.Add(len(v.Command.InstanceIds))
56-
for _, i := range v.Command.InstanceIds {
57-
58-
go func(v *ssm.SendCommandOutput, i *string, context ssmiface.SSMAPI) {
59-
defer wg.Done()
60-
/*
61-
GetCommandInvocation() requires a GetCommandInvocationInput object, which
62-
has required parameters CommandId and InstanceId. It is important to note
63-
that unlike the execution of the command, you can only retrieve the invocation
64-
results for one instance+command at a time.
65-
*/
66-
gciInput := &ssm.GetCommandInvocationInput{
67-
CommandId: v.Command.CommandId,
68-
InstanceId: i,
69-
}
70-
71-
// Retrieve the status of the command invocation
72-
status, err := context.GetCommandInvocation(gciInput)
73-
74-
// If we get "InvocationDoesNotExist", it just means we tried to check the results too quickly
75-
for awsErr, ok := err.(awserr.Error); ok && err != nil && awsErr.Code() == "InvocationDoesNotExist"; {
76-
time.Sleep(1000 * time.Millisecond)
77-
status, err = context.GetCommandInvocation(gciInput)
78-
}
79-
80-
// If we somehow throw a real error here, something has gone screwy with our invocation or the target instance
81-
// See the docs on ssm.GetCommandInvocation() for error details
82-
if err != nil {
83-
errLog.Errorln(err)
84-
return
85-
}
86-
87-
// If the invocation is in a pending state, we sleep for a couple seconds before retrying the query
88-
// NOTE: This may need to change based on API limits, but as there is no documentation, we'll have to wait and see.
89-
for *status.StatusDetails == "InProgress" || *status.StatusDetails == "Pending" {
90-
status, err = context.GetCommandInvocation(gciInput)
91-
time.Sleep(2000 * time.Millisecond)
92-
}
93-
94-
if err != nil {
95-
errLog.Errorln(err)
96-
return
97-
}
98-
99-
// Append the result to our slice of results
100-
results.Lock()
101-
results.results = append(results.results, status)
102-
results.Unlock()
103-
}(v, i, context)
55+
if v.Command != nil {
56+
for _, i := range v.Command.InstanceIds {
57+
wg.Add(1)
58+
go func(v *ssm.SendCommandOutput, i *string, context ssmiface.SSMAPI) {
59+
defer wg.Done()
60+
/*
61+
GetCommandInvocation() requires a GetCommandInvocationInput object, which
62+
has required parameters CommandId and InstanceId. It is important to note
63+
that unlike the execution of the command, you can only retrieve the invocation
64+
results for one instance+command at a time.
65+
*/
66+
gciInput := &ssm.GetCommandInvocationInput{
67+
CommandId: v.Command.CommandId,
68+
InstanceId: i,
69+
}
70+
71+
// Retrieve the status of the command invocation
72+
status, err := context.GetCommandInvocation(gciInput)
73+
74+
// If we get "InvocationDoesNotExist", it just means we tried to check the results too quickly
75+
for awsErr, ok := err.(awserr.Error); ok && err != nil && awsErr.Code() == "InvocationDoesNotExist"; {
76+
time.Sleep(1000 * time.Millisecond)
77+
status, err = context.GetCommandInvocation(gciInput)
78+
}
79+
80+
// If we somehow throw a real error here, something has gone screwy with our invocation or the target instance
81+
// See the docs on ssm.GetCommandInvocation() for error details
82+
if err != nil {
83+
errLog.Errorln(err)
84+
return
85+
}
86+
87+
// If the invocation is in a pending state, we sleep for a couple seconds before retrying the query
88+
// NOTE: This may need to change based on API limits, but as there is no documentation, we'll have to wait and see.
89+
for *status.StatusDetails == "InProgress" || *status.StatusDetails == "Pending" {
90+
status, err = context.GetCommandInvocation(gciInput)
91+
time.Sleep(2000 * time.Millisecond)
92+
}
93+
94+
if err != nil {
95+
errLog.Errorln(err)
96+
return
97+
}
98+
99+
// Append the result to our slice of results
100+
results.Lock()
101+
results.results = append(results.results, status)
102+
results.Unlock()
103+
}(v, i, context)
104+
}
104105
}
105106
}
106107

ssm/types.go

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (l *ListSlice) Set(value string) error {
2525
return nil
2626
}
2727

28+
func (l *ListSlice) Type() string {
29+
return "stringSlice"
30+
}
31+
2832
func (s *SemiSlice) String() string {
2933
return fmt.Sprintf("%s", *s)
3034
}
@@ -38,6 +42,10 @@ func (s *SemiSlice) Set(value string) error {
3842
return nil
3943
}
4044

45+
func (s *SemiSlice) Type() string {
46+
return "stringSlice"
47+
}
48+
4149
func (c *CommaSlice) String() string {
4250
return fmt.Sprintf("%s", *c)
4351
}
@@ -50,3 +58,7 @@ func (c *CommaSlice) Set(value string) error {
5058
}
5159
return nil
5260
}
61+
62+
func (c *CommaSlice) Type() string {
63+
return "stringSlice"
64+
}

0 commit comments

Comments
 (0)