From 7760ae52376b118487661912e51764e010eed64c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Mon, 2 Mar 2026 22:01:37 +0100 Subject: [PATCH 1/4] Do not iterate over all ACLs to find one by name --- .../acl/AclNonNamespacedController.java | 4 +- .../KafkaAccessControlEntryRepository.java | 53 ++++++++++++++----- .../michelin/ns4kafka/service/AclService.java | 5 +- .../AclNonNamespacedControllerTest.java | 3 +- .../ns4kafka/service/AclServiceTest.java | 3 +- 5 files changed, 48 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/acl/AclNonNamespacedController.java b/src/main/java/com/michelin/ns4kafka/controller/acl/AclNonNamespacedController.java index 97f2731bd..47577a2f6 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/acl/AclNonNamespacedController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/acl/AclNonNamespacedController.java @@ -29,7 +29,7 @@ import io.micronaut.security.utils.SecurityService; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.annotation.security.RolesAllowed; -import java.util.List; +import java.util.Collection; /** Non-namespaced controller to manage ACLs. */ @Tag(name = "ACLs", description = "Manage the ACLs.") @@ -59,7 +59,7 @@ public AclNonNamespacedController( * @return A list of ACLs */ @Get - public List listAll() { + public Collection listAll() { return aclService.findAll(); } } diff --git a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java index 8df1c4889..832ac9c34 100644 --- a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java +++ b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java @@ -70,32 +70,57 @@ String getMessageKey(AccessControlEntry accessControlEntry) { + accessControlEntry.getMetadata().getName(); } + /** + * Find all ACLs. + * + * @return A collection of ACLs + */ @Override - public AccessControlEntry create(AccessControlEntry accessControlEntry) { - return this.produce(getMessageKey(accessControlEntry), accessControlEntry); + public Collection findAll() { + return getKafkaStore().values(); } + /** + * Find an ACL by name. + * + * @param namespace The namespace of the ACL + * @param name The name of the ACL + * @return An optional containing the ACL if found, or empty if not found + */ @Override - public void delete(AccessControlEntry accessControlEntry) { - produce(getMessageKey(accessControlEntry), null); + public Optional findByName(String namespace, String name) { + return Optional.ofNullable(getKafkaStore().get(namespace + "/" + name)); } + /** + * Create an ACL. + * + * @param accessControlEntry The ACL to create + * @return The created ACL + */ @Override - public Optional findByName(String namespace, String name) { - return getKafkaStore().values().stream() - .filter(ace -> ace.getMetadata().getNamespace().equals(namespace)) - .filter(ace -> ace.getMetadata().getName().equals(name)) - .findFirst(); + public AccessControlEntry create(AccessControlEntry accessControlEntry) { + return this.produce(getMessageKey(accessControlEntry), accessControlEntry); } + /** + * Delete an ACL. + * + * @param accessControlEntry The ACL to delete + */ @Override - @Topic(value = "${ns4kafka.store.kafka.topics.prefix}.access-control-entries") - void receive(ConsumerRecord message) { - super.receive(message); + public void delete(AccessControlEntry accessControlEntry) { + produce(getMessageKey(accessControlEntry), null); } + /** + * Receive messages from Kafka topic and update the store accordingly. + * + * @param message The record + */ @Override - public Collection findAll() { - return getKafkaStore().values(); + @Topic(value = "${ns4kafka.store.kafka.topics.prefix}.access-control-entries") + public void receive(ConsumerRecord message) { + super.receive(message); } } diff --git a/src/main/java/com/michelin/ns4kafka/service/AclService.java b/src/main/java/com/michelin/ns4kafka/service/AclService.java index 4c721d033..f4c9bde90 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AclService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AclService.java @@ -38,6 +38,7 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -509,8 +510,8 @@ public List findAllForCluster(String cluster) { * * @return A list of ACLs */ - public List findAll() { - return new ArrayList<>(accessControlEntryRepository.findAll()); + public Collection findAll() { + return accessControlEntryRepository.findAll(); } /** diff --git a/src/test/java/com/michelin/ns4kafka/controller/AclNonNamespacedControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/AclNonNamespacedControllerTest.java index 7a277eb70..4f14ca30e 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/AclNonNamespacedControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/AclNonNamespacedControllerTest.java @@ -25,6 +25,7 @@ import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.Metadata; import com.michelin.ns4kafka.service.AclService; +import java.util.Collection; import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -58,7 +59,7 @@ void shouldListAcls() { when(aclService.findAll()).thenReturn(List.of(accessControlEntry, accessControlEntry2)); - List actual = aclNonNamespacedController.listAll(); + Collection actual = aclNonNamespacedController.listAll(); assertEquals(2, actual.size()); assertEquals(List.of(accessControlEntry, accessControlEntry2), actual); diff --git a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java index 3ed97a265..870f943b8 100644 --- a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java @@ -37,6 +37,7 @@ import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor; import io.micronaut.context.ApplicationContext; import io.micronaut.inject.qualifiers.Qualifiers; +import java.util.Collection; import java.util.List; import java.util.Optional; import org.junit.jupiter.api.Test; @@ -988,7 +989,7 @@ void shouldFindAllAcls() { when(accessControlEntryRepository.findAll()).thenReturn(List.of(ace1, ace2, ace3)); - List actual = aclService.findAll(); + Collection actual = aclService.findAll(); assertEquals(3, actual.size()); } From f16e2f51ea4973bb5822afd6a56585f75fa00ad0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Mon, 2 Mar 2026 22:11:36 +0100 Subject: [PATCH 2/4] Add comment --- .../AccessControlEntryRepository.java | 23 +++++++++++++++++++ .../KafkaAccessControlEntryRepository.java | 8 ++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/michelin/ns4kafka/repository/AccessControlEntryRepository.java b/src/main/java/com/michelin/ns4kafka/repository/AccessControlEntryRepository.java index d849c7637..097c9ea9d 100644 --- a/src/main/java/com/michelin/ns4kafka/repository/AccessControlEntryRepository.java +++ b/src/main/java/com/michelin/ns4kafka/repository/AccessControlEntryRepository.java @@ -24,11 +24,34 @@ /** Access control entry repository. */ public interface AccessControlEntryRepository { + /** + * Find all ACLs. + * + * @return A collection of ACLs + */ Collection findAll(); + /** + * Find an ACL by name. + * + * @param namespace The namespace of the ACL + * @param name The name of the ACL + * @return An optional containing the ACL if found, or empty if not found + */ Optional findByName(String namespace, String name); + /** + * Create an ACL. + * + * @param accessControlEntry The ACL to create + * @return The created ACL + */ AccessControlEntry create(AccessControlEntry accessControlEntry); + /** + * Delete an ACL. + * + * @param accessControlEntry The ACL to delete + */ void delete(AccessControlEntry accessControlEntry); } diff --git a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java index 832ac9c34..89bc000e5 100644 --- a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java +++ b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java @@ -64,8 +64,14 @@ public KafkaAccessControlEntryRepository( super(kafkaTopic, kafkaProducer, adminClient, ns4KafkaProperties, taskScheduler); } + /** + * Get the message key for an ACL. + * + * @param accessControlEntry The ACL + * @return The message key + */ @Override - String getMessageKey(AccessControlEntry accessControlEntry) { + public String getMessageKey(AccessControlEntry accessControlEntry) { return accessControlEntry.getMetadata().getNamespace() + "/" + accessControlEntry.getMetadata().getName(); } From 281ebca1f6928b2e4fb4f750728343dcd62e43aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Mon, 2 Mar 2026 23:18:46 +0100 Subject: [PATCH 3/4] Do not instantiate static collections each time --- .../KafkaAccessControlEntryRepository.java | 2 +- .../michelin/ns4kafka/service/AclService.java | 38 ++++++++----------- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java index 89bc000e5..831aad0e9 100644 --- a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java +++ b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaAccessControlEntryRepository.java @@ -106,7 +106,7 @@ public Optional findByName(String namespace, String name) { */ @Override public AccessControlEntry create(AccessControlEntry accessControlEntry) { - return this.produce(getMessageKey(accessControlEntry), accessControlEntry); + return produce(getMessageKey(accessControlEntry), accessControlEntry); } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/AclService.java b/src/main/java/com/michelin/ns4kafka/service/AclService.java index f4c9bde90..049a79ded 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AclService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AclService.java @@ -37,15 +37,18 @@ import io.micronaut.inject.qualifiers.Qualifiers; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; /** Access control entry service. */ @Singleton public class AclService { + private static final Set ALLOWED_RESOURCE_TYPES = + EnumSet.of(AccessControlEntry.ResourceType.TOPIC, AccessControlEntry.ResourceType.CONNECT_CLUSTER); + private static final Set ALLOWED_PERMISSIONS = + EnumSet.of(AccessControlEntry.Permission.READ, AccessControlEntry.Permission.WRITE); + private static final Set ALLOWED_PATTERN_TYPES = + EnumSet.of(AccessControlEntry.ResourcePatternType.LITERAL, AccessControlEntry.ResourcePatternType.PREFIXED); public static final String PUBLIC_GRANTED_TO = "*"; @Inject @@ -73,36 +76,25 @@ public boolean isPublicAcl(AccessControlEntry accessControlEntry) { */ public List validate(AccessControlEntry accessControlEntry, Namespace namespace) { List validationErrors = new ArrayList<>(); - // Which resource can be granted cross namespaces - List allowedResourceTypes = - List.of(AccessControlEntry.ResourceType.TOPIC, AccessControlEntry.ResourceType.CONNECT_CLUSTER); - - // Which permission can be granted cross namespaces: READ, WRITE - // Only admin can grant OWNER - List allowedPermissions = - List.of(AccessControlEntry.Permission.READ, AccessControlEntry.Permission.WRITE); - - // Which patternTypes can be granted - List allowedPatternTypes = List.of( - AccessControlEntry.ResourcePatternType.LITERAL, AccessControlEntry.ResourcePatternType.PREFIXED); - - if (!allowedResourceTypes.contains(accessControlEntry.getSpec().getResourceType())) { + if (!ALLOWED_RESOURCE_TYPES.contains(accessControlEntry.getSpec().getResourceType())) { validationErrors.add(invalidAclResourceType( String.valueOf(accessControlEntry.getSpec().getResourceType()), - allowedResourceTypes.stream().map(Object::toString).collect(Collectors.joining(", ")))); + ALLOWED_RESOURCE_TYPES.stream().map(Object::toString).collect(Collectors.joining(", ")))); } - if (!allowedPermissions.contains(accessControlEntry.getSpec().getPermission())) { + // Which permission can be granted cross namespaces + if (!ALLOWED_PERMISSIONS.contains(accessControlEntry.getSpec().getPermission())) { validationErrors.add(invalidAclPermission( String.valueOf(accessControlEntry.getSpec().getPermission()), - allowedPermissions.stream().map(Object::toString).collect(Collectors.joining(", ")))); + ALLOWED_PERMISSIONS.stream().map(Object::toString).collect(Collectors.joining(", ")))); } - if (!allowedPatternTypes.contains(accessControlEntry.getSpec().getResourcePatternType())) { + // Which pattern types can be granted cross namespaces + if (!ALLOWED_PATTERN_TYPES.contains(accessControlEntry.getSpec().getResourcePatternType())) { validationErrors.add(invalidAclPatternType( String.valueOf(accessControlEntry.getSpec().getResourcePatternType()), - allowedPatternTypes.stream().map(Object::toString).collect(Collectors.joining(", ")))); + ALLOWED_PATTERN_TYPES.stream().map(Object::toString).collect(Collectors.joining(", ")))); } // GrantedTo namespace exists? From 806d5f828a103c1eec8a5cdd3decf2bb14ee52e5 Mon Sep 17 00:00:00 2001 From: Thomas Cai Date: Tue, 3 Mar 2026 11:26:24 +0100 Subject: [PATCH 4/4] Fix import --- .../java/com/michelin/ns4kafka/service/AclService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/michelin/ns4kafka/service/AclService.java b/src/main/java/com/michelin/ns4kafka/service/AclService.java index 049a79ded..2d93cfe75 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AclService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AclService.java @@ -37,7 +37,12 @@ import io.micronaut.inject.qualifiers.Qualifiers; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; /** Access control entry service. */