From 86dc7cc804d47d33aca7daf8ddf4107d6eaa5f3c Mon Sep 17 00:00:00 2001
From: reyesj2 <94730068+reyesj2@users.noreply.github.com>
Date: Wed, 29 Nov 2023 13:34:25 -0500
Subject: [PATCH] Kafka init

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
---
 .../assigned_hostgroups.local.map.yaml        |   1 +
 pillar/kafka/nodes.sls                        |  30 +++++
 pillar/logstash/nodes.sls                     |   2 +-
 pillar/top.sls                                |  10 ++
 salt/allowed_states.map.jinja                 |  11 +-
 salt/docker/defaults.yaml                     |   8 ++
 salt/docker/soc_docker.yaml                   |   1 +
 salt/firewall/containers.map.jinja            |   5 +
 salt/firewall/defaults.yaml                   |  94 +++++++++++++
 salt/firewall/soc_firewall.yaml               |  64 +++++++++
 salt/kafka/config.sls                         | 101 ++++++++++++++
 salt/kafka/enabled.sls                        |  46 +++++++
 salt/kafka/etc/server.properties.jinja        | 123 ++++++++++++++++++
 salt/kafka/init.sls                           |   9 ++
 salt/kafka/sostatus.sls                       |  21 +++
 salt/kafka/storage.sls                        |  31 +++++
 .../sbin_jinja/so-kafka-generate-keystore     |  16 +++
 salt/logstash/defaults.yaml                   |   4 +
 salt/logstash/enabled.sls                     |   5 +-
 .../config/so/0800_input_kafka.conf.jinja     |  26 ++++
 .../config/so/0899_output_kafka.conf.jinja    |  22 ++++
 salt/logstash/soc_logstash.yaml               |   2 +
 salt/manager/tools/sbin/so-firewall-minion    |   3 +
 salt/manager/tools/sbin/so-kafka-clusterid    |  22 ++++
 salt/manager/tools/sbin/so-minion             |   4 +
 salt/ssl/init.sls                             | 122 +++++++++++++++++
 salt/ssl/remove.sls                           |  17 +++
 salt/top.sls                                  |  10 ++
 salt/vars/kafkanode.map.jinja                 |   1 +
 setup/so-functions                            |   8 +-
 setup/so-setup                                |  10 ++
 setup/so-whiptail                             |   5 +-
 32 files changed, 828 insertions(+), 6 deletions(-)
 create mode 100644 pillar/kafka/nodes.sls
 create mode 100644 salt/kafka/config.sls
 create mode 100644 salt/kafka/enabled.sls
 create mode 100644 salt/kafka/etc/server.properties.jinja
 create mode 100644 salt/kafka/init.sls
 create mode 100644 salt/kafka/sostatus.sls
 create mode 100644 salt/kafka/storage.sls
 create mode 100644 salt/kafka/tools/sbin_jinja/so-kafka-generate-keystore
 create mode 100644 salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja
 create mode 100644 salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja
 create mode 100644 salt/manager/tools/sbin/so-kafka-clusterid
 create mode 100644 salt/vars/kafkanode.map.jinja

diff --git a/files/firewall/assigned_hostgroups.local.map.yaml b/files/firewall/assigned_hostgroups.local.map.yaml
index 07f389af0..c6eb199c3 100644
--- a/files/firewall/assigned_hostgroups.local.map.yaml
+++ b/files/firewall/assigned_hostgroups.local.map.yaml
@@ -21,3 +21,4 @@ role:
   standalone:
   searchnode:
   sensor:
+  kafkanode:
\ No newline at end of file
diff --git a/pillar/kafka/nodes.sls b/pillar/kafka/nodes.sls
new file mode 100644
index 000000000..a7d97ac9c
--- /dev/null
+++ b/pillar/kafka/nodes.sls
@@ -0,0 +1,30 @@
+{% set current_kafkanodes = salt.saltutil.runner('mine.get', tgt='G@role:so-kafkanode', fun='network.ip_addrs', tgt_type='compound') %}
+{% set pillar_kafkanodes = salt['pillar.get']('kafka:nodes', default={}, merge=True) %}
+
+{% set existing_ids = [] %}
+{% for node in pillar_kafkanodes.values() %}
+  {% if node.get('nodeid') %}
+    {% do existing_ids.append(node['nodeid']) %}
+  {% endif %}
+{% endfor %}
+{% set all_possible_ids = range(1, 256)|list %}
+
+{% set available_ids = [] %}
+{% for id in all_possible_ids %}
+  {% if id not in existing_ids %}
+    {% do available_ids.append(id) %}
+  {% endif %}
+{% endfor %}
+
+{% set final_nodes = pillar_kafkanodes.copy() %}
+
+{% for minionid, ip in current_kafkanodes.items() %}
+  {% set hostname = minionid.split('_')[0] %}
+  {% if hostname not in final_nodes %}
+    {% set new_id = available_ids.pop(0) %}
+    {% do final_nodes.update({hostname: {'nodeid': new_id, 'ip': ip[0]}}) %}
+  {% endif %}
+{% endfor %}
+
+kafka:
+  nodes: {{ final_nodes|tojson }}
\ No newline at end of file
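[Reviewer note — not part of the patch: the template above hands each new kafkanode the lowest unused broker id in 1-255 and keeps previously assigned ids stable across runs. A hedged way to eyeball the mine data it consumes, assuming a root shell on the manager:

    salt-run mine.get tgt='G@role:so-kafkanode' fun=network.ip_addrs tgt_type=compound
]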
diff --git a/pillar/logstash/nodes.sls b/pillar/logstash/nodes.sls
index 8d3bdab65..3b75a5cae 100644
--- a/pillar/logstash/nodes.sls
+++ b/pillar/logstash/nodes.sls
@@ -2,7 +2,7 @@
 {% set cached_grains = salt.saltutil.runner('cache.grains', tgt='*') %}
 {% for minionid, ip in salt.saltutil.runner(
     'mine.get',
-    tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-searchnode or G@role:so-heavynode or G@role:so-receiver or G@role:so-fleet ',
+    tgt='G@role:so-manager or G@role:so-managersearch or G@role:so-standalone or G@role:so-searchnode or G@role:so-heavynode or G@role:so-receiver or G@role:so-fleet or G@role:so-kafkanode ',
     fun='network.ip_addrs',
     tgt_type='compound') | dictsort() %}
diff --git a/pillar/top.sls b/pillar/top.sls
index 4893c44f9..49e493ec8 100644
--- a/pillar/top.sls
+++ b/pillar/top.sls
@@ -65,6 +65,7 @@ base:
     - soctopus.adv_soctopus
     - minions.{{ grains.id }}
     - minions.adv_{{ grains.id }}
+    - kafka.nodes

   '*_sensor':
     - healthcheck.sensor
@@ -241,6 +242,15 @@ base:
     - minions.{{ grains.id }}
     - minions.adv_{{ grains.id }}

+  '*_kafkanode':
+    - logstash.nodes
+    - logstash.soc_logstash
+    - logstash.adv_logstash
+    - minions.{{ grains.id }}
+    - minions.adv_{{ grains.id }}
+    - secrets
+    - kafka.nodes
+
   '*_import':
     - secrets
     - elasticsearch.index_templates
diff --git a/salt/allowed_states.map.jinja b/salt/allowed_states.map.jinja
index a3c5c75ab..11dfde824 100644
--- a/salt/allowed_states.map.jinja
+++ b/salt/allowed_states.map.jinja
@@ -187,6 +187,15 @@
         'schedule',
         'docker_clean'
       ],
+      'so-kafkanode': [
+        'kafka',
+        'logstash',
+        'ssl',
+        'telegraf',
+        'firewall',
+        'schedule',
+        'docker_clean'
+      ],
       'so-desktop': [
       ],
   }, grain='role') %}
@@ -203,7 +212,7 @@
     {% do allowed_states.append('strelka') %}
   {% endif %}

-  {% if grains.role in ['so-eval', 'so-manager', 'so-standalone', 'so-searchnode', 'so-managersearch', 'so-heavynode', 'so-import'] %}
+  {% if grains.role in ['so-eval', 'so-manager', 'so-standalone', 'so-searchnode', 'so-managersearch', 'so-heavynode', 'so-import', 'so-kafkanode'] %}
     {% do allowed_states.append('elasticsearch') %}
   {% endif %}

diff --git a/salt/docker/defaults.yaml b/salt/docker/defaults.yaml
index e39feaf06..3155841c9 100644
--- a/salt/docker/defaults.yaml
+++ b/salt/docker/defaults.yaml
@@ -201,3 +201,11 @@ docker:
       custom_bind_mounts: []
      extra_hosts: []
       extra_env: []
+    'so-kafka':
+      final_octet: 88
+      port_bindings:
+        - 0.0.0.0:9092:9092
+        - 0.0.0.0:2181:2181
+      custom_bind_mounts: []
+      extra_hosts: []
+      extra_env: []
\ No newline at end of file
diff --git a/salt/docker/soc_docker.yaml b/salt/docker/soc_docker.yaml
index d227a3e85..87751010e 100644
--- a/salt/docker/soc_docker.yaml
+++ b/salt/docker/soc_docker.yaml
@@ -68,3 +68,4 @@ docker:
   so-steno: *dockerOptions
   so-suricata: *dockerOptions
   so-zeek: *dockerOptions
+  so-kafka: *dockerOptions
\ No newline at end of file
diff --git a/salt/firewall/containers.map.jinja b/salt/firewall/containers.map.jinja
index 617b4a216..b19f66355 100644
--- a/salt/firewall/containers.map.jinja
+++ b/salt/firewall/containers.map.jinja
@@ -87,6 +87,11 @@
       'so-logstash',
       'so-redis',
   ] %}
+{% elif GLOBALS.role == 'so-kafkanode' %}
+{% set NODE_CONTAINERS = [
+    'so-logstash',
+    'so-kafka',
+] %}
 {% elif GLOBALS.role == 'so-idh' %}
 {% set NODE_CONTAINERS = [
diff --git a/salt/firewall/defaults.yaml b/salt/firewall/defaults.yaml
index ff127c419..112e0eaaa 100644
--- a/salt/firewall/defaults.yaml
+++ b/salt/firewall/defaults.yaml
@@ -19,6 +19,7 @@ firewall:
     manager: []
     managersearch: []
     receiver: []
+    kafkanode: []
     searchnode: []
     self: []
     sensor: []
@@ -90,6 +91,11 @@ firewall:
      tcp:
        - 8086
      udp: []
+    kafka:
+      tcp:
+        - 9092
+        - 9093
+      udp: []
     kibana:
       tcp:
         - 5601
@@ -441,6 +447,15 @@ firewall:
          - elastic_agent_data
          - elastic_agent_update
          - sensoroni
+      kafkanode:
+        portgroups:
+          - yum
+          - docker_registry
+          - influxdb
+          - elastic_agent_control
+          - elastic_agent_data
+          - elastic_agent_update
+          - sensoroni
       analyst:
         portgroups:
           - nginx
@@ -513,6 +528,9 @@ firewall:
       receiver:
         portgroups:
           - salt_manager
+      kafkanode:
+        portgroups:
+          - salt_manager
       desktop:
         portgroups:
           - salt_manager
@@ -629,6 +647,15 @@ firewall:
          - elastic_agent_data
          - elastic_agent_update
          - sensoroni
+      kafkanode:
+        portgroups:
+          - yum
+          - docker_registry
+          - influxdb
+          - elastic_agent_control
+          - elastic_agent_data
+          - elastic_agent_update
+          - sensoroni
       analyst:
         portgroups:
           - nginx
@@ -1339,6 +1366,73 @@ firewall:
            portgroups: []
          customhostgroup9:
            portgroups: []
+  kafkanode:
+    chain:
+      DOCKER-USER:
+        hostgroups:
+          searchnode:
+            portgroups:
+              - kafka
+          kafkanode:
+            portgroups:
+              - kafka
+          customhostgroup0:
+            portgroups: []
+          customhostgroup1:
+            portgroups: []
+          customhostgroup2:
+            portgroups: []
+          customhostgroup3:
+            portgroups: []
+          customhostgroup4:
+            portgroups: []
+          customhostgroup5:
+            portgroups: []
+          customhostgroup6:
+            portgroups: []
+          customhostgroup7:
+            portgroups: []
+          customhostgroup8:
+            portgroups: []
+          customhostgroup9:
+            portgroups: []
+      INPUT:
+        hostgroups:
+          anywhere:
+            portgroups:
+              - ssh
+          dockernet:
+            portgroups:
+              - all
+          localhost:
+            portgroups:
+              - all
+          self:
+            portgroups:
+              - syslog
+          syslog:
+            portgroups:
+              - syslog
+          customhostgroup0:
+            portgroups: []
+          customhostgroup1:
+            portgroups: []
+          customhostgroup2:
+            portgroups: []
+          customhostgroup3:
+            portgroups: []
+          customhostgroup4:
+            portgroups: []
+          customhostgroup5:
+            portgroups: []
+          customhostgroup6:
+            portgroups: []
+          customhostgroup7:
+            portgroups: []
+          customhostgroup8:
+            portgroups: []
+          customhostgroup9:
+            portgroups: []
   idh:
     chain:
       DOCKER-USER:
diff --git a/salt/firewall/soc_firewall.yaml b/salt/firewall/soc_firewall.yaml
index 209484b6e..7d250737a 100644
--- a/salt/firewall/soc_firewall.yaml
+++ b/salt/firewall/soc_firewall.yaml
@@ -34,6 +34,7 @@ firewall:
       heavynode: *hostgroupsettings
       idh: *hostgroupsettings
       import: *hostgroupsettings
+      kafkanode: *hostgroupsettings
       localhost: *ROhostgroupsettingsadv
       manager: *hostgroupsettings
       managersearch: *hostgroupsettings
@@ -115,6 +116,9 @@ firewall:
       influxdb:
         tcp: *tcpsettings
         udp: *udpsettings
+      kafka:
+        tcp: *tcpsettings
+        udp: *udpsettings
       kibana:
         tcp: *tcpsettings
         udp: *udpsettings
@@ -363,6 +367,8 @@ firewall:
             portgroups: *portgroupsdocker
           endgame:
             portgroups: *portgroupsdocker
+          kafkanode:
+            portgroups: *portgroupsdocker
           analyst:
             portgroups: *portgroupsdocker
           desktop:
@@ -454,6 +460,8 @@ firewall:
            portgroups: *portgroupsdocker
           syslog:
             portgroups: *portgroupsdocker
+          kafkanode:
+            portgroups: *portgroupsdocker
           analyst:
             portgroups: *portgroupsdocker
           desktop:
@@ -938,6 +946,62 @@ firewall:
            portgroups: *portgroupshost
           customhostgroup9:
            portgroups: *portgroupshost
+      kafkanode:
+        chain:
+          DOCKER-USER:
+            hostgroups:
+              searchnode:
+                portgroups: *portgroupsdocker
+              kafkanode:
+                portgroups: *portgroupsdocker
+              customhostgroup0:
+                portgroups: *portgroupsdocker
+              customhostgroup1:
+                portgroups: *portgroupsdocker
+              customhostgroup2:
+                portgroups: *portgroupsdocker
+              customhostgroup3:
+                portgroups: *portgroupsdocker
+              customhostgroup4:
+                portgroups: *portgroupsdocker
+              customhostgroup5:
+                portgroups: *portgroupsdocker
+              customhostgroup6:
+                portgroups: *portgroupsdocker
+              customhostgroup7:
+                portgroups: *portgroupsdocker
+              customhostgroup8:
+                portgroups: *portgroupsdocker
+              customhostgroup9:
+                portgroups: *portgroupsdocker
+          INPUT:
+            hostgroups:
+              anywhere:
+                portgroups: *portgroupshost
+              dockernet:
+                portgroups: *portgroupshost
+              localhost:
+                portgroups: *portgroupshost
+              customhostgroup0:
+                portgroups: *portgroupshost
+              customhostgroup1:
+                portgroups: *portgroupshost
+              customhostgroup2:
+                portgroups: *portgroupshost
+              customhostgroup3:
+                portgroups: *portgroupshost
+              customhostgroup4:
+                portgroups: *portgroupshost
+              customhostgroup5:
+                portgroups: *portgroupshost
+              customhostgroup6:
+                portgroups: *portgroupshost
+              customhostgroup7:
+                portgroups: *portgroupshost
+              customhostgroup8:
+                portgroups: *portgroupshost
+              customhostgroup9:
+                portgroups: *portgroupshost
       idh:
         chain:
           DOCKER-USER:
diff --git a/salt/kafka/config.sls b/salt/kafka/config.sls
new file mode 100644
index 000000000..8caaa01cd
--- /dev/null
+++ b/salt/kafka/config.sls
@@ -0,0 +1,101 @@
+# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
+# https://securityonion.net/license; you may not use this file except in compliance with the
+# Elastic License 2.0.
+
+{% from 'allowed_states.map.jinja' import allowed_states %}
+{% if sls.split('.')[0] in allowed_states %}
+{% from 'vars/globals.map.jinja' import GLOBALS %}
+
+{% set kafka_ips_logstash = [] %}
+{% set kafka_ips_kraft = [] %}
+{% set kafkanodes = salt['pillar.get']('kafka:nodes', {}) %}
+{% set kafka_nodeid = salt['pillar.get']('kafka:nodes:' ~ GLOBALS.hostname ~ ':nodeid') %}
+{% set kafka_ip = GLOBALS.node_ip %}
+
+{% set nodes = salt['pillar.get']('kafka:nodes', {}) %}
+{% set combined = [] %}
+{% for hostname, data in nodes.items() %}
+  {% do combined.append(data.nodeid ~ "@" ~ hostname ~ ":9093") %}
+{% endfor %}
+{% set kraft_controller_quorum_voters = ','.join(combined) %}
+
+{# Create list for kafka <-> logstash/searchnode communications #}
+{% for node, node_data in kafkanodes.items() %}
+{% do kafka_ips_logstash.append(node_data['ip'] + ":9092") %}
+{% endfor %}
+{% set kafka_server_list = "','".join(kafka_ips_logstash) %}

+{# Create a list for kraft controller <-> kraft controller communications. Used for Kafka metadata management #}
+{% for node, node_data in kafkanodes.items() %}
+{% do kafka_ips_kraft.append(node_data['nodeid'] ~ "@" ~ node_data['ip'] ~ ":9093") %}
+{% endfor %}
+{% set kraft_server_list = "','".join(kafka_ips_kraft) %}
+
+
+include:
+  - ssl
+
+kafka_group:
+  group.present:
+    - name: kafka
+    - gid: 960
+
+kafka:
+  user.present:
+    - uid: 960
+    - gid: 960
+
+{# Future tools to query kafka directly / show consumer groups
+kafka_sbin_tools:
+  file.recurse:
+    - name: /usr/sbin
+    - source: salt://kafka/tools/sbin
+    - user: 960
+    - group: 960
+    - file_mode: 755 #}
+
+kafka_log_dir:
+  file.directory:
+    - name: /opt/so/log/kafka
+    - user: 960
+    - group: 960
+    - makedirs: True
+
+kafka_data_dir:
+  file.directory:
+    - name: /nsm/kafka/data
+    - user: 960
+    - group: 960
+    - makedirs: True
+
+{# When the Docker container is created and added to the registry, update the so-kafka-generate-keystore script #}
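[Reviewer note — hedged, not part of the patch: the cmd.script state below runs the so-kafka-generate-keystore helper added later in this patch, which wraps a keytool import equivalent to the following; both store passwords assume the grid-default "changeit" used elsewhere in this patch:

    keytool -importkeystore -srckeystore /etc/pki/kafka.p12 -srcstoretype PKCS12 \
      -srcstorepass changeit -destkeystore /etc/pki/kafka.jks -deststoretype JKS \
      -deststorepass changeit -alias kafkastore -noprompt
]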
+kafka_keystore_script:
+  cmd.script:
+    - source: salt://kafka/tools/sbin_jinja/so-kafka-generate-keystore
+    - template: jinja
+    - cwd: /opt/so
+    - defaults:
+        GLOBALS: {{ GLOBALS }}
+
+kafka_kraft_server_properties:
+  file.managed:
+    - source: salt://kafka/etc/server.properties.jinja
+    - name: /opt/so/conf/kafka/server.properties
+    - template: jinja
+    - defaults:
+        kafka_nodeid: {{ kafka_nodeid }}
+        kraft_controller_quorum_voters: {{ kraft_controller_quorum_voters }}
+        kafka_ip: {{ kafka_ip }}
+    - user: 960
+    - group: 960
+    - makedirs: True
+    - show_changes: False
+
+{% else %}
+
+{{sls}}_state_not_allowed:
+  test.fail_without_changes:
+    - name: {{sls}}_state_not_allowed
+
+{% endif %}
\ No newline at end of file
diff --git a/salt/kafka/enabled.sls b/salt/kafka/enabled.sls
new file mode 100644
index 000000000..1bf7dcf8b
--- /dev/null
+++ b/salt/kafka/enabled.sls
@@ -0,0 +1,46 @@
+# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
+# https://securityonion.net/license; you may not use this file except in compliance with the
+# Elastic License 2.0.
+
+{% from 'allowed_states.map.jinja' import allowed_states %}
+{% if sls.split('.')[0] in allowed_states %}
+{% from 'vars/globals.map.jinja' import GLOBALS %}
+{% from 'docker/docker.map.jinja' import DOCKER %}
+
+include:
+  - kafka.sostatus
+  - kafka.config
+  - kafka.storage
+
+so-kafka:
+  docker_container.running:
+    - image: so-kafka
+    - hostname: so-kafka
+    - name: so-kafka
+    - networks:
+      - sobridge:
+        - ipv4_address: {{ DOCKER.containers['so-kafka'].ip }}
+    - user: kafka
+    - port_bindings:
+      {% for BINDING in DOCKER.containers['so-kafka'].port_bindings %}
+      - {{ BINDING }}
+      {% endfor %}
+    - binds:
+      - /etc/pki/kafka.jks:/etc/pki/kafka.jks
+      - /opt/so/conf/ca/cacerts:/etc/pki/java/sos/cacerts
+      - /nsm/kafka/data/:/nsm/kafka/data/:rw
+      - /opt/so/conf/kafka/server.properties:/kafka/config/kraft/server.properties
+
+delete_so-kafka_so-status.disabled:
+  file.uncomment:
+    - name: /opt/so/conf/so-status/so-status.conf
+    - regex: ^so-kafka$
+
+{% else %}
+
+{{sls}}_state_not_allowed:
+  test.fail_without_changes:
+    - name: {{sls}}_state_not_allowed
+
+{% endif %}
\ No newline at end of file
diff --git a/salt/kafka/etc/server.properties.jinja b/salt/kafka/etc/server.properties.jinja
new file mode 100644
index 000000000..ad5ac67a9
--- /dev/null
+++ b/salt/kafka/etc/server.properties.jinja
@@ -0,0 +1,123 @@
+# This configuration file is intended for use in KRaft mode, where
+# Apache ZooKeeper is not present. See config/kraft/README.md for details.
+#
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles=broker,controller
+
+# The node id associated with this instance's roles
+node.id={{ kafka_nodeid }}
+
+# The connect string for the controller quorum
+controller.quorum.voters={{ kraft_controller_quorum_voters }}
+
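[Reviewer note — hedged illustration, not part of the template: for a hypothetical two-node grid (kafka1 with nodeid 1, kafka2 with nodeid 2), the two values above would render roughly as:

    node.id=1
    controller.quorum.voters=1@kafka1:9093,2@kafka2:9093
]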
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on.
+# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
+# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
+# with PLAINTEXT listener name, and port 9092.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+listeners=BROKER://{{ kafka_ip }}:9092,CONTROLLER://{{ kafka_ip }}:9093
+
+# Name of listener used for communication between brokers.
+inter.broker.listener.name=BROKER
+
+# Listener name, hostname and port the broker will advertise to clients.
+# If not set, it uses the value for "listeners".
+advertised.listeners=BROKER://{{ kafka_ip }}:9092
+
+# A comma-separated list of the names of the listeners used by the controller.
+# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
+# This is required if running in KRaft mode.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:SSL,BROKER:SSL
+
+# SSL configuration
+ssl.keystore.location=/etc/pki/kafka.jks
+ssl.keystore.password=changeit
+ssl.keystore.type=JKS
+ssl.truststore.location=/etc/pki/java/sos/cacerts
+ssl.truststore.password=changeit
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/nsm/kafka/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
\ No newline at end of file
diff --git a/salt/kafka/init.sls b/salt/kafka/init.sls
new file mode 100644
index 000000000..653cd4b88
--- /dev/null
+++ b/salt/kafka/init.sls
@@ -0,0 +1,9 @@
+# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
+# https://securityonion.net/license; you may not use this file except in compliance with the
+# Elastic License 2.0.
+
+{# Create map.jinja to enable / disable kafka from the UI #}
+{# Temporarily just enable kafka #}
+include:
+  - kafka.enabled
diff --git a/salt/kafka/sostatus.sls b/salt/kafka/sostatus.sls
new file mode 100644
index 000000000..4c7519964
--- /dev/null
+++ b/salt/kafka/sostatus.sls
@@ -0,0 +1,21 @@
+# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
+# https://securityonion.net/license; you may not use this file except in compliance with the
+# Elastic License 2.0.
+
+{% from 'allowed_states.map.jinja' import allowed_states %}
+{% if sls.split('.')[0] in allowed_states %}
+
+append_so-kafka_so-status.conf:
+  file.append:
+    - name: /opt/so/conf/so-status/so-status.conf
+    - text: so-kafka
+    - unless: grep -q so-kafka /opt/so/conf/so-status/so-status.conf
+
+{% else %}
+
+{{sls}}_state_not_allowed:
+  test.fail_without_changes:
+    - name: {{sls}}_state_not_allowed
+
+{% endif %}
\ No newline at end of file
diff --git a/salt/kafka/storage.sls b/salt/kafka/storage.sls
new file mode 100644
index 000000000..dc114ef4f
--- /dev/null
+++ b/salt/kafka/storage.sls
@@ -0,0 +1,31 @@
+# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
+# https://securityonion.net/license; you may not use this file except in compliance with the
+# Elastic License 2.0.
+
+{% from 'allowed_states.map.jinja' import allowed_states %}
+{% if sls.split('.')[0] in allowed_states %}
+{% from 'vars/globals.map.jinja' import GLOBALS %}
+{% set kafka_cluster_id = salt['pillar.get']('secrets:kafka_cluster_id') %}
+
+{# Initialize kafka storage if it doesn't already exist, keyed off meta.properties in /nsm/kafka/data #}
+{% if not salt['file.file_exists']('/nsm/kafka/data/meta.properties') %}
+kafka_storage_init:
+  cmd.run:
+    - name: |
+        docker run -v /nsm/kafka/data:/nsm/kafka/data -v /opt/so/conf/kafka/server.properties:/kafka/config/kraft/server.properties --name so-kafkainit --user root --entrypoint /kafka/bin/kafka-storage.sh so-kafka format -t {{ kafka_cluster_id }} -c /kafka/config/kraft/server.properties
+kafka_rm_kafkainit:
+  cmd.run:
+    - name: |
+        docker rm so-kafkainit
+{% endif %}
+
+
+{% else %}
+
+{{sls}}_state_not_allowed:
+  test.fail_without_changes:
+    - name: {{sls}}_state_not_allowed
+
+{% endif %}
\ No newline at end of file
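[Reviewer note — hedged, not part of the patch: the two cmd.run states above amount to the following one-shot KRaft log-dir format, assuming the so-kafka image bundles the stock kafka-storage.sh; --rm stands in for the separate docker rm step and $KAFKA_CLUSTER_ID for the pillar secret:

    docker run --rm --user root \
      -v /nsm/kafka/data:/nsm/kafka/data \
      -v /opt/so/conf/kafka/server.properties:/kafka/config/kraft/server.properties \
      --entrypoint /kafka/bin/kafka-storage.sh so-kafka \
      format -t "$KAFKA_CLUSTER_ID" -c /kafka/config/kraft/server.properties
]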
diff --git a/salt/kafka/tools/sbin_jinja/so-kafka-generate-keystore b/salt/kafka/tools/sbin_jinja/so-kafka-generate-keystore
new file mode 100644
index 000000000..69bb6ad87
--- /dev/null
+++ b/salt/kafka/tools/sbin_jinja/so-kafka-generate-keystore
@@ -0,0 +1,16 @@
+#!/bin/bash
+#
+# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
+# https://securityonion.net/license; you may not use this file except in compliance with the
+# Elastic License 2.0.
+
+. /usr/sbin/so-common
+
+if [ ! -f /etc/pki/kafka.jks ]; then
+  docker run -v /etc/pki/kafka.p12:/etc/pki/kafka.p12 --name so-kafka-keystore --user root --entrypoint keytool so-kafka -importkeystore -srckeystore /etc/pki/kafka.p12 -srcstoretype PKCS12 -srcstorepass changeit -destkeystore /etc/pki/kafka.jks -deststoretype JKS -deststorepass changeit -alias kafkastore -noprompt
+  docker cp so-kafka-keystore:/etc/pki/kafka.jks /etc/pki/kafka.jks
+  docker rm so-kafka-keystore
+else
+  exit 0
+fi
\ No newline at end of file
diff --git a/salt/logstash/defaults.yaml b/salt/logstash/defaults.yaml
index e4c18cc64..b7382090e 100644
--- a/salt/logstash/defaults.yaml
+++ b/salt/logstash/defaults.yaml
@@ -19,6 +19,8 @@ logstash:
       - search
     fleet:
       - fleet
+    kafkanode:
+      - kafkanode
   defined_pipelines:
     fleet:
       - so/0012_input_elastic_agent.conf.jinja
@@ -37,6 +39,8 @@ logstash:
       - so/0900_input_redis.conf.jinja
       - so/9805_output_elastic_agent.conf.jinja
       - so/9900_output_endgame.conf.jinja
+    kafkanode:
+      - so/0899_output_kafka.conf.jinja
     custom0: []
     custom1: []
     custom2: []
diff --git a/salt/logstash/enabled.sls b/salt/logstash/enabled.sls
index c76f81d21..96e29b25a 100644
--- a/salt/logstash/enabled.sls
+++ b/salt/logstash/enabled.sls
@@ -75,10 +75,13 @@ so-logstash:
   {% else %}
       - /etc/pki/tls/certs/intca.crt:/usr/share/filebeat/ca.crt:ro
   {% endif %}
-  {% if GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-import', 'so-heavynode', 'so-searchnode'] %}
+  {% if GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-import', 'so-heavynode', 'so-searchnode', 'so-kafkanode'] %}
       - /opt/so/conf/ca/cacerts:/etc/pki/ca-trust/extracted/java/cacerts:ro
       - /opt/so/conf/ca/tls-ca-bundle.pem:/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem:ro
   {% endif %}
+  {% if GLOBALS.role in ['so-kafkanode'] %}
+      - /etc/pki/kafka-logstash.p12:/usr/share/logstash/kafka-logstash.p12:ro
+  {% endif %}
   {% if GLOBALS.role == 'so-eval' %}
       - /nsm/zeek:/nsm/zeek:ro
       - /nsm/suricata:/suricata:ro
diff --git a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja
new file mode 100644
index 000000000..c1429319a
--- /dev/null
+++ b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja
@@ -0,0 +1,26 @@
+{% set kafka_brokers = salt['pillar.get']('logstash:nodes:kafkanode', {}) %}
+{% set broker_ips = [] %}
+{% for node, node_data in kafka_brokers.items() %}
+  {% do broker_ips.append(node_data['ip'] + ":9092") %}
+{% endfor %}
+
+{% set bootstrap_servers = "','".join(broker_ips) %}
+
+
+# Runs on searchnodes to ingest Kafka topic(s); the group_id allows load balancing of event ingest across all searchnodes
+input {
+  kafka {
+    codec => json
+    # Can ingest multiple topics. Set to a value from the SOC UI?
+    topics => ['logstash-topic']
+    group_id => 'searchnodes'
+    security_protocol => 'SSL'
+    bootstrap_servers => '{{ bootstrap_servers }}'
+    ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
+    ssl_keystore_password => ''
+    ssl_keystore_type => 'PKCS12'
+    ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
+    # Set password as a pillar to avoid bad optics? This is the default truststore for the grid
+    ssl_truststore_password => 'changeit'
+  }
+}
\ No newline at end of file
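[Reviewer note — hedged, not part of the patch: either rendered pipeline can be syntax-checked before shipping with Logstash's built-in config test; the in-container path is an assumption based on the stock image layout:

    docker exec so-logstash bin/logstash --config.test_and_exit \
      -f /usr/share/logstash/pipeline/0800_input_kafka.conf
]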
diff --git a/salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja
new file mode 100644
index 000000000..ff9a6f6ee
--- /dev/null
+++ b/salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja
@@ -0,0 +1,22 @@
+{% set kafka_brokers = salt['pillar.get']('logstash:nodes:kafkanode', {}) %}
+{% set broker_ips = [] %}
+{% for node, node_data in kafka_brokers.items() %}
+  {% do broker_ips.append(node_data['ip'] + ":9092") %}
+{% endfor %}
+
+{% set bootstrap_servers = "','".join(broker_ips) %}
+
+# Runs on the Kafka broker; Logstash writes events to the 'logstash-topic' topic
+output {
+  kafka {
+    codec => json
+    topic_id => 'logstash-topic'
+    bootstrap_servers => '{{ bootstrap_servers }}'
+    security_protocol => 'SSL'
+    ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
+    ssl_keystore_password => ''
+    ssl_keystore_type => 'PKCS12'
+    ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
+    ssl_truststore_password => 'changeit'
+  }
+}
\ No newline at end of file
diff --git a/salt/logstash/soc_logstash.yaml b/salt/logstash/soc_logstash.yaml
index bcb99bad5..144094eb1 100644
--- a/salt/logstash/soc_logstash.yaml
+++ b/salt/logstash/soc_logstash.yaml
@@ -16,6 +16,7 @@ logstash:
       manager: *assigned_pipelines
       managersearch: *assigned_pipelines
       fleet: *assigned_pipelines
+      kafkanode: *assigned_pipelines
   defined_pipelines:
     receiver: &defined_pipelines
       description: List of pipeline configurations assign to this group.
@@ -26,6 +27,8 @@ logstash:
     fleet: *defined_pipelines
     manager: *defined_pipelines
     search: *defined_pipelines
+    kafkanode: *defined_pipelines
     custom0: *defined_pipelines
     custom1: *defined_pipelines
     custom2: *defined_pipelines
diff --git a/salt/manager/tools/sbin/so-firewall-minion b/salt/manager/tools/sbin/so-firewall-minion
index 66a0afcea..3357e5185 100755
--- a/salt/manager/tools/sbin/so-firewall-minion
+++ b/salt/manager/tools/sbin/so-firewall-minion
@@ -79,6 +79,9 @@ fi
     'RECEIVER')
       so-firewall includehost receiver "$IP" --apply
       ;;
+    'KAFKANODE')
+      so-firewall includehost kafkanode "$IP" --apply
+      ;;
     'DESKTOP')
       so-firewall includehost desktop "$IP" --apply
       ;;
diff --git a/salt/manager/tools/sbin/so-kafka-clusterid b/salt/manager/tools/sbin/so-kafka-clusterid
new file mode 100644
index 000000000..64833a0d2
--- /dev/null
+++ b/salt/manager/tools/sbin/so-kafka-clusterid
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
+# https://securityonion.net/license; you may not use this file except in compliance with the
+# Elastic License 2.0.
+
+local_salt_dir=/opt/so/saltstack/local
+
+if [[ -f /usr/sbin/so-common ]]; then
+  source /usr/sbin/so-common
+else
+  source $(dirname $0)/../../../common/tools/sbin/so-common
+fi
+
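[Reviewer note — not part of the patch: Kafka cluster ids are 22-character base64url-encoded UUIDs, which is presumably why a 22-character random value is requested below.]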
+if ! grep -q "^  kafka_cluster_id:" $local_salt_dir/pillar/secrets.sls; then
+  kafka_cluster_id=$(get_random_value 22)
+  echo "  kafka_cluster_id: $kafka_cluster_id" >> $local_salt_dir/pillar/secrets.sls
+else
+  echo 'kafka_cluster_id exists'
+  salt-call pillar.get secrets
+fi
\ No newline at end of file
diff --git a/salt/manager/tools/sbin/so-minion b/salt/manager/tools/sbin/so-minion
index edc0b1404..c61098589 100755
--- a/salt/manager/tools/sbin/so-minion
+++ b/salt/manager/tools/sbin/so-minion
@@ -556,6 +556,10 @@ function createRECEIVER() {
   add_telegraf_to_minion
 }
 
+function createKAFKANODE() {
+  add_logstash_to_minion
+  # add_telegraf_to_minion
+}
 function testConnection() {
   retry 15 3 "salt '$MINION_ID' test.ping" True
diff --git a/salt/ssl/init.sls b/salt/ssl/init.sls
index ef93a9072..2a71cd853 100644
--- a/salt/ssl/init.sls
+++ b/salt/ssl/init.sls
@@ -664,6 +664,128 @@ elastickeyperms:
 
 {%- endif %}
 
+# Roles will need to be modified. Below is just for testing encrypted Kafka pipelines.
+# Remove so-manager; it is only in place for testing.
+{% if grains['role'] in ['so-manager', 'so-kafkanode', 'so-searchnode'] %}
+# Create a cert for Kafka encryption
+kafka_key:
+  x509.private_key_managed:
+    - name: /etc/pki/kafka.key
+    - keysize: 4096
+    - backup: True
+    - new: True
+    {% if salt['file.file_exists']('/etc/pki/kafka.key') -%}
+    - prereq:
+      - x509: /etc/pki/kafka.crt
+    {%- endif %}
+    - retry:
+        attempts: 5
+        interval: 30
+
+kafka_crt:
+  x509.certificate_managed:
+    - name: /etc/pki/kafka.crt
+    - ca_server: {{ ca_server }}
+    - subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
+    - signing_policy: elasticfleet
+    - private_key: /etc/pki/kafka.key
+    - CN: {{ GLOBALS.hostname }}
+    - days_remaining: 0
+    - days_valid: 820
+    - backup: True
+    - timeout: 30
+    - retry:
+        attempts: 5
+        interval: 30
+  cmd.run:
+    - name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka.key -in /etc/pki/kafka.crt -export -out /etc/pki/kafka.p12 -nodes -passout pass:changeit"
+    - onchanges:
+      - x509: /etc/pki/kafka.key
+
+# Kafka needs a keystore, so create a dedicated key / cert for that purpose
+etc_kafka_logstash_key:
+  x509.private_key_managed:
+    - name: /etc/pki/kafka-logstash.key
+    - keysize: 4096
+    - backup: True
+    - new: True
+    {% if salt['file.file_exists']('/etc/pki/kafka-logstash.key') -%}
+    - prereq:
+      - x509: etc_kafka_logstash_crt
+    {%- endif %}
+    - retry:
+        attempts: 5
+        interval: 30
+
+etc_kafka_logstash_crt:
+  x509.certificate_managed:
+    - name: /etc/pki/kafka-logstash.crt
+    - ca_server: {{ ca_server }}
+    - signing_policy: elasticfleet
+    - private_key: /etc/pki/kafka-logstash.key
+    - CN: {{ GLOBALS.hostname }}
+    - subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }}
+    - days_remaining: 0
+    - days_valid: 820
+    - backup: True
+    - timeout: 30
+    - retry:
+        attempts: 5
+        interval: 30
+  cmd.run:
+    - name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka-logstash.key -in /etc/pki/kafka-logstash.crt -export -out /etc/pki/kafka-logstash.p12 -nodes -passout pass:"
+    - onchanges:
+      - x509: etc_kafka_logstash_key
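[Reviewer note — not part of the patch: the empty "-passout pass:" above produces a password-less PKCS12 keystore, which matches the empty ssl_keystore_password used by the Logstash Kafka input/output pipelines elsewhere in this patch.]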
+
+kafka_key_perms:
+  file.managed:
+    - replace: False
+    - name: /etc/pki/kafka.key
+    - mode: 640
+    - user: 960
+    - group: 939
+
+kafka_crt_perms:
+  file.managed:
+    - replace: False
+    - name: /etc/pki/kafka.crt
+    - mode: 640
+    - user: 960
+    - group: 939
+
+kafka_logstash_cert_perms:
+  file.managed:
+    - replace: False
+    - name: /etc/pki/kafka-logstash.crt
+    - mode: 640
+    - user: 960
+    - group: 939
+
+kafka_logstash_key_perms:
+  file.managed:
+    - replace: False
+    - name: /etc/pki/kafka-logstash.key
+    - mode: 640
+    - user: 960
+    - group: 939
+
+kafka_logstash_keystore_perms:
+  file.managed:
+    - replace: False
+    - name: /etc/pki/kafka-logstash.p12
+    - mode: 640
+    - user: 960
+    - group: 939
+
+kafka_keystore_perms:
+  file.managed:
+    - replace: False
+    - name: /etc/pki/kafka.p12
+    - mode: 640
+    - user: 960
+    - group: 939
+
+{% endif %}
 {% else %}
 
 {{sls}}_state_not_allowed:
diff --git a/salt/ssl/remove.sls b/salt/ssl/remove.sls
index 43a245288..bb4562300 100644
--- a/salt/ssl/remove.sls
+++ b/salt/ssl/remove.sls
@@ -67,3 +67,20 @@ fleet_crt:
 fbcertdir:
   file.absent:
     - name: /opt/so/conf/filebeat/etc/pki
+
+kafka_crt:
+  file.absent:
+    - name: /etc/pki/kafka.crt
+kafka_key:
+  file.absent:
+    - name: /etc/pki/kafka.key
+
+kafka_logstash_crt:
+  file.absent:
+    - name: /etc/pki/kafka-logstash.crt
+kafka_logstash_key:
+  file.absent:
+    - name: /etc/pki/kafka-logstash.key
+kafka_logstash_keystore:
+  file.absent:
+    - name: /etc/pki/kafka-logstash.p12
diff --git a/salt/top.sls b/salt/top.sls
index 2323731a1..cd1b92e5c 100644
--- a/salt/top.sls
+++ b/salt/top.sls
@@ -255,6 +255,16 @@ base:
     - elasticfleet.install_agent_grid
     - docker_clean
 
+  '*_kafkanode and G@saltversion:{{saltversion}}':
+    - match: compound
+    - kafka
+    - logstash
+    - ssl
+    - telegraf
+    - firewall
+    - docker_clean
+    - elasticfleet.install_agent_grid
+
   '*_idh and G@saltversion:{{saltversion}}':
     - match: compound
     - ssl
diff --git a/salt/vars/kafkanode.map.jinja b/salt/vars/kafkanode.map.jinja
new file mode 100644
index 000000000..396cefcc9
--- /dev/null
+++ b/salt/vars/kafkanode.map.jinja
@@ -0,0 +1 @@
+{% set ROLE_GLOBALS = {} %}
\ No newline at end of file
diff --git a/setup/so-functions b/setup/so-functions
index fc0876248..76887c81c 100755
--- a/setup/so-functions
+++ b/setup/so-functions
@@ -1242,6 +1242,7 @@ generate_passwords(){
   REDISPASS=$(get_random_value)
   SOCSRVKEY=$(get_random_value 64)
   IMPORTPASS=$(get_random_value)
+  KAFKACLUSTERID=$(get_random_value 22)
 }
 
 generate_interface_vars() {
@@ -1269,7 +1270,7 @@ get_redirect() {
 get_minion_type() {
   local minion_type
   case "$install_type" in
-    'EVAL' | 'MANAGERSEARCH' | 'MANAGER' | 'SENSOR' | 'HEAVYNODE' | 'SEARCHNODE' | 'FLEET' | 'IDH' | 'STANDALONE' | 'IMPORT' | 'RECEIVER')
+    'EVAL' | 'MANAGERSEARCH' | 'MANAGER' | 'SENSOR' | 'HEAVYNODE' | 'SEARCHNODE' | 'FLEET' | 'IDH' | 'STANDALONE' | 'IMPORT' | 'RECEIVER' | 'KAFKANODE')
       minion_type=$(echo "$install_type" | tr '[:upper:]' '[:lower:]') ;;
   esac
@@ -1663,6 +1664,8 @@ process_installtype() {
     is_import=true
   elif [ "$install_type" = 'RECEIVER' ]; then
     is_receiver=true
+  elif [ "$install_type" = 'KAFKANODE' ]; then
+    is_kafka=true
   elif [ "$install_type" = 'DESKTOP' ]; then
     if [ "$setup_type" != 'desktop' ]; then
       exec bash so-setup desktop
@@ -2105,7 +2108,8 @@ secrets_pillar(){
       "  playbook_automation: $PLAYBOOKAUTOMATIONPASS"\
       "  playbook_automation_api_key: "\
       "  import_pass: $IMPORTPASS"\
-      "  influx_pass: $INFLUXPASS" > $local_salt_dir/pillar/secrets.sls
+      "  influx_pass: $INFLUXPASS"\
+      "  kafka_cluster_id: $KAFKACLUSTERID" > $local_salt_dir/pillar/secrets.sls
   fi
 }
 
diff --git a/setup/so-setup b/setup/so-setup
index 14d6b2304..bc64cd9d1 100755
--- a/setup/so-setup
+++ b/setup/so-setup
@@ -574,6 +574,16 @@ if ! [[ -f $install_opt_file ]]; then
     check_manager_connection
     set_minion_info
     whiptail_end_settings
+
+  elif [[ $is_kafka ]]; then
+    info "Setting up as node type Kafka broker"
+    #check_requirements "kafka"
+    networking_needful
+    collect_mngr_hostname
+    add_mngr_ip_to_hosts
+    check_manager_connection
+    set_minion_info
+    whiptail_end_settings
   fi
 
   if [[ $waitforstate ]]; then
diff --git a/setup/so-whiptail b/setup/so-whiptail
index c55e2db8f..4553ebd33 100755
--- a/setup/so-whiptail
+++ b/setup/so-whiptail
@@ -640,13 +640,14 @@ whiptail_install_type_dist_existing() {
 
 Note: Heavy nodes (HEAVYNODE) are NOT recommended for most users.
 EOM
 
-  install_type=$(whiptail --title "$whiptail_title" --menu "$node_msg" 19 75 6 \
+  install_type=$(whiptail --title "$whiptail_title" --menu "$node_msg" 19 75 7 \
     "SENSOR" "Create a forward only sensor " \
     "SEARCHNODE" "Add a search node with parsing " \
     "FLEET" "Dedicated Elastic Fleet Node " \
     "HEAVYNODE" "Sensor + Search Node " \
     "IDH" "Intrusion Detection Honeypot Node " \
     "RECEIVER" "Receiver Node " \
+    "KAFKANODE" "Kafka Broker + KRaft Controller " \
     3>&1 1>&2 2>&3
     # "HOTNODE" "Add Hot Node (Uses Elastic Clustering)" \ # TODO
     # "WARMNODE" "Add Warm Node to existing Hot or Search node" \ # TODO
@@ -677,6 +678,8 @@
     is_import=true
   elif [ "$install_type" = 'RECEIVER' ]; then
     is_receiver=true
+  elif [ "$install_type" = 'KAFKANODE' ]; then
+    is_kafka=true
   elif [ "$install_type" = 'DESKTOP' ]; then
     if [ "$setup_type" != 'desktop' ]; then
       exec bash so-setup desktop