Skip to content

Commit 7dd84a5

Browse files
committed
Route53: retry single changes in a batch if the batch fails
If submitting a batch fails, its changes are retried individually. If a single change fails during that retry, it is added to a queue. In the next iteration, changes from this queue are submitted after all other changes. Single changes are always submitted as batches of changes that share the same DNS name and ownership relation, to avoid inconsistency between a created record and its TXT records.
1 parent adf6ad7 commit 7dd84a5

File tree

6 files changed

+252
-85
lines changed

6 files changed

+252
-85
lines changed

endpoint/labels.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
OwnerLabelKey = "owner"
3333
// ResourceLabelKey is the name of the label that identifies k8s resource which wants to acquire the DNS name
3434
ResourceLabelKey = "resource"
35+
// OwnedRecordLabelKey is the name of the label that identifies the record that is owned by the labeled TXT registry record
36+
OwnedRecordLabelKey = "ownedRecord"
3537

3638
// AWSSDDescriptionLabel label responsible for storing raw owner/resource combination information in the Labels
3739
// supposed to be inserted by AWS SD Provider, and parsed into OwnerLabelKey and ResourceLabelKey key by AWS SD Registry

internal/testutils/endpoint.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func SameEndpoint(a, b *endpoint.Endpoint) bool {
6262
return a.DNSName == b.DNSName && a.Targets.Same(b.Targets) && a.RecordType == b.RecordType && a.SetIdentifier == b.SetIdentifier &&
6363
a.Labels[endpoint.OwnerLabelKey] == b.Labels[endpoint.OwnerLabelKey] && a.RecordTTL == b.RecordTTL &&
6464
a.Labels[endpoint.ResourceLabelKey] == b.Labels[endpoint.ResourceLabelKey] &&
65+
a.Labels[endpoint.OwnedRecordLabelKey] == b.Labels[endpoint.OwnedRecordLabelKey] &&
6566
SameProviderSpecific(a.ProviderSpecific, b.ProviderSpecific)
6667
}
6768

provider/aws/aws.go

Lines changed: 102 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ type AWSProvider struct {
176176
zoneTagFilter provider.ZoneTagFilter
177177
preferCNAME bool
178178
zonesCache *zonesListCache
179+
// queue for collecting changes to submit them in the next iteration, but after all other changes
180+
failedChangesQueue map[string]Route53Changes
179181
}
180182

181183
// AWSConfig contains configuration to create a new AWS provider.
@@ -240,6 +242,7 @@ func NewAWSProvider(awsConfig AWSConfig) (*AWSProvider, error) {
240242
preferCNAME: awsConfig.PreferCNAME,
241243
dryRun: awsConfig.DryRun,
242244
zonesCache: &zonesListCache{duration: awsConfig.ZoneCacheDuration},
245+
failedChangesQueue: make(map[string]Route53Changes),
243246
}
244247

245248
return provider, nil
@@ -556,9 +559,16 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes Route53Changes,
556559
for z, cs := range changesByZone {
557560
var failedUpdate bool
558561

559-
batchCs := batchChangeSet(cs, p.batchChangeSize)
562+
// group changes into new changes and into changes that failed in a previous iteration and are retried
563+
retriedChanges, newChanges := findChangesInQueue(cs, p.failedChangesQueue[z])
564+
p.failedChangesQueue[z] = nil
560565

566+
batchCs := append(batchChangeSet(newChanges, p.batchChangeSize), batchChangeSet(retriedChanges, p.batchChangeSize)...)
561567
for i, b := range batchCs {
568+
if len(b) == 0 {
569+
continue
570+
}
571+
562572
for _, c := range b {
563573
log.Infof("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
564574
}
@@ -571,13 +581,41 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes Route53Changes,
571581
},
572582
}
573583

584+
successfulChanges := 0
585+
574586
if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
575-
log.Errorf("Failure in zone %s [Id: %s]", aws.StringValue(zones[z].Name), z)
576-
log.Error(err) // TODO(ideahitme): consider changing the interface in cases when this error might be a concern for other components
577-
failedUpdate = true
587+
log.Errorf("Failure in zone %s [Id: %s] when submitting change batch: %v", aws.StringValue(zones[z].Name), z, err)
588+
589+
changesByOwnership := groupChangesByNameAndOwnershipRelation(b)
590+
591+
if len(changesByOwnership) > 1 {
592+
log.Debug("Trying to submit change sets one-by-one instead")
593+
594+
for _, changes := range changesByOwnership {
595+
for _, c := range changes {
596+
log.Debugf("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
597+
}
598+
params.ChangeBatch = &route53.ChangeBatch{
599+
Changes: changes.Route53Changes(),
600+
}
601+
if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
602+
failedUpdate = true
603+
log.Errorf("Failed submitting change (error: %v), it will be retried in a separate change batch in the next iteration", err)
604+
p.failedChangesQueue[z] = append(p.failedChangesQueue[z], changes...)
605+
} else {
606+
successfulChanges = successfulChanges + len(changes)
607+
}
608+
}
609+
} else {
610+
failedUpdate = true
611+
}
578612
} else {
613+
successfulChanges = len(b)
614+
}
615+
616+
if successfulChanges > 0 {
579617
// z is the R53 Hosted Zone ID already as aws.StringValue
580-
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", len(b), aws.StringValue(zones[z].Name), z)
618+
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", successfulChanges, aws.StringValue(zones[z].Name), z)
581619
}
582620

