diff --git a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common index 91fa787f2..d4f40b3ed 100644 --- a/salt/elasticfleet/tools/sbin/so-elastic-fleet-common +++ b/salt/elasticfleet/tools/sbin/so-elastic-fleet-common @@ -30,6 +30,70 @@ fleet_api() { curl -sK /opt/so/conf/elasticsearch/curl.config -L "localhost:5601/api/fleet/${QUERYPATH}" "$@" --retry 3 --retry-delay 10 --fail 2>/dev/null } +# Max number of concurrent Fleet write jobs (create/update). Override via env if needed. +MAX_FLEET_JOBS=${MAX_FLEET_JOBS:-10} + +# Block until fewer than MAX_FLEET_JOBS background jobs are running. +elastic_fleet_throttle() { + while (( $(jobs -rp | wc -l) >= MAX_FLEET_JOBS )); do + wait -n + done +} + +# Load every integration JSON in a directory into a single agent policy. +# The agent policy is fetched ONCE (not per file), and the create/update writes +# are dispatched as throttled background jobs. +# $1 AGENT_POLICY - the agent policy id/name to load integrations into +# $2 DIR - directory of integration *.json files +# $3 LABEL - human-readable label for log output +# $4 SKIP_CREATE_NAME - (optional) integration name to skip when creating (still updated if present) +# Returns 1 if any integration failed to create/update. +elastic_fleet_load_integrations_dir() { + local AGENT_POLICY=$1 + local DIR=$2 + local LABEL=$3 + local SKIP_CREATE_NAME=$4 + local POLICY_JSON FAIL_FILE INTEGRATION NAME ID + + FAIL_FILE=$(mktemp) + + # Fetch the agent policy a single time; we look up integration ids locally below. + POLICY_JSON=$(fleet_api "agent_policies/$AGENT_POLICY") + + for INTEGRATION in "$DIR"/*.json; do + [ -e "$INTEGRATION" ] || continue + NAME=$(jq -r .name "$INTEGRATION") + ID=$(jq -r --arg n "$NAME" '.item.package_policies[]? | select(.name==$n) | .id' <<<"$POLICY_JSON") + + elastic_fleet_throttle + { + if [ -n "$ID" ]; then + printf "\n\n%s - Updating integration %s\n" "$LABEL" "$NAME" + if ! elastic_fleet_integration_update "$ID" "@$INTEGRATION"; then + flock 9; echo "update ${INTEGRATION##*/}" >&9 + fi + elif [ -n "$SKIP_CREATE_NAME" ] && [ "$NAME" == "$SKIP_CREATE_NAME" ]; then + printf "\n\n%s - Skipping creation of %s\n" "$LABEL" "$NAME" + else + printf "\n\n%s - Creating integration %s\n" "$LABEL" "$NAME" + if ! elastic_fleet_integration_create "@$INTEGRATION"; then + flock 9; echo "create ${INTEGRATION##*/}" >&9 + fi + fi + } 9>>"$FAIL_FILE" & + done + wait + + local rc=0 + if [ -s "$FAIL_FILE" ]; then + printf "\n%s: failed integrations:\n" "$LABEL" + cat "$FAIL_FILE" + rc=1 + fi + rm -f "$FAIL_FILE" + return $rc +} + elastic_fleet_integration_check() { AGENT_POLICY=$1 diff --git a/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load b/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load index e548c7f86..73d569dd9 100644 --- a/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load +++ b/salt/elasticfleet/tools/sbin/so-elastic-fleet-integration-policy-load @@ -18,93 +18,26 @@ if [ ! -f /opt/so/state/eaintegrations.txt ]; then # Third, configure Elastic Defend Integration seperately /usr/sbin/so-elastic-fleet-integration-policy-elastic-defend + # Each group fetches its agent policy once and dispatches create/update writes concurrently. + # Initial Endpoints - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations/endpoints-initial/*.json; do - printf "\n\nInitial Endpoints Policy - Loading $INTEGRATION\n" - elastic_fleet_integration_check "endpoints-initial" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - done + elastic_fleet_load_integrations_dir "endpoints-initial" \ + /opt/so/conf/elastic-fleet/integrations/endpoints-initial "Initial Endpoints Policy" || RETURN_CODE=1 # Grid Nodes - General - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations/grid-nodes_general/*.json; do - printf "\n\nGrid Nodes Policy_General - Loading $INTEGRATION\n" - elastic_fleet_integration_check "so-grid-nodes_general" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - done + elastic_fleet_load_integrations_dir "so-grid-nodes_general" \ + /opt/so/conf/elastic-fleet/integrations/grid-nodes_general "Grid Nodes Policy_General" || RETURN_CODE=1 # Grid Nodes - Heavy - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations/grid-nodes_heavy/*.json; do - printf "\n\nGrid Nodes Policy_Heavy - Loading $INTEGRATION\n" - elastic_fleet_integration_check "so-grid-nodes_heavy" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - done + elastic_fleet_load_integrations_dir "so-grid-nodes_heavy" \ + /opt/so/conf/elastic-fleet/integrations/grid-nodes_heavy "Grid Nodes Policy_Heavy" || RETURN_CODE=1 - # Fleet Server - Optional integrations - for INTEGRATION in /opt/so/conf/elastic-fleet/integrations-optional/FleetServer*/*.json; do - if ! [ "$INTEGRATION" == "/opt/so/conf/elastic-fleet/integrations-optional/FleetServer*/*.json" ]; then - FLEET_POLICY=`echo "$INTEGRATION"| cut -d'/' -f7` - printf "\n\nFleet Server Policy - Loading $INTEGRATION\n" - elastic_fleet_integration_check "$FLEET_POLICY" "$INTEGRATION" - if [ -n "$INTEGRATION_ID" ]; then - printf "\n\nIntegration $NAME exists - Updating integration\n" - if ! elastic_fleet_integration_update "$INTEGRATION_ID" "@$INTEGRATION"; then - echo -e "\nFailed to update integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - else - printf "\n\nIntegration does not exist - Creating integration\n" - if [ "$NAME" != "elasticsearch-logs" ]; then - if ! elastic_fleet_integration_create "@$INTEGRATION"; then - echo -e "\nFailed to create integration for ${INTEGRATION##*/}" - RETURN_CODE=1 - continue - fi - fi - fi - fi + # Fleet Server - Optional integrations (one agent policy per FleetServer_* directory) + for FLEET_DIR in /opt/so/conf/elastic-fleet/integrations-optional/FleetServer*/; do + [ -d "$FLEET_DIR" ] || continue + FLEET_POLICY=$(basename "$FLEET_DIR") + elastic_fleet_load_integrations_dir "$FLEET_POLICY" \ + "${FLEET_DIR%/}" "Fleet Server Policy" "elasticsearch-logs" || RETURN_CODE=1 done # Only create the state file if all policies were created/updated successfully