From 721e04f793701e635c957b5f0f23f189a2083cd8 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:37:14 -0400 Subject: [PATCH 1/3] initial logstash input from kafka over ssl Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com> --- salt/logstash/enabled.sls | 2 - .../config/so/0800_input_kafka.conf.jinja | 55 +++++++++-------- salt/ssl/init.sls | 59 +++++++++++++++++++ 3 files changed, 89 insertions(+), 27 deletions(-) diff --git a/salt/logstash/enabled.sls b/salt/logstash/enabled.sls index 798b1984a..fcc2ec190 100644 --- a/salt/logstash/enabled.sls +++ b/salt/logstash/enabled.sls @@ -78,8 +78,6 @@ so-logstash: {% if GLOBALS.role in ['so-manager', 'so-managersearch', 'so-standalone', 'so-import', 'so-heavynode', 'so-searchnode', 'so-kafkanode' ] %} - /opt/so/conf/ca/cacerts:/etc/pki/ca-trust/extracted/java/cacerts:ro - /opt/so/conf/ca/tls-ca-bundle.pem:/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem:ro - {% endif %} - {% if GLOBALS.role in ['so-kafkanode'] %} - /etc/pki/kafka-logstash.p12:/usr/share/logstash/kafka-logstash.p12:ro {% endif %} {% if GLOBALS.role == 'so-eval' %} diff --git a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja index c1429319a..957f7da19 100644 --- a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja +++ b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja @@ -1,26 +1,31 @@ -{% set kafka_brokers = salt['pillar.get']('logstash:nodes:kafkanode', {}) %} -{% set broker_ips = [] %} -{% for node, node_data in kafka_brokers.items() %} - {% do broker_ips.append(node_data['ip'] + ":9092") %} -{% endfor %} - -{% set bootstrap_servers = "','".join(broker_ips) %} - - -#Run on searchnodes ingest kafka topic(s) group_id allows load balancing of event ingest to all searchnodes -input { - kafka { - codec => json - #Can ingest multiple topics. Set to a value from SOC UI? - topics => ['logstash-topic',] - group_id => 'searchnodes' - security_protocol => 'SSL' - bootstrap_servers => {{ bootstrap_servers }} - ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12' - ssl_keystore_password => '' - ssl_keystore_type => 'PKCS12' - ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts' - # Set password as a pillar to avoid bad optics? This is default truststore for grid - ssl_truststore_password => 'changeit' - } +{% set kafka_brokers = salt['pillar.get']('logstash:nodes:kafkanode', {}) %} +{% set kafka_on_mngr = salt ['pillar.get']('logstash:nodes:manager', {}) %} +{% set broker_ips = [] %} +{% for node, node_data in kafka_brokers.items() %} + {% do broker_ips.append(node_data['ip'] + ":9092") %} +{% endfor %} + +{# For testing kafka stuff from manager not dedicated kafkanodes #} +{% for node, node_data in kafka_on_mngr.items() %} + {% do broker_ips.append(node_data['ip'] + ":9092") %} +{% endfor %} +{% set bootstrap_servers = "','".join(broker_ips) %} + + +#Run on searchnodes ingest kafka topic(s) group_id allows load balancing of event ingest to all searchnodes +input { + kafka { + codec => json + #Can ingest multiple topics. Set to a value from SOC UI? 
+ topics => ['ea-logs'] + group_id => 'searchnodes' + security_protocol => 'SSL' + bootstrap_servers => '{{ bootstrap_servers }}' + ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12' + ssl_keystore_password => 'changeit' + ssl_keystore_type => 'PKCS12' + ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts' + # Set password as a pillar to avoid bad optics? This is default truststore for grid + ssl_truststore_password => 'changeit' + } } \ No newline at end of file diff --git a/salt/ssl/init.sls b/salt/ssl/init.sls index a99b030ff..90f9cc64f 100644 --- a/salt/ssl/init.sls +++ b/salt/ssl/init.sls @@ -736,6 +736,40 @@ elasticfleet_kafka_crt: - onchanges: - x509: elasticfleet_kafka_key +kafka_logstash_key: + x509.private_key_managed: + - name: /etc/pki/kafka-logstash.key + - keysize: 4096 + - backup: True + - new: True + {% if salt['file.file_exists']('/etc/pki/kafka-logstash.key') -%} + - prereq: + - x509: /etc/pki/kafka-logstash.crt + {%- endif %} + - retry: + attempts: 5 + interval: 30 + +kafka_logstash_crt: + x509.certificate_managed: + - name: /etc/pki/kafka-logstash.crt + - ca_server: {{ ca_server }} + - subjectAltName: DNS:{{ GLOBALS.hostname }}, IP:{{ GLOBALS.node_ip }} + - signing_policy: kafka + - private_key: /etc/pki/kafka-logstash.key + - CN: {{ GLOBALS.hostname }} + - days_remaining: 0 + - days_valid: 820 + - backup: True + - timeout: 30 + - retry: + attempts: 5 + interval: 30 + cmd.run: + - name: "/usr/bin/openssl pkcs12 -inkey /etc/pki/kafka-logstash.key -in /etc/pki/kafka-logstash.crt -export -out /etc/pki/kafka-logstash.p12 -nodes -passout pass:changeit" + - onchanges: + - x509: /etc/pki/kafka-logstash.key + {% if grains['role'] in ['so-manager'] %} kafka_client_key: x509.private_key_managed: @@ -783,6 +817,7 @@ kafka_client_crt_perms: - user: 960 - group: 939 {% endif %} + kafka_key_perms: file.managed: - replace: False @@ -799,6 +834,30 @@ kafka_crt_perms: - user: 960 - group: 939 +kafka_logstash_key_perms: + file.managed: + - replace: False + - name: /etc/pki/kafka-logstash.key + - mode: 640 + - user: 960 + - group: 939 + +kafka_logstash_crt_perms: + file.managed: + - replace: False + - name: /etc/pki/kafka-logstash.crt + - mode: 640 + - user: 960 + - group: 939 + +kafka_logstash_pkcs12_perms: + file.managed: + - replace: False + - name: /etc/pki/kafka-logstash.p12 + - mode: 640 + - user: 960 + - group: 931 + kafka_pkcs8_perms: file.managed: - replace: False From 65274e89d7c8741fe63536e030483c2e2af21665 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Fri, 5 Apr 2024 15:38:00 -0400 Subject: [PATCH 2/3] Add client_id to logstash pipeline. To identify which searchnode is pulling messages Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com> --- salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja | 1 + 1 file changed, 1 insertion(+) diff --git a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja index 957f7da19..0260b774e 100644 --- a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja +++ b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja @@ -19,6 +19,7 @@ input { #Can ingest multiple topics. Set to a value from SOC UI? 
topics => ['ea-logs'] group_id => 'searchnodes' + client_id => '{{ GLOBALS.hostname }}' security_protocol => 'SSL' bootstrap_servers => '{{ bootstrap_servers }}' ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12' From d67ebabc951cfaf226b176f6eb458ec2b4c35127 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Mon, 8 Apr 2024 16:38:03 -0400 Subject: [PATCH 3/3] Remove logstash output to kafka pipeline. Add additional topics for searchnodes to ingest and add partition/offset info to event Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com> --- .../config/so/0800_input_kafka.conf.jinja | 15 ++++++++----- .../config/so/0899_output_kafka.conf.jinja | 22 ------------------- 2 files changed, 10 insertions(+), 27 deletions(-) delete mode 100644 salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja diff --git a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja index 0260b774e..1391ce983 100644 --- a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja +++ b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja @@ -11,13 +11,10 @@ {% endfor %} {% set bootstrap_servers = "','".join(broker_ips) %} - -#Run on searchnodes ingest kafka topic(s) group_id allows load balancing of event ingest to all searchnodes input { kafka { codec => json - #Can ingest multiple topics. Set to a value from SOC UI? - topics => ['ea-logs'] + topics => ['default-logs', 'kratos-logs', 'soc-logs', 'strelka-logs', 'suricata-logs', 'zeek-logs'] group_id => 'searchnodes' client_id => '{{ GLOBALS.hostname }}' security_protocol => 'SSL' @@ -26,7 +23,15 @@ input { ssl_keystore_password => 'changeit' ssl_keystore_type => 'PKCS12' ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts' - # Set password as a pillar to avoid bad optics? This is default truststore for grid ssl_truststore_password => 'changeit' + decorate_events => true + tags => [ "elastic-agent", "input-{{ GLOBALS.hostname}}", "kafka" ] } +} +filter { + if ![metadata] { + mutate { + rename => { "@metadata" => "metadata" } + } + } } \ No newline at end of file diff --git a/salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja deleted file mode 100644 index ff9a6f6ee..000000000 --- a/salt/logstash/pipelines/config/so/0899_output_kafka.conf.jinja +++ /dev/null @@ -1,22 +0,0 @@ -{% set kafka_brokers = salt['pillar.get']('logstash:nodes:kafkanode', {}) %} -{% set broker_ips = [] %} -{% for node, node_data in kafka_brokers.items() %} - {% do broker_ips.append(node_data['ip'] + ":9092") %} -{% endfor %} - -{% set bootstrap_servers = "','".join(broker_ips) %} - -#Run on kafka broker logstash writes to topic 'logstash-topic' -output { - kafka { - codec => json - topic_id => 'logstash-topic' - bootstrap_servers => '{{ bootstrap_servers }}' - security_protocol => 'SSL' - ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12' - ssl_keystore_password => '' - ssl_keystore_type => 'PKCS12' - ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts' - ssl_truststore_password => 'changeit' - } -} \ No newline at end of file
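
Note: for reference, below is a minimal sketch of how the final 0800_input_kafka.conf.jinja (as of PATCH 3/3) might render on a searchnode, assuming GLOBALS.hostname renders to "searchnode-01" and the logstash:nodes pillar yields a single broker at 10.10.10.5 (both values are hypothetical, chosen only for illustration; the keystore/truststore paths and the 'changeit' passwords are taken directly from the patch):

    input {
      kafka {
        codec => json
        topics => ['default-logs', 'kratos-logs', 'soc-logs', 'strelka-logs', 'suricata-logs', 'zeek-logs']
        group_id => 'searchnodes'
        client_id => 'searchnode-01'
        security_protocol => 'SSL'
        bootstrap_servers => '10.10.10.5:9092'
        ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
        ssl_keystore_password => 'changeit'
        ssl_keystore_type => 'PKCS12'
        ssl_truststore_location => '/etc/pki/ca-trust/extracted/java/cacerts'
        ssl_truststore_password => 'changeit'
        decorate_events => true
        tags => [ "elastic-agent", "input-searchnode-01", "kafka" ]
      }
    }
    filter {
      if ![metadata] {
        mutate {
          # Copy Kafka decoration (topic/partition/offset under @metadata) into a
          # persisted field, since @metadata is not written by outputs by default.
          rename => { "@metadata" => "metadata" }
        }
      }
    }

Because all searchnodes share group_id 'searchnodes', the Kafka consumer group balances partitions across them, while the per-host client_id identifies which searchnode pulled a given batch. The rename in the filter block appears to be how the partition/offset information mentioned in the PATCH 3/3 commit message ends up on the stored event, since decorate_events places it under [@metadata][kafka], which would otherwise be dropped at output time.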