{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at https://securityonion.net/license; you may not use this file except in compliance with the Elastic License 2.0. #}

{# USED TO GENERATE PILLAR/KAFKA/NODES.SLS.

   Builds COMBINED_KAFKANODES, a dict keyed by hostname. Nodes discovered through the
   salt mine that are not yet in the kafka:nodes pillar get an entry of the form
   {'nodeid': <int>, 'ip': <first mine ip>, 'role': <role>}; nodes already in the pillar
   keep their stored entry. The 'role' value is then rewritten from the kafka:controllers
   pillar, falling back to GLOBALS.manager as 'broker,controller'. #}

{% import_yaml 'kafka/defaults.yaml' as KAFKADEFAULTS %}
{% from 'vars/globals.map.jinja' import GLOBALS %}

{# Default role given to a newly discovered node #}
{% set process_x_roles = KAFKADEFAULTS.kafka.config.broker.process_x_roles %}

{# Query the salt mine for the ip addresses of all manager, managersearch, standalone and receiver minions #}
{% set current_kafkanodes = salt.saltutil.runner(
    'mine.get',
    tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver',
    fun='network.ip_addrs',
    tgt_type='compound') %}

{% set STORED_KAFKANODES = salt['pillar.get']('kafka:nodes', default=None) %}
{% set KAFKA_CONTROLLERS_PILLAR = salt['pillar.get']('kafka:controllers', default=None) %}

{# Check STORED_KAFKANODES for existing kafka nodes and pull their IDs so they are not reused across the grid #}
{% set existing_ids = [] %}
{% if STORED_KAFKANODES is not none %}
{%   for node, values in STORED_KAFKANODES.items() %}
{%     if values.get('nodeid') %}
{%       do existing_ids.append(values['nodeid']) %}
{%     endif %}
{%   endfor %}
{% endif %}

{# Create list of possible node ids (1 through 1999) #}
{% set all_possible_ids = range(1, 2000)|list %}

{# Available node ids are the possible ids that are not already in existing_ids, in ascending order #}
{% set available_ids = all_possible_ids|reject('in', existing_ids)|list %}

{# Collect kafka eligible nodes and check if they're already in STORED_KAFKANODES to avoid potentially reassigning a nodeid #}
{% set NEW_KAFKANODES = {} %}
{% for minionid, ip in current_kafkanodes.items() %}
{#   The hostname is the part of the minion id before the first underscore #}
{%   set hostname = minionid.split('_')[0] %}
{%   if not STORED_KAFKANODES or hostname not in STORED_KAFKANODES %}
{#     Take the lowest remaining id; popping it guarantees it is not handed to a second node in this render #}
{%     set new_id = available_ids.pop(0) %}
{%     do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0], 'role': process_x_roles }}) %}
{%   endif %}
{% endfor %}

{# Combine STORED_KAFKANODES and NEW_KAFKANODES for writing to the pillar/kafka/nodes.sls.
   Stored entries are applied last so they win over a new entry with the same hostname. #}
{% set COMBINED_KAFKANODES = {} %}
{% do COMBINED_KAFKANODES.update(NEW_KAFKANODES) %}
{% if STORED_KAFKANODES is not none %}
{%   do COMBINED_KAFKANODES.update(STORED_KAFKANODES) %}
{% endif %}

{# Update the role value for any host in the kafka_controllers_pillar configured from SOC UI #}
{% set ns = namespace(has_controller=false) %}
{% if KAFKA_CONTROLLERS_PILLAR is not none %}
{#   Trim each entry so a value such as "host1, host2" still matches the hostname keys #}
{%   set KAFKA_CONTROLLERS_PILLAR_LIST = KAFKA_CONTROLLERS_PILLAR.split(',')|map('trim')|list %}
{%   for hostname in KAFKA_CONTROLLERS_PILLAR_LIST %}
{%     if hostname in COMBINED_KAFKANODES %}
{%       do COMBINED_KAFKANODES[hostname].update({'role': 'controller'}) %}
{%       set ns.has_controller = true %}
{%     endif %}
{%   endfor %}
{#   Every node that is not listed as a controller becomes a plain broker #}
{%   for hostname in COMBINED_KAFKANODES %}
{%     if hostname not in KAFKA_CONTROLLERS_PILLAR_LIST %}
{%       do COMBINED_KAFKANODES[hostname].update({'role': 'broker'}) %}
{%     endif %}
{%   endfor %}
{% endif %}

{# If kafka_controllers_pillar is empty, or none of its hosts matched a known node, default to having
   the grid manager as 'broker,controller' so there is always at least 1 controller in the cluster.
   NOTE(review): this lookup assumes GLOBALS.manager is a key of COMBINED_KAFKANODES (returned by the
   mine or already stored in the pillar); the render fails otherwise - confirm that is intended. #}
{% if not ns.has_controller %}
{%   do COMBINED_KAFKANODES[GLOBALS.manager].update({'role': 'broker,controller'}) %}
{% endif %}