Use SOC UI to configure list of KRaft (Kafka) controllers for cluster

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
This commit is contained in:
reyesj2
2024-04-29 11:37:24 -04:00
parent 529c8d7cf2
commit fd9a91420d
3 changed files with 48 additions and 27 deletions

View File

@@ -7,13 +7,24 @@
{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %} {% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_CONTROLLERS_PILLAR = salt['pillar.get']('kafka:kafka_controllers', default=None) %}
{# Create list of KRaft controllers #} {# Create list of KRaft controllers #}
{% set controllers = [] %} {% set controllers = [] %}
{% for node, values in KAFKA_NODES_PILLAR.items() %}
{% if 'controller' in values['role'] %} {% if KAFKA_CONTROLLERS_PILLAR != none %}
{% do controllers.append(values.nodeid ~ "@" ~ node ~ ":9093") %} {% for node in KAFKA_CONTROLLERS_PILLAR %}
{% endif %} {# Check that the user input for kafka_controllers pillar exists as a kafka:node value #}
{% endfor %} {% if node in KAFKA_NODES_PILLAR %}
{% do controllers.append(KAFKA_NODES_PILLAR[node]['nodeid'] ~ '@' ~ node ~ ':9093') %}
{% endif %}
{% endfor %}
{% endif %}
{# Ensure in the event that the SOC controllers pillar has a single hostname and that hostname doesn't exist in kafka:nodes
that a controller is still set for the Kafka cluster. Defaulting to the grid manager #}
{% if controllers | length < 1 %}
{% do controllers.append(KAFKA_NODES_PILLAR[GLOBALS.manager]['nodeid'] ~ "@" ~ GLOBALS.manager ~ ":9093") %}
{% endif %}
{% set kafka_controller_quorum_voters = ','.join(controllers) %} {% set kafka_controller_quorum_voters = ','.join(controllers) %}
@@ -44,17 +55,12 @@
{% do KAFKAMERGED.config.broker.update({'process_x_roles': 'broker,controller' }) %} {% do KAFKAMERGED.config.broker.update({'process_x_roles': 'broker,controller' }) %}
{% do KAFKAMERGED.config.broker.update({ {% do KAFKAMERGED.config.broker.update({
'listeners': 'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.controller.listeners })
KAFKAMERGED.config.broker.listeners
+ ','
+ KAFKAMERGED.config.controller.listeners })
%} %}
{% do KAFKAMERGED.config.broker.update({ {% do KAFKAMERGED.config.broker.update({
'listener_x_security_x_protocol_x_map': 'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map
KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map })
+ ','
+ KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map })
%} %}
{% endif %} {% endif %}

View File

@@ -10,7 +10,6 @@
{% set KAFKANODES = salt['pillar.get']('kafka:nodes') %} {% set KAFKANODES = salt['pillar.get']('kafka:nodes') %}
include: include:
- kafka.controllers
- elasticsearch.ca - elasticsearch.ca
- kafka.sostatus - kafka.sostatus
- kafka.config - kafka.config

View File

@@ -1,12 +1,27 @@
{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver', fun='network.ip_addrs', tgt_type='compound') %} {# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
{% set STORED_KAFKANODES = salt['pillar.get']('kafka', {}) %} or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
https://securityonion.net/license; you may not use this file except in compliance with the
Elastic License 2.0. #}
{# USED TO GENERATE PILLAR/KAFKA/NODES.SLS. #}
{% import_yaml 'kafka/defaults.yaml' as KAFKADEFAULTS %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set process_x_roles = KAFKADEFAULTS.kafka.config.broker.process_x_roles %}
{% set current_kafkanodes = salt.saltutil.runner(
'mine.get',
tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver',
fun='network.ip_addrs',
tgt_type='compound') %}
{% set STORED_KAFKANODES = salt['pillar.get']('kafka:nodes', default=None) %}
{% set existing_ids = [] %} {% set existing_ids = [] %}
{# Check STORED_KAFKANODES for existing kafka nodes and pull their IDs so they are not reused across the grid #} {# Check STORED_KAFKANODES for existing kafka nodes and pull their IDs so they are not reused across the grid #}
{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} #}
{% if STORED_KAFKANODES != none %} {% if STORED_KAFKANODES != none %}
{% for node, values in STORED_KAFKANODES.nodes.items() %} {% for node, values in STORED_KAFKANODES.items() %}
{% if values.get('nodeid') %} {% if values.get('nodeid') %}
{% do existing_ids.append(values['nodeid']) %} {% do existing_ids.append(values['nodeid']) %}
{% endif %} {% endif %}
@@ -16,7 +31,6 @@
{# Create list of possible node ids #} {# Create list of possible node ids #}
{% set all_possible_ids = range(1, 65536)|list %} {% set all_possible_ids = range(1, 65536)|list %}
{# Don't like the below loop because the higher the range for all_possible_ids the more time spent on loop #}
{# Create list of available node ids by looping through all_possible_ids and ensuring it isn't in existing_ids #} {# Create list of available node ids by looping through all_possible_ids and ensuring it isn't in existing_ids #}
{% set available_ids = [] %} {% set available_ids = [] %}
{% for id in all_possible_ids %} {% for id in all_possible_ids %}
@@ -29,14 +43,17 @@
{% set NEW_KAFKANODES = {} %} {% set NEW_KAFKANODES = {} %}
{% for minionid, ip in current_kafkanodes.items() %} {% for minionid, ip in current_kafkanodes.items() %}
{% set hostname = minionid.split('_')[0] %} {% set hostname = minionid.split('_')[0] %}
{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 and hostname not in STORED_KAFKANODES.nodes %} #} {# Override the default process_x_roles for manager and set to 'broker,controller'. Changes from SOC UI will overwrite this #}
{% if STORED_KAFKANODES != none and hostname not in STORED_KAFKANODES.nodes %} {% if hostname == GLOBALS.manager %}
{% set new_id = available_ids.pop(0) %} {% set process_x_roles = 'broker,controller' %}
{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %}
{% endif %} {% endif %}
{% if hostname not in NEW_KAFKANODES %} {% if STORED_KAFKANODES != none and hostname not in STORED_KAFKANODES.items() %}
{% set new_id = available_ids.pop(0) %} {% set new_id = available_ids.pop(0) %}
{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %} {% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0], 'role': process_x_roles }}) %}
{% endif %}
{% if hostname not in NEW_KAFKANODES.items() %}
{% set new_id = available_ids.pop(0) %}
{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0], 'role': process_x_roles }}) %}
{% endif %} {% endif %}
{% endfor %} {% endfor %}
@@ -45,9 +62,8 @@
{% for node, details in NEW_KAFKANODES.items() %} {% for node, details in NEW_KAFKANODES.items() %}
{% do COMBINED_KAFKANODES.update({node: details}) %} {% do COMBINED_KAFKANODES.update({node: details}) %}
{% endfor %} {% endfor %}
{# {% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} #}
{% if STORED_KAFKANODES != none %} {% if STORED_KAFKANODES != none %}
{% for node, details in STORED_KAFKANODES.nodes.items() %} {% for node, details in STORED_KAFKANODES.items() %}
{% do COMBINED_KAFKANODES.update({node: details}) %} {% do COMBINED_KAFKANODES.update({node: details}) %}
{% endfor %} {% endfor %}
{% endif %} {% endif %}