Files
securityonion/salt/kafka/config.map.jinja
reyesj2 386be4e746 WIP: Manage Kafka nodes pillar role value
This way when kafka_controllers is updated the pillar value gets updated and any non-controllers get updated to revert to 'broker' only role.
 Needs more testing when a new controller joins in this manner Kafka errors due to cluster metadata being out of sync. One solution is to remove /nsm/kafka/data/__cluster_metadata-0/quorum-state and restart cluster. Alternative is working with Kafka cli tools to inform cluster of new voter, likely best option but requires a wrapper script of some sort to be created for updating cluster in-place.
Easiest option is to have all receivers join grid and then configure Kafka with specific controllers via SOC UI prior to enabling Kafka. This way Kafka cluster comes up in the desired configuration with no need for immediately modifying cluster

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
2024-05-29 16:48:39 -04:00

72 lines
3.8 KiB
Django/Jinja

{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
https://securityonion.net/license; you may not use this file except in compliance with the
Elastic License 2.0. #}
{% from 'kafka/map.jinja' import KAFKAMERGED %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_CONTROLLERS_PILLAR = salt['pillar.get']('kafka:kafka_controllers', default=None) %}
{# Create list of KRaft controllers #}
{% set controllers = [] %}
{# Check for Kafka nodes with controller in process_x_roles #}
{% for node in KAFKA_NODES_PILLAR %}
{% if 'controller' in KAFKA_NODES_PILLAR[node].role %}
{% do controllers.append(KAFKA_NODES_PILLAR[node].nodeid ~ "@" ~ node ~ ":9093") %}
{% endif %}
{% endfor %}
{% set kafka_controller_quorum_voters = ','.join(controllers) %}
{# By default all Kafka eligible nodes are given the role of broker, except for
grid MANAGER (broker,controller) until overridden through SOC UI #}
{% set node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %}
{# Generate server.properties for 'broker' , 'controller', 'broker,controller' node types
anything above this line is a configuration needed for ALL Kafka nodes #}
{% if node_type == 'broker' %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{# Nodes with only the 'broker' role need to have the below settings for communicating with controller nodes #}
{% do KAFKAMERGED.config.broker.update({'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names }) %}
{% do KAFKAMERGED.config.broker.update({
'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map
+ ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map })
%}
{% endif %}
{% if node_type == 'controller' %}
{% do KAFKAMERGED.config.controller.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.controller.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% endif %}
{# Kafka nodes of this type are not recommended for use outside of development / testing. #}
{% if node_type == 'broker,controller' %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% do KAFKAMERGED.config.broker.update({'process_x_roles': 'broker,controller' }) %}
{% do KAFKAMERGED.config.broker.update({
'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.controller.listeners })
%}
{% do KAFKAMERGED.config.broker.update({
'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map
+ ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map })
%}
{% endif %}
{% if 'broker' in node_type %}
{% set KAFKACONFIG = KAFKAMERGED.config.broker %}
{% else %}
{% set KAFKACONFIG = KAFKAMERGED.config.controller %}
{% endif %}