diff --git a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update index eb5ccc1ed..4d2867fc7 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update +++ b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-outputs-update @@ -21,64 +21,104 @@ function update_logstash_outputs() { # Update Logstash Outputs curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_logstash" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq } +function update_kafka_outputs() { + # Make sure SSL configuration is included in policy updates for Kafka output. SSL is configured in so-elastic-fleet-setup + SSL_CONFIG=$(curl -K /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" | jq -r '.item.ssl') -# Get current list of Logstash Outputs -RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_logstash') + JSON_STRING=$(jq -n \ + --arg UPDATEDLIST "$NEW_LIST_JSON" \ + --argjson SSL_CONFIG "$SSL_CONFIG" \ + '{"name": "grid-kafka","type": "kafka","hosts": $UPDATEDLIST,"is_default": true,"is_default_monitoring": true,"config_yaml": "","ssl": $SSL_CONFIG}') + # Update Kafka outputs + curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_kafka" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq +} -# Check to make sure that the server responded with good data - else, bail from script -CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON") -if [ "$CHECKSUM" != "so-manager_logstash" ]; then - printf "Failed to query for current Logstash Outputs..." - exit 1 -fi +{% if GLOBALS.pipeline == "KAFKA" %} + # Get current list of Kafka Outputs + RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_kafka') -# Get the current list of Logstash outputs & hash them -CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON") -CURRENT_HASH=$(sha1sum <<< "$CURRENT_LIST" | awk '{print $1}') + # Check to make sure that the server responded with good data - else, bail from script + CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON") + if [ "$CHECKSUM" != "so-manager_kafka" ]; then + printf "Failed to query for current Kafka Outputs..." + exit 1 + fi -declare -a NEW_LIST=() + # Get the current list of kafka outputs & hash them + CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON") + CURRENT_HASH=$(sha1sum <<< "$CURRENT_LIST" | awk '{print $1}') + + declare -a NEW_LIST=() + + # Query for the current Grid Nodes that are running kafka + KAFKANODES=$(salt-call --out=json pillar.get kafka:nodes | jq '.local') + + # Query for Kafka nodes with Broker role and add hostname to list + while IFS= read -r line; do + NEW_LIST+=("$line") + done < <(jq -r 'to_entries | .[] | select(.value.role | contains("broker")) | .key + ":9092"' <<< $KAFKANODES) + + {# If global pipeline isn't set to KAFKA then assume default of REDIS / logstash #} +{% else %} + # Get current list of Logstash Outputs + RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_logstash') + + # Check to make sure that the server responded with good data - else, bail from script + CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON") + if [ "$CHECKSUM" != "so-manager_logstash" ]; then + printf "Failed to query for current Logstash Outputs..." + exit 1 + fi + + # Get the current list of Logstash outputs & hash them + CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON") + CURRENT_HASH=$(sha1sum <<< "$CURRENT_LIST" | awk '{print $1}') + + declare -a NEW_LIST=() + + {# If we select to not send to manager via SOC, then omit the code that adds manager to NEW_LIST #} + {% if ELASTICFLEETMERGED.enable_manager_output %} + # Create array & add initial elements + if [ "{{ GLOBALS.hostname }}" = "{{ GLOBALS.url_base }}" ]; then + NEW_LIST+=("{{ GLOBALS.url_base }}:5055") + else + NEW_LIST+=("{{ GLOBALS.url_base }}:5055" "{{ GLOBALS.hostname }}:5055") + fi + {% endif %} + + # Query for FQDN entries & add them to the list + {% if ELASTICFLEETMERGED.config.server.custom_fqdn | length > 0 %} + CUSTOMFQDNLIST=('{{ ELASTICFLEETMERGED.config.server.custom_fqdn | join(' ') }}') + readarray -t -d ' ' CUSTOMFQDN < <(printf '%s' "$CUSTOMFQDNLIST") + for CUSTOMNAME in "${CUSTOMFQDN[@]}" + do + NEW_LIST+=("$CUSTOMNAME:5055") + done + {% endif %} + + # Query for the current Grid Nodes that are running Logstash + LOGSTASHNODES=$(salt-call --out=json pillar.get logstash:nodes | jq '.local') + + # Query for Receiver Nodes & add them to the list + if grep -q "receiver" <<< $LOGSTASHNODES; then + readarray -t RECEIVERNODES < <(jq -r ' .receiver | keys_unsorted[]' <<< $LOGSTASHNODES) + for NODE in "${RECEIVERNODES[@]}" + do + NEW_LIST+=("$NODE:5055") + done + fi + + # Query for Fleet Nodes & add them to the list + if grep -q "fleet" <<< $LOGSTASHNODES; then + readarray -t FLEETNODES < <(jq -r ' .fleet | keys_unsorted[]' <<< $LOGSTASHNODES) + for NODE in "${FLEETNODES[@]}" + do + NEW_LIST+=("$NODE:5055") + done + fi -{# If we select to not send to manager via SOC, then omit the code that adds manager to NEW_LIST #} -{% if ELASTICFLEETMERGED.enable_manager_output %} -# Create array & add initial elements -if [ "{{ GLOBALS.hostname }}" = "{{ GLOBALS.url_base }}" ]; then - NEW_LIST+=("{{ GLOBALS.url_base }}:5055") -else - NEW_LIST+=("{{ GLOBALS.url_base }}:5055" "{{ GLOBALS.hostname }}:5055") -fi {% endif %} -# Query for FQDN entries & add them to the list -{% if ELASTICFLEETMERGED.config.server.custom_fqdn | length > 0 %} -CUSTOMFQDNLIST=('{{ ELASTICFLEETMERGED.config.server.custom_fqdn | join(' ') }}') -readarray -t -d ' ' CUSTOMFQDN < <(printf '%s' "$CUSTOMFQDNLIST") -for CUSTOMNAME in "${CUSTOMFQDN[@]}" -do - NEW_LIST+=("$CUSTOMNAME:5055") -done -{% endif %} - -# Query for the current Grid Nodes that are running Logstash -LOGSTASHNODES=$(salt-call --out=json pillar.get logstash:nodes | jq '.local') - -# Query for Receiver Nodes & add them to the list -if grep -q "receiver" <<< $LOGSTASHNODES; then - readarray -t RECEIVERNODES < <(jq -r ' .receiver | keys_unsorted[]' <<< $LOGSTASHNODES) - for NODE in "${RECEIVERNODES[@]}" - do - NEW_LIST+=("$NODE:5055") - done -fi - -# Query for Fleet Nodes & add them to the list -if grep -q "fleet" <<< $LOGSTASHNODES; then - readarray -t FLEETNODES < <(jq -r ' .fleet | keys_unsorted[]' <<< $LOGSTASHNODES) - for NODE in "${FLEETNODES[@]}" - do - NEW_LIST+=("$NODE:5055") - done -fi - # Sort & hash the new list of Logstash Outputs NEW_LIST_JSON=$(jq --compact-output --null-input '$ARGS.positional' --args -- "${NEW_LIST[@]}") NEW_HASH=$(sha1sum <<< "$NEW_LIST_JSON" | awk '{print $1}') @@ -91,5 +131,9 @@ if [ "$NEW_HASH" = "$CURRENT_HASH" ]; then else printf "\nHashes don't match - update needed.\n" printf "Current List: $CURRENT_LIST\nNew List: $NEW_LIST_JSON\n" +{% if GLOBALS.pipeline == "KAFKA" %} + update_kafka_outputs +{% else %} update_logstash_outputs +{% endif %} fi diff --git a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup index 14ab58b45..acaec360b 100755 --- a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup +++ b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup @@ -81,8 +81,7 @@ printf "\nCreate Kafka Output Config if node is not an Import or Eval install\n" {% if grains.role not in ['so-import', 'so-eval'] %} KAFKACRT=$(openssl x509 -in /etc/pki/elasticfleet-kafka.crt) KAFKAKEY=$(openssl rsa -in /etc/pki/elasticfleet-kafka.key) -{# KAFKACA=$(openssl x509 -in $INTCA) #} -KAFKACA=$(openssl x509 -in /etc/pki/tls/certs/intca.crt) +KAFKACA=$(openssl x509 -in $INTCA) KAFKA_OUTPUT_VERSION="2.6.0" JSON_STRING=$( jq -n \ --arg KAFKACRT "$KAFKACRT" \