From 9f7bcb0f7d76695f3f674144e5a56063544f10a0 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Mon, 8 Sep 2025 21:13:11 -0500 Subject: [PATCH 1/4] add --force flag to so-kafka-fleet-output-policy & default to using fleet secret storage for client key --- .../sbin_jinja/so-kafka-fleet-output-policy | 79 +++++++++++++------ 1 file changed, 55 insertions(+), 24 deletions(-) diff --git a/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy b/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy index a5ea79922..2e88d00b2 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy +++ b/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy @@ -5,46 +5,77 @@ # Elastic License 2.0. {% from 'vars/globals.map.jinja' import GLOBALS %} -{% if GLOBALS.role in ['so-manager', 'so-standalone', 'so-managersearch'] %} +{% if GLOBALS.role in ['so-manager', 'so-standalone', 'so-managersearch', 'so-managerhype'] %} . /usr/sbin/so-common +force=false +while [[ $# -gt 0 ]]; do + case $1 in + -f|--force) + force=true + shift + ;; + *) + echo "Unknown option $1" + echo "Usage: $0 [-f|--force]" + exit 1 + ;; + esac +done + # Check to make sure that Kibana API is up & ready RETURN_CODE=0 wait_for_web_response "http://localhost:5601/api/fleet/settings" "fleet" 300 "curl -K /opt/so/conf/elasticsearch/curl.config" RETURN_CODE=$? if [[ "$RETURN_CODE" != "0" ]]; then - printf "Kibana API not accessible, can't setup Elastic Fleet output policy for Kafka..." - exit 1 + echo -e "\nKibana API not accessible, can't setup Elastic Fleet output policy for Kafka...\n" + exit 1 fi -output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs" | jq -r .items[].id) +KAFKACRT=$(openssl x509 -in /etc/pki/elasticfleet-kafka.crt) +KAFKAKEY=$(openssl rsa -in /etc/pki/elasticfleet-kafka.key) +KAFKACA=$(openssl x509 -in /etc/pki/tls/certs/intca.crt) +KAFKA_OUTPUT_VERSION="2.6.0" -if ! echo "$output" | grep -q "so-manager_kafka"; then - KAFKACRT=$(openssl x509 -in /etc/pki/elasticfleet-kafka.crt) - KAFKAKEY=$(openssl rsa -in /etc/pki/elasticfleet-kafka.key) - KAFKACA=$(openssl x509 -in /etc/pki/tls/certs/intca.crt) - KAFKA_OUTPUT_VERSION="2.6.0" +if ! kafka_output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" --fail 2>/dev/null); then + # Create a new output policy for Kafka. Default is disabled 'is_default: false & is_default_monitoring: false' JSON_STRING=$( jq -n \ - --arg KAFKACRT "$KAFKACRT" \ - --arg KAFKAKEY "$KAFKAKEY" \ - --arg KAFKACA "$KAFKACA" \ - --arg MANAGER_IP "{{ GLOBALS.manager_ip }}:9092" \ - --arg KAFKA_OUTPUT_VERSION "$KAFKA_OUTPUT_VERSION" \ - '{ "name": "grid-kafka", "id": "so-manager_kafka", "type": "kafka", "hosts": [ $MANAGER_IP ], "is_default": false, "is_default_monitoring": false, "config_yaml": "", "ssl": { "certificate_authorities": [ $KAFKACA ], "certificate": $KAFKACRT, "key": $KAFKAKEY, "verification_mode": "full" }, "proxy_id": null, "client_id": "Elastic", "version": $KAFKA_OUTPUT_VERSION, "compression": "none", "auth_type": "ssl", "partition": "round_robin", "round_robin": { "group_events": 10 }, "topics":[{"topic":"default-securityonion"}], "headers": [ { "key": "", "value": "" } ], "timeout": 30, "broker_timeout": 30, "required_acks": 1 }' - ) - curl -sK /opt/so/conf/elasticsearch/curl.config -L -X POST "localhost:5601/api/fleet/outputs" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" -o /dev/null - refresh_output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs" | jq -r .items[].id) - - if ! echo "$refresh_output" | grep -q "so-manager_kafka"; then - echo -e "\nFailed to setup Elastic Fleet output policy for Kafka...\n" + --arg KAFKACRT "$KAFKACRT" \ + --arg KAFKAKEY "$KAFKAKEY" \ + --arg KAFKACA "$KAFKACA" \ + --arg MANAGER_IP "{{ GLOBALS.manager_ip }}:9092" \ + --arg KAFKA_OUTPUT_VERSION "$KAFKA_OUTPUT_VERSION" \ + '{"name":"grid-kafka", "id":"so-manager_kafka","type":"kafka","hosts":[ $MANAGER_IP ],"is_default":false,"is_default_monitoring":false,"config_yaml":"","ssl":{"certificate_authorities":[ $KAFKACA ],"certificate": $KAFKACRT ,"key":"","verification_mode":"full"},"proxy_id":null,"client_id":"Elastic","version": $KAFKA_OUTPUT_VERSION ,"compression":"none","auth_type":"ssl","partition":"round_robin","round_robin":{"group_events":10},"topics":[{"topic":"default-securityonion"}],"headers":[{"key":"","value":""}],"timeout":30,"broker_timeout":30,"required_acks":1,"secrets":{"ssl":{"key": $KAFKAKEY }}}' + ) + if ! response=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L -X POST "localhost:5601/api/fleet/outputs" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" --fail 2>/dev/null); then + echo -e "\nFailed to setup Elastic Fleet output policy for Kafka...\n" + exit 1 + else + echo -e "\nSuccessfully setup Elastic Fleet output policy for Kafka...\n" + exit 0 + fi +elif kafka_output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" --fail 2>/dev/null) && [[ "$force" == "true" ]]; then + # force an update to Kafka policy. Keep the current value of Kafka output policy (enabled/disabled). + ENABLED_DISABLED=$(echo "$kafka_output" | jq -e .item.is_default) + JSON_STRING=$( jq -n \ + --arg KAFKACRT "$KAFKACRT" \ + --arg KAFKAKEY "$KAFKAKEY" \ + --arg KAFKACA "$KAFKACA" \ + --arg MANAGER_IP "{{ GLOBALS.manager_ip }}:9092" \ + --arg ENABLED_DISABLED "$ENABLED_DISABLED"\ + --arg KAFKA_OUTPUT_VERSION "$KAFKA_OUTPUT_VERSION" \ + '{"name":"grid-kafka","type":"kafka","hosts":[ $MANAGER_IP ],"is_default":$ENABLED_DISABLED,"is_default_monitoring":$ENABLED_DISABLED,"config_yaml":"","ssl":{"certificate_authorities":[ $KAFKACA ],"certificate": $KAFKACRT ,"key":"","verification_mode":"full"},"proxy_id":null,"client_id":"Elastic","version": $KAFKA_OUTPUT_VERSION ,"compression":"none","auth_type":"ssl","partition":"round_robin","round_robin":{"group_events":10},"topics":[{"topic":"default-securityonion"}],"headers":[{"key":"","value":""}],"timeout":30,"broker_timeout":30,"required_acks":1,"secrets":{"ssl":{"key": $KAFKAKEY }}}' + ) + if ! response=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_kafka" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" --fail 2>/dev/null); then + echo -e "\nFailed to force update to Elastic Fleet output policy for Kafka...\n" exit 1 - elif echo "$refresh_output" | grep -q "so-manager_kafka"; then - echo -e "\nSuccessfully setup Elastic Fleet output policy for Kafka...\n" + else + echo -e "\nForced update to Elastic Fleet output policy for Kafka...\n" fi -elif echo "$output" | grep -q "so-manager_kafka"; then +else echo -e "\nElastic Fleet output policy for Kafka already exists...\n" fi {% else %} From 665527641068620ed7f648ddc286f81ea551e7b1 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Mon, 8 Sep 2025 21:13:29 -0500 Subject: [PATCH 2/4] force update to kafka-fleet-output-policy --- salt/manager/tools/sbin/soup | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/salt/manager/tools/sbin/soup b/salt/manager/tools/sbin/soup index f7180c46c..1d9f94889 100755 --- a/salt/manager/tools/sbin/soup +++ b/salt/manager/tools/sbin/soup @@ -602,7 +602,9 @@ post_to_2.4.170() { } post_to_2.4.180() { - echo "Nothing to apply" + # Force update to Kafka output policy + /usr/sbin/so-kafka-fleet-output-policy --force + POSTVERSION=2.4.180 } From 8dc0f8d20e21ded972c3f69125b6bc8d996fb7c9 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Wed, 10 Sep 2025 12:49:30 -0500 Subject: [PATCH 3/4] fix elastic agent ssl unpack error --- .../so-elastic-fleet-outputs-update | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update index b5d6e1bfe..5e2990af5 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update +++ b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update @@ -23,14 +23,20 @@ function update_logstash_outputs() { } function update_kafka_outputs() { # Make sure SSL configuration is included in policy updates for Kafka output. SSL is configured in so-elastic-fleet-setup - SSL_CONFIG=$(curl -K /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" | jq -r '.item.ssl') - - JSON_STRING=$(jq -n \ - --arg UPDATEDLIST "$NEW_LIST_JSON" \ - --argjson SSL_CONFIG "$SSL_CONFIG" \ - '{"name": "grid-kafka","type": "kafka","hosts": $UPDATEDLIST,"is_default": true,"is_default_monitoring": true,"config_yaml": "","ssl": $SSL_CONFIG}') - # Update Kafka outputs - curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_kafka" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq + if kafka_policy=$(curl -K /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" --fail 2>/dev/null); then + SSL_CONFIG=$(echo "$kafka_policy" | jq -r '.item.ssl') + SECRETS=$(echo "$kafka_policy" | jq -r '.item.secrets') + JSON_STRING=$(jq -n \ + --arg UPDATEDLIST "$NEW_LIST_JSON" \ + --argjson SSL_CONFIG "$SSL_CONFIG" \ + --argjson SECRETS "$SECRETS" \ + '{"name": "grid-kafka","type": "kafka","hosts": $UPDATEDLIST,"is_default": true,"is_default_monitoring": true,"config_yaml": "","ssl": $SSL_CONFIG,"secrets": $SECRETS}') + # Update Kafka outputs + curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_kafka" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq + else + printf "Failed to get current Kafka output policy..." + exit 1 + fi } {% if GLOBALS.pipeline == "KAFKA" %} From 890f76e45cfd1023f84f00263b9bb66474e79f99 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Wed, 10 Sep 2025 20:21:11 -0500 Subject: [PATCH 4/4] avoid delay in log ingest after a forced kafka output policy update --- .../tools/sbin_jinja/so-kafka-fleet-output-policy | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy b/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy index 2e88d00b2..d44a5cb6c 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy +++ b/salt/elasticfleet/tools/sbin_jinja/so-kafka-fleet-output-policy @@ -59,14 +59,15 @@ if ! kafka_output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://l elif kafka_output=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" --fail 2>/dev/null) && [[ "$force" == "true" ]]; then # force an update to Kafka policy. Keep the current value of Kafka output policy (enabled/disabled). ENABLED_DISABLED=$(echo "$kafka_output" | jq -e .item.is_default) + HOSTS=$(echo "$kafka_output" | jq -r '.item.hosts') JSON_STRING=$( jq -n \ --arg KAFKACRT "$KAFKACRT" \ --arg KAFKAKEY "$KAFKAKEY" \ --arg KAFKACA "$KAFKACA" \ - --arg MANAGER_IP "{{ GLOBALS.manager_ip }}:9092" \ --arg ENABLED_DISABLED "$ENABLED_DISABLED"\ --arg KAFKA_OUTPUT_VERSION "$KAFKA_OUTPUT_VERSION" \ - '{"name":"grid-kafka","type":"kafka","hosts":[ $MANAGER_IP ],"is_default":$ENABLED_DISABLED,"is_default_monitoring":$ENABLED_DISABLED,"config_yaml":"","ssl":{"certificate_authorities":[ $KAFKACA ],"certificate": $KAFKACRT ,"key":"","verification_mode":"full"},"proxy_id":null,"client_id":"Elastic","version": $KAFKA_OUTPUT_VERSION ,"compression":"none","auth_type":"ssl","partition":"round_robin","round_robin":{"group_events":10},"topics":[{"topic":"default-securityonion"}],"headers":[{"key":"","value":""}],"timeout":30,"broker_timeout":30,"required_acks":1,"secrets":{"ssl":{"key": $KAFKAKEY }}}' + --argjson HOSTS "$HOSTS" \ + '{"name":"grid-kafka","type":"kafka","hosts":$HOSTS,"is_default":$ENABLED_DISABLED,"is_default_monitoring":$ENABLED_DISABLED,"config_yaml":"","ssl":{"certificate_authorities":[ $KAFKACA ],"certificate": $KAFKACRT ,"key":"","verification_mode":"full"},"proxy_id":null,"client_id":"Elastic","version": $KAFKA_OUTPUT_VERSION ,"compression":"none","auth_type":"ssl","partition":"round_robin","round_robin":{"group_events":10},"topics":[{"topic":"default-securityonion"}],"headers":[{"key":"","value":""}],"timeout":30,"broker_timeout":30,"required_acks":1,"secrets":{"ssl":{"key": $KAFKAKEY }}}' ) if ! response=$(curl -sK /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_kafka" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" --fail 2>/dev/null); then echo -e "\nFailed to force update to Elastic Fleet output policy for Kafka...\n"