Merge pull request #14528 from Security-Onion-Solutions/reyesj2-patch-4

external access to kafka topics via user/pass auth
This commit is contained in:
Jorge Reyes
2025-04-11 10:52:40 -05:00
committed by GitHub
11 changed files with 146 additions and 32 deletions

View File

@@ -200,6 +200,7 @@ docker:
final_octet: 88
port_bindings:
- 0.0.0.0:9092:9092
- 0.0.0.0:29092:29092
- 0.0.0.0:9093:9093
- 0.0.0.0:8778:8778
custom_bind_mounts: []

View File

@@ -32,7 +32,7 @@ if ! echo "$output" | grep -q "so-manager_kafka"; then
--arg KAFKACA "$KAFKACA" \
--arg MANAGER_IP "{{ GLOBALS.manager_ip }}:9092" \
--arg KAFKA_OUTPUT_VERSION "$KAFKA_OUTPUT_VERSION" \
'{ "name": "grid-kafka", "id": "so-manager_kafka", "type": "kafka", "hosts": [ $MANAGER_IP ], "is_default": false, "is_default_monitoring": false, "config_yaml": "", "ssl": { "certificate_authorities": [ $KAFKACA ], "certificate": $KAFKACRT, "key": $KAFKAKEY, "verification_mode": "full" }, "proxy_id": null, "client_id": "Elastic", "version": $KAFKA_OUTPUT_VERSION, "compression": "none", "auth_type": "ssl", "partition": "round_robin", "round_robin": { "group_events": 1 }, "topics":[{"topic":"%{[event.module]}-securityonion","when":{"type":"regexp","condition":"event.module:.+"}},{"topic":"default-securityonion"}], "headers": [ { "key": "", "value": "" } ], "timeout": 30, "broker_timeout": 30, "required_acks": 1 }'
'{ "name": "grid-kafka", "id": "so-manager_kafka", "type": "kafka", "hosts": [ $MANAGER_IP ], "is_default": false, "is_default_monitoring": false, "config_yaml": "", "ssl": { "certificate_authorities": [ $KAFKACA ], "certificate": $KAFKACRT, "key": $KAFKAKEY, "verification_mode": "full" }, "proxy_id": null, "client_id": "Elastic", "version": $KAFKA_OUTPUT_VERSION, "compression": "none", "auth_type": "ssl", "partition": "round_robin", "round_robin": { "group_events": 10 }, "topics":[{"topic":"default-securityonion"}], "headers": [ { "key": "", "value": "" } ], "timeout": 30, "broker_timeout": 30, "required_acks": 1 }'
)
curl -sK /opt/so/conf/elasticsearch/curl.config -L -X POST "localhost:5601/api/fleet/outputs" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" -o /dev/null
refresh_output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs" | jq -r .items[].id)

View File

@@ -11,6 +11,7 @@ firewall:
endgame: []
eval: []
external_suricata: []
external_kafka: []
fleet: []
heavynode: []
idh: []
@@ -103,6 +104,10 @@ firewall:
tcp:
- 9092
udp: []
kafka_external_access:
tcp:
- 29092
udp: []
kibana:
tcp:
- 5601
@@ -473,6 +478,8 @@ firewall:
external_suricata:
portgroups:
- external_suricata
external_kafka:
portgroups: []
desktop:
portgroups:
- docker_registry
@@ -668,6 +675,8 @@ firewall:
external_suricata:
portgroups:
- external_suricata
external_kafka:
portgroups: []
desktop:
portgroups:
- docker_registry
@@ -867,6 +876,8 @@ firewall:
external_suricata:
portgroups:
- external_suricata
external_kafka:
portgroups: []
strelka_frontend:
portgroups:
- strelka_frontend
@@ -1337,6 +1348,8 @@ firewall:
endgame:
portgroups:
- endgame
external_kafka:
portgroups: []
receiver:
portgroups: []
customhostgroup0:

View File

