Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/VARIABLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,14 @@ Default: "{{kafka_broker_default_log_dir}}"

***

### kafka_broker_metadata_version

Specify the desired metadata.version. Ensures that metadata.version is finalized on the specified version. Valid values are any valid metadata.version, usually tracks the major.minor of the underlying Apache Kafka version, like: 3.7, 3.8, 3.9, etc

Default: undefined (not set)

***

### kafka_broker_schema_validation_enabled

Boolean to configure Schema Validation on Kafka
Expand Down
11 changes: 11 additions & 0 deletions playbooks/kafka_broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,14 @@
pause:
prompt: "Press Enter to Proceed to Next Node. Ctrl + C to Abort"
when: kafka_broker_pause_rolling_deployment|bool

- name: Kafka Broker Cluster Metadata Version
hosts: kafka_broker
gather_facts: false
tags: kafka_broker
run_once: true
environment: "{{ proxy_env }}"
tasks:
- include_tasks: roles/kafka_broker/tasks/update_metadata.yml
run_once: true
when: kraft_mode | bool
64 changes: 64 additions & 0 deletions roles/kafka_broker/tasks/update_metadata.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
---
- name: Get current Kafka features
ansible.builtin.command: >-
/usr/bin/kafka-features
--bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}}
--command-config {{ kafka_broker.client_config_file }}
describe
register: current_features
failed_when: false
changed_when: false
check_mode: false
when: kafka_broker_metadata_version is defined

- name: Get Kafka feature metadata.version
ansible.builtin.set_fact:
metadata_version_enabled: "{{ 'metadata.version' in current_features.stdout }}"
metadata_versions: >-
{{
dict(current_features.stdout
| regex_search('Feature: metadata.version.*$')
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| regex_search('Feature: metadata.version.*$')
| regex_search('Feature: metadata.version.*$', multiline=true)

Needed for CP 8, where the line we are looking for is no longer the last line in the output.

| regex_findall('(SupportedMinVersion|SupportedMaxVersion|FinalizedVersionLevel):\s+(\S+)')
)
}}
when: current_features is defined

# Create a numeric only representation of the metadata_versions.
# Used for comparison, could probably be done in the previous task, or
# a simple function instead, kept open for extension for now.
- name: Numeric metadata.version
ansible.builtin.set_fact:
metadata_versions_numeric: >-
{{ dict(metadata_versions.keys() | zip(metadata_versions.values() | map('regex_replace', '-IV\d+', '') | map('float') ) ) }}
upgrade_to_version: >-
{{ kafka_broker_metadata_version | regex_replace('-IV\d+', '') | float }}
when: metadata_version_enabled | bool

- name: Status of metadata.version
ansible.builtin.debug:
msg:
- "Required version: {{ kafka_broker_metadata_version }} ({{ upgrade_to_version }})"
- "Finalized version: {{ metadata_versions.FinalizedVersionLevel }} ({{ metadata_versions_numeric.FinalizedVersionLevel }})"
- "Supported min version: {{ metadata_versions.SupportedMinVersion }} ({{ metadata_versions_numeric.SupportedMinVersion }})"
- "Supported max version: {{ metadata_versions.SupportedMaxVersion }} ({{ metadata_versions_numeric.SupportedMaxVersion }})"

- name: Upgrade metadata.version if needed
ansible.builtin.command: >-
/usr/bin/kafka-features
--bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}}
--command-config {{ kafka_broker.client_config_file }}
upgrade --metadata {{ kafka_broker_metadata_version }}
register: upgrade_metadata_version
when:
- upgrade_to_version is defined
- upgrade_to_version | regex_search('^\d+\.\d+$')
- (upgrade_to_version | float) >= metadata_versions_numeric.SupportedMinVersion
- (upgrade_to_version | float) <= metadata_versions_numeric.SupportedMaxVersion
- (upgrade_to_version | float) > metadata_versions_numeric.FinalizedVersionLevel
ignore_errors: true
changed_when:
- upgrade_metadata_version.rc == 0
- "'metadata.version was upgraded' in upgrade_metadata_version.stdout"
failed_when:
- upgrade_metadata_version.rc != 0
- "'Unsupported' in upgrade_metadata_version.stderr"