Skip to content

KAFKA-17715: Remove argument force_use_zk_connection from e2e #19209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
39 changes: 14 additions & 25 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,12 +1182,6 @@ def all_nodes_configs_command_uses_bootstrap_server_scram(self):
return False
return True

def all_nodes_acl_command_supports_bootstrap_server(self):
for node in self.nodes:
if not node.version.acl_command_supports_bootstrap_server():
return False
return True

def all_nodes_reassign_partitions_command_supports_bootstrap_server(self):
for node in self.nodes:
if not node.version.reassign_partitions_command_supports_bootstrap_server():
Expand Down Expand Up @@ -1340,34 +1334,29 @@ def alter_message_format(self, topic, msg_format_version, node=None):

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
(self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version)
(self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version)
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)

def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
def kafka_acls_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None, override_command_config = None):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
if force_use_zk_connection:
bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s" % (self.zk_connect_setting())
skip_optional_security_settings = True
else:
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = self.security_protocol
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
security_protocol_to_use = self.security_protocol
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
if skip_optional_security_settings:
optional_jass_krb_system_props_prefix = ""
optional_command_config_suffix = ""
else:
# we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT
# we need security configs because we aren't using PLAINTEXT
if (security_protocol_to_use == self.interbroker_security_protocol):
# configure JAAS to provide the broker's credentials
# since this is an authenticating cluster and we are going to use the inter-broker security protocol
Expand All @@ -1387,7 +1376,7 @@ def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_conn
kafka_acls_script = self.path.script("kafka-acls.sh", node)
return "%s%s %s%s" % \
(optional_jass_krb_system_props_prefix, kafka_acls_script,
bootstrap_server_or_authorizer_zk_props, optional_command_config_suffix)
bootstrap_server, optional_command_config_suffix)

def run_cli_tool(self, node, cmd):
output = ""
Expand Down
8 changes: 3 additions & 5 deletions tests/kafkatest/services/security/kafka_acls.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ class ACLs:
def __init__(self, context):
self.context = context

def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_grant = [], security_protocol=None):
def add_cluster_acl(self, kafka, principal, additional_cluster_operations_to_grant = [], security_protocol=None):
"""
:param kafka: Kafka cluster upon which ClusterAction ACL is created
:param principal: principal for which ClusterAction ACL is created
:param node: Node to use when determining connection settings
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
:param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required
to create SCRAM credentials and topics, respectively
:param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise
Expand All @@ -37,7 +35,7 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, addit

for operation in ['ClusterAction'] + additional_cluster_operations_to_grant:
cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % {
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol),
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
'operation': operation,
'principal': principal
}
Expand All @@ -59,7 +57,7 @@ def remove_cluster_acl(self, kafka, principal, additional_cluster_operations_to_

for operation in ['ClusterAction'] + additional_cluster_operations_to_remove:
cmd = "%(cmd_prefix)s --remove --force --cluster --operation=%(operation)s --allow-principal=%(principal)s" % {
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, False, security_protocol),
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
'operation': operation,
'principal': principal
}
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/client/quota_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, quota_type, override_quota, kafka):
def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
node = kafka.nodes[0]
cmd = "%s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
(kafka.kafka_configs_cmd_with_optional_security_settings(node, False), producer_byte_rate, consumer_byte_rate)
(kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False), producer_byte_rate, consumer_byte_rate)
cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
if len(entity_args) > 2:
cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
Expand Down
5 changes: 3 additions & 2 deletions tests/kafkatest/tests/core/authorizer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ def test_authorizer(self, metadata_quorum, authorizer_class):

# add ACLs
self.logger.info("Adding ACLs with broker credentials so that alter client quotas command will succeed")
self.acls.add_cluster_acl(self.kafka, client_principal, force_use_zk_connection=False,
additional_cluster_operations_to_grant=['AlterConfigs'], security_protocol=broker_security_protocol)
self.acls.add_cluster_acl(self.kafka, client_principal,
additional_cluster_operations_to_grant=['AlterConfigs'],
security_protocol=broker_security_protocol)

# the alter client quotas command should now succeed again
self.logger.info(alter_client_quotas_cmd_log_msg)
Expand Down
3 changes: 0 additions & 3 deletions tests/kafkatest/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ def _cmp(self, other):

return LooseVersion._cmp(self, other)

def acl_command_supports_bootstrap_server(self):
return self >= V_2_1_0

def topic_command_supports_bootstrap_server(self):
return self >= V_2_3_0

Expand Down