Merge pull request #13335 from Security-Onion-Solutions/reyesj2/kgz

FIX: Kafka configuration updates
This commit is contained in:
Jorge Reyes
2024-07-12 15:55:43 -04:00
committed by GitHub
21 changed files with 388 additions and 242 deletions

View File

@@ -233,6 +233,8 @@ base:
- stig.soc_stig
- soc.license
- kafka.nodes
- kafka.soc_kafka
- kafka.adv_kafka
'*_receiver':
- logstash.nodes

View File

@@ -136,7 +136,9 @@
'firewall',
'schedule',
'docker_clean',
'stig'
'stig',
'kafka.ca',
'kafka.ssl'
],
'so-standalone': [
'salt.master',
@@ -195,7 +197,6 @@
'schedule',
'docker_clean',
'kafka',
'elasticsearch.ca',
'stig'
],
'so-desktop': [

View File

@@ -241,6 +241,7 @@ exclude_log "mysqld.log" # MySQL is removed as of 2.4.70, logs may still be on
exclude_log "soctopus.log" # Soctopus is removed as of 2.4.70, logs may still be on disk
exclude_log "agentstatus.log" # ignore this log since it tracks agents in error state
exclude_log "detections_runtime-status_yara.log" # temporarily ignore this log until Detections is more stable
exclude_log "/nsm/kafka/data/" # ignore Kafka data directory from log check.
for log_file in $(cat /tmp/log_check_files); do
status "Checking log file $log_file"

37
salt/kafka/ca.sls Normal file
View File

@@ -0,0 +1,37 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states or sls in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set KAFKATRUST = salt['pillar.get']('kafka:truststore') %}
kafkaconfdir:
file.directory:
- name: /opt/so/conf/kafka
- user: 960
- group: 960
- makedirs: True
{% if GLOBALS.is_manager %}
# Manager runs so-kafka-trust to create truststore for Kafka ssl communication
kafka_truststore:
cmd.script:
- source: salt://kafka/tools/sbin_jinja/so-kafka-trust
- template: jinja
- cwd: /opt/so
- defaults:
GLOBALS: {{ GLOBALS }}
KAFKATRUST: {{ KAFKATRUST }}
{% endif %}
kafkacertz:
file.managed:
- name: /opt/so/conf/kafka/kafka-truststore.jks
- source: salt://kafka/files/kafka-truststore
- user: 960
- group: 931
{% endif %}

View File

