Files
securityonion/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja
reyesj2 eca2a4a9c8 Logstash consumer threads should match topic partition count
- Default is set to 3. If there are too many consumer threads it may lead to idle logstash worker threads and could require decreasing this value to saturate workers

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
2024-05-08 16:17:09 -04:00

35 lines
1.2 KiB
Django/Jinja

{% set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %}
{% set brokers = [] %}
{% for key, values in kafka_brokers.items() %}
{% if 'broker' in values['role'] %}
{% do brokers.append(key ~ ':9092') %}
{% endif %}
{% endfor %}
{% set bootstrap_servers = ','.join(brokers) %}
input {
kafka {
codec => json
topics => ['default-topic', 'kratos-topic', 'soc-topic', 'strelka-topic', 'suricata-topic', 'zeek-topic', 'rita-topic', 'opencanary-topic', 'syslog-topic']
group_id => 'searchnodes'
consumer_threads => 3
client_id => '{{ GLOBALS.hostname }}'
security_protocol => 'SSL'
bootstrap_servers => '{{ bootstrap_servers }}'
ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
ssl_keystore_password => 'changeit'
ssl_keystore_type => 'PKCS12'
ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
ssl_truststore_password => 'changeit'
decorate_events => true
tags => [ "elastic-agent", "input-{{ GLOBALS.hostname}}", "kafka" ]
}
}
filter {
if ![metadata] {
mutate {
rename => { "@metadata" => "metadata" }
}
}
}