@@ -21,12 +21,16 @@
{# Only add Kafka firewall items when Kafka enabled #}
{% set role = GLOBALS.role.split('-')[1] %}
{% if GLOBALS.pipeline == 'KAFKA' and role in ['manager', 'managersearch', 'standalone'] %}
{% if GLOBALS.pipeline == 'KAFKA' %}
{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %}
{% set kafka_node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %}
{% if role in ['manager', 'managersearch', 'standalone'] %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[role].portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %}
{% endif %}
{% if GLOBALS.pipeline == 'KAFKA' and role == 'receiver' %}
{% if role == 'receiver' %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.self.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.standalone.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.manager.portgroups.append('kafka_controller') %}
@@ -34,7 +38,7 @@
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %}
{% endif %}
{% if GLOBALS.pipeline == 'KAFKA' and role in ['manager', 'managersearch', 'standalone', 'receiver'] %}
{% if role in ['manager', 'managersearch', 'standalone', 'receiver'] %}
{% for r in ['manager', 'managersearch', 'standalone', 'receiver', 'fleet', 'idh', 'sensor', 'searchnode','heavynode', 'elastic_agent_endpoint', 'desktop'] %}
{% if FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r] is defined %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups.append('kafka_data') %}
@@ -42,4 +46,13 @@
{% endfor %}
{% endif %}
{% if KAFKA_EXTERNAL_ACCESS %}
{# Kafka external access only applies for Kafka nodes with the broker role. #}
{% if role in ['manager', 'managersearch', 'standalone', 'receiver'] and 'broker' in kafka_node_type %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.external_kafka.portgroups.append('kafka_external_access') %}
{% endif %}
{% endif %}
{% endif %}
{% set FIREWALL_MERGED = salt['pillar.get']('firewall', FIREWALL_DEFAULT.firewall, merge=True) %}

View File

@@ -33,6 +33,7 @@ firewall:
endgame: *hostgroupsettingsadv
eval: *hostgroupsettings
external_suricata: *hostgroupsettings
external_kafka: *hostgroupsettings
fleet: *hostgroupsettings
heavynode: *hostgroupsettings
idh: *hostgroupsettings
@@ -130,6 +131,9 @@ firewall:
kafka_data:
tcp: *tcpsettings
udp: *udpsettings
kafka_external_access:
tcp: *tcpsettings
udp: *udpsettings
kibana:
tcp: *tcpsettings
udp: *udpsettings

View File

@@ -8,6 +8,7 @@
{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_PASSWORD = salt['pillar.get']('kafka:config:password') %}
{% set KAFKA_TRUSTPASS = salt['pillar.get']('kafka:config:trustpass') %}
{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %}
{# Create list of KRaft controllers #}
{% set controllers = [] %}
@@ -15,7 +16,7 @@
{# Check for Kafka nodes with controller in process_x_roles #}
{% for node in KAFKA_NODES_PILLAR %}
{% if 'controller' in KAFKA_NODES_PILLAR[node].role %}
{% do controllers.append(KAFKA_NODES_PILLAR[node].nodeid ~ "@" ~ node ~ ":9093") %}
{% do controllers.append(KAFKA_NODES_PILLAR[node].nodeid ~ "@" ~ KAFKA_NODES_PILLAR[node].ip ~ ":9093") %}
{% endif %}
{% endfor %}
@@ -28,7 +29,15 @@
{# Generate server.properties for 'broker' , 'controller', 'broker,controller' node types
anything above this line is a configuration needed for ALL Kafka nodes #}
{% if node_type == 'broker' %}
{% if KAFKA_EXTERNAL_ACCESS %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' + ',' + 'EXTERNAL_ACCESS://' + GLOBALS.node_ip + ':29092' }) %}
{% do KAFKAMERGED.config.broker.update({'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.external_access.listeners }) %}
{% do KAFKAMERGED.config.broker.update({'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.external_access.listener_x_security_x_protocol_x_map }) %}
{% do KAFKAMERGED.config.broker.update({'sasl_x_enabled_x_mechanisms': KAFKAMERGED.config.external_access.sasl_x_enabled_x_mechanisms }) %}
{% do KAFKAMERGED.config.broker.update({'sasl_x_mechanism_x_broker_x_protocol': KAFKAMERGED.config.external_access.sasl_x_mechanism_x_broker_x_protocol }) %}
{% else %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% endif %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% do KAFKAMERGED.config.broker.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD }) %}
@@ -42,6 +51,7 @@
{% endif %}
{% if node_type == 'controller' %}
{% do KAFKAMERGED.config.controller.update({'advertised_x_listeners': 'CONTROLLER://' + GLOBALS.node_ip + ':9093'}) %}
{% do KAFKAMERGED.config.controller.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.controller.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% do KAFKAMERGED.config.controller.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD }) %}
@@ -50,7 +60,15 @@
{# Kafka nodes of this type are not recommended for use outside of development / testing. #}
{% if node_type == 'broker,controller' %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% if KAFKA_EXTERNAL_ACCESS %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' + ',' + 'CONTROLLER://'+ GLOBALS.node_ip +':9093' + ',' + 'EXTERNAL_ACCESS://' + GLOBALS.node_ip + ':29092' }) %}
{% do KAFKAMERGED.config.broker.update({'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.external_access.listeners }) %}
{% do KAFKAMERGED.config.broker.update({'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.external_access.listener_x_security_x_protocol_x_map }) %}
{% do KAFKAMERGED.config.broker.update({'sasl_x_enabled_x_mechanisms': KAFKAMERGED.config.external_access.sasl_x_enabled_x_mechanisms }) %}
{% do KAFKAMERGED.config.broker.update({'sasl_x_mechanism_x_broker_x_protocol': KAFKAMERGED.config.external_access.sasl_x_mechanism_x_broker_x_protocol }) %}
{% else %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' + ',' + 'CONTROLLER://'+ GLOBALS.node_ip +':9093' }) %}
{% endif %}
{% do KAFKAMERGED.config.broker.update({'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}

View File

@@ -6,6 +6,8 @@
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %}
{% set KAFKA_EXTERNAL_USERS = salt['pillar.get']('kafka:config:external_access:remote_users', default=None) %}
kafka_group:
group.present:
@@ -69,6 +71,21 @@ kafka_kraft_{{sc}}_properties:
- show_changes: False
{% endfor %}
{% if KAFKA_EXTERNAL_ACCESS and KAFKA_EXTERNAL_USERS != None %}
kafka_server_jaas_properties:
file.managed:
- source: salt://kafka/etc/jaas.conf.jinja
- name: /opt/so/conf/kafka/kafka_server_jaas.conf
- template: jinja
- user: 960
- group: 960
- show_changes: False
{% else %}
remove_kafka_server_jaas_properties:
file.absent:
- name: /opt/so/conf/kafka/kafka_server_jaas.conf
{% endif %}
reset_quorum_on_changes:
cmd.run:
- name: rm -f /nsm/kafka/data/__cluster_metadata-0/quorum-state

View File

@@ -21,7 +21,7 @@ kafka:
log_x_segment_x_bytes: 1073741824
node_x_id:
num_x_io_x_threads: 8
num_x_network_x_threads: 3
num_x_network_x_threads: 5
num_x_partitions: 3
num_x_recovery_x_threads_x_per_x_data_x_dir: 1
offsets_x_topic_x_replication_x_factor: 1
@@ -46,6 +46,7 @@ kafka:
ssl_x_keystore_x_type: PKCS12
ssl_x_keystore_x_password:
controller:
advertsied_x_listeners:
controller_x_listener_x_names: CONTROLLER
controller_x_quorum_x_voters:
listeners: CONTROLLER://0.0.0.0:9093
@@ -62,3 +63,9 @@ kafka:
ssl_x_truststore_x_location: /etc/pki/kafka-truststore.jks
ssl_x_truststore_x_type: JKS
ssl_x_truststore_x_password:
external_access:
enabled: False
listeners: EXTERNAL_ACCESS://0.0.0.0:29092
listener_x_security_x_protocol_x_map: EXTERNAL_ACCESS:SASL_SSL
sasl_x_enabled_x_mechanisms: PLAIN
sasl_x_mechanism_x_broker_x_protocol: SSL

View File

@@ -14,6 +14,7 @@
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% from 'docker/docker.map.jinja' import DOCKER %}
{% set KAFKANODES = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %}
{% if 'gmd' in salt['pillar.get']('features', []) %}
include:
@@ -34,7 +35,7 @@ so-kafka:
- user: kafka
- environment:
KAFKA_HEAP_OPTS: -Xmx2G -Xms1G
KAFKA_OPTS: -javaagent:/opt/jolokia/agents/jolokia-agent-jvm-javaagent.jar=port=8778,host={{ DOCKER.containers['so-kafka'].ip }},policyLocation=file:/opt/jolokia/jolokia.xml
KAFKA_OPTS: "-javaagent:/opt/jolokia/agents/jolokia-agent-jvm-javaagent.jar=port=8778,host={{ DOCKER.containers['so-kafka'].ip }},policyLocation=file:/opt/jolokia/jolokia.xml {%- if KAFKA_EXTERNAL_ACCESS %} -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf {% endif -%}"
- extra_hosts:
{% for node in KAFKANODES %}
- {{ node }}:{{ KAFKANODES[node].ip }}
@@ -54,11 +55,17 @@ so-kafka:
- /nsm/kafka/data/:/nsm/kafka/data/:rw
- /opt/so/log/kafka:/opt/kafka/logs/:rw
- /opt/so/conf/kafka/server.properties:/opt/kafka/config/kraft/server.properties:ro
- /opt/so/conf/kafka/client.properties:/opt/kafka/config/kraft/client.properties
- /opt/so/conf/kafka/client.properties:/opt/kafka/config/kraft/client.properties:ro
{% if KAFKA_EXTERNAL_ACCESS %}
- /opt/so/conf/kafka/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf:ro
{% endif %}
- watch:
{% for sc in ['server', 'client'] %}
- file: kafka_kraft_{{sc}}_properties
{% endfor %}
{% if KAFKA_EXTERNAL_ACCESS %}
- file: kafka_server_jaas_properties
{% endif %}
- file: kafkacertz
- require:
- file: kafkacertz

View File

@@ -0,0 +1,16 @@
{% set KAFKA_EXTERNAL_USERS = salt['pillar.get']('kafka:config:external_access:remote_users') -%}
{%- set valid_users = [] -%}
{%- for item, user in KAFKA_EXTERNAL_USERS.items() -%}
{% if 'password' in user and user.password is not none and user.password.strip() != "" -%}
{% do valid_users.append('user_' ~ user.username ~ '="' ~ user.password ~ '"') -%}
{% endif -%}
{%- endfor -%}
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
{% for user_entry in valid_users -%}
{{ user_entry }}{{ ";" if loop.last }}
{% endfor %}
};

View File

@@ -34,10 +34,6 @@ kafka:
sensitive: True
helpLink: kafka.html
broker:
advertised_x_listeners:
description: Specify the list of listeners (hostname and port) that Kafka brokers provide to clients for communication.
title: advertised.listeners
helpLink: kafka.html
auto_x_create_x_topics_x_enable:
description: Enable the auto creation of topics.
title: auto.create.topics.enable
@@ -227,3 +223,25 @@ kafka:
title: process.roles
readonly: True
helpLink: kafka.html
external_access:
enabled:
description: Enables or disables access to Kafka topics using user/password authentication. Used for producing / consuming messages via an external client.
forcedType: bool
helpLink: kafka.html
remote_users:
user01: &remote_user
username:
description: Username to be used for custom account
forcedType: string
password:
description: Password to be used for custom account
forcedType: string
sensitive: True
user02: *remote_user
user03: *remote_user
user04: *remote_user
user05: *remote_user
user06: *remote_user
user07: *remote_user
user08: *remote_user
user09: *remote_user