From 492ae80da7cf52c19751cb607824a6dc0ff90c86 Mon Sep 17 00:00:00 2001 From: reyesj2 <94730068+reyesj2@users.noreply.github.com> Date: Mon, 11 May 2026 16:51:38 -0500 Subject: [PATCH] add ingest latency metrics --- salt/elasticsearch/files/ingest/global@custom | 77 ++++++++++++++++++- salt/logstash/defaults.yaml | 5 +- .../so/0012_input_elastic_agent.conf.jinja | 14 +++- .../so/0013_input_lumberjack_fleet.conf | 23 ------ .../so/0013_input_lumberjack_fleet.conf.jinja | 26 +++++++ .../config/so/0800_input_kafka.conf.jinja | 6 ++ .../config/so/0900_input_redis.conf.jinja | 9 ++- .../so/9805_output_elastic_agent.conf.jinja | 8 ++ .../9806_output_lumberjack_fleet.conf.jinja | 19 +++-- .../config/so/9999_output_redis.conf.jinja | 9 ++- salt/logstash/soc_logstash.yaml | 5 ++ 11 files changed, 162 insertions(+), 39 deletions(-) delete mode 100644 salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf create mode 100644 salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf.jinja diff --git a/salt/elasticsearch/files/ingest/global@custom b/salt/elasticsearch/files/ingest/global@custom index bafb783a4..979c5c1b8 100644 --- a/salt/elasticsearch/files/ingest/global@custom +++ b/salt/elasticsearch/files/ingest/global@custom @@ -177,12 +177,84 @@ "description": "Extract IPs from Elastic Agent events (host.ip) and adds them to related.ip" } }, + { + "script": { + "description": "Snapshot event.ingested into _tmp.event_ingested_pre_fleet before .fleet_final_pipeline-1 overwrites it with ES ingest time", + "lang": "painless", + "if": "ctx.event?.ingested != null && ctx.event?.created == null", + "ignore_failure": true, + "source": "ctx.putIfAbsent('_tmp', [:]); ctx._tmp.event_ingested_pre_fleet = ctx.event.ingested;" + } + }, { "pipeline": { "name": ".fleet_final_pipeline-1", "ignore_missing_pipeline": true } }, + { + "script": { + "description": "Calculate time from Elastic Agent to Logstash.", + "lang": "painless", + "if": "ctx._tmp?.logstash_from_agent != null", + "ignore_failure": true, + "source": "ZonedDateTime start = ctx._tmp.event_ingested_pre_fleet != null ? ZonedDateTime.parse(ctx._tmp.event_ingested_pre_fleet) : ZonedDateTime.parse(ctx['@timestamp']); ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_elasticagent_to_logstash = ChronoUnit.SECONDS.between(start, ZonedDateTime.parse(ctx._tmp.logstash_from_agent));" + } + }, + { + "script": { + "description": "Calculate time from Logstash to Redis", + "lang": "painless", + "if": "ctx._tmp?.logstash_from_agent != null && ctx._tmp?.logstash_to_redis != null", + "ignore_failure": true, + "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_logstash_to_redis = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_agent), ZonedDateTime.parse(ctx._tmp.logstash_to_redis));" + } + }, + { + "script": { + "description": "Calculate time message spends in redis queue (logstash delay in pulling event).", + "lang": "painless", + "if": "ctx._tmp?.logstash_to_redis != null && ctx._tmp?.logstash_from_redis != null", + "ignore_failure": true, + "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_redis_to_logstash = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_to_redis), ZonedDateTime.parse(ctx._tmp.logstash_from_redis));" + } + }, + { + "script": { + "description": "Calculate time from Logstash to Elasticsearch (after read from Redis).", + "lang": "painless", + "if": "ctx._tmp?.logstash_from_redis != null", + "ignore_failure": true, + "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_logstash_to_elasticsearch = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_redis), metadata().now);" + } + }, + { + "script": { + "description": "Calculate time from Elastic Agent to Kafka.", + "lang": "painless", + "if": "ctx._tmp?.logstash_from_kafka != null && ctx._tmp?.logstash_from_agent == null", + "ignore_failure": true, + "source": "ZonedDateTime start = ctx._tmp.event_ingested_pre_fleet != null ? ZonedDateTime.parse(ctx._tmp.event_ingested_pre_fleet) : ZonedDateTime.parse(ctx['@timestamp']); ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_elasticagent_to_kafka = ChronoUnit.SECONDS.between(start, ZonedDateTime.parse(ctx._tmp.logstash_from_kafka));" + } + }, + { + "script": { + "description": "Calculate time message spends in Kafka queue (logstash delay in pulling event).", + "lang": "painless", + "if": "ctx._tmp?.logstash_from_kafka != null && ctx.metadata?.kafka?.timestamp != null && ctx._tmp?.logstash_from_agent == null", + "ignore_failure": true, + "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_kafka_queue = ChronoUnit.SECONDS.between(ZonedDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(ctx.metadata.kafka.timestamp.toString())), ZoneId.of('UTC')), ZonedDateTime.parse(ctx._tmp.logstash_from_kafka));" + } + }, + { + "script": { + "description": "Calculate time from Logstash to Elasticsearch (after read from Kafka).", + "lang": "painless", + "if": "ctx._tmp?.logstash_from_kafka != null && ctx._tmp?.logstash_from_agent == null", + "ignore_failure": true, + "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_kafka_to_elasticsearch = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_kafka), metadata().now);" + } + }, { "remove": { "field": "event.agent_id_status", @@ -202,11 +274,12 @@ "event.dataset_temp", "dataset_tag_temp", "module_temp", - "datastream_dataset_temp" + "datastream_dataset_temp", + "_tmp" ], "ignore_missing": true, "ignore_failure": true } } ] -} \ No newline at end of file +} diff --git a/salt/logstash/defaults.yaml b/salt/logstash/defaults.yaml index 520182555..db5e4ee58 100644 --- a/salt/logstash/defaults.yaml +++ b/salt/logstash/defaults.yaml @@ -26,12 +26,12 @@ logstash: manager: - so/0011_input_endgame.conf - so/0012_input_elastic_agent.conf.jinja - - so/0013_input_lumberjack_fleet.conf + - so/0013_input_lumberjack_fleet.conf.jinja - so/9999_output_redis.conf.jinja receiver: - so/0011_input_endgame.conf - so/0012_input_elastic_agent.conf.jinja - - so/0013_input_lumberjack_fleet.conf + - so/0013_input_lumberjack_fleet.conf.jinja - so/9999_output_redis.conf.jinja search: - so/0900_input_redis.conf.jinja @@ -69,4 +69,5 @@ logstash: pipeline_x_batch_x_size: 125 pipeline_x_ecs_compatibility: disabled dmz_nodes: [] + latency_metrics: False diff --git a/salt/logstash/pipelines/config/so/0012_input_elastic_agent.conf.jinja b/salt/logstash/pipelines/config/so/0012_input_elastic_agent.conf.jinja index a4d699aff..32dcac224 100644 --- a/salt/logstash/pipelines/config/so/0012_input_elastic_agent.conf.jinja +++ b/salt/logstash/pipelines/config/so/0012_input_elastic_agent.conf.jinja @@ -1,3 +1,4 @@ +{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %} input { elastic_agent { port => 5055 @@ -11,10 +12,15 @@ input { } } filter { -if ![metadata] { - mutate { - rename => {"@metadata" => "metadata"} + {% if LOGSTASH_MERGED.get('latency_metrics', False) %} + ruby { + code => "event.set('[_tmp][logstash_from_agent]', Time.now().utc.iso8601(3));" + } + {% endif %} + if ![metadata] { + mutate { + rename => {"@metadata" => "metadata"} + } } } -} diff --git a/salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf b/salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf deleted file mode 100644 index b31ffee8d..000000000 --- a/salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf +++ /dev/null @@ -1,23 +0,0 @@ -input { - elastic_agent { - port => 5056 - tags => [ "elastic-agent", "fleet-lumberjack-input" ] - ssl_enabled => true - ssl_certificate => "/usr/share/logstash/elasticfleet-lumberjack.crt" - ssl_key => "/usr/share/logstash/elasticfleet-lumberjack.key" - ecs_compatibility => v8 - id => "fleet-lumberjack-in" - codec => "json" - } -} - - -filter { -if ![metadata] { - mutate { - rename => {"@metadata" => "metadata"} - } -} -} - - diff --git a/salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf.jinja b/salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf.jinja new file mode 100644 index 000000000..a04df5fd1 --- /dev/null +++ b/salt/logstash/pipelines/config/so/0013_input_lumberjack_fleet.conf.jinja @@ -0,0 +1,26 @@ +{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %} +input { + elastic_agent { + port => 5056 + tags => [ "elastic-agent", "fleet-lumberjack-input" ] + ssl_enabled => true + ssl_certificate => "/usr/share/logstash/elasticfleet-lumberjack.crt" + ssl_key => "/usr/share/logstash/elasticfleet-lumberjack.key" + ecs_compatibility => v8 + id => "fleet-lumberjack-in" + codec => "json" + } +} + +filter { + {% if LOGSTASH_MERGED.get('latency_metrics', False) %} + ruby { + code => "event.set('[_tmp][logstash_from_fleet]', Time.now().utc.iso8601(3));" + } + {% endif %} + if ![metadata] { + mutate { + rename => {"@metadata" => "metadata"} + } + } +} diff --git a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja index 7478375b0..769f71ea9 100644 --- a/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja +++ b/salt/logstash/pipelines/config/so/0800_input_kafka.conf.jinja @@ -1,3 +1,4 @@ +{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %} {%- set kafka_password = salt['pillar.get']('kafka:config:password') %} {%- set kafka_trustpass = salt['pillar.get']('kafka:config:trustpass') %} {%- set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %} @@ -30,6 +31,11 @@ input { } } filter { + {% if LOGSTASH_MERGED.get('latency_metrics', False) %} + ruby { + code => "event.set('[_tmp][logstash_from_kafka]', Time.now().utc.iso8601(3));" + } + {% endif %} if ![metadata] { mutate { rename => { "@metadata" => "metadata" } diff --git a/salt/logstash/pipelines/config/so/0900_input_redis.conf.jinja b/salt/logstash/pipelines/config/so/0900_input_redis.conf.jinja index ad9fae5f2..4bf388f4f 100644 --- a/salt/logstash/pipelines/config/so/0900_input_redis.conf.jinja +++ b/salt/logstash/pipelines/config/so/0900_input_redis.conf.jinja @@ -1,4 +1,4 @@ -{%- from 'logstash/map.jinja' import LOGSTASH_REDIS_NODES with context %} +{%- from 'logstash/map.jinja' import LOGSTASH_REDIS_NODES, LOGSTASH_MERGED %} {%- set REDIS_PASS = salt['pillar.get']('redis:config:requirepass') %} {%- for index in range(LOGSTASH_REDIS_NODES|length) %} @@ -18,3 +18,10 @@ input { } {% endfor %} {% endfor -%} +filter { + {% if LOGSTASH_MERGED.get('latency_metrics', False) %} + ruby { + code => "event.set('[_tmp][logstash_from_redis]', Time.now().utc.iso8601(3));" + } + {% endif %} +} diff --git a/salt/logstash/pipelines/config/so/9805_output_elastic_agent.conf.jinja b/salt/logstash/pipelines/config/so/9805_output_elastic_agent.conf.jinja index 4fe138dd8..f973070a5 100644 --- a/salt/logstash/pipelines/config/so/9805_output_elastic_agent.conf.jinja +++ b/salt/logstash/pipelines/config/so/9805_output_elastic_agent.conf.jinja @@ -1,3 +1,11 @@ +{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %} +{% if LOGSTASH_MERGED.get('latency_metrics', False) %} +filter { + ruby { + code => "event.set('[_tmp][logstash_to_elasticsearch]', Time.now().utc.iso8601(3));" + } +} +{% endif %} output { if "elastic-agent" in [tags] and "so-ip-mappings" in [tags] { elasticsearch { diff --git a/salt/logstash/pipelines/config/so/9806_output_lumberjack_fleet.conf.jinja b/salt/logstash/pipelines/config/so/9806_output_lumberjack_fleet.conf.jinja index 50328e833..602c5fece 100644 --- a/salt/logstash/pipelines/config/so/9806_output_lumberjack_fleet.conf.jinja +++ b/salt/logstash/pipelines/config/so/9806_output_lumberjack_fleet.conf.jinja @@ -13,13 +13,20 @@ filter { add_tag => "fleet-lumberjack-{{ GLOBALS.hostname }}" } } - -output { - lumberjack { - codec => json +{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %} +{% if LOGSTASH_MERGED.get('latency_metrics', False) %} +filter { + ruby { + code => "event.set('[_tmp][fleet_to_logstash]', Time.now().utc.iso8601(3));" + } +} +{% endif %} +output { + lumberjack { + codec => json hosts => {{ FAILOVER_LOGSTASH_NODES }} ssl_certificate => "/usr/share/filebeat/ca.crt" - port => 5056 + port => 5056 id => "fleet-lumberjack-{{ GLOBALS.hostname }}" - } + } } \ No newline at end of file diff --git a/salt/logstash/pipelines/config/so/9999_output_redis.conf.jinja b/salt/logstash/pipelines/config/so/9999_output_redis.conf.jinja index 0d3b3324b..af13915f7 100644 --- a/salt/logstash/pipelines/config/so/9999_output_redis.conf.jinja +++ b/salt/logstash/pipelines/config/so/9999_output_redis.conf.jinja @@ -1,10 +1,17 @@ +{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %} {%- if grains.role in ['so-heavynode', 'so-receiver'] %} {%- set HOST = GLOBALS.hostname %} {%- else %} {%- set HOST = GLOBALS.manager %} {%- endif %} {%- set REDIS_PASS = salt['pillar.get']('redis:config:requirepass') %} - +{% if LOGSTASH_MERGED.get('latency_metrics', False) %} +filter { + ruby { + code => "event.set('[_tmp][logstash_to_redis]', Time.now().utc.iso8601(3));" + } +} +{% endif %} output { redis { host => '{{ HOST }}' diff --git a/salt/logstash/soc_logstash.yaml b/salt/logstash/soc_logstash.yaml index 5a5816a9e..40794afe4 100644 --- a/salt/logstash/soc_logstash.yaml +++ b/salt/logstash/soc_logstash.yaml @@ -86,3 +86,8 @@ logstash: multiline: True advanced: True forcedType: "[]string" + latency_metrics: + description: Enable latency metrics within events processed by logstash. Useful for pinpointing log ingest delay. + forcedType: bool + global: False + advanced: True