Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 2 additions & 26 deletions roles/kafka_connect/tasks/deploy_connectors.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
---
- name: Register Kafka Connect Subgroups
set_fact:
subgroups: "{{ ((subgroups | default([])) + hostvars[item].group_names) | difference('kafka_broker, ksql, kafka_connect, kafka_rest, kerberos, ksql, schema_registry, kafka_broker_parallel, ksql_parallel, kafka_connect_parallel, kafka_rest_parallel, kerberos_parallel, ksql_parallel, schema_registry_parallel, kafka_controller, kafka_controller_parallel') }}"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: we have investigated and this seems to be not working as it is expected to work. The difference() function expects an (array, array) as input, but here it consumes (array, string). It doesn't fail, because it implicitly converts str to array of characters.

with_items: "{{groups['kafka_connect']}}"

- name: Extract Connect Principal from cert
import_role:
name: common
Expand All @@ -16,7 +11,6 @@
- rbac_enabled|bool
- mds_ssl_client_authentication != 'none'
- not ansible_check_mode
- kafka_connect_connectors is defined

- name: Add Role Bindings for Connect mTLS User
include_tasks: rbac_connectors.yml
Expand All @@ -26,7 +20,6 @@
- rbac_enabled|bool
- mds_ssl_client_authentication != 'none'
- not ansible_check_mode
- kafka_connect_connectors is defined

- name: Add Role Bindings for Connect OAuth
include_tasks: rbac_connectors.yml
Expand All @@ -36,7 +29,6 @@
- rbac_enabled|bool
- oauth_enabled|bool
- not ansible_check_mode
- kafka_connect_connectors is defined

- name: Add Role Bindings for Connect Ldap
include_tasks: rbac_connectors.yml
Expand All @@ -46,7 +38,6 @@
- rbac_enabled|bool
- "'ldap' in auth_mode"
- not ansible_check_mode
- kafka_connect_connectors is defined

- set_fact:
certs_chain: "{{ssl_file_dir_final}}/{{ kafka_connect_service_name if kafka_connect_service_name != kafka_connect_default_service_name else 'kafka_connect' }}.chain"
Expand Down Expand Up @@ -74,7 +65,7 @@
oauth_client_assertion_template_file: "{{ kafka_connect_oauth_client_assertion_template_file_dest_path }}"
when: rbac_enabled or kafka_connect_oauth_enabled

- name: Register connector configs and remove deleted connectors for single cluster
- name: Register connector configs and remove deleted connectors for a connect cluster
confluent.platform.kafka_connectors:
connect_url: "{{kafka_connect_http_protocol}}://{{ hostvars[inventory_hostname] | confluent.platform.resolve_and_format_hostname }}:{{kafka_connect_rest_port}}/connectors"
active_connectors: "{{ kafka_connect_connectors }}"
Expand All @@ -83,19 +74,4 @@
client_cert: "{% if (ssl_provided_keystore_and_truststore and ssl_mutual_auth_enabled) %}{{kafka_connect_cert_path}}{% elif ssl_mutual_auth_enabled %}{{certs_chain}}{% else %}{{none}}{% endif %}"
client_key: "{% if ssl_mutual_auth_enabled %}{{kafka_connect_key_path}}{% else %}{{none}}{% endif %}"
when:
- kafka_connect_connectors is defined
- subgroups|length == 0
run_once: true

- name: Register connector configs and remove deleted connectors for Multiple Clusters
confluent.platform.kafka_connectors:
connect_url: "http{% if hostvars[groups[item][0]].kafka_connect_ssl_enabled|default(kafka_connect_ssl_enabled) %}s{% endif %}://{{ hostvars[groups[item][0]] | confluent.platform.resolve_and_format_hostname }}:{{ hostvars[groups[item][0]].kafka_connect_rest_port|default(kafka_connect_rest_port) }}/connectors"
active_connectors: "{{ hostvars[groups[item][0]].kafka_connect_connectors }}"
timeout: "{{ kafka_connect_deploy_connector_timeout }}"
token: "{% if rbac_enabled or kafka_connect_oauth_enabled %}{{ authorization_token }}{% else %}{{none}}{% endif %}"
client_cert: "{% if (ssl_provided_keystore_and_truststore and hostvars[groups[item][0]].kafka_connect_ssl_mutual_auth_enabled|default(kafka_connect_ssl_mutual_auth_enabled)) %}{{hostvars[groups[item][0]].kafka_connect_cert_path|default(kafka_connect_cert_path)}}{% elif hostvars[groups[item][0]].kafka_connect_ssl_mutual_auth_enabled|default(kafka_connect_ssl_mutual_auth_enabled) %}{{certs_chain}}{% else %}{{none}}{% endif %}"
client_key: "{% if hostvars[groups[item][0]].kafka_connect_ssl_mutual_auth_enabled|default(kafka_connect_ssl_mutual_auth_enabled) %}{{hostvars[groups[item][0]].kafka_connect_key_path|default(kafka_connect_key_path)}}{% else %}{{none}}{% endif %}"
when: hostvars[groups[item][0]].kafka_connect_connectors is defined
delegate_to: "{{ groups[item][0] }}"
loop: "{{subgroups}}"
run_once: true
- inventory_hostname == ansible_play_hosts_all[0] # solves serial mode issue
37 changes: 9 additions & 28 deletions roles/kafka_connect/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -448,40 +448,21 @@
- not ansible_check_mode
tags: health_check

