Skip to content

Commit 6884638

Browse files
committed
add combined kraft mode
Solves issue confluentinc#1510 Was confluentinc#1511
1 parent baa0027 commit 6884638

File tree

5 files changed

+199
-19
lines changed

5 files changed

+199
-19
lines changed

docs/VARIABLES.md

+20-1
Original file line numberDiff line numberDiff line change
@@ -940,11 +940,30 @@ Default: 10
940940

941941
***
942942

943+
### kraft_combined
944+
945+
Boolean used to declare broker nodes as controllers (combined mode). Do not use in production environments
946+
947+
Default: false
948+
949+
***
950+
943951
### kafka_controller_quorum_voters
944952

945953
Default controller quorum voters
946954

947-
Default: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}"
955+
Default: "
956+
{%- if kraft_combined -%}
957+
{%- for broker_hostname in groups.kafka_broker|default([]) %}
958+
{%- if loop.index > 1%},{% endif -%}
959+
{{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }}
960+
{%- endfor -%}
961+
{%- else -%}
962+
{%- for controller_hostname in groups.kafka_controller|default([]) -%}
963+
{%- if loop.index > 1%},{% endif -%}
964+
{{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }}
965+
{%- endfor -%}
966+
{%- endif -%}"
948967

949968
***
950969

Original file line numberDiff line numberDiff line change
@@ -1,12 +1,93 @@
11
---
2+
- name: Prepare SCRAM Users if needed
3+
set_fact:
4+
scram_users_to_create: []
5+
6+
# with kraft combined mode, the first install has to define the clusterid, instead of getting it from dedicated controllers
7+
- name: Check meta.properties
8+
run_once: true
9+
when: kraft_combined
10+
ansible.builtin.stat:
11+
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
12+
register: meta_properties
13+
14+
- name: Initialize ClusterId
15+
when:
16+
- kraft_combined
17+
- not meta_properties.stat.exists
18+
run_once: true
19+
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid"
20+
environment:
21+
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions"
22+
register: random_uuid
23+
24+
- name: Set ClusterId
25+
when:
26+
- kraft_combined
27+
- not meta_properties.stat.exists
28+
run_once: true
29+
set_fact:
30+
clusterid: "{{ random_uuid.stdout }}"
31+
delegate_to: "{{ item }}"
32+
delegate_facts: true
33+
loop: "{{ groups.kafka_broker }}"
34+
35+
## and initialize temporary controller admin user
36+
- name: Prepare SCRAM 512 admin user
37+
when:
38+
- kraft_combined
39+
- "'SCRAM-SHA-512' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms"
40+
set_fact:
41+
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"
42+
43+
- name: Prepare SCRAM 256 admin user
44+
when:
45+
- kraft_combined
46+
- "'SCRAM-SHA-256' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms"
47+
set_fact:
48+
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"
49+
50+
# after first install in combined mode, get clusterid from one broker node
251
- name: Extract ClusterId from meta.properties on KRaft Controller
52+
when:
53+
- kraft_combined
54+
- meta_properties.stat.exists
55+
run_once: true
56+
slurp:
57+
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
58+
register: uuid_broker
59+
60+
- name: Set ClusterId
61+
when:
62+
- kraft_combined
63+
- meta_properties.stat.exists
64+
run_once: true
65+
set_fact:
66+
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
67+
delegate_to: "{{ item }}"
68+
delegate_facts: true
69+
loop: "{{ groups.kafka_broker }}"
70+
71+
# with dedicated controller nodes, the clusterid is already defined on the controller nodes
72+
- name: Extract ClusterId from meta.properties on KRaft Controller
73+
when: not kraft_combined
74+
run_once: true
375
slurp:
476
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
577
delegate_to: "{{ groups.kafka_controller[0] }}"
678
register: uuid_broker
779

80+
- name: Set ClusterId
81+
when: not kraft_combined
82+
run_once: true
83+
set_fact:
84+
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
85+
delegate_to: "{{ item }}"
86+
delegate_facts: true
87+
loop: "{{ groups.kafka_broker }}"
88+
889
- name: Format Storage Directory
9-
shell: "{{ binary_base_path }}/bin/kafka-storage format -t={{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted"
90+
shell: "{{ binary_base_path }}/bin/kafka-storage format -t={{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}"
1091
register: format_meta
1192
vars:
1293
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,68 @@
11
---
2+
23
- name: Check if Data Directories are Formatted
34
shell: "{{ binary_base_path }}/bin/kafka-storage info -c {{ kafka_controller.config_file }}"
45
ignore_errors: true
56
failed_when: false
67
register: formatted
78

