From c8978b8fa1a4ede1d68799cfb82f70e972e964b4 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Fri, 14 Mar 2025 18:27:51 +0800 Subject: [PATCH 1/5] Remove force_use_zk_connection Co-authored-by: @mingdaoy --- tests/kafkatest/services/kafka/kafka.py | 125 +++++++----------- .../kafkatest/services/security/kafka_acls.py | 8 +- tests/kafkatest/tests/core/authorizer_test.py | 7 +- .../core/controller_mutation_quota_test.py | 4 +- 4 files changed, 54 insertions(+), 90 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 88f24b14424a1..a3751d78988cc 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1052,29 +1052,23 @@ def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_cont kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node) return "{} {}".format(kafka_metadata_script, bootstrap) - def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]): + def kafka_topics_cmd_with_optional_security_settings(self, node, kafka_security_protocol=None, offline_nodes=[]): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller") - if force_use_zk_connection: - bootstrap_server_or_zookeeper = "--zookeeper %s" % (self.zk_connect_setting()) - skip_optional_security_settings = True - else: - if kafka_security_protocol is None: - # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, - # otherwise use the client security protocol - if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: - security_protocol_to_use = SecurityConfig.PLAINTEXT - else: - security_protocol_to_use = self.security_protocol + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT else: - security_protocol_to_use = kafka_security_protocol - bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes)) - skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT - if skip_optional_security_settings: + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes)) + if security_protocol_to_use == SecurityConfig.PLAINTEXT: optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: - # we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1091,32 +1085,26 @@ def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_co kafka_topic_script = self.path.script("kafka-topics.sh", node) return "%s%s %s%s" % \ (optional_jass_krb_system_props_prefix, kafka_topic_script, - bootstrap_server_or_zookeeper, optional_command_config_suffix) + bootstrap_server, optional_command_config_suffix) - def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None): + def kafka_configs_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-configs against a broker, not a KRaft controller") - if force_use_zk_connection: - # kafka-configs supports a TLS config file, so include it if there is one - bootstrap_server_or_zookeeper = "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption()) - skip_optional_security_settings = True - else: - if kafka_security_protocol is None: - # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, - # otherwise use the client security protocol - if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: - security_protocol_to_use = SecurityConfig.PLAINTEXT - else: - security_protocol_to_use = self.security_protocol + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT else: - security_protocol_to_use = kafka_security_protocol - bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) - skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) + skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT if skip_optional_security_settings: optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: - # we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1133,16 +1121,15 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_c kafka_config_script = self.path.script("kafka-configs.sh", node) return "%s%s %s%s" % \ (optional_jass_krb_system_props_prefix, kafka_config_script, - bootstrap_server_or_zookeeper, optional_command_config_suffix) + bootstrap_server, optional_command_config_suffix) def maybe_setup_broker_scram_credentials(self, node): security_config = self.security_config # we only need to create broker credentials when the broker mechanism is SASL/SCRAM if security_config.is_sasl(self.interbroker_security_protocol) and security_config.is_sasl_scram(self.interbroker_sasl_mechanism): - force_use_zk_connection = True # we are bootstrapping these credentials before Kafka is started cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter --add-config %(mechanism)s=[password=%(password)s]" % { - 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), + 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node), 'user': SecurityConfig.SCRAM_BROKER_USER, 'mechanism': self.interbroker_sasl_mechanism, 'password': SecurityConfig.SCRAM_BROKER_PASSWORD @@ -1153,15 +1140,13 @@ def maybe_setup_client_scram_credentials(self, node): security_config = self.security_config # we only need to create client credentials when the client mechanism is SASL/SCRAM if security_config.is_sasl(self.security_protocol) and security_config.is_sasl_scram(self.client_sasl_mechanism): - force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server_scram() - # ignored if forcing the use of Zookeeper, but we need a value to send, so calculate it anyway if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: kafka_security_protocol = self.interbroker_security_protocol else: kafka_security_protocol = self.security_protocol cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter --add-config %(mechanism)s=[password=%(password)s]" % { - 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection, kafka_security_protocol), + 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, kafka_security_protocol), 'user': SecurityConfig.SCRAM_CLIENT_USER, 'mechanism': self.client_sasl_mechanism, 'password': SecurityConfig.SCRAM_CLIENT_PASSWORD @@ -1228,12 +1213,9 @@ def create_topic(self, topic_cfg, node=None): self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or\ - (topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server()) - cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % { - 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), + 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node), 'topic': topic_cfg.get("topic"), } if 'replica-assignment' in topic_cfg: @@ -1267,11 +1249,9 @@ def delete_topic(self, topic, node=None): node = self.nodes[0] self.logger.info("Deleting topic %s" % topic) - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --delete" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node), topic) self.logger.info("Running topic delete command...\n%s" % cmd) node.account.ssh(cmd) @@ -1302,11 +1282,10 @@ def describe_under_replicated_partitions(self): """ node = self.nodes[0] - force_use_zk_connection = not node.version.topic_command_supports_bootstrap_server() cmd = fix_opts_for_new_jvm(node) cmd += "%s --describe --under-replicated-partitions" % \ - self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection) + self.kafka_topics_cmd_with_optional_security_settings(node) self.logger.debug("Running topic command to describe under-replicated partitions\n%s" % cmd) output = "" @@ -1322,11 +1301,9 @@ def describe_topic(self, topic, node=None, offline_nodes=[]): if node is None: node = self.nodes[0] - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --describe" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection, offline_nodes=offline_nodes), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node, offline_nodes=offline_nodes), topic) self.logger.info("Running topic describe command...\n%s" % cmd) output = "" @@ -1338,10 +1315,8 @@ def list_topics(self, node=None): if node is None: node = self.nodes[0] - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) - cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection)) + cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node)) for line in node.account.ssh_capture(cmd): if not line.startswith("SLF4J"): yield line.rstrip() @@ -1351,38 +1326,30 @@ def alter_message_format(self, topic, msg_format_version, node=None): node = self.nodes[0] self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version) - force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) cmd += "%s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \ - (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version) + (self.kafka_configs_cmd_with_optional_security_settings(node), topic, msg_format_version) self.logger.info("Running alter message format command...\n%s" % cmd) node.account.ssh(cmd) - def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None): + def kafka_acls_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None, override_command_config = None): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller") - force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server - if force_use_zk_connection: - bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s" % (self.zk_connect_setting()) - skip_optional_security_settings = True - else: - if kafka_security_protocol is None: - # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, - # otherwise use the client security protocol - if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: - security_protocol_to_use = SecurityConfig.PLAINTEXT - else: - security_protocol_to_use = self.security_protocol + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT else: - security_protocol_to_use = kafka_security_protocol - bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) - skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) + skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT if skip_optional_security_settings: optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: - # we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1402,7 +1369,7 @@ def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_conn kafka_acls_script = self.path.script("kafka-acls.sh", node) return "%s%s %s%s" % \ (optional_jass_krb_system_props_prefix, kafka_acls_script, - bootstrap_server_or_authorizer_zk_props, optional_command_config_suffix) + bootstrap_server, optional_command_config_suffix) def run_cli_tool(self, node, cmd): output = "" @@ -1694,11 +1661,9 @@ def topic_id(self, topic): if self.all_nodes_support_topic_ids(): node = self.nodes[0] - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --describe" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node), topic) self.logger.debug( "Querying topic ID by using describe topic command ...\n%s" % cmd diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py index 5bd1a46f59770..8c6f946b10930 100644 --- a/tests/kafkatest/services/security/kafka_acls.py +++ b/tests/kafkatest/services/security/kafka_acls.py @@ -19,13 +19,11 @@ class ACLs: def __init__(self, context): self.context = context - def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_grant = [], security_protocol=None): + def add_cluster_acl(self, kafka, principal, additional_cluster_operations_to_grant = [], security_protocol=None): """ :param kafka: Kafka cluster upon which ClusterAction ACL is created :param principal: principal for which ClusterAction ACL is created :param node: Node to use when determining connection settings - :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available. - This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required to create SCRAM credentials and topics, respectively :param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise @@ -37,7 +35,7 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, addit for operation in ['ClusterAction'] + additional_cluster_operations_to_grant: cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % { - 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol), + 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol), 'operation': operation, 'principal': principal } @@ -59,7 +57,7 @@ def remove_cluster_acl(self, kafka, principal, additional_cluster_operations_to_ for operation in ['ClusterAction'] + additional_cluster_operations_to_remove: cmd = "%(cmd_prefix)s --remove --force --cluster --operation=%(operation)s --allow-principal=%(principal)s" % { - 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, False, security_protocol), + 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol), 'operation': operation, 'principal': principal } diff --git a/tests/kafkatest/tests/core/authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py index 60c0612f356b1..7295ae3ef1aaf 100644 --- a/tests/kafkatest/tests/core/authorizer_test.py +++ b/tests/kafkatest/tests/core/authorizer_test.py @@ -66,7 +66,7 @@ def test_authorizer(self, metadata_quorum, authorizer_class): # alter client quotas node = self.kafka.nodes[0] alter_client_quotas_cmd = "%s --entity-name foo --entity-type clients --alter --add-config consumer_byte_rate=10000" % \ - (self.kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False)) + (self.kafka.kafka_configs_cmd_with_optional_security_settings(node)) alter_client_quotas_cmd_log_msg = "Running alter client quotas command with client/non-broker credentials...\n%s" % alter_client_quotas_cmd self.logger.info(alter_client_quotas_cmd_log_msg) @@ -98,8 +98,9 @@ def test_authorizer(self, metadata_quorum, authorizer_class): # add ACLs self.logger.info("Adding ACLs with broker credentials so that alter client quotas command will succeed") - self.acls.add_cluster_acl(self.kafka, client_principal, force_use_zk_connection=False, - additional_cluster_operations_to_grant=['AlterConfigs'], security_protocol=broker_security_protocol) + self.acls.add_cluster_acl(self.kafka, client_principal, + additional_cluster_operations_to_grant=['AlterConfigs'], + security_protocol=broker_security_protocol) # the alter client quotas command should now succeed again self.logger.info(alter_client_quotas_cmd_log_msg) diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py b/tests/kafkatest/tests/core/controller_mutation_quota_test.py index 98f33deab1f17..96b9abc5c4c24 100644 --- a/tests/kafkatest/tests/core/controller_mutation_quota_test.py +++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py @@ -75,7 +75,7 @@ def test_controller_mutation_quota(self, metadata_quorum): # now apply the mutation quota configs node = self.kafka.nodes[0] alter_mutation_quota_cmd = "%s --entity-type users --entity-default --alter --add-config 'controller_mutation_rate=%f'" % \ - (self.kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False), mutation_rate) + (self.kafka.kafka_configs_cmd_with_optional_security_settings(node), mutation_rate) node.account.ssh(alter_mutation_quota_cmd) # now invoke the same steps with the mutation quota applied to ensure it fails @@ -95,7 +95,7 @@ def mutate_partitions(self, topic_prefix, expected_to_succeed): # alter the partition count node = self.kafka.nodes[0] alter_topic_cmd = "%s --alter --topic %s --partitions %i" % \ - (self.kafka.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection=False), + (self.kafka.kafka_topics_cmd_with_optional_security_settings(node), topic_name, 2 * self.partition_count) node.account.ssh(alter_topic_cmd) From b002dac69bab6d0a870822ce84710cdcc69e580f Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Sun, 16 Mar 2025 21:31:56 +0800 Subject: [PATCH 2/5] fix --- tests/kafkatest/tests/client/quota_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index e89fea80eeeea..60f6a3634e29c 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -78,7 +78,7 @@ def __init__(self, quota_type, override_quota, kafka): def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args): node = kafka.nodes[0] cmd = "%s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \ - (kafka.kafka_configs_cmd_with_optional_security_settings(node, False), producer_byte_rate, consumer_byte_rate) + (kafka.kafka_configs_cmd_with_optional_security_settings(node), producer_byte_rate, consumer_byte_rate) cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1]) if len(entity_args) > 2: cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3]) From 4755c0c6b6970da0448fad0c816a1ad7fd90a735 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Sun, 23 Mar 2025 01:53:00 +0800 Subject: [PATCH 3/5] Revert some changes --- tests/kafkatest/services/kafka/kafka.py | 120 +++++++++--------- tests/kafkatest/tests/client/quota_test.py | 2 +- tests/kafkatest/tests/core/authorizer_test.py | 2 +- .../core/controller_mutation_quota_test.py | 4 +- tests/kafkatest/version.py | 3 - 5 files changed, 65 insertions(+), 66 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index a3751d78988cc..f743886158e00 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1052,23 +1052,29 @@ def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_cont kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node) return "{} {}".format(kafka_metadata_script, bootstrap) - def kafka_topics_cmd_with_optional_security_settings(self, node, kafka_security_protocol=None, offline_nodes=[]): + def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection=False, kafka_security_protocol=None, offline_nodes=[]): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller") - if kafka_security_protocol is None: - # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, - # otherwise use the client security protocol - if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: - security_protocol_to_use = SecurityConfig.PLAINTEXT - else: - security_protocol_to_use = self.security_protocol + if force_use_zk_connection: + bootstrap_server_or_zookeeper = "--zookeeper %s" % (self.zk_connect_setting()) + skip_optional_security_settings = True else: - security_protocol_to_use = kafka_security_protocol - bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes)) - if security_protocol_to_use == SecurityConfig.PLAINTEXT: + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT + else: + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes)) + skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT + if skip_optional_security_settings: optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: + # we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1084,27 +1090,33 @@ def kafka_topics_cmd_with_optional_security_settings(self, node, kafka_security_ optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client = use_inter_broker_mechanism_for_client)) kafka_topic_script = self.path.script("kafka-topics.sh", node) return "%s%s %s%s" % \ - (optional_jass_krb_system_props_prefix, kafka_topic_script, - bootstrap_server, optional_command_config_suffix) + (optional_jass_krb_system_props_prefix, kafka_topic_script, + bootstrap_server_or_zookeeper, optional_command_config_suffix) - def kafka_configs_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None): + def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-configs against a broker, not a KRaft controller") - if kafka_security_protocol is None: - # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, - # otherwise use the client security protocol - if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: - security_protocol_to_use = SecurityConfig.PLAINTEXT - else: - security_protocol_to_use = self.security_protocol + if force_use_zk_connection: + # kafka-configs supports a TLS config file, so include it if there is one + bootstrap_server_or_zookeeper = "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption()) + skip_optional_security_settings = True else: - security_protocol_to_use = kafka_security_protocol - bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) - skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT + else: + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) + skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT if skip_optional_security_settings: optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: + # we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1120,33 +1132,22 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, kafka_security optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client = use_inter_broker_mechanism_for_client)) kafka_config_script = self.path.script("kafka-configs.sh", node) return "%s%s %s%s" % \ - (optional_jass_krb_system_props_prefix, kafka_config_script, - bootstrap_server, optional_command_config_suffix) - - def maybe_setup_broker_scram_credentials(self, node): - security_config = self.security_config - # we only need to create broker credentials when the broker mechanism is SASL/SCRAM - if security_config.is_sasl(self.interbroker_security_protocol) and security_config.is_sasl_scram(self.interbroker_sasl_mechanism): - cmd = fix_opts_for_new_jvm(node) - cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter --add-config %(mechanism)s=[password=%(password)s]" % { - 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node), - 'user': SecurityConfig.SCRAM_BROKER_USER, - 'mechanism': self.interbroker_sasl_mechanism, - 'password': SecurityConfig.SCRAM_BROKER_PASSWORD - } - node.account.ssh(cmd) + (optional_jass_krb_system_props_prefix, kafka_config_script, + bootstrap_server_or_zookeeper, optional_command_config_suffix) def maybe_setup_client_scram_credentials(self, node): security_config = self.security_config # we only need to create client credentials when the client mechanism is SASL/SCRAM if security_config.is_sasl(self.security_protocol) and security_config.is_sasl_scram(self.client_sasl_mechanism): + force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server_scram() + # ignored if forcing the use of Zookeeper, but we need a value to send, so calculate it anyway if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: kafka_security_protocol = self.interbroker_security_protocol else: kafka_security_protocol = self.security_protocol cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter --add-config %(mechanism)s=[password=%(password)s]" % { - 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, kafka_security_protocol), + 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection, kafka_security_protocol), 'user': SecurityConfig.SCRAM_CLIENT_USER, 'mechanism': self.client_sasl_mechanism, 'password': SecurityConfig.SCRAM_CLIENT_PASSWORD @@ -1182,12 +1183,6 @@ def all_nodes_configs_command_uses_bootstrap_server_scram(self): return False return True - def all_nodes_acl_command_supports_bootstrap_server(self): - for node in self.nodes: - if not node.version.acl_command_supports_bootstrap_server(): - return False - return True - def all_nodes_reassign_partitions_command_supports_bootstrap_server(self): for node in self.nodes: if not node.version.reassign_partitions_command_supports_bootstrap_server(): @@ -1205,7 +1200,6 @@ def create_topic(self, topic_cfg, node=None): """Run the admin tool create topic command. Specifying node is optional, and may be done if for different kafka nodes have different versions, and we care where command gets run. - If the node is not specified, run the command from self.nodes[0] """ if node is None: @@ -1213,9 +1207,12 @@ def create_topic(self, topic_cfg, node=None): self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or \ + (topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server()) + cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % { - 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node), + 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection=force_use_zk_connection), 'topic': topic_cfg.get("topic"), } if 'replica-assignment' in topic_cfg: @@ -1227,17 +1224,13 @@ def create_topic(self, topic_cfg, node=None): 'partitions': topic_cfg.get('partitions', 1), 'replication-factor': topic_cfg.get('replication-factor', 1) } - if topic_cfg.get('if-not-exists', False): cmd += ' --if-not-exists' - if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None: for config_name, config_value in topic_cfg["configs"].items(): cmd += " --config %s=%s" % (config_name, str(config_value)) - self.logger.info("Running topic creation command...\n%s" % cmd) node.account.ssh(cmd) - def delete_topic(self, topic, node=None): """ Delete a topic with the topics command @@ -1249,9 +1242,11 @@ def delete_topic(self, topic, node=None): node = self.nodes[0] self.logger.info("Deleting topic %s" % topic) + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --delete" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection=force_use_zk_connection), topic) self.logger.info("Running topic delete command...\n%s" % cmd) node.account.ssh(cmd) @@ -1301,34 +1296,38 @@ def describe_topic(self, topic, node=None, offline_nodes=[]): if node is None: node = self.nodes[0] + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --describe" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, offline_nodes=offline_nodes), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection, offline_nodes=offline_nodes), topic) self.logger.info("Running topic describe command...\n%s" % cmd) output = "" for line in node.account.ssh_capture(cmd): output += line return output - def list_topics(self, node=None): if node is None: node = self.nodes[0] + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) - cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node)) + cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection)) for line in node.account.ssh_capture(cmd): if not line.startswith("SLF4J"): yield line.rstrip() - def alter_message_format(self, topic, msg_format_version, node=None): if node is None: node = self.nodes[0] self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version) + force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) cmd += "%s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \ - (self.kafka_configs_cmd_with_optional_security_settings(node), topic, msg_format_version) + (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version) self.logger.info("Running alter message format command...\n%s" % cmd) node.account.ssh(cmd) @@ -1350,6 +1349,7 @@ def kafka_acls_cmd_with_optional_security_settings(self, node, kafka_security_pr optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: + # we need security configs because we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1661,9 +1661,11 @@ def topic_id(self, topic): if self.all_nodes_support_topic_ids(): node = self.nodes[0] + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --describe" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) self.logger.debug( "Querying topic ID by using describe topic command ...\n%s" % cmd diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 60f6a3634e29c..36d4eca08fdf8 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -78,7 +78,7 @@ def __init__(self, quota_type, override_quota, kafka): def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args): node = kafka.nodes[0] cmd = "%s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \ - (kafka.kafka_configs_cmd_with_optional_security_settings(node), producer_byte_rate, consumer_byte_rate) + (kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False), producer_byte_rate, consumer_byte_rate) cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1]) if len(entity_args) > 2: cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3]) diff --git a/tests/kafkatest/tests/core/authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py index 7295ae3ef1aaf..260b16b1f6fe7 100644 --- a/tests/kafkatest/tests/core/authorizer_test.py +++ b/tests/kafkatest/tests/core/authorizer_test.py @@ -66,7 +66,7 @@ def test_authorizer(self, metadata_quorum, authorizer_class): # alter client quotas node = self.kafka.nodes[0] alter_client_quotas_cmd = "%s --entity-name foo --entity-type clients --alter --add-config consumer_byte_rate=10000" % \ - (self.kafka.kafka_configs_cmd_with_optional_security_settings(node)) + (self.kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False)) alter_client_quotas_cmd_log_msg = "Running alter client quotas command with client/non-broker credentials...\n%s" % alter_client_quotas_cmd self.logger.info(alter_client_quotas_cmd_log_msg) diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py b/tests/kafkatest/tests/core/controller_mutation_quota_test.py index 96b9abc5c4c24..98f33deab1f17 100644 --- a/tests/kafkatest/tests/core/controller_mutation_quota_test.py +++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py @@ -75,7 +75,7 @@ def test_controller_mutation_quota(self, metadata_quorum): # now apply the mutation quota configs node = self.kafka.nodes[0] alter_mutation_quota_cmd = "%s --entity-type users --entity-default --alter --add-config 'controller_mutation_rate=%f'" % \ - (self.kafka.kafka_configs_cmd_with_optional_security_settings(node), mutation_rate) + (self.kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False), mutation_rate) node.account.ssh(alter_mutation_quota_cmd) # now invoke the same steps with the mutation quota applied to ensure it fails @@ -95,7 +95,7 @@ def mutate_partitions(self, topic_prefix, expected_to_succeed): # alter the partition count node = self.kafka.nodes[0] alter_topic_cmd = "%s --alter --topic %s --partitions %i" % \ - (self.kafka.kafka_topics_cmd_with_optional_security_settings(node), + (self.kafka.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection=False), topic_name, 2 * self.partition_count) node.account.ssh(alter_topic_cmd) diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 31b890c5b6562..e2f6a0bacdb07 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -62,9 +62,6 @@ def _cmp(self, other): return LooseVersion._cmp(self, other) - def acl_command_supports_bootstrap_server(self): - return self >= V_2_1_0 - def topic_command_supports_bootstrap_server(self): return self >= V_2_3_0 From c682db77e49921456c1b896a5349279ecbc32ebd Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Sun, 23 Mar 2025 02:06:18 +0800 Subject: [PATCH 4/5] rever some changes --- tests/kafkatest/services/kafka/kafka.py | 38 +++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index f743886158e00..b833407d7c176 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1052,7 +1052,7 @@ def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_cont kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node) return "{} {}".format(kafka_metadata_script, bootstrap) - def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection=False, kafka_security_protocol=None, offline_nodes=[]): + def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller") if force_use_zk_connection: @@ -1090,8 +1090,8 @@ def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_co optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client = use_inter_broker_mechanism_for_client)) kafka_topic_script = self.path.script("kafka-topics.sh", node) return "%s%s %s%s" % \ - (optional_jass_krb_system_props_prefix, kafka_topic_script, - bootstrap_server_or_zookeeper, optional_command_config_suffix) + (optional_jass_krb_system_props_prefix, kafka_topic_script, + bootstrap_server_or_zookeeper, optional_command_config_suffix) def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: @@ -1132,8 +1132,22 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_c optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client = use_inter_broker_mechanism_for_client)) kafka_config_script = self.path.script("kafka-configs.sh", node) return "%s%s %s%s" % \ - (optional_jass_krb_system_props_prefix, kafka_config_script, - bootstrap_server_or_zookeeper, optional_command_config_suffix) + (optional_jass_krb_system_props_prefix, kafka_config_script, + bootstrap_server_or_zookeeper, optional_command_config_suffix) + + def maybe_setup_broker_scram_credentials(self, node): + security_config = self.security_config + # we only need to create broker credentials when the broker mechanism is SASL/SCRAM + if security_config.is_sasl(self.interbroker_security_protocol) and security_config.is_sasl_scram(self.interbroker_sasl_mechanism): + force_use_zk_connection = True # we are bootstrapping these credentials before Kafka is started + cmd = fix_opts_for_new_jvm(node) + cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter --add-config %(mechanism)s=[password=%(password)s]" % { + 'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), + 'user': SecurityConfig.SCRAM_BROKER_USER, + 'mechanism': self.interbroker_sasl_mechanism, + 'password': SecurityConfig.SCRAM_BROKER_PASSWORD + } + node.account.ssh(cmd) def maybe_setup_client_scram_credentials(self, node): security_config = self.security_config @@ -1200,6 +1214,7 @@ def create_topic(self, topic_cfg, node=None): """Run the admin tool create topic command. Specifying node is optional, and may be done if for different kafka nodes have different versions, and we care where command gets run. + If the node is not specified, run the command from self.nodes[0] """ if node is None: @@ -1212,7 +1227,7 @@ def create_topic(self, topic_cfg, node=None): cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % { - 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection=force_use_zk_connection), + 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), 'topic': topic_cfg.get("topic"), } if 'replica-assignment' in topic_cfg: @@ -1224,13 +1239,17 @@ def create_topic(self, topic_cfg, node=None): 'partitions': topic_cfg.get('partitions', 1), 'replication-factor': topic_cfg.get('replication-factor', 1) } + if topic_cfg.get('if-not-exists', False): cmd += ' --if-not-exists' + if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None: for config_name, config_value in topic_cfg["configs"].items(): cmd += " --config %s=%s" % (config_name, str(config_value)) + self.logger.info("Running topic creation command...\n%s" % cmd) node.account.ssh(cmd) + def delete_topic(self, topic, node=None): """ Delete a topic with the topics command @@ -1246,7 +1265,7 @@ def delete_topic(self, topic, node=None): cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --delete" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection=force_use_zk_connection), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) self.logger.info("Running topic delete command...\n%s" % cmd) node.account.ssh(cmd) @@ -1277,10 +1296,11 @@ def describe_under_replicated_partitions(self): """ node = self.nodes[0] + force_use_zk_connection = not node.version.topic_command_supports_bootstrap_server() cmd = fix_opts_for_new_jvm(node) cmd += "%s --describe --under-replicated-partitions" % \ - self.kafka_topics_cmd_with_optional_security_settings(node) + self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection) self.logger.debug("Running topic command to describe under-replicated partitions\n%s" % cmd) output = "" @@ -1307,6 +1327,7 @@ def describe_topic(self, topic, node=None, offline_nodes=[]): for line in node.account.ssh_capture(cmd): output += line return output + def list_topics(self, node=None): if node is None: node = self.nodes[0] @@ -1318,6 +1339,7 @@ def list_topics(self, node=None): for line in node.account.ssh_capture(cmd): if not line.startswith("SLF4J"): yield line.rstrip() + def alter_message_format(self, topic, msg_format_version, node=None): if node is None: node = self.nodes[0] From c5e26d38c20507ed42fc8973eb1be667063864c9 Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Sun, 23 Mar 2025 02:18:52 +0800 Subject: [PATCH 5/5] revert spaces --- tests/kafkatest/services/kafka/kafka.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 60291da78d04d..e47144f3cdb8f 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1075,8 +1075,8 @@ def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_co optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client = use_inter_broker_mechanism_for_client)) kafka_topic_script = self.path.script("kafka-topics.sh", node) return "%s%s %s%s" % \ - (optional_jass_krb_system_props_prefix, kafka_topic_script, - bootstrap_server_or_zookeeper, optional_command_config_suffix) + (optional_jass_krb_system_props_prefix, kafka_topic_script, + bootstrap_server_or_zookeeper, optional_command_config_suffix) def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: @@ -1117,8 +1117,8 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_c optional_command_config_suffix = " --command-config <(echo '%s')" % (self.security_config.client_config(use_inter_broker_mechanism_for_client = use_inter_broker_mechanism_for_client)) kafka_config_script = self.path.script("kafka-configs.sh", node) return "%s%s %s%s" % \ - (optional_jass_krb_system_props_prefix, kafka_config_script, - bootstrap_server_or_zookeeper, optional_command_config_suffix) + (optional_jass_krb_system_props_prefix, kafka_config_script, + bootstrap_server_or_zookeeper, optional_command_config_suffix) def maybe_setup_broker_scram_credentials(self, node): security_config = self.security_config @@ -1207,8 +1207,8 @@ def create_topic(self, topic_cfg, node=None): self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or \ - (topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server()) + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or\ + (topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server()) cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % { @@ -1285,7 +1285,7 @@ def describe_under_replicated_partitions(self): cmd = fix_opts_for_new_jvm(node) cmd += "%s --describe --under-replicated-partitions" % \ - self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection) + self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection) self.logger.debug("Running topic command to describe under-replicated partitions\n%s" % cmd) output = "" @@ -1334,7 +1334,7 @@ def alter_message_format(self, topic, msg_format_version, node=None): cmd = fix_opts_for_new_jvm(node) cmd += "%s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \ - (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version) + (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version) self.logger.info("Running alter message format command...\n%s" % cmd) node.account.ssh(cmd) @@ -1672,7 +1672,7 @@ def topic_id(self, topic): cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --describe" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) + (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) self.logger.debug( "Querying topic ID by using describe topic command ...\n%s" % cmd