diff --git a/salt/kafka/config.map.jinja b/salt/kafka/config.map.jinja index 8f116c02e..c9f3e79e2 100644 --- a/salt/kafka/config.map.jinja +++ b/salt/kafka/config.map.jinja @@ -7,13 +7,24 @@ {% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %} +{% set KAFKA_CONTROLLERS_PILLAR = salt['pillar.get']('kafka:kafka_controllers', default=None) %} + {# Create list of KRaft controllers #} {% set controllers = [] %} -{% for node, values in KAFKA_NODES_PILLAR.items() %} -{% if 'controller' in values['role'] %} -{% do controllers.append(values.nodeid ~ "@" ~ node ~ ":9093") %} -{% endif %} -{% endfor %} + +{% if KAFKA_CONTROLLERS_PILLAR != none %} +{% for node in KAFKA_CONTROLLERS_PILLAR %} +{# Check that the user input for kafka_controllers pillar exists as a kafka:node value #} +{% if node in KAFKA_NODES_PILLAR %} +{% do controllers.append(KAFKA_NODES_PILLAR[node]['nodeid'] ~ '@' ~ node ~ ':9093') %} +{% endif %} +{% endfor %} +{% endif %} +{# Ensure in the event that the SOC controllers pillar has a single hostname and that hostname doesn't exist in kafka:nodes + that a controller is still set for the Kafka cluster. Defaulting to the grid manager #} +{% if controllers | length < 1 %} +{% do controllers.append(KAFKA_NODES_PILLAR[GLOBALS.manager]['nodeid'] ~ "@" ~ GLOBALS.manager ~ ":9093") %} +{% endif %} {% set kafka_controller_quorum_voters = ','.join(controllers) %} @@ -44,17 +55,12 @@ {% do KAFKAMERGED.config.broker.update({'process_x_roles': 'broker,controller' }) %} {% do KAFKAMERGED.config.broker.update({ - 'listeners': - KAFKAMERGED.config.broker.listeners - + ',' - + KAFKAMERGED.config.controller.listeners }) + 'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.controller.listeners }) %} {% do KAFKAMERGED.config.broker.update({ - 'listener_x_security_x_protocol_x_map': - KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map - + ',' - + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map }) + 'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + + ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map }) %} {% endif %} diff --git a/salt/kafka/enabled.sls b/salt/kafka/enabled.sls index ec2dc8e46..78e0d87d9 100644 --- a/salt/kafka/enabled.sls +++ b/salt/kafka/enabled.sls @@ -10,7 +10,6 @@ {% set KAFKANODES = salt['pillar.get']('kafka:nodes') %} include: - - kafka.controllers - elasticsearch.ca - kafka.sostatus - kafka.config diff --git a/salt/kafka/nodes.map.jinja b/salt/kafka/nodes.map.jinja index 36f789259..9b4979e92 100644 --- a/salt/kafka/nodes.map.jinja +++ b/salt/kafka/nodes.map.jinja @@ -1,12 +1,27 @@ -{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver', fun='network.ip_addrs', tgt_type='compound') %} -{% set STORED_KAFKANODES = salt['pillar.get']('kafka', {}) %} +{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one + or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at + https://securityonion.net/license; you may not use this file except in compliance with the + Elastic License 2.0. #} + +{# USED TO GENERATE PILLAR/KAFKA/NODES.SLS. #} +{% import_yaml 'kafka/defaults.yaml' as KAFKADEFAULTS %} +{% from 'vars/globals.map.jinja' import GLOBALS %} + +{% set process_x_roles = KAFKADEFAULTS.kafka.config.broker.process_x_roles %} + +{% set current_kafkanodes = salt.saltutil.runner( + 'mine.get', + tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver', + fun='network.ip_addrs', + tgt_type='compound') %} + +{% set STORED_KAFKANODES = salt['pillar.get']('kafka:nodes', default=None) %} {% set existing_ids = [] %} {# Check STORED_KAFKANODES for existing kafka nodes and pull their IDs so they are not reused across the grid #} -{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} #} {% if STORED_KAFKANODES != none %} -{% for node, values in STORED_KAFKANODES.nodes.items() %} +{% for node, values in STORED_KAFKANODES.items() %} {% if values.get('nodeid') %} {% do existing_ids.append(values['nodeid']) %} {% endif %} @@ -16,7 +31,6 @@ {# Create list of possible node ids #} {% set all_possible_ids = range(1, 65536)|list %} -{# Don't like the below loop because the higher the range for all_possible_ids the more time spent on loop #} {# Create list of available node ids by looping through all_possible_ids and ensuring it isn't in existing_ids #} {% set available_ids = [] %} {% for id in all_possible_ids %} @@ -29,14 +43,17 @@ {% set NEW_KAFKANODES = {} %} {% for minionid, ip in current_kafkanodes.items() %} {% set hostname = minionid.split('_')[0] %} -{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 and hostname not in STORED_KAFKANODES.nodes %} #} -{% if STORED_KAFKANODES != none and hostname not in STORED_KAFKANODES.nodes %} -{% set new_id = available_ids.pop(0) %} -{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %} +{# Override the default process_x_roles for manager and set to 'broker,controller'. Changes from SOC UI will overwrite this #} +{% if hostname == GLOBALS.manager %} +{% set process_x_roles = 'broker,controller' %} {% endif %} -{% if hostname not in NEW_KAFKANODES %} +{% if STORED_KAFKANODES != none and hostname not in STORED_KAFKANODES.items() %} {% set new_id = available_ids.pop(0) %} -{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %} +{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0], 'role': process_x_roles }}) %} +{% endif %} +{% if hostname not in NEW_KAFKANODES.items() %} +{% set new_id = available_ids.pop(0) %} +{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0], 'role': process_x_roles }}) %} {% endif %} {% endfor %} @@ -45,9 +62,8 @@ {% for node, details in NEW_KAFKANODES.items() %} {% do COMBINED_KAFKANODES.update({node: details}) %} {% endfor %} -{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} #} {% if STORED_KAFKANODES != none %} -{% for node, details in STORED_KAFKANODES.nodes.items() %} +{% for node, details in STORED_KAFKANODES.items() %} {% do COMBINED_KAFKANODES.update({node: details}) %} {% endfor %} {% endif %}