diff --git a/salt/kafka/map.jinja b/salt/kafka/map.jinja index 771e6102b..56f85144a 100644 --- a/salt/kafka/map.jinja +++ b/salt/kafka/map.jinja @@ -6,13 +6,13 @@ {% import_yaml 'kafka/defaults.yaml' as KAFKADEFAULTS %} {% set KAFKAMERGED = salt['pillar.get']('kafka', KAFKADEFAULTS.kafka, merge=True) %} {% from 'vars/globals.map.jinja' import GLOBALS %} +{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES %} {% do KAFKAMERGED.config.server.update({ 'node_x_id': salt['pillar.get']('kafka:nodes:' ~ GLOBALS.hostname ~ ':nodeid')}) %} {% do KAFKAMERGED.config.server.update({'advertised_x_listeners': 'BROKER://' ~ GLOBALS.node_ip ~ ':9092'}) %} -{% set nodes = salt['pillar.get']('kafka:nodes', {}) %} {% set combined = [] %} -{% for hostname, data in nodes.items() %} +{% for hostname, data in COMBINED_KAFKANODES.items() %} {% do combined.append(data.nodeid ~ "@" ~ hostname ~ ":9093") %} {% endfor %} {% set kraft_controller_quorum_voters = ','.join(combined) %} diff --git a/salt/kafka/nodes.map.jinja b/salt/kafka/nodes.map.jinja index a6e36d36a..5d74e9e1c 100644 --- a/salt/kafka/nodes.map.jinja +++ b/salt/kafka/nodes.map.jinja @@ -4,7 +4,8 @@ {% set existing_ids = [] %} {# Check STORED_KAFKANODES for existing kafka nodes and pull their IDs so they are not reused across the grid #} -{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} +{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} #} +{% if STORED_KAFKANODES != none %} {% for node, values in STORED_KAFKANODES.nodes.items() %} {% if values.get('nodeid') %} {% do existing_ids.append(values['nodeid']) %} @@ -28,7 +29,8 @@ {% set NEW_KAFKANODES = {} %} {% for minionid, ip in current_kafkanodes.items() %} {% set hostname = minionid.split('_')[0] %} -{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 and hostname not in STORED_KAFKANODES.nodes %} +{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 and hostname not in STORED_KAFKANODES.nodes %} #} +{% if STORED_KAFKANODES != none and hostname not in STORED_KAFKANODES.nodes %} {% set new_id = available_ids.pop(0) %} {% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %} {% endif %} @@ -43,7 +45,8 @@ {% for node, details in NEW_KAFKANODES.items() %} {% do COMBINED_KAFKANODES.update({node: details}) %} {% endfor %} -{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} +{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} #} +{% if STORED_KAFKANODES != none %} {% for node, details in STORED_KAFKANODES.nodes.items() %} {% do COMBINED_KAFKANODES.update({node: details}) %} {% endfor %}