diff --git a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java index 2b6c72755..1996cae2e 100644 --- a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java +++ b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java @@ -153,9 +153,10 @@ private List buildProjectAclBindings(Topology topology) { connector .getConnectors() .ifPresent( - (list) -> aclBindingsResults.add( - new ConnectorAuthorizationAclBindingsBuilder(bindingsBuilder, connector) - .getAclBindings())); + (list) -> + aclBindingsResults.add( + new ConnectorAuthorizationAclBindingsBuilder(bindingsBuilder, connector) + .getAclBindings())); } for (Schemas schemaAuthorization : project.getSchemas()) { @@ -307,7 +308,6 @@ private List buildUpdateBindingsActions( return updateActions; } - // Sync platform relevant Access Control List. private List buildPlatformLevelActions(final Topology topology) { List aclBindingsResults = new ArrayList<>(); diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java index 7de02301a..279932d82 100644 --- a/src/main/java/com/purbon/kafka/topology/Configuration.java +++ b/src/main/java/com/purbon/kafka/topology/Configuration.java @@ -28,7 +28,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.util.Strings; public class Configuration { @@ -153,15 +152,16 @@ private void validateGeneralConfiguration(Topology topology) throws Configuratio } Arrays.asList(TOPIC_MANAGED_PREFIXES, GROUP_MANAGED_PREFIXES, SERVICE_ACCOUNT_MANAGED_PREFIXES) - .forEach(this::validateManagedPrefixes); + .forEach(this::validateManagedPrefixes); } private void validateManagedPrefixes(String key) { List managedTopicPrefixes = config.getStringList(key); if (managedTopicPrefixes.contains("")) { throw new ConfigurationException( - String.format("The config key %s, contains empty strings, this is not possible, please review", key) - ); + String.format( + "The config key %s, contains empty strings, this is not possible, please review", + key)); } } diff --git a/src/main/java/com/purbon/kafka/topology/model/users/Consumer.java b/src/main/java/com/purbon/kafka/topology/model/users/Consumer.java index ed03274f7..099a7b3b9 100644 --- a/src/main/java/com/purbon/kafka/topology/model/users/Consumer.java +++ b/src/main/java/com/purbon/kafka/topology/model/users/Consumer.java @@ -1,6 +1,8 @@ package com.purbon.kafka.topology.model.users; +import com.fasterxml.jackson.annotation.JsonProperty; import com.purbon.kafka.topology.model.User; +import com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles; import java.util.Objects; import java.util.Optional; @@ -8,6 +10,9 @@ public class Consumer extends User { private Optional group; + @JsonProperty(value = "group-role") + private String groupRole = RBACPredefinedRoles.RESOURCE_OWNER; + public Consumer() { super(); group = Optional.empty(); @@ -34,6 +39,14 @@ public void setGroup(Optional group) { this.group = group; } + public String getGroupRole() { + return groupRole; + } + + public void setGroupRole(String groupRole) { + this.groupRole = groupRole; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -44,11 +57,12 @@ public boolean equals(Object o) { } Consumer consumer = (Consumer) o; return getPrincipal().equals(consumer.getPrincipal()) + && groupRole.equals(consumer.getGroupRole()) && groupString().equals(consumer.groupString()); } @Override public int hashCode() { - return Objects.hash(groupString(), getPrincipal()); + return Objects.hash(groupString(), getPrincipal(), groupRole); } } diff --git a/src/main/java/com/purbon/kafka/topology/roles/ResourceFilter.java b/src/main/java/com/purbon/kafka/topology/roles/ResourceFilter.java index 1319a4530..d8a0a971d 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/ResourceFilter.java +++ b/src/main/java/com/purbon/kafka/topology/roles/ResourceFilter.java @@ -1,82 +1,80 @@ package com.purbon.kafka.topology.roles; -import com.purbon.kafka.topology.AccessControlManager; import com.purbon.kafka.topology.Configuration; +import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; - public class ResourceFilter { - private static final Logger LOGGER = LogManager.getLogger(ResourceFilter.class); - - private final List managedServiceAccountPrefixes; - private final List managedTopicPrefixes; - private final List managedGroupPrefixes; - - public ResourceFilter(Configuration config) { - this.managedServiceAccountPrefixes = config.getServiceAccountManagedPrefixes(); - this.managedTopicPrefixes = config.getTopicManagedPrefixes(); - this.managedGroupPrefixes = config.getGroupManagedPrefixes(); + private static final Logger LOGGER = LogManager.getLogger(ResourceFilter.class); + + private final List managedServiceAccountPrefixes; + private final List managedTopicPrefixes; + private final List managedGroupPrefixes; + + public ResourceFilter(Configuration config) { + this.managedServiceAccountPrefixes = config.getServiceAccountManagedPrefixes(); + this.managedTopicPrefixes = config.getTopicManagedPrefixes(); + this.managedGroupPrefixes = config.getGroupManagedPrefixes(); + } + + public boolean matchesManagedPrefixList(TopologyAclBinding topologyAclBinding) { + String resourceName = topologyAclBinding.getResourceName(); + String principle = topologyAclBinding.getPrincipal(); + // For global wild cards ACL's we manage only if we manage the service account/principle, + // regardless. Filtering by service account will always take precedence if defined + if (haveServiceAccountPrefixFilters() || resourceName.equals("*")) { + if (resourceName.equals("*")) { + return matchesServiceAccountPrefixList(principle); + } else { + return matchesServiceAccountPrefixList(principle) + && matchesTopicOrGroupPrefix(topologyAclBinding, resourceName); + } + } else if (haveTopicNamePrefixFilter() || haveGroupNamePrefixFilter()) { + return matchesTopicOrGroupPrefix(topologyAclBinding, resourceName); } - public boolean matchesManagedPrefixList(TopologyAclBinding topologyAclBinding) { - String resourceName = topologyAclBinding.getResourceName(); - String principle = topologyAclBinding.getPrincipal(); - // For global wild cards ACL's we manage only if we manage the service account/principle, - // regardless. Filtering by service account will always take precedence if defined - if (haveServiceAccountPrefixFilters() || resourceName.equals("*")) { - if (resourceName.equals("*")) { - return matchesServiceAccountPrefixList(principle); - } else { - return matchesServiceAccountPrefixList(principle) - && matchesTopicOrGroupPrefix(topologyAclBinding, resourceName); - } - } else if (haveTopicNamePrefixFilter() || haveGroupNamePrefixFilter()) { - return matchesTopicOrGroupPrefix(topologyAclBinding, resourceName); - } - - return true; // should include everything if not properly excluded earlier. - } - - private boolean matchesTopicOrGroupPrefix( - TopologyAclBinding topologyAclBinding, String resourceName) { - if ("TOPIC".equalsIgnoreCase(topologyAclBinding.getResourceType())) { - return matchesTopicPrefixList(resourceName); - } else if ("GROUP".equalsIgnoreCase(topologyAclBinding.getResourceType())) { - return matchesGroupPrefixList(resourceName); - } - return false; - } - - private boolean matchesTopicPrefixList(String topic) { - return matchesPrefix(managedTopicPrefixes, topic, "Topic"); - } - - private boolean matchesGroupPrefixList(String group) { - return matchesPrefix(managedGroupPrefixes, group, "Group"); - } - - private boolean matchesServiceAccountPrefixList(String principal) { - return matchesPrefix(managedServiceAccountPrefixes, principal, "Principal"); - } - - private boolean haveServiceAccountPrefixFilters() { - return managedServiceAccountPrefixes.size() != 0; - } - - private boolean haveTopicNamePrefixFilter() { - return managedTopicPrefixes.size() != 0; - } - - private boolean haveGroupNamePrefixFilter() { - return managedGroupPrefixes.size() != 0; - } + return true; // should include everything if not properly excluded earlier. + } - private boolean matchesPrefix(List prefixes, String item, String type) { - boolean matches = prefixes.size() == 0 || prefixes.stream().anyMatch(item::startsWith); - LOGGER.debug(String.format("%s %s matches %s with $s", type, item, matches, prefixes)); - return matches; + private boolean matchesTopicOrGroupPrefix( + TopologyAclBinding topologyAclBinding, String resourceName) { + if ("TOPIC".equalsIgnoreCase(topologyAclBinding.getResourceType())) { + return matchesTopicPrefixList(resourceName); + } else if ("GROUP".equalsIgnoreCase(topologyAclBinding.getResourceType())) { + return matchesGroupPrefixList(resourceName); } + return false; + } + + private boolean matchesTopicPrefixList(String topic) { + return matchesPrefix(managedTopicPrefixes, topic, "Topic"); + } + + private boolean matchesGroupPrefixList(String group) { + return matchesPrefix(managedGroupPrefixes, group, "Group"); + } + + private boolean matchesServiceAccountPrefixList(String principal) { + return matchesPrefix(managedServiceAccountPrefixes, principal, "Principal"); + } + + private boolean haveServiceAccountPrefixFilters() { + return managedServiceAccountPrefixes.size() != 0; + } + + private boolean haveTopicNamePrefixFilter() { + return managedTopicPrefixes.size() != 0; + } + + private boolean haveGroupNamePrefixFilter() { + return managedGroupPrefixes.size() != 0; + } + + private boolean matchesPrefix(List prefixes, String item, String type) { + boolean matches = prefixes.size() == 0 || prefixes.stream().anyMatch(item::startsWith); + LOGGER.debug(String.format("%s %s matches %s with $s", type, item, matches, prefixes)); + return matches; + } } diff --git a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java index eaf346986..a362b0031 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/roles/rbac/RBACBindingsBuilder.java @@ -148,7 +148,7 @@ public List buildBindingsForConsumers( binding = apiClient.bind( consumer.getPrincipal(), - RESOURCE_OWNER, + consumer.getGroupRole(), evaluateResourcePattern(consumer.groupString()), "Group", evaluateResourcePatternType(consumer.groupString())); diff --git a/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java b/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java index 3353ae713..f6fb9e3b9 100644 --- a/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java +++ b/src/test/java/com/purbon/kafka/topology/ConfigurationTest.java @@ -305,7 +305,8 @@ public void shouldFetchAConfigSubsetSuccessfully() { } @Test - public void nonEmptyTopicManagedPrefixConfigsShouldValidateSuccessfully() throws ConfigurationException { + public void nonEmptyTopicManagedPrefixConfigsShouldValidateSuccessfully() + throws ConfigurationException { var topology = TestTopologyBuilder.createProject().buildTopology(); props.put(TOPIC_MANAGED_PREFIXES + ".0", "foo"); Configuration config = new Configuration(cliOps, props); diff --git a/src/test/java/com/purbon/kafka/topology/ResourceFilterTest.java b/src/test/java/com/purbon/kafka/topology/ResourceFilterTest.java index 3cefb8185..6e7596868 100644 --- a/src/test/java/com/purbon/kafka/topology/ResourceFilterTest.java +++ b/src/test/java/com/purbon/kafka/topology/ResourceFilterTest.java @@ -1,122 +1,90 @@ package com.purbon.kafka.topology; -import com.purbon.kafka.topology.roles.ResourceFilter; -import com.purbon.kafka.topology.roles.TopologyAclBinding; -import org.apache.kafka.common.resource.ResourceType; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Properties; - import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION; import static com.purbon.kafka.topology.Constants.SERVICE_ACCOUNT_MANAGED_PREFIXES; import static com.purbon.kafka.topology.Constants.TOPIC_MANAGED_PREFIXES; import static org.assertj.core.api.Assertions.assertThat; -public class ResourceFilterTest { - - @Test - public void resourcesWithPrincipalAndNameMatchesShouldBeFilter() { - Configuration config = makeConfig(new String[]{"User:foo"}, new String[]{"d.foo.bar"}); - ResourceFilter filter = new ResourceFilter(config); - - TopologyAclBinding binding = TopologyAclBinding.build( - ResourceType.TOPIC.name(), - "d.foo.bar", - "*", - "read", - "User:foo", - "PREFIXED" - ); - assertThat(filter.matchesManagedPrefixList(binding)).isTrue(); - - binding = TopologyAclBinding.build( - ResourceType.TOPIC.name(), - "d.foo.bar", - "*", - "read", - "User:bar", - "PREFIXED" - ); - assertThat(filter.matchesManagedPrefixList(binding)).isFalse(); - - binding = TopologyAclBinding.build( - ResourceType.TOPIC.name(), - "c.f.b", - "*", - "read", - "User:foo", - "PREFIXED" - ); - assertThat(filter.matchesManagedPrefixList(binding)).isFalse(); - } - +import com.purbon.kafka.topology.roles.ResourceFilter; +import com.purbon.kafka.topology.roles.TopologyAclBinding; +import java.util.HashMap; +import java.util.Properties; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.Test; - @Test - public void resourcesWithPrincipalMatchesShouldBeFiltered() { - Configuration config = makeConfig(new String[]{"User:foo"}); - ResourceFilter filter = new ResourceFilter(config); - - TopologyAclBinding binding = TopologyAclBinding.build( - ResourceType.TOPIC.name(), - "d.foo.bar", - "*", - "read", - "User:foo", - "PREFIXED" - ); - assertThat(filter.matchesManagedPrefixList(binding)).isTrue(); - } +public class ResourceFilterTest { - @Test - public void resourcesWithNamesMatchesShouldBeFiltered() { - Configuration config = makeConfigOnlyNames(new String[]{"d.foo.bar"}); - ResourceFilter filter = new ResourceFilter(config); - - TopologyAclBinding binding = TopologyAclBinding.build( - ResourceType.TOPIC.name(), - "d.foo.bar", - "*", - "read", - "User:foo", - "PREFIXED" - ); - assertThat(filter.matchesManagedPrefixList(binding)).isTrue(); - - binding = TopologyAclBinding.build( - ResourceType.TOPIC.name(), - "c.foo.zet", - "*", - "read", - "User:foo", - "PREFIXED" - ); - assertThat(filter.matchesManagedPrefixList(binding)).isFalse(); + @Test + public void resourcesWithPrincipalAndNameMatchesShouldBeFilter() { + Configuration config = makeConfig(new String[] {"User:foo"}, new String[] {"d.foo.bar"}); + ResourceFilter filter = new ResourceFilter(config); + + TopologyAclBinding binding = + TopologyAclBinding.build( + ResourceType.TOPIC.name(), "d.foo.bar", "*", "read", "User:foo", "PREFIXED"); + assertThat(filter.matchesManagedPrefixList(binding)).isTrue(); + + binding = + TopologyAclBinding.build( + ResourceType.TOPIC.name(), "d.foo.bar", "*", "read", "User:bar", "PREFIXED"); + assertThat(filter.matchesManagedPrefixList(binding)).isFalse(); + + binding = + TopologyAclBinding.build( + ResourceType.TOPIC.name(), "c.f.b", "*", "read", "User:foo", "PREFIXED"); + assertThat(filter.matchesManagedPrefixList(binding)).isFalse(); + } + + @Test + public void resourcesWithPrincipalMatchesShouldBeFiltered() { + Configuration config = makeConfig(new String[] {"User:foo"}); + ResourceFilter filter = new ResourceFilter(config); + + TopologyAclBinding binding = + TopologyAclBinding.build( + ResourceType.TOPIC.name(), "d.foo.bar", "*", "read", "User:foo", "PREFIXED"); + assertThat(filter.matchesManagedPrefixList(binding)).isTrue(); + } + + @Test + public void resourcesWithNamesMatchesShouldBeFiltered() { + Configuration config = makeConfigOnlyNames(new String[] {"d.foo.bar"}); + ResourceFilter filter = new ResourceFilter(config); + + TopologyAclBinding binding = + TopologyAclBinding.build( + ResourceType.TOPIC.name(), "d.foo.bar", "*", "read", "User:foo", "PREFIXED"); + assertThat(filter.matchesManagedPrefixList(binding)).isTrue(); + + binding = + TopologyAclBinding.build( + ResourceType.TOPIC.name(), "c.foo.zet", "*", "read", "User:foo", "PREFIXED"); + assertThat(filter.matchesManagedPrefixList(binding)).isFalse(); + } + + private Configuration makeConfigOnlyNames(String[] names) { + return makeConfig(new String[0], names); + } + + private Configuration makeConfig(String[] principals) { + return makeConfig(principals, new String[0]); + } + + private Configuration makeConfig(String[] principals, String[] names) { + HashMap cliOps = new HashMap<>(); + cliOps.put(BROKERS_OPTION, ""); + Properties props = new Properties(); + + for (int i = 0; i < principals.length; i++) { + String key = String.format("%s.%d", SERVICE_ACCOUNT_MANAGED_PREFIXES, i); + props.put(key, principals[i]); } - private Configuration makeConfigOnlyNames(String[] names) { - return makeConfig(new String[0], names); + for (int i = 0; i < names.length; i++) { + String key = String.format("%s.%d", TOPIC_MANAGED_PREFIXES, i); + props.put(key, names[i]); } - private Configuration makeConfig(String[] principals) { - return makeConfig(principals, new String[0]); - } - - - private Configuration makeConfig(String[] principals, String[] names) { - HashMap cliOps = new HashMap<>(); - cliOps.put(BROKERS_OPTION, ""); - Properties props = new Properties(); - for(int i=0; i < principals.length; i++) { - String key = String.format("%s.%d", SERVICE_ACCOUNT_MANAGED_PREFIXES, i); - props.put(key, principals[i]); - } - - for(int i=0; i < names.length; i++) { - String key = String.format("%s.%d", TOPIC_MANAGED_PREFIXES, i); - props.put(key, names[i]); - } - - return new Configuration(cliOps, props); - } + return new Configuration(cliOps, props); + } } diff --git a/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java b/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java index 730e524ac..048eb8b6f 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java +++ b/src/test/java/com/purbon/kafka/topology/integration/RBACPRoviderRbacIT.java @@ -109,6 +109,9 @@ public void consumerAclsCreation() throws IOException { List consumers = new ArrayList<>(); consumers.add(new Consumer("User:app1")); + Consumer consumer2 = new Consumer("User:app4"); + consumer2.setGroupRole(DEVELOPER_READ); + consumers.add(consumer2); Project project = new ProjectImpl("project"); project.setConsumers(consumers); @@ -339,7 +342,6 @@ public void schemasRbacCreation() throws IOException { resources = apiClient.lookupResourcesForSchemaRegistry(schema.getPrincipal(), RESOURCE_OWNER); assertThat(resources).isEmpty(); - } @Test @@ -730,10 +732,38 @@ private void verifyProducerAcls(List producers, String topic, int reso private void verifyConsumerAcls(List consumers, String topic) { consumers.forEach( consumer -> { - List roles = apiClient.lookupRoles(consumer.getPrincipal()); - assertEquals(2, roles.size()); - assertTrue(roles.contains(DEVELOPER_READ)); - assertTrue(roles.contains(RESOURCE_OWNER)); + List rbacResourceTypes = + apiClient.lookupResourcesForKafka(consumer.getPrincipal(), consumer.getGroupRole()); + + Optional group = + rbacResourceTypes.stream() + .filter(t -> t.getResourceType().equals("Group")) + .findFirst(); + assertTrue(group.isPresent()); + + Optional topicResourceType; + if (RESOURCE_OWNER.equals(consumer.getGroupRole())) { + List rbacTopicResourceTypes = + apiClient.lookupResourcesForKafka(consumer.getPrincipal(), DEVELOPER_READ); + + topicResourceType = + rbacTopicResourceTypes.stream() + .filter(t -> t.getResourceType().equals("Topic")) + .findFirst(); + } else { + topicResourceType = + rbacResourceTypes.stream() + .filter(t -> t.getResourceType().equals("Topic")) + .findFirst(); + } + + assertTrue(topicResourceType.isPresent()); + assertThat(topicResourceType.get().getName()).isEqualTo(topic); + + List badRbacResourceTypes = + apiClient.lookupResourcesForKafka(consumer.getPrincipal(), DEVELOPER_WRITE); + + assertTrue(badRbacResourceTypes.isEmpty()); }); } }