diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index a55844548ba0a..e47144f3cdb8f 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1182,12 +1182,6 @@ def all_nodes_configs_command_uses_bootstrap_server_scram(self): return False return True - def all_nodes_acl_command_supports_bootstrap_server(self): - for node in self.nodes: - if not node.version.acl_command_supports_bootstrap_server(): - return False - return True - def all_nodes_reassign_partitions_command_supports_bootstrap_server(self): for node in self.nodes: if not node.version.reassign_partitions_command_supports_bootstrap_server(): @@ -1340,34 +1334,29 @@ def alter_message_format(self, topic, msg_format_version, node=None): cmd = fix_opts_for_new_jvm(node) cmd += "%s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \ - (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version) + (self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version) self.logger.info("Running alter message format command...\n%s" % cmd) node.account.ssh(cmd) - def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None): + def kafka_acls_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None, override_command_config = None): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller") - force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server - if force_use_zk_connection: - bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s" % (self.zk_connect_setting()) - skip_optional_security_settings = True - else: - if kafka_security_protocol is None: - # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, - # otherwise use the client security protocol - if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: - security_protocol_to_use = SecurityConfig.PLAINTEXT - else: - security_protocol_to_use = self.security_protocol + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT else: - security_protocol_to_use = kafka_security_protocol - bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) - skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) + skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT if skip_optional_security_settings: optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: - # we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT + # we need security configs because we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1387,7 +1376,7 @@ def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_conn kafka_acls_script = self.path.script("kafka-acls.sh", node) return "%s%s %s%s" % \ (optional_jass_krb_system_props_prefix, kafka_acls_script, - bootstrap_server_or_authorizer_zk_props, optional_command_config_suffix) + bootstrap_server, optional_command_config_suffix) def run_cli_tool(self, node, cmd): output = "" diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py index 5bd1a46f59770..8c6f946b10930 100644 --- a/tests/kafkatest/services/security/kafka_acls.py +++ b/tests/kafkatest/services/security/kafka_acls.py @@ -19,13 +19,11 @@ class ACLs: def __init__(self, context): self.context = context - def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_grant = [], security_protocol=None): + def add_cluster_acl(self, kafka, principal, additional_cluster_operations_to_grant = [], security_protocol=None): """ :param kafka: Kafka cluster upon which ClusterAction ACL is created :param principal: principal for which ClusterAction ACL is created :param node: Node to use when determining connection settings - :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available. - This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required to create SCRAM credentials and topics, respectively :param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise @@ -37,7 +35,7 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, addit for operation in ['ClusterAction'] + additional_cluster_operations_to_grant: cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % { - 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol), + 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol), 'operation': operation, 'principal': principal } @@ -59,7 +57,7 @@ def remove_cluster_acl(self, kafka, principal, additional_cluster_operations_to_ for operation in ['ClusterAction'] + additional_cluster_operations_to_remove: cmd = "%(cmd_prefix)s --remove --force --cluster --operation=%(operation)s --allow-principal=%(principal)s" % { - 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, False, security_protocol), + 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol), 'operation': operation, 'principal': principal } diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index e89fea80eeeea..36d4eca08fdf8 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -78,7 +78,7 @@ def __init__(self, quota_type, override_quota, kafka): def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args): node = kafka.nodes[0] cmd = "%s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \ - (kafka.kafka_configs_cmd_with_optional_security_settings(node, False), producer_byte_rate, consumer_byte_rate) + (kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False), producer_byte_rate, consumer_byte_rate) cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1]) if len(entity_args) > 2: cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3]) diff --git a/tests/kafkatest/tests/core/authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py index 60c0612f356b1..260b16b1f6fe7 100644 --- a/tests/kafkatest/tests/core/authorizer_test.py +++ b/tests/kafkatest/tests/core/authorizer_test.py @@ -98,8 +98,9 @@ def test_authorizer(self, metadata_quorum, authorizer_class): # add ACLs self.logger.info("Adding ACLs with broker credentials so that alter client quotas command will succeed") - self.acls.add_cluster_acl(self.kafka, client_principal, force_use_zk_connection=False, - additional_cluster_operations_to_grant=['AlterConfigs'], security_protocol=broker_security_protocol) + self.acls.add_cluster_acl(self.kafka, client_principal, + additional_cluster_operations_to_grant=['AlterConfigs'], + security_protocol=broker_security_protocol) # the alter client quotas command should now succeed again self.logger.info(alter_client_quotas_cmd_log_msg) diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 31b890c5b6562..e2f6a0bacdb07 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -62,9 +62,6 @@ def _cmp(self, other): return LooseVersion._cmp(self, other) - def acl_command_supports_bootstrap_server(self): - return self >= V_2_1_0 - def topic_command_supports_bootstrap_server(self): return self >= V_2_3_0