From 83aaa76f982c008a451c19d6160cfc5ee8936569 Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Wed, 10 Jun 2026 16:34:10 -0400 Subject: [PATCH 01/11] allow full highstate on manager when locked --- salt/manager/tools/sbin/soup | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/salt/manager/tools/sbin/soup b/salt/manager/tools/sbin/soup index d50187c9c..4fd474ff2 100755 --- a/salt/manager/tools/sbin/soup +++ b/salt/manager/tools/sbin/soup @@ -343,10 +343,11 @@ highstate() { masterlock() { echo "Locking Salt Master" mv -v $TOPFILE $BACKUPTOPFILE - echo "base:" > $TOPFILE - echo " $MINIONID:" >> $TOPFILE - echo " - ca" >> $TOPFILE - echo " - elasticsearch" >> $TOPFILE + # Render the real top file only for the host running soup; every other + # minion gets an empty top (no states) while the master is upgrading. + echo "{% if grains['id'] == '$MINIONID' %}" > $TOPFILE + cat $BACKUPTOPFILE >> $TOPFILE + echo "{% endif %}" >> $TOPFILE } masterunlock() { From 780d9faf0dd2b346ce4bd5d06a7b665a0584b0e1 Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Thu, 11 Jun 2026 12:08:32 -0400 Subject: [PATCH 02/11] Parallelize so-elasticsearch-ilm-policy-load PUTs Run the ~300 ILM policy PUTs concurrently (bounded to 10 in flight via a throttle gate) instead of one serial curl per policy. Adds a put_policy helper and waits for all background jobs before exiting. Preserves policy parity; only the scheduling changes. Drops the dead empty sid cookie arg (falls back to basic auth from curl.config as before). --- .../so-elasticsearch-ilm-policy-load | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load index 7988c1905..e6a5f4eb8 100755 --- a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load +++ b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load @@ -6,6 +6,23 @@ . /usr/sbin/so-common +MAX_JOBS=10 + +put_policy() { + local desc="$1" policyname="$2" data="$3" + echo "Setting up ${desc} policy..." + curl -K /opt/so/conf/elasticsearch/curl.config -s -k -L \ + -X PUT "https://localhost:9200/_ilm/policy/${policyname}" \ + -H 'Content-Type: application/json' -d"${data}" +} + +# Block until fewer than MAX_JOBS background curls are running. +throttle() { + while (( $(jobs -rp | wc -l) >= MAX_JOBS )); do + wait -n + done +} + {%- from 'elasticsearch/template.map.jinja' import ES_INDEX_SETTINGS %} {%- if GLOBALS.role != "so-heavynode" %} {%- from 'elasticsearch/template.map.jinja' import ALL_ADDON_SETTINGS %} @@ -14,35 +31,26 @@ {%- for index, settings in ES_INDEX_SETTINGS.items() %} {%- if settings.policy is defined %} {%- if index == 'so-logs-detections.alerts' %} - echo - echo "Setting up so-logs-detections.alerts-so policy..." - curl -K /opt/so/conf/elasticsearch/curl.config -b "sid=$SESSIONCOOKIE" -s -k -L -X PUT "https://localhost:9200/_ilm/policy/{{ index }}-so" -H 'Content-Type: application/json' -d'{ "policy": {{ settings.policy | tojson(true) }} }' - echo + throttle + put_policy "so-logs-detections.alerts-so" "{{ index }}-so" '{ "policy": {{ settings.policy | tojson(true) }} }' & {%- elif index == 'so-logs-soc' %} - echo - echo "Setting up so-soc-logs policy..." - curl -K /opt/so/conf/elasticsearch/curl.config -b "sid=$SESSIONCOOKIE" -s -k -L -X PUT "https://localhost:9200/_ilm/policy/so-soc-logs" -H 'Content-Type: application/json' -d'{ "policy": {{ settings.policy | tojson(true) }} }' - echo - echo - echo "Setting up {{ index }}-logs policy..." - curl -K /opt/so/conf/elasticsearch/curl.config -b "sid=$SESSIONCOOKIE" -s -k -L -X PUT "https://localhost:9200/_ilm/policy/{{ index }}-logs" -H 'Content-Type: application/json' -d'{ "policy": {{ settings.policy | tojson(true) }} }' - echo + throttle + put_policy "so-soc-logs" "so-soc-logs" '{ "policy": {{ settings.policy | tojson(true) }} }' & + throttle + put_policy "{{ index }}-logs" "{{ index }}-logs" '{ "policy": {{ settings.policy | tojson(true) }} }' & {%- else %} - echo - echo "Setting up {{ index }}-logs policy..." - curl -K /opt/so/conf/elasticsearch/curl.config -b "sid=$SESSIONCOOKIE" -s -k -L -X PUT "https://localhost:9200/_ilm/policy/{{ index }}-logs" -H 'Content-Type: application/json' -d'{ "policy": {{ settings.policy | tojson(true) }} }' - echo + throttle + put_policy "{{ index }}-logs" "{{ index }}-logs" '{ "policy": {{ settings.policy | tojson(true) }} }' & {%- endif %} {%- endif %} {%- endfor %} -echo {%- if GLOBALS.role != "so-heavynode" %} {%- for index, settings in ALL_ADDON_SETTINGS.items() %} {%- if settings.policy is defined %} - echo - echo "Setting up {{ index }}-logs policy..." - curl -K /opt/so/conf/elasticsearch/curl.config -b "sid=$SESSIONCOOKIE" -s -k -L -X PUT "https://localhost:9200/_ilm/policy/{{ index }}-logs" -H 'Content-Type: application/json' -d'{ "policy": {{ settings.policy | tojson(true) }} }' - echo + throttle + put_policy "{{ index }}-logs" "{{ index }}-logs" '{ "policy": {{ settings.policy | tojson(true) }} }' & {%- endif %} {%- endfor %} {%- endif %} + +wait From 07d3b148b553907d17b9a01d6b8daa6333ae1ceb Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Thu, 11 Jun 2026 13:37:26 -0400 Subject: [PATCH 03/11] fix output --- .../sbin_jinja/so-elasticsearch-ilm-policy-load | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load index e6a5f4eb8..a75023cae 100755 --- a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load +++ b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load @@ -8,12 +8,19 @@ MAX_JOBS=10 +# Policies are loaded concurrently (up to MAX_JOBS at a time) for speed. Each policy's block is +# printed atomically the moment its curl returns, so output appears in COMPLETION ORDER, not the +# order policies are defined in configuration. +echo "Loading ILM policies concurrently; output below appears in completion order, not configuration order." +echo + put_policy() { - local desc="$1" policyname="$2" data="$3" - echo "Setting up ${desc} policy..." - curl -K /opt/so/conf/elasticsearch/curl.config -s -k -L \ + local desc="$1" policyname="$2" data="$3" result + result=$(curl -K /opt/so/conf/elasticsearch/curl.config -s -k -L \ -X PUT "https://localhost:9200/_ilm/policy/${policyname}" \ - -H 'Content-Type: application/json' -d"${data}" + -H 'Content-Type: application/json' -d"${data}") + # Single atomic write so concurrent jobs don't interleave; prints live as each curl finishes. + printf 'Setting up %s policy...\n%s\n\n' "${desc}" "${result}" } # Block until fewer than MAX_JOBS background curls are running. From f23652397c966e0c9627eee9442d097133af25d1 Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Thu, 11 Jun 2026 13:57:56 -0400 Subject: [PATCH 04/11] Speed up so-elastic-fleet-optional-integrations-load decision logic Replace the per-package decision loop (which forked ~10 processes per package and rebuilt a growing JSON file on every add -> O(n^2)) with two jq passes: one prints the status messages, one builds the bulk install list. A vnum/needs() jq definition reproduces the previous version_conversion/compare_versions and excluded/subscription/installed/ upgrade/in-use logic exactly. Also fetch each agent policy once and extract non-default package names locally instead of re-fetching the policy per integration (1+K -> 1 GET per policy). Install behavior is unchanged. --- ...o-elastic-fleet-optional-integrations-load | 155 +++++++----------- 1 file changed, 57 insertions(+), 98 deletions(-) diff --git a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load index ab38b7065..7579123cb 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load +++ b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load @@ -16,7 +16,6 @@ STATE_FILE_SUCCESS=/opt/so/state/estemplates.txt INSTALLED_PACKAGE_LIST=/tmp/esfleet_installed_packages.json BULK_INSTALL_PACKAGE_LIST=/tmp/esfleet_bulk_install.json -BULK_INSTALL_PACKAGE_TMP=/tmp/esfleet_bulk_install_tmp.json BULK_INSTALL_OUTPUT=/opt/so/state/esfleet_bulk_install_results.json INTEGRATION_PACKAGE_COMPONENTS=/opt/so/state/esfleet_package_components.json INPUT_PACKAGE_COMPONENTS=/opt/so/state/esfleet_input_package_components.json @@ -29,29 +28,6 @@ PENDING_UPDATE=false # Requiring some level of manual Elastic Stack configuration before installation EXCLUDED_INTEGRATIONS=('apm') -version_conversion(){ - version=$1 - echo "$version" | awk -F '.' '{ printf("%d%03d%03d\n", $1, $2, $3); }' -} - -compare_versions() { - version1=$1 - version2=$2 - - # Convert versions to numbers - num1=$(version_conversion "$version1") - num2=$(version_conversion "$version2") - - # Compare using bc - if (( $(echo "$num1 < $num2" | bc -l) )); then - echo "less" - elif (( $(echo "$num1 > $num2" | bc -l) )); then - echo "greater" - else - echo "equal" - fi -} - IFS=$'\n' agent_policies=$(elastic_fleet_agent_policy_ids) if [ $? -ne 0 ]; then @@ -63,23 +39,23 @@ default_packages=({% for pkg in SUPPORTED_PACKAGES %}"{{ pkg }}"{% if not loop.l in_use_integrations=() +# Fetch each agent policy once; its package_policies[] already contain both the integration name +# and the .package.name, so extract all non-default package names locally in a single jq instead +# of re-fetching the same policy per integration. +default_packages_json=$(printf '%s\n' "${default_packages[@]}" | jq -R . | jq -s '.') for AGENT_POLICY in $agent_policies; do - if ! integrations=$(elastic_fleet_integration_policy_names "$AGENT_POLICY"); then + if ! policy_json=$(fleet_api "agent_policies/$AGENT_POLICY"); then # skip the agent policy if we can't get required info, let salt retry. Integrations loaded by this script are non-default integrations. echo "Skipping $AGENT_POLICY.. " continue fi - for INTEGRATION in $integrations; do - if ! PACKAGE_NAME=$(elastic_fleet_integration_policy_package_name "$AGENT_POLICY" "$INTEGRATION"); then - echo "Not adding $INTEGRATION, couldn't get package name" - continue - fi - # non-default integrations that are in-use in any policy - if ! [[ " ${default_packages[@]} " =~ " $PACKAGE_NAME " ]]; then - in_use_integrations+=("$PACKAGE_NAME") - fi - done + # non-default integrations that are in-use in any policy + while IFS= read -r PACKAGE_NAME; do + [ -n "$PACKAGE_NAME" ] && in_use_integrations+=("$PACKAGE_NAME") + done < <(jq -r --argjson def "$default_packages_json" \ + '.item.package_policies[].package.name | select(. as $n | ($def | index($n)) | not)' \ + <<<"$policy_json") done if [[ -f $STATE_FILE_SUCCESS ]]; then @@ -90,72 +66,55 @@ if [[ -f $STATE_FILE_SUCCESS ]]; then rm -f $INSTALLED_PACKAGE_LIST echo $latest_package_list | jq '{packages: [.items[] | {name: .name, latest_version: .version, installed_version: .installationInfo.version, subscription: .conditions.elastic.subscription }]}' >> $INSTALLED_PACKAGE_LIST - while read -r package; do - # get package details - package_name=$(echo "$package" | jq -r '.name') - latest_version=$(echo "$package" | jq -r '.latest_version') - installed_version=$(echo "$package" | jq -r '.installed_version') - subscription=$(echo "$package" | jq -r '.subscription') - bulk_package=$(echo "$package" | jq '{name: .name, version: .latest_version}' ) + # Build the bulk install list and the per-package status messages with two jq passes + # instead of a per-package bash loop. The old loop forked ~10 processes per package + # (5 jq + awk/bc for the version compare) and re-parsed/rewrote a growing JSON file on + # every add (O(n^2)). Selection and messages below are identical to that logic. + SUB={% if SUB %}true{% else %}false{% endif %} + AUTOUP={% if AUTO_UPGRADE_INTEGRATIONS %}true{% else %}false{% endif %} + EXCLUDED_JSON=$(printf '%s\n' "${EXCLUDED_INTEGRATIONS[@]}" | jq -R 'select(length>0)' | jq -s '.') + INUSE_JSON=$(printf '%s\n' "${in_use_integrations[@]}" | jq -R 'select(length>0)' | jq -s 'unique') - if [[ ! "${EXCLUDED_INTEGRATIONS[@]}" =~ "$package_name" ]]; then - {% if not SUB %} - if [[ "$subscription" != "basic" && "$subscription" != "null" && -n "$subscription" ]]; then - # pass over integrations that require non-basic elastic license - echo "$package_name integration requires an Elastic license of $subscription or greater... skipping" - continue - else - if [[ "$installed_version" == "null" || -z "$installed_version" ]]; then - echo "$package_name is not installed... Adding to next update." - jq --argjson package "$bulk_package" '.packages += [$package]' $BULK_INSTALL_PACKAGE_LIST > $BULK_INSTALL_PACKAGE_TMP && mv $BULK_INSTALL_PACKAGE_TMP $BULK_INSTALL_PACKAGE_LIST + # vnum replicates the previous version_conversion (%d%03d%03d of the first three dotted + # fields); needs() replicates the excluded/subscription/installed/upgrade/in-use logic. + JQ_DECISION=' +def vnum: + [ (split(".")|.[0:3][] | gsub("[^0-9].*";"") | (if .=="" then "0" else . end) | tonumber) ] + | (.[0]//0)*1000000 + (.[1]//0)*1000 + (.[2]//0); +def needs($sub;$autoup;$excluded;$inuse): + .name as $n + | ($n | IN($excluded[]) | not) + and ( $sub or (.subscription==null or .subscription=="basic" or .subscription=="") ) + and ( (.installed_version==null or .installed_version=="") + or ( ((.latest_version|vnum) > (.installed_version|vnum)) + and ( $autoup or ($n | IN($inuse[]) | not) ) ) );' - PENDING_UPDATE=true - else - results=$(compare_versions "$latest_version" "$installed_version") - if [ $results == "greater" ]; then - {#- When auto_upgrade_integrations is false, skip upgrading in_use_integrations #} - {%- if not AUTO_UPGRADE_INTEGRATIONS %} - if ! [[ " ${in_use_integrations[@]} " =~ " $package_name " ]]; then - {%- endif %} - echo "$package_name is at version $installed_version latest version is $latest_version... Adding to next update." - jq --argjson package "$bulk_package" '.packages += [$package]' $BULK_INSTALL_PACKAGE_LIST > $BULK_INSTALL_PACKAGE_TMP && mv $BULK_INSTALL_PACKAGE_TMP $BULK_INSTALL_PACKAGE_LIST + JQ_ARGS=(--argjson sub "$SUB" --argjson autoup "$AUTOUP" --argjson excluded "$EXCLUDED_JSON" --argjson inuse "$INUSE_JSON") - PENDING_UPDATE=true - {%- if not AUTO_UPGRADE_INTEGRATIONS %} - else - echo "skipping available upgrade for in use integration - $package_name." - fi - {%- endif %} - fi - fi - fi - {% else %} - if [[ "$installed_version" == "null" || -z "$installed_version" ]]; then - echo "$package_name is not installed... Adding to next update." - jq --argjson package "$bulk_package" '.packages += [$package]' $BULK_INSTALL_PACKAGE_LIST > $BULK_INSTALL_PACKAGE_TMP && mv $BULK_INSTALL_PACKAGE_TMP $BULK_INSTALL_PACKAGE_LIST - PENDING_UPDATE=true - else - results=$(compare_versions "$latest_version" "$installed_version") - if [ $results == "greater" ]; then - {#- When auto_upgrade_integrations is false, skip upgrading in_use_integrations #} - {%- if not AUTO_UPGRADE_INTEGRATIONS %} - if ! [[ " ${in_use_integrations[@]} " =~ " $package_name " ]]; then - {%- endif %} - echo "$package_name is at version $installed_version latest version is $latest_version... Adding to next update." - jq --argjson package "$bulk_package" '.packages += [$package]' $BULK_INSTALL_PACKAGE_LIST > $BULK_INSTALL_PACKAGE_TMP && mv $BULK_INSTALL_PACKAGE_TMP $BULK_INSTALL_PACKAGE_LIST - PENDING_UPDATE=true - {%- if not AUTO_UPGRADE_INTEGRATIONS %} - else - echo "skipping available upgrade for in use integration - $package_name." - fi - {%- endif %} - fi - fi - {% endif %} - else - echo "Skipping $package_name..." - fi - done <<< "$(jq -c '.packages[]' "$INSTALLED_PACKAGE_LIST")" + # (a) Per-package status messages (parity with the previous echo output). + jq -r "${JQ_ARGS[@]}" "$JQ_DECISION"' + .packages[] + | .name as $n + | if ($n|IN($excluded[])) then "Skipping \($n)..." + elif (($sub|not) and (.subscription!=null and .subscription!="basic" and .subscription!="")) then + "\($n) integration requires an Elastic license of \(.subscription) or greater... skipping" + elif (.installed_version==null or .installed_version=="") then + "\($n) is not installed... Adding to next update." + elif ((.latest_version|vnum) > (.installed_version|vnum)) then + (if ($autoup or ($n|IN($inuse[])|not)) + then "\($n) is at version \(.installed_version) latest version is \(.latest_version)... Adding to next update." + else "skipping available upgrade for in use integration - \($n)." end) + else empty end + ' "$INSTALLED_PACKAGE_LIST" + + # (b) The bulk install list, built in a single pass. + jq "${JQ_ARGS[@]}" "$JQ_DECISION"' + {packages: [ .packages[] | select(needs($sub;$autoup;$excluded;$inuse)) | {name, version: .latest_version} ]} + ' "$INSTALLED_PACKAGE_LIST" > "$BULK_INSTALL_PACKAGE_LIST" + + if jq -e '.packages | length > 0' "$BULK_INSTALL_PACKAGE_LIST" >/dev/null; then + PENDING_UPDATE=true + fi if [ "$PENDING_UPDATE" = true ]; then # Run chunked install of packages From 6c42c419e2f6c89d3c0b57d5e909b5e532b59a2e Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Thu, 11 Jun 2026 15:42:41 -0400 Subject: [PATCH 05/11] Serialize ILM policy-load output with flock to stop interleaving A single printf per block was not actually one write() call, so concurrent jobs still occasionally interleaved their label and response lines. Hold an flock around just the printf (curl still runs in parallel) so each policy's block prints intact, keeping live completion-order streaming. --- .../sbin_jinja/so-elasticsearch-ilm-policy-load | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load index a75023cae..a884f2e2f 100755 --- a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load +++ b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load @@ -8,9 +8,13 @@ MAX_JOBS=10 +# Lock used to serialize block writes so concurrent jobs never interleave their output. +ILM_OUTPUT_LOCK=$(mktemp) +trap 'rm -f "$ILM_OUTPUT_LOCK"' EXIT + # Policies are loaded concurrently (up to MAX_JOBS at a time) for speed. Each policy's block is -# printed atomically the moment its curl returns, so output appears in COMPLETION ORDER, not the -# order policies are defined in configuration. +# printed the moment its curl returns, so output appears in COMPLETION ORDER, not the order +# policies are defined in configuration. echo "Loading ILM policies concurrently; output below appears in completion order, not configuration order." echo @@ -19,8 +23,11 @@ put_policy() { result=$(curl -K /opt/so/conf/elasticsearch/curl.config -s -k -L \ -X PUT "https://localhost:9200/_ilm/policy/${policyname}" \ -H 'Content-Type: application/json' -d"${data}") - # Single atomic write so concurrent jobs don't interleave; prints live as each curl finishes. - printf 'Setting up %s policy...\n%s\n\n' "${desc}" "${result}" + # curl above ran in parallel; serialize just this block write so concurrent jobs never interleave. + { + flock 200 + printf 'Setting up %s policy...\n%s\n\n' "${desc}" "${result}" + } 200>>"${ILM_OUTPUT_LOCK}" } # Block until fewer than MAX_JOBS background curls are running. From b1273573ed33712613795e2f401bdf9376a90679 Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Thu, 11 Jun 2026 15:50:53 -0400 Subject: [PATCH 06/11] Fix jq $def keyword collision in optional-integrations-load The agent-policy enumeration passed --argjson def, creating a jq variable $def. 'def' is a reserved keyword in jq and the deployed jq version rejects it, so the program failed to compile and in_use_integrations was left empty (silently disabling the in-use upgrade guard). Rename the arg to $defaults. --- .../sbin_jinja/so-elastic-fleet-optional-integrations-load | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load index 7579123cb..75bbc29d8 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load +++ b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-optional-integrations-load @@ -53,8 +53,8 @@ for AGENT_POLICY in $agent_policies; do # non-default integrations that are in-use in any policy while IFS= read -r PACKAGE_NAME; do [ -n "$PACKAGE_NAME" ] && in_use_integrations+=("$PACKAGE_NAME") - done < <(jq -r --argjson def "$default_packages_json" \ - '.item.package_policies[].package.name | select(. as $n | ($def | index($n)) | not)' \ + done < <(jq -r --argjson defaults "$default_packages_json" \ + '.item.package_policies[].package.name | select(. as $n | ($defaults | index($n)) | not)' \ <<<"$policy_json") done From ae6a705ce16d5fe9f228be16ef2335b78ee2fba1 Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Fri, 12 Jun 2026 09:38:41 -0400 Subject: [PATCH 07/11] Speed up so-elastic-fleet-integration-policy-load Fetch each agent policy once per group instead of refetching the full policy (plus a fresh Kibana session cookie) for every integration file, and dispatch the create/update writes as throttled background jobs. Adds elastic_fleet_load_integrations_dir and elastic_fleet_throttle to so-elastic-fleet-common, reusing the bounded-concurrency pattern from so-elasticsearch-ilm-policy-load. Replaces the four serial loops in the loader with one call per agent policy. --- .../tools/sbin/so-elastic-fleet-common | 64 +++++++++++++ .../so-elastic-fleet-integration-policy-load | 95 +++---------------- 2 files changed, 78 insertions(+), 81 deletions(-) diff --git a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common index 91fa787f2..d4f40b3ed 100644 --- a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common +++ b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common @@ -30,6 +30,70 @@ fleet_api() { curl -sK /opt/so/conf/elasticsearch/curl.config -L "localhost:5601/api/fleet/${QUERYPATH}" "$@" --retry 3 --retry-delay 10 --fail 2>/dev/null } +# Max number of concurrent Fleet write jobs (create/update). Override via env if needed. +MAX_FLEET_JOBS=${MAX_FLEET_JOBS:-10} + +# Block until fewer than MAX_FLEET_JOBS background jobs are running. +elastic_fleet_throttle() { + while (( $(jobs -rp | wc -l) >= MAX_FLEET_JOBS )); do + wait -n + done +} + +# Load every integration JSON in a directory into a single agent policy. +# The agent policy is fetched ONCE (not per file), and the create/update writes +# are dispatched as throttled background jobs. +# $1 AGENT_POLICY - the agent policy id/name to load integrations into +# $2 DIR - directory of integration *.json files +# $3 LABEL - human-readable label for log output +# $4 SKIP_CREATE_NAME - (optional) integration name to skip when creating (still updated if present) +# Returns 1 if any integration failed to create/update. +elastic_fleet_load_integrations_dir() { + local AGENT_POLICY=$1 + local DIR=$2 + local LABEL=$3 + local SKIP_CREATE_NAME=$4 + local POLICY_JSON FAIL_FILE INTEGRATION NAME ID + + FAIL_FILE=$(mktemp) + + # Fetch the agent policy a single time; we look up integration ids locally below. + POLICY_JSON=$(fleet_api "agent_policies/$AGENT_POLICY") + + for INTEGRATION in "$DIR"/*.json; do + [ -e "$INTEGRATION" ] || continue + NAME=$(jq -r .name "$INTEGRATION") + ID=$(jq -r --arg n "$NAME" '.item.package_policies[]? | select(.name==$n) | .id' <<<"$POLICY_JSON") + + elastic_fleet_throttle + { + if [ -n "$ID" ]; then + printf "\n\n%s - Updating integration %s\n" "$LABEL" "$NAME" + if ! elastic_fleet_integration_update "$ID" "@$INTEGRATION"; then + flock 9; echo "update ${INTEGRATION##*/}" >&9 + fi + elif [ -n "$SKIP_CREATE_NAME" ] && [ "$NAME" == "$SKIP_CREATE_NAME" ]; then + printf "\n\n%s - Skipping creation of %s\n" "$LABEL" "$NAME" + else + printf "\n\n%s - Creating integration %s\n" "$LABEL" "$NAME" + if ! elastic_fleet_integration_create "@$INTEGRATION"; then + flock 9; echo "create ${INTEGRATION##*/}" >&9 + fi + fi + } 9>>"$FAIL_FILE" & + done + wait + + local rc=0 + if [ -s "$FAIL_FILE" ]; then + printf "\n%s: failed integrations:\n" "$LABEL" + cat "$FAIL_FILE" + rc=1 + fi + rm -f "$FAIL_FILE" + return $rc +} + elastic_fleet_integration_check() { AGENT_POLICY=$1 diff --git a/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load b/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load index e548c7f86..73d569dd9 100644 --- a/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load +++ b/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load @@ -18,93 +18,26 @@ if [ ! -f /opt/so/state/eaintegrations.txt ]; then # Third, configure Elastic Defend Integration seperately /usr/sbin/so-elastic-fleet-integration-policy-elastic-defend + # Each group fetches its agent policy once and dispatches create/update writes concurrently. + # Initial Endpoints - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations/endpoints-initial/*.json; do - printf "\n\nInitial Endpoints Policy - Loading $INTEGRATION\n" - elastic_fleet_integration_check "endpoints-initial" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - done + elastic_fleet_load_integrations_dir "endpoints-initial" \ + /opt/so/conf/elastic-fleet/integrations/endpoints-initial "Initial Endpoints Policy" || RETURN_CODE=1 # Grid Nodes - General - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations/grid-nodes_general/*.json; do - printf "\n\nGrid Nodes Policy_General - Loading $INTEGRATION\n" - elastic_fleet_integration_check "so-grid-nodes_general" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - done + elastic_fleet_load_integrations_dir "so-grid-nodes_general" \ + /opt/so/conf/elastic-fleet/integrations/grid-nodes_general "Grid Nodes Policy_General" || RETURN_CODE=1 # Grid Nodes - Heavy - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations/grid-nodes_heavy/*.json; do - printf "\n\nGrid Nodes Policy_Heavy - Loading $INTEGRATION\n" - elastic_fleet_integration_check "so-grid-nodes_heavy" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - done + elastic_fleet_load_integrations_dir "so-grid-nodes_heavy" \ + /opt/so/conf/elastic-fleet/integrations/grid-nodes_heavy "Grid Nodes Policy_Heavy" || RETURN_CODE=1 - # Fleet Server - Optional integrations - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations-optional/FleetServer*/*.json; do - if ! [ "$INTEGRATION" == "/opt/so/conf/elastic-fleet/integrations-optional/FleetServer*/*.json" ]; then - FLEET_POLICY=`echo "$INTEGRATION"| cut -d'/' -f7` - printf "\n\nFleet Server Policy - Loading $INTEGRATION\n" - elastic_fleet_integration_check "$FLEET_POLICY" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if [ "$NAME" != "elasticsearch-logs" ]; then - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - fi - fi + # Fleet Server - Optional integrations (one agent policy per FleetServer_* directory) + for FLEET_DIR in /opt/so/conf/elastic-fleet/integrations-optional/FleetServer*/; do + [ -d "$FLEET_DIR" ] || continue + FLEET_POLICY=$(basename "$FLEET_DIR") + elastic_fleet_load_integrations_dir "$FLEET_POLICY" \ + "${FLEET_DIR%/}" "Fleet Server Policy" "elasticsearch-logs" || RETURN_CODE=1 done # Only create the state file if all policies were created/updated successfully From 43f72c1f9f0c8d7c874781ae2ac54844f74b8029 Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Fri, 12 Jun 2026 15:11:34 -0400 Subject: [PATCH 08/11] Parallelize so-elasticsearch-templates-load template PUTs Load component and index templates as throttled background jobs (max 10 concurrent) instead of sequential curl PUTs, matching the bounded-concurrency + flock-serialized-output pattern used by the fleet/ILM load scripts. Keeps a wait barrier between the component phase and the index phase so index templates never load before their referenced component templates. Failures are tracked via per-job marker files since counter increments can't escape background subshells. --- .../sbin/so-elasticsearch-templates-load | 143 +++++++++++++----- 1 file changed, 107 insertions(+), 36 deletions(-) diff --git a/salt/elasticsearch/tools/sbin/so-elasticsearch-templates-load b/salt/elasticsearch/tools/sbin/so-elasticsearch-templates-load index a0ebd66e8..f3c830f1c 100755 --- a/salt/elasticsearch/tools/sbin/so-elasticsearch-templates-load +++ b/salt/elasticsearch/tools/sbin/so-elasticsearch-templates-load @@ -11,10 +11,8 @@ ADDON_STATEFILE_SUCCESS=/opt/so/state/addon_estemplates.txt ELASTICSEARCH_TEMPLATES_DIR="/opt/so/conf/elasticsearch/templates" SO_TEMPLATES_DIR="${ELASTICSEARCH_TEMPLATES_DIR}/index" ADDON_TEMPLATES_DIR="${ELASTICSEARCH_TEMPLATES_DIR}/addon-index" -SO_LOAD_FAILURES=0 -ADDON_LOAD_FAILURES=0 -SO_LOAD_FAILURES_NAMES=() -ADDON_LOAD_FAILURES_NAMES=() +FAILED_NAMES=() +FAILED_COUNT=0 IS_HEAVYNODE="false" FORCE="false" VERBOSE="false" @@ -46,20 +44,86 @@ while [[ $# -gt 0 ]]; do shift done +# Max number of concurrent template PUT jobs. Override via env if needed. +MAX_TEMPLATE_JOBS=${MAX_TEMPLATE_JOBS:-10} + +# Block until fewer than MAX_TEMPLATE_JOBS background jobs are running. +template_throttle() { + while (( $(jobs -rp | wc -l) >= MAX_TEMPLATE_JOBS )); do + wait -n + done +} + +# Per-job failure markers and an output lock for serializing parallel job output. +# Each failed load drops one file (named after the template) into FAIL_DIR; the +# output of each job is flushed as a single block under flock so concurrent jobs +# never interleave their (chatty) retry output. +FAIL_DIR=$(mktemp -d) +OUTPUT_LOCK="${FAIL_DIR}/.output.lock" +: > "$OUTPUT_LOCK" +trap 'rm -rf "$FAIL_DIR"' EXIT + +# Record a failure: $1 = the template name/path to report later. Slashes are +# encoded so the path becomes a safe single filename. +record_failure() { + local marker="${1//\//__}" + : > "${FAIL_DIR}/fail.${marker}" +} + +# Populate FAILED_NAMES and FAILED_COUNT from the current phase's markers. +# Must run in the current shell (not a command substitution) so the array sticks. +collect_failures() { + FAILED_NAMES=() + FAILED_COUNT=0 + local f name + shopt -s nullglob + for f in "${FAIL_DIR}"/fail.*; do + name="${f##*/fail.}" + name="${name//__//}" + FAILED_NAMES+=("$name") + FAILED_COUNT=$((FAILED_COUNT + 1)) + done + shopt -u nullglob +} + +# Clear markers and names between phases so SO and addon counts stay independent. +reset_failures() { + shopt -s nullglob + rm -f "${FAIL_DIR}"/fail.* + shopt -u nullglob + FAILED_NAMES=() + FAILED_COUNT=0 +} + +# Print a block of text atomically (under the shared output lock) so the output +# of concurrent background jobs is not interleaved. +locked_echo() { + { flock 9; printf '%s\n' "$1"; } 9>>"$OUTPUT_LOCK" +} + +# Loads one template file via PUT. Intended to be dispatched as a background job. +# $1 uri - e.g. _component_template/foo or _index_template/foo +# $2 file - path to the template JSON +# $3 report_name - name/path to record if this load fails load_template() { local uri="$1" local file="$2" + local report_name="$3" + local out rc=0 block - echo "Loading template file $file" - if ! output=$(retry 3 3 "so-elasticsearch-query $uri -d@$file -XPUT" "{\"acknowledged\":true}"); then - echo "$output" - - return 1 - + # Capture everything (including retry's diagnostic chatter) into one block so + # concurrent jobs never interleave; the whole block is flushed under one flock. + block="Loading template file $file"$'\n' + if ! out=$(retry 3 3 "so-elasticsearch-query $uri -d@$file -XPUT" "{\"acknowledged\":true}" 2>&1); then + block+="$out"$'\n' + rc=1 elif [[ "$VERBOSE" == "true" ]]; then - echo "$output" + block+="$out"$'\n' fi + { flock 9; printf '%s' "$block"; } 9>>"$OUTPUT_LOCK" + + (( rc != 0 )) && record_failure "$report_name" } check_required_component_template_exists() { @@ -110,6 +174,9 @@ load_component_templates() { return fi + # Dispatch loads as throttled background jobs. The barrier (wait) happens in + # the caller after all component groups have been dispatched, since index + # templates must not load until every component template is in place. for component in "$pattern"/*.json; do tmpl_name=$(basename "${component%.json}") @@ -118,10 +185,8 @@ load_component_templates() { tmpl_name="${tmpl_name%-mappings}-mappings" fi - if ! load_template "_component_template/${tmpl_name}" "$component"; then - SO_LOAD_FAILURES=$((SO_LOAD_FAILURES + 1)) - SO_LOAD_FAILURES_NAMES+=("$component") - fi + template_throttle + load_template "_component_template/${tmpl_name}" "$component" "$component" & done } @@ -180,6 +245,9 @@ if [[ "$FORCE" == "true" || ! -f "$SO_STATEFILE_SUCCESS" ]] && index_templates_e load_component_templates "Elastic Agent" "elastic-agent" load_component_templates "Security Onion" "so" + # Barrier: every component template PUT must complete before we snapshot the + # component template list and start loading index templates that depend on them. + wait component_templates=$(so-elasticsearch-component-templates-list) echo -e "Loading Security Onion index templates...\n" for so_idx_tmpl in "${SO_TEMPLATES_DIR}"/*.json; do @@ -189,7 +257,7 @@ if [[ "$FORCE" == "true" || ! -f "$SO_STATEFILE_SUCCESS" ]] && index_templates_e # TODO: Better way to load only heavynode specific templates if ! check_heavynode_compatiable_index_template "$tmpl_name"; then if [[ "$VERBOSE" == "true" ]]; then - echo "Skipping over $so_idx_tmpl, template is not a heavynode specific index template." + locked_echo "Skipping over $so_idx_tmpl, template is not a heavynode specific index template." fi continue @@ -197,32 +265,34 @@ if [[ "$FORCE" == "true" || ! -f "$SO_STATEFILE_SUCCESS" ]] && index_templates_e fi if check_required_component_template_exists "$so_idx_tmpl"; then - if ! load_template "_index_template/$tmpl_name" "$so_idx_tmpl"; then - SO_LOAD_FAILURES=$((SO_LOAD_FAILURES + 1)) - SO_LOAD_FAILURES_NAMES+=("$so_idx_tmpl") - fi + template_throttle + load_template "_index_template/$tmpl_name" "$so_idx_tmpl" "$so_idx_tmpl" & else - echo "Skipping over $so_idx_tmpl due to missing required component template(s)." - SO_LOAD_FAILURES=$((SO_LOAD_FAILURES + 1)) - SO_LOAD_FAILURES_NAMES+=("$so_idx_tmpl") + locked_echo "Skipping over $so_idx_tmpl due to missing required component template(s)." + record_failure "$so_idx_tmpl" continue fi done - if [[ $SO_LOAD_FAILURES -eq 0 ]]; then + # Barrier: all SO index template PUTs must finish before tallying failures. + wait + + collect_failures + if [[ $FAILED_COUNT -eq 0 ]]; then echo "All Security Onion core templates loaded successfully." touch "$SO_STATEFILE_SUCCESS" else - echo "Encountered $SO_LOAD_FAILURES failure(s) loading templates:" - for failed_template in "${SO_LOAD_FAILURES_NAMES[@]}"; do + echo "Encountered $FAILED_COUNT failure(s) loading templates:" + for failed_template in "${FAILED_NAMES[@]}"; do echo " - $failed_template" done if [[ "$SHOULD_EXIT_ON_FAILURE" == "true" ]]; then fail "Failed to load all Security Onion core templates successfully." fi fi + reset_failures elif ! index_templates_exist "$SO_TEMPLATES_DIR"; then echo "No Security Onion core index templates found in ${SO_TEMPLATES_DIR}, skipping." elif [[ -f "$SO_STATEFILE_SUCCESS" ]]; then @@ -241,26 +311,27 @@ if should_load_addon_templates; then tmpl_name=$(basename "${addon_idx_tmpl%-template.json}") if check_required_component_template_exists "$addon_idx_tmpl"; then - if ! load_template "_index_template/${tmpl_name}" "$addon_idx_tmpl"; then - ADDON_LOAD_FAILURES=$((ADDON_LOAD_FAILURES + 1)) - ADDON_LOAD_FAILURES_NAMES+=("$addon_idx_tmpl") - fi + template_throttle + load_template "_index_template/${tmpl_name}" "$addon_idx_tmpl" "$addon_idx_tmpl" & else - echo "Skipping over $addon_idx_tmpl due to missing required component template(s)." - ADDON_LOAD_FAILURES=$((ADDON_LOAD_FAILURES + 1)) - ADDON_LOAD_FAILURES_NAMES+=("$addon_idx_tmpl") + locked_echo "Skipping over $addon_idx_tmpl due to missing required component template(s)." + record_failure "$addon_idx_tmpl" continue fi done - if [[ $ADDON_LOAD_FAILURES -eq 0 ]]; then + # Barrier: all addon index template PUTs must finish before tallying failures. + wait + + collect_failures + if [[ $FAILED_COUNT -eq 0 ]]; then echo "All addon integration templates loaded successfully." touch "$ADDON_STATEFILE_SUCCESS" else - echo "Encountered $ADDON_LOAD_FAILURES failure(s) loading addon integration templates:" - for failed_template in "${ADDON_LOAD_FAILURES_NAMES[@]}"; do + echo "Encountered $FAILED_COUNT failure(s) loading addon integration templates:" + for failed_template in "${FAILED_NAMES[@]}"; do echo " - $failed_template" done if [[ "$SHOULD_EXIT_ON_FAILURE" == "true" ]]; then From 1ee555957a971a778cd8db0c6cbbe9fbe8ab712a Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Fri, 12 Jun 2026 15:23:43 -0400 Subject: [PATCH 09/11] Speed up so-elastic-fleet-integration-upgrade Fetch each agent policy once and extract integration name/package/version/id locally via a single jq pass instead of re-fetching the identical policy JSON 1+3N times. Memoize epm/packages latest-version lookups so each package is queried once instead of per (policy, integration). Dispatch the per-integration dry-run+upgrade as throttled background jobs (MAX_FLEET_JOBS) with flock-serialized output and a FAIL_FILE marker, mirroring elastic_fleet_load_integrations_dir. Behavior preserved: same elastic-defend-endpoints/fleet_server skips, same AUTO_UPGRADE_INTEGRATIONS default-package gating (moved into jq, using $defaults to avoid the jq $def keyword collision), and exit 1 on any failure so salt retries. --- .../so-elastic-fleet-integration-upgrade | 133 ++++++++++-------- 1 file changed, 75 insertions(+), 58 deletions(-) diff --git a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-integration-upgrade b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-integration-upgrade index 1a1448c53..c0008f362 100644 --- a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-integration-upgrade +++ b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-integration-upgrade @@ -23,73 +23,90 @@ if [ $? -ne 0 ]; then fi default_packages=({% for pkg in SUPPORTED_PACKAGES %}"{{ pkg }}"{% if not loop.last %} {% endif %}{% endfor %}) +# JSON array of the default packages, used by the jq filter below. +default_packages_json=$(printf '%s\n' "${default_packages[@]}" | jq -R . | jq -s '.') + +# Output lock (serializes concurrent job output) and failure file (one marker line per +# failed integration). Mirrors the pattern used by elastic_fleet_load_integrations_dir. +OUTPUT_LOCK=$(mktemp) +FAIL_FILE=$(mktemp) +trap 'rm -f "$OUTPUT_LOCK" "$FAIL_FILE"' EXIT + +# Cache of package name -> latest available version, so the same package is only looked up +# once instead of once per (policy, integration). +declare -A LATEST_VERSION_CACHE -ERROR=false for AGENT_POLICY in $agent_policies; do - if ! integrations=$(elastic_fleet_integration_policy_names "$AGENT_POLICY"); then + # Fetch the agent policy a single time; package name/version and integration id are all + # extracted locally below instead of re-fetching the same policy per integration. + if ! POLICY_JSON=$(fleet_api "agent_policies/$AGENT_POLICY"); then # this script upgrades default integration packages, exit 1 and let salt handle retrying exit 1 fi - for INTEGRATION in $integrations; do - if ! [[ "$INTEGRATION" == "elastic-defend-endpoints" ]] && ! [[ "$INTEGRATION" == "fleet_server-"* ]]; then - # Get package name so we know what package to look for when checking the current and latest available version - if ! PACKAGE_NAME=$(elastic_fleet_integration_policy_package_name "$AGENT_POLICY" "$INTEGRATION"); then + + # One jq pass emits name/package.name/package.version/id for every eligible integration. + # The endpoint/fleet_server skips and the default-package gate are applied here in jq. + # $defaults (not $def, a jq reserved keyword) holds the default package list. + while IFS=$'\t' read -r INTEGRATION PACKAGE_NAME PACKAGE_VERSION INTEGRATION_ID; do + [ -n "$INTEGRATION" ] || continue + + # Look up the latest available version once per package, then memoize it. + if [[ -z "${LATEST_VERSION_CACHE[$PACKAGE_NAME]+set}" ]]; then + if ! AVAILABLE_VERSION=$(elastic_fleet_package_latest_version_check "$PACKAGE_NAME"); then + echo "Error: Failed getting latest version for $PACKAGE_NAME" exit 1 fi - {%- if not AUTO_UPGRADE_INTEGRATIONS %} - if [[ " ${default_packages[@]} " =~ " $PACKAGE_NAME " ]]; then - {%- endif %} - # Get currently installed version of package - attempt=0 - max_attempts=3 - while [ $attempt -lt $max_attempts ]; do - if PACKAGE_VERSION=$(elastic_fleet_integration_policy_package_version "$AGENT_POLICY" "$INTEGRATION") && AVAILABLE_VERSION=$(elastic_fleet_package_latest_version_check "$PACKAGE_NAME"); then - break - fi - attempt=$((attempt + 1)) - done - if [ $attempt -eq $max_attempts ]; then - echo "Error: Failed getting $PACKAGE_VERSION or $AVAILABLE_VERSION" - exit 1 - fi - - # Get integration ID - if ! INTEGRATION_ID=$(elastic_fleet_integration_id "$AGENT_POLICY" "$INTEGRATION"); then - exit 1 - fi - - if [[ "$PACKAGE_VERSION" != "$AVAILABLE_VERSION" ]]; then - # Dry run of the upgrade - echo "" - echo "Current $PACKAGE_NAME package version ($PACKAGE_VERSION) is not the same as the latest available package ($AVAILABLE_VERSION)..." - echo "Upgrading $INTEGRATION..." - echo "Starting dry run..." - if ! DRYRUN_OUTPUT=$(elastic_fleet_integration_policy_dryrun_upgrade "$INTEGRATION_ID"); then - exit 1 - fi - DRYRUN_ERRORS=$(echo "$DRYRUN_OUTPUT" | jq .[].hasErrors) - - # If no errors with dry run, proceed with actual upgrade - if [[ "$DRYRUN_ERRORS" == "false" ]]; then - echo "No errors detected. Proceeding with upgrade..." - if ! elastic_fleet_integration_policy_upgrade "$INTEGRATION_ID"; then - echo "Error: Upgrade failed for $PACKAGE_NAME with integration ID '$INTEGRATION_ID'." - ERROR=true - continue - fi - else - echo "Errors detected during dry run for $PACKAGE_NAME policy upgrade..." - ERROR=true - continue - fi - fi - {%- if not AUTO_UPGRADE_INTEGRATIONS %} - fi - {%- endif %} + LATEST_VERSION_CACHE[$PACKAGE_NAME]=$AVAILABLE_VERSION fi - done + AVAILABLE_VERSION=${LATEST_VERSION_CACHE[$PACKAGE_NAME]} + + if [[ "$PACKAGE_VERSION" != "$AVAILABLE_VERSION" ]]; then + # Dry run, then (if clean) the actual upgrade, dispatched as a throttled background + # job. Each job builds its full log into one block, then flushes it under a single + # shared lock (OUTPUT_LOCK) so concurrent jobs never interleave on stdout; a failed + # job also appends a marker line to FAIL_FILE while holding that same lock. + elastic_fleet_throttle + { + block=$'\n'"Current $PACKAGE_NAME package version ($PACKAGE_VERSION) is not the same as the latest available package ($AVAILABLE_VERSION)..."$'\n' + block+="Upgrading $INTEGRATION..."$'\n'"Starting dry run..."$'\n' + fail="" + if ! DRYRUN_OUTPUT=$(elastic_fleet_integration_policy_dryrun_upgrade "$INTEGRATION_ID"); then + block+="Error: Failed to complete dry run for '$INTEGRATION_ID'."$'\n' + fail="dryrun $INTEGRATION" + elif [[ "$(jq .[].hasErrors <<<"$DRYRUN_OUTPUT")" == "false" ]]; then + block+="No errors detected. Proceeding with upgrade..."$'\n' + if ! elastic_fleet_integration_policy_upgrade "$INTEGRATION_ID"; then + block+="Error: Upgrade failed for $PACKAGE_NAME with integration ID '$INTEGRATION_ID'."$'\n' + fail="upgrade $INTEGRATION" + fi + else + block+="Errors detected during dry run for $PACKAGE_NAME policy upgrade..."$'\n' + fail="dryrun-errors $INTEGRATION" + fi + { + flock 9 + printf '%s' "$block" + [ -n "$fail" ] && printf '%s\n' "$fail" >>"$FAIL_FILE" + } 9>>"$OUTPUT_LOCK" + } & + fi + done < <(jq -r --argjson defaults "$default_packages_json" ' + .item.package_policies[] + | select(.name != "elastic-defend-endpoints") + | select(.name | startswith("fleet_server-") | not) + {%- if not AUTO_UPGRADE_INTEGRATIONS %} + | select(.package.name | IN($defaults[])) + {%- endif %} + | [.name, .package.name, .package.version, .id] | @tsv + ' <<<"$POLICY_JSON") done -if [[ "$ERROR" == "true" ]]; then + +# Barrier: wait for every dispatched dry-run/upgrade job to finish. +wait + +if [ -s "$FAIL_FILE" ]; then + printf '\nFailed integration upgrades:\n' + cat "$FAIL_FILE" exit 1 fi echo From d0bea2ebcbef3234636590e78bc7318f1621ef7e Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Thu, 18 Jun 2026 11:19:36 -0400 Subject: [PATCH 10/11] Restore grouped per-integration logging and retry 409s in fleet integration loader elastic_fleet_load_integrations_dir now buffers each concurrent job's output (header + API response) to a per-job file and prints them in submission order after wait, restoring the readable serial-style output while keeping concurrent writes. Add --retry-all-errors to the integration create/update curl calls so transient 409 conflicts from concurrent writes to the same agent policy are retried (curl --retry alone does not retry 409). --- .../tools/sbin/so-elastic-fleet-common | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common index d4f40b3ed..e8ded916f 100644 --- a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common +++ b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common @@ -53,9 +53,13 @@ elastic_fleet_load_integrations_dir() { local DIR=$2 local LABEL=$3 local SKIP_CREATE_NAME=$4 - local POLICY_JSON FAIL_FILE INTEGRATION NAME ID + local POLICY_JSON FAIL_FILE OUT_DIR INTEGRATION NAME ID i FAIL_FILE=$(mktemp) + # Each job buffers its full output (header + API response) into its own file so the + # parent can print them grouped and in submission order after concurrent writes finish. + OUT_DIR=$(mktemp -d) + i=0 # Fetch the agent policy a single time; we look up integration ids locally below. POLICY_JSON=$(fleet_api "agent_policies/$AGENT_POLICY") @@ -67,23 +71,31 @@ elastic_fleet_load_integrations_dir() { elastic_fleet_throttle { + local RESP if [ -n "$ID" ]; then printf "\n\n%s - Updating integration %s\n" "$LABEL" "$NAME" - if ! elastic_fleet_integration_update "$ID" "@$INTEGRATION"; then + if ! RESP=$(elastic_fleet_integration_update "$ID" "@$INTEGRATION"); then flock 9; echo "update ${INTEGRATION##*/}" >&9 fi + printf '%s\n' "$RESP" elif [ -n "$SKIP_CREATE_NAME" ] && [ "$NAME" == "$SKIP_CREATE_NAME" ]; then printf "\n\n%s - Skipping creation of %s\n" "$LABEL" "$NAME" else printf "\n\n%s - Creating integration %s\n" "$LABEL" "$NAME" - if ! elastic_fleet_integration_create "@$INTEGRATION"; then + if ! RESP=$(elastic_fleet_integration_create "@$INTEGRATION"); then flock 9; echo "create ${INTEGRATION##*/}" >&9 fi + printf '%s\n' "$RESP" fi - } 9>>"$FAIL_FILE" & + } >"$OUT_DIR/$(printf '%03d' "$i")" 9>>"$FAIL_FILE" & + i=$((i+1)) done wait + # Emit per-integration output grouped and in submission order (glob sorts numerically). + cat "$OUT_DIR"/* 2>/dev/null + rm -rf "$OUT_DIR" + local rc=0 if [ -s "$FAIL_FILE" ]; then printf "\n%s: failed integrations:\n" "$LABEL" @@ -110,7 +122,9 @@ elastic_fleet_integration_create() { JSON_STRING=$1 - if ! fleet_api "package_policies" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -XPOST -d "$JSON_STRING"; then + # --retry-all-errors so transient 409 conflicts (concurrent writes to the same agent + # policy) are retried; curl --retry alone does not retry 409. + if ! fleet_api "package_policies" --retry-all-errors -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -XPOST -d "$JSON_STRING"; then return 1 fi } @@ -141,7 +155,9 @@ elastic_fleet_integration_update() { JSON_STRING=$2 - if ! fleet_api "package_policies/$UPDATE_ID" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -XPUT -d "$JSON_STRING"; then + # --retry-all-errors so transient 409 conflicts (concurrent writes to the same agent + # policy) are retried; curl --retry alone does not retry 409. + if ! fleet_api "package_policies/$UPDATE_ID" --retry-all-errors -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -XPUT -d "$JSON_STRING"; then return 1 fi } From 96fcc0ec3848060ce84ec9b2dde4ac0234704bee Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Mon, 22 Jun 2026 14:25:46 -0500 Subject: [PATCH 11/11] wip --- .../tools/sbin/so-elastic-fleet-common | 20 ++++++++--- .../so-elasticsearch-ilm-policy-load | 35 +++++++++++++++---- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common index e8ded916f..855a28510 100644 --- a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common +++ b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common @@ -36,7 +36,7 @@ MAX_FLEET_JOBS=${MAX_FLEET_JOBS:-10} # Block until fewer than MAX_FLEET_JOBS background jobs are running. elastic_fleet_throttle() { while (( $(jobs -rp | wc -l) >= MAX_FLEET_JOBS )); do - wait -n + wait -n || true done } @@ -47,7 +47,7 @@ elastic_fleet_throttle() { # $2 DIR - directory of integration *.json files # $3 LABEL - human-readable label for log output # $4 SKIP_CREATE_NAME - (optional) integration name to skip when creating (still updated if present) -# Returns 1 if any integration failed to create/update. +# Returns 1 if the policy cannot be fetched or if any integration failed to create/update. elastic_fleet_load_integrations_dir() { local AGENT_POLICY=$1 local DIR=$2 @@ -62,7 +62,19 @@ elastic_fleet_load_integrations_dir() { i=0 # Fetch the agent policy a single time; we look up integration ids locally below. - POLICY_JSON=$(fleet_api "agent_policies/$AGENT_POLICY") + if ! POLICY_JSON=$(fleet_api "agent_policies/$AGENT_POLICY"); then + echo "Error: Failed to retrieve agent policy '$AGENT_POLICY'." + rm -f "$FAIL_FILE" + rm -rf "$OUT_DIR" + return 1 + fi + + if ! jq -e '.item.package_policies' <<<"$POLICY_JSON" >/dev/null 2>&1; then + echo "Error: Invalid agent policy response for '$AGENT_POLICY'." + rm -f "$FAIL_FILE" + rm -rf "$OUT_DIR" + return 1 + fi for INTEGRATION in "$DIR"/*.json; do [ -e "$INTEGRATION" ] || continue @@ -90,7 +102,7 @@ elastic_fleet_load_integrations_dir() { } >"$OUT_DIR/$(printf '%03d' "$i")" 9>>"$FAIL_FILE" & i=$((i+1)) done - wait + wait || true # Emit per-integration output grouped and in submission order (glob sorts numerically). cat "$OUT_DIR"/* 2>/dev/null diff --git a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load index a884f2e2f..9b748ce59 100755 --- a/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load +++ b/salt/elasticsearch/tools/sbin_jinja/so-elasticsearch-ilm-policy-load @@ -6,11 +6,12 @@ . /usr/sbin/so-common -MAX_JOBS=10 +MAX_JOBS=${MAX_ILM_JOBS:-10} # Lock used to serialize block writes so concurrent jobs never interleave their output. ILM_OUTPUT_LOCK=$(mktemp) -trap 'rm -f "$ILM_OUTPUT_LOCK"' EXIT +ILM_FAIL_FILE=$(mktemp) +trap 'rm -f "$ILM_OUTPUT_LOCK" "$ILM_FAIL_FILE"' EXIT # Policies are loaded concurrently (up to MAX_JOBS at a time) for speed. Each policy's block is # printed the moment its curl returns, so output appears in COMPLETION ORDER, not the order @@ -19,21 +20,31 @@ echo "Loading ILM policies concurrently; output below appears in completion orde echo put_policy() { - local desc="$1" policyname="$2" data="$3" result - result=$(curl -K /opt/so/conf/elasticsearch/curl.config -s -k -L \ + local desc="$1" policyname="$2" data="$3" result rc=0 + if ! result=$(curl -K /opt/so/conf/elasticsearch/curl.config -s -k -L --fail \ -X PUT "https://localhost:9200/_ilm/policy/${policyname}" \ - -H 'Content-Type: application/json' -d"${data}") + -H 'Content-Type: application/json' -d"${data}" 2>&1); then + rc=1 + elif ! jq -e '.acknowledged == true' <<<"$result" >/dev/null 2>&1; then + rc=1 + fi + # curl above ran in parallel; serialize just this block write so concurrent jobs never interleave. { flock 200 printf 'Setting up %s policy...\n%s\n\n' "${desc}" "${result}" + if (( rc != 0 )); then + printf '%s\n' "${policyname}" >>"$ILM_FAIL_FILE" + fi } 200>>"${ILM_OUTPUT_LOCK}" + + return "$rc" } # Block until fewer than MAX_JOBS background curls are running. throttle() { while (( $(jobs -rp | wc -l) >= MAX_JOBS )); do - wait -n + wait -n || true done } @@ -67,4 +78,14 @@ throttle() { {%- endfor %} {%- endif %} -wait +wait || true + +if [[ -s "$ILM_FAIL_FILE" ]]; then + echo "ERROR: Failed to load ILM policy(s):" + while read -r POLICY; do + echo " - $POLICY" + done < "$ILM_FAIL_FILE" + exit 1 +else + echo "Successfully loaded all ILM policies." +fi