@@ -6,7 +6,8 @@
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_PASSWORD = salt['pillar.get']('kafka:password') %}
{% set KAFKA_PASSWORD = salt['pillar.get']('kafka:config:password') %}
{% set KAFKA_TRUSTPASS = salt['pillar.get']('kafka:config:trustpass') %}
{# Create list of KRaft controllers #}
{% set controllers = [] %}
@@ -67,19 +68,12 @@
{% endif %}
{# If a password other than PLACEHOLDER isn't set remove it from the server.properties #}
{% if KAFKAMERGED.config.broker.ssl_x_truststore_x_password == 'PLACEHOLDER' %}
{% do KAFKAMERGED.config.broker.pop('ssl_x_truststore_x_password') %}
{% endif %}
{% if KAFKAMERGED.config.controller.ssl_x_truststore_x_password == 'PLACEHOLDER' %}
{% do KAFKAMERGED.config.controller.pop('ssl_x_truststore_x_password') %}
{% endif %}
{# Truststore config #}
{% do KAFKAMERGED.config.broker.update({'ssl_x_truststore_x_password': KAFKA_TRUSTPASS }) %}
{% do KAFKAMERGED.config.controller.update({'ssl_x_truststore_x_password': KAFKA_TRUSTPASS }) %}
{% do KAFKAMERGED.config.client.update({'ssl_x_truststore_x_password': KAFKA_TRUSTPASS }) %}
{# Client properties stuff #}
{% if KAFKAMERGED.config.client.ssl_x_truststore_x_password == 'PLACEHOLDER' %}
{% do KAFKAMERGED.config.client.pop('ssl_x_truststore_x_password') %}
{% endif %}
{% do KAFKAMERGED.config.client.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD }) %}
{% if 'broker' in node_type %}

View File

@@ -7,18 +7,22 @@
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
include:
- ssl
kafka_group:
group.present:
- name: kafka
- gid: 960
kafka:
kafka_user:
user.present:
- name: kafka
- uid: 960
- gid: 960
- home: /opt/so/conf/kafka
- createhome: False
kafka_home_dir:
file.absent:
- name: /home/kafka
kafka_sbin_tools:
file.recurse:
@@ -28,6 +32,17 @@ kafka_sbin_tools:
- group: 960
- file_mode: 755
kafka_sbin_jinja_tools:
file.recurse:
- name: /usr/sbin
- source: salt://kafka/tools/sbin_jinja
- user: 960
- group: 960
- file_mode: 755
- template: jinja
- defaults:
GLOBALS: {{ GLOBALS }}
kafka_log_dir:
file.directory:
- name: /opt/so/log/kafka

View File

@@ -1,10 +1,12 @@
kafka:
enabled: False
cluster_id:
password:
controllers:
reset:
logstash: []
config:
password:
trustpass:
broker:
advertised_x_listeners:
auto_x_create_x_topics_x_enable: true
@@ -30,16 +32,16 @@ kafka:
ssl_x_keystore_x_location: /etc/pki/kafka.p12
ssl_x_keystore_x_type: PKCS12
ssl_x_keystore_x_password:
ssl_x_truststore_x_location: /etc/pki/java/sos/cacerts
ssl_x_truststore_x_password: PLACEHOLDER
ssl_x_truststore_x_type: PEM
ssl_x_truststore_x_location: /etc/pki/kafka-truststore.jks
ssl_x_truststore_x_type: JKS
ssl_x_truststore_x_password:
transaction_x_state_x_log_x_min_x_isr: 1
transaction_x_state_x_log_x_replication_x_factor: 1
client:
security_x_protocol: SSL
ssl_x_truststore_x_location: /etc/pki/java/sos/cacerts
ssl_x_truststore_x_password: PLACEHOLDER
ssl_x_truststore_x_type: PEM
ssl_x_truststore_x_location: /etc/pki/kafka-truststore.jks
ssl_x_truststore_x_type: JKS
ssl_x_truststore_x_password:
ssl_x_keystore_x_location: /etc/pki/kafka.p12
ssl_x_keystore_x_type: PKCS12
ssl_x_keystore_x_password:
@@ -57,6 +59,6 @@ kafka:
ssl_x_keystore_x_location: /etc/pki/kafka.p12
ssl_x_keystore_x_type: PKCS12
ssl_x_keystore_x_password:
ssl_x_truststore_x_location: /etc/pki/java/sos/cacerts
ssl_x_truststore_x_password: PLACEHOLDER
ssl_x_truststore_x_type: PEM
ssl_x_truststore_x_location: /etc/pki/kafka-truststore.jks
ssl_x_truststore_x_type: JKS
ssl_x_truststore_x_password:

View File

@@ -22,4 +22,13 @@ ensure_default_pipeline:
- name: |
/usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled False;
/usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/global/soc_global.sls global.pipeline REDIS
{% endif %}
{% endif %}
{# If Kafka has never been manually enabled, the 'Kafka' user does not exist. In this case certs for Kafka should not exist since they'll be owned by uid 960 #}
{% for cert in ['kafka-client.crt','kafka-client.key','kafka.crt','kafka.key','kafka-logstash.crt','kafka-logstash.key','kafka-logstash.p12','kafka.p12','elasticfleet-kafka.p8'] %}
check_kafka_cert_{{cert}}:
file.absent:
- name: /etc/pki/{{cert}}
- onlyif: stat -c %U /etc/pki/{{cert}} | grep -q UNKNOWN
- show_changes: False
{% endfor %}

View File

@@ -17,10 +17,11 @@
{% if 'gmd' in salt['pillar.get']('features', []) %}
include:
- elasticsearch.ca
- kafka.sostatus
- kafka.ca
- kafka.config
- kafka.ssl
- kafka.storage
- kafka.sostatus
so-kafka:
docker_container.running:
@@ -49,7 +50,7 @@ so-kafka:
{% endfor %}
- binds:
- /etc/pki/kafka.p12:/etc/pki/kafka.p12:ro
- /etc/pki/tls/certs/intca.crt:/etc/pki/java/sos/cacerts:ro
- /opt/so/conf/kafka/kafka-truststore.jks:/etc/pki/kafka-truststore.jks:ro
- /nsm/kafka/data/:/nsm/kafka/data/:rw
- /opt/so/log/kafka:/opt/kafka/logs/:rw
- /opt/so/conf/kafka/server.properties:/opt/kafka/config/kraft/server.properties:ro
@@ -58,6 +59,9 @@ so-kafka:
{% for sc in ['server', 'client'] %}
- file: kafka_kraft_{{sc}}_properties
{% endfor %}
- file: kafkacertz
- require:
- file: kafkacertz
delete_so-kafka_so-status.disabled:
file.uncomment:

View File

@@ -8,19 +8,31 @@ kafka:
advanced: True
sensitive: True
helpLink: kafka.html
password:
description: The password to use for the Kafka certificates.
sensitive: True
helpLink: kafka.html
controllers:
description: A comma-separated list of hostnames that will act as Kafka controllers. These hosts will be responsible for managing the Kafka cluster. Note that only manager and receiver nodes are eligible to run Kafka. This configuration needs to be set before enabling Kafka. Failure to do so may result in Kafka topics becoming unavailable requiring manual intervention to restore functionality or reset Kafka, either of which can result in data loss.
forcedType: "string"
forcedType: string
helpLink: kafka.html
reset:
description: Disable and reset the Kafka cluster. This will remove all Kafka data including logs that may have not yet been ingested into Elasticsearch and reverts the grid to using REDIS as the global pipeline. This is useful when testing different Kafka configurations such as rearranging Kafka brokers / controllers allowing you to reset the cluster rather than manually fixing any issues arising from attempting to reassign a Kafka broker into a controller. Enter 'YES_RESET_KAFKA' and submit to disable and reset Kafka. Make any configuration changes required and re-enable Kafka when ready. This action CANNOT be reversed.
advanced: True
helpLink: kafka.html
logstash:
description: By default logstash is disabled when Kafka is enabled. This option allows you to specify any hosts you would like to re-enable logstash on alongside Kafka.
forcedType: "[]string"
multiline: True
advanced: True
helpLink: kafka.html
config:
password:
description: The password used for the Kafka certificates.
readonly: True
sensitive: True
helpLink: kafka.html
trustpass:
description: The password used for the Kafka truststore.
readonly: True
sensitive: True
helpLink: kafka.html
broker:
advertised_x_listeners:
description: Specify the list of listeners (hostname and port) that Kafka brokers provide to clients for communication.
@@ -128,6 +140,10 @@ kafka:
description: The trust store file location within the Docker container.
title: ssl.truststore.location
helpLink: kafka.html
ssl_x_truststore_x_type:
description: The trust store file format.
title: ssl.truststore.type
helpLink: kafka.html
ssl_x_truststore_x_password:
description: The trust store file password. If null, the trust store file is still use, but integrity checking is disabled. Invalid for PEM format.
title: ssl.truststore.password
@@ -167,6 +183,10 @@ kafka:
description: The trust store file location within the Docker container.
title: ssl.truststore.location
helpLink: kafka.html
ssl_x_truststore_x_type:
description: The trust store file format.
title: ssl.truststore.type
helpLink: kafka.html
ssl_x_truststore_x_password:
description: The trust store file password. If null, the trust store file is still use, but integrity checking is disabled. Invalid for PEM format.
title: ssl.truststore.password

201
salt/kafka/ssl.sls Normal file
View File

@@ -0,0 +1,201 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states or sls in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set kafka_password = salt['pillar.get']('kafka:config:password') %}
include:
- ca.dirs
{% set global_ca_server = [] %}
{% set x509dict = salt['mine.get'](GLOBALS.manager | lower~'*', 'x509.get_pem_entries') %}
{% for host in x509dict %}
{% if 'manager' in host.split('_')|last or host.split('_')|last == 'standalone' %}
{% do global_ca_server.append(host) %}
{% endif %}
{% endfor %}
{% set ca_server = global_ca_server[0] %}
{% if GLOBALS.pipeline == "KAFKA" %}
{% if GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone'] %}
kafka_client_key:
x509.private_key_managed:
- name: /etc/pki/kafka-client.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka-client.key') -%}
- prereq:
- x509: /etc/pki/kafka-client.crt
{%- endif %}
- retry:
attempts: 5
interval: 30
kafka_client_crt:
x509.certificate_managed:
- name: /etc/pki/kafka-client.crt
- ca_server: {{ ca_server }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- signing_policy: kafka
- private_key: /etc/pki/kafka-client.key
- CN: {{ GLOBALS.hostname }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
kafka_client_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-client.key
- mode: 640
- user: 960
- group: 939
kafka_client_crt_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-client.crt
- mode: 640
- user: 960
- group: 939
{% endif %}
{% if GLOBALS.role in ['so-manager', 'so-managersearch','so-receiver', 'so-standalone'] %}
kafka_key:
x509.private_key_managed:
- name: /etc/pki/kafka.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka.key') -%}
- prereq:
- x509: /etc/pki/kafka.crt
{%- endif %}
- retry:
attempts: 5
interval: 30
kafka_crt:
x509.certificate_managed:
- name: /etc/pki/kafka.crt
- ca_server: {{ ca_server }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- signing_policy: kafka
- private_key: /etc/pki/kafka.key
- CN: {{ GLOBALS.hostname }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
cmd.run:
- name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka.key -in /etc/pki/kafka.crt -export -out /etc/pki/kafka.p12 -nodes -passout pass:{{ kafka_password }}"
- onchanges:
- x509: /etc/pki/kafka.key
kafka_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.key
- mode: 640
- user: 960
- group: 939
kafka_crt_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.crt
- mode: 640
- user: 960
- group: 939
kafka_pkcs12_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.p12
- mode: 640
- user: 960
- group: 939
{% endif %}
# Standalone needs kafka-logstash for automated testing. Searchnode/manager search need it for logstash to consume from Kafka.
# Manager will have cert, but be unused until a pipeline is created and logstash enabled.
{% if GLOBALS.role in ['so-standalone', 'so-managersearch', 'so-searchnode', 'so-manager'] %}
kafka_logstash_key:
x509.private_key_managed:
- name: /etc/pki/kafka-logstash.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka-logstash.key') -%}
- prereq:
- x509: /etc/pki/kafka-logstash.crt
{%- endif %}
- retry:
attempts: 5
interval: 30
kafka_logstash_crt:
x509.certificate_managed:
- name: /etc/pki/kafka-logstash.crt
- ca_server: {{ ca_server }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- signing_policy: kafka
- private_key: /etc/pki/kafka-logstash.key
- CN: {{ GLOBALS.hostname }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
cmd.run:
- name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka-logstash.key -in /etc/pki/kafka-logstash.crt -export -out /etc/pki/kafka-logstash.p12 -nodes -passout pass:{{ kafka_password }}"
- onchanges:
- x509: /etc/pki/kafka-logstash.key
kafka_logstash_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.key
- mode: 640
- user: 931
- group: 939
kafka_logstash_crt_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.crt
- mode: 640
- user: 931
- group: 939
kafka_logstash_pkcs12_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.p12
- mode: 640
- user: 931
- group: 939
{% endif %}
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}

View File

@@ -0,0 +1,13 @@
#!/bin/bash
#
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% set TRUSTPASS = salt['pillar.get']('kafka:config:trustpass') %}
if [ ! -f /opt/so/saltstack/local/salt/kafka/files/kafka-truststore ]; then
docker run -v /etc/pki/ca.crt:/etc/pki/ca.crt --name so-kafkatrust --user root --entrypoint /opt/java/openjdk/bin/keytool {{ GLOBALS.registry_host }}:5000/{{ GLOBALS.image_repo }}/so-kafka:{{ GLOBALS.so_version }} -import -file /etc/pki/ca.crt -alias SOS -keystore /etc/pki/kafka-truststore -storepass {{ TRUSTPASS }} -storetype jks -noprompt
docker cp so-kafkatrust:/etc/pki/kafka-truststore /opt/so/saltstack/local/salt/kafka/files/kafka-truststore
docker rm so-kafkatrust
fi

View File

@@ -25,7 +25,7 @@ logstash:
- so/0011_input_endgame.conf
- so/0012_input_elastic_agent.conf.jinja
- so/0013_input_lumberjack_fleet.conf
- so/9999_output_redis.conf.jinja
- so/9999_output_redis.conf.jinja
receiver:
- so/0011_input_endgame.conf
- so/0012_input_elastic_agent.conf.jinja
@@ -35,7 +35,6 @@ logstash:
- so/0900_input_redis.conf.jinja
- so/9805_output_elastic_agent.conf.jinja
- so/9900_output_endgame.conf.jinja
- so/0800_input_kafka.conf.jinja
custom0: []
custom1: []
custom2: []

View File

@@ -14,6 +14,11 @@
include:
{% if GLOBALS.role not in ['so-receiver','so-fleet'] %}
- elasticsearch.ca
{% endif %}
{# Kafka ca runs on nodes that can run logstash for Kafka input / output. Only when Kafka is global pipeline #}
{% if GLOBALS.role in ['so-searchnode', 'so-manager', 'so-managersearch', 'so-receiver', 'so-standalone'] and GLOBALS.pipeline == 'KAFKA' %}
- kafka.ca
- kafka.ssl
{% endif %}
- logstash.config
- logstash.sostatus
@@ -79,8 +84,9 @@ so-logstash:
- /opt/so/conf/ca/cacerts:/etc/pki/ca-trust/extracted/java/cacerts:ro
- /opt/so/conf/ca/tls-ca-bundle.pem:/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem:ro
{% endif %}
{% if GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-searchnode'] %}
{% if GLOBALS.pipeline == "KAFKA" and GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-searchnode'] %}
- /etc/pki/kafka-logstash.p12:/usr/share/logstash/kafka-logstash.p12:ro
- /opt/so/conf/kafka/kafka-truststore.jks:/etc/pki/kafka-truststore.jks:ro
{% endif %}
{% if GLOBALS.role == 'so-eval' %}
- /nsm/zeek:/nsm/zeek:ro
@@ -105,6 +111,9 @@ so-logstash:
- file: ls_pipeline_{{assigned_pipeline}}_{{CONFIGFILE.split('.')[0] | replace("/","_") }}
{% endfor %}
{% endfor %}
{% if GLOBALS.pipeline == 'KAFKA' and GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-searchnode'] %}
- file: kafkacertz
{% endif %}
- require:
{% if grains['role'] in ['so-manager', 'so-managersearch', 'so-standalone', 'so-import', 'so-heavynode', 'so-receiver'] %}
- x509: etc_filebeat_crt
@@ -118,6 +127,9 @@ so-logstash:
- file: cacertz
- file: capemz
{% endif %}
{% if GLOBALS.pipeline == 'KAFKA' and GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-searchnode'] %}
- file: kafkacertz
{% endif %}
delete_so-logstash_so-status.disabled:
file.uncomment:

View File

@@ -4,13 +4,9 @@
# Elastic License 2.0.
{% from 'logstash/map.jinja' import LOGSTASH_MERGED %}
{% from 'kafka/map.jinja' import KAFKAMERGED %}
include:
{# Disable logstash when Kafka is enabled except when the role is standalone #}
{% if LOGSTASH_MERGED.enabled and grains.role == 'so-standalone' %}
- logstash.enabled
{% elif LOGSTASH_MERGED.enabled and not KAFKAMERGED.enabled %}
{% if LOGSTASH_MERGED.enabled %}
- logstash.enabled
{% else %}
- logstash.disabled

View File

@@ -6,6 +6,7 @@
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% import_yaml 'logstash/defaults.yaml' as LOGSTASH_DEFAULTS %}
{% set LOGSTASH_MERGED = salt['pillar.get']('logstash', LOGSTASH_DEFAULTS.logstash, merge=True) %}
{% set KAFKA_LOGSTASH = salt['pillar.get']('kafka:logstash', []) %}
{# used to store the redis nodes that logstash needs to know about to pull from the queue #}
{% set LOGSTASH_REDIS_NODES = [] %}
@@ -30,3 +31,15 @@
{% endfor %}
{% endfor %}
{# Append Kafka input pipeline when Kafka is enabled #}
{% if GLOBALS.pipeline == 'KAFKA' %}
{% do LOGSTASH_MERGED.defined_pipelines.search.remove('so/0900_input_redis.conf.jinja') %}
{% do LOGSTASH_MERGED.defined_pipelines.search.append('so/0800_input_kafka.conf.jinja') %}
{% do LOGSTASH_MERGED.defined_pipelines.manager.append('so/0800_input_kafka.conf.jinja') %}
{# Disable logstash on manager & receiver nodes unless it has an override configured #}
{% if not KAFKA_LOGSTASH %}
{% if GLOBALS.role in ['so-manager', 'so-receiver'] and GLOBALS.hostname not in KAFKA_LOGSTASH %}
{% do LOGSTASH_MERGED.update({'enabled': False}) %}
{% endif %}
{% endif %}
{% endif %}

View File

@@ -1,4 +1,5 @@
{%- set kafka_password = salt['pillar.get']('kafka:password') %}
{%- set kafka_password = salt['pillar.get']('kafka:config:password') %}
{%- set kafka_trustpass = salt['pillar.get']('kafka:config:trustpass') %}
{%- set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %}
{%- set brokers = [] %}
@@ -22,8 +23,8 @@ input {
ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
ssl_keystore_password => '{{ kafka_password }}'
ssl_keystore_type => 'PKCS12'
ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
ssl_truststore_password => 'changeit'
ssl_truststore_location => '/etc/pki/kafka-truststore.jks'
ssl_truststore_password => '{{ kafka_trustpass }}'
decorate_events => true
tags => [ "elastic-agent", "input-{{ GLOBALS.hostname}}", "kafka" ]
}

View File

@@ -673,7 +673,16 @@ up_to_2.4.80() {
}
up_to_2.4.90() {
echo "Nothing to apply"
kafkatrust=$(get_random_value)
# rearranging the kafka pillar to reduce clutter in SOC UI
kafkasavedpass=$(so-yaml.py get /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.password)
kafkatrimpass=$(echo $kafkasavedpass | awk '{print $1}')
so-yaml.py remove /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.password
echo ' config:' >> /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls
echo ' password: '$kafkatrimpass >> /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls
echo ' trustpass: '$kafkatrust >> /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls
INSTALLEDVERSION=2.4.90
}

View File

@@ -43,20 +43,20 @@ engines:
- cmd.run:
cmd: /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled True
- cmd.run:
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver' saltutil.kill_all_jobs
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver or G@role:so-searchnode' saltutil.kill_all_jobs
- cmd.run:
cmd: salt-call state.apply kafka.nodes
- cmd.run:
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver' state.highstate
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver or G@role:so-searchnode' state.highstate
'KAFKA':
to:
'REDIS':
- cmd.run:
cmd: /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled False
- cmd.run:
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver' saltutil.kill_all_jobs
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver or G@role:so-searchnode' saltutil.kill_all_jobs
- cmd.run:
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver' state.highstate
cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver or G@role:so-searchnode' state.highstate
- files:
- /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls
- /opt/so/saltstack/local/pillar/kafka/adv_kafka.sls

View File

@@ -17,8 +17,6 @@
{% set COMMONNAME = GLOBALS.manager %}
{% endif %}
{% set kafka_password = salt['pillar.get']('kafka:password') %}
{% if grains.id.split('_')|last in ['manager', 'managersearch', 'eval', 'standalone', 'import'] %}
include:
- ca
@@ -666,7 +664,6 @@ elastickeyperms:
{%- endif %}
{% if grains['role'] in ['so-manager', 'so-managersearch', 'so-standalone'] %}
elasticfleet_kafka_key:
x509.private_key_managed:
- name: /etc/pki/elasticfleet-kafka.key
@@ -696,17 +693,13 @@ elasticfleet_kafka_crt:
- retry:
attempts: 5
interval: 30
cmd.run:
- name: "/usr/bin/openssl pkcs8 -in /etc/pki/elasticfleet-kafka.key -topk8 -out /etc/pki/elasticfleet-kafka.p8 -nocrypt"
- onchanges:
- x509: elasticfleet_kafka_key
elasticfleet_kafka_cert_perms:
file.managed:
- replace: False
- name: /etc/pki/elasticfleet-kafka.crt
- mode: 640
- user: 960
- user: 947
- group: 939
elasticfleet_kafka_key_perms:
@@ -714,187 +707,8 @@ elasticfleet_kafka_key_perms:
- replace: False
- name: /etc/pki/elasticfleet-kafka.key
- mode: 640
- user: 960
- user: 947
- group: 939
elasticfleet_kafka_pkcs8_perms:
file.managed:
- replace: False
- name: /etc/pki/elasticfleet-kafka.p8
- mode: 640
- user: 960
- group: 939
kafka_client_key:
x509.private_key_managed:
- name: /etc/pki/kafka-client.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka-client.key') -%}
- prereq:
- x509: /etc/pki/kafka-client.crt
{%- endif %}
- retry:
attempts: 5
interval: 30
kafka_client_crt:
x509.certificate_managed:
- name: /etc/pki/kafka-client.crt
- ca_server: {{ ca_server }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- signing_policy: kafka
- private_key: /etc/pki/kafka-client.key
- CN: {{ GLOBALS.hostname }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
kafka_client_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-client.key
- mode: 640
- user: 960
- group: 939
kafka_client_crt_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-client.crt
- mode: 640
- user: 960
- group: 939
{% endif %}
{% if grains['role'] in ['so-manager', 'so-managersearch','so-receiver', 'so-standalone'] %}
kafka_key:
x509.private_key_managed:
- name: /etc/pki/kafka.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka.key') -%}
- prereq:
- x509: /etc/pki/kafka.crt
{%- endif %}
- retry:
attempts: 5
interval: 30
kafka_crt:
x509.certificate_managed:
- name: /etc/pki/kafka.crt
- ca_server: {{ ca_server }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- signing_policy: kafka
- private_key: /etc/pki/kafka.key
- CN: {{ GLOBALS.hostname }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
cmd.run:
- name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka.key -in /etc/pki/kafka.crt -export -out /etc/pki/kafka.p12 -nodes -passout pass:{{ kafka_password }}"
- onchanges:
- x509: /etc/pki/kafka.key
kafka_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.key
- mode: 640
- user: 960
- group: 939
kafka_crt_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.crt
- mode: 640
- user: 960
- group: 939
kafka_pkcs12_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.p12
- mode: 640
- user: 960
- group: 939
{% endif %}
# Standalone needs kafka-logstash for automated testing. Searchnode/manager search need it for logstash to consume from Kafka.
# Manager will have cert, but be unused until a pipeline is created and logstash enabled.
{% if grains['role'] in ['so-standalone', 'so-managersearch', 'so-searchnode', 'so-manager'] %}
kafka_logstash_key:
x509.private_key_managed:
- name: /etc/pki/kafka-logstash.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka-logstash.key') -%}
- prereq:
- x509: /etc/pki/kafka-logstash.crt
{%- endif %}
- retry:
attempts: 5
interval: 30
kafka_logstash_crt:
x509.certificate_managed:
- name: /etc/pki/kafka-logstash.crt
- ca_server: {{ ca_server }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- signing_policy: kafka
- private_key: /etc/pki/kafka-logstash.key
- CN: {{ GLOBALS.hostname }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
cmd.run:
- name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka-logstash.key -in /etc/pki/kafka-logstash.crt -export -out /etc/pki/kafka-logstash.p12 -nodes -passout pass:{{ kafka_password }}"
- onchanges:
- x509: /etc/pki/kafka-logstash.key
kafka_logstash_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.key
- mode: 640
- user: 960
- group: 939
kafka_logstash_crt_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.crt
- mode: 640
- user: 960
- group: 939
kafka_logstash_pkcs12_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.p12
- mode: 640
- user: 960
- group: 931
{% endif %}
{% else %}

View File

@@ -1180,13 +1180,16 @@ kibana_pillar() {
kafka_pillar() {
KAFKACLUSTERID=$(get_random_value 22)
KAFKAPASS=$(get_random_value)
KAFKATRUST=$(get_random_value)
logCmd "mkdir -p $local_salt_dir/pillar/kafka"
logCmd "touch $adv_kafka_pillar_file"
logCmd "touch $kafka_pillar_file"
printf '%s\n'\
"kafka:"\
" cluster_id: $KAFKACLUSTERID"\
" password: $KAFKAPASS" > $kafka_pillar_file
" config:"\
" password: $KAFKAPASS"\
" trustpass: $KAFKATRUST" > $kafka_pillar_file
}
logrotate_pillar() {