From 665b7197a6a1820d4ced3bd2da4be7e2000c057a Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Wed, 17 Apr 2024 17:08:41 -0400 Subject: [PATCH] Update Kafka nodeid Update so-minion to include running kafka.nodes state to ensure nodeid is generated for new brokers Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com> --- pillar/kafka/nodes.sls | 30 +------------ salt/kafka/enabled.sls | 7 ++- salt/kafka/files/managed_node_pillar.jinja | 7 +++ salt/kafka/nodes.map.jinja | 50 ++++++++++++++++++++++ salt/kafka/nodes.sls | 19 ++++++++ salt/manager/tools/sbin/so-minion | 4 ++ 6 files changed, 86 insertions(+), 31 deletions(-) create mode 100644 salt/kafka/files/managed_node_pillar.jinja create mode 100644 salt/kafka/nodes.map.jinja create mode 100644 salt/kafka/nodes.sls diff --git a/pillar/kafka/nodes.sls b/pillar/kafka/nodes.sls index 447e7a35d..ba14c219e 100644 --- a/pillar/kafka/nodes.sls +++ b/pillar/kafka/nodes.sls @@ -1,30 +1,2 @@ -{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver', fun='network.ip_addrs', tgt_type='compound') %} -{% set pillar_kafkanodes = salt['pillar.get']('kafka:nodes', default={}, merge=True) %} - -{% set existing_ids = [] %} -{% for node in pillar_kafkanodes.values() %} - {% if node.get('id') %} - {% do existing_ids.append(node['nodeid']) %} - {% endif %} -{% endfor %} -{% set all_possible_ids = range(1, 256)|list %} - -{% set available_ids = [] %} -{% for id in all_possible_ids %} - {% if id not in existing_ids %} - {% do available_ids.append(id) %} - {% endif %} -{% endfor %} - -{% set final_nodes = pillar_kafkanodes.copy() %} - -{% for minionid, ip in current_kafkanodes.items() %} - {% set hostname = minionid.split('_')[0] %} - {% if hostname not in final_nodes %} - {% set new_id = available_ids.pop(0) %} - {% do final_nodes.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %} - {% endif %} -{% endfor %} - kafka: - nodes: {{ final_nodes|tojson }} + nodes: \ No newline at end of file diff --git a/salt/kafka/enabled.sls b/salt/kafka/enabled.sls index a42b6f18b..3c4f548f1 100644 --- a/salt/kafka/enabled.sls +++ b/salt/kafka/enabled.sls @@ -1,5 +1,5 @@ # Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one -# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at # https://securityonion.net/license; you may not use this file except in compliance with the # Elastic License 2.0. @@ -7,9 +7,12 @@ {% if sls.split('.')[0] in allowed_states %} {% from 'vars/globals.map.jinja' import GLOBALS %} {% from 'docker/docker.map.jinja' import DOCKER %} -{% set KAFKANODES = salt['pillar.get']('kafka:nodes', {}) %} +{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES as KAFKANODES %} include: + {% if grains.role in ['so-manager', 'so-managersearch', 'so-standalone'] %} + - kafka.nodes + {% endif %} - elasticsearch.ca - kafka.sostatus - kafka.config diff --git a/salt/kafka/files/managed_node_pillar.jinja b/salt/kafka/files/managed_node_pillar.jinja new file mode 100644 index 000000000..fb2ef410e --- /dev/null +++ b/salt/kafka/files/managed_node_pillar.jinja @@ -0,0 +1,7 @@ +kafka: + nodes: +{% for node, values in COMBINED_KAFKANODES.items() %} + {{ node }}: + ip: {{ values['ip'] }} + nodeid: {{ values['nodeid'] }} +{% endfor %} \ No newline at end of file diff --git a/salt/kafka/nodes.map.jinja b/salt/kafka/nodes.map.jinja new file mode 100644 index 000000000..a6e36d36a --- /dev/null +++ b/salt/kafka/nodes.map.jinja @@ -0,0 +1,50 @@ +{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver', fun='network.ip_addrs', tgt_type='compound') %} +{% set STORED_KAFKANODES = salt['pillar.get']('kafka:nodes', {}) %} + +{% set existing_ids = [] %} + +{# Check STORED_KAFKANODES for existing kafka nodes and pull their IDs so they are not reused across the grid #} +{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} +{% for node, values in STORED_KAFKANODES.nodes.items() %} +{% if values.get('nodeid') %} +{% do existing_ids.append(values['nodeid']) %} +{% endif %} +{% endfor %} +{% endif %} + +{# Create list of possible node ids #} +{% set all_possible_ids = range(1, 65536)|list %} + +{# Don't like the below loop because the higher the range for all_possible_ids the more time spent on loop #} +{# Create list of available node ids by looping through all_possible_ids and ensuring it isn't in existing_ids #} +{% set available_ids = [] %} +{% for id in all_possible_ids %} +{% if id not in existing_ids %} +{% do available_ids.append(id) %} +{% endif %} +{% endfor %} + +{# Collect kafka eligible nodes and check if they're already in STORED_KAFKANODES to avoid potentially reassigning a nodeid #} +{% set NEW_KAFKANODES = {} %} +{% for minionid, ip in current_kafkanodes.items() %} +{% set hostname = minionid.split('_')[0] %} +{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 and hostname not in STORED_KAFKANODES.nodes %} +{% set new_id = available_ids.pop(0) %} +{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %} +{% endif %} +{% if hostname not in NEW_KAFKANODES %} +{% set new_id = available_ids.pop(0) %} +{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %} +{% endif %} +{% endfor %} + +{# Combine STORED_KAFKANODES and NEW_KAFKANODES for writing to the pillar/kafka/nodes.sls #} +{% set COMBINED_KAFKANODES = {} %} +{% for node, details in NEW_KAFKANODES.items() %} +{% do COMBINED_KAFKANODES.update({node: details}) %} +{% endfor %} +{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %} +{% for node, details in STORED_KAFKANODES.nodes.items() %} +{% do COMBINED_KAFKANODES.update({node: details}) %} +{% endfor %} +{% endif %} diff --git a/salt/kafka/nodes.sls b/salt/kafka/nodes.sls new file mode 100644 index 000000000..5085c6cca --- /dev/null +++ b/salt/kafka/nodes.sls @@ -0,0 +1,19 @@ +# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# https://securityonion.net/license; you may not use this file except in compliance with the +# Elastic License 2.0. +{% from 'vars/globals.map.jinja' import GLOBALS %} +{% if GLOBALS.pipeline == "KAFKA" %} +{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES %} + +{# Store kafka pillar in a file rather than memory where values could be lost. Kafka does not support nodeid's changing #} +write_kafka_pillar_yaml: + file.managed: + - name: /opt/so/saltstack/local/pillar/kafka/nodes.sls + - mode: 644 + - user: socore + - source: salt://kafka/files/managed_node_pillar.jinja + - template: jinja + - context: + COMBINED_KAFKANODES: {{ COMBINED_KAFKANODES }} +{% endif %} \ No newline at end of file diff --git a/salt/manager/tools/sbin/so-minion b/salt/manager/tools/sbin/so-minion index 34e069ece..72ae55209 100755 --- a/salt/manager/tools/sbin/so-minion +++ b/salt/manager/tools/sbin/so-minion @@ -616,6 +616,10 @@ function updateMineAndApplyStates() { salt $MINIONID state.apply elasticsearch queue=True --async salt $MINIONID state.apply soc queue=True --async fi + if [[ "$NODETYPE" == "RECEIVER" ]]; then + # Setup nodeid for Kafka + salt-call state.apply kafka.nodes queue=True + fi # run this async so the cli doesn't wait for a return salt "$MINION_ID" state.highstate --async queue=True }