Skip to content

Commit a18bf2b

Browse files
authored
Merge pull request kubernetes-sigs#1209 from goto-opensource/feature/retry-single-changes-on-batch-failure
Route53: retry single changes in a batch if the batch fails
2 parents 1a556ce + 7dd84a5 commit a18bf2b

File tree

6 files changed

+405
-182
lines changed

6 files changed

+405
-182
lines changed

endpoint/labels.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
OwnerLabelKey = "owner"
3333
// ResourceLabelKey is the name of the label that identifies k8s resource which wants to acquire the DNS name
3434
ResourceLabelKey = "resource"
35+
// OwnedRecordLabelKey is the name of the label that identifies the record that is owned by the labeled TXT registry record
36+
OwnedRecordLabelKey = "ownedRecord"
3537

3638
// AWSSDDescriptionLabel label responsible for storing raw owner/resource combination information in the Labels
3739
// supposed to be inserted by AWS SD Provider, and parsed into OwnerLabelKey and ResourceLabelKey key by AWS SD Registry

internal/testutils/endpoint.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func SameEndpoint(a, b *endpoint.Endpoint) bool {
6262
return a.DNSName == b.DNSName && a.Targets.Same(b.Targets) && a.RecordType == b.RecordType && a.SetIdentifier == b.SetIdentifier &&
6363
a.Labels[endpoint.OwnerLabelKey] == b.Labels[endpoint.OwnerLabelKey] && a.RecordTTL == b.RecordTTL &&
6464
a.Labels[endpoint.ResourceLabelKey] == b.Labels[endpoint.ResourceLabelKey] &&
65+
a.Labels[endpoint.OwnedRecordLabelKey] == b.Labels[endpoint.OwnedRecordLabelKey] &&
6566
SameProviderSpecific(a.ProviderSpecific, b.ProviderSpecific)
6667
}
6768

provider/aws/aws.go

Lines changed: 145 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,22 @@ type Route53API interface {
136136
ListTagsForResourceWithContext(ctx context.Context, input *route53.ListTagsForResourceInput, opts ...request.Option) (*route53.ListTagsForResourceOutput, error)
137137
}
138138