8-
- name: Get ClusterId
9+
- name: Prepare SCRAM Users if needed
10+
set_fact:
11+
scram_users_to_create: []
12+
13+
- name: Check meta.properties
14+
run_once: true
15+
ansible.builtin.stat:
16+
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
17+
register: meta_properties
18+
19+
# if meta.properties does not exist, create a uuid
20+
- name: Initialize ClusterId
21+
when: not meta_properties.stat.exists
22+
run_once: true
923
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid"
1024
environment:
1125
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions"
12-
register: uuid_key
26+
register: random_uuid
27+
28+
- name: Set ClusterId
29+
when: not meta_properties.stat.exists
30+
run_once: true
31+
set_fact:
32+
clusterid: "{{ random_uuid.stdout }}"
33+
delegate_to: "{{ item }}"
34+
delegate_facts: true
35+
loop: "{{ groups['kafka_controller'] }}"
36+
37+
## and initialize temporary controller admin user
38+
- name: Prepare SCRAM 512 admin user
39+
when:
40+
- "'SCRAM-SHA-512' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms"
41+
set_fact:
42+
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"
43+
44+
- name: Prepare SCRAM 256 admin user
45+
when:
46+
- "'SCRAM-SHA-256' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms"
47+
set_fact:
48+
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"
49+
50+
# else, extract it from meta.properties
51+
- name: Extract ClusterId from meta.properties
52+
when: meta_properties.stat.exists
53+
run_once: true
54+
slurp:
55+
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
56+
register: uuid_broker
57+
58+
- name: Set ClusterId
59+
when: meta_properties.stat.exists and not kraft_migration|bool
1360
run_once: true
14-
when: not kraft_migration|bool
61+
set_fact:
62+
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
63+
delegate_to: "{{ item }}"
64+
delegate_facts: true
65+
loop: "{{ groups['kafka_controller'] }}"
1566

1667
- name: Extract ClusterId from meta.properties on ZK Broker
1768
slurp:
@@ -21,8 +72,8 @@
2172
when: kraft_migration|bool
2273

2374
- name: Format Data Directory
24-
shell: "{{ binary_base_path }}/bin/kafka-storage format -t={{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted"
75+
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}"
2576
register: format_meta
2677
vars:
27-
clusterid: "{{ (zoo_cluster['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] if kraft_migration|bool else uuid_key.stdout }}"
28-
when: formatted.rc == 1 # To trigger the command only when the directories are not formatted
78+
clusterid: "{{ (zoo_cluster['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] if kraft_migration|bool else clusterid }}"
79+
when: kraft_migration|bool or formatted.rc == 1 # To trigger the command only during migration or when the directories are not yet formatted

roles/variables/defaults/main.yml

+18-3
Original file line numberDiff line numberDiff line change
@@ -422,8 +422,22 @@ kraft_migration: false
422422
### Parameter to increase the number of retries for Metadata Migration API request
423423
metadata_migration_retries: 10
424424

425-
### Default controller quorum voters
426-
kafka_controller_quorum_voters: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}"
425+
### set to true to install controller and broker on the same nodes
426+
kraft_combined: false
427+
428+
### Default controller quorum voters. Dynamically assigned later if not user provided
429+
kafka_controller_quorum_voters: >-
430+
{%- if kraft_combined -%}
431+
{%- for broker_hostname in groups.kafka_broker|default([]) %}
432+
{%- if loop.index > 1%},{% endif -%}
433+
{{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }}
434+
{%- endfor -%}
435+
{%- else -%}
436+
{%- for controller_hostname in groups.kafka_controller|default([]) -%}
437+
{%- if loop.index > 1%},{% endif -%}
438+
{{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }}
439+
{%- endfor -%}
440+
{%- endif -%}
427441
428442
### Default Kafka config prefix. Only valid to customize when installation_method: archive
429443
kafka_controller_config_prefix: "{{ config_prefix }}/controller"
@@ -578,7 +592,8 @@ kafka_broker_default_listeners: "{
578592
'ssl_enabled': {{ssl_enabled|string|lower}},
579593
'ssl_mutual_auth_enabled': {{ssl_mutual_auth_enabled|string|lower}},
580594
'sasl_protocol': '{{sasl_protocol}}'
581-
}{% endif %}{% endif %}
595+
}{% endif %}{% if kraft_enabled|bool and kraft_combined|bool %},
596+
'controller': {{ kafka_controller_listeners['controller'] }}{% endif %}{% endif %}
582597
}"
583598

