Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public HttpResponse<ResourceQuota> apply(
throw new ResourceValidationException(quota, validationErrors);
}

Optional<ResourceQuota> resourceQuotaOptional = resourceQuotaService.findForNamespace(namespace);
Optional<ResourceQuota> resourceQuotaOptional = resourceQuotaService.findByNamespace(namespace);
if (resourceQuotaOptional.isPresent() && resourceQuotaOptional.get().equals(quota)) {
return formatHttpResponse(quota, ApplyStatus.UNCHANGED);
}
Expand Down Expand Up @@ -156,11 +156,10 @@ public HttpResponse<ResourceQuota> apply(
* @return An HTTP response
*/
@Delete
public HttpResponse<List<ResourceQuota>> bulkDelete(
public HttpResponse<List<ResourceQuota>> delete(
String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue(defaultValue = "false") boolean dryrun) {

List<ResourceQuota> resourceQuotas = resourceQuotaService.findByWildcardName(namespace, name);

if (resourceQuotas.isEmpty()) {
Expand All @@ -178,34 +177,4 @@ public HttpResponse<List<ResourceQuota>> bulkDelete(

return HttpResponse.ok(resourceQuotas);
}

/**
* Delete a quota.
*
* @param namespace The namespace
* @param name The resource quota
* @param dryrun Is dry run mode or not?
* @return An HTTP response
* @deprecated use {@link #bulkDelete(String, String, boolean)} instead.
*/
@Delete("/{name}{?dryrun}")
@Deprecated(since = "1.13.0")
public HttpResponse<Void> delete(
String namespace, String name, @QueryValue(defaultValue = "false") boolean dryrun) {
Optional<ResourceQuota> resourceQuota = resourceQuotaService.findByName(namespace, name);
if (resourceQuota.isEmpty()) {
return HttpResponse.notFound();
}

if (dryrun) {
return HttpResponse.noContent();
}

ResourceQuota resourceQuotaToDelete = resourceQuota.get();

sendEventLog(resourceQuotaToDelete, ApplyStatus.DELETED, resourceQuotaToDelete.getSpec(), null, EMPTY_STRING);

resourceQuotaService.delete(resourceQuotaToDelete);
return HttpResponse.noContent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package com.michelin.ns4kafka.repository;

import com.michelin.ns4kafka.model.quota.ResourceQuota;
import java.util.List;
import java.util.Collection;
import java.util.Optional;

/** Resource quota repository. */
Expand All @@ -29,15 +29,15 @@ public interface ResourceQuotaRepository {
*
* @return The resource quotas
*/
List<ResourceQuota> findAll();
Collection<ResourceQuota> findAll();

/**
* Get resource quota by namespace.
*
* @param namespace The namespace used to research
* @return The resource quotas associated to the namespace
*/
Optional<ResourceQuota> findForNamespace(String namespace);
Optional<ResourceQuota> findByNamespace(String namespace);

/**
* Create a resource quota.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import io.micronaut.scheduling.TaskScheduler;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -64,8 +63,14 @@ public KafkaResourceQuotaRepository(
super(kafkaTopic, kafkaProducer, adminClient, ns4KafkaProperties, taskScheduler);
}

/**
* Get the message key for a resource quota message, which is the namespace.
*
* @param message The resource quota message
* @return The message key
*/
@Override
String getMessageKey(ResourceQuota message) {
public String getMessageKey(ResourceQuota message) {
return message.getMetadata().getNamespace();
}

Expand All @@ -75,8 +80,8 @@ String getMessageKey(ResourceQuota message) {
* @return The resource quotas
*/
@Override
public List<ResourceQuota> findAll() {
return new ArrayList<>(getKafkaStore().values());
public Collection<ResourceQuota> findAll() {
return getKafkaStore().values();
}

/**
Expand All @@ -86,11 +91,8 @@ public List<ResourceQuota> findAll() {
* @return A resource quota
*/
@Override
public Optional<ResourceQuota> findForNamespace(String namespace) {
return getKafkaStore().values().stream()
.filter(resourceQuota ->
resourceQuota.getMetadata().getNamespace().equals(namespace))
.findFirst();
public Optional<ResourceQuota> findByNamespace(String namespace) {
return Optional.ofNullable(getKafkaStore().get(namespace));
}

/**
Expand All @@ -100,7 +102,7 @@ public Optional<ResourceQuota> findForNamespace(String namespace) {
*/
@Override
@Topic(value = "${ns4kafka.store.kafka.topics.prefix}.resource-quotas")
void receive(ConsumerRecord<String, ResourceQuota> message) {
public void receive(ConsumerRecord<String, ResourceQuota> message) {
super.receive(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public List<String> findAllResourcesByNamespace(Namespace namespace) {
.map(ace -> ACCESS_CONTROL_ENTRY + "/"
+ ace.getMetadata().getName()),
resourceQuotaService
.findForNamespace(namespace.getMetadata().getName())
.findByNamespace(namespace.getMetadata().getName())
.stream()
.map(resourceQuota -> RESOURCE_QUOTA + "/"
+ resourceQuota.getMetadata().getName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public class ResourceQuotaService {
* @param namespace The namespace used to research
* @return The researched resource quota
*/
public Optional<ResourceQuota> findForNamespace(String namespace) {
return resourceQuotaRepository.findForNamespace(namespace);
public Optional<ResourceQuota> findByNamespace(String namespace) {
return resourceQuotaRepository.findByNamespace(namespace);
}

/**
Expand All @@ -88,7 +88,7 @@ public Optional<ResourceQuota> findForNamespace(String namespace) {
*/
public List<ResourceQuota> findByWildcardName(String namespace, String name) {
List<String> nameFilterPatterns = RegexUtils.convertWildcardStringsToRegex(List.of(name));
return findForNamespace(namespace).stream()
return findByNamespace(namespace).stream()
.filter(quota ->
RegexUtils.isResourceCoveredByRegex(quota.getMetadata().getName(), nameFilterPatterns))
.toList();
Expand All @@ -102,7 +102,7 @@ public List<ResourceQuota> findByWildcardName(String namespace, String name) {
* @return The researched resource quota
*/
public Optional<ResourceQuota> findByName(String namespace, String quota) {
return findForNamespace(namespace).stream()
return findByNamespace(namespace).stream()
.filter(resourceQuota -> resourceQuota.getMetadata().getName().equals(quota))
.findFirst();
}
Expand Down Expand Up @@ -254,7 +254,7 @@ public long getCurrentCountConnectorsByNamespace(Namespace namespace) {
*/
public List<String> validateTopicQuota(Namespace namespace, Optional<Topic> existingTopic, Topic newTopic) {
Optional<ResourceQuota> resourceQuotaOptional =
findForNamespace(namespace.getMetadata().getName());
findByNamespace(namespace.getMetadata().getName());
if (resourceQuotaOptional.isEmpty()) {
return List.of();
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public List<String> validateTopicQuota(Namespace namespace, Optional<Topic> exis
*/
public List<String> validateConnectorQuota(Namespace namespace) {
Optional<ResourceQuota> resourceQuotaOptional =
findForNamespace(namespace.getMetadata().getName());
findByNamespace(namespace.getMetadata().getName());
if (resourceQuotaOptional.isEmpty()) {
return List.of();
}
Expand All @@ -345,7 +345,7 @@ public List<String> validateConnectorQuota(Namespace namespace) {
public List<ResourceQuotaResponse> getUsedQuotaByNamespaces(List<Namespace> namespaces) {
return namespaces.stream()
.map(namespace -> getUsedResourcesByQuotaByNamespace(
namespace, findForNamespace(namespace.getMetadata().getName())))
namespace, findByNamespace(namespace.getMetadata().getName())))
.toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidResetPasswordProvider;
import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_USER_RESET_PASSWORD;

import com.michelin.ns4kafka.model.quota.ResourceQuota;
import com.michelin.ns4kafka.property.ManagedClusterProperties;
import com.michelin.ns4kafka.repository.NamespaceRepository;
import com.michelin.ns4kafka.repository.ResourceQuotaRepository;
Expand All @@ -34,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -136,16 +134,14 @@ public String resetPassword(String user) throws ExecutionException, InterruptedE
*/
private Map<String, Map<String, Double>> collectNs4KafkaQuotas() {
return namespaceRepository.findAllForCluster(managedClusterProperties.getName()).stream()
.collect(Collectors.toMap(namespace -> namespace.getSpec().getKafkaUser(), namespace -> {
Optional<ResourceQuota> quota = quotaRepository.findForNamespace(
namespace.getMetadata().getName());
return quota.map(resourceQuota -> resourceQuota.getSpec().entrySet().stream()
.filter(q -> q.getKey().startsWith(USER_QUOTA_PREFIX))
.collect(Collectors.toMap(
q -> q.getKey().substring(USER_QUOTA_PREFIX.length()),
q -> Double.parseDouble(q.getValue()))))
.orElse(Map.of());
}));
.collect(Collectors.toMap(namespace -> namespace.getSpec().getKafkaUser(), namespace -> quotaRepository
.findByNamespace(namespace.getMetadata().getName())
.map(resourceQuota -> resourceQuota.getSpec().entrySet().stream()
.filter(q -> q.getKey().startsWith(USER_QUOTA_PREFIX))
.collect(Collectors.toMap(
q -> q.getKey().substring(USER_QUOTA_PREFIX.length()),
q -> Double.parseDouble(q.getValue()))))
.orElse(Map.of())));
}

/** Abstract user synchronizer to define the operations required for the user synchronization and password reset. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.michelin.ns4kafka.service.ResourceQuotaService;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.security.utils.SecurityService;
import java.util.List;
Expand Down Expand Up @@ -202,7 +201,7 @@ void shouldApplyUnchanged() {

when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of());
when(resourceQuotaService.findForNamespace(ns.getMetadata().getName())).thenReturn(Optional.of(resourceQuota));
when(resourceQuotaService.findByNamespace(ns.getMetadata().getName())).thenReturn(Optional.of(resourceQuota));

var response = resourceQuotaController.apply("test", resourceQuota, false);
assertEquals("unchanged", response.header("X-Ns4kafka-Result"));
Expand All @@ -223,7 +222,7 @@ void shouldApplyDryRun() {

when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of());
when(resourceQuotaService.findForNamespace(ns.getMetadata().getName())).thenReturn(Optional.empty());
when(resourceQuotaService.findByNamespace(ns.getMetadata().getName())).thenReturn(Optional.empty());

var response = resourceQuotaController.apply("test", resourceQuota, true);
assertEquals("created", response.header("X-Ns4kafka-Result"));
Expand All @@ -246,7 +245,7 @@ void shouldApplyCreated() {

when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of());
when(resourceQuotaService.findForNamespace(ns.getMetadata().getName())).thenReturn(Optional.empty());
when(resourceQuotaService.findByNamespace(ns.getMetadata().getName())).thenReturn(Optional.empty());
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());
Expand Down Expand Up @@ -282,7 +281,7 @@ void shouldApplyUpdated() {

when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(resourceQuotaService.validateNewResourceQuota(ns, resourceQuota)).thenReturn(List.of());
when(resourceQuotaService.findForNamespace(ns.getMetadata().getName()))
when(resourceQuotaService.findByNamespace(ns.getMetadata().getName()))
.thenReturn(Optional.of(resourceQuotaExisting));
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
Expand All @@ -297,70 +296,28 @@ void shouldApplyUpdated() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteQuotaWhenNotFound() {
when(resourceQuotaService.findByName("test", "quota")).thenReturn(Optional.empty());
HttpResponse<Void> actual = resourceQuotaController.delete("test", "quota", false);
assertEquals(HttpStatus.NOT_FOUND, actual.getStatus());
verify(resourceQuotaService, never()).delete(ArgumentMatchers.any());
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteQuotaWhenDryRun() {
ResourceQuota resourceQuota = ResourceQuota.builder()
.metadata(Metadata.builder().cluster("local").name("quota").build())
.spec(Map.of("count/topics", "3"))
.build();

when(resourceQuotaService.findByName("test", "quota")).thenReturn(Optional.of(resourceQuota));
HttpResponse<Void> actual = resourceQuotaController.delete("test", "quota", true);
assertEquals(HttpStatus.NO_CONTENT, actual.getStatus());
verify(resourceQuotaService, never()).delete(ArgumentMatchers.any());
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteQuota() {
ResourceQuota resourceQuota = ResourceQuota.builder()
.metadata(Metadata.builder().cluster("local").name("quota").build())
.spec(Map.of("count/topics", "3"))
.build();

when(resourceQuotaService.findByName("test", "quota")).thenReturn(Optional.of(resourceQuota));
when(securityService.username()).thenReturn(Optional.of("test-user"));
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
doNothing().when(applicationEventPublisher).publishEvent(any());
doNothing().when(resourceQuotaService).delete(resourceQuota);

HttpResponse<Void> actual = resourceQuotaController.delete("test", "quota", false);
assertEquals(HttpStatus.NO_CONTENT, actual.getStatus());
verify(resourceQuotaService).delete(resourceQuota);
}

@Test
void shouldNotBulkDeleteQuotaWhenNotFound() {
when(resourceQuotaService.findByWildcardName("test", "quota*")).thenReturn(List.of());
var actual = resourceQuotaController.bulkDelete("test", "quota*", false);
var actual = resourceQuotaController.delete("test", "quota*", false);
assertEquals(HttpStatus.NOT_FOUND, actual.getStatus());
verify(resourceQuotaService, never()).delete(ArgumentMatchers.any());
}

@Test
void shouldNotBulkDeleteQuotaInDryRunMode() {
void shouldNotDeleteQuotaInDryRunMode() {
ResourceQuota resourceQuota1 = ResourceQuota.builder()
.metadata(Metadata.builder().cluster("local").name("quota1").build())
.spec(Map.of("count/topics", "3"))
.build();

when(resourceQuotaService.findByWildcardName("test", "quota*")).thenReturn(List.of(resourceQuota1));
var actual = resourceQuotaController.bulkDelete("test", "quota*", true);
var actual = resourceQuotaController.delete("test", "quota*", true);
assertEquals(HttpStatus.OK, actual.getStatus());
verify(resourceQuotaService, never()).delete(ArgumentMatchers.any());
}

@Test
void shouldBulkDeleteQuota() {
void shouldDeleteQuota() {
ResourceQuota resourceQuota = ResourceQuota.builder()
.metadata(Metadata.builder().cluster("local").name("quota").build())
.spec(Map.of("count/topics", "3"))
Expand All @@ -372,7 +329,7 @@ void shouldBulkDeleteQuota() {
doNothing().when(applicationEventPublisher).publishEvent(any());
doNothing().when(resourceQuotaService).delete(resourceQuota);

var actual = resourceQuotaController.bulkDelete("test", "quota*", false);
var actual = resourceQuotaController.delete("test", "quota*", false);
assertEquals(HttpStatus.OK, actual.getStatus());
verify(resourceQuotaService).delete(resourceQuota);
}
Expand Down
Loading