Skip to content

Commit da67016

Browse files
committed
feat: support upgrading kafka broker metadata.version with cp-ansible
1 parent 736838c commit da67016

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed

docs/VARIABLES.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,6 +1292,14 @@ Default: "{{kafka_broker_default_log_dir}}"
12921292

12931293
***
12941294

1295+
### kafka_broker_metadata_version
1296+
1297+
Specify the desired metadata.version. Ensures that metadata.version is finalized on the specified version. Valid values are any valid metadata.version, usually tracks the major.minor of the underlying Apache Kafka version, like: 3.7, 3.8, 3.9, etc
1298+
1299+
Default: undefined (not set)
1300+
1301+
***
1302+
12951303
### kafka_broker_schema_validation_enabled
12961304

12971305
Boolean to configure Schema Validation on Kafka

playbooks/kafka_broker.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,14 @@
8484
pause:
8585
prompt: "Press Enter to Proceed to Next Node. Ctrl + C to Abort"
8686
when: kafka_broker_pause_rolling_deployment|bool
87+
88+
- name: Kafka Broker Cluster Metadata Version
89+
hosts: kafka_broker
90+
gather_facts: false
91+
tags: kafka_broker
92+
run_once: true
93+
environment: "{{ proxy_env }}"
94+
tasks:
95+
- include_tasks: roles/kafka_broker/tasks/update_metadata.yml
96+
run_once: true
97+
when: kraft_mode | bool
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
---
2+
- name: Get current Kafka features
3+
ansible.builtin.command: >-
4+
/usr/bin/kafka-features
5+
--bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}}
6+
--command-config {{ kafka_broker.client_config_file }}
7+
describe
8+
register: current_features
9+
failed_when: false
10+
changed_when: false
11+
check_mode: false
12+
when: kafka_broker_metadata_version is defined
13+
14+
- name: Get Kafka feature metadata.version
15+
ansible.builtin.set_fact:
16+
metadata_version_enabled: "{{ 'metadata.version' in current_features.stdout }}"
17+
metadata_versions: >-
18+
{{
19+
dict(current_features.stdout
20+
| regex_search('Feature: metadata.version.*$')
21+
| regex_findall('(SupportedMinVersion|SupportedMaxVersion|FinalizedVersionLevel):\s+(\S+)')
22+
)
23+
}}
24+
when: current_features is defined
25+
26+
# Create a numeric only representation of the metadata_versions.
27+
# Used for comparison, could probably be done in the previous task, or
28+
# a simple function instead, kept open for extension for now.
29+
- name: Numeric metadata.version
30+
ansible.builtin.set_fact:
31+
metadata_versions_numeric: >-
32+
{{ dict(metadata_versions.keys() | zip(metadata_versions.values() | map('regex_replace', '-IV\d+', '') | map('float') ) ) }}
33+
upgrade_to_version: >-
34+
{{ kafka_broker_metadata_version | regex_replace('-IV\d+', '') | float }}
35+
when: metadata_version_enabled | bool
36+
37+
- name: Status of metadata.version
38+
ansible.builtin.debug:
39+
msg:
40+
- "Required version: {{ kafka_broker_metadata_version }} ({{ upgrade_to_version }})"
41+
- "Finalized version: {{ metadata_versions.FinalizedVersionLevel }} ({{ metadata_versions_numeric.FinalizedVersionLevel }})"
42+
- "Supported min version: {{ metadata_versions.SupportedMinVersion }} ({{ metadata_versions_numeric.SupportedMinVersion }})"
43+
- "Supported max version: {{ metadata_versions.SupportedMaxVersion }} ({{ metadata_versions_numeric.SupportedMaxVersion }})"
44+
45+
- name: Upgrade metadata.version if needed
46+
ansible.builtin.command: >-
47+
/usr/bin/kafka-features
48+
--bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}}
49+
--command-config {{ kafka_broker.client_config_file }}
50+
upgrade --metadata {{ kafka_broker_metadata_version }}
51+
register: upgrade_metadata_version
52+
when:
53+
- upgrade_to_version is defined
54+
- upgrade_to_version | regex_search('^\d+\.\d+$')
55+
- (upgrade_to_version | float) >= metadata_versions_numeric.SupportedMinVersion
56+
- (upgrade_to_version | float) <= metadata_versions_numeric.SupportedMaxVersion
57+
- (upgrade_to_version | float) > metadata_versions_numeric.FinalizedVersionLevel
58+
ignore_errors: true
59+
changed_when:
60+
- upgrade_metadata_version.rc == 0
61+
- "'metadata.version was upgraded' in upgrade_metadata_version.stdout"
62+
failed_when:
63+
- upgrade_metadata_version.rc != 0
64+
- "'Unsupported' in upgrade_metadata_version.stderr"

0 commit comments

Comments
 (0)