139+
// wrapper to handle ownership relation throughout the provider implementation
140+
type Route53Change struct {
141+
route53.Change
142+
OwnedRecord string
143+
}
144+
145+
type Route53Changes []*Route53Change
146+
147+
func (cs Route53Changes) Route53Changes() []*route53.Change {
148+
ret := []*route53.Change{}
149+
for _, c := range cs {
150+
ret = append(ret, &c.Change)
151+
}
152+
return ret
153+
}
154+
139155
type zonesListCache struct {
140156
age time.Time
141157
duration time.Duration
@@ -160,6 +176,8 @@ type AWSProvider struct {
160176
zoneTagFilter provider.ZoneTagFilter
161177
preferCNAME bool
162178
zonesCache *zonesListCache
179+
// queue for collecting changes to submit them in the next iteration, but after all other changes
180+
failedChangesQueue map[string]Route53Changes
163181
}
164182

165183
// AWSConfig contains configuration to create a new AWS provider.
@@ -224,6 +242,7 @@ func NewAWSProvider(awsConfig AWSConfig) (*AWSProvider, error) {
224242
preferCNAME: awsConfig.PreferCNAME,
225243
dryRun: awsConfig.DryRun,
226244
zonesCache: &zonesListCache{duration: awsConfig.ZoneCacheDuration},
245+
failedChangesQueue: make(map[string]Route53Changes),
227246
}
228247

229248
return provider, nil
@@ -467,7 +486,7 @@ func (p *AWSProvider) requiresDeleteCreate(old *endpoint.Endpoint, new *endpoint
467486
return false
468487
}
469488

470-
func (p *AWSProvider) createUpdateChanges(newEndpoints, oldEndpoints []*endpoint.Endpoint) []*route53.Change {
489+
func (p *AWSProvider) createUpdateChanges(newEndpoints, oldEndpoints []*endpoint.Endpoint) Route53Changes {
471490
var deletes []*endpoint.Endpoint
472491
var creates []*endpoint.Endpoint
473492
var updates []*endpoint.Endpoint
@@ -483,7 +502,7 @@ func (p *AWSProvider) createUpdateChanges(newEndpoints, oldEndpoints []*endpoint
483502
}
484503
}
485504

486-
combined := make([]*route53.Change, 0, len(deletes)+len(creates)+len(updates))
505+
combined := make(Route53Changes, 0, len(deletes)+len(creates)+len(updates))
487506
combined = append(combined, p.newChanges(route53.ChangeActionCreate, creates)...)
488507
combined = append(combined, p.newChanges(route53.ChangeActionUpsert, updates)...)
489508
combined = append(combined, p.newChanges(route53.ChangeActionDelete, deletes)...)
@@ -514,7 +533,7 @@ func (p *AWSProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) e
514533

515534
updateChanges := p.createUpdateChanges(changes.UpdateNew, changes.UpdateOld)
516535

517-
combinedChanges := make([]*route53.Change, 0, len(changes.Delete)+len(changes.Create)+len(updateChanges))
536+
combinedChanges := make(Route53Changes, 0, len(changes.Delete)+len(changes.Create)+len(updateChanges))
518537
combinedChanges = append(combinedChanges, p.newChanges(route53.ChangeActionCreate, changes.Create)...)
519538
combinedChanges = append(combinedChanges, p.newChanges(route53.ChangeActionDelete, changes.Delete)...)
520539
combinedChanges = append(combinedChanges, updateChanges...)
@@ -523,7 +542,7 @@ func (p *AWSProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) e
523542
}
524543

525544
// submitChanges takes a zone and a collection of Changes and sends them as a single transaction.
526-
func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Change, zones map[string]*route53.HostedZone) error {
545+
func (p *AWSProvider) submitChanges(ctx context.Context, changes Route53Changes, zones map[string]*route53.HostedZone) error {
527546
// return early if there is nothing to change
528547
if len(changes) == 0 {
529548
log.Info("All records are already up to date")
@@ -540,9 +559,16 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Chan
540559
for z, cs := range changesByZone {
541560
var failedUpdate bool
542561

543-
batchCs := batchChangeSet(cs, p.batchChangeSize)
562+
// group changes into new changes and into changes that failed in a previous iteration and are retried
563+
retriedChanges, newChanges := findChangesInQueue(cs, p.failedChangesQueue[z])
564+
p.failedChangesQueue[z] = nil
544565

566+
batchCs := append(batchChangeSet(newChanges, p.batchChangeSize), batchChangeSet(retriedChanges, p.batchChangeSize)...)
545567
for i, b := range batchCs {
568+
if len(b) == 0 {
569+
continue
570+
}
571+
546572
for _, c := range b {
547573
log.Infof("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
548574
}
@@ -551,17 +577,45 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Chan
551577
params := &route53.ChangeResourceRecordSetsInput{
552578
HostedZoneId: aws.String(z),
553579
ChangeBatch: &route53.ChangeBatch{
554-
Changes: b,
580+
Changes: b.Route53Changes(),
555581
},
556582
}
557583

584+
successfulChanges := 0
585+
558586
if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
559-
log.Errorf("Failure in zone %s [Id: %s]", aws.StringValue(zones[z].Name), z)
560-
log.Error(err) // TODO(ideahitme): consider changing the interface in cases when this error might be a concern for other components
561-
failedUpdate = true
587+
log.Errorf("Failure in zone %s [Id: %s] when submitting change batch: %v", aws.StringValue(zones[z].Name), z, err)
588+
589+
changesByOwnership := groupChangesByNameAndOwnershipRelation(b)
590+
591+
if len(changesByOwnership) > 1 {
592+
log.Debug("Trying to submit change sets one-by-one instead")
593+
594+
for _, changes := range changesByOwnership {
595+
for _, c := range changes {
596+
log.Debugf("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
597+
}
598+
params.ChangeBatch = &route53.ChangeBatch{
599+
Changes: changes.Route53Changes(),
600+
}
601+
if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
602+
failedUpdate = true
603+
log.Errorf("Failed submitting change (error: %v), it will be retried in a separate change batch in the next iteration", err)
604+
p.failedChangesQueue[z] = append(p.failedChangesQueue[z], changes...)
605+
} else {
606+
successfulChanges = successfulChanges + len(changes)
607+
}
608+
}
609+
} else {
610+
failedUpdate = true
611+
}
562612
} else {
613+
successfulChanges = len(b)
614+
}
615+
616+
if successfulChanges > 0 {
563617
// z is the R53 Hosted Zone ID already as aws.StringValue
564-
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", len(b), aws.StringValue(zones[z].Name), z)
618+
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", successfulChanges, aws.StringValue(zones[z].Name), z)
565619
}
566620

567621
if i != len(batchCs)-1 {
@@ -583,16 +637,16 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Chan
583637
}
584638

585639
// newChanges returns a collection of Changes based on the given records and action.
586-
func (p *AWSProvider) newChanges(action string, endpoints []*endpoint.Endpoint) []*route53.Change {
587-
changes := make([]*route53.Change, 0, len(endpoints))
640+
func (p *AWSProvider) newChanges(action string, endpoints []*endpoint.Endpoint) Route53Changes {
641+
changes := make(Route53Changes, 0, len(endpoints))
588642

589643
for _, endpoint := range endpoints {
590644
change, dualstack := p.newChange(action, endpoint)
591645
changes = append(changes, change)
592646
if dualstack {
593647
// make a copy of change, modify RRS type to AAAA, then add new change
594648
rrs := *change.ResourceRecordSet
595-
change2 := &route53.Change{Action: change.Action, ResourceRecordSet: &rrs}
649+
change2 := &Route53Change{Change: route53.Change{Action: change.Action, ResourceRecordSet: &rrs}}
596650
change2.ResourceRecordSet.Type = aws.String(route53.RRTypeAaaa)
597651
changes = append(changes, change2)
598652
}
@@ -635,11 +689,13 @@ func (p *AWSProvider) AdjustEndpoints(endpoints []*endpoint.Endpoint) []*endpoin
635689
// returned Change is based on the given record by the given action, e.g.
636690
// action=ChangeActionCreate returns a change for creation of the record and
637691
// action=ChangeActionDelete returns a change for deletion of the record.
638-
func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*route53.Change, bool) {
639-
change := &route53.Change{
640-
Action: aws.String(action),
641-
ResourceRecordSet: &route53.ResourceRecordSet{
642-
Name: aws.String(ep.DNSName),
692+
func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*Route53Change, bool) {
693+
change := &Route53Change{
694+
Change: route53.Change{
695+
Action: aws.String(action),
696+
ResourceRecordSet: &route53.ResourceRecordSet{
697+
Name: aws.String(ep.DNSName),
698+
},
643699
},
644700
}
645701
dualstack := false
@@ -718,9 +774,51 @@ func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*route53.
718774
change.ResourceRecordSet.HealthCheckId = aws.String(prop.Value)
719775
}
720776

777+
if ownedRecord, ok := ep.Labels[endpoint.OwnedRecordLabelKey]; ok {
778+
change.OwnedRecord = ownedRecord
779+
}
780+
721781
return change, dualstack
722782
}
723783

784+
// searches for `changes` that are contained in `queue` and returns the `changes` separated by whether they were found in the queue (`foundChanges`) or not (`notFoundChanges`)
785+
func findChangesInQueue(changes Route53Changes, queue Route53Changes) (foundChanges, notFoundChanges Route53Changes) {
786+
if queue == nil {
787+
return Route53Changes{}, changes
788+
}
789+
790+
for _, c := range changes {
791+
found := false
792+
for _, qc := range queue {
793+
if c == qc {
794+
foundChanges = append(foundChanges, c)
795+
found = true
796+
break
797+
}
798+
}
799+
if !found {
800+
notFoundChanges = append(notFoundChanges, c)
801+
}
802+
}
803+
804+
return
805+
}
806+
807+
// group the given changes by name and ownership relation to ensure these are always submitted in the same transaction to Route53;
808+
// grouping by name is done to always submit changes with the same name but different set identifier together,
809+
// grouping by ownership relation is done to always submit changes of records and e.g. their corresponding TXT registry records together
810+
func groupChangesByNameAndOwnershipRelation(cs Route53Changes) map[string]Route53Changes {
811+
changesByOwnership := make(map[string]Route53Changes)
812+
for _, v := range cs {
813+
key := v.OwnedRecord
814+
if key == "" {
815+
key = aws.StringValue(v.ResourceRecordSet.Name)
816+
}
817+
changesByOwnership[key] = append(changesByOwnership[key], v)
818+
}
819+
return changesByOwnership
820+
}
821+
724822
func (p *AWSProvider) tagsForZone(ctx context.Context, zoneID string) (map[string]string, error) {
725823
response, err := p.client.ListTagsForResourceWithContext(ctx, &route53.ListTagsForResourceInput{
726824
ResourceType: aws.String("hostedzone"),
@@ -736,55 +834,48 @@ func (p *AWSProvider) tagsForZone(ctx context.Context, zoneID string) (map[strin
736834
return tagMap, nil
737835
}
738836

739-
func batchChangeSet(cs []*route53.Change, batchSize int) [][]*route53.Change {
837+
func batchChangeSet(cs Route53Changes, batchSize int) []Route53Changes {
740838
if len(cs) <= batchSize {
741839
res := sortChangesByActionNameType(cs)
742-
return [][]*route53.Change{res}
840+
return []Route53Changes{res}
743841
}
744842

745-
batchChanges := make([][]*route53.Change, 0)
843+
batchChanges := make([]Route53Changes, 0)
746844

747-
changesByName := make(map[string][]*route53.Change)
748-
for _, v := range cs {
749-
changesByName[*v.ResourceRecordSet.Name] = append(changesByName[*v.ResourceRecordSet.Name], v)
750-
}
845+
changesByOwnership := groupChangesByNameAndOwnershipRelation(cs)
751846

752847
names := make([]string, 0)
753-
for v := range changesByName {
848+
for v := range changesByOwnership {
754849
names = append(names, v)
755850
}
756851
sort.Strings(names)
757852

758-
for _, name := range names {
759-
totalChangesByName := len(changesByName[name])
760-
761-
if totalChangesByName > batchSize {
762-
log.Warnf("Total changes for %s exceeds max batch size of %d, total changes: %d", name,
763-
batchSize, totalChangesByName)
853+
currentBatch := Route53Changes{}
854+
for k, name := range names {
855+
v := changesByOwnership[name]
856+
if len(v) > batchSize {
857+
log.Warnf("Total changes for %v exceeds max batch size of %d, total changes: %d; changes will not be performed", k, batchSize, len(v))
764858
continue
765859
}
766860

767-
var existingBatch bool
768-
for i, b := range batchChanges {
769-
if len(b)+totalChangesByName <= batchSize {
770-
batchChanges[i] = append(batchChanges[i], changesByName[name]...)
771-
existingBatch = true
772-
break
773-
}
774-
}
775-
if !existingBatch {
776-
batchChanges = append(batchChanges, changesByName[name])
861+
if len(currentBatch)+len(v) > batchSize {
862+
// currentBatch would be too large if we add this changeset;
863+
// add currentBatch to batchChanges and start a new currentBatch
864+
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
865+
currentBatch = append(Route53Changes{}, v...)
866+
} else {
867+
currentBatch = append(currentBatch, v...)
777868
}
778869
}
779-
780-
for i, batch := range batchChanges {
781-
batchChanges[i] = sortChangesByActionNameType(batch)
870+
if len(currentBatch) > 0 {
871+
// add final currentBatch
872+
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
782873
}
783874

784875
return batchChanges
785876
}
786877

787-
func sortChangesByActionNameType(cs []*route53.Change) []*route53.Change {
878+
func sortChangesByActionNameType(cs Route53Changes) Route53Changes {
788879
sort.SliceStable(cs, func(i, j int) bool {
789880
if *cs[i].Action > *cs[j].Action {
790881
return true
@@ -805,11 +896,11 @@ func sortChangesByActionNameType(cs []*route53.Change) []*route53.Change {
805896
}
806897

807898
// changesByZone separates a multi-zone change into a single change per zone.
808-
func changesByZone(zones map[string]*route53.HostedZone, changeSet []*route53.Change) map[string][]*route53.Change {
809-
changes := make(map[string][]*route53.Change)
899+
func changesByZone(zones map[string]*route53.HostedZone, changeSet Route53Changes) map[string]Route53Changes {
900+
changes := make(map[string]Route53Changes)
810901

811902
for _, z := range zones {
812-
changes[aws.StringValue(z.Id)] = []*route53.Change{}
903+
changes[aws.StringValue(z.Id)] = Route53Changes{}
813904
}
814905

815906
for _, c := range changeSet {
@@ -828,9 +919,11 @@ func changesByZone(zones map[string]*route53.HostedZone, changeSet []*route53.Ch
828919
aliasTarget := *rrset.AliasTarget
829920
aliasTarget.HostedZoneId = aws.String(cleanZoneID(aws.StringValue(z.Id)))
830921
rrset.AliasTarget = &aliasTarget
831-
c = &route53.Change{
832-
Action: c.Action,
833-
ResourceRecordSet: &rrset,
922+
c = &Route53Change{
923+
Change: route53.Change{
924+
Action: c.Action,
925+
ResourceRecordSet: &rrset,
926+
},
834927
}
835928
}
836929
changes[aws.StringValue(z.Id)] = append(changes[aws.StringValue(z.Id)], c)

0 commit comments

Comments
 (0)