diff --git a/src/main/java/com/michelin/kafkactl/Kafkactl.java b/src/main/java/com/michelin/kafkactl/Kafkactl.java index e492e938..860f5882 100644 --- a/src/main/java/com/michelin/kafkactl/Kafkactl.java +++ b/src/main/java/com/michelin/kafkactl/Kafkactl.java @@ -30,10 +30,10 @@ import com.michelin.kafkactl.command.Import; import com.michelin.kafkactl.command.ResetOffsets; import com.michelin.kafkactl.command.ResetPassword; -import com.michelin.kafkactl.command.Schema; import com.michelin.kafkactl.command.auth.Auth; import com.michelin.kafkactl.command.config.Config; import com.michelin.kafkactl.command.connectcluster.ConnectCluster; +import com.michelin.kafkactl.command.subjectconfig.SubjectConfig; import com.michelin.kafkactl.service.SystemService; import com.michelin.kafkactl.util.VersionProvider; import io.micronaut.configuration.picocli.PicocliRunner; @@ -60,7 +60,7 @@ Get.class, Import.class, ResetOffsets.class, - Schema.class, + SubjectConfig.class, ResetPassword.class }, headerHeading = "@|bold Usage|@:", diff --git a/src/main/java/com/michelin/kafkactl/client/NamespacedResourceClient.java b/src/main/java/com/michelin/kafkactl/client/NamespacedResourceClient.java index 77bddc1e..a82cf145 100644 --- a/src/main/java/com/michelin/kafkactl/client/NamespacedResourceClient.java +++ b/src/main/java/com/michelin/kafkactl/client/NamespacedResourceClient.java @@ -175,20 +175,28 @@ HttpResponse changeConnectorState( @Header("Authorization") String token); /** - * Change the schema compatibility mode. + * Change the subject config. * * @param namespace The namespace * @param subject The subject - * @param compatibility The compatibility to apply + * @param config The config to apply * @param token The auth token * @return The change compatibility response */ @Post("{namespace}/schemas/{subject}/config") - HttpResponse changeSchemaCompatibility( - String namespace, - String subject, - @Body Map compatibility, - @Header("Authorization") String token); + HttpResponse updateSubjectConfig( + String namespace, String subject, @Body Map config, @Header("Authorization") String token); + + /** + * Delete the subject config. + * + * @param namespace The namespace + * @param subject The subject + * @param token The auth token + * @return The change compatibility response + */ + @Delete("{namespace}/schemas/{subject}/config") + HttpResponse deleteSubjectConfig(String namespace, String subject, @Header("Authorization") String token); /** * Reset password of a given user. diff --git a/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectCluster.java b/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectCluster.java index 0c9b00e9..88ff607b 100644 --- a/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectCluster.java +++ b/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectCluster.java @@ -26,9 +26,7 @@ /** Connect clusters subcommand. */ @Command( name = "connect-cluster", - subcommands = { - ConnectClusterVault.class, - }, + subcommands = {ConnectClusterVault.class}, headerHeading = "@|bold Usage|@:", synopsisHeading = " ", synopsisSubcommandLabel = "COMMAND", diff --git a/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectClusterVault.java b/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectClusterVault.java index 134f0205..90fd04f9 100644 --- a/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectClusterVault.java +++ b/src/main/java/com/michelin/kafkactl/command/connectcluster/ConnectClusterVault.java @@ -62,7 +62,7 @@ public Integer onAuthSuccess() { return resourceService.listAvailableVaultsConnectClusters(namespace, commandSpec); } - // if connect cluster define but no secrets to encrypt => show error no secrets to encrypt. + // if connect cluster defined but no secrets to encrypt => show error. if (secrets == null) { throw new ParameterException(commandSpec.commandLine(), "No secrets to encrypt."); } diff --git a/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfig.java b/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfig.java new file mode 100644 index 00000000..b5bde01b --- /dev/null +++ b/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kafkactl.command.subjectconfig; + +import com.michelin.kafkactl.hook.HelpHook; +import picocli.CommandLine.Command; +import picocli.CommandLine.Model.CommandSpec; +import picocli.CommandLine.Spec; + +/** Subject config subcommand. */ +@Command( + name = "subject-config", + subcommands = {SubjectConfigUpdate.class, SubjectConfigDelete.class}, + headerHeading = "@|bold Usage|@:", + synopsisHeading = " ", + synopsisSubcommandLabel = "COMMAND", + descriptionHeading = "%n@|bold Description|@: ", + description = "Interact with subjects config.", + parameterListHeading = "%n@|bold Parameters|@:%n", + optionListHeading = "%n@|bold Options|@:%n", + commandListHeading = "%n@|bold Commands|@:%n", + usageHelpAutoWidth = true) +public class SubjectConfig extends HelpHook { + @Spec + public CommandSpec commandSpec; +} diff --git a/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfigDelete.java b/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfigDelete.java new file mode 100644 index 00000000..a9a0f804 --- /dev/null +++ b/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfigDelete.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kafkactl.command.subjectconfig; + +import com.michelin.kafkactl.hook.AuthenticatedHook; +import com.michelin.kafkactl.service.ResourceService; +import io.micronaut.core.annotation.ReflectiveAccess; +import jakarta.inject.Inject; +import java.io.IOException; +import picocli.CommandLine.Command; +import picocli.CommandLine.Parameters; + +/** Subject config delete subcommand. */ +@Command( + name = "delete", + headerHeading = "@|bold Usage|@:", + synopsisHeading = " ", + descriptionHeading = "%n@|bold Description|@: ", + description = "Delete subject config.", + parameterListHeading = "%n@|bold Parameters|@:%n", + optionListHeading = "%n@|bold Options|@:%n", + commandListHeading = "%n@|bold Commands|@:%n", + usageHelpAutoWidth = true) +public class SubjectConfigDelete extends AuthenticatedHook { + @Inject + @ReflectiveAccess + private ResourceService resourceService; + + @Parameters(index = "0", description = "Subject name.", arity = "1") + public String subject; + + /** + * Run the "subject" command. + * + * @return The command return code + * @throws IOException Any exception during the run + */ + @Override + public Integer onAuthSuccess() throws IOException { + return resourceService + .deleteSubjectConfig(getNamespace(), subject, commandSpec) + .isEmpty() + ? 1 + : 0; + } +} diff --git a/src/main/java/com/michelin/kafkactl/command/Schema.java b/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfigUpdate.java similarity index 54% rename from src/main/java/com/michelin/kafkactl/command/Schema.java rename to src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfigUpdate.java index efab3525..41e4279e 100644 --- a/src/main/java/com/michelin/kafkactl/command/Schema.java +++ b/src/main/java/com/michelin/kafkactl/command/subjectconfig/SubjectConfigUpdate.java @@ -16,70 +16,59 @@ * specific language governing permissions and limitations * under the License. */ -package com.michelin.kafkactl.command; - -import static com.michelin.kafkactl.model.Output.TABLE; -import static com.michelin.kafkactl.util.constant.ResourceKind.SCHEMA_COMPATIBILITY_STATE; +package com.michelin.kafkactl.command.subjectconfig; import com.michelin.kafkactl.hook.AuthenticatedHook; -import com.michelin.kafkactl.model.Resource; -import com.michelin.kafkactl.model.SchemaCompatibility; -import com.michelin.kafkactl.service.FormatService; +import com.michelin.kafkactl.model.SubjectCompatibility; import com.michelin.kafkactl.service.ResourceService; import io.micronaut.core.annotation.ReflectiveAccess; import jakarta.inject.Inject; import java.io.IOException; -import java.util.List; -import java.util.Optional; import picocli.CommandLine.Command; +import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; -/** Schema subcommand. */ +/** Subject config update subcommand. */ @Command( - name = "schema", + name = "update", headerHeading = "@|bold Usage|@:", synopsisHeading = " ", descriptionHeading = "%n@|bold Description|@: ", - description = "Interact with schemas.", + description = "Update subject config.", parameterListHeading = "%n@|bold Parameters|@:%n", optionListHeading = "%n@|bold Options|@:%n", commandListHeading = "%n@|bold Commands|@:%n", usageHelpAutoWidth = true) -public class Schema extends AuthenticatedHook { +public class SubjectConfigUpdate extends AuthenticatedHook { @Inject @ReflectiveAccess private ResourceService resourceService; - @Inject - @ReflectiveAccess - private FormatService formatService; + @Parameters(index = "0", description = "Subject name.", arity = "1") + public String subject; - @Parameters(index = "0", description = "Compatibility to set (${COMPLETION-CANDIDATES}).", arity = "1") - public SchemaCompatibility compatibility; + @Option( + names = {"-c", "--compatibility"}, + description = "Compatibility to set (${COMPLETION-CANDIDATES}).") + public SubjectCompatibility compatibility; - @Parameters(index = "1..*", description = "Subject names separated by space.", arity = "1..*") - public List subjects; + @Option( + names = {"-a", "--alias"}, + description = "Alias to set.") + public String alias; /** - * Run the "reset-offsets" command. + * Run the "subject" command. * * @return The command return code * @throws IOException Any exception during the run */ @Override public Integer onAuthSuccess() throws IOException { - List updatedSchemas = subjects.stream() - .map(subject -> - resourceService.changeSchemaCompatibility(getNamespace(), subject, compatibility, commandSpec)) - .filter(Optional::isPresent) - .map(Optional::get) - .toList(); - - if (!updatedSchemas.isEmpty()) { - formatService.displayList(SCHEMA_COMPATIBILITY_STATE, updatedSchemas, TABLE, commandSpec); - return 0; - } - - return 1; + return resourceService + .updateSubjectConfig(getNamespace(), subject, compatibility, alias, commandSpec) + .isEmpty() + ? 1 + : 0; } } diff --git a/src/main/java/com/michelin/kafkactl/model/SchemaCompatibility.java b/src/main/java/com/michelin/kafkactl/model/SubjectCompatibility.java similarity index 95% rename from src/main/java/com/michelin/kafkactl/model/SchemaCompatibility.java rename to src/main/java/com/michelin/kafkactl/model/SubjectCompatibility.java index bb2504e6..172a9a58 100644 --- a/src/main/java/com/michelin/kafkactl/model/SchemaCompatibility.java +++ b/src/main/java/com/michelin/kafkactl/model/SubjectCompatibility.java @@ -24,8 +24,7 @@ /** Schema compatibility. */ @Getter @AllArgsConstructor -public enum SchemaCompatibility { - GLOBAL("global"), +public enum SubjectCompatibility { BACKWARD("backward"), BACKWARD_TRANSITIVE("backward-transitive"), FORWARD("forward"), diff --git a/src/main/java/com/michelin/kafkactl/service/ResourceService.java b/src/main/java/com/michelin/kafkactl/service/ResourceService.java index a4b7c3a1..54818a64 100644 --- a/src/main/java/com/michelin/kafkactl/service/ResourceService.java +++ b/src/main/java/com/michelin/kafkactl/service/ResourceService.java @@ -24,6 +24,7 @@ import static com.michelin.kafkactl.util.constant.ResourceKind.CONSUMER_GROUP_RESET_OFFSET_RESPONSE; import static com.michelin.kafkactl.util.constant.ResourceKind.DELETE_RECORDS_RESPONSE; import static com.michelin.kafkactl.util.constant.ResourceKind.SUBJECT; +import static com.michelin.kafkactl.util.constant.ResourceKind.SUBJECT_CONFIG_STATE; import static com.michelin.kafkactl.util.constant.ResourceKind.VAULT_RESPONSE; import com.michelin.kafkactl.client.ClusterResourceClient; @@ -31,7 +32,7 @@ import com.michelin.kafkactl.model.ApiResource; import com.michelin.kafkactl.model.Output; import com.michelin.kafkactl.model.Resource; -import com.michelin.kafkactl.model.SchemaCompatibility; +import com.michelin.kafkactl.model.SubjectCompatibility; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; import io.micronaut.core.annotation.Nullable; @@ -384,25 +385,35 @@ public Optional changeConnectorState( } /** - * Change the compatibility of a given schema. + * Update the config of a given subject. * * @param namespace The namespace * @param subject The schema subject * @param compatibility The compatibility to apply + * @param alias The alias to apply * @param commandSpec The command that triggered the action * @return The resource */ - public Optional changeSchemaCompatibility( - String namespace, String subject, SchemaCompatibility compatibility, CommandSpec commandSpec) { + public Optional updateSubjectConfig( + String namespace, + String subject, + @Nullable SubjectCompatibility compatibility, + @Nullable String alias, + CommandSpec commandSpec) { try { - HttpResponse response = namespacedClient.changeSchemaCompatibility( - namespace, subject, Map.of("compatibility", compatibility.name()), loginService.getAuthorization()); + Map config = new HashMap<>(); + config.put( + "compatibility", + compatibility != null ? compatibility.toString().toUpperCase() : null); + config.put("alias", alias); - // Micronaut does not throw exception on 404, so produce a 404 manually - if (response.getStatus().equals(HttpStatus.NOT_FOUND)) { - throw new HttpClientResponseException(response.reason(), response); - } + HttpResponse response = + namespacedClient.updateSubjectConfig(namespace, subject, config, loginService.getAuthorization()); + commandSpec + .commandLine() + .getOut() + .println(formatService.prettifyKind(SUBJECT_CONFIG_STATE) + " \"" + subject + "\"" + " updated."); return response.getBody(); } catch (HttpClientResponseException exception) { formatService.displayError(exception, SUBJECT, subject, commandSpec); @@ -410,6 +421,31 @@ public Optional changeSchemaCompatibility( } } + /** + * Delete the config of a given subject. + * + * @param namespace The namespace + * @param subject The schema subject + * @param commandSpec The command that triggered the action + * @return The resource + */ + public Optional deleteSubjectConfig(String namespace, String subject, CommandSpec commandSpec) { + try { + Optional subjectConfig = namespacedClient + .deleteSubjectConfig(namespace, subject, loginService.getAuthorization()) + .getBody(); + + commandSpec + .commandLine() + .getOut() + .println(formatService.prettifyKind(SUBJECT_CONFIG_STATE) + " \"" + subject + "\"" + " deleted."); + return subjectConfig; + } catch (HttpClientResponseException exception) { + formatService.displayError(exception, SUBJECT, subject, commandSpec); + return Optional.empty(); + } + } + /** * Reset user password. * diff --git a/src/main/java/com/michelin/kafkactl/util/constant/ResourceKind.java b/src/main/java/com/michelin/kafkactl/util/constant/ResourceKind.java index ad2745c5..f952803f 100644 --- a/src/main/java/com/michelin/kafkactl/util/constant/ResourceKind.java +++ b/src/main/java/com/michelin/kafkactl/util/constant/ResourceKind.java @@ -34,6 +34,6 @@ public class ResourceKind { public static final String KAFKA_USER_RESET_PASSWORD = "KafkaUserResetPassword"; public static final String RESOURCE_DEFINITION = "ResourceDefinition"; public static final String SUBJECT = "Subject"; - public static final String SCHEMA_COMPATIBILITY_STATE = "SchemaCompatibilityState"; + public static final String SUBJECT_CONFIG_STATE = "SubjectConfigState"; public static final String VAULT_RESPONSE = "VaultResponse"; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ca276b4c..6bd65861 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -91,7 +91,7 @@ kafkactl: - "VERSION:/spec/version" - "CONFIG:/spec/compatibility" - "TYPE:/spec/schemaType" - SchemaCompatibilityState: + SubjectConfigState: - "CONFIG:/spec/compatibility" - "SUBJECT:/metadata/name" Topic: diff --git a/src/test/java/com/michelin/kafkactl/command/SubjectConfigDeleteTest.java b/src/test/java/com/michelin/kafkactl/command/SubjectConfigDeleteTest.java new file mode 100644 index 00000000..cb13122b --- /dev/null +++ b/src/test/java/com/michelin/kafkactl/command/SubjectConfigDeleteTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kafkactl.command; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; + +import com.michelin.kafkactl.Kafkactl; +import com.michelin.kafkactl.command.subjectconfig.SubjectConfigDelete; +import com.michelin.kafkactl.model.Metadata; +import com.michelin.kafkactl.model.Resource; +import com.michelin.kafkactl.property.KafkactlProperties; +import com.michelin.kafkactl.service.ConfigService; +import com.michelin.kafkactl.service.LoginService; +import com.michelin.kafkactl.service.ResourceService; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import picocli.CommandLine; + +@ExtendWith(MockitoExtension.class) +class SubjectConfigDeleteTest { + @Mock + LoginService loginService; + + @Mock + ResourceService resourceService; + + @Mock + ConfigService configService; + + @Mock + KafkactlProperties kafkactlProperties; + + @Mock + Kafkactl kafkactl; + + @InjectMocks + SubjectConfigDelete subjectConfigDelete; + + @Test + void shouldReturnInvalidCurrentContext() { + CommandLine cmd = new CommandLine(subjectConfigDelete); + StringWriter sw = new StringWriter(); + cmd.setErr(new PrintWriter(sw)); + + when(configService.isCurrentContextValid()).thenReturn(false); + + int code = cmd.execute("mySubject"); + assertEquals(1, code); + assertTrue(sw.toString() + .contains("No valid current context found. " + + "Use \"kafkactl config use-context\" to set a valid context.")); + } + + @Test + void shouldNotDeleteConfigWhenNotAuthenticated() { + when(configService.isCurrentContextValid()).thenReturn(true); + when(loginService.doAuthenticate(any(), anyBoolean())).thenReturn(false); + + CommandLine cmd = new CommandLine(subjectConfigDelete); + + int code = cmd.execute("mySubject"); + assertEquals(1, code); + } + + @Test + void shouldDeleteConfig() { + Resource resource = Resource.builder() + .kind("SubjectConfigState") + .apiVersion("v1") + .metadata(Metadata.builder() + .name("mySubject") + .namespace("namespace") + .build()) + .spec(Collections.emptyMap()) + .build(); + + when(configService.isCurrentContextValid()).thenReturn(true); + when(loginService.doAuthenticate(any(), anyBoolean())).thenReturn(true); + when(resourceService.deleteSubjectConfig(any(), any(), any())).thenReturn(Optional.of(resource)); + + when(kafkactlProperties.getCurrentNamespace()).thenReturn("namespace"); + + CommandLine cmd = new CommandLine(subjectConfigDelete); + + int code = cmd.execute("mySubject"); + assertEquals(0, code); + } +} diff --git a/src/test/java/com/michelin/kafkactl/command/SchemaTest.java b/src/test/java/com/michelin/kafkactl/command/SubjectConfigUpdateTest.java similarity index 64% rename from src/test/java/com/michelin/kafkactl/command/SchemaTest.java rename to src/test/java/com/michelin/kafkactl/command/SubjectConfigUpdateTest.java index 87187cbe..75addd3d 100644 --- a/src/test/java/com/michelin/kafkactl/command/SchemaTest.java +++ b/src/test/java/com/michelin/kafkactl/command/SubjectConfigUpdateTest.java @@ -18,23 +18,18 @@ */ package com.michelin.kafkactl.command; -import static com.michelin.kafkactl.model.Output.TABLE; -import static com.michelin.kafkactl.util.constant.ResourceKind.SCHEMA_COMPATIBILITY_STATE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.michelin.kafkactl.Kafkactl; +import com.michelin.kafkactl.command.subjectconfig.SubjectConfigUpdate; import com.michelin.kafkactl.model.Metadata; import com.michelin.kafkactl.model.Resource; import com.michelin.kafkactl.property.KafkactlProperties; import com.michelin.kafkactl.service.ConfigService; -import com.michelin.kafkactl.service.FormatService; import com.michelin.kafkactl.service.LoginService; import com.michelin.kafkactl.service.ResourceService; import java.io.PrintWriter; @@ -49,7 +44,7 @@ import picocli.CommandLine; @ExtendWith(MockitoExtension.class) -class SchemaTest { +class SubjectConfigUpdateTest { @Mock LoginService loginService; @@ -65,21 +60,18 @@ class SchemaTest { @Mock Kafkactl kafkactl; - @Mock - FormatService formatService; - @InjectMocks - Schema schema; + SubjectConfigUpdate subjectConfigUpdate; @Test void shouldReturnInvalidCurrentContext() { - CommandLine cmd = new CommandLine(schema); + CommandLine cmd = new CommandLine(subjectConfigUpdate); StringWriter sw = new StringWriter(); cmd.setErr(new PrintWriter(sw)); when(configService.isCurrentContextValid()).thenReturn(false); - int code = cmd.execute("backward", "mySubject"); + int code = cmd.execute("mySubject", "--compatibility", "backward"); assertEquals(1, code); assertTrue(sw.toString() .contains("No valid current context found. " @@ -87,38 +79,23 @@ void shouldReturnInvalidCurrentContext() { } @Test - void shouldNotUpdateCompatWhenNotAuthenticated() { + void shouldNotUpdateConfigWhenNotAuthenticated() { when(configService.isCurrentContextValid()).thenReturn(true); when(loginService.doAuthenticate(any(), anyBoolean())).thenReturn(false); - CommandLine cmd = new CommandLine(schema); - - int code = cmd.execute("backward", "mySubject"); - assertEquals(1, code); - } - - @Test - void shouldNotUpdateCompatWhenEmptyResponseList() { - when(configService.isCurrentContextValid()).thenReturn(true); - when(loginService.doAuthenticate(any(), anyBoolean())).thenReturn(true); - when(resourceService.changeSchemaCompatibility(any(), any(), any(), any())) - .thenReturn(Optional.empty()); - - when(kafkactlProperties.getCurrentNamespace()).thenReturn("namespace"); - - CommandLine cmd = new CommandLine(schema); + CommandLine cmd = new CommandLine(subjectConfigUpdate); - int code = cmd.execute("backward", "mySubject"); + int code = cmd.execute("mySubject", "--compatibility", "backward"); assertEquals(1, code); } @Test - void shouldUpdateCompat() { + void shouldUpdateConfig() { Resource resource = Resource.builder() - .kind("SchemaCompatibilityState") + .kind("SubjectConfigState") .apiVersion("v1") .metadata(Metadata.builder() - .name("prefix.schema-value") + .name("mySubject") .namespace("namespace") .build()) .spec(Collections.emptyMap()) @@ -126,20 +103,29 @@ void shouldUpdateCompat() { when(configService.isCurrentContextValid()).thenReturn(true); when(loginService.doAuthenticate(any(), anyBoolean())).thenReturn(true); - when(resourceService.changeSchemaCompatibility(any(), any(), any(), any())) + when(resourceService.updateSubjectConfig(any(), any(), any(), any(), any())) .thenReturn(Optional.of(resource)); when(kafkactlProperties.getCurrentNamespace()).thenReturn("namespace"); - CommandLine cmd = new CommandLine(schema); + CommandLine cmd = new CommandLine(subjectConfigUpdate); + + int code = + cmd.execute("mySubject", "--compatibility", "backward", "--alias", "anotherSubject", "-n", "namespace"); + assertEquals(0, code); + + code = cmd.execute("mySubject", "--compatibility", "backward", "-n", "namespace"); + assertEquals(0, code); - int code = cmd.execute("backward", "mySubject", "-n", "namespace"); + code = cmd.execute("mySubject", "--alias", "anotherSubject", "-n", "namespace"); assertEquals(0, code); - verify(formatService) - .displayList( - eq(SCHEMA_COMPATIBILITY_STATE), - argThat(schemas -> schemas.getFirst().equals(resource)), - eq(TABLE), - eq(cmd.getCommandSpec())); + } + + @Test + void shouldNotUpdateCompatibilityWhenNonExistingCompatibility() { + CommandLine cmd = new CommandLine(subjectConfigUpdate); + + int code = cmd.execute("mySubject", "--compatibility", "nonExisting", "-n", "namespace"); + assertEquals(2, code); } } diff --git a/src/test/java/com/michelin/kafkactl/service/ResourceServiceTest.java b/src/test/java/com/michelin/kafkactl/service/ResourceServiceTest.java index 994bd40d..b956a1bc 100644 --- a/src/test/java/com/michelin/kafkactl/service/ResourceServiceTest.java +++ b/src/test/java/com/michelin/kafkactl/service/ResourceServiceTest.java @@ -25,8 +25,8 @@ import static com.michelin.kafkactl.util.constant.ResourceKind.CONSUMER_GROUP_RESET_OFFSET_RESPONSE; import static com.michelin.kafkactl.util.constant.ResourceKind.DELETE_RECORDS_RESPONSE; import static com.michelin.kafkactl.util.constant.ResourceKind.KAFKA_USER_RESET_PASSWORD; -import static com.michelin.kafkactl.util.constant.ResourceKind.SCHEMA_COMPATIBILITY_STATE; import static com.michelin.kafkactl.util.constant.ResourceKind.SUBJECT; +import static com.michelin.kafkactl.util.constant.ResourceKind.SUBJECT_CONFIG_STATE; import static com.michelin.kafkactl.util.constant.ResourceKind.VAULT_RESPONSE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -48,7 +48,7 @@ import com.michelin.kafkactl.model.ApiResource; import com.michelin.kafkactl.model.Metadata; import com.michelin.kafkactl.model.Resource; -import com.michelin.kafkactl.model.SchemaCompatibility; +import com.michelin.kafkactl.model.SubjectCompatibility; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.client.exceptions.HttpClientResponseException; @@ -1228,9 +1228,9 @@ void shouldChangeConnectorStateFail() { } @Test - void shouldChangeSchemaCompatibility() { - Resource changeSchemaCompatResource = Resource.builder() - .kind(SCHEMA_COMPATIBILITY_STATE) + void shouldUpdateSubjectConfig() { + Resource updateSubjectConfigResource = Resource.builder() + .kind(SUBJECT_CONFIG_STATE) .apiVersion("v1") .metadata(Metadata.builder().name("subject").build()) .spec(Map.of()) @@ -1238,20 +1238,34 @@ void shouldChangeSchemaCompatibility() { CommandLine cmd = new CommandLine(new Kafkactl()); - when(namespacedClient.changeSchemaCompatibility(any(), any(), any(), any())) - .thenReturn(HttpResponse.ok(changeSchemaCompatResource)); + when(namespacedClient.updateSubjectConfig(any(), any(), any(), any())) + .thenReturn(HttpResponse.ok(updateSubjectConfigResource)); - Optional actual = resourceService.changeSchemaCompatibility( - "namespace", "subject", SchemaCompatibility.FORWARD_TRANSITIVE, cmd.getCommandSpec()); + Optional actual = resourceService.updateSubjectConfig( + "namespace", "subject", SubjectCompatibility.FORWARD_TRANSITIVE, null, cmd.getCommandSpec()); assertTrue(actual.isPresent()); - assertEquals(changeSchemaCompatResource, actual.get()); + assertEquals(updateSubjectConfigResource, actual.get()); } @Test - void shouldChangeSchemaCompatibilityNotFound() { - Resource changeConnectorStateResource = Resource.builder() - .kind(CHANGE_CONNECTOR_STATE) + void shouldNotUpdateSubjectConfig() { + CommandLine cmd = new CommandLine(new Kafkactl()); + + HttpClientResponseException exception = new HttpClientResponseException("error", HttpResponse.serverError()); + when(namespacedClient.updateSubjectConfig(any(), any(), any(), any())).thenThrow(exception); + + Optional actual = resourceService.updateSubjectConfig( + "namespace", "subject", SubjectCompatibility.FORWARD_TRANSITIVE, null, cmd.getCommandSpec()); + + assertTrue(actual.isEmpty()); + verify(formatService).displayError(exception, SUBJECT, "subject", cmd.getCommandSpec()); + } + + @Test + void shouldDeleteSubjectConfig() { + Resource deleteSubjectConfigResource = Resource.builder() + .kind(SUBJECT_CONFIG_STATE) .apiVersion("v1") .metadata(Metadata.builder().name("subject").build()) .spec(Map.of()) @@ -1259,32 +1273,23 @@ void shouldChangeSchemaCompatibilityNotFound() { CommandLine cmd = new CommandLine(new Kafkactl()); - when(namespacedClient.changeSchemaCompatibility(any(), any(), any(), any())) - .thenReturn(HttpResponse.notFound(changeConnectorStateResource)); + when(namespacedClient.deleteSubjectConfig(any(), any(), any())) + .thenReturn(HttpResponse.ok(deleteSubjectConfigResource)); - Optional actual = resourceService.changeSchemaCompatibility( - "namespace", "subject", SchemaCompatibility.FORWARD_TRANSITIVE, cmd.getCommandSpec()); + Optional actual = resourceService.deleteSubjectConfig("namespace", "subject", cmd.getCommandSpec()); - assertTrue(actual.isEmpty()); - verify(formatService) - .displayError( - argThat(exception -> exception.getStatus().equals(HttpStatus.NOT_FOUND) - && exception.getMessage().equals("Not Found")), - eq(SUBJECT), - eq("subject"), - eq(cmd.getCommandSpec())); + assertTrue(actual.isPresent()); + assertEquals(deleteSubjectConfigResource, actual.get()); } @Test - void shouldChangeSchemaCompatFail() { + void shouldNotDeleteSubjectConfig() { CommandLine cmd = new CommandLine(new Kafkactl()); HttpClientResponseException exception = new HttpClientResponseException("error", HttpResponse.serverError()); - when(namespacedClient.changeSchemaCompatibility(any(), any(), any(), any())) - .thenThrow(exception); + when(namespacedClient.deleteSubjectConfig(any(), any(), any())).thenThrow(exception); - Optional actual = resourceService.changeSchemaCompatibility( - "namespace", "subject", SchemaCompatibility.FORWARD_TRANSITIVE, cmd.getCommandSpec()); + Optional actual = resourceService.deleteSubjectConfig("namespace", "subject", cmd.getCommandSpec()); assertTrue(actual.isEmpty()); verify(formatService).displayError(exception, SUBJECT, "subject", cmd.getCommandSpec());