Setup elastic fleet rollover from logstash -> kafka output policy

Signed-off-by: reyesj2 <94730068+reyesj2@users.noreply.github.com>
This commit is contained in:
reyesj2
2024-04-30 16:47:40 -04:00
parent fcc4050f86
commit bb49944b96
2 changed files with 96 additions and 53 deletions

View File

@@ -21,63 +21,103 @@ function update_logstash_outputs() {
# Update Logstash Outputs
curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_logstash" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq
}
function update_kafka_outputs() {
# Make sure SSL configuration is included in policy updates for Kafka output. SSL is configured in so-elastic-fleet-setup
SSL_CONFIG=$(curl -K /opt/so/conf/elasticsearch/curl.config -L "http://localhost:5601/api/fleet/outputs/so-manager_kafka" | jq -r '.item.ssl')
# Get current list of Logstash Outputs
RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_logstash')
JSON_STRING=$(jq -n \
--arg UPDATEDLIST "$NEW_LIST_JSON" \
--argjson SSL_CONFIG "$SSL_CONFIG" \
'{"name": "grid-kafka","type": "kafka","hosts": $UPDATEDLIST,"is_default": true,"is_default_monitoring": true,"config_yaml": "","ssl": $SSL_CONFIG}')
# Update Kafka outputs
curl -K /opt/so/conf/elasticsearch/curl.config -L -X PUT "localhost:5601/api/fleet/outputs/so-manager_kafka" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d "$JSON_STRING" | jq
}
# Check to make sure that the server responded with good data - else, bail from script
CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON")
if [ "$CHECKSUM" != "so-manager_logstash" ]; then
{% if GLOBALS.pipeline == "KAFKA" %}
# Get current list of Kafka Outputs
RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_kafka')
# Check to make sure that the server responded with good data - else, bail from script
CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON")
if [ "$CHECKSUM" != "so-manager_kafka" ]; then
printf "Failed to query for current Kafka Outputs..."
exit 1
fi
# Get the current list of kafka outputs & hash them
CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON")
CURRENT_HASH=$(sha1sum <<< "$CURRENT_LIST" | awk '{print $1}')
declare -a NEW_LIST=()
# Query for the current Grid Nodes that are running kafka
KAFKANODES=$(salt-call --out=json pillar.get kafka:nodes | jq '.local')
# Query for Kafka nodes with Broker role and add hostname to list
while IFS= read -r line; do
NEW_LIST+=("$line")
done < <(jq -r 'to_entries | .[] | select(.value.role | contains("broker")) | .key + ":9092"' <<< $KAFKANODES)
{# If global pipeline isn't set to KAFKA then assume default of REDIS / logstash #}
{% else %}
# Get current list of Logstash Outputs
RAW_JSON=$(curl -K /opt/so/conf/elasticsearch/curl.config 'http://localhost:5601/api/fleet/outputs/so-manager_logstash')
# Check to make sure that the server responded with good data - else, bail from script
CHECKSUM=$(jq -r '.item.id' <<< "$RAW_JSON")
if [ "$CHECKSUM" != "so-manager_logstash" ]; then
printf "Failed to query for current Logstash Outputs..."
exit 1
fi
fi
# Get the current list of Logstash outputs & hash them
CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON")
CURRENT_HASH=$(sha1sum <<< "$CURRENT_LIST" | awk '{print $1}')
# Get the current list of Logstash outputs & hash them
CURRENT_LIST=$(jq -c -r '.item.hosts' <<< "$RAW_JSON")
CURRENT_HASH=$(sha1sum <<< "$CURRENT_LIST" | awk '{print $1}')
declare -a NEW_LIST=()
declare -a NEW_LIST=()
{# If we select to not send to manager via SOC, then omit the code that adds manager to NEW_LIST #}
{% if ELASTICFLEETMERGED.enable_manager_output %}
# Create array & add initial elements
if [ "{{ GLOBALS.hostname }}" = "{{ GLOBALS.url_base }}" ]; then
{# If we select to not send to manager via SOC, then omit the code that adds manager to NEW_LIST #}
{% if ELASTICFLEETMERGED.enable_manager_output %}
# Create array & add initial elements
if [ "{{ GLOBALS.hostname }}" = "{{ GLOBALS.url_base }}" ]; then
NEW_LIST+=("{{ GLOBALS.url_base }}:5055")
else
else
NEW_LIST+=("{{ GLOBALS.url_base }}:5055" "{{ GLOBALS.hostname }}:5055")
fi
{% endif %}
fi
{% endif %}
# Query for FQDN entries & add them to the list
{% if ELASTICFLEETMERGED.config.server.custom_fqdn | length > 0 %}
CUSTOMFQDNLIST=('{{ ELASTICFLEETMERGED.config.server.custom_fqdn | join(' ') }}')
readarray -t -d ' ' CUSTOMFQDN < <(printf '%s' "$CUSTOMFQDNLIST")
for CUSTOMNAME in "${CUSTOMFQDN[@]}"
do
# Query for FQDN entries & add them to the list
{% if ELASTICFLEETMERGED.config.server.custom_fqdn | length > 0 %}
CUSTOMFQDNLIST=('{{ ELASTICFLEETMERGED.config.server.custom_fqdn | join(' ') }}')
readarray -t -d ' ' CUSTOMFQDN < <(printf '%s' "$CUSTOMFQDNLIST")
for CUSTOMNAME in "${CUSTOMFQDN[@]}"
do
NEW_LIST+=("$CUSTOMNAME:5055")
done
{% endif %}
done
{% endif %}
# Query for the current Grid Nodes that are running Logstash
LOGSTASHNODES=$(salt-call --out=json pillar.get logstash:nodes | jq '.local')
# Query for the current Grid Nodes that are running Logstash
LOGSTASHNODES=$(salt-call --out=json pillar.get logstash:nodes | jq '.local')
# Query for Receiver Nodes & add them to the list
if grep -q "receiver" <<< $LOGSTASHNODES; then
# Query for Receiver Nodes & add them to the list
if grep -q "receiver" <<< $LOGSTASHNODES; then
readarray -t RECEIVERNODES < <(jq -r ' .receiver | keys_unsorted[]' <<< $LOGSTASHNODES)
for NODE in "${RECEIVERNODES[@]}"
do
NEW_LIST+=("$NODE:5055")
done
fi
fi
# Query for Fleet Nodes & add them to the list
if grep -q "fleet" <<< $LOGSTASHNODES; then
# Query for Fleet Nodes & add them to the list
if grep -q "fleet" <<< $LOGSTASHNODES; then
readarray -t FLEETNODES < <(jq -r ' .fleet | keys_unsorted[]' <<< $LOGSTASHNODES)
for NODE in "${FLEETNODES[@]}"
do
NEW_LIST+=("$NODE:5055")
done
fi
fi
{% endif %}
# Sort & hash the new list of Logstash Outputs
NEW_LIST_JSON=$(jq --compact-output --null-input '$ARGS.positional' --args -- "${NEW_LIST[@]}")
@@ -91,5 +131,9 @@ if [ "$NEW_HASH" = "$CURRENT_HASH" ]; then
else
printf "\nHashes don't match - update needed.\n"
printf "Current List: $CURRENT_LIST\nNew List: $NEW_LIST_JSON\n"
{% if GLOBALS.pipeline == "KAFKA" %}
update_kafka_outputs
{% else %}
update_logstash_outputs
{% endif %}
fi

View File

@@ -81,8 +81,7 @@ printf "\nCreate Kafka Output Config if node is not an Import or Eval install\n"
{% if grains.role not in ['so-import', 'so-eval'] %}
KAFKACRT=$(openssl x509 -in /etc/pki/elasticfleet-kafka.crt)
KAFKAKEY=$(openssl rsa -in /etc/pki/elasticfleet-kafka.key)
{# KAFKACA=$(openssl x509 -in $INTCA) #}
KAFKACA=$(openssl x509 -in /etc/pki/tls/certs/intca.crt)
KAFKACA=$(openssl x509 -in $INTCA)
KAFKA_OUTPUT_VERSION="2.6.0"
JSON_STRING=$( jq -n \
--arg KAFKACRT "$KAFKACRT" \