{
  "version": 3,
  "_meta": {
    "managed_by": "securityonion",
    "managed": true
  },
  "description": "Custom pipeline for processing all incoming Fleet Agent documents. \n",
  "processors": [
    {
      "set": {
        "description": "Default event.module to elastic_agent; later processors replace it when a more specific module can be derived.",
        "ignore_failure": true,
        "field": "event.module",
        "value": "elastic_agent"
      }
    },
    {
      "split": {
        "description": "Split event.dataset on '.' into module_temp.",
        "if": "ctx.event?.dataset != null && ctx.event.dataset.contains('.')",
        "field": "event.dataset",
        "separator": "\\.",
        "target_field": "module_temp"
      }
    },
    {
      "split": {
        "description": "Split data_stream.dataset on '.' into datastream_dataset_temp.",
        "if": "ctx.data_stream?.dataset != null && ctx.data_stream?.dataset.contains('.')",
        "field": "data_stream.dataset",
        "separator": "\\.",
        "target_field": "datastream_dataset_temp",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "description": "Use the first segment of event.dataset as event.module.",
        "if": "ctx.module_temp != null",
        "override": true,
        "field": "event.module",
        "value": "{{module_temp.0}}"
      }
    },
    {
      "set": {
        "if": "ctx.datastream_dataset_temp != null && ctx.datastream_dataset_temp[0] == 'network_traffic'",
        "field": "event.module",
        "value": "{{ datastream_dataset_temp.0 }}",
        "ignore_failure": true,
        "ignore_empty_value": true,
        "description": "Fix EA network packet capture"
      }
    },
    {
      "gsub": {
        "description": "Strip the first segment of event.dataset and its trailing dot, storing the remainder in dataset_tag_temp.",
        "if": "ctx.event?.dataset != null && ctx.event.dataset.contains('.')",
        "field": "event.dataset",
        "pattern": "^[^.]*\\.",
        "replacement": "",
        "target_field": "dataset_tag_temp"
      }
    },
    {
      "append": {
        "description": "Add the dataset remainder (dataset_tag_temp) to tags.",
        "if": "ctx.dataset_tag_temp != null",
        "field": "tags",
        "value": "{{dataset_tag_temp}}",
        "allow_duplicates": false
      }
    },
    {
      "set": {
        "description": "Set network.initiated to 'true' when network.direction is egress.",
        "if": "ctx.network?.direction == 'egress'",
        "override": true,
        "field": "network.initiated",
        "value": "true"
      }
    },
    {
      "set": {
        "description": "Set network.initiated to 'false' when network.direction is ingress.",
        "if": "ctx.network?.direction == 'ingress'",
        "override": true,
        "field": "network.initiated",
        "value": "false"
      }
    },
    {
      "set": {
        "description": "Set destination.ipv6 to 'false' when network.type is ipv4.",
        "if": "ctx.network?.type == 'ipv4'",
        "override": true,
        "field": "destination.ipv6",
        "value": "false"
      }
    },
    {
      "set": {
        "description": "Set destination.ipv6 to 'true' when network.type is ipv6.",
        "if": "ctx.network?.type == 'ipv6'",
        "override": true,
        "field": "destination.ipv6",
        "value": "true"
      }
    },
    {
      "set": {
        "description": "Set data_stream.dataset to 'import' for documents tagged 'import'.",
        "if": "ctx.tags != null && ctx.tags.contains('import')",
        "override": true,
        "field": "data_stream.dataset",
        "value": "import"
      }
    },
    {
      "set": {
        "description": "Set data_stream.namespace to 'so' for documents tagged 'import'.",
        "if": "ctx.tags != null && ctx.tags.contains('import')",
        "override": true,
        "field": "data_stream.namespace",
        "value": "so"
      }
    },
    {
      "community_id": {
        "description": "Compute the community ID for endpoint.events.network documents.",
        "if": "ctx.event?.dataset == 'endpoint.events.network'",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "description": "Replace the 'fim' module name with 'file_integrity'.",
        "if": "ctx.event?.module == 'fim'",
        "override": true,
        "field": "event.module",
        "value": "file_integrity"
      }
    },
    {
      "rename": {
        "description": "Rename the Windows Defender 'Threat Name' event_data key to threat_name.",
        "if": "ctx.winlog?.provider_name == 'Microsoft-Windows-Windows Defender'",
        "ignore_missing": true,
        "field": "winlog.event_data.Threat Name",
        "target_field": "winlog.event_data.threat_name"
      }
    },
    {
      "set": {
        "description": "Build kafka.id by concatenating the Kafka partition, offset and timestamp.",
        "if": "ctx?.metadata?.kafka != null",
        "field": "kafka.id",
        "value": "{{metadata.kafka.partition}}{{metadata.kafka.offset}}{{metadata.kafka.timestamp}}",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "description": "Set event.module to 'elasticsearch' for elasticsearch.server documents.",
        "if": "ctx.event?.dataset != null && ctx.event?.dataset == 'elasticsearch.server'",
        "field": "event.module",
        "value": "elasticsearch"
      }
    },
    {
      "append": {
        "description": "Add source.ip to related.ip for endpoint network events.",
        "field": "related.ip",
        "value": "{{source.ip}}",
        "allow_duplicates": false,
        "if": "ctx?.event?.dataset == 'endpoint.events.network' && ctx?.source?.ip != null",
        "ignore_failure": true
      }
    },
    {
      "append": {
        "description": "Add destination.ip to related.ip for endpoint network events. Guarded separately so a missing destination.ip does not append an empty string.",
        "field": "related.ip",
        "value": "{{destination.ip}}",
        "allow_duplicates": false,
        "if": "ctx?.event?.dataset == 'endpoint.events.network' && ctx?.source?.ip != null && ctx?.destination?.ip != null",
        "ignore_failure": true
      }
    },
    {
      "foreach": {
        "field": "host.ip",
        "processor": {
          "append": {
            "field": "related.ip",
            "value": "{{_ingest._value}}",
            "allow_duplicates": false
          }
        },
        "if": "ctx?.event?.module == 'endpoint' && ctx?.host?.ip != null",
        "ignore_missing": true,
        "description": "Extract IPs from Elastic Agent events (host.ip) and adds them to related.ip"
      }
    },
    {
      "script": {
        "description": "Snapshot event.ingested into _tmp.event_ingested_pre_fleet before .fleet_final_pipeline-1 overwrites it with ES ingest time",
        "lang": "painless",
        "if": "ctx.event?.ingested != null && ctx.event?.created == null",
        "ignore_failure": true,
        "source": "ctx.putIfAbsent('_tmp', [:]); ctx._tmp.event_ingested_pre_fleet = ctx.event.ingested;"
      }
    },
    {
      "pipeline": {
        "description": "Run .fleet_final_pipeline-1; skipped without error if that pipeline does not exist.",
        "name": ".fleet_final_pipeline-1",
        "ignore_missing_pipeline": true
      }
    },
    {
      "script": {
        "description": "Calculate time from Elastic Agent to Logstash.",
        "lang": "painless",
        "if": "ctx._tmp?.logstash_from_agent != null",
        "ignore_failure": true,
        "source": "ZonedDateTime start = ctx._tmp.event_ingested_pre_fleet != null ? ZonedDateTime.parse(ctx._tmp.event_ingested_pre_fleet) : ZonedDateTime.parse(ctx['@timestamp']); ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_elasticagent_to_logstash = ChronoUnit.SECONDS.between(start, ZonedDateTime.parse(ctx._tmp.logstash_from_agent));"
      }
    },
    {
      "script": {
        "description": "Calculate time from Logstash to Redis",
        "lang": "painless",
        "if": "ctx._tmp?.logstash_from_agent != null && ctx._tmp?.logstash_to_redis != null",
        "ignore_failure": true,
        "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_logstash_to_redis = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_agent), ZonedDateTime.parse(ctx._tmp.logstash_to_redis));"
      }
    },
    {
      "script": {
        "description": "Calculate time message spends in redis queue (logstash delay in pulling event).",
        "lang": "painless",
        "if": "ctx._tmp?.logstash_to_redis != null && ctx._tmp?.logstash_from_redis != null",
        "ignore_failure": true,
        "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_redis_to_logstash = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_to_redis), ZonedDateTime.parse(ctx._tmp.logstash_from_redis));"
      }
    },
    {
      "script": {
        "description": "Calculate time from Logstash to Elasticsearch (after read from Redis).",
        "lang": "painless",
        "if": "ctx._tmp?.logstash_from_redis != null",
        "ignore_failure": true,
        "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_logstash_to_elasticsearch = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_redis), metadata().now);"
      }
    },
    {
      "script": {
        "description": "Calculate time from Elastic Agent to Kafka.",
        "lang": "painless",
        "if": "ctx._tmp?.logstash_from_kafka != null && ctx._tmp?.logstash_from_agent == null",
        "ignore_failure": true,
        "source": "ZonedDateTime start = ctx._tmp.event_ingested_pre_fleet != null ? ZonedDateTime.parse(ctx._tmp.event_ingested_pre_fleet) : ZonedDateTime.parse(ctx['@timestamp']); ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_elasticagent_to_kafka = ChronoUnit.SECONDS.between(start, ZonedDateTime.parse(ctx._tmp.logstash_from_kafka));"
      }
    },
    {
      "script": {
        "description": "Calculate time message spends in Kafka queue (logstash delay in pulling event).",
        "lang": "painless",
        "if": "ctx._tmp?.logstash_from_kafka != null && ctx.metadata?.kafka?.timestamp != null && ctx._tmp?.logstash_from_agent == null",
        "ignore_failure": true,
        "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_kafka_queue = ChronoUnit.SECONDS.between(ZonedDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(ctx.metadata.kafka.timestamp.toString())), ZoneId.of('UTC')), ZonedDateTime.parse(ctx._tmp.logstash_from_kafka));"
      }
    },
    {
      "script": {
        "description": "Calculate time from Logstash to Elasticsearch (after read from Kafka).",
        "lang": "painless",
        "if": "ctx._tmp?.logstash_from_kafka != null && ctx._tmp?.logstash_from_agent == null",
        "ignore_failure": true,
        "source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_kafka_to_elasticsearch = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_kafka), metadata().now);"
      }
    },
    {
      "remove": {
        "description": "Drop event.agent_id_status when its value is 'auth_metadata_missing'.",
        "field": "event.agent_id_status",
        "ignore_missing": true,
        "if": "ctx?.event?.agent_id_status == 'auth_metadata_missing'"
      }
    },
    {
      "remove": {
        "description": "Remove the temporary working fields created by this pipeline and the other listed fields before indexing.",
        "field": [
          "message2",
          "type",
          "fields",
          "category",
          "module",
          "dataset",
          "event.dataset_temp",
          "dataset_tag_temp",
          "module_temp",
          "datastream_dataset_temp",
          "_tmp"
        ],
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ]
}