Split kafka defaults between broker / controller

Setup config.map.jinja to update broker / controller / combined node types

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
This commit is contained in:
reyesj2
2024-04-29 09:08:14 -04:00
parent 29c964cca1
commit 086ebe1a7c
6 changed files with 134 additions and 45 deletions

View File

@@ -0,0 +1,66 @@
{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
https://securityonion.net/license; you may not use this file except in compliance with the
Elastic License 2.0. #}
{% from 'kafka/map.jinja' import KAFKAMERGED %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{# Create list of KRaft controllers #}
{% set controllers = [] %}
{% for node, values in KAFKA_NODES_PILLAR.items() %}
{% if 'controller' in values['role'] %}
{% do controllers.append(values.nodeid ~ "@" ~ node ~ ":9093") %}
{% endif %}
{% endfor %}
{% set kafka_controller_quorum_voters = ','.join(controllers) %}
{# By default all Kafka eligible nodes are given the role of broker, except for
grid MANAGER (broker,controller) until overridden through SOC UI #}
{% set node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %}
{# Generate server.properties for 'broker' , 'controller', 'broker,controller' node types
anything above this line is a configuration needed for ALL Kafka nodes #}
{% if node_type == 'broker' %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% endif %}
{% if node_type == 'controller' %}
{% do KAFKAMERGED.config.controller.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.controller.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% endif %}
{# Kafka nodes of this type are not recommended for use outside of development / testing. #}
{% if node_type == 'broker,controller' %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% do KAFKAMERGED.config.broker.update({'process_x_roles': 'broker,controller' }) %}
{% do KAFKAMERGED.config.broker.update({
'listeners':
KAFKAMERGED.config.broker.listeners
+ ','
+ KAFKAMERGED.config.controller.listeners })
%}
{% do KAFKAMERGED.config.broker.update({
'listener_x_security_x_protocol_x_map':
KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map
+ ','
+ KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map })
%}
{% endif %}
{% if 'broker' in node_type %}
{% set KAFKACONFIG = KAFKAMERGED.config.broker %}
{% else %}
{% set KAFKACONFIG = KAFKAMERGED.config.controller %}
{% endif %}

View File

@@ -1,14 +1,16 @@
kafka:
enabled: False
cluster_id:
kafka_pass:
kafka_controllers: []
config:
server:
broker:
advertised_x_listeners:
auto_x_create_x_topics_x_enable: true
controller_x_listener_x_names: CONTROLLER
controller_x_quorum_x_voters:
inter_x_broker_x_listener_x_name: BROKER
listeners: BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
listener_x_security_x_protocol_x_map: CONTROLLER:SSL,BROKER:SSL
listeners: BROKER://0.0.0.0:9092
listener_x_security_x_protocol_x_map: BROKER:SSL
log_x_dirs: /nsm/kafka/data
log_x_retention_x_check_x_interval_x_ms: 300000
log_x_retention_x_hours: 168
@@ -37,3 +39,14 @@ kafka:
ssl_x_keystore_x_location: /etc/pki/kafka.jks
ssl_x_keystore_x_type: JKS
ssl_x_keystore_x_password: changeit
controller:
controller_x_listener_x_names: CONTROLLER
controller_x_quorum_x_voters:
listeners: CONTROLLER://0.0.0.0:9093
listener_x_security_x_protocol_x_map: CONTROLLER:SSL
log_x_dirs: /nsm/kafka/data
log_x_retention_x_check_x_interval_x_ms: 300000
log_x_retention_x_hours: 168
log_x_segment_x_bytes: 1073741824
node_x_id:
process_x_roles: controller

View File

@@ -7,12 +7,9 @@
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% from 'docker/docker.map.jinja' import DOCKER %}
{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES as KAFKANODES %}
{% set KAFKANODES = salt['pillar.get']('kafka:nodes') %}
include:
{% if grains.role in ['so-manager', 'so-managersearch', 'so-standalone'] %}
- kafka.nodes
{% endif %}
- kafka.controllers
- elasticsearch.ca
- kafka.sostatus

View File

@@ -3,5 +3,5 @@
https://securityonion.net/license; you may not use this file except in compliance with the
Elastic License 2.0. #}
{% from 'kafka/map.jinja' import KAFKAMERGED -%}
{{ KAFKAMERGED.config.server | yaml(False) | replace("_x_", ".") }}
{% from 'kafka/config.map.jinja' import KAFKACONFIG -%}
{{ KAFKACONFIG | yaml(False) | replace("_x_", ".") }}

View File

@@ -3,18 +3,8 @@
https://securityonion.net/license; you may not use this file except in compliance with the
Elastic License 2.0. #}
{# This is only used to determine if Kafka is enabled / disabled. Configuration is found in kafka/config.map.jinja #}
{# kafka/config.map.jinja depends on there being a kafka nodes pillar being populated #}
{% import_yaml 'kafka/defaults.yaml' as KAFKADEFAULTS %}
{% set KAFKAMERGED = salt['pillar.get']('kafka', KAFKADEFAULTS.kafka, merge=True) %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES %}
{% do KAFKAMERGED.config.server.update({ 'node_x_id': salt['pillar.get']('kafka:nodes:' ~ GLOBALS.hostname ~ ':nodeid')}) %}
{% do KAFKAMERGED.config.server.update({'advertised_x_listeners': 'BROKER://' ~ GLOBALS.node_ip ~ ':9092'}) %}
{% set combined = [] %}
{% for hostname, data in COMBINED_KAFKANODES.items() %}
{% do combined.append(data.nodeid ~ "@" ~ hostname ~ ":9093") %}
{% endfor %}
{% set kraft_controller_quorum_voters = ','.join(combined) %}
{% do KAFKAMERGED.config.server.update({'controller_x_quorum_x_voters': kraft_controller_quorum_voters}) %}

View File

@@ -8,12 +8,15 @@ kafka:
advanced: True
sensitive: True
helpLink: kafka.html
config:
kafkapass:
description: The password to use for the Kafka certificates.
sensitive: True
helpLink: kafka.html
server:
kafka_controllers:
description: A list of Security Onion grid members that should act as KRaft controllers for this Kafka cluster. By default, the grid manager will use a 'combined' role where it will act as both a broker and controller. All other nodes will default to broker roles.
helpLink: kafka.html
config:
broker:
advertised_x_listeners:
description: Specify the list of listeners (hostname and port) that Kafka brokers provide to clients for communication.
title: advertised.listeners
@@ -23,14 +26,6 @@ kafka:
title: auto.create.topics.enable
forcedType: bool
helpLink: kafka.html
controller_x_listener_x_names:
description: Set listeners used by the controller in a comma-seperated list.
title: controller.listener.names
helpLink: kafka.html
controller_x_quorum_x_voters:
description: A comma-seperated list of ID and endpoint information mapped for a set of voters.
title: controller.quorum.voters
helpLink: kafka.html
inter_x_broker_x_listener_x_name:
description: The name of the listener used for inter-broker communication.
title: inter.broker.listener.name
@@ -60,12 +55,6 @@ kafka:
title: log.segment.bytes
forcedType: int
helpLink: kafka.html
node_x_id:
description: The node ID corresponds to the roles performed by this process whenever process.roles is populated.
title: node.id
forcedType: int
readonly: True
helpLink: kafka.html
num_x_io_x_threads:
description: The number of threads used by Kafka.
title: num.io.threads
@@ -92,10 +81,9 @@ kafka:
forcedType: int
helpLink: kafka.html
process_x_roles:
description: The roles performed by Kafka node. Default is to act as 'broker' only.
description: The role performed by Kafka brokers.
title: process.roles
regex: ^(broker|controller|broker,controller|controller,broker)$
regexFailureMessage: Valid values include 'broker' 'controller' or 'broker,controller'
readonly: True
helpLink: kafka.html
socket_x_receive_x_buffer_x_bytes:
description: Size, in bytes of the SO_RCVBUF buffer. A value of -1 will use the OS default.
@@ -174,3 +162,38 @@ kafka:
title: ssl.truststore.password
sensitive: True
helpLink: kafka.html
controller:
controller_x_listener_x_names:
description: Set listeners used by the controller in a comma-seperated list.
title: controller.listener.names
helpLink: kafka.html
listeners:
description: Set of URIs that is listened on and the listener names in a comma-seperated list.
helpLink: kafka.html
listener_x_security_x_protocol_x_map:
description: Comma-seperated mapping of listener name and security protocols.
title: listener.security.protocol.map
helpLink: kafka.html
log_x_dirs:
description: Where Kafka logs are stored within the Docker container.
title: log.dirs
helpLink: kafka.html
log_x_retention_x_check_x_interval_x_ms:
description: Frequency at which log files are checked if they are qualified for deletion.
title: log.retention.check.interval.ms
helpLink: kafka.html
log_x_retention_x_hours:
description: How long, in hours, a log file is kept.
title: log.retention.hours
forcedType: int
helpLink: kafka.html
log_x_segment_x_bytes:
description: The maximum allowable size for a log file.
title: log.segment.bytes
forcedType: int
helpLink: kafka.html
process_x_roles:
description: The role performed by KRaft controller node.
title: process.roles
readonly: True
helpLink: kafka.html