{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
   or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
   https://securityonion.net/license; you may not use this file except in compliance with the
   Elastic License 2.0. #}

{# Per-node Kafka settings map.
   Reads the kafka:nodes and kafka:kafka_controllers pillars, writes node-specific values into
   KAFKAMERGED.config.broker / KAFKAMERGED.config.controller, and finally selects one of the two
   as KAFKACONFIG.
   NOTE(review): the '_x_' in key names presumably stands for '.' in the rendered
   server.properties (e.g. node_x_id -> node.id); confirm against the template that consumes
   KAFKACONFIG. #}
{% from 'kafka/map.jinja' import KAFKAMERGED %}
{% from 'vars/globals.map.jinja' import GLOBALS %}

{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_CONTROLLERS_PILLAR = salt['pillar.get']('kafka:kafka_controllers', default=None) %}

{# Build the KRaft controller list as 'nodeid@host:9093' entries.
   kafka_controllers is operator-supplied input, so a name is only accepted when it is also a key
   of kafka:nodes; this keeps hosts that are not registered grid Kafka nodes out of the quorum.
   Names that fail the check are dropped without any error, so a typo in the pillar shrinks the
   voter list silently.
   NOTE(review): the loop assumes kafka_controllers is a list of hostnames and kafka:nodes is a
   mapping keyed by hostname; a plain string would be iterated character by character. Confirm
   the pillar schema. #}
{% set controllers = [] %}
{% if KAFKA_CONTROLLERS_PILLAR is not none %}
{%   for candidate in KAFKA_CONTROLLERS_PILLAR %}
{%     if candidate in KAFKA_NODES_PILLAR %}
{%       do controllers.append(KAFKA_NODES_PILLAR[candidate]['nodeid'] ~ '@' ~ candidate ~ ':9093') %}
{%     endif %}
{%   endfor %}
{% endif %}

{# Fallback: when no requested controller survived the check above (for example a single hostname
   that is not present in kafka:nodes), the grid manager becomes the only controller so the
   cluster always has a quorum voter.
   NOTE(review): this lookup requires GLOBALS.manager to exist in kafka:nodes; if it does not,
   rendering fails here. #}
{% if not controllers %}
{%   do controllers.append(KAFKA_NODES_PILLAR[GLOBALS.manager]['nodeid'] ~ "@" ~ GLOBALS.manager ~ ":9093") %}
{% endif %}
{% set kafka_controller_quorum_voters = ','.join(controllers) %}

{# Role of this node, read from kafka:nodes:<hostname>:role. By default every Kafka-eligible node
   is a broker, except the grid MANAGER (broker,controller), until overridden through the SOC UI. #}
{% set node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %}

{# Generate server.properties values for the 'broker', 'controller' and 'broker,controller' node
   types. Everything above this point applies to ALL Kafka nodes.
   The listener -> security protocol map is only concatenated here; which protocol each listener
   uses is decided by the values merged in kafka/map.jinja, which are not visible in this file.
   Ports 9092 (broker) and 9093 (controller) are fixed in this file. #}
{% if node_type == 'broker' %}
{#   Broker-only nodes also carry the controller listener name and the controller's protocol map
     entry, because they have to talk to the controller nodes. #}
{%   do KAFKAMERGED.config.broker.update({
       'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092',
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names,
       'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map
     }) %}
{% elif node_type == 'controller' %}
{%   do KAFKAMERGED.config.controller.update({
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid')
     }) %}
{% elif node_type == 'broker,controller' %}
{#   Combined broker and controller in one process: not recommended for use outside of
     development / testing. Both listener sets and both protocol maps end up on the same node. #}
{%   do KAFKAMERGED.config.broker.update({
       'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092',
       'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names,
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'process_x_roles': 'broker,controller',
       'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.controller.listeners,
       'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map
     }) %}
{% endif %}

{# Any role containing 'broker' gets the broker settings; everything else, including a role that
   matched none of the branches above, gets the controller settings. In that last case none of
   the node id / quorum voter values were written by this file. #}
{% set KAFKACONFIG = KAFKAMERGED.config.broker if 'broker' in node_type else KAFKAMERGED.config.controller %}