Kafka init

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
reyesj2 committed 2023-11-29 13:34:25 -05:00
parent c957c6ce14
commit 86dc7cc804
32 changed files with 828 additions and 6 deletions


@@ -21,3 +21,4 @@ role:
standalone:
searchnode:
sensor:
kafkanode:

pillar/kafka/nodes.sls (new file, 30 lines)

@@ -0,0 +1,30 @@
{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-kafkanode', fun='network.ip_addrs', tgt_type='compound') %}
{% set pillar_kafkanodes = salt['pillar.get']('kafka:nodes', default={}, merge=True) %}
{% set existing_ids = [] %}
{% for node in pillar_kafkanodes.values() %}
{% if node.get('nodeid') %}
{% do existing_ids.append(node['nodeid']) %}
{% endif %}
{% endfor %}
{% set all_possible_ids = range(1, 256)|list %}
{% set available_ids = [] %}
{% for id in all_possible_ids %}
{% if id not in existing_ids %}
{% do available_ids.append(id) %}
{% endif %}
{% endfor %}
{% set final_nodes = pillar_kafkanodes.copy() %}
{% for minionid, ip in current_kafkanodes.items() %}
{% set hostname = minionid.split('_')[0] %}
{% if hostname not in final_nodes %}
{% set new_id = available_ids.pop(0) %}
{% do final_nodes.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %}
{% endif %}
{% endfor %}
kafka:
nodes: {{ final_nodes|tojson }}
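
For illustration, with two hypothetical kafka nodes reporting to the mine, this template renders a pillar such as (hostnames, IPs, and ids are made up):

kafka:
  nodes: {"kafkanode1": {"nodeid": 1, "ip": "10.66.166.20"}, "kafkanode2": {"nodeid": 2, "ip": "10.66.166.21"}}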


@@ -2,7 +2,7 @@
{% set cached_grains = salt.saltutil.runner('cache.grains', tgt='*') %}
{% for minionid, ip in salt.saltutil.runner(
'mine.get',
tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-searchnode or G@role:so-heavynode or G@role:so-receiver or G@role:so-fleet or G@role:so-kafkanode',
fun='network.ip_addrs',
tgt_type='compound') | dictsort()
%}


@@ -65,6 +65,7 @@ base:
- soctopus.adv_soctopus
- minions.{{ grains.id }}
- minions.adv_{{ grains.id }}
- kafka.nodes
'*_sensor':
- healthcheck.sensor
@@ -241,6 +242,15 @@ base:
- minions.{{ grains.id }}
- minions.adv_{{ grains.id }}
'*_kafkanode':
- logstash.nodes
- logstash.soc_logstash
- logstash.adv_logstash
- minions.{{ grains.id }}
- minions.adv_{{ grains.id }}
- secrets
- kafka.nodes
'*_import':
- secrets
- elasticsearch.index_templates


@@ -187,6 +187,15 @@
'schedule',
'docker_clean'
],
'so-kafkanode': [
'kafka',
'logstash',
'ssl',
'telegraf',
'firewall',
'schedule',
'docker_clean'
],
'so-desktop': [
],
}, grain='role') %}
@@ -203,7 +212,7 @@
{% do allowed_states.append('strelka') %}
{% endif %}
{% if grains.role in ['so-eval', 'so-manager', 'so-standalone', 'so-searchnode', 'so-managersearch', 'so-heavynode', 'so-import', 'so-kafkanode'] %}
{% do allowed_states.append('elasticsearch') %}
{% endif %}


@@ -201,3 +201,11 @@ docker:
custom_bind_mounts: []
extra_hosts: []
extra_env: []
'so-kafka':
final_octet: 88
port_bindings:
- 0.0.0.0:9092:9092
- 0.0.0.0:2181:2181
custom_bind_mounts: []
extra_hosts: []
extra_env: []


@@ -68,3 +68,4 @@ docker:
so-steno: *dockerOptions
so-suricata: *dockerOptions
so-zeek: *dockerOptions
so-kafka: *dockerOptions


@@ -87,6 +87,11 @@
'so-logstash',
'so-redis',
] %}
{% elif GLOBALS.role == 'so-kafkanode' %}
{% set NODE_CONTAINERS = [
'so-logstash',
'so-kafka',
] %}
{% elif GLOBALS.role == 'so-idh' %}
{% set NODE_CONTAINERS = [


@@ -19,6 +19,7 @@ firewall:
manager: []
managersearch: []
receiver: []
kafkanode: []
searchnode: []
self: []
sensor: []
@@ -90,6 +91,11 @@ firewall:
tcp:
- 8086
udp: []
kafka:
tcp:
- 9092
- 9093
udp: []
kibana:
tcp:
- 5601
@@ -441,6 +447,15 @@ firewall:
- elastic_agent_data
- elastic_agent_update
- sensoroni
kafkanode:
portgroups:
- yum
- docker_registry
- influxdb
- elastic_agent_control
- elastic_agent_data
- elastic_agent_update
- sensoroni
analyst:
portgroups:
- nginx
@@ -513,6 +528,9 @@ firewall:
receiver:
portgroups:
- salt_manager
kafkanode:
portgroups:
- salt_manager
desktop:
portgroups:
- salt_manager
@@ -629,6 +647,15 @@ firewall:
- elastic_agent_data
- elastic_agent_update
- sensoroni
kafkanode:
portgroups:
- yum
- docker_registry
- influxdb
- elastic_agent_control
- elastic_agent_data
- elastic_agent_update
- sensoroni
analyst:
portgroups:
- nginx
@@ -1339,6 +1366,73 @@ firewall:
portgroups: []
customhostgroup9:
portgroups: []
kafkanode:
chain:
DOCKER-USER:
hostgroups:
searchnode:
portgroups:
- kafka
kafkanode:
portgroups:
- kafka
customhostgroup0:
portgroups: []
customhostgroup1:
portgroups: []
customhostgroup2:
portgroups: []
customhostgroup3:
portgroups: []
customhostgroup4:
portgroups: []
customhostgroup5:
portgroups: []
customhostgroup6:
portgroups: []
customhostgroup7:
portgroups: []
customhostgroup8:
portgroups: []
customhostgroup9:
portgroups: []
INPUT:
hostgroups:
anywhere:
portgroups:
- ssh
dockernet:
portgroups:
- all
localhost:
portgroups:
- all
self:
portgroups:
- syslog
syslog:
portgroups:
- syslog
customhostgroup0:
portgroups: []
customhostgroup1:
portgroups: []
customhostgroup2:
portgroups: []
customhostgroup3:
portgroups: []
customhostgroup4:
portgroups: []
customhostgroup5:
portgroups: []
customhostgroup6:
portgroups: []
customhostgroup7:
portgroups: []
customhostgroup8:
portgroups: []
customhostgroup9:
portgroups: []
idh:
chain:
DOCKER-USER:


@@ -34,6 +34,7 @@ firewall:
heavynode: *hostgroupsettings
idh: *hostgroupsettings
import: *hostgroupsettings
kafkanode: *hostgroupsettings
localhost: *ROhostgroupsettingsadv
manager: *hostgroupsettings
managersearch: *hostgroupsettings
@@ -115,6 +116,9 @@ firewall:
influxdb:
tcp: *tcpsettings
udp: *udpsettings
kafka:
tcp: *tcpsettings
udp: *udpsettings
kibana:
tcp: *tcpsettings
udp: *udpsettings
@@ -363,6 +367,8 @@ firewall:
portgroups: *portgroupsdocker
endgame:
portgroups: *portgroupsdocker
kafkanode:
portgroups: *portgroupsdocker
analyst:
portgroups: *portgroupsdocker
desktop:
@@ -454,6 +460,8 @@ firewall:
portgroups: *portgroupsdocker
syslog:
portgroups: *portgroupsdocker
kafkanode:
portgroups: *portgroupsdocker
analyst:
portgroups: *portgroupsdocker
desktop:
@@ -938,6 +946,62 @@ firewall:
portgroups: *portgroupshost
customhostgroup9:
portgroups: *portgroupshost
kafkanode:
chain:
DOCKER-USER:
hostgroups:
searchnode:
portgroups: *portgroupsdocker
kafkanode:
portgroups: *portgroupsdocker
customhostgroup0:
portgroups: *portgroupsdocker
customhostgroup1:
portgroups: *portgroupsdocker
customhostgroup2:
portgroups: *portgroupsdocker
customhostgroup3:
portgroups: *portgroupsdocker
customhostgroup4:
portgroups: *portgroupsdocker
customhostgroup5:
portgroups: *portgroupsdocker
customhostgroup6:
portgroups: *portgroupsdocker
customhostgroup7:
portgroups: *portgroupsdocker
customhostgroup8:
portgroups: *portgroupsdocker
customhostgroup9:
portgroups: *portgroupsdocker
INPUT:
hostgroups:
anywhere:
portgroups: *portgroupshost
dockernet:
portgroups: *portgroupshost
localhost:
portgroups: *portgroupshost
customhostgroup0:
portgroups: *portgroupshost
customhostgroup1:
portgroups: *portgroupshost
customhostgroup2:
portgroups: *portgroupshost
customhostgroup3:
portgroups: *portgroupshost
customhostgroup4:
portgroups: *portgroupshost
customhostgroup5:
portgroups: *portgroupshost
customhostgroup6:
portgroups: *portgroupshost
customhostgroup7:
portgroups: *portgroupshost
customhostgroup8:
portgroups: *portgroupshost
customhostgroup9:
portgroups: *portgroupshost
idh:
chain:

salt/kafka/config.sls (new file, 101 lines)

@@ -0,0 +1,101 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set kafka_ips_logstash = [] %}
{% set kafka_ips_kraft = [] %}
{% set kafkanodes = salt['pillar.get']('kafka:nodes', {}) %}
{% set kafka_nodeid = salt['pillar.get']('kafka:nodes:' ~ GLOBALS.hostname ~ ':nodeid') %}
{% set kafka_ip = GLOBALS.node_ip %}
{% set nodes = salt['pillar.get']('kafka:nodes', {}) %}
{% set combined = [] %}
{% for hostname, data in nodes.items() %}
{% do combined.append(data.nodeid ~ "@" ~ hostname ~ ":9093") %}
{% endfor %}
{% set kraft_controller_quorum_voters = ','.join(combined) %}
{# Create list for kafka <-> logstash/searchnode communications #}
{% for node, node_data in kafkanodes.items() %}
{% do kafka_ips_logstash.append(node_data['ip'] + ":9092") %}
{% endfor %}
{% set kafka_server_list = "','".join(kafka_ips_logstash) %}
{# Create a list for kraft controller <-> kraft controller communications. Used for Kafka metadata management #}
{% for node, node_data in kafkanodes.items() %}
{% do kafka_ips_kraft.append(node_data['nodeid'] ~ "@" ~ node_data['ip'] ~ ":9093") %}
{% endfor %}
{% set kraft_server_list = "','".join(kafka_ips_kraft) %}
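{# Illustration with two hypothetical nodes (id 1 at 10.66.166.20, id 2 at 10.66.166.21), the joins above render as:
kafka_server_list: 10.66.166.20:9092','10.66.166.21:9092
kraft_server_list: 1@10.66.166.20:9093','2@10.66.166.21:9093
The "','" separator assumes the consuming template supplies the outer single quotes around the whole value. #}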
include:
- ssl
kafka_group:
group.present:
- name: kafka
- gid: 960
kafka:
user.present:
- uid: 960
- gid: 960
{# Future tools to query kafka directly / show consumer groups
kafka_sbin_tools:
file.recurse:
- name: /usr/sbin
- source: salt://kafka/tools/sbin
- user: 960
- group: 960
- file_mode: 755 #}
kafka_log_dir:
file.directory:
- name: /opt/so/log/kafka
- user: 960
- group: 960
- makedirs: True
kafka_data_dir:
file.directory:
- name: /nsm/kafka/data
- user: 960
- group: 960
- makedirs: True
{# When the docker container is created and added to the registry, update the so-kafka-generate-keystore script #}
kafka_keystore_script:
cmd.script:
- source: salt://kafka/tools/sbin_jinja/so-kafka-generate-keystore
- template: jinja
- cwd: /opt/so
- defaults:
GLOBALS: {{ GLOBALS }}
kafka_kraft_server_properties:
file.managed:
- source: salt://kafka/etc/server.properties.jinja
- name: /opt/so/conf/kafka/server.properties
- template: jinja
- defaults:
kafka_nodeid: {{ kafka_nodeid }}
kraft_controller_quorum_voters: {{ kraft_controller_quorum_voters }}
kafka_ip: {{ kafka_ip }}
- user: 960
- group: 960
- makedirs: True
- show_changes: False
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}

salt/kafka/enabled.sls (new file, 46 lines)

@@ -0,0 +1,46 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% from 'docker/docker.map.jinja' import DOCKER %}
include:
- kafka.sostatus
- kafka.config
- kafka.storage
so-kafka:
docker_container.running:
- image: so-kafka
- hostname: so-kafka
- name: so-kafka
- networks:
- sobridge:
- ipv4_address: {{ DOCKER.containers['so-kafka'].ip }}
- user: kafka
- port_bindings:
{% for BINDING in DOCKER.containers['so-kafka'].port_bindings %}
- {{ BINDING }}
{% endfor %}
- binds:
- /etc/pki/kafka.jks:/etc/pki/kafka.jks
- /opt/so/conf/ca/cacerts:/etc/pki/java/sos/cacerts
- /nsm/kafka/data/:/nsm/kafka/data/:rw
- /opt/so/conf/kafka/server.properties:/kafka/config/kraft/server.properties
delete_so-kafka_so-status.disabled:
file.uncomment:
- name: /opt/so/conf/so-status/so-status.conf
- regex: ^so-kafka$
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}


@@ -0,0 +1,123 @@
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present. See config/kraft/README.md for details.
#
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id={{ kafka_nodeid }}
# The connect string for the controller quorum
controller.quorum.voters={{ kraft_controller_quorum_voters }}
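# For example, with two hypothetical nodes this renders as:
# controller.quorum.voters=1@kafkanode1:9093,2@kafkanode2:9093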
############################# Socket Server Settings #############################
# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=BROKER://{{ kafka_ip }}:9092,CONTROLLER://{{ kafka_ip }}:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=BROKER
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=BROKER://{{ kafka_ip }}:9092
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping is set in `listener.security.protocol.map`, the default is to use the PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:SSL,BROKER:SSL
#SSL configuration
ssl.keystore.location=/etc/pki/kafka.jks
ssl.keystore.password=changeit
ssl.keystore.type=JKS
ssl.truststore.location=/etc/pki/java/sos/cacerts
ssl.truststore.password=changeit
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/nsm/kafka/data
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

salt/kafka/init.sls (new file, 9 lines)

@@ -0,0 +1,9 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{# Create map.jinja to enable / disable kafka from UI #}
{# Temporarily just enable kafka #}
include:
- kafka.enabled

salt/kafka/sostatus.sls (new file, 21 lines)

@@ -0,0 +1,21 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
append_so-kafka_so-status.conf:
file.append:
- name: /opt/so/conf/so-status/so-status.conf
- text: so-kafka
- unless: grep -q so-kafka /opt/so/conf/so-status/so-status.conf
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}

salt/kafka/storage.sls (new file, 31 lines)

@@ -0,0 +1,31 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set kafka_cluster_id = salt['pillar.get']('secrets:kafka_cluster_id')%}
{# Initialize kafka storage if it hasn't been done already; we just check for meta.properties in /nsm/kafka/data #}
{% if not salt['file.file_exists']('/nsm/kafka/data/meta.properties') %}
kafka_storage_init:
cmd.run:
- name: |
docker run -v /nsm/kafka/data:/nsm/kafka/data -v /opt/so/conf/kafka/server.properties:/kafka/config/kraft/newserver.properties --name so-kafkainit --user root --entrypoint /kafka/bin/kafka-storage.sh so-kafka format -t {{ kafka_cluster_id }} -c /kafka/config/kraft/newserver.properties
kafka_rm_kafkainit:
cmd.run:
- name: |
docker rm so-kafkainit
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}
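{# A successful format should leave a meta.properties roughly like the following (hypothetical values):
version=1
cluster.id=AbCdEfGhIjKlMnOpQrStUv
node.id=1 #}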


@@ -0,0 +1,16 @@
#!/bin/bash
#
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
. /usr/sbin/so-common
if [ ! -f /etc/pki/kafka.jks ]; then
docker run -v /etc/pki/kafka.p12:/etc/pki/kafka.p12 --name so-kafka-keystore --user root --entrypoint keytool so-kafka -importkeystore -srckeystore /etc/pki/kafka.p12 -srcstoretype PKCS12 -srcstorepass changeit -destkeystore /etc/pki/kafka.jks -deststoretype JKS -deststorepass changeit -alias kafkastore -noprompt
docker cp so-kafka-keystore:/etc/pki/kafka.jks /etc/pki/kafka.jks
docker rm so-kafka-keystore
else
exit 0
fi
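
A hypothetical sanity check, not part of this commit, to confirm the JKS was created (assumes the so-kafka image ships keytool, as the script above does):

docker run --rm -v /etc/pki/kafka.jks:/etc/pki/kafka.jks --entrypoint keytool so-kafka -list -keystore /etc/pki/kafka.jks -storepass changeit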


@@ -19,6 +19,8 @@ logstash:
- search
fleet:
- fleet
kafkanode:
- kafkanode
defined_pipelines:
fleet:
- so/0012_input_elastic_agent.conf.jinja
@@ -37,6 +39,8 @@ logstash:
- so/0900_input_redis.conf.jinja
- so/9805_output_elastic_agent.conf.jinja
- so/9900_output_endgame.conf.jinja
kafkanode:
- so/0899_output_kafka.conf.jinja
custom0: []
custom1: []
custom2: []


@@ -75,10 +75,13 @@ so-logstash:
{% else %}
- /etc/pki/tls/certs/intca.crt:/usr/share/filebeat/ca.crt:ro
{% endif %}
{% if GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-import', 'so-heavynode', 'so-searchnode', 'so-kafkanode'] %}
- /opt/so/conf/ca/cacerts:/etc/pki/ca-trust/extracted/java/cacerts:ro
- /opt/so/conf/ca/tls-ca-bundle.pem:/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem:ro
{% endif %}
{% if GLOBALS.role in ['so-kafkanode'] %}
- /etc/pki/kafka-logstash.p12:/usr/share/logstash/kafka-logstash.p12:ro
{% endif %}
{% if GLOBALS.role == 'so-eval' %}
- /nsm/zeek:/nsm/zeek:ro
- /nsm/suricata:/suricata:ro


@@ -0,0 +1,26 @@
{% set kafka_brokers = salt['pillar.get']('logstash:nodes:kafkanode', {}) %}
{% set broker_ips = [] %}
{% for node, node_data in kafka_brokers.items() %}
{% do broker_ips.append(node_data['ip'] + ":9092") %}
{% endfor %}
{% set bootstrap_servers = "','".join(broker_ips) %}
# Runs on searchnodes; ingests kafka topic(s). group_id allows load balancing of event ingest across all searchnodes
input {
kafka {
codec => json
# Can ingest multiple topics. Set to a value from the SOC UI?
topics => ['logstash-topic']
group_id => 'searchnodes'
security_protocol => 'SSL'
bootstrap_servers => '{{ bootstrap_servers }}'
ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
ssl_keystore_password => ''
ssl_keystore_type => 'PKCS12'
ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
# Set password as a pillar to avoid bad optics? This is the default truststore for the grid
ssl_truststore_password => 'changeit'
}
}
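
Because every searchnode consumer joins the same group_id, Kafka assigns each partition of the topic to exactly one member of the group, which is what spreads ingest across the searchnodes. A hypothetical way to inspect the group from a broker (consumer-group tooling is mentioned in salt/kafka/config.sls but not shipped in this commit; client-ssl.properties is an assumed client config):

/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.66.166.20:9092 --command-config client-ssl.properties --describe --group searchnodes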


@@ -0,0 +1,22 @@
{% set kafka_brokers = salt['pillar.get']('logstash:nodes:kafkanode', {}) %}
{% set broker_ips = [] %}
{% for node, node_data in kafka_brokers.items() %}
{% do broker_ips.append(node_data['ip'] + ":9092") %}
{% endfor %}
{% set bootstrap_servers = "','".join(broker_ips) %}
# Runs on the kafka broker; logstash writes to topic 'logstash-topic'
output {
kafka {
codec => json
topic_id => 'logstash-topic'
bootstrap_servers => '{{ bootstrap_servers }}'
security_protocol => 'SSL'
ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
ssl_keystore_password => ''
ssl_keystore_type => 'PKCS12'
ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
ssl_truststore_password => 'changeit'
}
}


@@ -16,6 +16,7 @@ logstash:
manager: *assigned_pipelines
managersearch: *assigned_pipelines
fleet: *assigned_pipelines
kafkanode: *assigned_pipelines
defined_pipelines:
receiver: &defined_pipelines
description: List of pipeline configurations assigned to this group.
@@ -26,6 +27,7 @@ logstash:
fleet: *defined_pipelines
manager: *defined_pipelines
search: *defined_pipelines
kafkanode: *defined_pipelines
custom0: *defined_pipelines
custom1: *defined_pipelines
custom2: *defined_pipelines


@@ -79,6 +79,9 @@ fi
'RECEIVER')
so-firewall includehost receiver "$IP" --apply
;;
'KAFKANODE')
so-firewall includehost kafkanode "$IP" --apply
;;
'DESKTOP')
so-firewall includehost desktop "$IP" --apply
;;


@@ -0,0 +1,22 @@
#!/bin/bash
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
local_salt_dir=/opt/so/saltstack/local
if [[ -f /usr/sbin/so-common ]]; then
source /usr/sbin/so-common
else
source $(dirname $0)/../../../common/tools/sbin/so-common
fi
if ! grep -q "^ kafka_cluster_id:" $local_salt_dir/pillar/secrets.sls; then
kafka_cluster_id=$(get_random_value 22)
echo ' kafka_cluster_id: '$kafka_cluster_id >> $local_salt_dir/pillar/secrets.sls
else
echo 'kafka_cluster_id exists'
salt-call pillar.get secrets
fi
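
For reference, a KRaft cluster id is a 22-character base64-encoded UUID, which is why get_random_value is asked for 22 characters. A hypothetical resulting entry in secrets.sls:

  kafka_cluster_id: AbCdEfGhIjKlMnOpQrStUv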


@@ -556,6 +556,10 @@ function createRECEIVER() {
add_telegraf_to_minion
}
function createKAFKANODE() {
add_logstash_to_minion
# add_telegraf_to_minion
}
function testConnection() {
retry 15 3 "salt '$MINION_ID' test.ping" True


@@ -664,6 +664,128 @@ elastickeyperms:
{%- endif %}
# Roles will need to be modified. The following is just for testing encrypted kafka pipelines.
# Remove so-manager; it is only in place for testing.
{% if grains['role'] in ['so-manager', 'so-kafkanode', 'so-searchnode'] %}
# Create a cert for Kafka encryption
kafka_key:
x509.private_key_managed:
- name: /etc/pki/kafka.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka.key') -%}
- prereq:
- x509: /etc/pki/kafka.crt
{%- endif %}
- retry:
attempts: 5
interval: 30
kafka_crt:
x509.certificate_managed:
- name: /etc/pki/kafka.crt
- ca_server: {{ ca_server }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- signing_policy: elasticfleet
- private_key: /etc/pki/kafka.key
- CN: {{ GLOBALS.hostname }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
cmd.run:
- name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka.key -in /etc/pki/kafka.crt -export -out /etc/pki/kafka.p12 -nodes -passout pass:changeit"
- onchanges:
- x509: /etc/pki/kafka.key
# Kafka needs a keystore, so create a new key / cert for that purpose
etc_kafka_logstash_key:
x509.private_key_managed:
- name: /etc/pki/kafka-logstash.key
- keysize: 4096
- backup: True
- new: True
{% if salt['file.file_exists']('/etc/pki/kafka-logstash.key') -%}
- prereq:
- x509: etc_kafka_logstash_crt
{%- endif %}
- retry:
attempts: 5
interval: 30
etc_kafka_logstash_crt:
x509.certificate_managed:
- name: /etc/pki/kafka-logstash.crt
- ca_server: {{ ca_server }}
- signing_policy: elasticfleet
- private_key: /etc/pki/kafka-logstash.key
- CN: {{ GLOBALS.hostname }}
- subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
- days_remaining: 0
- days_valid: 820
- backup: True
- timeout: 30
- retry:
attempts: 5
interval: 30
cmd.run:
- name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka-logstash.key -in /etc/pki/kafka-logstash.crt -export -out /etc/pki/kafka-logstash.p12 -nodes -passout pass:"
- onchanges:
- x509: etc_kafka_logstash_key
kafka_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.key
- mode: 640
- user: 960
- group: 939
kafka_crt_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.crt
- mode: 640
- user: 960
- group: 939
kafka_logstash_cert_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.crt
- mode: 640
- user: 960
- group: 939
kafka_logstash_key_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.key
- mode: 640
- user: 960
- group: 939
kafka_logstash_keystore_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka-logstash.p12
- mode: 640
- user: 960
- group: 939
kafka_keystore_perms:
file.managed:
- replace: False
- name: /etc/pki/kafka.p12
- mode: 640
- user: 960
- group: 939
{% endif %}
{% else %}
{{sls}}_state_not_allowed:


@@ -67,3 +67,20 @@ fleet_crt:
fbcertdir:
file.absent:
- name: /opt/so/conf/filebeat/etc/pki
kafka_crt:
file.absent:
- name: /etc/pki/kafka.crt
kafka_key:
file.absent:
- name: /etc/pki/kafka.key
kafka_logstash_crt:
file.absent:
- name: /etc/pki/kafka-logstash.crt
kafka_logstash_key:
file.absent:
- name: /etc/pki/kafka-logstash.key
kafka_logstash_keystore:
file.absent:
- name: /etc/pki/kafka-logstash.p12


@@ -255,6 +255,16 @@ base:
- elasticfleet.install_agent_grid
- docker_clean
'*_kafkanode and G@saltversion:{{saltversion}}':
- match: compound
- kafka
- logstash
- ssl
- telegraf
- firewall
- docker_clean
- elasticfleet.install_agent_grid
'*_idh and G@saltversion:{{saltversion}}':
- match: compound
- ssl


@@ -0,0 +1 @@
{% set ROLE_GLOBALS = {} %}


@@ -1242,6 +1242,7 @@ generate_passwords(){
REDISPASS=$(get_random_value)
SOCSRVKEY=$(get_random_value 64)
IMPORTPASS=$(get_random_value)
KAFKACLUSTERID=$(get_random_value 22)
}
generate_interface_vars() {
@@ -1269,7 +1270,7 @@ get_redirect() {
get_minion_type() {
local minion_type
case "$install_type" in
'EVAL' | 'MANAGERSEARCH' | 'MANAGER' | 'SENSOR' | 'HEAVYNODE' | 'SEARCHNODE' | 'FLEET' | 'IDH' | 'STANDALONE' | 'IMPORT' | 'RECEIVER' | 'KAFKANODE')
minion_type=$(echo "$install_type" | tr '[:upper:]' '[:lower:]')
;;
esac
@@ -1663,6 +1664,8 @@ process_installtype() {
is_import=true
elif [ "$install_type" = 'RECEIVER' ]; then
is_receiver=true
elif [ "$install_type" = 'KAFKANODE' ]; then
is_kafka=true
elif [ "$install_type" = 'DESKTOP' ]; then elif [ "$install_type" = 'DESKTOP' ]; then
if [ "$setup_type" != 'desktop' ]; then if [ "$setup_type" != 'desktop' ]; then
exec bash so-setup desktop exec bash so-setup desktop
@@ -2105,7 +2108,8 @@ secrets_pillar(){
" playbook_automation: $PLAYBOOKAUTOMATIONPASS"\ " playbook_automation: $PLAYBOOKAUTOMATIONPASS"\
" playbook_automation_api_key: "\ " playbook_automation_api_key: "\
" import_pass: $IMPORTPASS"\ " import_pass: $IMPORTPASS"\
" influx_pass: $INFLUXPASS" > $local_salt_dir/pillar/secrets.sls " influx_pass: $INFLUXPASS"\
" kafka_cluster_id: $KAFKACLUSTERID" > $local_salt_dir/pillar/secrets.sls
fi
}


@@ -574,6 +574,16 @@ if ! [[ -f $install_opt_file ]]; then
check_manager_connection
set_minion_info
whiptail_end_settings
elif [[ $is_kafka ]]; then
info "Setting up as node type Kafka broker"
#check_requirements "kafka"
networking_needful
collect_mngr_hostname
add_mngr_ip_to_hosts
check_manager_connection
set_minion_info
whiptail_end_settings
fi
if [[ $waitforstate ]]; then


@@ -640,13 +640,14 @@ whiptail_install_type_dist_existing() {
Note: Heavy nodes (HEAVYNODE) are NOT recommended for most users.
EOM
install_type=$(whiptail --title "$whiptail_title" --menu "$node_msg" 19 75 7 \
"SENSOR" "Create a forward only sensor " \
"SEARCHNODE" "Add a search node with parsing " \
"FLEET" "Dedicated Elastic Fleet Node " \
"HEAVYNODE" "Sensor + Search Node " \
"IDH" "Intrusion Detection Honeypot Node " \
"RECEIVER" "Receiver Node " \
"KAFKANODE" "Kafka Broker + KRaft controller " \
3>&1 1>&2 2>&3
# "HOTNODE" "Add Hot Node (Uses Elastic Clustering)" \ # TODO
# "WARMNODE" "Add Warm Node to existing Hot or Search node" \ # TODO
@@ -677,6 +678,8 @@ whiptail_install_type_dist_existing() {
is_import=true
elif [ "$install_type" = 'RECEIVER' ]; then
is_receiver=true
elif [ "$install_type" = 'KAFKANODE' ]; then
is_kafka=true
elif [ "$install_type" = 'DESKTOP' ]; then elif [ "$install_type" = 'DESKTOP' ]; then
if [ "$setup_type" != 'desktop' ]; then if [ "$setup_type" != 'desktop' ]; then
exec bash so-setup desktop exec bash so-setup desktop