diff --git a/docs/VARIABLES.md b/docs/VARIABLES.md index 22a9f855a3..d79b63e9a7 100644 --- a/docs/VARIABLES.md +++ b/docs/VARIABLES.md @@ -1292,6 +1292,14 @@ Default: "{{kafka_broker_default_log_dir}}" *** +### kafka_broker_metadata_version + +Specify the desired metadata.version. Ensures that metadata.version is finalized on the specified version. Valid values are any valid metadata.version, usually tracks the major.minor of the underlying Apache Kafka version, like: 3.7, 3.8, 3.9, etc + +Default: undefined (not set) + +*** + ### kafka_broker_schema_validation_enabled Boolean to configure Schema Validation on Kafka diff --git a/playbooks/kafka_broker.yml b/playbooks/kafka_broker.yml index 337045fe54..296a2ab48d 100644 --- a/playbooks/kafka_broker.yml +++ b/playbooks/kafka_broker.yml @@ -84,3 +84,14 @@ pause: prompt: "Press Enter to Proceed to Next Node. Ctrl + C to Abort" when: kafka_broker_pause_rolling_deployment|bool + +- name: Kafka Broker Cluster Metadata Version + hosts: kafka_broker + gather_facts: false + tags: kafka_broker + run_once: true + environment: "{{ proxy_env }}" + tasks: + - include_tasks: roles/kafka_broker/tasks/update_metadata.yml + run_once: true + when: kraft_mode | bool diff --git a/roles/kafka_broker/tasks/update_metadata.yml b/roles/kafka_broker/tasks/update_metadata.yml new file mode 100644 index 0000000000..e57b8766bd --- /dev/null +++ b/roles/kafka_broker/tasks/update_metadata.yml @@ -0,0 +1,64 @@ +--- +- name: Get current Kafka features + ansible.builtin.command: >- + /usr/bin/kafka-features + --bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}} + --command-config {{ kafka_broker.client_config_file }} + describe + register: current_features + failed_when: false + changed_when: false + check_mode: false + when: kafka_broker_metadata_version is defined + +- name: Get Kafka feature metadata.version + ansible.builtin.set_fact: + metadata_version_enabled: "{{ 'metadata.version' in current_features.stdout }}" + metadata_versions: >- + {{ + dict(current_features.stdout + | regex_search('Feature: metadata.version.*$') + | regex_findall('(SupportedMinVersion|SupportedMaxVersion|FinalizedVersionLevel):\s+(\S+)') + ) + }} + when: current_features is defined + +# Create a numeric only representation of the metadata_versions. +# Used for comparison, could probably be done in the previous task, or +# a simple function instead, kept open for extension for now. +- name: Numeric metadata.version + ansible.builtin.set_fact: + metadata_versions_numeric: >- + {{ dict(metadata_versions.keys() | zip(metadata_versions.values() | map('regex_replace', '-IV\d+', '') | map('float') ) ) }} + upgrade_to_version: >- + {{ kafka_broker_metadata_version | regex_replace('-IV\d+', '') | float }} + when: metadata_version_enabled | bool + +- name: Status of metadata.version + ansible.builtin.debug: + msg: + - "Required version: {{ kafka_broker_metadata_version }} ({{ upgrade_to_version }})" + - "Finalized version: {{ metadata_versions.FinalizedVersionLevel }} ({{ metadata_versions_numeric.FinalizedVersionLevel }})" + - "Supported min version: {{ metadata_versions.SupportedMinVersion }} ({{ metadata_versions_numeric.SupportedMinVersion }})" + - "Supported max version: {{ metadata_versions.SupportedMaxVersion }} ({{ metadata_versions_numeric.SupportedMaxVersion }})" + +- name: Upgrade metadata.version if needed + ansible.builtin.command: >- + /usr/bin/kafka-features + --bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}} + --command-config {{ kafka_broker.client_config_file }} + upgrade --metadata {{ kafka_broker_metadata_version }} + register: upgrade_metadata_version + when: + - upgrade_to_version is defined + - upgrade_to_version | regex_search('^\d+\.\d+$') + - (upgrade_to_version | float) >= metadata_versions_numeric.SupportedMinVersion + - (upgrade_to_version | float) <= metadata_versions_numeric.SupportedMaxVersion + - (upgrade_to_version | float) > metadata_versions_numeric.FinalizedVersionLevel + ignore_errors: true + changed_when: + - upgrade_metadata_version.rc == 0 + - "'metadata.version was upgraded' in upgrade_metadata_version.stdout" + failed_when: + - upgrade_metadata_version.rc != 0 + - "'Unsupported' in upgrade_metadata_version.stderr"