From 96c56297ce68dee6667f7f81b0cb5901ddb14c25 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Wed, 9 Apr 2025 22:08:13 -0500 Subject: [PATCH 1/5] external access via user/pass --- salt/docker/defaults.yaml | 1 + salt/firewall/defaults.yaml | 13 +++++++++ salt/firewall/map.jinja | 47 +++++++++++++++++++++------------ salt/firewall/soc_firewall.yaml | 4 +++ salt/kafka/config.map.jinja | 24 ++++++++++++++--- salt/kafka/config.sls | 19 ++++++++++++- salt/kafka/defaults.yaml | 12 +++++++-- salt/kafka/enabled.sls | 13 ++++++--- salt/kafka/etc/jaas.conf.jinja | 7 +++++ salt/kafka/soc_kafka.yaml | 28 ++++++++++++++++---- 10 files changed, 137 insertions(+), 31 deletions(-) create mode 100644 salt/kafka/etc/jaas.conf.jinja diff --git a/salt/docker/defaults.yaml b/salt/docker/defaults.yaml index 7c776937d..2d7ad4e1c 100644 --- a/salt/docker/defaults.yaml +++ b/salt/docker/defaults.yaml @@ -200,6 +200,7 @@ docker: final_octet: 88 port_bindings: - 0.0.0.0:9092:9092 + - 0.0.0.0:29092:29092 - 0.0.0.0:9093:9093 - 0.0.0.0:8778:8778 custom_bind_mounts: [] diff --git a/salt/firewall/defaults.yaml b/salt/firewall/defaults.yaml index b9bfdbf63..e92a75bcb 100644 --- a/salt/firewall/defaults.yaml +++ b/salt/firewall/defaults.yaml @@ -11,6 +11,7 @@ firewall: endgame: [] eval: [] external_suricata: [] + external_kafka: [] fleet: [] heavynode: [] idh: [] @@ -103,6 +104,10 @@ firewall: tcp: - 9092 udp: [] + kafka_external_access: + tcp: + - 29092 + udp: [] kibana: tcp: - 5601 @@ -473,6 +478,8 @@ firewall: external_suricata: portgroups: - external_suricata + external_kafka: + portgroups: [] desktop: portgroups: - docker_registry @@ -668,6 +675,8 @@ firewall: external_suricata: portgroups: - external_suricata + external_kafka: + portgroups: [] desktop: portgroups: - docker_registry @@ -867,6 +876,8 @@ firewall: external_suricata: portgroups: - external_suricata + external_kafka: + portgroups: [] strelka_frontend: portgroups: - strelka_frontend @@ -1337,6 +1348,8 @@ firewall: endgame: portgroups: - endgame + external_kafka: + portgroups: [] receiver: portgroups: [] customhostgroup0: diff --git a/salt/firewall/map.jinja b/salt/firewall/map.jinja index fe04d7ad3..4347d2b31 100644 --- a/salt/firewall/map.jinja +++ b/salt/firewall/map.jinja @@ -21,25 +21,38 @@ {# Only add Kafka firewall items when Kafka enabled #} {% set role = GLOBALS.role.split('-')[1] %} -{% if GLOBALS.pipeline == 'KAFKA' and role in ['manager', 'managersearch', 'standalone'] %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[role].portgroups.append('kafka_controller') %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %} -{% endif %} +{% if GLOBALS.pipeline == 'KAFKA' %} +{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %} +{% set kafka_node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %} -{% if GLOBALS.pipeline == 'KAFKA' and role == 'receiver' %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.self.portgroups.append('kafka_controller') %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.standalone.portgroups.append('kafka_controller') %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.manager.portgroups.append('kafka_controller') %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.managersearch.portgroups.append('kafka_controller') %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %} -{% endif %} +{% if role in ['manager', 'managersearch', 'standalone'] %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[role].portgroups.append('kafka_controller') %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %} +{% endif %} -{% if GLOBALS.pipeline == 'KAFKA' and role in ['manager', 'managersearch', 'standalone', 'receiver'] %} -{% for r in ['manager', 'managersearch', 'standalone', 'receiver', 'fleet', 'idh', 'sensor', 'searchnode','heavynode', 'elastic_agent_endpoint', 'desktop'] %} -{% if FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r] is defined %} -{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups.append('kafka_data') %} +{% if role == 'receiver' %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.self.portgroups.append('kafka_controller') %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.standalone.portgroups.append('kafka_controller') %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.manager.portgroups.append('kafka_controller') %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.managersearch.portgroups.append('kafka_controller') %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %} +{% endif %} + +{% if role in ['manager', 'managersearch', 'standalone', 'receiver'] %} +{% for r in ['manager', 'managersearch', 'standalone', 'receiver', 'fleet', 'idh', 'sensor', 'searchnode','heavynode', 'elastic_agent_endpoint', 'desktop'] %} +{% if FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r] is defined %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups.append('kafka_data') %} +{% endif %} +{% endfor %} +{% endif %} + +{% if KAFKA_EXTERNAL_ACCESS %} +{# Kafka external access only applies for Kafka nodes with the broker role. #} +{% if role in ['manager', 'managersearch', 'standalone', 'receiver'] and 'broker' in kafka_node_type %} +{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.external_kafka.portgroups.append('kafka_external_access') %} {% endif %} -{% endfor %} +{% endif %} + {% endif %} -{% set FIREWALL_MERGED = salt['pillar.get']('firewall', FIREWALL_DEFAULT.firewall, merge=True) %} +{% set FIREWALL_MERGED = salt['pillar.get']('firewall', FIREWALL_DEFAULT.firewall, merge=True) %} \ No newline at end of file diff --git a/salt/firewall/soc_firewall.yaml b/salt/firewall/soc_firewall.yaml index 222bcc8a2..6f0208b39 100644 --- a/salt/firewall/soc_firewall.yaml +++ b/salt/firewall/soc_firewall.yaml @@ -33,6 +33,7 @@ firewall: endgame: *hostgroupsettingsadv eval: *hostgroupsettings external_suricata: *hostgroupsettings + external_kafka: *hostgroupsettings fleet: *hostgroupsettings heavynode: *hostgroupsettings idh: *hostgroupsettings @@ -130,6 +131,9 @@ firewall: kafka_data: tcp: *tcpsettings udp: *udpsettings + kafka_external_access: + tcp: *tcpsettings + udp: *udpsettings kibana: tcp: *tcpsettings udp: *udpsettings diff --git a/salt/kafka/config.map.jinja b/salt/kafka/config.map.jinja index 1e43a3ec7..e1e9a30f9 100644 --- a/salt/kafka/config.map.jinja +++ b/salt/kafka/config.map.jinja @@ -8,6 +8,7 @@ {% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %} {% set KAFKA_PASSWORD = salt['pillar.get']('kafka:config:password') %} {% set KAFKA_TRUSTPASS = salt['pillar.get']('kafka:config:trustpass') %} +{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %} {# Create list of KRaft controllers #} {% set controllers = [] %} @@ -15,7 +16,7 @@ {# Check for Kafka nodes with controller in process_x_roles #} {% for node in KAFKA_NODES_PILLAR %} {% if 'controller' in KAFKA_NODES_PILLAR[node].role %} -{% do controllers.append(KAFKA_NODES_PILLAR[node].nodeid ~ "@" ~ node ~ ":9093") %} +{% do controllers.append(KAFKA_NODES_PILLAR[node].nodeid ~ "@" ~ KAFKA_NODES_PILLAR[node].ip ~ ":9093") %} {% endif %} {% endfor %} @@ -28,7 +29,15 @@ {# Generate server.properties for 'broker' , 'controller', 'broker,controller' node types anything above this line is a configuration needed for ALL Kafka nodes #} {% if node_type == 'broker' %} -{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %} +{% if KAFKA_EXTERNAL_ACCESS %} +{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' + ',' + 'EXTERNAL_ACCESS://' + GLOBALS.node_ip + ':29092' }) %} +{% do KAFKAMERGED.config.broker.update({'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.external_access.listeners }) %} +{% do KAFKAMERGED.config.broker.update({'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.external_access.listener_x_security_x_protocol_x_map }) %} +{% do KAFKAMERGED.config.broker.update({'sasl_x_enabled_x_mechanisms': KAFKAMERGED.config.external_access.sasl_x_enabled_x_mechanisms }) %} +{% do KAFKAMERGED.config.broker.update({'sasl_x_mechanism_x_broker_x_protocol': KAFKAMERGED.config.external_access.sasl_x_mechanism_x_broker_x_protocol }) %} +{% else %} +{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %} +{% endif %} {% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %} {% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %} {% do KAFKAMERGED.config.broker.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD }) %} @@ -42,6 +51,7 @@ {% endif %} {% if node_type == 'controller' %} +{% do KAFKAMERGED.config.controller.update({'advertised_x_listeners': 'CONTROLLER://' + GLOBALS.node_ip + ':9093'}) %} {% do KAFKAMERGED.config.controller.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %} {% do KAFKAMERGED.config.controller.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %} {% do KAFKAMERGED.config.controller.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD }) %} @@ -50,7 +60,15 @@ {# Kafka nodes of this type are not recommended for use outside of development / testing. #} {% if node_type == 'broker,controller' %} -{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %} +{% if KAFKA_EXTERNAL_ACCESS %} +{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' + ',' + 'CONTROLLER://'+ GLOBALS.node_ip +':9093' + ',' + 'EXTERNAL_ACCESS://' + GLOBALS.node_ip + ':29092' }) %} +{% do KAFKAMERGED.config.broker.update({'listeners': KAFKAMERGED.config.broker.listeners + ',' + KAFKAMERGED.config.external_access.listeners }) %} +{% do KAFKAMERGED.config.broker.update({'listener_x_security_x_protocol_x_map': KAFKAMERGED.config.broker.listener_x_security_x_protocol_x_map + ',' + KAFKAMERGED.config.external_access.listener_x_security_x_protocol_x_map }) %} +{% do KAFKAMERGED.config.broker.update({'sasl_x_enabled_x_mechanisms': KAFKAMERGED.config.external_access.sasl_x_enabled_x_mechanisms }) %} +{% do KAFKAMERGED.config.broker.update({'sasl_x_mechanism_x_broker_x_protocol': KAFKAMERGED.config.external_access.sasl_x_mechanism_x_broker_x_protocol }) %} +{% else %} +{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' + ',' + 'CONTROLLER://'+ GLOBALS.node_ip +':9093' }) %} +{% endif %} {% do KAFKAMERGED.config.broker.update({'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names }) %} {% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %} {% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %} diff --git a/salt/kafka/config.sls b/salt/kafka/config.sls index e9222388b..4900cd882 100644 --- a/salt/kafka/config.sls +++ b/salt/kafka/config.sls @@ -6,6 +6,8 @@ {% from 'allowed_states.map.jinja' import allowed_states %} {% if sls.split('.')[0] in allowed_states %} {% from 'vars/globals.map.jinja' import GLOBALS %} +{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %} +{% set KAFKA_EXTERNAL_USERS = salt['pillar.get']('kafka:config:external_access:remote_users', default=None) %} kafka_group: group.present: @@ -69,6 +71,21 @@ kafka_kraft_{{sc}}_properties: - show_changes: False {% endfor %} +{% if KAFKA_EXTERNAL_ACCESS and KAFKA_EXTERNAL_USERS != None %} +kafka_server_jaas_properties: + file.managed: + - source: salt://kafka/etc/jaas.conf.jinja + - name: /opt/so/conf/kafka/kafka_server_jaas.conf + - template: jinja + - user: 960 + - group: 960 + - show_changes: False +{% else %} +remove_kafka_server_jaas_properties: + file.absent: + - name: /opt/so/conf/kafka/kafka_server_jaas.conf +{% endif %} + reset_quorum_on_changes: cmd.run: - name: rm -f /nsm/kafka/data/__cluster_metadata-0/quorum-state @@ -81,4 +98,4 @@ reset_quorum_on_changes: test.fail_without_changes: - name: {{sls}}_state_not_allowed -{% endif %} +{% endif %} \ No newline at end of file diff --git a/salt/kafka/defaults.yaml b/salt/kafka/defaults.yaml index 21d6956ba..74bceae55 100644 --- a/salt/kafka/defaults.yaml +++ b/salt/kafka/defaults.yaml @@ -3,6 +3,8 @@ kafka: cluster_id: controllers: reset: + external_access: + enabled: False logstash: [] config: password: @@ -21,7 +23,7 @@ kafka: log_x_segment_x_bytes: 1073741824 node_x_id: num_x_io_x_threads: 8 - num_x_network_x_threads: 3 + num_x_network_x_threads: 5 num_x_partitions: 3 num_x_recovery_x_threads_x_per_x_data_x_dir: 1 offsets_x_topic_x_replication_x_factor: 1 @@ -46,6 +48,7 @@ kafka: ssl_x_keystore_x_type: PKCS12 ssl_x_keystore_x_password: controller: + advertsied_x_listeners: controller_x_listener_x_names: CONTROLLER controller_x_quorum_x_voters: listeners: CONTROLLER://0.0.0.0:9093 @@ -61,4 +64,9 @@ kafka: ssl_x_keystore_x_password: ssl_x_truststore_x_location: /etc/pki/kafka-truststore.jks ssl_x_truststore_x_type: JKS - ssl_x_truststore_x_password: \ No newline at end of file + ssl_x_truststore_x_password: + external_access: + listeners: EXTERNAL_ACCESS://0.0.0.0:29092 + listener_x_security_x_protocol_x_map: EXTERNAL_ACCESS:SASL_SSL + sasl_x_enabled_x_mechanisms: PLAIN + sasl_x_mechanism_x_broker_x_protocol: SSL \ No newline at end of file diff --git a/salt/kafka/enabled.sls b/salt/kafka/enabled.sls index 362f7fde3..3bdd67208 100644 --- a/salt/kafka/enabled.sls +++ b/salt/kafka/enabled.sls @@ -14,6 +14,7 @@ {% from 'vars/globals.map.jinja' import GLOBALS %} {% from 'docker/docker.map.jinja' import DOCKER %} {% set KAFKANODES = salt['pillar.get']('kafka:nodes') %} +{% set KAFKA_EXTERNAL_ACCESS = salt['pillar.get']('kafka:config:external_access:enabled', default=False) %} {% if 'gmd' in salt['pillar.get']('features', []) %} include: @@ -34,7 +35,7 @@ so-kafka: - user: kafka - environment: KAFKA_HEAP_OPTS: -Xmx2G -Xms1G - KAFKA_OPTS: -javaagent:/opt/jolokia/agents/jolokia-agent-jvm-javaagent.jar=port=8778,host={{ DOCKER.containers['so-kafka'].ip }},policyLocation=file:/opt/jolokia/jolokia.xml + KAFKA_OPTS: "-javaagent:/opt/jolokia/agents/jolokia-agent-jvm-javaagent.jar=port=8778,host={{ DOCKER.containers['so-kafka'].ip }},policyLocation=file:/opt/jolokia/jolokia.xml {%- if KAFKA_EXTERNAL_ACCESS %} -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf {% endif -%}" - extra_hosts: {% for node in KAFKANODES %} - {{ node }}:{{ KAFKANODES[node].ip }} @@ -54,11 +55,17 @@ so-kafka: - /nsm/kafka/data/:/nsm/kafka/data/:rw - /opt/so/log/kafka:/opt/kafka/logs/:rw - /opt/so/conf/kafka/server.properties:/opt/kafka/config/kraft/server.properties:ro - - /opt/so/conf/kafka/client.properties:/opt/kafka/config/kraft/client.properties + - /opt/so/conf/kafka/client.properties:/opt/kafka/config/kraft/client.properties:ro + {% if KAFKA_EXTERNAL_ACCESS %} + - /opt/so/conf/kafka/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf:ro + {% endif %} - watch: {% for sc in ['server', 'client'] %} - file: kafka_kraft_{{sc}}_properties {% endfor %} + {% if KAFKA_EXTERNAL_ACCESS %} + - file: kafka_server_jaas_properties + {% endif %} - file: kafkacertz - require: - file: kafkacertz @@ -87,4 +94,4 @@ include: test.fail_without_changes: - name: {{sls}}_state_not_allowed -{% endif %} +{% endif %} \ No newline at end of file diff --git a/salt/kafka/etc/jaas.conf.jinja b/salt/kafka/etc/jaas.conf.jinja new file mode 100644 index 000000000..2647e5ff9 --- /dev/null +++ b/salt/kafka/etc/jaas.conf.jinja @@ -0,0 +1,7 @@ +{% set KAFKA_EXTERNAL_USERS = salt['pillar.get']('kafka:config:external_access:remote_users') %} +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + {% for item, user in KAFKA_EXTERNAL_USERS.items() -%} + user_{{ user.username }}="{{ user.password }}"{{ ";" if loop.last else " \\" }} + {% endfor %} +}; \ No newline at end of file diff --git a/salt/kafka/soc_kafka.yaml b/salt/kafka/soc_kafka.yaml index 8087f9bdf..19ab3d2ef 100644 --- a/salt/kafka/soc_kafka.yaml +++ b/salt/kafka/soc_kafka.yaml @@ -34,10 +34,6 @@ kafka: sensitive: True helpLink: kafka.html broker: - advertised_x_listeners: - description: Specify the list of listeners (hostname and port) that Kafka brokers provide to clients for communication. - title: advertised.listeners - helpLink: kafka.html auto_x_create_x_topics_x_enable: description: Enable the auto creation of topics. title: auto.create.topics.enable @@ -226,4 +222,26 @@ kafka: description: The role performed by controller node. title: process.roles readonly: True - helpLink: kafka.html \ No newline at end of file + helpLink: kafka.html + external_access: + enabled: + description: Enables or disables access to Kafka topics using user/password authentication. Used for producing / consuming messages via an external client. + forcedType: bool + helpLink: kafka.html + remote_users: + user01: &remote_user + username: + description: Username to be used for custom account + forcedType: string + password: + description: Password to be used for custom account + forcedType: string + sensitive: True + user02: *remote_user + user03: *remote_user + user04: *remote_user + user05: *remote_user + user06: *remote_user + user07: *remote_user + user08: *remote_user + user09: *remote_user \ No newline at end of file From 5498673fc37f4f33d6a9653daa521a552dad9206 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Thu, 10 Apr 2025 09:46:37 -0500 Subject: [PATCH 2/5] group events in 10s and remove deprecated output configuration option --- salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy b/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy index 7727f7db1..a5ea79922 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy +++ b/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy @@ -32,7 +32,7 @@ if ! echo "$output" | grep -q "so-manager_kafka"; then --arg KAFKACA "$KAFKACA" \ --arg MANAGER_IP "{{ GLOBALS.manager_ip }}:9092" \ --arg KAFKA_OUTPUT_VERSION "$KAFKA_OUTPUT_VERSION" \ - '{ "name": "grid-kafka", "id": "so-manager_kafka", "type": "kafka", "hosts": [ $MANAGER_IP ], "is_default": false, "is_default_monitoring": false, "config_yaml": "", "ssl": { "certificate_authorities": [ $KAFKACA ], "certificate": $KAFKACRT, "key": $KAFKAKEY, "verification_mode": "full" }, "proxy_id": null, "client_id": "Elastic", "version": $KAFKA_OUTPUT_VERSION, "compression": "none", "auth_type": "ssl", "partition": "round_robin", "round_robin": { "group_events": 1 }, "topics":[{"topic":"%{[event.module]}-securityonion","when":{"type":"regexp","condition":"event.module:.+"}},{"topic":"default-securityonion"}], "headers": [ { "key": "", "value": "" } ], "timeout": 30, "broker_timeout": 30, "required_acks": 1 }' + '{ "name": "grid-kafka", "id": "so-manager_kafka", "type": "kafka", "hosts": [ $MANAGER_IP ], "is_default": false, "is_default_monitoring": false, "config_yaml": "", "ssl": { "certificate_authorities": [ $KAFKACA ], "certificate": $KAFKACRT, "key": $KAFKAKEY, "verification_mode": "full" }, "proxy_id": null, "client_id": "Elastic", "version": $KAFKA_OUTPUT_VERSION, "compression": "none", "auth_type": "ssl", "partition": "round_robin", "round_robin": { "group_events": 10 }, "topics":[{"topic":"default-securityonion"}], "headers": [ { "key": "", "value": "" } ], "timeout": 30, "broker_timeout": 30, "required_acks": 1 }' ) curl -sK /opt/so/conf/elasticsearch/curl.config -L -X POST "localhost:5601/api/fleet/outputs" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" -o /dev/null refresh_output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs" | jq -r .items[].id) From 14292266673b0c715e6bbc125c761239adcaec0e Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Thu, 10 Apr 2025 15:55:17 -0500 Subject: [PATCH 3/5] nest default value for external_access under kafka:config --- salt/kafka/defaults.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/salt/kafka/defaults.yaml b/salt/kafka/defaults.yaml index 74bceae55..08bcc8d18 100644 --- a/salt/kafka/defaults.yaml +++ b/salt/kafka/defaults.yaml @@ -3,8 +3,6 @@ kafka: cluster_id: controllers: reset: - external_access: - enabled: False logstash: [] config: password: @@ -66,6 +64,7 @@ kafka: ssl_x_truststore_x_type: JKS ssl_x_truststore_x_password: external_access: + enabled: False listeners: EXTERNAL_ACCESS://0.0.0.0:29092 listener_x_security_x_protocol_x_map: EXTERNAL_ACCESS:SASL_SSL sasl_x_enabled_x_mechanisms: PLAIN From ecd7da540aaba2a8720e15558391cab550144559 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:21:46 -0500 Subject: [PATCH 4/5] skip user entries that don't have password configured --- salt/kafka/etc/jaas.conf.jinja | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/salt/kafka/etc/jaas.conf.jinja b/salt/kafka/etc/jaas.conf.jinja index 2647e5ff9..effb88319 100644 --- a/salt/kafka/etc/jaas.conf.jinja +++ b/salt/kafka/etc/jaas.conf.jinja @@ -1,7 +1,16 @@ -{% set KAFKA_EXTERNAL_USERS = salt['pillar.get']('kafka:config:external_access:remote_users') %} +{% set KAFKA_EXTERNAL_USERS = salt['pillar.get']('kafka:config:external_access:remote_users') -%} + +{%- set valid_users = [] -%} + +{%- for item, user in KAFKA_EXTERNAL_USERS.items() -%} +{% if 'password' in user and user.password is not none and user.password != "" -%} +{% do valid_users.append('user_' ~ user.username ~ '="' ~ user.password ~ '"') -%} +{% endif -%} +{%- endfor -%} + KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required - {% for item, user in KAFKA_EXTERNAL_USERS.items() -%} - user_{{ user.username }}="{{ user.password }}"{{ ";" if loop.last else " \\" }} + {% for user_entry in valid_users -%} + {{ user_entry }}{{ ";" if loop.last }} {% endfor %} }; \ No newline at end of file From 6fe240de452d6266aeeadc25cc992c37db5a2eeb Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:42:45 -0500 Subject: [PATCH 5/5] remove whitespaces then check for empty string as password --- salt/kafka/etc/jaas.conf.jinja | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/kafka/etc/jaas.conf.jinja b/salt/kafka/etc/jaas.conf.jinja index effb88319..e12367cd9 100644 --- a/salt/kafka/etc/jaas.conf.jinja +++ b/salt/kafka/etc/jaas.conf.jinja @@ -3,7 +3,7 @@ {%- set valid_users = [] -%} {%- for item, user in KAFKA_EXTERNAL_USERS.items() -%} -{% if 'password' in user and user.password is not none and user.password != "" -%} +{% if 'password' in user and user.password is not none and user.password.strip() != "" -%} {% do valid_users.append('user_' ~ user.username ~ '="' ~ user.password ~ '"') -%} {% endif -%} {%- endfor -%}