Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/com/michelin/kafkactl/Kafkactl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import com.michelin.kafkactl.command.Import;
import com.michelin.kafkactl.command.ResetOffsets;
import com.michelin.kafkactl.command.ResetPassword;
import com.michelin.kafkactl.command.Schema;
import com.michelin.kafkactl.command.auth.Auth;
import com.michelin.kafkactl.command.config.Config;
import com.michelin.kafkactl.command.connectcluster.ConnectCluster;
import com.michelin.kafkactl.command.subjectconfig.SubjectConfig;
import com.michelin.kafkactl.service.SystemService;
import com.michelin.kafkactl.util.VersionProvider;
import io.micronaut.configuration.picocli.PicocliRunner;
Expand All @@ -60,7 +60,7 @@
Get.class,
Import.class,
ResetOffsets.class,
Schema.class,
SubjectConfig.class,
ResetPassword.class
},
headerHeading = "@|bold Usage|@:",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,28 @@ HttpResponse<Resource> changeConnectorState(
@Header("Authorization") String token);

/**
* Change the schema compatibility mode.
* Change the subject config.
*
* @param namespace The namespace
* @param subject The subject
* @param compatibility The compatibility to apply
* @param config The config to apply
* @param token The auth token
* @return The change compatibility response
*/
@Post("{namespace}/schemas/{subject}/config")
HttpResponse<Resource> changeSchemaCompatibility(
String namespace,
String subject,
@Body Map<String, String> compatibility,
@Header("Authorization") String token);
HttpResponse<Resource> updateSubjectConfig(
String namespace, String subject, @Body Map<String, String> config, @Header("Authorization") String token);

/**
* Delete the subject config.
*
* @param namespace The namespace
* @param subject The subject
* @param token The auth token
* @return The change compatibility response
*/
@Delete("{namespace}/schemas/{subject}/config")
HttpResponse<Resource> deleteSubjectConfig(String namespace, String subject, @Header("Authorization") String token);

