Skip to content

Attempt to fix acl tests #555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 165 additions & 3 deletions kafka/resource_kafka_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"fmt"
"log"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -77,6 +78,15 @@

d.SetId(a.String())

// Wait for ACL to be visible in Kafka before returning
// This handles eventual consistency and ensures the ACL is actually created
log.Printf("[INFO] Waiting for ACL %s to be visible in Kafka", a)
err = waitForACLToBeVisible(ctx, c, a)
if err != nil {
log.Printf("[ERROR] ACL created but not visible: %v", err)
return diag.FromErr(err)
}

return nil
}

Expand All @@ -89,6 +99,16 @@
if err != nil {
return diag.FromErr(err)
}

// Wait for ACL to be removed from Kafka before returning
// This handles eventual consistency and ensures the ACL is actually deleted
log.Printf("[INFO] Waiting for ACL %s to be removed from Kafka", a)
err = waitForACLToBeDeleted(ctx, c, a)
if err != nil {
log.Printf("[ERROR] ACL deletion requested but still visible: %v", err)
return diag.FromErr(err)
}

return nil
}

Expand All @@ -103,15 +123,15 @@
return diag.FromErr(err)
}

aclNotFound := true
found := false

for _, foundACLs := range currentACLs {
// find only ACLs where ResourceName matches
if foundACLs.ResourceName != a.Resource.Name {
continue
}
if len(foundACLs.Acls) < 1 {
break
continue
}
log.Printf("[INFO] Found (%d) ACL(s) for Resource %s: %+v.", len(foundACLs.Acls), foundACLs.ResourceName, foundACLs)

Expand All @@ -132,12 +152,13 @@

// exact match
if a.String() == aclID.String() {
found = true

Check failure on line 155 in kafka/resource_kafka_acl.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to found (ineffassign)
return nil
}
}
}

if aclNotFound {
if !found {
log.Printf("[INFO] Did not find ACL %s", a.String())
d.SetId("")
}
Expand Down Expand Up @@ -195,3 +216,144 @@
}
return s
}

// waitForACLToBeVisible waits for an ACL to be visible in Kafka after creation
// This handles eventual consistency issues with Kafka ACL propagation
func waitForACLToBeVisible(ctx context.Context, c *LazyClient, expectedACL StringlyTypedACL) error {
maxRetries := 10
retryInterval := 200 * time.Millisecond

for i := 0; i < maxRetries; i++ {
// Check if context is cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Invalidate cache to ensure we get fresh data
err := c.InvalidateACLCache()
if err != nil {
return fmt.Errorf("failed to invalidate ACL cache: %w", err)
}

// List all ACLs
acls, err := c.ListACLs()
if err != nil {
return fmt.Errorf("failed to list ACLs: %w", err)
}

// Check if our ACL exists
for _, foundACLs := range acls {
if foundACLs.ResourceName != expectedACL.Resource.Name {
continue
}

for _, acl := range foundACLs.Acls {
foundACL := StringlyTypedACL{
ACL: ACL{
Principal: acl.Principal,
Host: acl.Host,
Operation: ACLOperationToString(acl.Operation),
PermissionType: ACLPermissionTypeToString(acl.PermissionType),
},
Resource: Resource{
Type: ACLResourceToString(foundACLs.ResourceType),
Name: foundACLs.ResourceName,
PatternTypeFilter: foundACLs.ResourcePatternType.String(),
},
}

// Check for exact match
if expectedACL.String() == foundACL.String() {
log.Printf("[INFO] ACL %s is now visible in Kafka (attempt %d)", expectedACL, i+1)
return nil
}
}
}

// If not found and not the last attempt, wait before retrying
if i < maxRetries-1 {
log.Printf("[DEBUG] ACL %s not yet visible, retrying in %v (attempt %d/%d)", expectedACL, retryInterval, i+1, maxRetries)
time.Sleep(retryInterval)
}
}

return fmt.Errorf("ACL %s was not visible in Kafka after %d attempts over %v", expectedACL, maxRetries, time.Duration(maxRetries)*retryInterval)
}

// waitForACLToBeDeleted waits for an ACL to be removed from Kafka after deletion
// This handles eventual consistency issues with Kafka ACL propagation
func waitForACLToBeDeleted(ctx context.Context, c *LazyClient, deletedACL StringlyTypedACL) error {
maxRetries := 10
retryInterval := 200 * time.Millisecond

for i := 0; i < maxRetries; i++ {
// Check if context is cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Invalidate cache to ensure we get fresh data
err := c.InvalidateACLCache()
if err != nil {
return fmt.Errorf("failed to invalidate ACL cache: %w", err)
}

// List all ACLs
acls, err := c.ListACLs()
if err != nil {
return fmt.Errorf("failed to list ACLs: %w", err)
}

// Check if our ACL still exists
found := false
for _, foundACLs := range acls {
if foundACLs.ResourceName != deletedACL.Resource.Name {
continue
}

for _, acl := range foundACLs.Acls {
foundACL := StringlyTypedACL{
ACL: ACL{
Principal: acl.Principal,
Host: acl.Host,
Operation: ACLOperationToString(acl.Operation),
PermissionType: ACLPermissionTypeToString(acl.PermissionType),
},
Resource: Resource{
Type: ACLResourceToString(foundACLs.ResourceType),
Name: foundACLs.ResourceName,
PatternTypeFilter: foundACLs.ResourcePatternType.String(),
},
}

// Check for exact match
if deletedACL.String() == foundACL.String() {
found = true
break
}
}

if found {
break
}
}

// If not found, the ACL has been successfully deleted
if !found {
log.Printf("[INFO] ACL %s has been removed from Kafka (attempt %d)", deletedACL, i+1)
return nil
}

// If still found and not the last attempt, wait before retrying
if i < maxRetries-1 {
log.Printf("[DEBUG] ACL %s still visible, retrying in %v (attempt %d/%d)", deletedACL, retryInterval, i+1, maxRetries)
time.Sleep(retryInterval)
}
}

return fmt.Errorf("ACL %s was still visible in Kafka after %d attempts over %v", deletedACL, maxRetries, time.Duration(maxRetries)*retryInterval)
}
Loading