# Set a host fact with the direct cluster parent group
# Host of the same cluster have the same group.id
- name: Set parent Cluster
vars:
keywords:
- kafka_connect
- kafka_connect_parallel
- kafka_connect_serial
- kafka_broker
- kafka_broker_parallel
- kafka_broker_serial
- kafka_rest
- kafka_rest_parallel
- kafka_rest_serial
- control_center_next_gen
- control_center_next_gen_parallel
- control_center_next_gen_serial
- ksql
- ksql_parallel
- ksql_serial
- schema_registry
- kafka_controller
- kafka_controller_parallel
- kafka_controller_serial
set_fact:
parent_kafka_connect_cluster_group: "{{ (group_names | difference(keywords))[0] | default('kafka_connect') }}"
parent_kafka_connect_cluster_id: "{{ kafka_connect_final_properties['group.id'] }}"

# TODO needs to be ran over kafka-connect subgroups (i.e. multiple kafka-connect clusters in same inventory)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to discuss with someone about how to implement this subgroup in this or higher level.

- name: Register Cluster
include_tasks: register_cluster.yml
when:
- rbac_enabled
- inventory_hostname == ansible_play_hosts_all[0] # for serial mode playbook
run_once: true

# TODO needs to be ran over kafka-connect subgroups (i.e. multiple kafka-connect clusters in same inventory)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to discuss with someone about how to implement this subgroup in this or higher level.

- name: Deploy Connectors
include_tasks: deploy_connectors.yml
when:
- kafka_connect_connectors is defined
- inventory_hostname == ansible_play_hosts_all[0] # for serial mode playbook
run_once: true

- name: Delete temporary keys/certs when keystore and trustore is provided
file:
Expand Down
40 changes: 13 additions & 27 deletions roles/kafka_connect/tasks/register_cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import_role:
name: common
tasks_from: rbac_setup.yml
when: rbac_enabled
vars:
copy_certs: false
delegated_to_localhost: "{{ external_mds_enabled|bool }}"
Expand All @@ -25,17 +24,22 @@
# For single cluster delegating this task to broker as in case of cert based token retrival only broker can get a super user token using its certs
# For Centralized MDS other clusters can't delegate to MDS cluster as that cluster may not have permissions to ssh into MDS, thus delegate to localhost

- name: Fetch Kafka Connect Cluster Groups
- name: Lookup cluster hosts
vars:
- connect_cluster_workers: []
set_fact:
active_kafka_connect_groups: "{{ (((active_kafka_connect_groups | default([])) + hostvars[item].group_names) | difference('kafka_connect'+'kafka_connect_parallel'+'kafka_connect_serial'+ 'kafka_broker'+ 'kafka_broker_parallel'+'kafka_broker_serial'+'ksql'+ 'ksql_parallel'+'ksql_serial'+'control_center_next_gen'+'control_center_next_gen_parallel'+'control_center_next_gen_serial'+'schema_registry'+'kafka_rest'+'kafka_rest_parallel'+'kafka_rest_serial'+'kafka_controller'+'kafka_controller_parallel'+'kafka_controller_serial')) | default(['kafka_connect'], true) }}"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: original code has same issue as explained in https://github.com/confluentinc/cp-ansible/pull/2161/files#r2185136443 .

with_items: "{{ ansible_play_hosts }}"
connect_cluster_workers: "{{ connect_cluster_workers + [ { 'host': hostvars[item]['ansible_host'], 'port': kafka_connect_rest_port } ] }}"
loop: "{{ groups['kafka_connect'] }}"

- name: Register Kafka Connect Cluster
vars:
cluster_host_delegates: "{{ active_kafka_connect_groups | map('extract', groups, 0)| list }}"
cluster_group: "{{ hostvars[item].parent_kafka_connect_cluster_group }}"
cluster_name: "{{ hostvars[item].kafka_connect_cluster_name }}"
cluster_id: "{{ hostvars[item].parent_kafka_connect_cluster_id }}"
cluster_register_request:
- clusterName: "{{ kafka_connect_cluster_name }}"
protocol: "{{ kafka_connect_http_protocol | upper }}"
scope:
kafka-cluster: "{{ kafka_cluster_id }}"
connect-cluster: "{{ kafka_connect_final_properties['group.id'] }}"
hosts: "{{ connect_cluster_workers }}"
uri:
url: "{{mds_bootstrap_server_urls.split(',')[0]}}/security/1.0/registry/clusters"
method: POST
Expand All @@ -46,27 +50,9 @@
client_cert: "{{ kafka_connect_cert_path if send_client_cert|bool else omit }}"
client_key: "{{ kafka_connect_key_path if send_client_cert|bool else omit }}"
body_format: json
body: >
[
{
"clusterName": "{{cluster_name}}",
"scope": {
"clusters": {
"kafka-cluster": "{{kafka_cluster_id}}",
"connect-cluster": "{{cluster_id}}"
}
},
"hosts": [ {% for inv_host in groups[cluster_group] %}{% if loop.index > 1%},{% endif %}{ "host": "{{hostvars[inv_host]|confluent.platform.resolve_hostname}}", "port": {% if hostvars[inv_host].kafka_connect_rest_port is defined %} {{hostvars[inv_host].kafka_connect_rest_port}} {% else %} {{kafka_connect_rest_port}} {% endif %} }{% endfor %} ],
"protocol": "{{kafka_connect_http_protocol | upper}}"
}
]
body: "{{ cluster_register_request | to_json }}"
status_code: 204
register: output
until: output.status == 204
retries: "{{ mds_retries }}"
delay: 10
when:
- hostvars[item].get("rbac_enabled", false)|bool
- hostvars[item].kafka_connect_cluster_name is defined
- hostvars[item].parent_kafka_connect_cluster_id is defined
with_items: "{{ cluster_host_delegates }}"