mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-16 05:53:09 +01:00
Update Kafka nodeid
Update so-minion to include running kafka.nodes state to ensure nodeid is generated for new brokers Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
This commit is contained in:
@@ -1,30 +1,2 @@
|
||||
{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver', fun='network.ip_addrs', tgt_type='compound') %}
|
||||
{% set pillar_kafkanodes = salt['pillar.get']('kafka:nodes', default={}, merge=True) %}
|
||||
|
||||
{% set existing_ids = [] %}
|
||||
{% for node in pillar_kafkanodes.values() %}
|
||||
{% if node.get('id') %}
|
||||
{% do existing_ids.append(node['nodeid']) %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
{% set all_possible_ids = range(1, 256)|list %}
|
||||
|
||||
{% set available_ids = [] %}
|
||||
{% for id in all_possible_ids %}
|
||||
{% if id not in existing_ids %}
|
||||
{% do available_ids.append(id) %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
||||
{% set final_nodes = pillar_kafkanodes.copy() %}
|
||||
|
||||
{% for minionid, ip in current_kafkanodes.items() %}
|
||||
{% set hostname = minionid.split('_')[0] %}
|
||||
{% if hostname not in final_nodes %}
|
||||
{% set new_id = available_ids.pop(0) %}
|
||||
{% do final_nodes.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
||||
kafka:
|
||||
nodes: {{ final_nodes|tojson }}
|
||||
nodes:
|
||||
@@ -7,9 +7,12 @@
|
||||
{% if sls.split('.')[0] in allowed_states %}
|
||||
{% from 'vars/globals.map.jinja' import GLOBALS %}
|
||||
{% from 'docker/docker.map.jinja' import DOCKER %}
|
||||
{% set KAFKANODES = salt['pillar.get']('kafka:nodes', {}) %}
|
||||
{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES as KAFKANODES %}
|
||||
|
||||
include:
|
||||
{% if grains.role in ['so-manager', 'so-managersearch', 'so-standalone'] %}
|
||||
- kafka.nodes
|
||||
{% endif %}
|
||||
- elasticsearch.ca
|
||||
- kafka.sostatus
|
||||
- kafka.config
|
||||
|
||||
7
salt/kafka/files/managed_node_pillar.jinja
Normal file
7
salt/kafka/files/managed_node_pillar.jinja
Normal file
@@ -0,0 +1,7 @@
|
||||
kafka:
|
||||
nodes:
|
||||
{% for node, values in COMBINED_KAFKANODES.items() %}
|
||||
{{ node }}:
|
||||
ip: {{ values['ip'] }}
|
||||
nodeid: {{ values['nodeid'] }}
|
||||
{% endfor %}
|
||||
50
salt/kafka/nodes.map.jinja
Normal file
50
salt/kafka/nodes.map.jinja
Normal file
@@ -0,0 +1,50 @@
|
||||
{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-receiver', fun='network.ip_addrs', tgt_type='compound') %}
|
||||
{% set STORED_KAFKANODES = salt['pillar.get']('kafka:nodes', {}) %}
|
||||
|
||||
{% set existing_ids = [] %}
|
||||
|
||||
{# Check STORED_KAFKANODES for existing kafka nodes and pull their IDs so they are not reused across the grid #}
|
||||
{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %}
|
||||
{% for node, values in STORED_KAFKANODES.nodes.items() %}
|
||||
{% if values.get('nodeid') %}
|
||||
{% do existing_ids.append(values['nodeid']) %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
{# Create list of possible node ids #}
|
||||
{% set all_possible_ids = range(1, 65536)|list %}
|
||||
|
||||
{# Don't like the below loop because the higher the range for all_possible_ids the more time spent on loop #}
|
||||
{# Create list of available node ids by looping through all_possible_ids and ensuring it isn't in existing_ids #}
|
||||
{% set available_ids = [] %}
|
||||
{% for id in all_possible_ids %}
|
||||
{% if id not in existing_ids %}
|
||||
{% do available_ids.append(id) %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
||||
{# Collect kafka eligible nodes and check if they're already in STORED_KAFKANODES to avoid potentially reassigning a nodeid #}
|
||||
{% set NEW_KAFKANODES = {} %}
|
||||
{% for minionid, ip in current_kafkanodes.items() %}
|
||||
{% set hostname = minionid.split('_')[0] %}
|
||||
{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 and hostname not in STORED_KAFKANODES.nodes %}
|
||||
{% set new_id = available_ids.pop(0) %}
|
||||
{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %}
|
||||
{% endif %}
|
||||
{% if hostname not in NEW_KAFKANODES %}
|
||||
{% set new_id = available_ids.pop(0) %}
|
||||
{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
||||
{# Combine STORED_KAFKANODES and NEW_KAFKANODES for writing to the pillar/kafka/nodes.sls #}
|
||||
{% set COMBINED_KAFKANODES = {} %}
|
||||
{% for node, details in NEW_KAFKANODES.items() %}
|
||||
{% do COMBINED_KAFKANODES.update({node: details}) %}
|
||||
{% endfor %}
|
||||
{% if STORED_KAFKANODES.get('nodes', {}).items() | length > 0 %}
|
||||
{% for node, details in STORED_KAFKANODES.nodes.items() %}
|
||||
{% do COMBINED_KAFKANODES.update({node: details}) %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
19
salt/kafka/nodes.sls
Normal file
19
salt/kafka/nodes.sls
Normal file
@@ -0,0 +1,19 @@
|
||||
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
|
||||
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
|
||||
# https://securityonion.net/license; you may not use this file except in compliance with the
|
||||
# Elastic License 2.0.
|
||||
{% from 'vars/globals.map.jinja' import GLOBALS %}
|
||||
{% if GLOBALS.pipeline == "KAFKA" %}
|
||||
{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES %}
|
||||
|
||||
{# Store kafka pillar in a file rather than memory where values could be lost. Kafka does not support nodeid's changing #}
|
||||
write_kafka_pillar_yaml:
|
||||
file.managed:
|
||||
- name: /opt/so/saltstack/local/pillar/kafka/nodes.sls
|
||||
- mode: 644
|
||||
- user: socore
|
||||
- source: salt://kafka/files/managed_node_pillar.jinja
|
||||
- template: jinja
|
||||
- context:
|
||||
COMBINED_KAFKANODES: {{ COMBINED_KAFKANODES }}
|
||||
{% endif %}
|
||||
@@ -616,6 +616,10 @@ function updateMineAndApplyStates() {
|
||||
salt $MINIONID state.apply elasticsearch queue=True --async
|
||||
salt $MINIONID state.apply soc queue=True --async
|
||||
fi
|
||||
if [[ "$NODETYPE" == "RECEIVER" ]]; then
|
||||
# Setup nodeid for Kafka
|
||||
salt-call state.apply kafka.nodes queue=True
|
||||
fi
|
||||
# run this async so the cli doesn't wait for a return
|
||||
salt "$MINION_ID" state.highstate --async queue=True
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user