WIP: Manage Kafka nodes pillar role value

This way when kafka_controllers is updated the pillar value gets updated and any non-controllers get updated to revert to 'broker' only role.
 Needs more testing when a new controller joins in this manner Kafka errors due to cluster metadata being out of sync. One solution is to remove /nsm/kafka/data/__cluster_metadata-0/quorum-state and restart cluster. Alternative is working with Kafka cli tools to inform cluster of new voter, likely best option but requires a wrapper script of some sort to be created for updating cluster in-place.
Easiest option is to have all receivers join grid and then configure Kafka with specific controllers via SOC UI prior to enabling Kafka. This way Kafka cluster comes up in the desired configuration with no need for immediately modifying cluster

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
This commit is contained in:
reyesj2
2024-05-29 16:48:39 -04:00
parent d9ec556061
commit 386be4e746
2 changed files with 36 additions and 17 deletions

View File

@@ -12,19 +12,12 @@
{# Create list of KRaft controllers #}
{% set controllers = [] %}
{% if KAFKA_CONTROLLERS_PILLAR != none %}
{% for node in KAFKA_CONTROLLERS_PILLAR %}
{# Check that the user input for kafka_controllers pillar exists as a kafka:node value #}
{% if node in KAFKA_NODES_PILLAR %}
{% do controllers.append(KAFKA_NODES_PILLAR[node]['nodeid'] ~ '@' ~ node ~ ':9093') %}
{# Check for Kafka nodes with controller in process_x_roles #}
{% for node in KAFKA_NODES_PILLAR %}
{% if 'controller' in KAFKA_NODES_PILLAR[node].role %}
{% do controllers.append(KAFKA_NODES_PILLAR[node].nodeid ~ "@" ~ node ~ ":9093") %}
{% endif %}
{% endfor %}
{% endif %}
{# Ensure in the event that the SOC controllers pillar has a single hostname and that hostname doesn't exist in kafka:nodes
that a controller is still set for the Kafka cluster. Defaulting to the grid manager #}
{% if controllers | length < 1 %}
{% do controllers.append(KAFKA_NODES_PILLAR[GLOBALS.manager]['nodeid'] ~ "@" ~ GLOBALS.manager ~ ":9093") %}
{% endif %}
{% set kafka_controller_quorum_voters = ','.join(controllers) %}

View File

@@ -8,6 +8,10 @@
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set process_x_roles = KAFKADEFAULTS.kafka.config.broker.process_x_roles %}
{# Set grid manager to default process_x_roles of broker,controller until overridden via SOC UI #}
{% if grains.role in ["so-standalone", "so-manager", "so-managersearch"] %}
{% set process_x_roles = 'broker,controller'%}
{% endif %}
{% set current_kafkanodes = salt.saltutil.runner(
'mine.get',
@@ -16,6 +20,7 @@
tgt_type='compound') %}
{% set STORED_KAFKANODES = salt['pillar.get']('kafka:nodes', default=None) %}
{% set KAFKA_CONTROLLERS_PILLAR = salt['pillar.get']('kafka:kafka_controllers', default=None) %}
{% set existing_ids = [] %}
@@ -43,10 +48,6 @@
{% set NEW_KAFKANODES = {} %}
{% for minionid, ip in current_kafkanodes.items() %}
{% set hostname = minionid.split('_')[0] %}
{# Override the default process_x_roles for manager and set to 'broker,controller'. Changes from SOC UI will overwrite this #}
{% if hostname == GLOBALS.manager %}
{% set process_x_roles = 'broker,controller' %}
{% endif %}
{% if STORED_KAFKANODES != none and hostname not in STORED_KAFKANODES.items() %}
{% set new_id = available_ids.pop(0) %}
{% do NEW_KAFKANODES.update({hostname: {'nodeid': new_id, 'ip': ip[0], 'role': process_x_roles }}) %}
@@ -67,3 +68,28 @@
{% do COMBINED_KAFKANODES.update({node: details}) %}
{% endfor %}
{% endif %}
{# Update the process_x_roles value for any host in the kafka_controllers_pillar configured from SOC UI #}
{% set ns = namespace(has_controller=false) %}
{% if KAFKA_CONTROLLERS_PILLAR != none %}
{% for hostname in KAFKA_CONTROLLERS_PILLAR %}
{% if hostname in COMBINED_KAFKANODES %}
{% do COMBINED_KAFKANODES[hostname].update({'role': 'controller'}) %}
{% set ns.has_controller = true %}
{% endif %}
{% endfor %}
{% for hostname in COMBINED_KAFKANODES %}
{% if hostname not in KAFKA_CONTROLLERS_PILLAR %}
{% do COMBINED_KAFKANODES[hostname].update({'role': 'broker'}) %}
{% endif %}
{% endfor %}
{# If the kafka_controllers_pillar is NOT empty check that atleast one node contains the controller role.
otherwise default to GLOBALS.manager having broker,controller role #}
{% if not ns.has_controller %}
{% do COMBINED_KAFKANODES[GLOBALS.manager].update({'role': 'broker,controller'}) %}
{% endif %}
{# If kafka_controllers_pillar is empty, default to having grid manager as 'broker,controller'
so there is always atleast 1 controller in the cluster #}
{% else %}
{% do COMBINED_KAFKANODES[GLOBALS.manager].update({'role': 'broker,controller'}) %}
{% endif %}