diff --git a/salt/kafka/config.map.jinja b/salt/kafka/config.map.jinja new file mode 100644 index 000000000..8f116c02e --- /dev/null +++ b/salt/kafka/config.map.jinja @@ -0,0 +1,66 @@ +{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one + or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at + https://securityonion.net/license; you may not use this file except in compliance with the + Elastic License 2.0. #} +{% from 'kafka/map.jinja' import KAFKAMERGED %} +{% from 'vars/globals.map.jinja' import GLOBALS %} + +{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %} + +{# Create list of KRaft controllers #} +{% set controllers = [] %} +{% for node, values in KAFKA_NODES_PILLAR.items() %} +{% if 'controller' in values['role'] %} +{% do controllers.append(values.nodeid ~ "@" ~ node ~ ":9093") %} +{% endif %} +{% endfor %} + +{% set kafka_controller_quorum_voters = ','.join(controllers) %} + +{# By default all Kafka eligible nodes are given the role of broker, except for + grid MANAGER (broker,controller) until overridden through SOC UI #} +{% set node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %} + +{# Generate server.properties for 'broker' , 'controller', 'broker,controller' node types + anything above this line is a configuration needed for ALL Kafka nodes #} +{% if node_type == 'broker' %} +{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %} +{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %} +{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %} + +{% endif %} + +{% if node_type == 'controller' %} +{% do KAFKAMERGED.config.controller.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %} +{% do KAFKAMERGED.config.controller.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %} + +{% endif %} + +{# Kafka nodes of this type are not recommended for use outside of development / testing. #} +{% if node_type == 'broker,controller' %} +{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %} +{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %} +{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %} +{% do KAFKAMERGED.config.broker.update({'process_x_roles': 'broker,controller' }) %} + +{% do KAFKAMERGED.config.broker.update({ + 'listeners': + KAFKAMERGED.config.broker.listeners + + ',' + + KAFKAMERGED.config.controller.listeners }) + %} + +{% do KAFKAMERGED.config.broker.update({ + 'listener_x_security_x_protocol_x_map': + KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + + ',' + + KAFKAMERGED.config.controller.listener_x_security_x_protocol_x_map }) + %} + +{% endif %} + +{% if 'broker' in node_type %} +{% set KAFKACONFIG = KAFKAMERGED.config.broker %} +{% else %} +{% set KAFKACONFIG = KAFKAMERGED.config.controller %} +{% endif %} \ No newline at end of file diff --git a/salt/kafka/defaults.yaml b/salt/kafka/defaults.yaml index 91f55a07d..8dcd70b98 100644 --- a/salt/kafka/defaults.yaml +++ b/salt/kafka/defaults.yaml @@ -1,14 +1,16 @@ kafka: enabled: False + cluster_id: + kafka_pass: + kafka_controllers: [] config: - server: + broker: advertised_x_listeners: auto_x_create_x_topics_x_enable: true - controller_x_listener_x_names: CONTROLLER controller_x_quorum_x_voters: inter_x_broker_x_listener_x_name: BROKER - listeners: BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 - listener_x_security_x_protocol_x_map: CONTROLLER:SSL,BROKER:SSL + listeners: BROKER://0.0.0.0:9092 + listener_x_security_x_protocol_x_map: BROKER:SSL log_x_dirs: /nsm/kafka/data log_x_retention_x_check_x_interval_x_ms: 300000 log_x_retention_x_hours: 168 @@ -37,3 +39,14 @@ kafka: ssl_x_keystore_x_location: /etc/pki/kafka.jks ssl_x_keystore_x_type: JKS ssl_x_keystore_x_password: changeit + controller: + controller_x_listener_x_names: CONTROLLER + controller_x_quorum_x_voters: + listeners: CONTROLLER://0.0.0.0:9093 + listener_x_security_x_protocol_x_map: CONTROLLER:SSL + log_x_dirs: /nsm/kafka/data + log_x_retention_x_check_x_interval_x_ms: 300000 + log_x_retention_x_hours: 168 + log_x_segment_x_bytes: 1073741824 + node_x_id: + process_x_roles: controller \ No newline at end of file diff --git a/salt/kafka/enabled.sls b/salt/kafka/enabled.sls index d05a49a0e..ec2dc8e46 100644 --- a/salt/kafka/enabled.sls +++ b/salt/kafka/enabled.sls @@ -7,12 +7,9 @@ {% if sls.split('.')[0] in allowed_states %} {% from 'vars/globals.map.jinja' import GLOBALS %} {% from 'docker/docker.map.jinja' import DOCKER %} -{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES as KAFKANODES %} +{% set KAFKANODES = salt['pillar.get']('kafka:nodes') %} include: - {% if grains.role in ['so-manager', 'so-managersearch', 'so-standalone'] %} - - kafka.nodes - {% endif %} - kafka.controllers - elasticsearch.ca - kafka.sostatus diff --git a/salt/kafka/etc/server.properties.jinja b/salt/kafka/etc/server.properties.jinja index df5632ba9..fb0c785cf 100644 --- a/salt/kafka/etc/server.properties.jinja +++ b/salt/kafka/etc/server.properties.jinja @@ -3,5 +3,5 @@ https://securityonion.net/license; you may not use this file except in compliance with the Elastic License 2.0. #} -{% from 'kafka/map.jinja' import KAFKAMERGED -%} -{{ KAFKAMERGED.config.server | yaml(False) | replace("_x_", ".") }} +{% from 'kafka/config.map.jinja' import KAFKACONFIG -%} +{{ KAFKACONFIG | yaml(False) | replace("_x_", ".") }} diff --git a/salt/kafka/map.jinja b/salt/kafka/map.jinja index 56f85144a..996e5dedf 100644 --- a/salt/kafka/map.jinja +++ b/salt/kafka/map.jinja @@ -3,18 +3,8 @@ https://securityonion.net/license; you may not use this file except in compliance with the Elastic License 2.0. #} +{# This is only used to determine if Kafka is enabled / disabled. Configuration is found in kafka/config.map.jinja #} +{# kafka/config.map.jinja depends on there being a kafka nodes pillar being populated #} + {% import_yaml 'kafka/defaults.yaml' as KAFKADEFAULTS %} {% set KAFKAMERGED = salt['pillar.get']('kafka', KAFKADEFAULTS.kafka, merge=True) %} -{% from 'vars/globals.map.jinja' import GLOBALS %} -{% from 'kafka/nodes.map.jinja' import COMBINED_KAFKANODES %} - -{% do KAFKAMERGED.config.server.update({ 'node_x_id': salt['pillar.get']('kafka:nodes:' ~ GLOBALS.hostname ~ ':nodeid')}) %} -{% do KAFKAMERGED.config.server.update({'advertised_x_listeners': 'BROKER://' ~ GLOBALS.node_ip ~ ':9092'}) %} - -{% set combined = [] %} -{% for hostname, data in COMBINED_KAFKANODES.items() %} - {% do combined.append(data.nodeid ~ "@" ~ hostname ~ ":9093") %} -{% endfor %} -{% set kraft_controller_quorum_voters = ','.join(combined) %} - -{% do KAFKAMERGED.config.server.update({'controller_x_quorum_x_voters': kraft_controller_quorum_voters}) %} diff --git a/salt/kafka/soc_kafka.yaml b/salt/kafka/soc_kafka.yaml index 47ff05719..2216aaaa7 100644 --- a/salt/kafka/soc_kafka.yaml +++ b/salt/kafka/soc_kafka.yaml @@ -8,12 +8,15 @@ kafka: advanced: True sensitive: True helpLink: kafka.html + kafkapass: + description: The password to use for the Kafka certificates. + sensitive: True + helpLink: kafka.html + kafka_controllers: + description: A list of Security Onion grid members that should act as KRaft controllers for this Kafka cluster. By default, the grid manager will use a 'combined' role where it will act as both a broker and controller. All other nodes will default to broker roles. + helpLink: kafka.html config: - kafkapass: - description: The password to use for the Kafka certificates. - sensitive: True - helpLink: kafka.html - server: + broker: advertised_x_listeners: description: Specify the list of listeners (hostname and port) that Kafka brokers provide to clients for communication. title: advertised.listeners @@ -23,14 +26,6 @@ kafka: title: auto.create.topics.enable forcedType: bool helpLink: kafka.html - controller_x_listener_x_names: - description: Set listeners used by the controller in a comma-seperated list. - title: controller.listener.names - helpLink: kafka.html - controller_x_quorum_x_voters: - description: A comma-seperated list of ID and endpoint information mapped for a set of voters. - title: controller.quorum.voters - helpLink: kafka.html inter_x_broker_x_listener_x_name: description: The name of the listener used for inter-broker communication. title: inter.broker.listener.name @@ -60,12 +55,6 @@ kafka: title: log.segment.bytes forcedType: int helpLink: kafka.html - node_x_id: - description: The node ID corresponds to the roles performed by this process whenever process.roles is populated. - title: node.id - forcedType: int - readonly: True - helpLink: kafka.html num_x_io_x_threads: description: The number of threads used by Kafka. title: num.io.threads @@ -92,10 +81,9 @@ kafka: forcedType: int helpLink: kafka.html process_x_roles: - description: The roles performed by Kafka node. Default is to act as 'broker' only. + description: The role performed by Kafka brokers. title: process.roles - regex: ^(broker|controller|broker,controller|controller,broker)$ - regexFailureMessage: Valid values include 'broker' 'controller' or 'broker,controller' + readonly: True helpLink: kafka.html socket_x_receive_x_buffer_x_bytes: description: Size, in bytes of the SO_RCVBUF buffer. A value of -1 will use the OS default. @@ -174,3 +162,38 @@ kafka: title: ssl.truststore.password sensitive: True helpLink: kafka.html + controller: + controller_x_listener_x_names: + description: Set listeners used by the controller in a comma-seperated list. + title: controller.listener.names + helpLink: kafka.html + listeners: + description: Set of URIs that is listened on and the listener names in a comma-seperated list. + helpLink: kafka.html + listener_x_security_x_protocol_x_map: + description: Comma-seperated mapping of listener name and security protocols. + title: listener.security.protocol.map + helpLink: kafka.html + log_x_dirs: + description: Where Kafka logs are stored within the Docker container. + title: log.dirs + helpLink: kafka.html + log_x_retention_x_check_x_interval_x_ms: + description: Frequency at which log files are checked if they are qualified for deletion. + title: log.retention.check.interval.ms + helpLink: kafka.html + log_x_retention_x_hours: + description: How long, in hours, a log file is kept. + title: log.retention.hours + forcedType: int + helpLink: kafka.html + log_x_segment_x_bytes: + description: The maximum allowable size for a log file. + title: log.segment.bytes + forcedType: int + helpLink: kafka.html + process_x_roles: + description: The role performed by KRaft controller node. + title: process.roles + readonly: True + helpLink: kafka.html \ No newline at end of file