Skip to content

Commit 6d6f059

Browse files
committed
handle combined nodes
1 parent d36700b commit 6d6f059

File tree

2 files changed

+43
-12
lines changed

2 files changed

+43
-12
lines changed

roles/variables/defaults/main.yml

+18-3
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,22 @@ zookeeper_skip_restarts: "{{ skip_restarts }}"
416416

417417
#### kafka Controller variables ####
418418

419-
### Default controller quorum voters
420-
kafka_controller_quorum_voters: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}"
419+
### set to true to install controller and broker on same nodes
420+
kraft_combined: false
421+
422+
### Default controller quorum voters. Dynamically assigned later if not user provided
423+
kafka_controller_quorum_voters: >-
424+
{%- if kraft_combined -%}
425+
{%- for broker_hostname in groups.kafka_broker|default([]) %}
426+
{%- if loop.index > 1%},{% endif -%}
427+
{{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }}
428+
{%- endfor -%}
429+
{%- else -%}
430+
{%- for controller_hostname in groups.kafka_controller|default([]) -%}
431+
{%- if loop.index > 1%},{% endif -%}
432+
{{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }}
433+
{%- endfor -%}
434+
{%- endif -%}
421435
422436
### Default Kafka config prefix. Only valid to customize when installation_method: archive
423437
kafka_controller_config_prefix: "{{ config_prefix }}/controller"
@@ -569,7 +583,8 @@ kafka_broker_default_listeners: "{
569583
'ssl_enabled': {{ssl_enabled|string|lower}},
570584
'ssl_mutual_auth_enabled': {{ssl_mutual_auth_enabled|string|lower}},
571585
'sasl_protocol': '{{sasl_protocol}}'
572-
}{% endif %}{% endif %}
586+
}{% endif %}{% if kraft_enabled|bool and kraft_combined|bool %},
587+
'controller': {{ kafka_controller_listeners['controller'] }}{% endif %}{% endif %}
573588
}"
574589

575590
### Dictionary to put additional listeners to be configured within Kafka. Each listener must include a 'name' and 'port' key. Optionally they can include the keys 'ssl_enabled', 'ssl_mutual_auth_enabled', and 'sasl_protocol'

roles/variables/vars/main.yml

+25-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_
1414
binary_base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_join) if installation_method == 'archive' else '/usr' }}"
1515

1616
### Runs kafka in Kraft mode if controller is present
17-
kraft_enabled: "{{ true if 'kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0 else false }}"
17+
kraft_enabled: "{{ true if kraft_combined or ('kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0) else false }}"
18+
19+
### One controller node to delegate actions to
20+
kafka_controller_default_host: "{{ ( groups.kafka_broker[0] if kraft_combined else groups.kafka_controller[0] ) if kraft_enabled else 'no controller' }}"
1821

1922
#### Config prefix paths ####
2023
zookeeper_config_prefix_path: "{{ zookeeper_config_prefix.strip('/') }}"
@@ -149,7 +152,7 @@ kafka_controller_properties:
149152
confluent.security.event.logger.exporter.kafka.topic.replicas: "{{audit_logs_destination_bootstrap_servers.split(',')|length if audit_logs_destination_enabled and rbac_enabled else kafka_controller_default_internal_replication_factor}}"
150153
confluent.support.metrics.enable: "true"
151154
confluent.support.customer.id: anonymous
152-
log.dirs: "/var/lib/controller/data"
155+
log.dirs: "{{ '/var/lib/kafka/data' if kraft_combined else '/var/lib/controller/data' }}"
153156
kafka.rest.enable: "{{kafka_controller_rest_proxy_enabled|string|lower}}"
154157
process.roles: controller
155158
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
@@ -198,9 +201,9 @@ kafka_controller_properties:
198201
properties:
199202
sasl.kerberos.service.name: "{{kerberos_kafka_controller_primary}}"
200203
inter_broker_sasl:
201-
enabled: "{{ kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
204+
enabled: "{{ kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
202205
properties:
203-
sasl.mechanism.inter.broker.protocol: "{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
206+
sasl.mechanism.inter.broker.protocol: "{{kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
204207
sr:
205208
enabled: "{{ kafka_broker_schema_validation_enabled and 'schema_registry' in groups }}"
206209
properties:
@@ -340,7 +343,7 @@ kafka_broker_properties:
340343
socket.send.buffer.bytes: 102400
341344
transaction.state.log.min.isr: "{{ [ 2, kafka_broker_default_internal_replication_factor|int ] | min }}"
342345
transaction.state.log.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
343-
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
346+
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items|rejectattr('key', 'equalto', 'controller') %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
344347
confluent.ansible.managed: 'true'
345348
confluent.license.topic: _confluent-command
346349
confluent.license.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
@@ -355,11 +358,24 @@ kafka_broker_properties:
355358
broker_on_controller:
356359
enabled: "{{kraft_enabled|bool}}"
357360
properties:
358-
process.roles: broker
361+
process.roles: "broker{% if kraft_combined %},controller{% endif %}"
359362
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
360-
controller.listener.names: "{{kafka_controller_listeners['controller']['name']}}"
361-
listener.security.protocol.map: "{% for listener in kafka_controller_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}{% endfor %},{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}{% endfor %}"
362-
listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}{% endfor %}"
363+
controller.listener.names: "{{ kafka_controller_listeners['controller']['name'] }}"
364+
listener.security.protocol.map: >-
365+
{%- for listener in kafka_controller_listeners|dict2items -%}
366+
{%- if loop.index > 1%},{% endif -%}
367+
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}
368+
{%- endfor -%}
369+
,
370+
{%- for listener in kafka_broker_listeners|dict2items -%}
371+
{%- if loop.index > 1%},{% endif -%}
372+
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}
373+
{%- endfor -%}
374+
listeners: >-
375+
{%- for listener in kafka_broker_listeners|dict2items -%}
376+
{%- if loop.index > 1 %},{% endif -%}
377+
{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}
378+
{%- endfor -%}
363379
confluent.cluster.link.metadata.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
364380
broker_on_zookeeper:
365381
enabled: "{{not kraft_enabled|bool}}"

0 commit comments

Comments
 (0)