Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.micronaut.security.utils.SecurityService;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.security.RolesAllowed;
import java.util.List;
import java.util.Collection;

/** Non-namespaced controller to manage ACLs. */
@Tag(name = "ACLs", description = "Manage the ACLs.")
Expand Down Expand Up @@ -59,7 +59,7 @@ public AclNonNamespacedController(
* @return A list of ACLs
*/
@Get
public List<AccessControlEntry> listAll() {
public Collection<AccessControlEntry> listAll() {
return aclService.findAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,34 @@

/** Access control entry repository. */
public interface AccessControlEntryRepository {
/**
* Find all ACLs.
*
* @return A collection of ACLs
*/
Collection<AccessControlEntry> findAll();

/**
* Find an ACL by name.
*
* @param namespace The namespace of the ACL
* @param name The name of the ACL
* @return An optional containing the ACL if found, or empty if not found
*/
Optional<AccessControlEntry> findByName(String namespace, String name);

/**
* Create an ACL.
*
* @param accessControlEntry The ACL to create
* @return The created ACL
*/
AccessControlEntry create(AccessControlEntry accessControlEntry);

/**
* Delete an ACL.
*
* @param accessControlEntry The ACL to delete
*/
void delete(AccessControlEntry accessControlEntry);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,38 +64,69 @@ public KafkaAccessControlEntryRepository(
super(kafkaTopic, kafkaProducer, adminClient, ns4KafkaProperties, taskScheduler);
}

/**
* Get the message key for an ACL.
*
* @param accessControlEntry The ACL
* @return The message key
*/
@Override
String getMessageKey(AccessControlEntry accessControlEntry) {
public String getMessageKey(AccessControlEntry accessControlEntry) {
return accessControlEntry.getMetadata().getNamespace() + "/"
+ accessControlEntry.getMetadata().getName();
}

/**
* Find all ACLs.
*
* @return A collection of ACLs
*/
@Override
public AccessControlEntry create(AccessControlEntry accessControlEntry) {
return this.produce(getMessageKey(accessControlEntry), accessControlEntry);
public Collection<AccessControlEntry> findAll() {
return getKafkaStore().values();
}

/**
* Find an ACL by name.
*
* @param namespace The namespace of the ACL
* @param name The name of the ACL
* @return An optional containing the ACL if found, or empty if not found
*/
@Override
public void delete(AccessControlEntry accessControlEntry) {
produce(getMessageKey(accessControlEntry), null);
public Optional<AccessControlEntry> findByName(String namespace, String name) {
return Optional.ofNullable(getKafkaStore().get(namespace + "/" + name));
}

/**
* Create an ACL.
*
* @param accessControlEntry The ACL to create
* @return The created ACL
*/
@Override
public Optional<AccessControlEntry> findByName(String namespace, String name) {
return getKafkaStore().values().stream()
.filter(ace -> ace.getMetadata().getNamespace().equals(namespace))
.filter(ace -> ace.getMetadata().getName().equals(name))
.findFirst();
public AccessControlEntry create(AccessControlEntry accessControlEntry) {
return produce(getMessageKey(accessControlEntry), accessControlEntry);
}

/**
* Delete an ACL.
*
* @param accessControlEntry The ACL to delete
*/
@Override
@Topic(value = "${ns4kafka.store.kafka.topics.prefix}.access-control-entries")
void receive(ConsumerRecord<String, AccessControlEntry> message) {
super.receive(message);
public void delete(AccessControlEntry accessControlEntry) {
produce(getMessageKey(accessControlEntry), null);
}

/**
* Receive messages from Kafka topic and update the store accordingly.
*
* @param message The record
*/
@Override
public Collection<AccessControlEntry> findAll() {
return getKafkaStore().values();
@Topic(value = "${ns4kafka.store.kafka.topics.prefix}.access-control-entries")
public void receive(ConsumerRecord<String, AccessControlEntry> message) {
super.receive(message);
}
}
40 changes: 19 additions & 21 deletions src/main/java/com/michelin/ns4kafka/service/AclService.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,22 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** Access control entry service. */
@Singleton
public class AclService {
private static final Set<AccessControlEntry.ResourceType> ALLOWED_RESOURCE_TYPES =
EnumSet.of(AccessControlEntry.ResourceType.TOPIC, AccessControlEntry.ResourceType.CONNECT_CLUSTER);
private static final Set<AccessControlEntry.Permission> ALLOWED_PERMISSIONS =
EnumSet.of(AccessControlEntry.Permission.READ, AccessControlEntry.Permission.WRITE);
private static final Set<AccessControlEntry.ResourcePatternType> ALLOWED_PATTERN_TYPES =
EnumSet.of(AccessControlEntry.ResourcePatternType.LITERAL, AccessControlEntry.ResourcePatternType.PREFIXED);
public static final String PUBLIC_GRANTED_TO = "*";

@Inject
Expand Down Expand Up @@ -72,36 +81,25 @@ public boolean isPublicAcl(AccessControlEntry accessControlEntry) {
*/
public List<String> validate(AccessControlEntry accessControlEntry, Namespace namespace) {
List<String> validationErrors = new ArrayList<>();

// Which resource can be granted cross namespaces
List<AccessControlEntry.ResourceType> allowedResourceTypes =
List.of(AccessControlEntry.ResourceType.TOPIC, AccessControlEntry.ResourceType.CONNECT_CLUSTER);

// Which permission can be granted cross namespaces: READ, WRITE
// Only admin can grant OWNER
List<AccessControlEntry.Permission> allowedPermissions =
List.of(AccessControlEntry.Permission.READ, AccessControlEntry.Permission.WRITE);

// Which patternTypes can be granted
List<AccessControlEntry.ResourcePatternType> allowedPatternTypes = List.of(
AccessControlEntry.ResourcePatternType.LITERAL, AccessControlEntry.ResourcePatternType.PREFIXED);

if (!allowedResourceTypes.contains(accessControlEntry.getSpec().getResourceType())) {
if (!ALLOWED_RESOURCE_TYPES.contains(accessControlEntry.getSpec().getResourceType())) {
validationErrors.add(invalidAclResourceType(
String.valueOf(accessControlEntry.getSpec().getResourceType()),
allowedResourceTypes.stream().map(Object::toString).collect(Collectors.joining(", "))));
ALLOWED_RESOURCE_TYPES.stream().map(Object::toString).collect(Collectors.joining(", "))));
}

if (!allowedPermissions.contains(accessControlEntry.getSpec().getPermission())) {
// Which permission can be granted cross namespaces
if (!ALLOWED_PERMISSIONS.contains(accessControlEntry.getSpec().getPermission())) {
validationErrors.add(invalidAclPermission(
String.valueOf(accessControlEntry.getSpec().getPermission()),
allowedPermissions.stream().map(Object::toString).collect(Collectors.joining(", "))));
ALLOWED_PERMISSIONS.stream().map(Object::toString).collect(Collectors.joining(", "))));
}

if (!allowedPatternTypes.contains(accessControlEntry.getSpec().getResourcePatternType())) {
// Which pattern types can be granted cross namespaces
if (!ALLOWED_PATTERN_TYPES.contains(accessControlEntry.getSpec().getResourcePatternType())) {
validationErrors.add(invalidAclPatternType(
String.valueOf(accessControlEntry.getSpec().getResourcePatternType()),
allowedPatternTypes.stream().map(Object::toString).collect(Collectors.joining(", "))));
ALLOWED_PATTERN_TYPES.stream().map(Object::toString).collect(Collectors.joining(", "))));
}

// GrantedTo namespace exists?
Expand Down Expand Up @@ -509,8 +507,8 @@ public List<AccessControlEntry> findAllForCluster(String cluster) {
*
* @return A list of ACLs
*/
public List<AccessControlEntry> findAll() {
return new ArrayList<>(accessControlEntryRepository.findAll());
public Collection<AccessControlEntry> findAll() {
return accessControlEntryRepository.findAll();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.michelin.ns4kafka.model.AccessControlEntry;
import com.michelin.ns4kafka.model.Metadata;
import com.michelin.ns4kafka.service.AclService;
import java.util.Collection;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -58,7 +59,7 @@ void shouldListAcls() {

when(aclService.findAll()).thenReturn(List.of(accessControlEntry, accessControlEntry2));

List<AccessControlEntry> actual = aclNonNamespacedController.listAll();
Collection<AccessControlEntry> actual = aclNonNamespacedController.listAll();

assertEquals(2, actual.size());
assertEquals(List.of(accessControlEntry, accessControlEntry2), actual);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -988,7 +989,7 @@ void shouldFindAllAcls() {

when(accessControlEntryRepository.findAll()).thenReturn(List.of(ace1, ace2, ace3));

List<AccessControlEntry> actual = aclService.findAll();
Collection<AccessControlEntry> actual = aclService.findAll();
assertEquals(3, actual.size());
}

Expand Down