Skip to content

KAFKA-17715: Remove argument force_use_zk_connection from e2e #19209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
125 changes: 45 additions & 80 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,29 +1052,23 @@ def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_cont
kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node)
return "{} {}".format(kafka_metadata_script, bootstrap)

def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
def kafka_topics_cmd_with_optional_security_settings(self, node, kafka_security_protocol=None, offline_nodes=[]):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller")
if force_use_zk_connection:
bootstrap_server_or_zookeeper = "--zookeeper %s" % (self.zk_connect_setting())
skip_optional_security_settings = True
else:
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = self.security_protocol
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
if skip_optional_security_settings:
security_protocol_to_use = self.security_protocol
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use, offline_nodes=offline_nodes))
if security_protocol_to_use == SecurityConfig.PLAINTEXT:
optional_jass_krb_system_props_prefix = ""
optional_command_config_suffix = ""
else:
# we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT
if (security_protocol_to_use == self.interbroker_security_protocol):
# configure JAAS to provide the broker's credentials
# since this is an authenticating cluster and we are going to use the inter-broker security protocol
Expand All @@ -1091,32 +1085,26 @@ def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_co
kafka_topic_script = self.path.script("kafka-topics.sh", node)
return "%s%s %s%s" % \
(optional_jass_krb_system_props_prefix, kafka_topic_script,
bootstrap_server_or_zookeeper, optional_command_config_suffix)
bootstrap_server, optional_command_config_suffix)

def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None):
def kafka_configs_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-configs against a broker, not a KRaft controller")
if force_use_zk_connection:
# kafka-configs supports a TLS config file, so include it if there is one
bootstrap_server_or_zookeeper = "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
skip_optional_security_settings = True
else:
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = self.security_protocol
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server_or_zookeeper = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
security_protocol_to_use = self.security_protocol
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
if skip_optional_security_settings:
optional_jass_krb_system_props_prefix = ""
optional_command_config_suffix = ""
else:
# we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT
if (security_protocol_to_use == self.interbroker_security_protocol):
# configure JAAS to provide the broker's credentials
# since this is an authenticating cluster and we are going to use the inter-broker security protocol
Expand All @@ -1133,16 +1121,15 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_c
kafka_config_script = self.path.script("kafka-configs.sh", node)
return "%s%s %s%s" % \
(optional_jass_krb_system_props_prefix, kafka_config_script,
bootstrap_server_or_zookeeper, optional_command_config_suffix)
bootstrap_server, optional_command_config_suffix)

def maybe_setup_broker_scram_credentials(self, node):
security_config = self.security_config
# we only need to create broker credentials when the broker mechanism is SASL/SCRAM
if security_config.is_sasl(self.interbroker_security_protocol) and security_config.is_sasl_scram(self.interbroker_sasl_mechanism):
force_use_zk_connection = True # we are bootstrapping these credentials before Kafka is started
cmd = fix_opts_for_new_jvm(node)
cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter --add-config %(mechanism)s=[password=%(password)s]" % {
'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection),
'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node),
'user': SecurityConfig.SCRAM_BROKER_USER,
'mechanism': self.interbroker_sasl_mechanism,
'password': SecurityConfig.SCRAM_BROKER_PASSWORD
Expand All @@ -1153,15 +1140,13 @@ def maybe_setup_client_scram_credentials(self, node):
security_config = self.security_config
# we only need to create client credentials when the client mechanism is SASL/SCRAM
if security_config.is_sasl(self.security_protocol) and security_config.is_sasl_scram(self.client_sasl_mechanism):
force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server_scram()
# ignored if forcing the use of Zookeeper, but we need a value to send, so calculate it anyway
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
kafka_security_protocol = self.interbroker_security_protocol
else:
kafka_security_protocol = self.security_protocol
cmd = fix_opts_for_new_jvm(node)
cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type users --alter --add-config %(mechanism)s=[password=%(password)s]" % {
'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection, kafka_security_protocol),
'kafka_configs_cmd': self.kafka_configs_cmd_with_optional_security_settings(node, kafka_security_protocol),
'user': SecurityConfig.SCRAM_CLIENT_USER,
'mechanism': self.client_sasl_mechanism,
'password': SecurityConfig.SCRAM_CLIENT_PASSWORD
Expand Down Expand Up @@ -1228,12 +1213,9 @@ def create_topic(self, topic_cfg, node=None):
self.logger.info("Creating topic %s with settings %s",
topic_cfg["topic"], topic_cfg)

force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or\
(topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server())

cmd = fix_opts_for_new_jvm(node)
cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % {
'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection),
'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node),
'topic': topic_cfg.get("topic"),
}
if 'replica-assignment' in topic_cfg:
Expand Down Expand Up @@ -1267,11 +1249,9 @@ def delete_topic(self, topic, node=None):
node = self.nodes[0]
self.logger.info("Deleting topic %s" % topic)