584599
### Dictionary to put additional listeners to be configured within Kafka. Each listener must include a 'name' and 'port' key. Optionally they can include the keys 'ssl_enabled', 'ssl_mutual_auth_enabled', and 'sasl_protocol'

roles/variables/vars/main.yml

+22-8
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_
1414
binary_base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_join) if installation_method == 'archive' else '/usr' }}"
1515

1616
### Runs kafka in Kraft mode if controller is present
17-
kraft_enabled: "{{ true if 'kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0 else false }}"
17+
kraft_enabled: "{{ true if kraft_combined or ('kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0) else false }}"
1818

1919
#### Config prefix paths ####
2020
zookeeper_config_prefix_path: "{{ zookeeper_config_prefix.strip('/') }}"
@@ -157,7 +157,7 @@ kafka_controller_properties:
157157
confluent.security.event.logger.exporter.kafka.topic.replicas: "{{audit_logs_destination_bootstrap_servers.split(',')|length if audit_logs_destination_enabled and rbac_enabled else kafka_controller_default_internal_replication_factor}}"
158158
confluent.support.metrics.enable: "true"
159159
confluent.support.customer.id: anonymous
160-
log.dirs: "/var/lib/controller/data"
160+
log.dirs: "{{ '/var/lib/kafka/data' if kraft_combined else '/var/lib/controller/data' }}"
161161
kafka.rest.enable: "{{kafka_controller_rest_proxy_enabled|string|lower}}"
162162
process.roles: controller
163163
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
@@ -234,9 +234,9 @@ kafka_controller_properties:
234234
properties:
235235
sasl.kerberos.service.name: "{{kerberos_kafka_controller_primary}}"
236236
inter_broker_sasl:
237-
enabled: "{{ kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
237+
enabled: "{{ kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
238238
properties:
239-
sasl.mechanism.inter.broker.protocol: "{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
239+
sasl.mechanism.inter.broker.protocol: "{{kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
240240
sr:
241241
enabled: "{{ kafka_broker_schema_validation_enabled and 'schema_registry' in groups }}"
242242
properties:
@@ -385,7 +385,7 @@ kafka_broker_properties:
385385
socket.send.buffer.bytes: 102400
386386
transaction.state.log.min.isr: "{{ [ 2, kafka_broker_default_internal_replication_factor|int ] | min }}"
387387
transaction.state.log.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
388-
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
388+
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items|rejectattr('key', 'equalto', 'controller') %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
389389
confluent.ansible.managed: 'true'
390390
confluent.license.topic: _confluent-command
391391
confluent.license.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
@@ -413,10 +413,24 @@ kafka_broker_properties:
413413
broker_on_controller:
414414
enabled: "{{kraft_enabled|bool}}"
415415
properties:
416+
process.roles: "broker{% if kraft_combined %},controller{% endif %}"
416417
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
417-
controller.listener.names: "{{kafka_controller_listeners['controller']['name']}}"
418-
listener.security.protocol.map: "{% for listener in kafka_controller_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}{% endfor %},{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}{% endfor %}"
419-
listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}{% endfor %}"
418+
controller.listener.names: "{{ kafka_controller_listeners['controller']['name'] }}"
419+
listener.security.protocol.map: >-
420+
{%- for listener in kafka_controller_listeners|dict2items -%}
421+
{%- if loop.index > 1%},{% endif -%}
422+
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}
423+
{%- endfor -%}
424+
,
425+
{%- for listener in kafka_broker_listeners|dict2items -%}
426+
{%- if loop.index > 1%},{% endif -%}
427+
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}
428+
{%- endfor -%}
429+
listeners: >-
430+
{%- for listener in kafka_broker_listeners|dict2items -%}
431+
{%- if loop.index > 1 %},{% endif -%}
432+
{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}
433+
{%- endfor -%}
420434
broker_on_zookeeper:
421435
enabled: "{{not kraft_enabled|bool}}"
422436
properties:

0 commit comments

Comments
 (0)