diff --git a/pillar/top.sls b/pillar/top.sls index fbb1604da..61b812cc8 100644 --- a/pillar/top.sls +++ b/pillar/top.sls @@ -226,6 +226,7 @@ base: - minions.adv_{{ grains.id }} - stig.soc_stig - soc.license + - kafka.nodes '*_receiver': - logstash.nodes diff --git a/salt/logstash/defaults.yaml b/salt/logstash/defaults.yaml index 348acb622..d82cba1ff 100644 --- a/salt/logstash/defaults.yaml +++ b/salt/logstash/defaults.yaml @@ -37,6 +37,7 @@ logstash: - so/0900_input_redis.conf.jinja - so/9805_output_elastic_agent.conf.jinja - so/9900_output_endgame.conf.jinja + - so/0800_input_kafka.conf.jinja custom0: [] custom1: [] custom2: [] diff --git a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja index 85e6729e2..087ed7755 100644 --- a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja +++ b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja @@ -1,18 +1,17 @@ -{% set kafka_brokers = salt['pillar.get']('logstash:nodes:receiver', {}) %} -{% set kafka_on_mngr = salt ['pillar.get']('logstash:nodes:manager', {}) %} -{% set broker_ips = [] %} -{% for node, node_data in kafka_brokers.items() %} - {% do broker_ips.append(node_data['ip'] + ":9092") %} +{% set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %} +{% set brokers = [] %} + +{% for key, values in kafka_brokers.items() %} +{% if 'broker' in values['role'] %} +{% do brokers.append(key ~ ':9092') %} +{% endif %} {% endfor %} -{% for node, node_data in kafka_on_mngr.items() %} - {% do broker_ips.append(node_data['ip'] + ":9092") %} -{% endfor %} -{% set bootstrap_servers = "','".join(broker_ips) %} +{% set bootstrap_servers = ','.join(brokers) %} input { kafka { codec => json - topics => ['default-logs', 'kratos-logs', 'soc-logs', 'strelka-logs', 'suricata-logs', 'zeek-logs'] + topics => ['default-topic', 'kratos-topic', 'soc-topic', 'strelka-topic', 'suricata-topic', 'zeek-topic', 'rita-topic', 'opencanary-topic', 'syslog-topic'] group_id => 'searchnodes' client_id => '{{ GLOBALS.hostname }}' security_protocol => 'SSL'