#!/bin/bash

# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0; you may not use
# this file except in compliance with the Elastic License 2.0.

{%- from 'vars/globals.map.jinja' import GLOBALS %}
{%- from 'elasticfleet/map.jinja' import ELASTICFLEETMERGED %}
{%- from 'elasticfleet/config.map.jinja' import LOGSTASH_CONFIG_YAML %}

. /usr/sbin/so-common

FORCE_UPDATE=false

# Only run on Managers
if ! is_manager_node; then
    printf "Not a Manager Node... Exiting\n"
    exit 0
fi

function update_logstash_outputs() {
    if logstash_policy=$(curl -K /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_logstash" --retry 3 --retry-delay 10 --fail 2>/dev/null); then
        SSL_CONFIG=$(echo "$logstash_policy" | jq -r '.item.ssl')
        if SECRETS=$(echo "$logstash_policy" | jq -er '.item.secrets' 2>/dev/null); then
            # Update policy when Fleet has secrets enabled
            JSON_STRING=$(jq -n \
                --argjson UPDATEDLIST "$NEW_LIST_JSON" \
                --argjson SECRETS "$SECRETS" \
                --argjson SSL_CONFIG "$SSL_CONFIG" \
                '{"name":"grid-logstash","type":"logstash","hosts": $UPDATEDLIST,"is_default":true,"is_default_monitoring":true,"config_yaml":"{{ LOGSTASH_CONFIG_YAML }}","ssl": $SSL_CONFIG,"secrets": $SECRETS}')
        else
            # Update policy when Fleet has secrets disabled
            JSON_STRING=$(jq -n \
                --argjson UPDATEDLIST "$NEW_LIST_JSON" \
                --argjson SSL_CONFIG "$SSL_CONFIG" \
                '{"name":"grid-logstash","type":"logstash","hosts": $UPDATEDLIST,"is_default":true,"is_default_monitoring":true,"config_yaml":"","ssl": $SSL_CONFIG}')
        fi
        # Update Logstash outputs
        curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_logstash" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq
    else
        printf "Failed to get current Logstash output policy...\n"
        exit 1
    fi
}

function update_kafka_outputs() {
    # Make sure SSL configuration is included in policy updates for the Kafka output. SSL is configured in so-elastic-fleet-setup
    if kafka_policy=$(curl -K /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" --fail 2>/dev/null); then
        SSL_CONFIG=$(echo "$kafka_policy" | jq -r '.item.ssl')
        if SECRETS=$(echo "$kafka_policy" | jq -er '.item.secrets' 2>/dev/null); then
            # Update policy when Fleet has secrets enabled
            JSON_STRING=$(jq -n \
                --argjson UPDATEDLIST "$NEW_LIST_JSON" \
                --argjson SSL_CONFIG "$SSL_CONFIG" \
                --argjson SECRETS "$SECRETS" \
                '{"name": "grid-kafka","type": "kafka","hosts": $UPDATEDLIST,"is_default": true,"is_default_monitoring": true,"config_yaml": "","ssl": $SSL_CONFIG,"secrets": $SECRETS}')
        else
            # Update policy when Fleet has secrets disabled or the policy has no secrets object yet
            JSON_STRING=$(jq -n \
                --argjson UPDATEDLIST "$NEW_LIST_JSON" \
                --argjson SSL_CONFIG "$SSL_CONFIG" \
                '{"name": "grid-kafka","type": "kafka","hosts": $UPDATEDLIST,"is_default": true,"is_default_monitoring": true,"config_yaml": "","ssl": $SSL_CONFIG}')
        fi
        # Update Kafka outputs
        curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_kafka" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq
    else
        printf "Failed to get current Kafka output policy...\n"
        exit 1
    fi
}
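
# For reference, the PUT body assembled by the functions above takes roughly this
# shape (host values here are illustrative only; hosts, ssl, and secrets are
# filled in from $NEW_LIST_JSON and the existing policy at runtime):
#
#   {
#     "name": "grid-logstash",
#     "type": "logstash",
#     "hosts": ["so-manager:5055", "so-receiver:5055"],
#     "is_default": true,
#     "is_default_monitoring": true,
#     "config_yaml": "...",
#     "ssl": { ... },
#     "secrets": { ... }
#   }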

{% if GLOBALS.pipeline == "KAFKA" %}
# Get current list of Kafka Outputs
RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_kafka')

# Check to make sure that the server responded with good data - else, bail from script
CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON")
if [ "$CHECKSUM" != "so-manager_kafka" ]; then
    printf "Failed to query for current Kafka Outputs...\n"
    exit 1
fi

# Get the current list of Kafka outputs & hash them
CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON")
CURRENT_HASH=$(sha256sum <<< "$CURRENT_LIST" | awk '{print $1}')

declare -a NEW_LIST=()

# Query for the current Grid Nodes that are running Kafka
KAFKANODES=$(salt-call --out=json pillar.get kafka:nodes | jq '.local')

# Query for Kafka nodes with Broker role and add hostname to list
while IFS= read -r line; do
    NEW_LIST+=("$line")
done < <(jq -r 'to_entries | .[] | select(.value.role | contains("broker")) | .key + ":9092"' <<< "$KAFKANODES")

{# If global pipeline isn't set to KAFKA then assume default of REDIS / logstash #}
{% else %}

# Get current list of Logstash Outputs
RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_logstash')

# Check to make sure that the server responded with good data - else, bail from script
CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON")
if [ "$CHECKSUM" != "so-manager_logstash" ]; then
    printf "Failed to query for current Logstash Outputs...\n"
    exit 1
fi

# Force an update if the rendered advanced config differs from what the policy currently holds
CURRENT_LOGSTASH_ADV_CONFIG=$(jq -r '.item.config_yaml // ""' <<< "$RAW_JSON")
CURRENT_LOGSTASH_ADV_CONFIG_HASH=$(sha256sum <<< "$CURRENT_LOGSTASH_ADV_CONFIG" | awk '{print $1}')
NEW_LOGSTASH_ADV_CONFIG=$'{{ LOGSTASH_CONFIG_YAML }}'
NEW_LOGSTASH_ADV_CONFIG_HASH=$(sha256sum <<< "$NEW_LOGSTASH_ADV_CONFIG" | awk '{print $1}')
if [ "$CURRENT_LOGSTASH_ADV_CONFIG_HASH" != "$NEW_LOGSTASH_ADV_CONFIG_HASH" ]; then
    FORCE_UPDATE=true
fi

# Get the current list of Logstash outputs & hash them
CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON")
CURRENT_HASH=$(sha256sum <<< "$CURRENT_LIST" | awk '{print $1}')

declare -a NEW_LIST=()

{# If we select to not send to manager via SOC, then omit the code that adds manager to NEW_LIST #}
{% if ELASTICFLEETMERGED.enable_manager_output %}
# Create array & add initial elements
if [ "{{ GLOBALS.hostname }}" = "{{ GLOBALS.url_base }}" ]; then
    NEW_LIST+=("{{ GLOBALS.url_base }}:5055")
else
    NEW_LIST+=("{{ GLOBALS.url_base }}:5055" "{{ GLOBALS.hostname }}:5055")
fi
{% endif %}

# Query for custom FQDN entries & add them to the list
{% if ELASTICFLEETMERGED.config.server.custom_fqdn | length > 0 %}
CUSTOMFQDNLIST=('{{ ELASTICFLEETMERGED.config.server.custom_fqdn | join(' ') }}')
readarray -t -d ' ' CUSTOMFQDN < <(printf '%s' "$CUSTOMFQDNLIST")
for CUSTOMNAME in "${CUSTOMFQDN[@]}"
do
    NEW_LIST+=("$CUSTOMNAME:5055")
done
{% endif %}

# Query for the current Grid Nodes that are running Logstash
LOGSTASHNODES=$(salt-call --out=json pillar.get logstash:nodes | jq '.local')

# Query for Receiver Nodes & add them to the list
if grep -q "receiver" <<< "$LOGSTASHNODES"; then
    readarray -t RECEIVERNODES < <(jq -r '.receiver | keys_unsorted[]' <<< "$LOGSTASHNODES")
    for NODE in "${RECEIVERNODES[@]}"
    do
        NEW_LIST+=("$NODE:5055")
    done
fi

# Query for Fleet Nodes & add them to the list
if grep -q "fleet" <<< "$LOGSTASHNODES"; then
    readarray -t FLEETNODES < <(jq -r '.fleet | keys_unsorted[]' <<< "$LOGSTASHNODES")
    for NODE in "${FLEETNODES[@]}"
    do
        NEW_LIST+=("$NODE:5055")
    done
fi
{% endif %}
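
# The jq call below uses $ARGS.positional to turn the bash array into a compact
# JSON array, e.g. NEW_LIST=("so-manager:5055" "so-receiver:5055") becomes
# ["so-manager:5055","so-receiver:5055"] (hostnames illustrative only).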
# Convert the new list of outputs to JSON & hash it
NEW_LIST_JSON=$(jq --compact-output --null-input '$ARGS.positional' --args -- "${NEW_LIST[@]}")
NEW_HASH=$(sha256sum <<< "$NEW_LIST_JSON" | awk '{print $1}')

# Compare the current & new list of outputs - if different, update the outputs
if [[ "$NEW_HASH" = "$CURRENT_HASH" ]] && [[ "$FORCE_UPDATE" != "true" ]]; then
    printf "\nHashes match - no update needed.\n"
    printf "Current List: %s\nNew List: %s\n" "$CURRENT_LIST" "$NEW_LIST_JSON"
    # Since output can be KAFKA or LOGSTASH, we need to check if the policy set as default matches the value set in GLOBALS.pipeline and update if needed
    printf "Checking if the correct output policy is set as default\n"
    OUTPUT_DEFAULT=$(jq -r '.item.is_default' <<< "$RAW_JSON")
    OUTPUT_DEFAULT_MONITORING=$(jq -r '.item.is_default_monitoring' <<< "$RAW_JSON")
    if [[ "$OUTPUT_DEFAULT" = "false" || "$OUTPUT_DEFAULT_MONITORING" = "false" ]]; then
        printf "Default output policy needs to be updated.\n"
{%- if GLOBALS.pipeline == "KAFKA" and 'gmd' in salt['pillar.get']('features', []) %}
        update_kafka_outputs
{%- else %}
        update_logstash_outputs
{%- endif %}
    else
        printf "Default output policy is set - no update needed.\n"
    fi
    exit 0
else
    printf "\nHashes don't match - update needed.\n"
    printf "Current List: %s\nNew List: %s\n" "$CURRENT_LIST" "$NEW_LIST_JSON"
{%- if GLOBALS.pipeline == "KAFKA" and 'gmd' in salt['pillar.get']('features', []) %}
    update_kafka_outputs
{%- else %}
    update_logstash_outputs
{%- endif %}
fi
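
# Example output for a no-change run (host values illustrative):
#
#   Hashes match - no update needed.
#   Current List: ["so-manager:5055"]
#   New List: ["so-manager:5055"]
#   Checking if the correct output policy is set as default
#   Default output policy is set - no update needed.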