{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
   or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
   https://securityonion.net/license; you may not use this file except in compliance with the
   Elastic License 2.0. #}

{% from 'kafka/map.jinja' import KAFKAMERGED %}
{% from 'vars/globals.map.jinja' import GLOBALS %}

{# Builds the Kafka settings for the node being rendered (GLOBALS.hostname) on top of the
   KAFKAMERGED defaults, and ends by setting KAFKACONFIG (broker or controller settings) and
   KAFKACLIENT (client settings).
   Keys use '_x_' where the Kafka property name has a dot (e.g. ssl_x_keystore_x_password);
   presumably that is translated back when the properties file is written. That step is not in
   this file. #}

{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}

{# Keystore and truststore passwords are read from pillar and copied into the config dictionaries
   below, so whatever renders those dictionaries writes the secrets out in clear text.
   NOTE(review): with Salt's default settings pillar.get returns an empty string for a missing
   key, so an unset kafka:config:password or kafka:config:trustpass would be passed through as an
   empty password without any error. Confirm both keys are always populated before this map is
   rendered, and that the rendered properties files are readable only by the Kafka service
   account. #}
{% set KAFKA_PASSWORD = salt['pillar.get']('kafka:config:password') %}
{% set KAFKA_TRUSTPASS = salt['pillar.get']('kafka:config:trustpass') %}

{# KRaft controller quorum: one "<nodeid>@<node>:9093" entry for every node in kafka:nodes whose
   role value contains 'controller' (this also matches the combined 'broker,controller' role). #}
{% set controllers = [] %}
{% for node_name in KAFKA_NODES_PILLAR if 'controller' in KAFKA_NODES_PILLAR[node_name].role %}
  {% do controllers.append(KAFKA_NODES_PILLAR[node_name].nodeid ~ "@" ~ node_name ~ ":9093") %}
{% endfor %}
{% set kafka_controller_quorum_voters = controllers | join(',') %}

{# Role of this node. By default all Kafka eligible nodes are given the role of broker, except for
   the grid MANAGER (broker,controller), until overridden through the SOC UI. #}
{% set node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %}

{# Role-specific settings for 'broker', 'controller' and 'broker,controller'. The three values are
   mutually exclusive; any other value leaves the KAFKAMERGED defaults as they are.
   Everything above this point applies to ALL Kafka nodes. #}
{% if node_type == 'broker' %}
  {# Broker-only node: besides its own identity and keystore password it takes the controller
     listener name and the controller entries of the security protocol map, which it needs for
     communicating with the controller nodes. #}
  {% do KAFKAMERGED.config.broker.update({
       'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092',
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'ssl_x_keystore_x_password': KAFKA_PASSWORD,
       'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names,
       'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map
     }) %}
{% elif node_type == 'controller' %}
  {# Controller-only node: quorum membership, identity and keystore password. #}
  {% do KAFKAMERGED.config.controller.update({
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'ssl_x_keystore_x_password': KAFKA_PASSWORD
     }) %}
{% elif node_type == 'broker,controller' %}
  {# Combined node: the broker settings are extended with the controller's listeners and protocol
     map. Kafka nodes of this type are not recommended for use outside of development / testing. #}
  {% do KAFKAMERGED.config.broker.update({
       'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092',
       'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names,
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'process_x_roles': 'broker,controller',
       'ssl_x_keystore_x_password': KAFKA_PASSWORD,
       'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.controller.listeners,
       'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map
     }) %}
{% endif %}

{# Truststore config: the same truststore password goes into the broker, controller and client
   settings, whatever the role of this node. #}
{% for section in ('broker', 'controller', 'client') %}
  {% do KAFKAMERGED.config[section].update({'ssl_x_truststore_x_password': KAFKA_TRUSTPASS}) %}
{% endfor %}

{# Client properties: the client settings reuse the keystore password given to the servers. #}
{% do KAFKAMERGED.config.client.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD}) %}

{# Any role containing 'broker' gets the broker settings. Everything else, including an empty or
   unknown role, falls back to the controller settings. #}
{% set KAFKACONFIG = KAFKAMERGED.config.broker if 'broker' in node_type else KAFKAMERGED.config.controller %}
{% set KAFKACLIENT = KAFKAMERGED.config.client %}