diff --git a/salt/kafka/tools/sbin_jinja/so-kafka-fleet-output-policy b/salt/kafka/tools/sbin_jinja/so-kafka-fleet-output-policy index 13f158bdd..7a1f4e1c5 100644 --- a/salt/kafka/tools/sbin_jinja/so-kafka-fleet-output-policy +++ b/salt/kafka/tools/sbin_jinja/so-kafka-fleet-output-policy @@ -17,7 +17,7 @@ if ! echo "$output" | grep -q "so-manager_kafka"; then --arg KAFKACA "$KAFKACA" \ --arg MANAGER_IP "{{ GLOBALS.manager_ip }}:9092" \ --arg KAFKA_OUTPUT_VERSION "$KAFKA_OUTPUT_VERSION" \ - '{ "name": "grid-kafka", "id": "so-manager_kafka", "type": "kafka", "hosts": [ $MANAGER_IP ], "is_default": false, "is_default_monitoring": false, "config_yaml": "", "ssl": { "certificate_authorities": [ $KAFKACA ], "certificate": $KAFKACRT, "key": $KAFKAKEY, "verification_mode": "full" }, "proxy_id": null, "client_id": "Elastic", "version": $KAFKA_OUTPUT_VERSION, "compression": "none", "auth_type": "ssl", "partition": "round_robin", "round_robin": { "group_events": 1 }, "topics":[{"topic":"%{[event.module]}-topic","when":{"type":"regexp","condition":"event.module:.+"}},{"topic":"default-topic"}], "headers": [ { "key": "", "value": "" } ], "timeout": 30, "broker_timeout": 30, "required_acks": 1 }' + '{ "name": "grid-kafka", "id": "so-manager_kafka", "type": "kafka", "hosts": [ $MANAGER_IP ], "is_default": false, "is_default_monitoring": false, "config_yaml": "", "ssl": { "certificate_authorities": [ $KAFKACA ], "certificate": $KAFKACRT, "key": $KAFKAKEY, "verification_mode": "full" }, "proxy_id": null, "client_id": "Elastic", "version": $KAFKA_OUTPUT_VERSION, "compression": "none", "auth_type": "ssl", "partition": "round_robin", "round_robin": { "group_events": 1 }, "topics":[{"topic":"%{[event.module]}-securityonion","when":{"type":"regexp","condition":"event.module:.+"}},{"topic":"default-securityonion"}], "headers": [ { "key": "", "value": "" } ], "timeout": 30, "broker_timeout": 30, "required_acks": 1 }' ) curl -K /opt/so/conf/elasticsearch/curl.config -L -X POST "localhost:5601/api/fleet/outputs" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" 2&1> /dev/null fi \ No newline at end of file diff --git a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja index 3d0d03020..642428d90 100644 --- a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja +++ b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja @@ -1,17 +1,17 @@ -{% set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %} -{% set brokers = [] %} +{%- set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %} +{%- set brokers = [] %} -{% for key, values in kafka_brokers.items() %} -{% if 'broker' in values['role'] %} -{% do brokers.append(key ~ ':9092') %} -{% endif %} -{% endfor %} -{% set bootstrap_servers = ','.join(brokers) %} +{%- for key, values in kafka_brokers.items() %} +{%- if 'broker' in values['role'] %} +{%- do brokers.append(key ~ ':9092') %} +{%- endif %} +{%- endfor %} +{%- set bootstrap_servers = ','.join(brokers) %} input { kafka { codec => json - topics => ['default-topic', 'kratos-topic', 'soc-topic', 'strelka-topic', 'suricata-topic', 'zeek-topic', 'rita-topic', 'opencanary-topic', 'syslog-topic'] + topics_pattern => '.*-securityonion$' group_id => 'searchnodes' consumer_threads => 3 client_id => '{{ GLOBALS.hostname }}'