force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --topic %s --delete" % \
(self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic)
(self.kafka_topics_cmd_with_optional_security_settings(node), topic)
self.logger.info("Running topic delete command...\n%s" % cmd)
node.account.ssh(cmd)

Expand Down Expand Up @@ -1302,11 +1282,10 @@ def describe_under_replicated_partitions(self):
"""

node = self.nodes[0]
force_use_zk_connection = not node.version.topic_command_supports_bootstrap_server()

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --describe --under-replicated-partitions" % \
self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection)
self.kafka_topics_cmd_with_optional_security_settings(node)

self.logger.debug("Running topic command to describe under-replicated partitions\n%s" % cmd)
output = ""
Expand All @@ -1322,11 +1301,9 @@ def describe_topic(self, topic, node=None, offline_nodes=[]):
if node is None:
node = self.nodes[0]

force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --topic %s --describe" % \
(self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection, offline_nodes=offline_nodes), topic)
(self.kafka_topics_cmd_with_optional_security_settings(node, offline_nodes=offline_nodes), topic)

self.logger.info("Running topic describe command...\n%s" % cmd)
output = ""
Expand All @@ -1338,10 +1315,8 @@ def list_topics(self, node=None):
if node is None:
node = self.nodes[0]

force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection))
cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node))
for line in node.account.ssh_capture(cmd):
if not line.startswith("SLF4J"):
yield line.rstrip()
Expand All @@ -1351,38 +1326,30 @@ def alter_message_format(self, topic, msg_format_version, node=None):
node = self.nodes[0]
self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)

force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server()

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
(self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, msg_format_version)
(self.kafka_configs_cmd_with_optional_security_settings(node), topic, msg_format_version)
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)

def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
def kafka_acls_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None, override_command_config = None):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
if force_use_zk_connection:
bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s" % (self.zk_connect_setting())
skip_optional_security_settings = True
else:
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = self.security_protocol
if kafka_security_protocol is None:
# it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT,
# otherwise use the client security protocol
if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
security_protocol_to_use = self.security_protocol
else:
security_protocol_to_use = kafka_security_protocol
bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT
if skip_optional_security_settings:
optional_jass_krb_system_props_prefix = ""
optional_command_config_suffix = ""
else:
# we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT
if (security_protocol_to_use == self.interbroker_security_protocol):
# configure JAAS to provide the broker's credentials
# since this is an authenticating cluster and we are going to use the inter-broker security protocol
Expand All @@ -1402,7 +1369,7 @@ def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_conn
kafka_acls_script = self.path.script("kafka-acls.sh", node)
return "%s%s %s%s" % \
(optional_jass_krb_system_props_prefix, kafka_acls_script,
bootstrap_server_or_authorizer_zk_props, optional_command_config_suffix)
bootstrap_server, optional_command_config_suffix)

def run_cli_tool(self, node, cmd):
output = ""
Expand Down Expand Up @@ -1694,11 +1661,9 @@ def topic_id(self, topic):
if self.all_nodes_support_topic_ids():
node = self.nodes[0]

force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()

cmd = fix_opts_for_new_jvm(node)
cmd += "%s --topic %s --describe" % \
(self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic)
(self.kafka_topics_cmd_with_optional_security_settings(node), topic)

self.logger.debug(
"Querying topic ID by using describe topic command ...\n%s" % cmd
Expand Down
8 changes: 3 additions & 5 deletions tests/kafkatest/services/security/kafka_acls.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ class ACLs:
def __init__(self, context):
self.context = context

def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_grant = [], security_protocol=None):
def add_cluster_acl(self, kafka, principal, additional_cluster_operations_to_grant = [], security_protocol=None):
"""
:param kafka: Kafka cluster upon which ClusterAction ACL is created
:param principal: principal for which ClusterAction ACL is created
:param node: Node to use when determining connection settings
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
:param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required
to create SCRAM credentials and topics, respectively
:param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise
Expand All @@ -37,7 +35,7 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, addit

for operation in ['ClusterAction'] + additional_cluster_operations_to_grant:
cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % {
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol),
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
'operation': operation,
'principal': principal
}
Expand All @@ -59,7 +57,7 @@ def remove_cluster_acl(self, kafka, principal, additional_cluster_operations_to_

for operation in ['ClusterAction'] + additional_cluster_operations_to_remove:
cmd = "%(cmd_prefix)s --remove --force --cluster --operation=%(operation)s --allow-principal=%(principal)s" % {
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, False, security_protocol),
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
'operation': operation,
'principal': principal
}
Expand Down
Loading