diff --git a/salt/kafka/config.map.jinja b/salt/kafka/config.map.jinja index 4e82eac42..e5b77db11 100644 --- a/salt/kafka/config.map.jinja +++ b/salt/kafka/config.map.jinja @@ -12,19 +12,12 @@ {# Create list of KRaft controllers #} {% set controllers = [] %} -{% if KAFKA_CONTROLLERS_PILLAR != none %} -{% for node in KAFKA_CONTROLLERS_PILLAR %} -{# Check that the user input for kafka_controllers pillar exists as a kafka:node value #} -{% if node in KAFKA_NODES_PILLAR %} -{% do controllers.append(KAFKA_NODES_PILLAR[node]['nodeid'] ~ '@' ~ node ~ ':9093') %} -{% endif %} -{% endfor %} -{% endif %} -{# Ensure in the event that the SOC controllers pillar has a single hostname and that hostname doesn't exist in kafka:nodes - that a controller is still set for the Kafka cluster. Defaulting to the grid manager #} -{% if controllers | length < 1 %} -{% do controllers.append(KAFKA_NODES_PILLAR[GLOBALS.manager]['nodeid'] ~ "@" ~ GLOBALS.manager ~ ":9093") %} -{% endif %} +{# Check for Kafka nodes with controller in process_x_roles #} +{% for node in KAFKA_NODES_PILLAR %} +{% if 'controller' in KAFKA_NODES_PILLAR[node].role %} +{% do controllers.append(KAFKA_NODES_PILLAR[node].nodeid ~ "@" ~ node ~ ":9093") %} +{% endif %} +{% endfor %} {% set kafka_controller_quorum_voters = ','.join(controllers) %} diff --git a/salt/kafka/nodes.map.jinja b/salt/kafka/nodes.map.jinja index 9b4979e92..e629ff783 100644 --- a/salt/kafka/nodes.map.jinja +++ b/salt/kafka/nodes.map.jinja @@ -8,6 +8,10 @@ {% from 'vars/globals.map.jinja' import GLOBALS %} {% set process_x_roles = KAFKADEFAULTS.kafka.config.broker.process_x_roles %} +{# Set grid manager to default process_x_roles of broker,controller until overridden via SOC UI #} +{% if grains.role in ["so-standalone", "so-manager", "so-managersearch"] %} +{% set process_x_roles = 'broker,controller'%} +{% endif %} {% set current_kafkanodes = salt.saltutil.runner( 'mine.get', @@ -16,6 +20,7 @@ tgt_type='compound') %} {% set STORED_KAFKANODES = salt['pillar.get']('kafka:nodes', default=None) %} +{% set KAFKA_CONTROLLERS_PILLAR = salt['pillar.get']('kafka:kafka_controllers', default=None) %} {% set existing_ids = [] %} @@ -43,10 +48,6 @@ {% set NEW_KAFKANODES = {} %} {% for minionid, ip in current_kafkanodes.items() %} {% set hostname = minionid.split('_')[0] %} -{# Override the default process_x_roles for manager and set to 'broker,controller'. Changes from SOC UI will overwrite this #} -{% if hostname == GLOBALS.manager %} -{% set process_x_roles = 'broker,controller' %} -{% endif %} {% if STORED_KAFKANODES != none and hostname not in STORED_KAFKANODES.items() %} {% set new_id = available_ids.pop(0) %} {% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0], 'role': process_x_roles }}) %} @@ -67,3 +68,28 @@ {% do COMBINED_KAFKANODES.update({node: details}) %} {% endfor %} {% endif %} + +{# Update the process_x_roles value for any host in the kafka_controllers_pillar configured from SOC UI #} +{% set ns = namespace(has_controller=false) %} +{% if KAFKA_CONTROLLERS_PILLAR != none %} +{% for hostname in KAFKA_CONTROLLERS_PILLAR %} +{% if hostname in COMBINED_KAFKANODES %} +{% do COMBINED_KAFKANODES[hostname].update({'role': 'controller'}) %} +{% set ns.has_controller = true %} +{% endif %} +{% endfor %} +{% for hostname in COMBINED_KAFKANODES %} +{% if hostname not in KAFKA_CONTROLLERS_PILLAR %} +{% do COMBINED_KAFKANODES[hostname].update({'role': 'broker'}) %} +{% endif %} +{% endfor %} +{# If the kafka_controllers_pillar is NOT empty check that atleast one node contains the controller role. + otherwise default to GLOBALS.manager having broker,controller role #} +{% if not ns.has_controller %} +{% do COMBINED_KAFKANODES[GLOBALS.manager].update({'role': 'broker,controller'}) %} +{% endif %} +{# If kafka_controllers_pillar is empty, default to having grid manager as 'broker,controller' + so there is always atleast 1 controller in the cluster #} +{% else %} +{% do COMBINED_KAFKANODES[GLOBALS.manager].update({'role': 'broker,controller'}) %} +{% endif %} \ No newline at end of file