{#-
  Logstash pipeline fragment: consume events from Kafka over SSL.

  Reads from pillar:
    kafka:password - used as the PKCS12 keystore password below.
    kafka:nodes    - mapping of node name -> node data; every node whose
                     'role' value contains 'broker' is added to the
                     bootstrap server list as '<node name>:9092'.

  GLOBALS (hostname) is not defined in this file; it is expected to be
  provided by the rendering context.

  Nothing is rendered unless at least one broker was found: an empty
  bootstrap_servers string is not a usable Kafka client setting, so emitting
  the input without brokers would only produce a broken pipeline.
-#}
{%- set kafka_password = salt['pillar.get']('kafka:password') %}
{%- set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %}
{%- set brokers = [] %}

{#- Guard the loop so an empty or null pillar value is never iterated. -#}
{%- if kafka_brokers %}
{%-   for node_name, node in kafka_brokers.items() %}
{%-     if 'broker' in node['role'] %}
{%-       do brokers.append(node_name ~ ':9092') %}
{%-     endif %}
{%-   endfor %}
{%- endif %}

{%- if brokers %}
{%-   set bootstrap_servers = ','.join(brokers) %}
input {
  kafka {
    codec => json
    # Subscribe to every topic whose name ends in "-securityonion".
    topics_pattern => '.*-securityonion$'
    group_id => 'searchnodes'
    consumer_threads => 3
    client_id => '{{ GLOBALS.hostname }}'
    security_protocol => 'SSL'
    bootstrap_servers => '{{ bootstrap_servers }}'
    ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
    ssl_keystore_password => '{{ kafka_password }}'
    ssl_keystore_type => 'PKCS12'
    ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
    # NOTE(review): 'changeit' looks like the stock Java cacerts password - confirm.
    ssl_truststore_password => 'changeit'
    decorate_events => true
    tags => [ "elastic-agent", "input-{{ GLOBALS.hostname }}", "kafka" ]
  }
}

filter {
  # When the event has no [metadata] field, move @metadata into it.
  # NOTE(review): presumably so the metadata is kept on the event downstream - confirm.
  if ![metadata] {
    mutate {
      rename => { "@metadata" => "metadata" }
    }
  }
}
{% endif %}