583621
if i != len(batchCs)-1 {
@@ -736,9 +774,51 @@ func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*Route53C
736774
change.ResourceRecordSet.HealthCheckId = aws.String(prop.Value)
737775
}
738776

777+
if ownedRecord, ok := ep.Labels[endpoint.OwnedRecordLabelKey]; ok {
778+
change.OwnedRecord = ownedRecord
779+
}
780+
739781
return change, dualstack
740782
}
741783

784+
// findChangesInQueue partitions `changes` by membership in `queue`: every element of `changes` that compares
// equal (`==`) to some element of `queue` is appended to `foundChanges`, every other element to `notFoundChanges`.
// The relative order of `changes` is preserved in both results.
// A nil `queue` short-circuits: `foundChanges` is then an empty, non-nil slice and `notFoundChanges` is `changes` itself.
// The lookup is a nested linear scan, i.e. len(changes) * len(queue) comparisons in the worst case.
// NOTE(review): the elements look like pointers, so `==` is presumably pointer identity rather than a field-wise
// comparison — confirm that retried changes reuse the queued objects, otherwise nothing will ever be found.
785+
func findChangesInQueue(changes Route53Changes, queue Route53Changes) (foundChanges, notFoundChanges Route53Changes) {
786+
if queue == nil {
787+
return Route53Changes{}, changes
788+
}
789+
790+
for _, c := range changes {
791+
found := false
792+
// stop at the first match so that each change is appended to foundChanges at most once
for _, qc := range queue {
793+
if c == qc {
794+
foundChanges = append(foundChanges, c)
795+
found = true
796+
break
797+
}
798+
}
799+
if !found {
800+
notFoundChanges = append(notFoundChanges, c)
801+
}
802+
}
803+
804+
// naked return: yields the named results foundChanges and notFoundChanges
return
805+
}
806+
807+
// groupChangesByNameAndOwnershipRelation groups the given changes by name and ownership relation to ensure these are always submitted in the same transaction to Route53;
808+
// grouping by name is done to always submit changes with the same name but different set identifier together,
809+
// grouping by ownership relation is done to always submit changes of records and e.g. their corresponding TXT registry records together.
// The map key is the change's OwnedRecord when that is non-empty, otherwise the name of its resource record set;
// within each group the changes keep the order they had in `cs`.
// NOTE(review): an owning record and the record it owns only end up in the same group if OwnedRecord is the exact
// same string as the owned record set's name (e.g. regarding a trailing dot) — confirm against the callers.
810+
func groupChangesByNameAndOwnershipRelation(cs Route53Changes) map[string]Route53Changes {
811+
changesByOwnership := make(map[string]Route53Changes)
812+
for _, v := range cs {
813+
// prefer the ownership relation as key; fall back to the record's own name
key := v.OwnedRecord
814+
if key == "" {
815+
key = aws.StringValue(v.ResourceRecordSet.Name)
816+
}
817+
changesByOwnership[key] = append(changesByOwnership[key], v)
818+
}
819+
return changesByOwnership
820+
}
821+
742822
func (p *AWSProvider) tagsForZone(ctx context.Context, zoneID string) (map[string]string, error) {
743823
response, err := p.client.ListTagsForResourceWithContext(ctx, &route53.ListTagsForResourceInput{
744824
ResourceType: aws.String("hostedzone"),
@@ -762,41 +842,34 @@ func batchChangeSet(cs Route53Changes, batchSize int) []Route53Changes {
762842

763843
batchChanges := make([]Route53Changes, 0)
764844

765-
changesByName := make(map[string]Route53Changes)
766-
for _, v := range cs {
767-
changesByName[*v.ResourceRecordSet.Name] = append(changesByName[*v.ResourceRecordSet.Name], v)
768-
}
845+
changesByOwnership := groupChangesByNameAndOwnershipRelation(cs)
769846

770847
names := make([]string, 0)
771-
for v := range changesByName {
848+
for v := range changesByOwnership {
772849
names = append(names, v)
773850
}
774851
sort.Strings(names)
775852

776-
for _, name := range names {
777-
totalChangesByName := len(changesByName[name])
778-
779-
if totalChangesByName > batchSize {
780-
log.Warnf("Total changes for %s exceeds max batch size of %d, total changes: %d", name,
781-
batchSize, totalChangesByName)
853+
currentBatch := Route53Changes{}
854+
for k, name := range names {
855+
v := changesByOwnership[name]
856+
if len(v) > batchSize {
857+
log.Warnf("Total changes for %v exceeds max batch size of %d, total changes: %d; changes will not be performed", k, batchSize, len(v))
782858
continue
783859
}
784860

785-
var existingBatch bool
786-
for i, b := range batchChanges {
787-
if len(b)+totalChangesByName <= batchSize {
788-
batchChanges[i] = append(batchChanges[i], changesByName[name]...)
789-
existingBatch = true
790-
break
791-
}
792-
}
793-
if !existingBatch {
794-
batchChanges = append(batchChanges, changesByName[name])
861+
if len(currentBatch)+len(v) > batchSize {
862+
// currentBatch would be too large if we add this changeset;
863+
// add currentBatch to batchChanges and start a new currentBatch
864+
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
865+
currentBatch = append(Route53Changes{}, v...)
866+
} else {
867+
currentBatch = append(currentBatch, v...)
795868
}
796869
}
797-
798-
for i, batch := range batchChanges {
799-
batchChanges[i] = sortChangesByActionNameType(batch)
870+
if len(currentBatch) > 0 {
871+
// add final currentBatch
872+
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
800873
}
801874

802875
return batchChanges

provider/aws/aws_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,65 @@ func TestAWSsubmitChangesError(t *testing.T) {
847847
require.Error(t, provider.submitChanges(ctx, cs, zones))
848848
}
849849

850+
// TestAWSsubmitChangesRetryOnError exercises the retry path of submitChanges:
// the client stub is set up to fail both the full change batch and the separate submission of "fail" together with
// its TXT record. The test then checks that submitChanges still returns an error, that "success" was created while
// "fail" and its TXT record were not, and that a second submitChanges call containing the previously failed
// changes plus a new record succeeds and leaves all records in place.
func TestAWSsubmitChangesRetryOnError(t *testing.T) {
851+
provider, clientStub := newAWSProvider(t, endpoint.NewDomainFilter([]string{"ext-dns-test-2.teapot.zalan.do."}), provider.NewZoneIDFilter([]string{}), provider.NewZoneTypeFilter(""), defaultEvaluateTargetHealth, false, []*endpoint.Endpoint{})
852+
853+
ctx := context.Background()
854+
zones, err := provider.Zones(ctx)
855+
require.NoError(t, err)
856+
857+
ep1 := endpoint.NewEndpointWithTTL("success.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.1")
858+
ep2 := endpoint.NewEndpointWithTTL("fail.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.2")
859+
ep3 := endpoint.NewEndpointWithTTL("success2.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.3")
860+
861+
ep2txt := endpoint.NewEndpointWithTTL("fail__edns_housekeeping.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeTXT, endpoint.TTL(recordTTL), "something") // "__edns_housekeeping" is the TXT suffix
862+
ep2txt.Labels = map[string]string{
863+
endpoint.OwnedRecordLabelKey: "fail.zone-1.ext-dns-test-2.teapot.zalan.do",
864+
}
865+
866+
// "success" and "fail" (plus the TXT record of "fail") are created in the first step and submitted in the same batch; the stub makes this batch fail
867+
cs1 := provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep2, ep2txt, ep1})
868+
input1 := &route53.ChangeResourceRecordSetsInput{
869+
HostedZoneId: aws.String("/hostedzone/zone-1.ext-dns-test-2.teapot.zalan.do."),
870+
ChangeBatch: &route53.ChangeBatch{
871+
Changes: cs1.Route53Changes(),
872+
},
873+
}
874+
clientStub.MockMethod("ChangeResourceRecordSets", input1).Return(nil, fmt.Errorf("Mock route53 failure"))
875+
876+
// because of the failure, changes will be retried one by one; make "fail" (together with its TXT record) submitted in its own batch fail as well
877+
cs2 := provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep2, ep2txt})
878+
input2 := &route53.ChangeResourceRecordSetsInput{
879+
HostedZoneId: aws.String("/hostedzone/zone-1.ext-dns-test-2.teapot.zalan.do."),
880+
ChangeBatch: &route53.ChangeBatch{
881+
Changes: cs2.Route53Changes(),
882+
},
883+
}
884+
clientStub.MockMethod("ChangeResourceRecordSets", input2).Return(nil, fmt.Errorf("Mock route53 failure"))
885+
886+
// "success" should have been created, verify that we still get an error because "fail" failed
887+
require.Error(t, provider.submitChanges(ctx, cs1, zones))
888+
889+
// assert that "success" was successfully created and "fail" and its TXT record were not
890+
records, err := provider.Records(ctx)
891+
require.NoError(t, err)
892+
require.True(t, containsRecordWithDNSName(records, "success.zone-1.ext-dns-test-2.teapot.zalan.do"))
893+
require.False(t, containsRecordWithDNSName(records, "fail.zone-1.ext-dns-test-2.teapot.zalan.do"))
894+
require.False(t, containsRecordWithDNSName(records, "fail__edns_housekeeping.zone-1.ext-dns-test-2.teapot.zalan.do"))
895+
896+
// next batch should contain "fail" and "success2", should succeed this time
897+
cs3 := provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep2, ep2txt, ep3})
898+
require.NoError(t, provider.submitChanges(ctx, cs3, zones))
899+
900+
// verify all records are there
901+
records, err = provider.Records(ctx)
902+
require.NoError(t, err)
903+
require.True(t, containsRecordWithDNSName(records, "success.zone-1.ext-dns-test-2.teapot.zalan.do"))
904+
require.True(t, containsRecordWithDNSName(records, "fail.zone-1.ext-dns-test-2.teapot.zalan.do"))
905+
require.True(t, containsRecordWithDNSName(records, "success2.zone-1.ext-dns-test-2.teapot.zalan.do"))
906+
require.True(t, containsRecordWithDNSName(records, "fail__edns_housekeeping.zone-1.ext-dns-test-2.teapot.zalan.do"))
907+
}
908+
850909
func TestAWSBatchChangeSet(t *testing.T) {
851910
var cs Route53Changes
852911

@@ -1375,6 +1434,7 @@ func newAWSProviderWithTagFilter(t *testing.T, domainFilter endpoint.DomainFilte
13751434
zoneTagFilter: zoneTagFilter,
13761435
dryRun: false,
13771436
zonesCache: &zonesListCache{duration: 1 * time.Minute},
1437+
failedChangesQueue: make(map[string]Route53Changes),
13781438
}
13791439

13801440
createAWSZone(t, provider, &route53.HostedZone{
@@ -1449,6 +1509,15 @@ func validateRecords(t *testing.T, records []*route53.ResourceRecordSet, expecte
14491509
assert.ElementsMatch(t, expected, records)
14501510
}
14511511

1512+
// containsRecordWithDNSName reports whether at least one endpoint in records has a DNSName that is
// exactly equal to dnsName (plain string comparison, no normalization). It returns false for an empty or nil slice.
func containsRecordWithDNSName(records []*endpoint.Endpoint, dnsName string) bool {
1513+
for _, record := range records {
1514+
if record.DNSName == dnsName {
1515+
return true
1516+
}
1517+
}
1518+
return false
1519+
}
1520+
14521521
func TestRequiresDeleteCreate(t *testing.T) {
14531522
provider, _ := newAWSProvider(t, endpoint.NewDomainFilter([]string{"foo.bar."}), provider.NewZoneIDFilter([]string{}), provider.NewZoneTypeFilter(""), defaultEvaluateTargetHealth, false, []*endpoint.Endpoint{})
14541523

registry/txt.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func (im *TXTRegistry) generateTXTRecord(r *endpoint.Endpoint) []*endpoint.Endpo
198198
txt := endpoint.NewEndpoint(im.mapper.toTXTName(r.DNSName), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
199199
if txt != nil {
200200
txt.WithSetIdentifier(r.SetIdentifier)
201+
txt.Labels[endpoint.OwnedRecordLabelKey] = r.DNSName
201202
txt.ProviderSpecific = r.ProviderSpecific
202203
endpoints = append(endpoints, txt)
203204
}
@@ -206,6 +207,7 @@ func (im *TXTRegistry) generateTXTRecord(r *endpoint.Endpoint) []*endpoint.Endpo
206207
txtNew := endpoint.NewEndpoint(im.mapper.toNewTXTName(r.DNSName, r.RecordType), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
207208
if txtNew != nil {
208209
txtNew.WithSetIdentifier(r.SetIdentifier)
210+
txtNew.Labels[endpoint.OwnedRecordLabelKey] = r.DNSName
209211
txtNew.ProviderSpecific = r.ProviderSpecific
210212
endpoints = append(endpoints, txtNew)
211213
}

0 commit comments

Comments
 (0)