/**
* Reset password of a given user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
/** Connect clusters subcommand. */
@Command(
name = "connect-cluster",
subcommands = {
ConnectClusterVault.class,
},
subcommands = {ConnectClusterVault.class},
headerHeading = "@|bold Usage|@:",
synopsisHeading = " ",
synopsisSubcommandLabel = "COMMAND",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Integer onAuthSuccess() {
return resourceService.listAvailableVaultsConnectClusters(namespace, commandSpec);
}

// if connect cluster define but no secrets to encrypt => show error no secrets to encrypt.
// if connect cluster defined but no secrets to encrypt => show error.
if (secrets == null) {
throw new ParameterException(commandSpec.commandLine(), "No secrets to encrypt.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.michelin.kafkactl.command.subjectconfig;

import com.michelin.kafkactl.hook.HelpHook;
import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;

/** Subject config subcommand. */
@Command(
name = "subject-config",
subcommands = {SubjectConfigUpdate.class, SubjectConfigDelete.class},
headerHeading = "@|bold Usage|@:",
synopsisHeading = " ",
synopsisSubcommandLabel = "COMMAND",
descriptionHeading = "%n@|bold Description|@: ",
description = "Interact with subjects config.",
parameterListHeading = "%n@|bold Parameters|@:%n",
optionListHeading = "%n@|bold Options|@:%n",
commandListHeading = "%n@|bold Commands|@:%n",
usageHelpAutoWidth = true)
public class SubjectConfig extends HelpHook {
@Spec
public CommandSpec commandSpec;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.michelin.kafkactl.command.subjectconfig;

import com.michelin.kafkactl.hook.AuthenticatedHook;
import com.michelin.kafkactl.service.ResourceService;
import io.micronaut.core.annotation.ReflectiveAccess;
import jakarta.inject.Inject;
import java.io.IOException;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;

/** Subject config delete subcommand. */
@Command(
name = "delete",
headerHeading = "@|bold Usage|@:",
synopsisHeading = " ",
descriptionHeading = "%n@|bold Description|@: ",
description = "Delete subject config.",
parameterListHeading = "%n@|bold Parameters|@:%n",
optionListHeading = "%n@|bold Options|@:%n",
commandListHeading = "%n@|bold Commands|@:%n",
usageHelpAutoWidth = true)
public class SubjectConfigDelete extends AuthenticatedHook {
@Inject
@ReflectiveAccess
private ResourceService resourceService;

@Parameters(index = "0", description = "Subject name.", arity = "1")
public String subject;

/**
* Run the "subject" command.
*
* @return The command return code
* @throws IOException Any exception during the run
*/
@Override
public Integer onAuthSuccess() throws IOException {
return resourceService
.deleteSubjectConfig(getNamespace(), subject, commandSpec)
.isEmpty()
? 1
: 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,70 +16,59 @@
* specific language governing permissions and limitations
* under the License.
*/
package com.michelin.kafkactl.command;

import static com.michelin.kafkactl.model.Output.TABLE;
import static com.michelin.kafkactl.util.constant.ResourceKind.SCHEMA_COMPATIBILITY_STATE;
package com.michelin.kafkactl.command.subjectconfig;

import com.michelin.kafkactl.hook.AuthenticatedHook;
import com.michelin.kafkactl.model.Resource;
import com.michelin.kafkactl.model.SchemaCompatibility;
import com.michelin.kafkactl.service.FormatService;
import com.michelin.kafkactl.model.SubjectCompatibility;
import com.michelin.kafkactl.service.ResourceService;
import io.micronaut.core.annotation.ReflectiveAccess;
import jakarta.inject.Inject;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

/** Schema subcommand. */
/** Subject config update subcommand. */
@Command(
name = "schema",
name = "update",
headerHeading = "@|bold Usage|@:",
synopsisHeading = " ",
descriptionHeading = "%n@|bold Description|@: ",
description = "Interact with schemas.",
description = "Update subject config.",
parameterListHeading = "%n@|bold Parameters|@:%n",
optionListHeading = "%n@|bold Options|@:%n",
commandListHeading = "%n@|bold Commands|@:%n",
usageHelpAutoWidth = true)
public class Schema extends AuthenticatedHook {
public class SubjectConfigUpdate extends AuthenticatedHook {
@Inject
@ReflectiveAccess
private ResourceService resourceService;

@Inject
@ReflectiveAccess
private FormatService formatService;
@Parameters(index = "0", description = "Subject name.", arity = "1")
public String subject;

@Parameters(index = "0", description = "Compatibility to set (${COMPLETION-CANDIDATES}).", arity = "1")
public SchemaCompatibility compatibility;
@Option(
names = {"-c", "--compatibility"},
description = "Compatibility to set (${COMPLETION-CANDIDATES}).")
public SubjectCompatibility compatibility;

@Parameters(index = "1..*", description = "Subject names separated by space.", arity = "1..*")
public List<String> subjects;
@Option(
names = {"-a", "--alias"},
description = "Alias to set.")
public String alias;

/**
* Run the "reset-offsets" command.
* Run the "subject" command.
*
* @return The command return code
* @throws IOException Any exception during the run
*/
@Override
public Integer onAuthSuccess() throws IOException {
List<Resource> updatedSchemas = subjects.stream()
.map(subject ->
resourceService.changeSchemaCompatibility(getNamespace(), subject, compatibility, commandSpec))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();

if (!updatedSchemas.isEmpty()) {
formatService.displayList(SCHEMA_COMPATIBILITY_STATE, updatedSchemas, TABLE, commandSpec);
return 0;
}

return 1;
return resourceService
.updateSubjectConfig(getNamespace(), subject, compatibility, alias, commandSpec)
.isEmpty()
? 1
: 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
/** Schema compatibility. */
@Getter
@AllArgsConstructor
public enum SchemaCompatibility {
GLOBAL("global"),
public enum SubjectCompatibility {
BACKWARD("backward"),
BACKWARD_TRANSITIVE("backward-transitive"),
FORWARD("forward"),
Expand Down
56 changes: 46 additions & 10 deletions src/main/java/com/michelin/kafkactl/service/ResourceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import static com.michelin.kafkactl.util.constant.ResourceKind.CONSUMER_GROUP_RESET_OFFSET_RESPONSE;
import static com.michelin.kafkactl.util.constant.ResourceKind.DELETE_RECORDS_RESPONSE;
import static com.michelin.kafkactl.util.constant.ResourceKind.SUBJECT;
import static com.michelin.kafkactl.util.constant.ResourceKind.SUBJECT_CONFIG_STATE;
import static com.michelin.kafkactl.util.constant.ResourceKind.VAULT_RESPONSE;

import com.michelin.kafkactl.client.ClusterResourceClient;
import com.michelin.kafkactl.client.NamespacedResourceClient;
import com.michelin.kafkactl.model.ApiResource;
import com.michelin.kafkactl.model.Output;
import com.michelin.kafkactl.model.Resource;
import com.michelin.kafkactl.model.SchemaCompatibility;
import com.michelin.kafkactl.model.SubjectCompatibility;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.micronaut.core.annotation.Nullable;
Expand Down Expand Up @@ -384,32 +385,67 @@ public Optional<Resource> changeConnectorState(
}

/**
* Change the compatibility of a given schema.
* Update the config of a given subject.
*
* @param namespace The namespace
* @param subject The schema subject
* @param compatibility The compatibility to apply
* @param alias The alias to apply
* @param commandSpec The command that triggered the action
* @return The resource
*/
public Optional<Resource> changeSchemaCompatibility(
String namespace, String subject, SchemaCompatibility compatibility, CommandSpec commandSpec) {
public Optional<Resource> updateSubjectConfig(
String namespace,
String subject,
@Nullable SubjectCompatibility compatibility,
@Nullable String alias,
CommandSpec commandSpec) {
try {
HttpResponse<Resource> response = namespacedClient.changeSchemaCompatibility(
namespace, subject, Map.of("compatibility", compatibility.name()), loginService.getAuthorization());
Map<String, String> config = new HashMap<>();
config.put(
"compatibility",
compatibility != null ? compatibility.toString().toUpperCase() : null);
config.put("alias", alias);

// Micronaut does not throw exception on 404, so produce a 404 manually
if (response.getStatus().equals(HttpStatus.NOT_FOUND)) {
throw new HttpClientResponseException(response.reason(), response);
}
HttpResponse<Resource> response =
namespacedClient.updateSubjectConfig(namespace, subject, config, loginService.getAuthorization());

commandSpec
.commandLine()
.getOut()
.println(formatService.prettifyKind(SUBJECT_CONFIG_STATE) + " \"" + subject + "\"" + " updated.");
return response.getBody();
} catch (HttpClientResponseException exception) {
formatService.displayError(exception, SUBJECT, subject, commandSpec);
return Optional.empty();
}
}

/**
* Delete the config of a given subject.
*
* @param namespace The namespace
* @param subject The schema subject
* @param commandSpec The command that triggered the action
* @return The resource
*/
public Optional<Resource> deleteSubjectConfig(String namespace, String subject, CommandSpec commandSpec) {
try {
Optional<Resource> subjectConfig = namespacedClient
.deleteSubjectConfig(namespace, subject, loginService.getAuthorization())
.getBody();

commandSpec
.commandLine()
.getOut()
.println(formatService.prettifyKind(SUBJECT_CONFIG_STATE) + " \"" + subject + "\"" + " deleted.");
return subjectConfig;
} catch (HttpClientResponseException exception) {
formatService.displayError(exception, SUBJECT, subject, commandSpec);
return Optional.empty();
}
}

/**
* Reset user password.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public class ResourceKind {
public static final String KAFKA_USER_RESET_PASSWORD = "KafkaUserResetPassword";
public static final String RESOURCE_DEFINITION = "ResourceDefinition";
public static final String SUBJECT = "Subject";
public static final String SCHEMA_COMPATIBILITY_STATE = "SchemaCompatibilityState";
public static final String SUBJECT_CONFIG_STATE = "SubjectConfigState";
public static final String VAULT_RESPONSE = "VaultResponse";
}
2 changes: 1 addition & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ kafkactl:
- "VERSION:/spec/version"
- "CONFIG:/spec/compatibility"
- "TYPE:/spec/schemaType"
SchemaCompatibilityState:
SubjectConfigState:
- "CONFIG:/spec/compatibility"
- "SUBJECT:/metadata/name"
Topic:
Expand Down
Loading
Loading