Compare commits

..

69 Commits

Author SHA1 Message Date
Mike Reeves c1d187599b Merge pull request #15912 from Security-Onion-Solutions/3/dev
3.1.0
2026-05-21 15:41:50 -04:00
Mike Reeves d87313db27 Merge pull request #15911 from Security-Onion-Solutions/3.1.0
3.1.0
2026-05-21 13:50:23 -04:00
Mike Reeves 141a61f5b5 3.1.0 2026-05-21 13:47:03 -04:00
Jorge Reyes 901cbf03e4 Merge pull request #15907 from Security-Onion-Solutions/reyesj2/es-verify-compat
Verify compatibility for all ES nodes in the cluster
2026-05-20 14:16:41 -05:00
reyesj2 b485be4602 separate salt-key command from main es version compatiblity loop 2026-05-20 14:12:58 -05:00
reyesj2 7d13007aa9 block soup if all ES nodes are not online and reporting their ES version for compatibility check 2026-05-20 10:03:37 -05:00
reyesj2 d7a1b67095 use pipefail on heavynode versino command to pass through error 2026-05-20 09:16:57 -05:00
reyesj2 6c8997b28a verify all heavynodes and all searchnodes are at compatible ES version before attempting an elasticsearch upgrade 2026-05-19 22:27:31 -05:00
Jorge Reyes 58f1d08ebe Merge pull request #15902 from Security-Onion-Solutions/reyesj2/ea-fleet-sync
sync elastic agent packages to fleet nodes
2026-05-19 11:08:48 -05:00
reyesj2 d0aa33a255 sync elastic agent packages to fleet nodes 2026-05-19 10:50:17 -05:00
Jorge Reyes 74b50f6009 Merge pull request #15899 from Security-Onion-Solutions/revert-15895-reyesj2/agentinstall
Revert "use -verify flag during grid agent install to ensure agent health"
2026-05-16 10:01:58 -05:00
Jorge Reyes e89c820b65 Revert "use -verify flag during grid agent install to ensure agent health" 2026-05-16 09:59:14 -05:00
Jorge Reyes 9ac05a6ad1 Merge pull request #15895 from Security-Onion-Solutions/reyesj2/agentinstall
use -verify flag during grid agent install to ensure agent health
2026-05-15 12:58:09 -05:00
Jason Ertel 24ee3318bc Merge pull request #15898 from Security-Onion-Solutions/jertel/logcheck
exclude fps
2026-05-15 11:38:20 -04:00
Jason Ertel ce566ba174 exclude fps 2026-05-15 11:36:46 -04:00
Mike Reeves 2635a60a8c Merge pull request #15896 from Security-Onion-Solutions/quickfixes2
Make so-postgres-backup fail-safe against silent corruption
2026-05-15 09:32:15 -04:00
Mike Reeves 244a73b7a2 Make so-postgres-backup fail-safe against silent corruption
The dump pipeline returned gzip's exit status, so a pg_dumpall that
died mid-stream still produced a valid .gz holding a truncated dump,
written straight to the final filename. The idempotency check then
blocked retries for the day and the corrupt file counted toward
retention, evicting a good backup each day until none remained.

- set -o pipefail so a failed pg_dumpall fails the pipeline
- dump to a .tmp file and atomically rename only after success, so
  the final filename appears only for a complete backup
- gzip -t integrity check before publishing
- trap-based cleanup of the temp file; sweep stale temps at startup
- run retention only after a successful backup, with a glob
  restricted to finished backups
- log timestamped OK/ERROR outcomes to /opt/so/log/postgres/backup.log
2026-05-15 08:48:54 -04:00
Mike Reeves 1189621ec5 Merge pull request #15893 from Security-Onion-Solutions/quickfixes2 2026-05-14 18:21:30 -04:00
reyesj2 d2524a593f use -verify flag during grid agent install to ensure agent health 2026-05-14 17:12:02 -05:00
Josh Brower f2ab2354fd Merge pull request #15894 from Security-Onion-Solutions/3/nginx-fix
Tweak for nginx upgrade
2026-05-14 23:20:57 +02:00
Mike Reeves 64731c73ba Fix psql :var substitution in telegraf role and retention SQL
psql does not substitute :var references inside dollar-quoted strings,
so the DO blocks in the user and retention subcommands were receiving
literal colons and failing (silently for user, via hide_output: True).
Rewrite the conditional CREATE/ALTER ROLE with SELECT format(...) \\gexec
and guard the retention UPDATE with \\gset + \\if.
2026-05-14 17:17:49 -04:00
Josh Brower 024fece607 Tweak for nginx upgrade 2026-05-14 17:08:57 -04:00
Mike Reeves 249b126312 Quote telegraf role env vars to survive YAML-special chars in passwords 2026-05-14 17:08:51 -04:00
Mike Reeves 8e38bff0c3 Rename telegraf_postgres.sh to so-telegraf-postgres 2026-05-14 16:55:53 -04:00
Mike Reeves b9f2d56932 Consolidate telegraf postgres SQL into multi-mode script
Replace inline psql heredocs in telegraf_users.sls with subcommand
dispatcher telegraf_postgres.sh: create_db, group_role, user, retention.
2026-05-14 16:37:08 -04:00
Mike Reeves 03fa01a705 Move telegraf_role.sh to postgres tools/sbin 2026-05-14 16:18:01 -04:00
Mike Reeves 450eacca41 Move telegraf role provisioning to external script with env vars 2026-05-14 16:15:54 -04:00
Mike Reeves b7a13899f7 Suppress output logging for postgres telegraf role provisioning 2026-05-14 15:56:04 -04:00
Mike Reeves 6f273d7d97 Rename init-users.sh to init-db.sh and update all references 2026-05-14 15:53:00 -04:00
Josh Brower b328820c01 Merge pull request #15792 from Security-Onion-Solutions/3/strelkalnk
Fix module name
2026-05-14 13:06:26 +02:00
Jorge Reyes 638aca97c8 Merge pull request #15877 from Security-Onion-Solutions/reyesj2-patch-1
update redis index template
2026-05-13 13:44:04 -05:00
Jorge Reyes 74a5c895e8 Merge pull request #15889 from Security-Onion-Solutions/reyesj2/zeek-ja4d
add zeek.ja4d ingest pipeline
2026-05-13 13:43:56 -05:00
reyesj2 d56bf01823 add zeek.ja4d ingest pipeline 2026-05-13 12:32:54 -05:00
Mike Reeves d29267d9c2 Merge pull request #15888 from Security-Onion-Solutions/TOoSmOotH-patch-1
Change Telegraf output from BOTH to INFLUXDB
2026-05-13 12:47:55 -04:00
Mike Reeves 72327285b2 Change Telegraf output from BOTH to INFLUXDB 2026-05-13 11:58:21 -04:00
Josh Patterson cc7a237457 Merge pull request #15887 from Security-Onion-Solutions/m0duspwnens-patch-1
remove stig from hypervisor and managerhype
2026-05-13 10:57:58 -04:00
Josh Patterson b068ad2b35 remove stig from hypervisor and managerhype 2026-05-13 10:53:11 -04:00
Jorge Reyes b103f412b5 Merge pull request #15884 from Security-Onion-Solutions/reyesj2/strelkalnk
rename strelka ScanLNK - ScanLnk
2026-05-13 09:46:52 -05:00
reyesj2 ef79c63858 Merge branch '3/dev' of github.com:Security-Onion-Solutions/securityonion into reyesj2/strelkalnk 2026-05-12 15:20:09 -05:00
reyesj2 01fb1aa156 check pillars for ScanLNK and rename to ScanLnk 2026-05-12 15:19:44 -05:00
Doug Burks f19bdd7aae Merge pull request #15883 from Security-Onion-Solutions/reyesj2/transformhealth
use temp files to prevent jq arg too long
2026-05-12 15:36:12 -04:00
reyesj2 f637dc62d1 use temp files to prevent jq arg too long 2026-05-12 13:29:32 -05:00
Jorge Reyes 081f6fa1fb Merge pull request #15878 from Security-Onion-Solutions/reyesj2/es-ingest-lag
add ingest latency metrics
2026-05-12 10:21:04 -05:00
Josh Brower d6d90d84cd Merge pull request #15880 from Security-Onion-Solutions/feature/import-overrides
Initial commit
2026-05-12 17:00:44 +02:00
Josh Brower 125610ed42 Additional test coverage 2026-05-12 10:11:22 -04:00
Josh Brower 306b0af4d0 Initial commit 2026-05-12 09:55:06 -04:00
reyesj2 492ae80da7 add ingest latency metrics 2026-05-11 16:51:38 -05:00
Jorge Reyes 4a2177c827 update redis index template
missing redis integration component templates
2026-05-11 16:15:56 -05:00
Josh Brower 006ac31109 Merge pull request #15579 from marcopedrinazzi/3/dev
New Sigma rules pipeline mapping for M365 and Fortigate
2026-05-11 21:03:53 +02:00
Josh Brower 49a643fff4 Merge pull request #15875 from Security-Onion-Solutions/3/sigma-fp-os
proc_creation per OS type
2026-05-08 15:13:14 +02:00
Josh Brower e1d830da76 proc_creation per OS type 2026-05-08 09:11:24 -04:00
Josh Brower e847c46129 Merge pull request #15872 from Security-Onion-Solutions/3/soc-logs
cleanup status code
2026-05-07 19:01:24 +02:00
Josh Brower 499f7102bd cleanup status code 2026-05-07 11:27:49 -04:00
Josh Patterson 4bc19f91ce Merge pull request #15867 from Security-Onion-Solutions/fixhype
sanitize minion ids for hypervisor reactors / orchestration
2026-05-06 09:46:01 -04:00
Mike Reeves 4990d0ddea Merge pull request #15866 from Security-Onion-Solutions/management-bond1
Management bond1
2026-05-05 17:17:58 -04:00
Mike Reeves 3e49322220 Allow preconfigured management bond in requirements 2026-05-05 15:35:12 -04:00
Mike Reeves ecb92d43fc Limit management bond setup to ISO installs 2026-05-05 15:30:09 -04:00
Mike Reeves 3b714db0bf Show management bond option consistently 2026-05-05 15:22:40 -04:00
Mike Reeves f17da4e68b Add management bond setup option 2026-05-05 15:13:24 -04:00
Jorge Reyes 04cfc22e3f Merge pull request #15864 from Security-Onion-Solutions/reyesj2/patch-2
update grok type conversion to convert processor
2026-05-05 13:58:39 -05:00
reyesj2 dceed421ae update grok type conversion to convert processor 2026-05-05 13:41:00 -05:00
Josh Patterson 652ac5d61f fix regex 2026-05-05 14:26:04 -04:00
Josh Patterson f888a2ba6b Merge remote-tracking branch 'origin/3/dev' into fixhype 2026-05-05 10:28:49 -04:00
Mike Reeves 8a1ee02335 Merge pull request #15846 from Security-Onion-Solutions/feature/ensure-pyyaml
Ensure python3-pyyaml is installed before continuing setup
2026-05-05 10:24:25 -04:00
Josh Patterson 192f6cfe13 Merge remote-tracking branch 'origin/3/dev' into fixhype 2026-05-05 08:18:26 -04:00
Josh Patterson 1c6574c694 ensure minion ids 2026-05-04 14:03:14 -04:00
Josh Brower affede7f0a Rename 'ScanLNK' to 'ScanLnk' in YAML config 2026-04-20 10:01:10 -04:00
Josh Brower 97366c0496 Rename 'ScanLNK' to 'ScanLnk' in defaults.yaml 2026-04-20 10:00:29 -04:00
Marco Pedrinazzi d7e971a0fc m365 and fortigate mappings sigma 2026-03-11 14:49:40 +01:00
64 changed files with 2026 additions and 2861 deletions
+11 -11
View File
@@ -1,17 +1,17 @@
### 3.0.0-20260331 ISO image released on 2026/03/31
### 3.1.0-20260521 ISO image released on 2026/05/21
### Download and Verify
3.0.0-20260331 ISO image:
https://download.securityonion.net/file/securityonion/securityonion-3.0.0-20260331.iso
3.1.0-20260521 ISO image:
https://download.securityonion.net/file/securityonion/securityonion-3.1.0-20260521.iso
MD5: ECD318A1662A6FDE0EF213F5A9BD4B07
SHA1: E55BE314440CCF3392DC0B06BC5E270B43176D9C
SHA256: 7FC47405E335CBE5C2B6C51FE7AC60248F35CBE504907B8B5A33822B23F8F4D5
MD5: A853BC118639ABCE1795D6E313BFFBDE
SHA1: FCA615AD6E31710B33AE5870FEF447861FDB3B8F
SHA256: CE2A5947274D9ED2C5068A1FD46B64C4FEF70445EA9B61A98DD3621781329F2C
Signature for ISO image:
https://github.com/Security-Onion-Solutions/securityonion/raw/3/main/sigs/securityonion-3.0.0-20260331.iso.sig
https://github.com/Security-Onion-Solutions/securityonion/raw/3/main/sigs/securityonion-3.1.0-20260521.iso.sig
Signing key:
https://raw.githubusercontent.com/Security-Onion-Solutions/securityonion/3/main/KEYS
@@ -25,22 +25,22 @@ wget https://raw.githubusercontent.com/Security-Onion-Solutions/securityonion/3/
Download the signature file for the ISO:
```
wget https://github.com/Security-Onion-Solutions/securityonion/raw/3/main/sigs/securityonion-3.0.0-20260331.iso.sig
wget https://github.com/Security-Onion-Solutions/securityonion/raw/3/main/sigs/securityonion-3.1.0-20260521.iso.sig
```
Download the ISO image:
```
wget https://download.securityonion.net/file/securityonion/securityonion-3.0.0-20260331.iso
wget https://download.securityonion.net/file/securityonion/securityonion-3.1.0-20260521.iso
```
Verify the downloaded ISO image using the signature file:
```
gpg --verify securityonion-3.0.0-20260331.iso.sig securityonion-3.0.0-20260331.iso
gpg --verify securityonion-3.1.0-20260521.iso.sig securityonion-3.1.0-20260521.iso
```
The output should show "Good signature" and the Primary key fingerprint should match what's shown below:
```
gpg: Signature made Mon 30 Mar 2026 06:22:14 PM EDT using RSA key ID FE507013
gpg: Signature made Thu 21 May 2026 11:10:01 AM EDT using RSA key ID FE507013
gpg: Good signature from "Security Onion Solutions, LLC <info@securityonionsolutions.com>"
gpg: WARNING: This key is not certified with a trusted signature!
gpg: There is no indication that the signature belongs to the owner.
+2
View File
@@ -165,6 +165,8 @@ if [[ $EXCLUDE_FALSE_POSITIVE_ERRORS == 'Y' ]]; then
EXCLUDED_ERRORS="$EXCLUDED_ERRORS|upgrading component template" # false positive (elasticsearch index or template names contain 'error')
EXCLUDED_ERRORS="$EXCLUDED_ERRORS|upgrading composable template" # false positive (elasticsearch composable template names contain 'error')
EXCLUDED_ERRORS="$EXCLUDED_ERRORS|Error while parsing document for index \[.ds-logs-kratos-so-.*object mapping for \[file\]" # false positive (mapping error occuring BEFORE kratos index has rolled over in 2.4.210)
EXCLUDED_ERRORS="$EXCLUDED_ERRORS|No such container" # false positive (telegraf trying to run stats on an old container)
EXCLUDED_ERRORS="$EXCLUDED_ERRORS|passwords do not match" # false positive (automated hydra test)
fi
if [[ $EXCLUDE_KNOWN_ERRORS == 'Y' ]]; then
+2
View File
@@ -26,7 +26,9 @@ include:
wait_for_elasticsearch_elasticfleet:
cmd.run:
- name: so-elasticsearch-wait
{% endif %}
{% if GLOBALS.role == "so-fleet" %}
# Sync Elastic Agent artifacts to Fleet Node
elasticagent_syncartifacts:
file.recurse:
+4 -1
View File
@@ -3958,10 +3958,13 @@ elasticsearch:
- vulnerability-mappings
- common-settings
- common-dynamic-mappings
- logs-redis.log@package
- logs-redis.log@custom
data_stream:
allow_custom_routing: false
hidden: false
ignore_missing_component_templates: []
ignore_missing_component_templates:
- logs-redis.log@custom
index_patterns:
- logs-redis.log*
priority: 501
+2 -1
View File
@@ -63,7 +63,8 @@
{ "set": { "if": "ctx.event?.dataset != null && !ctx.event.dataset.contains('.')", "field": "event.dataset", "value": "{{event.module}}.{{event.dataset}}" } },
{ "split": { "if": "ctx.event?.dataset != null && ctx.event.dataset.contains('.')", "field": "event.dataset", "separator": "\\.", "target_field": "dataset_tag_temp" } },
{ "append": { "if": "ctx.dataset_tag_temp != null", "field": "tags", "value": "{{dataset_tag_temp.1}}" } },
{ "grok": { "if": "ctx.http?.response?.status_code != null", "field": "http.response.status_code", "patterns": ["%{NUMBER:http.response.status_code:long} %{GREEDYDATA}"]} },
{ "grok": { "if": "ctx.http?.response?.status_code instanceof String", "field": "http.response.status_code", "patterns": ["%{NUMBER:http.response.status_code:long}(?:\\s+%{GREEDYDATA})?"], "ignore_failure": true } },
{ "convert": { "if": "ctx.http?.response?.status_code != null && !(ctx.http.response.status_code instanceof Number)", "field": "http.response.status_code", "type": "long", "ignore_failure": true } },
{ "set": { "if": "ctx?.metadata?.kafka != null" , "field": "kafka.id", "value": "{{metadata.kafka.partition}}{{metadata.kafka.offset}}{{metadata.kafka.timestamp}}", "ignore_failure": true } },
{ "remove": { "field": [ "message2", "type", "fields", "category", "module", "dataset", "dataset_tag_temp", "event.dataset_temp" ], "ignore_missing": true, "ignore_failure": true } },
{ "pipeline": { "name": "global@custom", "ignore_missing_pipeline": true, "description": "[Fleet] Global pipeline for all data streams" } }
+75 -2
View File
@@ -177,12 +177,84 @@
"description": "Extract IPs from Elastic Agent events (host.ip) and adds them to related.ip"
}
},
{
"script": {
"description": "Snapshot event.ingested into _tmp.event_ingested_pre_fleet before .fleet_final_pipeline-1 overwrites it with ES ingest time",
"lang": "painless",
"if": "ctx.event?.ingested != null && ctx.event?.created == null",
"ignore_failure": true,
"source": "ctx.putIfAbsent('_tmp', [:]); ctx._tmp.event_ingested_pre_fleet = ctx.event.ingested;"
}
},
{
"pipeline": {
"name": ".fleet_final_pipeline-1",
"ignore_missing_pipeline": true
}
},
{
"script": {
"description": "Calculate time from Elastic Agent to Logstash.",
"lang": "painless",
"if": "ctx._tmp?.logstash_from_agent != null",
"ignore_failure": true,
"source": "ZonedDateTime start = ctx._tmp.event_ingested_pre_fleet != null ? ZonedDateTime.parse(ctx._tmp.event_ingested_pre_fleet) : ZonedDateTime.parse(ctx['@timestamp']); ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_elasticagent_to_logstash = ChronoUnit.SECONDS.between(start, ZonedDateTime.parse(ctx._tmp.logstash_from_agent));"
}
},
{
"script": {
"description": "Calculate time from Logstash to Redis",
"lang": "painless",
"if": "ctx._tmp?.logstash_from_agent != null && ctx._tmp?.logstash_to_redis != null",
"ignore_failure": true,
"source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_logstash_to_redis = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_agent), ZonedDateTime.parse(ctx._tmp.logstash_to_redis));"
}
},
{
"script": {
"description": "Calculate time message spends in redis queue (logstash delay in pulling event).",
"lang": "painless",
"if": "ctx._tmp?.logstash_to_redis != null && ctx._tmp?.logstash_from_redis != null",
"ignore_failure": true,
"source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_redis_to_logstash = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_to_redis), ZonedDateTime.parse(ctx._tmp.logstash_from_redis));"
}
},
{
"script": {
"description": "Calculate time from Logstash to Elasticsearch (after read from Redis).",
"lang": "painless",
"if": "ctx._tmp?.logstash_from_redis != null",
"ignore_failure": true,
"source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_logstash_to_elasticsearch = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_redis), metadata().now);"
}
},
{
"script": {
"description": "Calculate time from Elastic Agent to Kafka.",
"lang": "painless",
"if": "ctx._tmp?.logstash_from_kafka != null && ctx._tmp?.logstash_from_agent == null",
"ignore_failure": true,
"source": "ZonedDateTime start = ctx._tmp.event_ingested_pre_fleet != null ? ZonedDateTime.parse(ctx._tmp.event_ingested_pre_fleet) : ZonedDateTime.parse(ctx['@timestamp']); ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_elasticagent_to_kafka = ChronoUnit.SECONDS.between(start, ZonedDateTime.parse(ctx._tmp.logstash_from_kafka));"
}
},
{
"script": {
"description": "Calculate time message spends in Kafka queue (logstash delay in pulling event).",
"lang": "painless",
"if": "ctx._tmp?.logstash_from_kafka != null && ctx.metadata?.kafka?.timestamp != null && ctx._tmp?.logstash_from_agent == null",
"ignore_failure": true,
"source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_kafka_queue = ChronoUnit.SECONDS.between(ZonedDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(ctx.metadata.kafka.timestamp.toString())), ZoneId.of('UTC')), ZonedDateTime.parse(ctx._tmp.logstash_from_kafka));"
}
},
{
"script": {
"description": "Calculate time from Logstash to Elasticsearch (after read from Kafka).",
"lang": "painless",
"if": "ctx._tmp?.logstash_from_kafka != null && ctx._tmp?.logstash_from_agent == null",
"ignore_failure": true,
"source": "ctx.event.putIfAbsent('ingestion', [:]); ctx.event.ingestion.latency_kafka_to_elasticsearch = ChronoUnit.SECONDS.between(ZonedDateTime.parse(ctx._tmp.logstash_from_kafka), metadata().now);"
}
},
{
"remove": {
"field": "event.agent_id_status",
@@ -202,11 +274,12 @@
"event.dataset_temp",
"dataset_tag_temp",
"module_temp",
"datastream_dataset_temp"
"datastream_dataset_temp",
"_tmp"
],
"ignore_missing": true,
"ignore_failure": true
}
}
]
}
}
+71
View File
@@ -0,0 +1,71 @@
{
"description": "zeek.ja4d",
"processors": [
{
"set": {
"field": "event.dataset",
"value": "ja4d"
}
},
{
"remove": {
"field": [
"host"
],
"ignore_failure": true
}
},
{
"json": {
"field": "message",
"target_field": "message2",
"ignore_failure": true
}
},
{
"rename": {
"field": "message2.ja4d",
"target_field": "hash.ja4d",
"ignore_missing": true,
"if": "ctx?.message2?.ja4d != null && ctx.message2.ja4d.length() > 0"
}
},
{
"rename": {
"field": "message2.client_mac",
"target_field": "host.mac",
"ignore_missing": true,
"if": "ctx?.message2?.client_mac != null && ctx.message2.client_mac.length() > 0"
}
},
{
"rename": {
"field": "message2.hostname",
"target_field": "host.hostname",
"ignore_missing": true,
"if": "ctx?.message2?.hostname != null && ctx.message2.hostname.length() > 0"
}
},
{
"rename": {
"field": "message2.requested_ip",
"target_field": "dhcp.requested_address",
"ignore_missing": true,
"if": "ctx?.message2?.requested_ip != null && ctx.message2.requested_ip.length() > 0"
}
},
{
"rename": {
"field": "message2.vendor_class_id",
"target_field": "zeek.ja4d.vendor_class_id",
"ignore_missing": true,
"if": "ctx?.message2?.vendor_class_id != null && ctx.message2.vendor_class_id.length() > 0"
}
},
{
"pipeline": {
"name": "zeek.common"
}
}
]
}
+3 -2
View File
@@ -26,12 +26,12 @@ logstash:
manager:
- so/0011_input_endgame.conf
- so/0012_input_elastic_agent.conf.jinja
- so/0013_input_lumberjack_fleet.conf
- so/0013_input_lumberjack_fleet.conf.jinja
- so/9999_output_redis.conf.jinja
receiver:
- so/0011_input_endgame.conf
- so/0012_input_elastic_agent.conf.jinja
- so/0013_input_lumberjack_fleet.conf
- so/0013_input_lumberjack_fleet.conf.jinja
- so/9999_output_redis.conf.jinja
search:
- so/0900_input_redis.conf.jinja
@@ -69,4 +69,5 @@ logstash:
pipeline_x_batch_x_size: 125
pipeline_x_ecs_compatibility: disabled
dmz_nodes: []
latency_metrics: False
@@ -1,3 +1,4 @@
{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %}
input {
elastic_agent {
port => 5055
@@ -11,10 +12,15 @@ input {
}
}
filter {
if ![metadata] {
mutate {
rename => {"@metadata" => "metadata"}
{% if LOGSTASH_MERGED.get('latency_metrics', False) %}
ruby {
code => "event.set('[_tmp][logstash_from_agent]', Time.now().utc.iso8601(3));"
}
{% endif %}
if ![metadata] {
mutate {
rename => {"@metadata" => "metadata"}
}
}
}
}
@@ -1,23 +0,0 @@
input {
elastic_agent {
port => 5056
tags => [ "elastic-agent", "fleet-lumberjack-input" ]
ssl_enabled => true
ssl_certificate => "/usr/share/logstash/elasticfleet-lumberjack.crt"
ssl_key => "/usr/share/logstash/elasticfleet-lumberjack.key"
ecs_compatibility => v8
id => "fleet-lumberjack-in"
codec => "json"
}
}
filter {
if ![metadata] {
mutate {
rename => {"@metadata" => "metadata"}
}
}
}
@@ -0,0 +1,26 @@
{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %}
input {
elastic_agent {
port => 5056
tags => [ "elastic-agent", "fleet-lumberjack-input" ]
ssl_enabled => true
ssl_certificate => "/usr/share/logstash/elasticfleet-lumberjack.crt"
ssl_key => "/usr/share/logstash/elasticfleet-lumberjack.key"
ecs_compatibility => v8
id => "fleet-lumberjack-in"
codec => "json"
}
}
filter {
{% if LOGSTASH_MERGED.get('latency_metrics', False) %}
ruby {
code => "event.set('[_tmp][logstash_from_fleet]', Time.now().utc.iso8601(3));"
}
{% endif %}
if ![metadata] {
mutate {
rename => {"@metadata" => "metadata"}
}
}
}
@@ -1,3 +1,4 @@
{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %}
{%- set kafka_password = salt['pillar.get']('kafka:config:password') %}
{%- set kafka_trustpass = salt['pillar.get']('kafka:config:trustpass') %}
{%- set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %}
@@ -30,6 +31,11 @@ input {
}
}
filter {
{% if LOGSTASH_MERGED.get('latency_metrics', False) %}
ruby {
code => "event.set('[_tmp][logstash_from_kafka]', Time.now().utc.iso8601(3));"
}
{% endif %}
if ![metadata] {
mutate {
rename => { "@metadata" => "metadata" }
@@ -1,4 +1,4 @@
{%- from 'logstash/map.jinja' import LOGSTASH_REDIS_NODES with context %}
{%- from 'logstash/map.jinja' import LOGSTASH_REDIS_NODES, LOGSTASH_MERGED %}
{%- set REDIS_PASS = salt['pillar.get']('redis:config:requirepass') %}
{%- for index in range(LOGSTASH_REDIS_NODES|length) %}
@@ -18,3 +18,10 @@ input {
}
{% endfor %}
{% endfor -%}
filter {
{% if LOGSTASH_MERGED.get('latency_metrics', False) %}
ruby {
code => "event.set('[_tmp][logstash_from_redis]', Time.now().utc.iso8601(3));"
}
{% endif %}
}
@@ -1,3 +1,11 @@
{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %}
{% if LOGSTASH_MERGED.get('latency_metrics', False) %}
filter {
ruby {
code => "event.set('[_tmp][logstash_to_elasticsearch]', Time.now().utc.iso8601(3));"
}
}
{% endif %}
output {
if "elastic-agent" in [tags] and "so-ip-mappings" in [tags] {
elasticsearch {
@@ -13,13 +13,20 @@ filter {
add_tag => "fleet-lumberjack-{{ GLOBALS.hostname }}"
}
}
output {
lumberjack {
codec => json
{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %}
{% if LOGSTASH_MERGED.get('latency_metrics', False) %}
filter {
ruby {
code => "event.set('[_tmp][fleet_to_logstash]', Time.now().utc.iso8601(3));"
}
}
{% endif %}
output {
lumberjack {
codec => json
hosts => {{ FAILOVER_LOGSTASH_NODES }}
ssl_certificate => "/usr/share/filebeat/ca.crt"
port => 5056
port => 5056
id => "fleet-lumberjack-{{ GLOBALS.hostname }}"
}
}
}
@@ -1,10 +1,17 @@
{%- from 'logstash/map.jinja' import LOGSTASH_MERGED %}
{%- if grains.role in ['so-heavynode', 'so-receiver'] %}
{%- set HOST = GLOBALS.hostname %}
{%- else %}
{%- set HOST = GLOBALS.manager %}
{%- endif %}
{%- set REDIS_PASS = salt['pillar.get']('redis:config:requirepass') %}
{% if LOGSTASH_MERGED.get('latency_metrics', False) %}
filter {
ruby {
code => "event.set('[_tmp][logstash_to_redis]', Time.now().utc.iso8601(3));"
}
}
{% endif %}
output {
redis {
host => '{{ HOST }}'
+5
View File
@@ -86,3 +86,8 @@ logstash:
multiline: True
advanced: True
forcedType: "[]string"
latency_metrics:
description: Enable latency metrics within events processed by logstash. Useful for pinpointing log ingest delay.
forcedType: bool
global: False
advanced: True
+381
View File
@@ -0,0 +1,381 @@
#!/usr/bin/env python3
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Imports detection overrides (e.g. from so-detections-backup) into the so-detection
# index. Reads <publicId>.<ext> files (NDJSON, one override per line) from a source
# directory, looks up the matching detection by publicId+engine, validates each
# override against the same rules SOC enforces, dedupes against existing overrides
# (operational fields only), and appends new ones.
import argparse
import ipaddress
import json
import os
import re
import sys
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
DEFAULT_INDEX = "so-detection"
AUTH_FILE = "/opt/so/conf/elasticsearch/curl.config"
ES_URL = "https://localhost:9200"
# Engines we know how to handle and the file extension the backup script writes.
ENGINES = {
"suricata": "txt",
}
# Standard Suricata variables that ship with Security Onion. Anything else
# referenced in an override is "custom" and the user needs to make sure it
# exists in SOC Config before the override will function.
BUILTIN_SURICATA_VARS = {
"$HOME_NET", "$EXTERNAL_NET",
"$HTTP_SERVERS", "$DNS_SERVERS", "$SQL_SERVERS", "$SMTP_SERVERS",
"$TELNET_SERVERS", "$AIM_SERVERS", "$DC_SERVERS", "$MODBUS_SERVER",
"$MODBUS_CLIENT", "$ENIP_CLIENT", "$ENIP_SERVER",
"$HTTP_PORTS", "$SHELLCODE_PORTS", "$ORACLE_PORTS", "$SSH_PORTS",
"$FTP_PORTS", "$FILE_DATA_PORTS",
}
VAR_PATTERN = re.compile(r"\$[A-Z_][A-Z0-9_]*")
# Canonical valid values, per securityonion-soc/model/detection.go.
SURICATA_OVERRIDE_TYPES = {"suppress", "threshold", "modify"}
SUPPRESS_TRACKS = {"by_src", "by_dst", "by_either"}
THRESHOLD_TRACKS = {"by_src", "by_dst", "by_both"}
THRESHOLD_TYPES = {"limit", "threshold", "both"}
STALE_WARNING = """\
WARNING: so-detections-backup does not remove backup files when overrides are
deleted via the Security Onion web UI. As a result, files in the source
directory may represent overrides that were intentionally deleted and should
NOT be re-imported.
Before continuing, verify that the source directory reflects the overrides you
actually want imported. Remove any files corresponding to overrides you previously deleted.
"""
def make_session(auth_file):
with open(auth_file, "r") as f:
for line in f:
if line.startswith("user ="):
creds = line.split("=", 1)[1].strip().replace('"', "")
user, _, password = creds.partition(":")
session = requests.Session()
session.auth = HTTPBasicAuth(user, password)
session.headers.update({"Content-Type": "application/json"})
session.verify = False
return session
raise RuntimeError(f"Could not find 'user =' line in {auth_file}")
def find_detection(session, index, public_id, engine):
query = {
"query": {"bool": {"must": [
{"term": {"so_detection.publicId": public_id}},
{"term": {"so_detection.engine": engine}},
]}},
"size": 2,
}
r = session.get(f"{ES_URL}/{index}/_search", json=query)
r.raise_for_status()
hits = r.json().get("hits", {}).get("hits", [])
if not hits:
return None, None, None
if len(hits) > 1:
# Shouldn't happen — publicId is unique per engine — but flag it.
print(f" WARN: {len(hits)} detections matched publicId={public_id} engine={engine}; using first")
hit = hits[0]
existing = hit["_source"].get("so_detection", {}).get("overrides") or []
return hit["_id"], hit["_index"], existing
def update_overrides(session, doc_index, doc_id, overrides):
body = {"doc": {"so_detection": {"overrides": overrides}}}
r = session.post(f"{ES_URL}/{doc_index}/_update/{doc_id}", json=body)
r.raise_for_status()
return r.json()
def dedupe_key(override):
"""Operational fields only, per Override.Equal() in detection.go.
Excludes timestamps and isEnabled so re-imports don't appear unique."""
t = override.get("type")
if t == "suppress":
return (t, override.get("track"), override.get("ip"))
if t == "threshold":
return (t, override.get("thresholdType"), override.get("track"),
override.get("count"), override.get("seconds"))
if t == "modify":
return (t, override.get("regex"), override.get("value"))
def _validate_suricata_ip(ip):
if not ip:
return "ip cannot be empty"
if ip.startswith("$"):
return None
if ip.startswith("[") and ip.endswith("]"):
for part in ip[1:-1].split(","):
err = _validate_single_ip(part.strip())
if err:
return f"invalid IP in list: {err}"
return None
return _validate_single_ip(ip)
def _validate_single_ip(ip):
try:
if "/" in ip:
ipaddress.ip_network(ip, strict=False)
else:
ipaddress.ip_address(ip)
except ValueError:
return f"invalid IP/CIDR {ip!r}"
return None
def validate_override(override, engine):
"""Mirror Override.Validate() from securityonion-soc/model/detection.go.
Returns None on success, an error string otherwise."""
t = override.get("type")
if not t:
return "override type is required"
if t not in SURICATA_OVERRIDE_TYPES:
return f"invalid type {t!r}: must be one of {sorted(SURICATA_OVERRIDE_TYPES)}"
has = {k: override.get(k) is not None for k in
("regex", "value", "thresholdType", "track", "ip", "count", "seconds", "customFilter")}
if t == "suppress":
if not has["ip"] or not has["track"]:
return "suppress requires 'ip' and 'track'"
if any(has[k] for k in ("regex", "value", "thresholdType", "count", "seconds", "customFilter")):
return "suppress has unnecessary fields"
if override["track"] not in SUPPRESS_TRACKS:
return f"invalid track {override['track']!r}: must be one of {sorted(SUPPRESS_TRACKS)}"
return _validate_suricata_ip(override["ip"])
if t == "threshold":
if not all(has[k] for k in ("thresholdType", "track", "count", "seconds")):
return "threshold requires 'thresholdType', 'track', 'count', 'seconds'"
if any(has[k] for k in ("regex", "value", "customFilter")):
return "threshold has unnecessary fields"
if override["thresholdType"] not in THRESHOLD_TYPES:
return f"invalid thresholdType {override['thresholdType']!r}: must be one of {sorted(THRESHOLD_TYPES)}"
if override["track"] not in THRESHOLD_TRACKS:
return f"invalid track {override['track']!r}: must be one of {sorted(THRESHOLD_TRACKS)}"
if not isinstance(override["count"], int) or override["count"] <= 0:
return f"count must be a positive integer, got {override['count']!r}"
if not isinstance(override["seconds"], int) or override["seconds"] <= 0:
return f"seconds must be a positive integer, got {override['seconds']!r}"
return None
if t == "modify":
if not has["regex"] or not has["value"]:
return "modify requires 'regex' and 'value'"
if any(has[k] for k in ("thresholdType", "track", "count", "seconds", "customFilter")):
return "modify has unnecessary fields"
try:
re.compile(override["regex"])
except re.error as e:
return f"invalid regex: {e}"
return None
def parse_overrides_file(path):
"""Parse a file written by so-detections-backup.py: NDJSON, one override
per line. Returns a list of (override_dict, line_number)."""
overrides = []
with open(path, "r") as f:
for i, line in enumerate(f, start=1):
line = line.strip()
if not line:
continue
overrides.append((json.loads(line), i))
return overrides
def describe(override):
"""Human-readable summary of the operational fields for a given override type."""
t = override.get("type")
if t == "suppress":
return f"type=suppress track={override.get('track')} ip={override.get('ip')}"
if t == "threshold":
return (f"type=threshold track={override.get('track')} "
f"thresholdType={override.get('thresholdType')} "
f"count={override.get('count')} seconds={override.get('seconds')}")
if t == "modify":
return f"type=modify regex={override.get('regex')!r}"
def collect_custom_vars(override):
found = set()
for value in override.values():
if isinstance(value, str):
for match in VAR_PATTERN.findall(value):
if match not in BUILTIN_SURICATA_VARS:
found.add(match)
return found
def parse_args():
p = argparse.ArgumentParser(
description="Import detection overrides into the so-detection index.",
)
p.add_argument("--source", "-s", required=True,
help="Source directory containing <publicId>.<ext> override files.")
p.add_argument("--engine", "-e", default="suricata", choices=list(ENGINES.keys()),
help="Detection engine (default: suricata).")
p.add_argument("--dry-run", "-n", action="store_true",
help="Print what would happen without writing to Elasticsearch.")
p.add_argument("--no-import-note", action="store_true",
help="Do not prepend '[Imported YYYY-MM-DD] ' to the override note.")
p.add_argument("--index", "-i", default=DEFAULT_INDEX,
help=f"Elasticsearch index to update (default: {DEFAULT_INDEX}).")
return p.parse_args()
def confirm_proceed(args):
"""Show the stale-backup warning. Dry-run prints it and continues. Real
runs require the user typing 'yes' at the prompt."""
print(STALE_WARNING)
if args.dry_run:
print("(dry-run: no acknowledgement required)\n")
return True
answer = input("Type 'yes' to acknowledge and continue: ").strip().lower()
print()
return answer == "yes"
def main():
args = parse_args()
if not os.path.isdir(args.source):
print(f"ERROR: source directory not found: {args.source}", file=sys.stderr)
sys.exit(1)
extension = ENGINES[args.engine]
files = sorted(f for f in os.listdir(args.source) if f.endswith(f".{extension}"))
if not files:
print(f"No *.{extension} files found in {args.source}")
sys.exit(0)
if not confirm_proceed(args):
print("Aborted.")
sys.exit(1)
session = make_session(AUTH_FILE)
today = datetime.now().strftime("%Y-%m-%d")
note_prefix = "" if args.no_import_note else f"[Imported {today}] "
counts = {"added": 0, "skipped_dedupe": 0, "skipped_not_found": 0, "invalid": 0, "error": 0}
custom_vars = set()
mode = "DRY-RUN" if args.dry_run else "IMPORT"
print(f"[{mode}] engine={args.engine} source={args.source} index={args.index}\n")
for filename in files:
public_id = os.path.splitext(filename)[0]
path = os.path.join(args.source, filename)
print(f"{public_id}:")
try:
new_overrides = parse_overrides_file(path)
except (json.JSONDecodeError, OSError) as e:
print(f" ERROR: could not parse {filename}: {e}")
counts["error"] += 1
continue
if not new_overrides:
print(" SKIP: empty file")
continue
try:
doc_id, doc_index, existing = find_detection(session, args.index, public_id, args.engine)
except requests.HTTPError as e:
print(f" ERROR: search failed: {e}")
counts["error"] += 1
continue
if doc_id is None:
print(f" WARN: no detection found for publicId={public_id} engine={args.engine}; skipping")
counts["skipped_not_found"] += len(new_overrides)
continue
existing_keys = {dedupe_key(o) for o in existing}
merged = list(existing)
added_this_file = 0
for override, line_no in new_overrides:
err = validate_override(override, args.engine)
if err:
print(f" INVALID (line {line_no}): {err}")
counts["invalid"] += 1
continue
custom_vars.update(collect_custom_vars(override))
key = dedupe_key(override)
if key in existing_keys:
print(f" SKIP (line {line_no}): duplicate of existing override [{describe(override)}]")
counts["skipped_dedupe"] += 1
continue
if note_prefix:
override = dict(override)
override["note"] = note_prefix + (override.get("note") or "")
merged.append(override)
existing_keys.add(key)
added_this_file += 1
print(f" ADD (line {line_no}): {describe(override)}")
if added_this_file == 0:
continue
if args.dry_run:
print(f" DRY-RUN: would update {doc_index}/{doc_id} "
f"({len(existing)} existing → {len(merged)} total)")
counts["added"] += added_this_file
continue
try:
update_overrides(session, doc_index, doc_id, merged)
print(f" UPDATED {doc_index}/{doc_id} ({len(existing)} → {len(merged)})")
counts["added"] += added_this_file
except requests.HTTPError as e:
print(f" ERROR: update failed: {e}")
counts["error"] += 1
print()
print("=" * 60)
print(f"Summary ({mode}):")
print(f" Overrides added: {counts['added']}")
print(f" Skipped (already present): {counts['skipped_dedupe']}")
print(f" Skipped (no detection): {counts['skipped_not_found']}")
print(f" Invalid (failed checks): {counts['invalid']}")
print(f" Errors: {counts['error']}")
if custom_vars:
print()
print("WARNING: detected custom Suricata variables in imported overrides:")
for v in sorted(custom_vars):
print(f" {v}")
print("If any of these are not already defined in SOC Config (Suricata variables),")
print("you must add them manually before the rules will function correctly.")
sys.exit(0 if counts["error"] == 0 and counts["invalid"] == 0 else 1)
if __name__ == "__main__":
main()
@@ -0,0 +1,588 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
import importlib.util
import json
import os
import shutil
import sys
import tempfile
import unittest
from importlib.machinery import SourceFileLoader
from io import StringIO
from unittest.mock import MagicMock, patch
import requests
# The script has no .py extension; spec_from_file_location can't auto-detect a
# loader, so we hand it a SourceFileLoader explicitly. (load_module() is
# deprecated in 3.14 and slated for removal in 3.15.)
HERE = os.path.dirname(os.path.abspath(__file__))
SCRIPT = os.path.join(HERE, "so-detections-overrides-import")
_loader = SourceFileLoader("so_overrides_import", SCRIPT)
_spec = importlib.util.spec_from_loader("so_overrides_import", _loader)
soi = importlib.util.module_from_spec(_spec)
_loader.exec_module(soi)
class TestValidateSuppress(unittest.TestCase):
def test_valid(self):
self.assertIsNone(soi.validate_override(
{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}, "suricata"))
def test_valid_var(self):
self.assertIsNone(soi.validate_override(
{"type": "suppress", "track": "by_either", "ip": "$HOME_NET"}, "suricata"))
def test_valid_cidr(self):
self.assertIsNone(soi.validate_override(
{"type": "suppress", "track": "by_dst", "ip": "10.0.0.0/8"}, "suricata"))
def test_valid_bracket_list(self):
self.assertIsNone(soi.validate_override(
{"type": "suppress", "track": "by_src", "ip": "[1.2.3.4,10.0.0.0/8]"}, "suricata"))
def test_missing_ip(self):
err = soi.validate_override({"type": "suppress", "track": "by_src"}, "suricata")
self.assertIn("requires", err)
def test_missing_track(self):
err = soi.validate_override({"type": "suppress", "ip": "1.2.3.4"}, "suricata")
self.assertIn("requires", err)
def test_invalid_track(self):
err = soi.validate_override(
{"type": "suppress", "track": "by_both", "ip": "1.2.3.4"}, "suricata")
self.assertIn("invalid track", err)
def test_invalid_ip(self):
err = soi.validate_override(
{"type": "suppress", "track": "by_src", "ip": "not-an-ip"}, "suricata")
self.assertIn("invalid IP", err)
def test_unnecessary_field(self):
err = soi.validate_override(
{"type": "suppress", "track": "by_src", "ip": "1.2.3.4", "count": 5}, "suricata")
self.assertIn("unnecessary fields", err)
class TestValidateThreshold(unittest.TestCase):
def test_valid(self):
self.assertIsNone(soi.validate_override({
"type": "threshold", "track": "by_src",
"thresholdType": "limit", "count": 10, "seconds": 60,
}, "suricata"))
def test_valid_by_both(self):
self.assertIsNone(soi.validate_override({
"type": "threshold", "track": "by_both",
"thresholdType": "both", "count": 1, "seconds": 1,
}, "suricata"))
def test_track_by_either_invalid(self):
err = soi.validate_override({
"type": "threshold", "track": "by_either",
"thresholdType": "limit", "count": 10, "seconds": 60,
}, "suricata")
self.assertIn("invalid track", err)
def test_invalid_threshold_type(self):
err = soi.validate_override({
"type": "threshold", "track": "by_src",
"thresholdType": "bogus", "count": 10, "seconds": 60,
}, "suricata")
self.assertIn("invalid thresholdType", err)
def test_zero_count(self):
err = soi.validate_override({
"type": "threshold", "track": "by_src",
"thresholdType": "limit", "count": 0, "seconds": 60,
}, "suricata")
self.assertIn("count", err)
def test_negative_seconds(self):
err = soi.validate_override({
"type": "threshold", "track": "by_src",
"thresholdType": "limit", "count": 10, "seconds": -1,
}, "suricata")
self.assertIn("seconds", err)
def test_missing_field(self):
err = soi.validate_override({
"type": "threshold", "track": "by_src",
"thresholdType": "limit", "count": 10, # missing seconds
}, "suricata")
self.assertIn("requires", err)
def test_unnecessary_field(self):
err = soi.validate_override({
"type": "threshold", "track": "by_src",
"thresholdType": "limit", "count": 10, "seconds": 60,
"regex": "foo",
}, "suricata")
self.assertIn("unnecessary fields", err)
class TestValidateModify(unittest.TestCase):
def test_valid(self):
self.assertIsNone(soi.validate_override(
{"type": "modify", "regex": r"content:\"foo\"", "value": "content:bar"}, "suricata"))
def test_invalid_regex(self):
err = soi.validate_override(
{"type": "modify", "regex": "(unbalanced", "value": "x"}, "suricata")
self.assertIn("invalid regex", err)
def test_missing_value(self):
err = soi.validate_override({"type": "modify", "regex": "x"}, "suricata")
self.assertIn("requires", err)
def test_unnecessary_field(self):
err = soi.validate_override(
{"type": "modify", "regex": "x", "value": "y", "track": "by_src"}, "suricata")
self.assertIn("unnecessary fields", err)
class TestValidateMisc(unittest.TestCase):
def test_unknown_type(self):
err = soi.validate_override({"type": "suppresss", "track": "by_src", "ip": "1.2.3.4"}, "suricata")
self.assertIn("invalid type", err)
def test_missing_type(self):
err = soi.validate_override({"track": "by_src"}, "suricata")
self.assertIn("type is required", err)
class TestValidateIP(unittest.TestCase):
def test_plain_ipv4(self):
self.assertIsNone(soi._validate_suricata_ip("1.2.3.4"))
def test_plain_ipv6(self):
self.assertIsNone(soi._validate_suricata_ip("::1"))
def test_cidr(self):
self.assertIsNone(soi._validate_suricata_ip("10.0.0.0/8"))
def test_var(self):
self.assertIsNone(soi._validate_suricata_ip("$CONCOURSEWORKERS"))
def test_bracket_list(self):
self.assertIsNone(soi._validate_suricata_ip("[1.2.3.4, 10.0.0.0/8]"))
def test_bracket_list_bad_member(self):
err = soi._validate_suricata_ip("[1.2.3.4,nope]")
self.assertIn("invalid IP in list", err)
def test_empty(self):
self.assertIn("empty", soi._validate_suricata_ip(""))
def test_invalid(self):
self.assertIn("invalid", soi._validate_suricata_ip("999.999.999.999"))
class TestDedupeKey(unittest.TestCase):
def test_suppress(self):
a = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4", "count": 99}
b = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}
# count is irrelevant for suppress dedupe
self.assertEqual(soi.dedupe_key(a), soi.dedupe_key(b))
def test_suppress_differs_on_ip(self):
a = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}
b = {"type": "suppress", "track": "by_src", "ip": "5.6.7.8"}
self.assertNotEqual(soi.dedupe_key(a), soi.dedupe_key(b))
def test_threshold(self):
a = {"type": "threshold", "track": "by_src", "thresholdType": "limit",
"count": 10, "seconds": 60, "ip": "ignored"}
b = {"type": "threshold", "track": "by_src", "thresholdType": "limit",
"count": 10, "seconds": 60}
self.assertEqual(soi.dedupe_key(a), soi.dedupe_key(b))
def test_threshold_differs_on_count(self):
a = {"type": "threshold", "track": "by_src", "thresholdType": "limit",
"count": 10, "seconds": 60}
b = {"type": "threshold", "track": "by_src", "thresholdType": "limit",
"count": 20, "seconds": 60}
self.assertNotEqual(soi.dedupe_key(a), soi.dedupe_key(b))
def test_modify(self):
a = {"type": "modify", "regex": "x", "value": "y"}
b = {"type": "modify", "regex": "x", "value": "y"}
self.assertEqual(soi.dedupe_key(a), soi.dedupe_key(b))
class TestDescribe(unittest.TestCase):
def test_suppress(self):
s = soi.describe({"type": "suppress", "track": "by_src", "ip": "1.2.3.4"})
self.assertIn("suppress", s)
self.assertIn("by_src", s)
self.assertIn("1.2.3.4", s)
def test_threshold_includes_count(self):
s = soi.describe({"type": "threshold", "track": "by_src",
"thresholdType": "limit", "count": 10, "seconds": 60})
self.assertIn("count=10", s)
self.assertIn("seconds=60", s)
def test_modify(self):
s = soi.describe({"type": "modify", "regex": "foo"})
self.assertIn("modify", s)
self.assertIn("foo", s)
class TestParseOverridesFile(unittest.TestCase):
def _write(self, content):
fd, path = tempfile.mkstemp(suffix=".txt")
os.close(fd)
with open(path, "w") as f:
f.write(content)
self.addCleanup(os.unlink, path)
return path
def test_single_line(self):
path = self._write('{"type":"suppress","track":"by_src","ip":"1.2.3.4"}')
result = soi.parse_overrides_file(path)
self.assertEqual(len(result), 1)
self.assertEqual(result[0][0]["type"], "suppress")
self.assertEqual(result[0][1], 1)
def test_ndjson(self):
path = self._write(
'{"type":"suppress","track":"by_src","ip":"1.2.3.4"}\n'
'{"type":"suppress","track":"by_dst","ip":"5.6.7.8"}\n'
)
result = soi.parse_overrides_file(path)
self.assertEqual(len(result), 2)
self.assertEqual(result[1][1], 2)
def test_empty(self):
path = self._write("")
self.assertEqual(soi.parse_overrides_file(path), [])
def test_blank_lines_skipped(self):
path = self._write('\n{"type":"suppress","track":"by_src","ip":"1.2.3.4"}\n\n')
result = soi.parse_overrides_file(path)
self.assertEqual(len(result), 1)
self.assertEqual(result[0][1], 2) # line number reflects original position
def test_invalid_raises(self):
path = self._write("not json")
with self.assertRaises(json.JSONDecodeError):
soi.parse_overrides_file(path)
class TestCollectCustomVars(unittest.TestCase):
def test_finds_custom(self):
v = soi.collect_custom_vars({"ip": "$CONCOURSEWORKERS"})
self.assertEqual(v, {"$CONCOURSEWORKERS"})
def test_filters_builtins(self):
v = soi.collect_custom_vars({"ip": "$HOME_NET"})
self.assertEqual(v, set())
def test_mixed(self):
v = soi.collect_custom_vars({"ip": "[$HOME_NET,$MYNET]"})
self.assertEqual(v, {"$MYNET"})
def test_non_string_fields_ignored(self):
v = soi.collect_custom_vars({"count": 10, "isEnabled": True})
self.assertEqual(v, set())
class TestMakeSession(unittest.TestCase):
def _write(self, content):
fd, path = tempfile.mkstemp()
os.close(fd)
with open(path, "w") as f:
f.write(content)
self.addCleanup(os.unlink, path)
return path
def test_valid_auth_file(self):
path = self._write('user = "admin:secret"\n')
session = soi.make_session(path)
self.assertEqual(session.auth.username, "admin")
self.assertEqual(session.auth.password, "secret")
self.assertFalse(session.verify)
def test_missing_user_line(self):
path = self._write("# no user line here\n")
with self.assertRaises(RuntimeError):
soi.make_session(path)
class TestFindDetection(unittest.TestCase):
def _session_with_response(self, payload):
session = MagicMock()
response = MagicMock()
response.json.return_value = payload
response.raise_for_status.return_value = None
session.get.return_value = response
return session
def test_found(self):
session = self._session_with_response({"hits": {"hits": [{
"_id": "abc", "_index": "so-detection",
"_source": {"so_detection": {"overrides": [{"type": "suppress"}]}},
}]}})
doc_id, idx, existing = soi.find_detection(session, "so-detection", "2049201", "suricata")
self.assertEqual(doc_id, "abc")
self.assertEqual(idx, "so-detection")
self.assertEqual(len(existing), 1)
def test_not_found(self):
session = self._session_with_response({"hits": {"hits": []}})
doc_id, idx, existing = soi.find_detection(session, "so-detection", "x", "suricata")
self.assertIsNone(doc_id)
self.assertIsNone(idx)
self.assertIsNone(existing)
def test_no_overrides_field(self):
session = self._session_with_response({"hits": {"hits": [{
"_id": "abc", "_index": "so-detection",
"_source": {"so_detection": {}},
}]}})
_, _, existing = soi.find_detection(session, "so-detection", "x", "suricata")
self.assertEqual(existing, [])
def test_multiple_hits_warns(self):
session = self._session_with_response({"hits": {"hits": [
{"_id": "a", "_index": "i", "_source": {"so_detection": {"overrides": []}}},
{"_id": "b", "_index": "i", "_source": {"so_detection": {"overrides": []}}},
]}})
with patch("sys.stdout", new=StringIO()) as out:
doc_id, _, _ = soi.find_detection(session, "i", "x", "suricata")
self.assertEqual(doc_id, "a")
self.assertIn("WARN", out.getvalue())
class TestUpdateOverrides(unittest.TestCase):
def test_posts_to_update_endpoint(self):
session = MagicMock()
response = MagicMock()
response.raise_for_status.return_value = None
response.json.return_value = {"result": "updated"}
session.post.return_value = response
result = soi.update_overrides(session, "so-detection", "abc", [{"type": "suppress"}])
self.assertEqual(result, {"result": "updated"})
url = session.post.call_args[0][0]
self.assertIn("/_update/abc", url)
body = session.post.call_args[1]["json"]
self.assertEqual(body["doc"]["so_detection"]["overrides"], [{"type": "suppress"}])
class TestConfirmProceed(unittest.TestCase):
def test_dry_run_skips_prompt(self):
args = MagicMock(dry_run=True)
with patch("sys.stdout", new=StringIO()):
self.assertTrue(soi.confirm_proceed(args))
def test_yes_input(self):
args = MagicMock(dry_run=False)
with patch("sys.stdout", new=StringIO()):
with patch("builtins.input", return_value="yes"):
self.assertTrue(soi.confirm_proceed(args))
def test_yes_input_case_insensitive(self):
args = MagicMock(dry_run=False)
with patch("sys.stdout", new=StringIO()):
with patch("builtins.input", return_value="YES"):
self.assertTrue(soi.confirm_proceed(args))
def test_no_input_aborts(self):
args = MagicMock(dry_run=False)
with patch("sys.stdout", new=StringIO()):
with patch("builtins.input", return_value="no"):
self.assertFalse(soi.confirm_proceed(args))
def test_empty_input_aborts(self):
args = MagicMock(dry_run=False)
with patch("sys.stdout", new=StringIO()):
with patch("builtins.input", return_value=""):
self.assertFalse(soi.confirm_proceed(args))
class TestParseArgs(unittest.TestCase):
def test_defaults(self):
with patch.object(sys, "argv", ["cmd", "--source", "/some/path"]):
args = soi.parse_args()
self.assertEqual(args.source, "/some/path")
self.assertEqual(args.engine, "suricata")
self.assertFalse(args.dry_run)
self.assertFalse(args.no_import_note)
self.assertEqual(args.index, soi.DEFAULT_INDEX)
def test_all_options(self):
argv = ["cmd", "-s", "/x", "-e", "suricata", "-n",
"--no-import-note", "-i", "alt-index"]
with patch.object(sys, "argv", argv):
args = soi.parse_args()
self.assertEqual(args.source, "/x")
self.assertTrue(args.dry_run)
self.assertTrue(args.no_import_note)
self.assertEqual(args.index, "alt-index")
class TestMain(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmpdir, ignore_errors=True)
# Stub make_session so tests don't need /opt/so/conf/elasticsearch/curl.config.
p = patch.object(soi, "make_session", return_value=MagicMock())
p.start()
self.addCleanup(p.stop)
def _write_file(self, public_id, overrides, ext="txt"):
"""Write an NDJSON override file. Entries may be dicts or raw strings (for malformed input)."""
path = os.path.join(self.tmpdir, f"{public_id}.{ext}")
with open(path, "w") as f:
for o in overrides:
f.write(o if isinstance(o, str) else json.dumps(o))
f.write("\n")
return path
def _run_main(self, *extra_argv, input_response="yes"):
"""Run main() with stdout/stderr captured and input mocked. Returns (stdout, stderr, exit_code)."""
argv = ["cmd", "--source", self.tmpdir, *extra_argv]
out, err = StringIO(), StringIO()
with patch.object(sys, "argv", argv), \
patch("sys.stdout", new=out), \
patch("sys.stderr", new=err), \
patch("builtins.input", return_value=input_response):
with self.assertRaises(SystemExit) as cm:
soi.main()
return out.getvalue(), err.getvalue(), cm.exception.code
def test_source_dir_missing(self):
argv = ["cmd", "--source", "/no/such/path/here"]
err = StringIO()
with patch.object(sys, "argv", argv), patch("sys.stderr", new=err):
with self.assertRaises(SystemExit) as cm:
soi.main()
self.assertEqual(cm.exception.code, 1)
self.assertIn("source directory not found", err.getvalue())
def test_no_files_found(self):
out, _, code = self._run_main()
self.assertEqual(code, 0)
self.assertIn("No *.txt files found", out)
def test_user_aborts(self):
self._write_file("1001", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main(input_response="no")
self.assertEqual(code, 1)
self.assertIn("Aborted", out)
def test_parse_error_increments_error(self):
# Malformed JSON line — parse_overrides_file raises JSONDecodeError.
self._write_file("1002", ["not json"])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 1) # invalid+error → non-zero
self.assertIn("could not parse", out)
self.assertIn("Errors: 1", out)
def test_empty_file_skipped(self):
# Blank lines only — parse_overrides_file returns []; main reports "empty file" and continues.
path = os.path.join(self.tmpdir, "1003.txt")
with open(path, "w") as f:
f.write("\n\n")
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
self.assertIn("empty file", out)
@patch.object(soi, "find_detection")
def test_search_http_error(self, mock_find):
mock_find.side_effect = requests.HTTPError("boom")
self._write_file("1004", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 1)
self.assertIn("search failed", out)
@patch.object(soi, "find_detection")
def test_no_detection_found(self, mock_find):
mock_find.return_value = (None, None, None)
self._write_file("1005", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
self.assertIn("no detection found", out)
self.assertIn("Skipped (no detection): 1", out)
@patch.object(soi, "find_detection")
def test_all_duplicates_no_update(self, mock_find):
existing = [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}]
mock_find.return_value = ("doc1", "so-detection", existing)
self._write_file("1006", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
self.assertIn("SKIP", out)
self.assertNotIn("DRY-RUN: would update", out) # added_this_file == 0 branch
@patch.object(soi, "update_overrides")
@patch.object(soi, "find_detection")
def test_happy_path_full(self, mock_find, mock_update):
# Exercises: ADD, dedupe SKIP, INVALID, note prefix, UPDATE, custom-vars warning, exit=1 (invalid present)
existing = [{"type": "suppress", "track": "by_src", "ip": "9.9.9.9"}]
mock_find.return_value = ("doc1", "so-detection", existing)
mock_update.return_value = {"result": "updated"}
self._write_file("1007", [
{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}, # ADD
{"type": "suppress", "track": "by_src", "ip": "9.9.9.9"}, # SKIP (dupe of existing)
{"type": "suppress", "track": "bogus", "ip": "1.2.3.4"}, # INVALID
{"type": "suppress", "track": "by_src", "ip": "$CONCOURSEWORKERS"}, # ADD + custom var
])
out, _, code = self._run_main()
self.assertEqual(code, 1) # one invalid -> non-zero
mock_update.assert_called_once()
merged = mock_update.call_args[0][3]
self.assertEqual(len(merged), 3) # 1 existing + 2 new
new_notes = [o.get("note", "") for o in merged if o.get("ip") in ("1.2.3.4", "$CONCOURSEWORKERS")]
self.assertTrue(all(n.startswith("[Imported ") for n in new_notes))
self.assertIn("ADD", out)
self.assertIn("SKIP", out)
self.assertIn("INVALID", out)
self.assertIn("UPDATED", out)
self.assertIn("$CONCOURSEWORKERS", out)
@patch.object(soi, "update_overrides")
@patch.object(soi, "find_detection")
def test_no_import_note_preserves_note(self, mock_find, mock_update):
mock_find.return_value = ("doc1", "so-detection", [])
mock_update.return_value = {"result": "updated"}
self._write_file("1008", [
{"type": "suppress", "track": "by_src", "ip": "1.2.3.4", "note": "original"},
])
_, _, code = self._run_main("--no-import-note")
self.assertEqual(code, 0)
merged = mock_update.call_args[0][3]
self.assertEqual(merged[0]["note"], "original") # no prefix applied
@patch.object(soi, "find_detection")
def test_dry_run_skips_update(self, mock_find):
mock_find.return_value = ("doc1", "so-detection", [])
self._write_file("1009", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
with patch.object(soi, "update_overrides") as mock_update:
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
mock_update.assert_not_called()
self.assertIn("DRY-RUN: would update", out)
@patch.object(soi, "update_overrides")
@patch.object(soi, "find_detection")
def test_update_http_error(self, mock_find, mock_update):
mock_find.return_value = ("doc1", "so-detection", [])
mock_update.side_effect = requests.HTTPError("nope")
self._write_file("1010", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main()
self.assertEqual(code, 1)
self.assertIn("update failed", out)
if __name__ == "__main__":
unittest.main()
-329
View File
@@ -1,329 +0,0 @@
#!/usr/bin/env python3
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
"""
so-pillar-import — populate the so_pillar.* schema in so-postgres from the
on-disk Salt pillar tree.
Reads /opt/so/saltstack/local/pillar/, decomposes each .sls file into a
(scope, role|minion_id, pillar_path, data) tuple, and UPSERTs it into
so_pillar.pillar_entry. Idempotent — re-running with no SLS edits produces
no version bumps because the audit trigger only writes a row when data
actually changes.
Bootstrap and mine-driven files are skipped (see EXCLUDE_BASENAMES /
EXCLUDE_PREFIXES below). Files containing Jinja templates ({% or {{) are
also skipped — those stay disk-authoritative and ext_pillar_first: False
means they render before the PG overlay anyway.
All SQL goes through `docker exec so-postgres psql` so no separate DSN
config is required at first-install time. Designed to be called by
salt/postgres/schema_pillar.sls (initial seed) and by salt/manager/tools/
sbin/so-minion (per-minion sync on add/delete).
"""
import argparse
import json
import os
import shlex
import subprocess
import sys
from pathlib import Path
import yaml
PILLAR_LOCAL_ROOT = Path("/opt/so/saltstack/local/pillar")
PILLAR_DEFAULT_ROOT = Path("/opt/so/saltstack/default/pillar")
DOCKER_CONTAINER = "so-postgres"
PG_SUPERUSER = "postgres"
PG_DATABASE = "securityonion"
# Files that must NEVER move to Postgres. These are read by Salt before
# Postgres is reachable, or contain renderer-time computed values (mine, etc.).
EXCLUDE_BASENAMES = {
"secrets.sls",
"auth.sls", # postgres/auth.sls bootstrap
"top.sls",
}
# Filename prefixes to skip — these are renderer-time computed pillars
# (Salt mine, file_exists guards, etc.) that have to stay on disk.
EXCLUDE_PATH_FRAGMENTS = (
"/elasticsearch/nodes.sls",
"/redis/nodes.sls",
"/kafka/nodes.sls",
"/hypervisor/nodes.sls",
"/logstash/nodes.sls",
"/node_data/ips.sls",
"/postgres/auth.sls",
"/elasticsearch/auth.sls",
"/kibana/secrets.sls",
)
def log(level, msg):
print(f"[{level}] {msg}", file=sys.stderr)
def is_jinja_templated(content_bytes):
return b"{%" in content_bytes or b"{{" in content_bytes
def classify(path):
"""Return (scope, role_name, minion_id, pillar_path) for a pillar file
or None to skip it. role_name is None for now — the importer leaves role
membership to the so_pillar.minion trigger and the salt/auth reactor."""
rel_str = str(path)
if path.name in EXCLUDE_BASENAMES:
return None
for frag in EXCLUDE_PATH_FRAGMENTS:
if frag in rel_str:
return None
# /local/pillar/minions/<id>.sls or adv_<id>.sls
if path.parent.name == "minions":
stem = path.stem # filename without .sls
if stem.startswith("adv_"):
mid = stem[4:]
return ("minion", None, mid, f"minions.adv_{mid}")
return ("minion", None, stem, f"minions.{stem}")
# /local/pillar/<section>/<file>.sls
if path.parent.parent == PILLAR_LOCAL_ROOT or path.parent.parent == PILLAR_DEFAULT_ROOT:
section = path.parent.name
stem = path.stem
# Only soc_<section>.sls and adv_<section>.sls are SOC-managed pillar
# surfaces. Other files (e.g. nodes.sls, auth.sls, *.token) are
# either covered by EXCLUDE_PATH_FRAGMENTS or are bootstrap surfaces
# we leave alone for now.
if stem.startswith("soc_") or stem.startswith("adv_"):
return ("global", None, None, f"{section}.{stem}")
return None
return None
def parse_yaml_file(path):
with open(path, "rb") as f:
content = f.read()
if not content.strip():
return {}
if is_jinja_templated(content):
return None
data = yaml.safe_load(content)
if data is None:
return {}
if not isinstance(data, dict):
return {"_raw": data}
return data
def derive_node_type(minion_id):
"""Conventional Security Onion minion ids are <host>_<role>. Take the
last underscore-delimited token as the canonical role suffix."""
parts = minion_id.rsplit("_", 1)
if len(parts) == 2:
return parts[1]
return None
def docker_psql(sql, *, db=PG_DATABASE, user=PG_SUPERUSER, on_error_stop=True, capture=True):
"""Run sql via docker exec ... psql. Returns stdout as str."""
args = [
"docker", "exec", "-i", DOCKER_CONTAINER,
"psql", "-U", user, "-d", db, "-tA", "-q",
]
if on_error_stop:
args += ["-v", "ON_ERROR_STOP=1"]
proc = subprocess.run(
args, input=sql.encode(),
capture_output=capture, check=False,
)
if proc.returncode != 0:
sys.stderr.write(proc.stderr.decode(errors="replace"))
raise RuntimeError(f"docker exec psql failed (rc={proc.returncode})")
return proc.stdout.decode(errors="replace")
def upsert_minion(minion_id, node_type):
sql = (
"INSERT INTO so_pillar.minion (minion_id, node_type) "
f"VALUES ({pg_str(minion_id)}, {pg_str(node_type) if node_type else 'NULL'}) "
"ON CONFLICT (minion_id) DO UPDATE SET node_type = EXCLUDED.node_type;"
)
docker_psql(sql)
def delete_minion(minion_id):
"""CASCADE removes pillar_entry + role_member rows."""
sql = f"DELETE FROM so_pillar.minion WHERE minion_id = {pg_str(minion_id)};"
docker_psql(sql)
def upsert_pillar_entry(scope, role_name, minion_id, pillar_path, data, reason):
"""Insert or update the row keyed by the partial unique index that
matches scope. Audit trigger handles history; versioning trigger bumps
version only when data changes."""
data_json = json.dumps(data)
role_sql = pg_str(role_name) if role_name else "NULL"
minion_sql = pg_str(minion_id) if minion_id else "NULL"
reason_sql = pg_str(reason)
if scope == "global":
conflict = "(pillar_path) WHERE scope='global'"
elif scope == "role":
conflict = "(role_name, pillar_path) WHERE scope='role'"
elif scope == "minion":
conflict = "(minion_id, pillar_path) WHERE scope='minion'"
else:
raise ValueError(f"unknown scope {scope!r}")
sql = (
"BEGIN;\n"
f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);\n"
f"INSERT INTO so_pillar.pillar_entry "
f"(scope, role_name, minion_id, pillar_path, data, change_reason) "
f"VALUES ({pg_str(scope)}, {role_sql}, {minion_sql}, {pg_str(pillar_path)}, {pg_jsonb(data_json)}, {reason_sql}) "
f"ON CONFLICT {conflict} DO UPDATE "
f"SET data = EXCLUDED.data, change_reason = EXCLUDED.change_reason;\n"
"COMMIT;\n"
)
docker_psql(sql)
def pg_str(s):
"""Escape a Python str for inclusion in literal SQL. Pillar content has
already been validated as YAML; we just need standard SQL escaping."""
if s is None:
return "NULL"
return "'" + str(s).replace("'", "''") + "'"
def pg_jsonb(json_str):
return pg_str(json_str) + "::jsonb"
def walk_pillar_root(root, paths):
if not root.is_dir():
return
for path in root.rglob("*.sls"):
if path.is_file():
paths.append(path)
def import_minion(minion_id, node_type, dry_run, reason):
"""Re-import every pillar file for a single minion."""
if not minion_id:
raise ValueError("minion_id required for --scope minion")
upsert_minion(minion_id, node_type)
log("INFO", f"Upserted minion row {minion_id} (node_type={node_type})")
targets = [
PILLAR_LOCAL_ROOT / "minions" / f"{minion_id}.sls",
PILLAR_LOCAL_ROOT / "minions" / f"adv_{minion_id}.sls",
]
for path in targets:
if not path.exists():
log("INFO", f" (no file at {path})")
continue
klass = classify(path)
if not klass:
log("INFO", f" skip {path} (excluded)")
continue
scope, role, mid, pillar_path = klass
data = parse_yaml_file(path)
if data is None:
log("WARN", f" skip {path} (Jinja-templated; stays disk-only)")
continue
if dry_run:
log("DRY", f" would upsert {scope}/{pillar_path} = {len(json.dumps(data))} bytes")
continue
upsert_pillar_entry(scope, role, mid, pillar_path, data, reason)
log("INFO", f" imported {scope}/{pillar_path}")
def import_all(dry_run, reason):
"""Walk the entire local pillar tree and import every eligible file."""
paths = []
walk_pillar_root(PILLAR_LOCAL_ROOT, paths)
imported = 0
skipped = 0
minions_seen = set()
for path in sorted(paths):
klass = classify(path)
if not klass:
skipped += 1
continue
scope, role, minion_id, pillar_path = klass
data = parse_yaml_file(path)
if data is None:
log("WARN", f"skip {path} (Jinja-templated; stays disk-only)")
skipped += 1
continue
if scope == "minion" and minion_id not in minions_seen:
node_type = derive_node_type(minion_id)
if not dry_run:
upsert_minion(minion_id, node_type)
minions_seen.add(minion_id)
if dry_run:
log("DRY", f"would upsert {scope}/{pillar_path} ({len(json.dumps(data))} bytes)")
else:
upsert_pillar_entry(scope, role, minion_id, pillar_path, data, reason)
log("INFO", f"imported {scope}/{pillar_path}")
imported += 1
log("INFO", f"done: {imported} imported, {skipped} skipped")
def main():
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--scope", choices=("global", "role", "minion", "all"), default="all")
ap.add_argument("--minion-id")
ap.add_argument("--node-type", help="override node_type for --scope minion (default: derived from minion_id)")
ap.add_argument("--delete", action="store_true",
help="With --scope minion, remove the minion row (and its pillar rows via CASCADE)")
ap.add_argument("--dry-run", action="store_true")
ap.add_argument("--diff", action="store_true",
help="(reserved) print structural diffs vs current DB content")
ap.add_argument("--yes", action="store_true",
help="Skip confirmation prompts (currently unused; reserved)")
ap.add_argument("--reason", default="so-pillar-import",
help="change_reason recorded in pillar_entry_history")
args = ap.parse_args()
try:
if args.scope == "minion":
if not args.minion_id:
ap.error("--minion-id required when --scope minion")
if args.delete:
if args.dry_run:
log("DRY", f"would delete {args.minion_id}")
else:
delete_minion(args.minion_id)
log("INFO", f"deleted {args.minion_id}")
else:
node_type = args.node_type or derive_node_type(args.minion_id)
import_minion(args.minion_id, node_type, args.dry_run, args.reason)
elif args.scope == "all":
import_all(args.dry_run, args.reason)
else:
log("ERROR", f"--scope {args.scope} not yet implemented; use --scope all or --scope minion")
return 2
except Exception as e:
log("ERROR", str(e))
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
+1 -185
View File
@@ -13,64 +13,6 @@ import json
lockFile = "/tmp/so-yaml.lock"
# postsalt: so-yaml supports three backend modes for PG-managed pillar paths:
#
# dual — write disk + mirror to so_pillar.*. Reads from disk.
# Used during the migration transition when disk is still
# canonical and PG runs as a shadow.
# postgres — write to so_pillar.* only. Reads from so_pillar.*. No disk
# file is touched. The end state once cutover is complete.
# disk — disk only, no PG. Emergency rollback escape hatch.
#
# Bootstrap and mine-driven files (secrets.sls, ca/init.sls, */nodes.sls,
# top.sls, etc.) are always handled on disk regardless of mode — those paths
# are explicitly excluded by so_yaml_postgres.locate() raising SkipPath.
#
# Mode resolution: SO_YAML_BACKEND env var, then /opt/so/conf/so-yaml/mode,
# then default 'dual' (safe upgrade behavior — flipping to 'postgres' is
# done by schema_pillar.sls after the schema is in place and the importer
# has run at least once).
MODE_FILE = "/opt/so/conf/so-yaml/mode"
VALID_MODES = ("dual", "postgres", "disk")
DEFAULT_MODE = "dual"
try:
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import so_yaml_postgres
_SO_YAML_PG_AVAILABLE = True
except Exception as _exc:
_SO_YAML_PG_AVAILABLE = False
def _resolveBackendMode():
env = os.environ.get("SO_YAML_BACKEND")
if env and env in VALID_MODES:
return env
try:
with open(MODE_FILE, "r") as fh:
value = fh.read().strip()
if value in VALID_MODES:
return value
except (IOError, OSError):
pass
return DEFAULT_MODE
_BACKEND_MODE = _resolveBackendMode()
def _isPgManaged(filename):
"""True when so-yaml should route this file's reads/writes through
so_pillar.*. False for bootstrap/mine-driven files that always live on
disk, and for arbitrary YAML paths outside the pillar tree."""
if not _SO_YAML_PG_AVAILABLE:
return False
try:
return so_yaml_postgres.is_pg_managed(filename)
except Exception:
return False
def showUsage(args):
print('Usage: {} <COMMAND> <YAML_FILE> [ARGS...]'.format(sys.argv[0]), file=sys.stderr)
@@ -83,14 +25,8 @@ def showUsage(args):
print(' get [-r] - Displays (to stdout) the value stored in the given key. Requires KEY arg. Use -r for raw output without YAML formatting.', file=sys.stderr)
print(' remove - Removes a yaml key, if it exists. Requires KEY arg.', file=sys.stderr)
print(' replace - Replaces (or adds) a new key and set its value. Requires KEY and VALUE args.', file=sys.stderr)
print(' purge - Delete the YAML file from disk and remove its rows from so_pillar.* (no KEY arg).', file=sys.stderr)
print(' help - Prints this usage information.', file=sys.stderr)
print('', file=sys.stderr)
print(' Backend mode:', file=sys.stderr)
print(' Resolved from $SO_YAML_BACKEND, then /opt/so/conf/so-yaml/mode, default "dual".', file=sys.stderr)
print(' Valid values: dual | postgres | disk. Bootstrap pillar files (secrets, ca, *.nodes.sls)', file=sys.stderr)
print(' are always handled on disk regardless of mode.', file=sys.stderr)
print('', file=sys.stderr)
print(' Where:', file=sys.stderr)
print(' YAML_FILE - Path to the file that will be modified. Ex: /opt/so/conf/service/conf.yaml', file=sys.stderr)
print(' KEY - YAML key, does not support \' or " characters at this time. Ex: level1.level2', file=sys.stderr)
@@ -103,24 +39,6 @@ def showUsage(args):
def loadYaml(filename):
"""Load a YAML file's content as a dict.
PG-canonical mode (`postgres`): for PG-managed paths, read from
so_pillar.pillar_entry. A missing row is treated as an empty dict so
that `replace`/`add` on a fresh path can populate it from scratch.
Other modes / non-PG-managed paths: read from disk as today.
"""
if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
try:
data = so_yaml_postgres.read_yaml(filename)
except so_yaml_postgres.SkipPath:
data = None
except Exception as e:
print(f"so-yaml: pg read failed for {filename}: {e}", file=sys.stderr)
sys.exit(1)
return data if data is not None else {}
try:
with open(filename, "r") as file:
content = file.read()
@@ -134,97 +52,8 @@ def loadYaml(filename):
def writeYaml(filename, content):
"""Persist `content` for `filename`.
PG-canonical mode + PG-managed path: write only to so_pillar.*. A PG
failure is fatal (no disk fallback) caller must retry.
Dual mode: write disk, then mirror to PG (failures are warnings).
Disk mode or non-PG-managed path: write disk only.
"""
if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
if not _SO_YAML_PG_AVAILABLE:
print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr)
sys.exit(1)
ok, msg = so_yaml_postgres.write_yaml(
filename, content,
reason="so-yaml " + " ".join(sys.argv[1:2]))
if not ok:
print(f"so-yaml: pg write failed for {filename}: {msg}", file=sys.stderr)
sys.exit(1)
return None
file = open(filename, "w")
result = yaml.safe_dump(content, file)
file.close()
if _BACKEND_MODE == "dual":
_mirrorToPostgres(filename, content)
return result
def _mirrorToPostgres(filename, content):
"""Best-effort dual-write of a YAML mutation into so_pillar.*. Skips
files outside the PG-managed pillar surface (secrets.sls,
elasticsearch/nodes.sls, etc.) and silently degrades when so-postgres
is unreachable. Disk write is canonical in dual mode; this never
raises.
Only real PG failures (`pg write failed: ...`) are logged so the
common cases (skipped path, postgres not running) don't pollute
stderr."""
if not _SO_YAML_PG_AVAILABLE:
return
try:
ok, msg = so_yaml_postgres.write_yaml(filename, content,
reason="so-yaml " + " ".join(sys.argv[1:2]))
if not ok and msg.startswith("pg write failed"):
print(f"so-yaml: {msg}", file=sys.stderr)
except Exception as e: # pragma: no cover — defensive: never break disk write
print(f"so-yaml: pg mirror exception: {e}", file=sys.stderr)
def purgeFile(filename):
"""Delete a YAML file from disk and remove the matching rows from
so_pillar.*. Idempotent missing file/row counts as success.
PG-canonical mode + PG-managed path: PG delete is canonical. If a stale
disk file from the dual-write era happens to still exist, it's removed
too as a cleanup courtesy. PG failure is fatal in this mode.
Dual / disk modes: remove disk first; PG cleanup is best-effort."""
if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
if not _SO_YAML_PG_AVAILABLE:
print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr)
return 1
ok, msg = so_yaml_postgres.purge_yaml(filename, reason="so-yaml purge")
if not ok:
print(f"so-yaml: pg purge failed for {filename}: {msg}", file=sys.stderr)
return 1
if os.path.exists(filename):
try:
os.remove(filename)
except Exception as e:
print(f"so-yaml: warn — could not remove stale disk file {filename}: {e}", file=sys.stderr)
return 0
if os.path.exists(filename):
try:
os.remove(filename)
except Exception as e:
print(f"Failed to remove {filename}: {e}", file=sys.stderr)
return 1
if _BACKEND_MODE == "dual" and _SO_YAML_PG_AVAILABLE:
try:
ok, msg = so_yaml_postgres.purge_yaml(filename,
reason="so-yaml purge")
if not ok and msg.startswith("pg purge failed"):
print(f"so-yaml: {msg}", file=sys.stderr)
except Exception as e:
print(f"so-yaml: pg purge exception: {e}", file=sys.stderr)
return 0
return yaml.safe_dump(content, file)
def appendItem(content, key, listItem):
@@ -542,18 +371,6 @@ def get(args):
return 0
def purge(args):
"""purge YAML_FILE — delete the file from disk and remove the matching
rows from so_pillar.* in so-postgres. Used by so-minion's delete path
(in place of `rm -f`) so the audit log captures the deletion and
role_member rows get cleaned up via FK CASCADE on so_pillar.minion."""
if len(args) != 1:
print('Missing filename arg', file=sys.stderr)
showUsage(None)
return 1
return purgeFile(args[0])
def main():
args = sys.argv[1:]
@@ -571,7 +388,6 @@ def main():
"get": get,
"remove": remove,
"replace": replace,
"purge": purge,
}
code = 1
-326
View File
@@ -991,329 +991,3 @@ class TestLoadYaml(unittest.TestCase):
soyaml.loadYaml("/tmp/so-yaml_test-unreadable.yaml")
sysmock.assert_called_with(1)
self.assertIn("Error reading file", mock_stderr.getvalue())
class TestPurge(unittest.TestCase):
def test_purge_missing_arg(self):
# showUsage calls sys.exit(1); patch it like the other tests do.
with patch('sys.exit', new=MagicMock()):
with patch('sys.stderr', new=StringIO()) as mock_stderr:
rc = soyaml.purge([])
self.assertEqual(rc, 1)
self.assertIn("Missing filename", mock_stderr.getvalue())
def test_purge_existing_file(self):
filename = "/tmp/so-yaml_test_purge.yaml"
with open(filename, "w") as f:
f.write("key: value\n")
# Disable PG mirror so the test doesn't shell out to docker.
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
rc = soyaml.purge([filename])
self.assertEqual(rc, 0)
import os as _os
self.assertFalse(_os.path.exists(filename))
def test_purge_missing_file_idempotent(self):
filename = "/tmp/so-yaml_test_purge_missing.yaml"
import os as _os
if _os.path.exists(filename):
_os.remove(filename)
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
rc = soyaml.purge([filename])
self.assertEqual(rc, 0)
class TestSoYamlPostgres(unittest.TestCase):
"""Tests the path-locator and write/purge contract of the dual-write
backend module without actually contacting Postgres."""
def setUp(self):
import importlib
self.mod = importlib.import_module("so_yaml_postgres")
def test_locate_global_soc(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(scope, "global")
self.assertIsNone(role)
self.assertIsNone(mid)
self.assertEqual(path, "soc.soc_soc")
def test_locate_global_advanced(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/soc/adv_soc.sls")
self.assertEqual(scope, "global")
self.assertEqual(path, "soc.adv_soc")
def test_locate_minion(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
self.assertEqual(scope, "minion")
self.assertEqual(mid, "h1_sensor")
self.assertEqual(path, "minions.h1_sensor")
def test_locate_minion_advanced(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/minions/adv_h1_sensor.sls")
self.assertEqual(scope, "minion")
self.assertEqual(mid, "h1_sensor")
self.assertEqual(path, "minions.adv_h1_sensor")
def test_locate_skip_secrets(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/secrets.sls")
def test_locate_skip_postgres_auth(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/postgres/auth.sls")
def test_locate_skip_mine_driven(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls")
def test_locate_skip_top(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/top.sls")
def test_locate_skip_unrelated(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/etc/hostname")
def test_pg_str_escapes(self):
self.assertEqual(self.mod._pg_str("a'b"), "'a''b'")
self.assertEqual(self.mod._pg_str(None), "NULL")
def test_conflict_target(self):
self.assertIn("scope='global'", self.mod._conflict_target("global"))
self.assertIn("scope='role'", self.mod._conflict_target("role"))
self.assertIn("scope='minion'", self.mod._conflict_target("minion"))
with self.assertRaises(ValueError):
self.mod._conflict_target("bogus")
def test_write_yaml_skips_disk_only_path(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
ok, msg = self.mod.write_yaml(
"/opt/so/saltstack/local/pillar/secrets.sls",
{"secrets": {"foo": "bar"}})
self.assertFalse(ok)
self.assertIn("disk-only", msg)
def test_write_yaml_unreachable(self):
with patch.object(self.mod, '_is_enabled', return_value=False):
ok, msg = self.mod.write_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls",
{"soc": {"foo": "bar"}})
self.assertFalse(ok)
self.assertEqual(msg, "postgres unreachable")
def test_is_pg_managed_true(self):
self.assertTrue(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
self.assertTrue(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls"))
def test_is_pg_managed_false_for_bootstrap(self):
self.assertFalse(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/secrets.sls"))
self.assertFalse(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/postgres/auth.sls"))
self.assertFalse(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls"))
def test_read_yaml_unreachable(self):
with patch.object(self.mod, '_is_enabled', return_value=False):
self.assertIsNone(self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls"))
def test_read_yaml_skips_disk_only(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
with self.assertRaises(self.mod.SkipPath):
self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/secrets.sls")
def test_read_yaml_returns_data(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
with patch.object(self.mod, '_docker_psql',
return_value='{"soc": {"foo": "bar"}}\n'):
data = self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(data, {"soc": {"foo": "bar"}})
def test_read_yaml_returns_none_when_no_row(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
with patch.object(self.mod, '_docker_psql', return_value=''):
data = self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertIsNone(data)
def test_read_yaml_minion_query_shape(self):
captured = {}
def fake_psql(sql):
captured['sql'] = sql
return '{"host": {"mainip": "10.0.0.1"}}'
with patch.object(self.mod, '_is_enabled', return_value=True):
with patch.object(self.mod, '_docker_psql', side_effect=fake_psql):
data = self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
self.assertEqual(data, {"host": {"mainip": "10.0.0.1"}})
self.assertIn("scope='minion'", captured['sql'])
self.assertIn("'h1_sensor'", captured['sql'])
self.assertIn("'minions.h1_sensor'", captured['sql'])
def test_is_enabled_public_alias(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
self.assertTrue(self.mod.is_enabled())
with patch.object(self.mod, '_is_enabled', return_value=False):
self.assertFalse(self.mod.is_enabled())
class TestSoYamlBackendMode(unittest.TestCase):
"""Tests so-yaml's backend-mode resolution and PG-canonical routing
for read/write/purge. The PG calls themselves are stubbed; what we're
asserting is that the right backend is chosen for each (mode, path)
combination."""
def test_resolve_mode_env_overrides_file(self):
with patch.dict('os.environ', {'SO_YAML_BACKEND': 'postgres'}):
self.assertEqual(soyaml._resolveBackendMode(), 'postgres')
with patch.dict('os.environ', {'SO_YAML_BACKEND': 'disk'}):
self.assertEqual(soyaml._resolveBackendMode(), 'disk')
def test_resolve_mode_invalid_env_falls_back(self):
with patch.dict('os.environ', {'SO_YAML_BACKEND': 'garbage'}, clear=False):
with patch('builtins.open', side_effect=IOError):
self.assertEqual(soyaml._resolveBackendMode(), 'dual')
def test_resolve_mode_default_dual(self):
env = {k: v for k, v in __import__('os').environ.items()
if k != 'SO_YAML_BACKEND'}
with patch.dict('os.environ', env, clear=True):
with patch('builtins.open', side_effect=IOError):
self.assertEqual(soyaml._resolveBackendMode(), 'dual')
def test_is_pg_managed_proxies(self):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
self.assertTrue(soyaml._isPgManaged(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
self.assertFalse(soyaml._isPgManaged(
"/opt/so/saltstack/local/pillar/secrets.sls"))
def test_is_pg_managed_false_when_module_unavailable(self):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
self.assertFalse(soyaml._isPgManaged(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
def test_load_yaml_postgres_mode_reads_pg(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'read_yaml',
return_value={"a": 1}):
result = soyaml.loadYaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(result, {"a": 1})
def test_load_yaml_postgres_mode_returns_empty_when_no_row(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'read_yaml',
return_value=None):
result = soyaml.loadYaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(result, {})
def test_load_yaml_postgres_mode_reads_disk_for_bootstrap(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write("foo: bar\n")
tmp = f.name
try:
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres,
'is_pg_managed', return_value=False):
result = soyaml.loadYaml(tmp)
self.assertEqual(result, {"foo": "bar"})
finally:
_os.unlink(tmp)
def test_write_yaml_postgres_mode_skips_disk(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
tmp = f.name
_os.unlink(tmp)
try:
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'write_yaml',
return_value=(True, 'ok')) as mock_w:
soyaml.writeYaml(tmp, {"x": 1})
self.assertFalse(_os.path.exists(tmp))
mock_w.assert_called_once()
finally:
if _os.path.exists(tmp):
_os.unlink(tmp)
def test_write_yaml_postgres_mode_failure_is_fatal(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'write_yaml',
return_value=(False, 'pg write failed: connection refused')):
with patch('sys.exit', new=MagicMock()) as sysmock:
with patch('sys.stderr', new=StringIO()) as mock_err:
soyaml.writeYaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls",
{"x": 1})
sysmock.assert_called_with(1)
def test_write_yaml_disk_mode_skips_pg(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
tmp = f.name
try:
with patch.object(soyaml, '_BACKEND_MODE', 'disk'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'write_yaml') as mock_w:
soyaml.writeYaml(tmp, {"x": 1})
mock_w.assert_not_called()
with open(tmp) as f:
self.assertIn('x: 1', f.read())
finally:
_os.unlink(tmp)
def test_purge_postgres_mode_calls_pg_only(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
tmp = f.name
_os.unlink(tmp)
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'purge_yaml',
return_value=(True, 'ok')) as mock_p:
rc = soyaml.purgeFile(tmp)
self.assertEqual(rc, 0)
mock_p.assert_called_once()
def test_purge_postgres_mode_failure_returns_nonzero(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'purge_yaml',
return_value=(False, 'pg purge failed: x')):
with patch('sys.stderr', new=StringIO()):
rc = soyaml.purgeFile(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
self.assertEqual(rc, 1)
-320
View File
@@ -1,320 +0,0 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
"""
so_yaml_postgres Postgres-backed dual-write helpers for so-yaml.py.
so-yaml.py writes YAML pillar files on disk; this module mirrors those
writes into so_pillar.* in so-postgres so ext_pillar and the SOC
PostgresConfigstore see the same data. During the postsalt transition
disk is canonical; PG writes are best-effort and never fail the disk
operation.
Connection: shells out to `docker exec so-postgres psql -U postgres -d
securityonion`. Same pattern so-pillar-import uses; avoids needing a
separate DSN config at install time. Performance is fine because so-yaml
is invoked from infrequent code paths (setup scripts, so-minion,
so-firewall); SOC's hot path uses the in-process pgxpool in
PostgresConfigstore, not so-yaml.
Path-to-row mapping mirrors PostgresConfigstore.locateSetting in
securityonion-soc:
/opt/so/saltstack/local/pillar/<section>/soc_<section>.sls
-> scope=global, pillar_path=<section>.soc_<section>
/opt/so/saltstack/local/pillar/<section>/adv_<section>.sls
-> scope=global, pillar_path=<section>.adv_<section>
/opt/so/saltstack/local/pillar/minions/<id>.sls
-> scope=minion, minion_id=<id>, pillar_path=minions.<id>
/opt/so/saltstack/local/pillar/minions/adv_<id>.sls
-> scope=minion, minion_id=<id>, pillar_path=minions.adv_<id>
Files outside that mapping (notably secrets.sls, postgres/auth.sls,
elasticsearch/nodes.sls, etc.) are skipped they stay disk-only forever
or render dynamically and don't belong in PG.
"""
import json
import os
import shlex
import subprocess
import sys
DOCKER_CONTAINER = os.environ.get("SO_PILLAR_PG_CONTAINER", "so-postgres")
PG_DATABASE = os.environ.get("SO_PILLAR_PG_DATABASE", "securityonion")
PG_USER = os.environ.get("SO_PILLAR_PG_USER", "postgres")
# File paths whose mutations stay disk-only forever. Mirrors EXCLUDE_*
# in so-pillar-import.
DISK_ONLY_PATHS = (
"/opt/so/saltstack/local/pillar/secrets.sls",
"/opt/so/saltstack/local/pillar/postgres/auth.sls",
"/opt/so/saltstack/local/pillar/elasticsearch/auth.sls",
"/opt/so/saltstack/local/pillar/kibana/secrets.sls",
)
DISK_ONLY_FRAGMENTS = (
"/elasticsearch/nodes.sls",
"/redis/nodes.sls",
"/kafka/nodes.sls",
"/hypervisor/nodes.sls",
"/logstash/nodes.sls",
"/node_data/ips.sls",
"/top.sls",
)
class SkipPath(Exception):
"""Raised when a file path is intentionally not mirrored to PG."""
def is_enabled():
"""Public alias for callers that want to probe PG reachability without
relying on a leading-underscore private name."""
return _is_enabled()
def _is_enabled():
"""PG dual-write only fires if so-postgres is reachable. Cheap probe.
Returns True when docker exec succeeds, False otherwise. We never
want a PG hiccup to fail a disk write on a manager whose Postgres is
momentarily unreachable."""
try:
proc = subprocess.run(
["docker", "exec", DOCKER_CONTAINER,
"pg_isready", "-h", "127.0.0.1", "-U", PG_USER, "-q"],
capture_output=True, timeout=5, check=False,
)
return proc.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
return False
def locate(path):
"""Translate a so-yaml file path to (scope, role_name, minion_id, pillar_path).
Raises SkipPath when the file is not part of the PG-managed surface."""
norm = os.path.normpath(path)
if norm in DISK_ONLY_PATHS:
raise SkipPath(f"{path}: explicit disk-only allowlist")
for frag in DISK_ONLY_FRAGMENTS:
if frag in norm:
raise SkipPath(f"{path}: matches disk-only fragment {frag}")
parent = os.path.basename(os.path.dirname(norm))
grandparent = os.path.basename(os.path.dirname(os.path.dirname(norm)))
name = os.path.basename(norm)
if not name.endswith(".sls"):
raise SkipPath(f"{path}: not a .sls file")
stem = name[:-4]
if parent == "minions":
if stem.startswith("adv_"):
mid = stem[4:]
return ("minion", None, mid, f"minions.adv_{mid}")
return ("minion", None, stem, f"minions.{stem}")
# /local/pillar/<section>/<file>.sls
if grandparent == "pillar" and parent and parent != "":
if stem.startswith("soc_") or stem.startswith("adv_"):
return ("global", None, None, f"{parent}.{stem}")
raise SkipPath(f"{path}: <section>/{stem}.sls is not a soc_/adv_ file")
raise SkipPath(f"{path}: unrecognised pillar layout")
def _pg_str(s):
if s is None:
return "NULL"
return "'" + str(s).replace("'", "''") + "'"
def _docker_psql(sql):
"""Run sql via docker exec ... psql. Returns stdout. Caller catches
exceptions and downgrades to a warning."""
proc = subprocess.run(
["docker", "exec", "-i", DOCKER_CONTAINER,
"psql", "-U", PG_USER, "-d", PG_DATABASE,
"-tA", "-q", "-v", "ON_ERROR_STOP=1"],
input=sql.encode(), capture_output=True, check=False, timeout=30,
)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.decode(errors="replace") or
f"docker exec psql exit {proc.returncode}")
return proc.stdout.decode(errors="replace")
def _conflict_target(scope):
if scope == "global":
return "(pillar_path) WHERE scope='global'"
if scope == "role":
return "(role_name, pillar_path) WHERE scope='role'"
if scope == "minion":
return "(minion_id, pillar_path) WHERE scope='minion'"
raise ValueError(f"unknown scope {scope!r}")
def is_pg_managed(path):
"""True if this path maps to a so_pillar.* row (locate() succeeds).
Bootstrap and mine-driven files return False they always live on
disk regardless of so-yaml's backend mode."""
try:
locate(path)
return True
except SkipPath:
return False
def read_yaml(path):
"""Return the content dict stored in so_pillar.pillar_entry for `path`,
or None when no row exists. Raises SkipPath when `path` is not part of
the PG-managed surface (caller should read disk in that case).
Used by so-yaml.py PG-canonical mode so `replace`, `get`, etc. resolve
against the database rather than a stale (or absent) disk file."""
if not _is_enabled():
return None
scope, role, minion_id, pillar_path = locate(path)
if scope == "minion":
sql = ("SELECT data FROM so_pillar.pillar_entry "
"WHERE scope='minion' "
f"AND minion_id={_pg_str(minion_id)} "
f"AND pillar_path={_pg_str(pillar_path)}")
elif scope == "role":
sql = ("SELECT data FROM so_pillar.pillar_entry "
"WHERE scope='role' "
f"AND role_name={_pg_str(role)} "
f"AND pillar_path={_pg_str(pillar_path)}")
else:
sql = ("SELECT data FROM so_pillar.pillar_entry "
"WHERE scope='global' "
f"AND pillar_path={_pg_str(pillar_path)}")
try:
out = _docker_psql(sql).strip()
except Exception:
return None
if not out:
return None
try:
return json.loads(out)
except (ValueError, TypeError):
return None
def write_yaml(path, content_dict, *, reason="so-yaml dual-write"):
"""Mirror the disk write at `path` (whose content was just rendered as
`content_dict`) into so_pillar.pillar_entry. Best-effort: any failure
is swallowed so the caller (so-yaml.py) does not see it as a fatal."""
if not _is_enabled():
return False, "postgres unreachable"
try:
scope, role, minion_id, pillar_path = locate(path)
except SkipPath as e:
return False, str(e)
data_json = json.dumps(content_dict if content_dict is not None else {})
role_sql = _pg_str(role)
minion_sql = _pg_str(minion_id)
reason_sql = _pg_str(reason)
conflict = _conflict_target(scope)
sql_parts = []
if scope == "minion":
# FK requires the minion row before pillar_entry can reference it.
sql_parts.append(
f"INSERT INTO so_pillar.minion (minion_id) VALUES ({minion_sql}) "
"ON CONFLICT (minion_id) DO NOTHING;"
)
sql_parts.append(
"BEGIN;\n"
f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);\n"
"INSERT INTO so_pillar.pillar_entry "
"(scope, role_name, minion_id, pillar_path, data, change_reason) "
f"VALUES ({_pg_str(scope)}, {role_sql}, {minion_sql}, "
f"{_pg_str(pillar_path)}, {_pg_str(data_json)}::jsonb, {reason_sql}) "
f"ON CONFLICT {conflict} DO UPDATE "
"SET data = EXCLUDED.data, change_reason = EXCLUDED.change_reason;\n"
"COMMIT;\n"
)
try:
_docker_psql("\n".join(sql_parts))
except Exception as e:
return False, f"pg write failed: {e}"
return True, "ok"
def purge_yaml(path, *, reason="so-yaml purge"):
"""Mirror the disk file deletion at `path` by deleting the matching
pillar_entry rows. For minion files also deletes the so_pillar.minion
row (CASCADE removes pillar_entry + role_member rows)."""
if not _is_enabled():
return False, "postgres unreachable"
try:
scope, role, minion_id, pillar_path = locate(path)
except SkipPath as e:
return False, str(e)
reason_sql = _pg_str(reason)
parts = ["BEGIN;",
f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);"]
if scope == "minion":
# If both <id>.sls and adv_<id>.sls are gone the trigger / CASCADE
# cleans up role_member; otherwise we just remove this one row.
parts.append(
f"DELETE FROM so_pillar.pillar_entry "
f"WHERE scope='minion' AND minion_id={_pg_str(minion_id)} "
f"AND pillar_path={_pg_str(pillar_path)};"
)
parts.append(
f"DELETE FROM so_pillar.minion WHERE minion_id={_pg_str(minion_id)} "
"AND NOT EXISTS (SELECT 1 FROM so_pillar.pillar_entry "
f"WHERE minion_id={_pg_str(minion_id)});"
)
else:
parts.append(
f"DELETE FROM so_pillar.pillar_entry "
f"WHERE scope={_pg_str(scope)} AND pillar_path={_pg_str(pillar_path)};"
)
parts.append("COMMIT;")
try:
_docker_psql("\n".join(parts))
except Exception as e:
return False, f"pg purge failed: {e}"
return True, "ok"
# CLI for diagnostics. Not exercised by so-yaml.py itself.
def _main(argv):
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("op", choices=("locate", "ping"))
ap.add_argument("path", nargs="?")
args = ap.parse_args(argv)
if args.op == "ping":
ok = _is_enabled()
print("ok" if ok else "unreachable")
return 0 if ok else 1
if args.op == "locate":
if not args.path:
ap.error("locate requires PATH")
try:
scope, role, minion_id, pillar_path = locate(args.path)
print(f"scope={scope} role={role} minion_id={minion_id} pillar_path={pillar_path}")
return 0
except SkipPath as e:
print(f"SKIP: {e}", file=sys.stderr)
return 2
return 1
if __name__ == "__main__":
sys.exit(_main(sys.argv[1:]))
+228 -12
View File
@@ -556,14 +556,23 @@ check_transform_health_and_reauthorize() {
# - unhealthy (any non-green health status)
# - metadata has run_as_kibana_system: false (this fix is specific to transforms started prior to Kibana 9.3.3)
# - are not orphaned (integration is not somehow missing/corrupt/uninstalled)
local tmp_transforms tmp_stats tmp_installed
tmp_transforms=$(mktemp)
tmp_stats=$(mktemp)
tmp_installed=$(mktemp)
echo "$transforms_doc" > "$tmp_transforms"
echo "$stats_doc" > "$tmp_stats"
echo "$installed_doc" > "$tmp_installed"
local unhealthy_transforms
unhealthy_transforms=$(jq -c -n \
--argjson t "$transforms_doc" \
--argjson s "$stats_doc" \
--argjson i "$installed_doc" '
($i.items | map({key: .name, value: .version}) | from_entries) as $pkg_ver
| ($s.transforms | map({key: .id, value: .health.status}) | from_entries) as $health
| [ $t.transforms[]
--slurpfile t "$tmp_transforms" \
--slurpfile s "$tmp_stats" \
--slurpfile i "$tmp_installed" '
($i[0].items | map({key: .name, value: .version}) | from_entries) as $pkg_ver
| ($s[0].transforms | map({key: .id, value: .health.status}) | from_entries) as $health
| [ $t[0].transforms[]
| select(._meta.run_as_kibana_system == false)
| select(($health[.id] // "unknown") != "green")
| {id, pkg: ._meta.package.name, ver: ($pkg_ver[._meta.package.name])}
@@ -604,6 +613,8 @@ check_transform_health_and_reauthorize() {
(( total_failures += $(jq 'map(select(.success != true)) | length' <<< "$resp" 2>/dev/null) ))
done <<< "$unhealthy_transforms"
rm -f "$tmp_transforms" "$tmp_stats" "$tmp_installed"
if [[ "$total_failures" -gt 0 ]]; then
echo "Some transform(s) failed to reauthorize."
fi
@@ -644,6 +655,27 @@ ensure_postgres_secret() {
chown socore:socore "$secrets_file"
}
rename_strelka_scan_lnk() {
echo "Renaming strelka pillar ScanLNK to ScanLnk."
local STRELKA_FILE=/opt/so/saltstack/local/pillar/strelka/soc_strelka.sls
local MINIONDIR=/opt/so/saltstack/local/pillar/minions
local OLD_KEY=strelka.backend.config.backend.scanners.ScanLNK
local NEW_KEY=strelka.backend.config.backend.scanners.ScanLnk
local TMP_VALUE_FILE
TMP_VALUE_FILE=$(mktemp)
for pillar_file in "$STRELKA_FILE" "$MINIONDIR"/*.sls; do
[[ -f "$pillar_file" ]] || continue
# Skip if ScanLNK doesn't exist
so-yaml.py get "$pillar_file" "$OLD_KEY" > "$TMP_VALUE_FILE" 2>/dev/null || continue
echo "Found 'ScanLNK' key in $pillar_file. Renaming to 'ScanLnk'."
so-yaml.py add "$pillar_file" "$NEW_KEY" "file:$TMP_VALUE_FILE"
so-yaml.py remove "$pillar_file" "$OLD_KEY"
done
rm -f "$TMP_VALUE_FILE"
}
up_to_3.1.0() {
ensure_postgres_local_pillar
ensure_postgres_secret
@@ -651,7 +683,7 @@ up_to_3.1.0() {
elasticsearch_backup_index_templates
# Clear existing component template state file.
rm -f /opt/so/state/esfleet_component_templates.json
rename_strelka_scan_lnk
INSTALLEDVERSION=3.1.0
}
@@ -939,6 +971,9 @@ verify_es_version_compatibility() {
local is_active_intermediate_upgrade=1
# supported upgrade paths for SO-ES versions
declare -A es_upgrade_map=(
["8.18.4"]="8.18.6 8.18.8 9.0.8"
["8.18.6"]="8.18.8 9.0.8"
["8.18.8"]="9.0.8"
["9.0.8"]="9.3.3"
)
@@ -962,6 +997,171 @@ verify_es_version_compatibility() {
exit 160
fi
compatible_es_versions="$target_es_version"
for current_version in "${!es_upgrade_map[@]}"; do
# shellcheck disable=SC2076
if [[ " ${es_upgrade_map[$current_version]} " =~ " $target_es_version " ]]; then
compatible_es_versions+=" $current_version"
fi
done
# Check if the given ES version can directly upgrade to the target ES version. Used to assist with catching lagging nodes during the upgrade process
es_version_can_upgrade_to_target() {
local current_version="$1"
# shellcheck disable=SC2076
if [[ -n "$current_version" && " $compatible_es_versions " =~ " $current_version " ]]; then
return 0
fi
return 1
}
# Gather Elasticsearch cluster version info and verify that each node in the cluster is running a version compatible with the target ES version.
verify_searchnodes_es_target_compatibility() {
local retries=20
local retry_count=0
local delay=180
local expected_es_nodes searchnode_minions attempt
local searchnode_discovery_success=false
SEARCHNODE_ES_VERSIONS=""
for attempt in {1..3}; do
if searchnode_minions=$(set -o pipefail; salt-key --out=json --list=accepted 2> /dev/null | jq -r '.minions[]? | select(endswith("searchnode"))'); then
searchnode_discovery_success=true
break
fi
echo "Failed to retrieve grid searchnodes via salt-key... Retrying in 30 seconds. Attempt $attempt of 3."
sleep 30
done
if [[ "$searchnode_discovery_success" != "true" ]]; then
echo "Failed to retrieve grid searchnodes via salt-key."
return 1
fi
# Always add node running soup to expected es nodes
expected_es_nodes="${MINIONID%_*}"
while IFS= read -r searchnode_minion; do
[[ -z "$searchnode_minion" ]] && continue
expected_es_nodes+=$'\n'"${searchnode_minion%_searchnode}"
done <<< "$searchnode_minions"
while [[ $retry_count -lt $retries ]]; do
SEARCHNODE_ES_VERSIONS=$(so-elasticsearch-query _nodes/_all/version --retry 5 --retry-delay 10 --fail 2>&1)
local exit_status=$?
if [[ $exit_status -ne 0 ]]; then
echo "Failed to retrieve Elasticsearch versions from searchnodes... Retrying in $delay seconds. Attempt $((retry_count + 1)) of $retries."
((retry_count++))
sleep $delay
continue
fi
local all_searchnodes_compatible=true
while IFS=$'\t' read -r node current_version; do
[[ -z "$node" ]] && continue
if ! es_version_can_upgrade_to_target "$current_version"; then
echo "Searchnode $node is running Elasticsearch $current_version, which is not directly upgradable to Elasticsearch $target_es_version."
all_searchnodes_compatible=false
fi
done < <(echo "$SEARCHNODE_ES_VERSIONS" | jq -r '.nodes | to_entries[] | [.value.name, .value.version] | @tsv')
while IFS= read -r expected_es_node; do
[[ -z "$expected_es_node" ]] && continue
if ! echo "$SEARCHNODE_ES_VERSIONS" | jq -e --arg node "$expected_es_node" '.nodes | to_entries | any(.value.name == $node)' > /dev/null; then
echo "Searchnode $expected_es_node did not report an Elasticsearch version. It may be offline or still upgrading."
all_searchnodes_compatible=false
fi
done <<< "$expected_es_nodes"
if [[ "$all_searchnodes_compatible" == true ]]; then
echo "All Searchnodes are upgradable to Elasticsearch $target_es_version."
return 0
fi
echo "One or more Searchnodes cannot upgrade directly to Elasticsearch $target_es_version. Rechecking in $delay seconds. Attempt $((retry_count + 1)) of $retries."
((retry_count++))
sleep $delay
done
return 1
}
# Gather heavynode version info and verify that each node is running a version compatible with the target ES version.
verify_heavynodes_es_target_compatibility() {
local heavynode_minions attempt
local retries=20
local retry_count=0
local delay=180
local heavynode_discovery_success=false
HEAVYNODE_ES_VERSIONS=""
for attempt in {1..3}; do
if heavynode_minions=$(set -o pipefail; salt-key --out=json --list=accepted 2> /dev/null | jq -r '.minions[]? | select(endswith("heavynode"))'); then
heavynode_discovery_success=true
break
fi
echo "Failed to retrieve grid heavynodes via salt-key... Retrying in 30 seconds. Attempt $attempt of 3."
sleep 30
done
if [[ "$heavynode_discovery_success" != "true" ]]; then
echo "Failed to retrieve grid heavynodes via salt-key."
return 1
fi
if [[ -z "$heavynode_minions" ]]; then
echo "No heavynodes detected. Skipping heavynode Elasticsearch version compatibility check."
return 0
fi
while [[ $retry_count -lt $retries ]]; do
HEAVYNODE_ES_VERSIONS=$(salt -C 'G@role:so-heavynode' cmd.run 'set -o pipefail; so-elasticsearch-query / --retry 5 --retry-delay 10 | jq -er ".version.number"' shell=/bin/bash --out=json 2> /dev/null)
local exit_status=$?
if [[ $exit_status -ne 0 ]]; then
echo "Failed to retrieve Elasticsearch version from one or more heavynodes... Retrying in $delay seconds. Attempt $((retry_count + 1)) of $retries."
((retry_count++))
sleep $delay
continue
fi
local all_heavynodes_compatible=true
while IFS=$'\t' read -r node current_version; do
[[ -z "$node" ]] && continue
if ! es_version_can_upgrade_to_target "$current_version"; then
echo "Heavynode $node is running Elasticsearch $current_version, which is not directly upgradable to Elasticsearch $target_es_version."
all_heavynodes_compatible=false
fi
done < <(echo "$HEAVYNODE_ES_VERSIONS" | jq -r 'to_entries[] | [.key, .value] | @tsv')
while IFS= read -r heavynode_minion; do
[[ -z "$heavynode_minion" ]] && continue
if ! echo "$HEAVYNODE_ES_VERSIONS" | jq -e --arg minion "$heavynode_minion" 'has($minion)' > /dev/null; then
echo "Heavynode $heavynode_minion did not report an Elasticsearch version. It may be offline or still upgrading."
all_heavynodes_compatible=false
fi
done <<< "$heavynode_minions"
if [[ "$all_heavynodes_compatible" == true ]]; then
echo -e "\nAll heavynodes can upgrade to Elasticsearch $target_es_version."
return 0
fi
echo "One or more heavynodes cannot upgrade directly to Elasticsearch $target_es_version. Rechecking in $delay seconds. Attempt $((retry_count + 1)) of $retries."
((retry_count++))
sleep $delay
done
return 1
}
if [[ ! -f "$es_verification_script" ]]; then
create_intermediate_upgrade_verification_script "$es_verification_script"
fi
for statefile in "${es_required_version_statefile_base}"-*; do
[[ -f $statefile ]] || continue
@@ -980,10 +1180,6 @@ verify_es_version_compatibility() {
continue
fi
if [[ ! -f "$es_verification_script" ]]; then
create_intermediate_upgrade_verification_script "$es_verification_script"
fi
echo -e "\n##############################################################################################################################\n"
echo "A previously required intermediate Elasticsearch upgrade was detected. Verifying that all Searchnodes/Heavynodes have successfully upgraded Elasticsearch to $es_required_version_statefile_value before proceeding with soup to avoid potential data loss! This command can take up to an hour to complete."
if ! timeout --foreground 4000 bash "$es_verification_script" "$es_required_version_statefile_value" "$statefile"; then
@@ -1005,6 +1201,26 @@ verify_es_version_compatibility() {
# shellcheck disable=SC2076 # Do not want a regex here eg usage " 8.18.8 9.0.8 " =~ " 9.0.8 "
if [[ " ${es_upgrade_map[$es_version]} " =~ " $target_es_version " || "$es_version" == "$target_es_version" ]]; then
if ! verify_searchnodes_es_target_compatibility || ! verify_heavynodes_es_target_compatibility; then
echo -e "\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
echo "One or more Searchnode(s)/Heavynode(s) cannot upgrade directly to Elasticsearch $target_es_version. This can happen with soups that include Elasticsearch upgrades being run in quick succession. Typically, this will resolve itself as the grid synchronizes. Please allow time for all Searchnodes/Heavynodes to have upgraded Elasticsearch to a compatible version with $target_es_version before running soup again to avoid potential data loss!"
if [[ -n "$HEAVYNODE_ES_VERSIONS" ]]; then
echo "Current heavynode Elasticsearch versions:"
echo "$HEAVYNODE_ES_VERSIONS" | jq '.'
fi
if [[ -n "$SEARCHNODE_ES_VERSIONS" ]]; then
echo "Current searchnode Elasticsearch versions:"
echo "$SEARCHNODE_ES_VERSIONS" | jq '.nodes | to_entries | map({(.value.name): .value.version}) | sort | add'
fi
echo -e "\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
exit 161
fi
# supported upgrade
return 0
else
@@ -1362,7 +1578,7 @@ main() {
echo "Verifying we have the latest soup script."
verify_latest_update_script
echo "Verifying Elasticsearch version compatibility before upgrading."
echo "Verifying Elasticsearch version compatibility across the grid before upgrading."
verify_es_version_compatibility
echo "Let's see if we need to update Security Onion."
+2
View File
@@ -225,6 +225,7 @@ http {
limit_req zone=auth_throttle burst={{ NGINXMERGED.config.throttle_login_burst }} nodelay;
limit_req_status 429;
proxy_pass http://{{ GLOBALS.manager }}:4433;
proxy_set_header Connection "Close";
proxy_read_timeout 90;
proxy_connect_timeout 90;
proxy_set_header Host $host;
@@ -237,6 +238,7 @@ http {
location ~ ^/auth/.*?(whoami|logout|settings|errors|webauthn.js) {
rewrite /auth/(.*) /$1 break;
proxy_pass http://{{ GLOBALS.manager }}:4433;
proxy_set_header Connection "Close";
proxy_read_timeout 90;
proxy_connect_timeout 90;
proxy_set_header Host $host;
+10 -1
View File
@@ -3,7 +3,14 @@
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% set hypervisor = pillar.minion_id %}
{% set hypervisor = pillar.get('minion_id', '') %}
{% if not hypervisor|regex_match('^([A-Za-z0-9._-]{1,253})$') %}
{% do salt.log.error('delete_hypervisor_orch: refusing unsafe minion_id=' ~ hypervisor) %}
delete_hypervisor_invalid_minion_id:
test.fail_without_changes:
- name: delete_hypervisor_invalid_minion_id
{% else %}
ensure_hypervisor_mine_deleted:
salt.function:
@@ -20,3 +27,5 @@ update_salt_cloud_profile:
- sls:
- salt.cloud.config
- concurrent: True
{% endif %}
-112
View File
@@ -1,112 +0,0 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Driven by the so_pillar_changed reactor. Translates a so_pillar.pillar_entry
# change into (cache.clear_pillar -> saltutil.refresh_pillar -> state.apply)
# on the appropriate target.
#
# Routing rules live in the DISPATCH map below — one entry per
# (pillar_path prefix) -> (state sls, role grain). Add new services here
# rather than wiring more reactors.
#
# Idempotent: state.apply is idempotent; if the pillar value didn't actually
# change anything observable, the affected state runs a no-op. Bulk imports
# and replays are safe.
{% set change = salt['pillar.get']('so_pillar_change', {}) %}
{% set scope = change.get('scope') %}
{% set role = change.get('role_name') %}
{% set minion = change.get('minion_id') %}
{% set changes = change.get('changes', []) %}
{# (pillar_path prefix) -> {sls: <state to apply>, role: <role grain that runs it>}
role is a grain value (e.g. 'so-sensor'), used to compute compound targets
when the change is global or role-scoped. #}
{% set DISPATCH = {
'suricata.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'sensor.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'zeek.': {'sls': 'zeek.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'stenographer.': {'sls': 'stenographer.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'pcap.': {'sls': 'pcap.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'logstash.': {'sls': 'logstash.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver']},
'redis.': {'sls': 'redis.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
'kafka.': {'sls': 'kafka.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver', 'so-searchnode']},
'elasticsearch.': {'sls': 'elasticsearch.config','roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-searchnode', 'so-heavynode', 'so-standalone']},
'kibana.': {'sls': 'kibana.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
'soc.': {'sls': 'soc.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
'telegraf.': {'sls': 'telegraf.config', 'roles': ['*']},
'fleet.': {'sls': 'fleet.config', 'roles': ['so-fleet']},
'strelka.': {'sls': 'strelka.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
} %}
{# Collect a deduplicated set of (sls, target_kind) actions. target_kind is
either 'minion:<id>' (scope=minion) or 'roles:so-x,so-y' (scope=role/global). #}
{% set actions = {} %}
{% for c in changes %}
{% set path = c.get('pillar_path', '') %}
{% for prefix, action in DISPATCH.items() %}
{% if path.startswith(prefix) %}
{% set sls = action['sls'] %}
{% if scope == 'minion' and minion %}
{% set key = sls ~ '|minion|' ~ minion %}
{% set _ = actions.update({key: {'sls': sls, 'tgt': minion, 'tgt_type': 'glob'}}) %}
{% else %}
{% set role_targets = action['roles'] %}
{% if '*' in role_targets %}
{% set tgt = '*' %}
{% set tgt_type = 'glob' %}
{% else %}
{% set tgt = ('I@role:' ~ role_targets|join(' or I@role:')) %}
{% set tgt_type = 'compound' %}
{% endif %}
{% set key = sls ~ '|' ~ tgt %}
{% set _ = actions.update({key: {'sls': sls, 'tgt': tgt, 'tgt_type': tgt_type}}) %}
{% endif %}
{% endif %}
{% endfor %}
{% endfor %}
{% if actions %}
{% for key, action in actions.items() %}
{% set safe_id = loop.index0 | string %}
so_pillar_reload_clear_cache_{{ safe_id }}:
salt.runner:
- name: cache.clear_pillar
- tgt: '{{ action.tgt }}'
- tgt_type: '{{ action.tgt_type }}'
so_pillar_reload_refresh_pillar_{{ safe_id }}:
salt.function:
- name: saltutil.refresh_pillar
- tgt: '{{ action.tgt }}'
- tgt_type: '{{ action.tgt_type }}'
- kwarg:
wait: True
- require:
- salt: so_pillar_reload_clear_cache_{{ safe_id }}
so_pillar_reload_apply_state_{{ safe_id }}:
salt.state:
- tgt: '{{ action.tgt }}'
- tgt_type: '{{ action.tgt_type }}'
- sls:
- {{ action.sls }}
- queue: True
- require:
- salt: so_pillar_reload_refresh_pillar_{{ safe_id }}
{% endfor %}
{% else %}
{# No DISPATCH entry matched. Pillar still gets refreshed so any other states
read fresh values, but no service-specific reload is invoked. #}
so_pillar_reload_unmapped_path_noop:
test.nop
{% do salt.log.info('orch.so_pillar_reload: no dispatch match for %s' % changes) %}
{% endif %}
+10 -1
View File
@@ -12,7 +12,14 @@
{% if 'vrt' in salt['pillar.get']('features', []) %}
{% do salt.log.debug('vm_pillar_clean_orch: Running') %}
{% set vm_name = pillar.get('vm_name') %}
{% set vm_name = pillar.get('vm_name', '') %}
{% if not vm_name|regex_match('^([A-Za-z0-9._-]{1,253})$') %}
{% do salt.log.error('vm_pillar_clean_orch: refusing unsafe vm_name=' ~ vm_name) %}
vm_pillar_clean_invalid_name:
test.fail_without_changes:
- name: vm_pillar_clean_invalid_name
{% else %}
delete_adv_{{ vm_name }}_pillar:
module.run:
@@ -24,6 +31,8 @@ delete_{{ vm_name }}_pillar:
- file.remove:
- path: /opt/so/saltstack/local/pillar/minions/{{ vm_name }}.sls
{% endif %}
{% else %}
{% do salt.log.error(
+3 -3
View File
@@ -46,10 +46,10 @@ postgresinitdir:
- require:
- file: postgresconfdir
postgresinitusers:
postgresinitdb:
file.managed:
- name: /opt/so/conf/postgres/init/init-users.sh
- source: salt://postgres/files/init-users.sh
- name: /opt/so/conf/postgres/init/init-db.sh
- source: salt://postgres/files/init-db.sh
- user: 939
- group: 939
- mode: 755
+4 -4
View File
@@ -31,7 +31,7 @@ so-postgres:
- POSTGRES_DB=securityonion
# Passwords are delivered via mounted 0600 secret files, not plaintext env vars.
# The upstream postgres image resolves POSTGRES_PASSWORD_FILE; entrypoint.sh and
# init-users.sh resolve SO_POSTGRES_PASS_FILE the same way.
# init-db.sh resolve SO_POSTGRES_PASS_FILE the same way.
- POSTGRES_PASSWORD_FILE=/run/secrets/postgres_password
- SO_POSTGRES_USER={{ SO_POSTGRES_USER }}
- SO_POSTGRES_PASS_FILE=/run/secrets/so_postgres_pass
@@ -46,7 +46,7 @@ so-postgres:
- /opt/so/conf/postgres/postgresql.conf:/conf/postgresql.conf:ro
- /opt/so/conf/postgres/pg_hba.conf:/conf/pg_hba.conf:ro
- /opt/so/conf/postgres/secrets:/run/secrets:ro
- /opt/so/conf/postgres/init/init-users.sh:/docker-entrypoint-initdb.d/init-users.sh:ro
- /opt/so/conf/postgres/init/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh:ro
- /etc/pki/postgres.crt:/conf/postgres.crt:ro
- /etc/pki/postgres.key:/conf/postgres.key:ro
- /etc/pki/tls/certs/intca.crt:/conf/ca.crt:ro
@@ -70,7 +70,7 @@ so-postgres:
- watch:
- file: postgresconf
- file: postgreshba
- file: postgresinitusers
- file: postgresinitdb
- file: postgres_super_secret
- file: postgres_app_secret
- x509: postgres_crt
@@ -78,7 +78,7 @@ so-postgres:
- require:
- file: postgresconf
- file: postgreshba
- file: postgresinitusers
- file: postgresinitdb
- file: postgres_super_secret
- file: postgres_app_secret
- x509: postgres_crt
@@ -1,124 +0,0 @@
-- so_pillar schema: queryable, versioned, audited pillar config store.
-- Replaces flat-file Salt pillar consumed via salt.pillar.postgres ext_pillar.
-- Idempotent. Run via salt/postgres/schema_pillar.sls inside the so-postgres container.
CREATE SCHEMA IF NOT EXISTS so_pillar;
CREATE TABLE IF NOT EXISTS so_pillar.scope (
scope_kind text PRIMARY KEY,
precedence int NOT NULL,
description text
);
INSERT INTO so_pillar.scope(scope_kind, precedence, description) VALUES
('global', 100, 'Applies to every minion'),
('role', 200, 'Applies to minions whose minion_id matches a top.sls compound role match'),
('minion', 300, 'Applies only to a single minion (per-minion overlay)')
ON CONFLICT (scope_kind) DO NOTHING;
CREATE TABLE IF NOT EXISTS so_pillar.role (
role_name text PRIMARY KEY,
match_kind text NOT NULL CHECK (match_kind IN ('compound','grain','glob','list')),
match_expr text NOT NULL,
description text
);
CREATE TABLE IF NOT EXISTS so_pillar.minion (
minion_id text PRIMARY KEY,
node_type text,
hostname text,
extra_roles text[] NOT NULL DEFAULT '{}',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS so_pillar.role_member (
role_name text NOT NULL REFERENCES so_pillar.role(role_name) ON DELETE CASCADE,
minion_id text NOT NULL REFERENCES so_pillar.minion(minion_id) ON DELETE CASCADE,
source text NOT NULL DEFAULT 'computed' CHECK (source IN ('computed','manual','imported')),
PRIMARY KEY (role_name, minion_id)
);
CREATE INDEX IF NOT EXISTS ix_role_member_minion ON so_pillar.role_member(minion_id);
-- pillar_entry holds the actual data. as_json=True ext_pillar reads `data` directly.
CREATE TABLE IF NOT EXISTS so_pillar.pillar_entry (
id bigserial PRIMARY KEY,
scope text NOT NULL REFERENCES so_pillar.scope(scope_kind),
role_name text REFERENCES so_pillar.role(role_name) ON DELETE CASCADE,
minion_id text REFERENCES so_pillar.minion(minion_id) ON DELETE CASCADE,
pillar_path text NOT NULL,
data jsonb NOT NULL,
is_secret boolean NOT NULL DEFAULT false,
sort_key int NOT NULL DEFAULT 0,
version int NOT NULL DEFAULT 1,
updated_at timestamptz NOT NULL DEFAULT now(),
updated_by text NOT NULL DEFAULT current_user,
change_reason text,
CONSTRAINT pillar_entry_scope_target CHECK (
(scope='global' AND role_name IS NULL AND minion_id IS NULL)
OR (scope='role' AND role_name IS NOT NULL AND minion_id IS NULL)
OR (scope='minion' AND role_name IS NULL AND minion_id IS NOT NULL)
),
-- Reserved namespaces that MUST stay rendered from SLS (mine-driven). Nothing
-- under these prefixes is allowed in the database; the merge logic relies on
-- ext_pillar leaving these subtrees alone.
CONSTRAINT pillar_entry_reserved_paths CHECK (
pillar_path NOT LIKE 'elasticsearch.nodes%'
AND pillar_path NOT LIKE 'redis.nodes%'
AND pillar_path NOT LIKE 'kafka.nodes%'
AND pillar_path NOT LIKE 'hypervisor.nodes%'
AND pillar_path NOT LIKE 'logstash.nodes%'
AND pillar_path NOT LIKE 'node_data.ips%'
)
);
CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_global ON so_pillar.pillar_entry(pillar_path)
WHERE scope = 'global';
CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_role ON so_pillar.pillar_entry(role_name, pillar_path)
WHERE scope = 'role';
CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_minion ON so_pillar.pillar_entry(minion_id, pillar_path)
WHERE scope = 'minion';
CREATE INDEX IF NOT EXISTS ix_pillar_entry_minion_hot ON so_pillar.pillar_entry(minion_id)
WHERE scope = 'minion';
CREATE INDEX IF NOT EXISTS ix_pillar_entry_role_hot ON so_pillar.pillar_entry(role_name)
WHERE scope = 'role';
-- Append-only audit log for every change to pillar_entry. No FK to entry so DELETE
-- history survives the row removal.
CREATE TABLE IF NOT EXISTS so_pillar.pillar_entry_history (
history_id bigserial PRIMARY KEY,
entry_id bigint,
op text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')),
scope text NOT NULL,
role_name text,
minion_id text,
pillar_path text NOT NULL,
old_data jsonb,
new_data jsonb,
is_secret boolean,
version int,
changed_at timestamptz NOT NULL DEFAULT now(),
changed_by text NOT NULL DEFAULT current_user,
change_reason text
);
CREATE INDEX IF NOT EXISTS ix_pillar_history_entry ON so_pillar.pillar_entry_history(entry_id, changed_at DESC);
CREATE INDEX IF NOT EXISTS ix_pillar_history_minion ON so_pillar.pillar_entry_history(minion_id, changed_at DESC);
CREATE INDEX IF NOT EXISTS ix_pillar_history_role ON so_pillar.pillar_entry_history(role_name, changed_at DESC);
-- Drift watch — populated by a pg_cron job that re-renders the on-disk SLS files
-- and compares them to pillar_entry. Cleared once cutover completes.
CREATE TABLE IF NOT EXISTS so_pillar.drift_log (
id bigserial PRIMARY KEY,
scope text NOT NULL,
role_name text,
minion_id text,
pillar_path text NOT NULL,
disk_data jsonb,
db_data jsonb,
detected_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_drift_log_detected ON so_pillar.drift_log(detected_at DESC);
@@ -1,49 +0,0 @@
-- Views consumed by the Salt master's salt.pillar.postgres ext_pillar with
-- as_json=True. Each view exposes data ordered by (sort_key, pillar_path) so
-- the deep-merge in ext_pillar resolves precedence deterministically.
--
-- ext_pillar always binds exactly one parameter to the query: (minion_id,).
-- Master-config queries reference these views and add WHERE clauses, e.g.:
-- SELECT data FROM so_pillar.v_pillar_role WHERE minion_id = %s
-- SELECT data FROM so_pillar.v_pillar_minion WHERE minion_id = %s
-- For v_pillar_global the binding is satisfied with `WHERE %s IS NOT NULL`.
CREATE OR REPLACE VIEW so_pillar.v_pillar_global AS
SELECT pillar_path, sort_key, data
FROM so_pillar.pillar_entry
WHERE scope = 'global'
AND is_secret = false
ORDER BY sort_key, pillar_path;
-- Role view exposes minion_id so the master-config WHERE clause can filter to
-- the rows that apply to the requesting minion. JOIN to role_member fans out
-- one row per (role assignment, pillar entry) tuple.
CREATE OR REPLACE VIEW so_pillar.v_pillar_role AS
SELECT rm.minion_id,
pe.role_name,
pe.pillar_path,
pe.sort_key,
pe.data
FROM so_pillar.pillar_entry pe
JOIN so_pillar.role_member rm ON rm.role_name = pe.role_name
WHERE pe.scope = 'role'
AND pe.is_secret = false;
CREATE OR REPLACE VIEW so_pillar.v_pillar_minion AS
SELECT minion_id,
pillar_path,
sort_key,
data
FROM so_pillar.pillar_entry
WHERE scope = 'minion'
AND is_secret = false;
-- v_pillar_secrets is filled in by 004_secrets.sql once pgcrypto is available;
-- placeholder here returns no rows so initial schema deploy succeeds even on a
-- container that has not yet loaded pgcrypto.
CREATE OR REPLACE VIEW so_pillar.v_pillar_secrets AS
SELECT NULL::text AS minion_id,
NULL::text AS pillar_path,
NULL::int AS sort_key,
'{}'::jsonb AS data
WHERE false;
@@ -1,120 +0,0 @@
-- Audit trigger: every INSERT/UPDATE/DELETE on so_pillar.pillar_entry writes a
-- row to pillar_entry_history. Captures the actor (current_user), reason
-- (passed via SET LOCAL so_pillar.change_reason), and full before/after data.
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_audit() RETURNS trigger
LANGUAGE plpgsql AS $fn$
DECLARE
v_reason text := current_setting('so_pillar.change_reason', true);
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO so_pillar.pillar_entry_history(
entry_id, op, scope, role_name, minion_id, pillar_path,
old_data, new_data, is_secret, version, changed_by, change_reason)
VALUES (NEW.id, 'INSERT', NEW.scope, NEW.role_name, NEW.minion_id, NEW.pillar_path,
NULL, NEW.data, NEW.is_secret, NEW.version, NEW.updated_by, v_reason);
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
IF OLD.data IS DISTINCT FROM NEW.data
OR OLD.is_secret IS DISTINCT FROM NEW.is_secret THEN
INSERT INTO so_pillar.pillar_entry_history(
entry_id, op, scope, role_name, minion_id, pillar_path,
old_data, new_data, is_secret, version, changed_by, change_reason)
VALUES (NEW.id, 'UPDATE', NEW.scope, NEW.role_name, NEW.minion_id, NEW.pillar_path,
OLD.data, NEW.data, NEW.is_secret, NEW.version, NEW.updated_by, v_reason);
END IF;
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO so_pillar.pillar_entry_history(
entry_id, op, scope, role_name, minion_id, pillar_path,
old_data, new_data, is_secret, version, changed_by, change_reason)
VALUES (OLD.id, 'DELETE', OLD.scope, OLD.role_name, OLD.minion_id, OLD.pillar_path,
OLD.data, NULL, OLD.is_secret, OLD.version, current_user, v_reason);
RETURN OLD;
END IF;
RETURN NULL;
END
$fn$;
DROP TRIGGER IF EXISTS pillar_entry_audit ON so_pillar.pillar_entry;
CREATE TRIGGER pillar_entry_audit
AFTER INSERT OR UPDATE OR DELETE ON so_pillar.pillar_entry
FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_pillar_entry_audit();
-- updated_at + version maintenance: bump version on every UPDATE that changes data.
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_versioning() RETURNS trigger
LANGUAGE plpgsql AS $fn$
BEGIN
IF (TG_OP = 'UPDATE') THEN
IF OLD.data IS DISTINCT FROM NEW.data
OR OLD.is_secret IS DISTINCT FROM NEW.is_secret THEN
NEW.version := OLD.version + 1;
NEW.updated_at := now();
ELSE
NEW.version := OLD.version;
NEW.updated_at := OLD.updated_at;
END IF;
END IF;
RETURN NEW;
END
$fn$;
DROP TRIGGER IF EXISTS pillar_entry_versioning ON so_pillar.pillar_entry;
CREATE TRIGGER pillar_entry_versioning
BEFORE UPDATE ON so_pillar.pillar_entry
FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_pillar_entry_versioning();
-- Recompute role_member rows for a minion based on node_type.
-- Compound matchers in pillar/top.sls are pure suffix patterns of the form
-- '*_<rolename>' plus the special multi-role 'manager/managersearch/managerhype'
-- bucket. node_type is split on common dashes/underscores; any token that
-- matches a known role_name produces a role_member row.
CREATE OR REPLACE FUNCTION so_pillar.fn_recompute_role_members(p_minion_id text)
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE
v_node_type text;
v_extra text[];
v_role text;
BEGIN
SELECT node_type, extra_roles INTO v_node_type, v_extra
FROM so_pillar.minion WHERE minion_id = p_minion_id;
IF v_node_type IS NULL THEN
RETURN;
END IF;
DELETE FROM so_pillar.role_member
WHERE minion_id = p_minion_id AND source = 'computed';
-- Main role from node_type.
IF EXISTS (SELECT 1 FROM so_pillar.role WHERE role_name = lower(v_node_type)) THEN
INSERT INTO so_pillar.role_member(role_name, minion_id, source)
VALUES (lower(v_node_type), p_minion_id, 'computed')
ON CONFLICT DO NOTHING;
END IF;
-- Extra roles supplied by the importer / reactor for compound matchers
-- that need to apply multiple buckets (e.g. managersearch also gets the
-- 'manager' bucket per top.sls line 36 grouping).
FOREACH v_role IN ARRAY COALESCE(v_extra, '{}'::text[]) LOOP
IF EXISTS (SELECT 1 FROM so_pillar.role WHERE role_name = v_role) THEN
INSERT INTO so_pillar.role_member(role_name, minion_id, source)
VALUES (v_role, p_minion_id, 'computed')
ON CONFLICT DO NOTHING;
END IF;
END LOOP;
END
$fn$;
CREATE OR REPLACE FUNCTION so_pillar.fn_minion_after_change() RETURNS trigger
LANGUAGE plpgsql AS $fn$
BEGIN
PERFORM so_pillar.fn_recompute_role_members(COALESCE(NEW.minion_id, OLD.minion_id));
RETURN COALESCE(NEW, OLD);
END
$fn$;
DROP TRIGGER IF EXISTS minion_role_sync ON so_pillar.minion;
CREATE TRIGGER minion_role_sync
AFTER INSERT OR UPDATE OF node_type, extra_roles ON so_pillar.minion
FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_minion_after_change();
@@ -1,130 +0,0 @@
-- pgcrypto-backed secret storage for pillar_entry rows where is_secret = true.
-- The plaintext value is encrypted with a symmetric key held in a server-side
-- GUC (so_pillar.master_key) which is set per-role via ALTER ROLE so the key
-- never touches a flat file readable by Salt itself.
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public;
-- Encrypt a JSONB value using the configured master key. Stored as a JSONB
-- envelope {"_enc": "<armored ciphertext>"} so the same column type is reused.
CREATE OR REPLACE FUNCTION so_pillar.fn_encrypt_jsonb(p_value jsonb)
RETURNS jsonb LANGUAGE plpgsql AS $fn$
DECLARE
v_key text := current_setting('so_pillar.master_key', true);
BEGIN
IF v_key IS NULL OR v_key = '' THEN
RAISE EXCEPTION 'so_pillar.master_key GUC not configured';
END IF;
RETURN jsonb_build_object(
'_enc',
encode(pgp_sym_encrypt(p_value::text, v_key), 'base64')
);
END
$fn$;
-- Decrypt the envelope produced by fn_encrypt_jsonb. SECURITY DEFINER so callers
-- with no direct access to pgcrypto/master_key can still pull plaintext via the
-- v_pillar_secrets view.
CREATE OR REPLACE FUNCTION so_pillar.fn_decrypt_jsonb(p_envelope jsonb)
RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER AS $fn$
DECLARE
v_key text := current_setting('so_pillar.master_key', true);
v_ct text;
BEGIN
IF v_key IS NULL OR v_key = '' THEN
RAISE EXCEPTION 'so_pillar.master_key GUC not configured';
END IF;
v_ct := p_envelope->>'_enc';
IF v_ct IS NULL THEN
RETURN p_envelope; -- not encrypted; pass through
END IF;
RETURN pgp_sym_decrypt(decode(v_ct, 'base64'), v_key)::jsonb;
END
$fn$;
REVOKE ALL ON FUNCTION so_pillar.fn_decrypt_jsonb(jsonb) FROM PUBLIC;
-- Secrets view consumed by ext_pillar. Decrypts at the boundary so Salt sees
-- plaintext JSONB. Filters the rows to those that apply to the requesting
-- minion via current_setting, since views can't take parameters and ext_pillar
-- can only bind one parameter per query.
--
-- Master-config query: SELECT data FROM so_pillar.v_pillar_secrets WHERE %s IS NOT NULL
-- The %s satisfies the bound parameter; the view itself reads the minion_id
-- from a session GUC set by a small wrapper function (see fn_pillar_secrets).
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_secrets(p_minion_id text)
RETURNS TABLE(data jsonb)
LANGUAGE sql STABLE SECURITY DEFINER AS $fn$
SELECT so_pillar.fn_decrypt_jsonb(pe.data)
FROM so_pillar.pillar_entry pe
WHERE pe.is_secret = true
AND ( pe.scope = 'global'
OR (pe.scope = 'role'
AND pe.role_name IN (
SELECT role_name FROM so_pillar.role_member
WHERE minion_id = p_minion_id))
OR (pe.scope = 'minion' AND pe.minion_id = p_minion_id))
ORDER BY pe.sort_key, pe.pillar_path;
$fn$;
-- Replace the placeholder view from 002 with a parameterised version. Master
-- config query becomes:
-- SELECT data FROM so_pillar.fn_pillar_secrets(%s) AS s
DROP VIEW IF EXISTS so_pillar.v_pillar_secrets;
CREATE OR REPLACE VIEW so_pillar.v_pillar_secrets AS
SELECT NULL::text AS minion_id,
NULL::text AS pillar_path,
NULL::int AS sort_key,
'{}'::jsonb AS data
WHERE false;
COMMENT ON VIEW so_pillar.v_pillar_secrets IS
'Deprecated placeholder; use SELECT data FROM so_pillar.fn_pillar_secrets(minion_id) instead';
-- Convenience helper for so-yaml.py and the importer to set a secret without
-- ever exposing the master_key to the caller. SECURITY DEFINER means the
-- caller does not need read access to so_pillar.master_key.
CREATE OR REPLACE FUNCTION so_pillar.fn_set_secret(
p_scope text,
p_role_name text,
p_minion_id text,
p_pillar_path text,
p_value jsonb,
p_change_reason text DEFAULT NULL
) RETURNS bigint LANGUAGE plpgsql SECURITY DEFINER AS $fn$
DECLARE
v_envelope jsonb := so_pillar.fn_encrypt_jsonb(p_value);
v_id bigint;
BEGIN
PERFORM set_config('so_pillar.change_reason',
COALESCE(p_change_reason, 'fn_set_secret'),
true);
INSERT INTO so_pillar.pillar_entry(
scope, role_name, minion_id, pillar_path, data, is_secret, change_reason)
VALUES (p_scope, p_role_name, p_minion_id, p_pillar_path, v_envelope, true, p_change_reason)
ON CONFLICT (pillar_path) WHERE scope='global' DO UPDATE
SET data = EXCLUDED.data, is_secret = true, change_reason = EXCLUDED.change_reason
RETURNING id INTO v_id;
IF v_id IS NULL THEN
UPDATE so_pillar.pillar_entry
SET data = v_envelope, is_secret = true, change_reason = p_change_reason
WHERE scope = p_scope
AND COALESCE(role_name,'') = COALESCE(p_role_name,'')
AND COALESCE(minion_id,'') = COALESCE(p_minion_id,'')
AND pillar_path = p_pillar_path
RETURNING id INTO v_id;
IF v_id IS NULL THEN
INSERT INTO so_pillar.pillar_entry(
scope, role_name, minion_id, pillar_path, data, is_secret, change_reason)
VALUES (p_scope, p_role_name, p_minion_id, p_pillar_path, v_envelope, true, p_change_reason)
RETURNING id INTO v_id;
END IF;
END IF;
RETURN v_id;
END
$fn$;
REVOKE ALL ON FUNCTION so_pillar.fn_set_secret(text,text,text,text,jsonb,text) FROM PUBLIC;
@@ -1,39 +0,0 @@
-- Seed the so_pillar.role table with the role buckets defined in pillar/top.sls.
-- The match_expr column preserves the original Salt compound expression purely
-- as documentation; PG-side membership is materialised in role_member.
-- Idempotent: ON CONFLICT lets re-application leave existing rows untouched.
INSERT INTO so_pillar.role(role_name, match_kind, match_expr, description) VALUES
('manager', 'compound', '*_manager or *_managersearch or *_managerhype',
'Manager-class node. Includes managersearch and managerhype subtypes.'),
('managersearch', 'compound', '*_managersearch',
'Combined manager + searchnode role.'),
('managerhype', 'compound', '*_managerhype',
'Combined manager + hypervisor role.'),
('sensor', 'compound', '*_sensor',
'Sensor node running zeek/suricata/strelka.'),
('eval', 'compound', '*_eval',
'Single-node evaluation install (manager + sensor + storage on one host).'),
('standalone', 'compound', '*_standalone',
'Single-node production install (no distributed cluster).'),
('heavynode', 'compound', '*_heavynode',
'Distributed manager node carrying logstash + ES.'),
('idh', 'compound', '*_idh',
'Intrusion-detection-honeypot node.'),
('searchnode', 'compound', '*_searchnode',
'Distributed Elasticsearch search node.'),
('receiver', 'compound', '*_receiver',
'Kafka receiver node.'),
('import', 'compound', '*_import',
'Single-node import-only install.'),
('fleet', 'compound', '*_fleet',
'Elastic Fleet server node.'),
('hypervisor', 'compound', '*_hypervisor',
'Hypervisor host (libvirt). Hosts VM minions.'),
('desktop', 'compound', '*_desktop',
'Desktop minion (no firewall/nginx pillars apply).'),
('not_desktop', 'compound', '* and not *_desktop',
'Pseudo-role; matches every minion that is not a desktop. Used for global firewall/nginx.'),
('libvirt', 'grain', 'salt-cloud:driver:libvirt',
'Pseudo-role; matches any minion with grain salt-cloud.driver = libvirt.')
ON CONFLICT (role_name) DO NOTHING;
@@ -1,107 +0,0 @@
-- Roles + Row-Level Security policies for the so_pillar schema.
-- Three roles:
-- so_pillar_master — connected by salt-master ext_pillar. Read-only.
-- RLS forces it to skip is_secret rows; reads
-- encrypted secrets only via fn_pillar_secrets().
-- so_pillar_writer — connected by so-yaml dual-write and the SOC
-- PostgresConfigstore. Read+write on pillar_entry,
-- minion, role_member.
-- so_pillar_secret_owner — owns the master encryption key GUC; sole role
-- allowed to call fn_set_secret directly. Other
-- writers reach this function only via grants.
--
-- The existing app role so_postgres_user (created by init-users.sh) is granted
-- INTO so_pillar_writer so SOC keeps using its existing connection but inherits
-- pillar-write capability.
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_master') THEN
CREATE ROLE so_pillar_master NOLOGIN;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_writer') THEN
CREATE ROLE so_pillar_writer NOLOGIN;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_secret_owner') THEN
CREATE ROLE so_pillar_secret_owner NOLOGIN;
END IF;
END
$$;
-- USAGE on the schema is the bare minimum needed to reference its tables.
-- CONNECT on the database is needed before the role can establish a session
-- at all (default privileges on a new DB grant CONNECT to PUBLIC, but if the
-- securityonion database is restricted that grant has to be explicit).
-- Password + LOGIN privileges are set later in schema_pillar.sls because
-- the password lives in pillar (secrets:pillar_master_pass) and plain SQL
-- can't substitute pillar values.
GRANT CONNECT ON DATABASE securityonion TO so_pillar_master, so_pillar_writer, so_pillar_secret_owner;
GRANT USAGE ON SCHEMA so_pillar TO so_pillar_master, so_pillar_writer, so_pillar_secret_owner;
-- Read access for ext_pillar through the views only.
GRANT SELECT ON so_pillar.v_pillar_global,
so_pillar.v_pillar_role,
so_pillar.v_pillar_minion
TO so_pillar_master;
GRANT EXECUTE ON FUNCTION so_pillar.fn_pillar_secrets(text) TO so_pillar_master;
-- (change_queue grants live in 008_change_notify.sql alongside the table itself,
-- since the table doesn't exist until 008 runs.)
-- Writer needs CRUD on pillar_entry/minion/role_member plus access to seed tables.
GRANT SELECT, INSERT, UPDATE, DELETE
ON so_pillar.pillar_entry,
so_pillar.minion,
so_pillar.role_member
TO so_pillar_writer;
GRANT SELECT ON so_pillar.role, so_pillar.scope TO so_pillar_writer;
GRANT SELECT, INSERT, UPDATE, DELETE ON so_pillar.drift_log TO so_pillar_writer;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA so_pillar TO so_pillar_writer;
GRANT SELECT ON so_pillar.pillar_entry_history TO so_pillar_writer;
-- Secret owner can call fn_set_secret directly; writer goes through it via the
-- function's SECURITY DEFINER attribute, which executes as the function owner.
GRANT EXECUTE ON FUNCTION so_pillar.fn_set_secret(text,text,text,text,jsonb,text)
TO so_pillar_writer, so_pillar_secret_owner;
-- so_postgres_user (SOC's existing app user, created by init-users.sh) inherits
-- writer privilege so the PostgresConfigstore in SOC can mutate pillars without
-- a second connection pool. Inheritance is per-PG default (NOINHERIT must be
-- explicit), so this just works.
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = current_setting('so_pillar.app_role', true))
THEN
EXECUTE format('GRANT so_pillar_writer TO %I',
current_setting('so_pillar.app_role', true));
ELSIF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_postgres_user') THEN
GRANT so_pillar_writer TO so_postgres_user;
END IF;
END
$$;
-- RLS on pillar_entry: master sees only non-secret rows. Writer sees all
-- (it must, to UPDATE secret rows when so-yaml replaces them). Secret rows
-- still require fn_decrypt_jsonb to read plaintext.
ALTER TABLE so_pillar.pillar_entry ENABLE ROW LEVEL SECURITY;
ALTER TABLE so_pillar.pillar_entry FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS pillar_entry_master_read ON so_pillar.pillar_entry;
DROP POLICY IF EXISTS pillar_entry_writer_all ON so_pillar.pillar_entry;
DROP POLICY IF EXISTS pillar_entry_owner_all ON so_pillar.pillar_entry;
CREATE POLICY pillar_entry_master_read ON so_pillar.pillar_entry
FOR SELECT TO so_pillar_master
USING (NOT is_secret);
CREATE POLICY pillar_entry_writer_all ON so_pillar.pillar_entry
FOR ALL TO so_pillar_writer
USING (true)
WITH CHECK (true);
CREATE POLICY pillar_entry_owner_all ON so_pillar.pillar_entry
FOR ALL TO so_pillar_secret_owner
USING (true)
WITH CHECK (true);
-- minion / role_member do not need RLS — they hold no secrets.
@@ -1,43 +0,0 @@
-- Drift detection + retention via pg_cron. Optional — the schema_pillar.sls
-- state guards this file behind the postgres:so_pillar:drift_check_enabled
-- pillar flag because pg_cron may not be loaded on every install.
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Retention: trim pillar_entry_history older than a year. Adjustable via the
-- so_pillar.history_retention_days GUC (default 365 if unset).
CREATE OR REPLACE FUNCTION so_pillar.fn_history_retain()
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE
v_days int := COALESCE(current_setting('so_pillar.history_retention_days', true)::int, 365);
BEGIN
DELETE FROM so_pillar.pillar_entry_history
WHERE changed_at < (now() - (v_days::text || ' days')::interval);
END
$fn$;
-- Drift retention: keep two weeks of drift_log.
CREATE OR REPLACE FUNCTION so_pillar.fn_drift_retain()
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
DELETE FROM so_pillar.drift_log
WHERE detected_at < (now() - interval '14 days');
END
$fn$;
-- pg_cron schedules (idempotent — unschedule any existing same-named job first).
DO $$
DECLARE
v_jobid bigint;
BEGIN
SELECT jobid INTO v_jobid FROM cron.job WHERE jobname = 'so_pillar_history_retain';
IF v_jobid IS NOT NULL THEN PERFORM cron.unschedule(v_jobid); END IF;
PERFORM cron.schedule('so_pillar_history_retain', '15 3 * * *',
'SELECT so_pillar.fn_history_retain();');
SELECT jobid INTO v_jobid FROM cron.job WHERE jobname = 'so_pillar_drift_retain';
IF v_jobid IS NOT NULL THEN PERFORM cron.unschedule(v_jobid); END IF;
PERFORM cron.schedule('so_pillar_drift_retain', '20 3 * * *',
'SELECT so_pillar.fn_drift_retain();');
END
$$;
@@ -1,89 +0,0 @@
-- pg_notify-driven change fan-out for so_pillar.pillar_entry.
--
-- Two layers:
-- 1. so_pillar.change_queue — durable, drained by the salt-master
-- engine. Survives engine downtime,
-- de-duplicated by id, processed once.
-- 2. pg_notify('so_pillar_change') — wakeup signal. Payload is the
-- change_queue row id and locator
-- (no secret data — channels are
-- snoopable by anyone with LISTEN).
--
-- The salt-master engine LISTENs on the channel for low-latency wakeup,
-- then SELECTs unprocessed change_queue rows so a missed notification
-- (engine restart, network blip) self-heals on the next event.
CREATE TABLE IF NOT EXISTS so_pillar.change_queue (
id bigserial PRIMARY KEY,
scope text NOT NULL,
role_name text,
minion_id text,
pillar_path text NOT NULL,
op text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')),
enqueued_at timestamptz NOT NULL DEFAULT now(),
processed_at timestamptz
);
-- Hot index for the engine's drain query.
CREATE INDEX IF NOT EXISTS ix_change_queue_unprocessed
ON so_pillar.change_queue (id)
WHERE processed_at IS NULL;
-- Retention index: pg_cron job in 007 sweeps processed rows older than 7d.
CREATE INDEX IF NOT EXISTS ix_change_queue_processed_at
ON so_pillar.change_queue (processed_at)
WHERE processed_at IS NOT NULL;
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_notify()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
v_row record;
v_id bigint;
BEGIN
IF TG_OP = 'DELETE' THEN
v_row := OLD;
ELSE
v_row := NEW;
END IF;
INSERT INTO so_pillar.change_queue
(scope, role_name, minion_id, pillar_path, op)
VALUES
(v_row.scope, v_row.role_name, v_row.minion_id, v_row.pillar_path, TG_OP)
RETURNING id INTO v_id;
-- Payload is the queue id + locator only. Engine joins back to
-- pillar_entry if it needs the data — keeps secrets off the wire.
PERFORM pg_notify('so_pillar_change', json_build_object(
'queue_id', v_id,
'scope', v_row.scope,
'role_name', v_row.role_name,
'minion_id', v_row.minion_id,
'pillar_path', v_row.pillar_path,
'op', TG_OP
)::text);
RETURN NULL;
END;
$$;
DROP TRIGGER IF EXISTS tg_pillar_entry_notify ON so_pillar.pillar_entry;
CREATE TRIGGER tg_pillar_entry_notify
AFTER INSERT OR UPDATE OR DELETE
ON so_pillar.pillar_entry
FOR EACH ROW
EXECUTE FUNCTION so_pillar.fn_pillar_entry_notify();
-- Role grants on the change_queue table. Lived in 006_rls.sql historically but
-- moved here so the GRANT references resolve — 006 runs before this file does.
-- Engine reads + drains the change queue from the salt-master process. It
-- needs SELECT to find unprocessed rows and UPDATE to mark them processed.
-- The queue contains only locator metadata (no pillar data), so the master
-- role's existing privilege footprint is unchanged in practice.
GRANT SELECT, UPDATE ON so_pillar.change_queue TO so_pillar_master;
GRANT USAGE ON SEQUENCE so_pillar.change_queue_id_seq TO so_pillar_master;
-- Writer needs INSERT (the trigger runs as table owner, so this is just for
-- direct testing / manual replays from psql).
GRANT INSERT ON so_pillar.change_queue TO so_pillar_writer;
-1
View File
@@ -8,7 +8,6 @@
include:
{% if PGMERGED.enabled %}
- postgres.enabled
- postgres.schema_pillar
{% else %}
- postgres.disabled
{% endif %}
-187
View File
@@ -1,187 +0,0 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if 'postgres' in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
# Deploys the so_pillar schema (tables, views, audit triggers, secrets,
# RLS, pg_cron retention) inside the so-postgres container. Idempotent —
# every CREATE / GRANT is wrapped in IF NOT EXISTS / ON CONFLICT or DO
# blocks so re-running the state is a no-op when the schema is current.
#
# Gated on the postgres:so_pillar:enabled feature flag (default false).
# Flip to true once the postsalt branch is ready to bring ext_pillar live.
include:
- postgres.enabled
{% set so_pillar_enabled = salt['pillar.get']('postgres:so_pillar:enabled', False) %}
{% if so_pillar_enabled %}
{% set drift_enabled = salt['pillar.get']('postgres:so_pillar:drift_check_enabled', False) %}
{% set schema_dir = '/opt/so/saltstack/default/salt/postgres/files/schema/pillar' %}
# Wait for postgres to actually accept TCP connections. Same idiom as
# telegraf_users.sls. The docker_container.running state returns earlier than
# the database is ready on first init.
so_pillar_postgres_wait_ready:
cmd.run:
- name: |
for i in $(seq 1 60); do
if docker exec so-postgres pg_isready -h 127.0.0.1 -U postgres -q 2>/dev/null; then
exit 0
fi
sleep 2
done
echo "so-postgres did not accept TCP connections within 120s" >&2
exit 1
- require:
- docker_container: so-postgres
{% set sql_files = [
'001_schema.sql',
'002_views.sql',
'003_history_trigger.sql',
'004_secrets.sql',
'005_seed_roles.sql',
'006_rls.sql',
] %}
{% if drift_enabled %}
{% do sql_files.append('007_drift_pgcron.sql') %}
{% endif %}
# 008 always applies — pg_notify-driven change fan-out is what the salt-master
# pg_notify_pillar engine consumes. Without it reactor wiring sees no events.
{% do sql_files.append('008_change_notify.sql') %}
{% for sql_file in sql_files %}
so_pillar_apply_{{ sql_file | replace('.', '_') }}:
cmd.run:
- name: |
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d securityonion \
< {{ schema_dir }}/{{ sql_file }}
- require:
- cmd: so_pillar_postgres_wait_ready
{% if not loop.first %}
- cmd: so_pillar_apply_{{ sql_files[loop.index0 - 1] | replace('.', '_') }}
{% endif %}
{% endfor %}
# Set the master encryption key GUC on the secret-owner role. The key itself
# is generated by setup/so-functions::secrets_pillar() (extended for postsalt)
# and lives in /opt/so/conf/postgres/so_pillar.key (mode 0400) — never read by
# Salt itself; the value flows into PG via ALTER ROLE so it sits only in the
# server's role catalog.
so_pillar_master_key_configure:
cmd.run:
- name: |
if [ -r /opt/so/conf/postgres/so_pillar.key ]; then
KEY="$(< /opt/so/conf/postgres/so_pillar.key)"
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d securityonion <<EOSQL
ALTER ROLE so_pillar_secret_owner SET so_pillar.master_key = '$KEY';
ALTER ROLE so_pillar_master SET so_pillar.master_key = '$KEY';
ALTER ROLE so_pillar_writer SET so_pillar.master_key = '$KEY';
EOSQL
else
echo "so_pillar.key not present yet; setup/so-functions must generate it before schema_pillar.sls" >&2
exit 1
fi
- require:
- cmd: so_pillar_apply_{{ sql_files[-1] | replace('.', '_') }}
# Set login passwords on the so_pillar_* roles. 006_rls.sql creates the roles
# as NOLOGIN with no password (plain SQL can't substitute pillar values), so
# the salt-master ext_pillar and the pg_notify_pillar engine — both of which
# connect as so_pillar_master via TCP — would fail with "password
# authentication failed" without this step. The password lives in pillar
# under secrets:pillar_master_pass (generated by setup/so-functions::secrets_pillar)
# and is the same one rendered into ext_pillar_postgres.conf.jinja and the
# engines.conf pg_notify_pillar block, so all three sides agree.
so_pillar_role_login_passwords:
cmd.run:
- name: |
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d securityonion <<EOSQL
ALTER ROLE so_pillar_master WITH LOGIN PASSWORD '{{ pillar['secrets']['pillar_master_pass'] }}';
ALTER ROLE so_pillar_writer WITH LOGIN PASSWORD '{{ pillar['secrets']['pillar_master_pass'] }}';
ALTER ROLE so_pillar_secret_owner WITH LOGIN PASSWORD '{{ pillar['secrets']['pillar_master_pass'] }}';
EOSQL
- require:
- cmd: so_pillar_master_key_configure
# Install psycopg2 into salt-master's bundled python so the pg_notify_pillar
# engine module can `import psycopg2`. Without this the engine's import fails
# silently in salt's loader and the engine just never starts. salt's bundled
# python at /opt/saltstack/salt/bin/python3 doesn't ship psycopg by default.
#
# Uses cmd.run with an `unless` import-test rather than pip.installed because
# pip exits non-zero if patchelf isn't on PATH (it tries to rewrite the
# psycopg2 wheel's RPATH after extraction), even though the wheel is fully
# installed and importable. salt's pip.installed surfaces the non-zero exit
# as a state failure and the cascade kills schema_pillar's downstream work.
# `import psycopg2` succeeds either way, so that's the actual readiness gate.
#
# Pip's stdout/stderr is redirected to /opt/so/log/so_pillar/psycopg2_install.log
# so the literal "ERROR: ... patchelf" line doesn't get hoovered up into
# /root/sosetup.log and then into /root/errors.log by verify_setup's
# substring-grep for "ERROR". The redirect target is preserved for
# triage if `import psycopg2` ever does fail.
so_pillar_psycopg2_in_salt_python:
cmd.run:
- name: |
mkdir -p /opt/so/log/so_pillar
/opt/saltstack/salt/bin/pip3 install --quiet psycopg2-binary \
>/opt/so/log/so_pillar/psycopg2_install.log 2>&1 \
|| true
- unless: /opt/saltstack/salt/bin/python3 -c "import psycopg2"
- require:
- cmd: so_pillar_role_login_passwords
# Run the importer once after the schema is in place. Idempotent — re-runs
# with no SLS edits produce zero row changes.
so_pillar_initial_import:
cmd.run:
- name: /usr/sbin/so-pillar-import --yes --reason 'schema_pillar.sls initial import'
- require:
- cmd: so_pillar_psycopg2_in_salt_python
# Flip so-yaml from dual-write to PG-canonical for managed paths now that
# the schema and importer are both in place. Bootstrap files (secrets.sls,
# postgres/auth.sls, ca/init.sls, *.nodes.sls, top.sls, ...) remain on disk
# regardless because so_yaml_postgres.locate() raises SkipPath for them.
so_pillar_so_yaml_mode_dir:
file.directory:
- name: /opt/so/conf/so-yaml
- user: socore
- group: socore
- mode: '0755'
- makedirs: True
so_pillar_so_yaml_mode_postgres:
file.managed:
- name: /opt/so/conf/so-yaml/mode
- contents: postgres
- user: socore
- group: socore
- mode: '0644'
- require:
- file: so_pillar_so_yaml_mode_dir
- cmd: so_pillar_initial_import
{% else %}
so_pillar_disabled_noop:
test.nop
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}
+16 -69
View File
@@ -39,17 +39,15 @@ postgres_wait_ready:
- require:
- docker_container: so-postgres
# Ensure the shared Telegraf database exists. init-users.sh only runs on a
# Ensure the shared Telegraf database exists. init-db.sh only runs on a
# fresh data dir, so hosts upgraded onto an existing /nsm/postgres volume
# would otherwise never get so_telegraf.
postgres_create_telegraf_db:
cmd.run:
- name: |
if ! docker exec so-postgres psql -U postgres -tAc "SELECT 1 FROM pg_database WHERE datname='so_telegraf'" | grep -q 1; then
docker exec so-postgres psql -v ON_ERROR_STOP=1 -U postgres -c "CREATE DATABASE so_telegraf"
fi
- name: /usr/sbin/so-telegraf-postgres create_db
- require:
- cmd: postgres_wait_ready
- file: postgres_sbin
# Provision the shared group role and schema once. Every per-minion role is a
# member of so_telegraf, and each Telegraf connection does SET ROLE so_telegraf
@@ -57,68 +55,26 @@ postgres_create_telegraf_db:
# on first write are owned by the group role and every member can INSERT/SELECT.
postgres_telegraf_group_role:
cmd.run:
- name: |
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d so_telegraf <<'EOSQL'
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'so_telegraf') THEN
CREATE ROLE so_telegraf NOLOGIN;
END IF;
END
$$;
GRANT CONNECT ON DATABASE so_telegraf TO so_telegraf;
CREATE SCHEMA IF NOT EXISTS telegraf AUTHORIZATION so_telegraf;
GRANT USAGE, CREATE ON SCHEMA telegraf TO so_telegraf;
CREATE SCHEMA IF NOT EXISTS partman;
CREATE EXTENSION IF NOT EXISTS pg_partman SCHEMA partman;
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Telegraf (running as so_telegraf) calls partman.create_parent()
-- on first write of each metric, which needs USAGE on the partman
-- schema, EXECUTE on its functions/procedures, and write access to
-- partman.part_config so it can register new partitioned parents.
GRANT USAGE, CREATE ON SCHEMA partman TO so_telegraf;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA partman TO so_telegraf;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA partman TO so_telegraf;
GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA partman TO so_telegraf;
-- partman creates per-parent template tables (partman.template_*) at
-- runtime; default privileges extend DML/sequence access to them.
ALTER DEFAULT PRIVILEGES IN SCHEMA partman
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO so_telegraf;
ALTER DEFAULT PRIVILEGES IN SCHEMA partman
GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO so_telegraf;
-- Hourly partman maintenance. cron.schedule is idempotent by jobname.
SELECT cron.schedule(
'telegraf-partman-maintenance',
'17 * * * *',
'CALL partman.run_maintenance_proc()'
);
EOSQL
- name: /usr/sbin/so-telegraf-postgres group_role
- require:
- cmd: postgres_create_telegraf_db
- file: postgres_sbin
{% set creds = salt['pillar.get']('telegraf:postgres_creds', {}) %}
{% for mid, entry in creds.items() %}
{% if entry.get('user') and entry.get('pass') %}
{% set u = entry.user %}
{% set p = entry.pass | replace("'", "''") %}
{% set p = entry.pass %}
postgres_telegraf_role_{{ u }}:
cmd.run:
- name: |
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d so_telegraf <<'EOSQL'
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{{ u }}') THEN
EXECUTE format('CREATE ROLE %I WITH LOGIN PASSWORD %L', '{{ u }}', '{{ p }}');
ELSE
EXECUTE format('ALTER ROLE %I WITH PASSWORD %L', '{{ u }}', '{{ p }}');
END IF;
END
$$;
GRANT CONNECT ON DATABASE so_telegraf TO "{{ u }}";
GRANT so_telegraf TO "{{ u }}";
EOSQL
- name: /usr/sbin/so-telegraf-postgres user
- env:
- ROLE_USER: {{ u | tojson }}
- ROLE_PASS: {{ p | tojson }}
- hide_output: True
- require:
- file: postgres_sbin
- cmd: postgres_telegraf_group_role
{% endif %}
@@ -130,21 +86,12 @@ postgres_telegraf_role_{{ u }}:
{% set retention = salt['pillar.get']('postgres:telegraf:retention_days', 14) | int %}
postgres_telegraf_retention_reconcile:
cmd.run:
- name: |
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d so_telegraf <<'EOSQL'
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_partman') THEN
UPDATE partman.part_config
SET retention = '{{ retention }} days',
retention_keep_table = false
WHERE parent_table LIKE 'telegraf.%';
END IF;
END
$$;
EOSQL
- name: /usr/sbin/so-telegraf-postgres retention
- env:
- RETENTION_DAYS: {{ retention }}
- require:
- cmd: postgres_telegraf_group_role
- file: postgres_sbin
{% endif %}
+41 -7
View File
@@ -7,15 +7,29 @@
. /usr/sbin/so-common
# Without pipefail, a pipeline's exit status is gzip's. A failed pg_dumpall would
# otherwise be masked by a successful gzip, silently producing a valid .gz that
# holds a truncated dump.
set -o pipefail
# Backups contain role password hashes and full chat data; keep them 0600.
umask 0077
TODAY=$(date '+%Y_%m_%d')
BACKUPDIR=/nsm/backup
BACKUPFILE="$BACKUPDIR/so-postgres-backup-$TODAY.sql.gz"
TMPFILE="$BACKUPFILE.tmp"
MAXBACKUPS=7
LOGFILE=/opt/so/log/postgres/backup.log
mkdir -p $BACKUPDIR
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') $*" >> "$LOGFILE"
}
mkdir -p "$BACKUPDIR"
# Remove any temp files left behind by a previously crashed run
rm -f "$BACKUPDIR"/so-postgres-backup-*.sql.gz.tmp
# Skip if already backed up today
if [ -f "$BACKUPFILE" ]; then
@@ -27,13 +41,33 @@ if ! docker ps --format '{{.Names}}' | grep -q '^so-postgres$'; then
exit 0
fi
# Dump all databases and roles, compress
docker exec so-postgres pg_dumpall -U postgres | gzip > "$BACKUPFILE"
# Always clean up the temp file on exit; the success path clears this trap
# after the atomic rename so the finished backup is not deleted.
trap 'rm -f "$TMPFILE"' EXIT
# Retention cleanup
NUMBACKUPS=$(find $BACKUPDIR -type f -name "so-postgres-backup*" | wc -l)
# Dump all databases and roles, compress. Write to a temp file so the final
# filename only ever appears for a complete, verified backup.
if ! docker exec so-postgres pg_dumpall -U postgres | gzip > "$TMPFILE"; then
log "ERROR: pg_dumpall/gzip failed; backup aborted"
exit 1
fi
# Verify the compressed stream is intact before publishing it
if ! gzip -t "$TMPFILE"; then
log "ERROR: backup failed gzip integrity check; backup aborted"
exit 1
fi
# Atomically publish the verified backup
mv "$TMPFILE" "$BACKUPFILE"
trap - EXIT
log "OK: wrote $BACKUPFILE"
# Retention cleanup (only reached after a successful backup). The glob is
# restricted to finished backups so an in-progress .tmp can never be counted.
NUMBACKUPS=$(find "$BACKUPDIR" -type f -name "so-postgres-backup-*.sql.gz" | wc -l)
while [ "$NUMBACKUPS" -gt "$MAXBACKUPS" ]; do
OLDEST=$(find $BACKUPDIR -type f -name "so-postgres-backup*" -printf '%T+ %p\n' | sort | head -n 1 | awk -F" " '{print $2}')
OLDEST=$(find "$BACKUPDIR" -type f -name "so-postgres-backup-*.sql.gz" -printf '%T+ %p\n' | sort | head -n 1 | awk -F" " '{print $2}')
rm -f "$OLDEST"
NUMBACKUPS=$(find $BACKUPDIR -type f -name "so-postgres-backup*" | wc -l)
NUMBACKUPS=$(find "$BACKUPDIR" -type f -name "so-postgres-backup-*.sql.gz" | wc -l)
done
@@ -0,0 +1,110 @@
#!/bin/bash
set -e
# Provision Telegraf state inside the so-postgres container.
# Usage: so-telegraf-postgres <subcommand>
# create_db Ensure the so_telegraf database exists.
# group_role Provision the so_telegraf group role, telegraf/partman schemas,
# pg_partman, pg_cron, and the hourly partman maintenance job.
# user Create or update a per-minion login role granted to so_telegraf.
# Env: ROLE_USER, ROLE_PASS.
# retention Reconcile partman retention on telegraf parents.
# Env: RETENTION_DAYS.
cmd="${1:?subcommand required}"
case "$cmd" in
create_db)
if ! docker exec so-postgres psql -U postgres -tAc \
"SELECT 1 FROM pg_database WHERE datname='so_telegraf'" | grep -q 1; then
docker exec so-postgres psql -v ON_ERROR_STOP=1 -U postgres \
-c "CREATE DATABASE so_telegraf"
fi
;;
group_role)
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d so_telegraf <<'EOSQL'
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'so_telegraf') THEN
CREATE ROLE so_telegraf NOLOGIN;
END IF;
END
$$;
GRANT CONNECT ON DATABASE so_telegraf TO so_telegraf;
CREATE SCHEMA IF NOT EXISTS telegraf AUTHORIZATION so_telegraf;
GRANT USAGE, CREATE ON SCHEMA telegraf TO so_telegraf;
CREATE SCHEMA IF NOT EXISTS partman;
CREATE EXTENSION IF NOT EXISTS pg_partman SCHEMA partman;
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Telegraf (running as so_telegraf) calls partman.create_parent()
-- on first write of each metric, which needs USAGE on the partman
-- schema, EXECUTE on its functions/procedures, and write access to
-- partman.part_config so it can register new partitioned parents.
GRANT USAGE, CREATE ON SCHEMA partman TO so_telegraf;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA partman TO so_telegraf;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA partman TO so_telegraf;
GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA partman TO so_telegraf;
-- partman creates per-parent template tables (partman.template_*) at
-- runtime; default privileges extend DML/sequence access to them.
ALTER DEFAULT PRIVILEGES IN SCHEMA partman
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO so_telegraf;
ALTER DEFAULT PRIVILEGES IN SCHEMA partman
GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO so_telegraf;
-- Hourly partman maintenance. cron.schedule is idempotent by jobname.
SELECT cron.schedule(
'telegraf-partman-maintenance',
'17 * * * *',
'CALL partman.run_maintenance_proc()'
);
EOSQL
;;
user)
: "${ROLE_USER:?ROLE_USER is required}"
: "${ROLE_PASS:?ROLE_PASS is required}"
# psql does not substitute :vars inside dollar-quoted strings, so the
# conditional CREATE/ALTER is built outside any DO block and dispatched
# with \gexec. format() handles identifier/literal quoting.
docker exec -i so-postgres psql \
-v ON_ERROR_STOP=1 \
-v role_user="$ROLE_USER" \
-v role_pass="$ROLE_PASS" \
-U postgres -d so_telegraf <<'EOSQL'
SELECT format(
CASE WHEN EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = :'role_user')
THEN 'ALTER ROLE %I WITH LOGIN PASSWORD %L'
ELSE 'CREATE ROLE %I WITH LOGIN PASSWORD %L'
END,
:'role_user',
:'role_pass'
) \gexec
GRANT CONNECT ON DATABASE so_telegraf TO :"role_user";
GRANT so_telegraf TO :"role_user";
EOSQL
;;
retention)
: "${RETENTION_DAYS:?RETENTION_DAYS is required}"
# \gset + \if guards against a missing pg_partman without using a DO
# block (psql :var substitution doesn't reach into dollar-quoted code).
docker exec -i so-postgres psql \
-v ON_ERROR_STOP=1 \
-v retention_days="$RETENTION_DAYS" \
-U postgres -d so_telegraf <<'EOSQL'
SELECT CASE WHEN EXISTS (SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_partman')
THEN 'true' ELSE 'false' END AS has_partman \gset
\if :has_partman
UPDATE partman.part_config
SET retention = :'retention_days' || ' days',
retention_keep_table = false
WHERE parent_table LIKE 'telegraf.%';
\endif
EOSQL
;;
*)
echo "Unknown subcommand: $cmd" >&2
exit 1
;;
esac
+6 -4
View File
@@ -3,12 +3,15 @@
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% if data['id'].endswith('_hypervisor') and data['result'] == True %}
{% set hid = data['id'] %}
{% if hid|regex_match('^([A-Za-z0-9._-]{1,253})$')
and hid.endswith('_hypervisor')
and data['result'] == True %}
{% if data['act'] == 'accept' %}
check_and_trigger:
runner.setup_hypervisor.setup_environment:
- minion_id: {{ data['id'] }}
- minion_id: {{ hid }}
{% endif %}
{% if data['act'] == 'delete' %}
@@ -17,8 +20,7 @@ delete_hypervisor:
- args:
- mods: orch.delete_hypervisor
- pillar:
minion_id: {{ data['id'] }}
minion_id: {{ hid }}
{% endif %}
{% endif %}
+27 -15
View File
@@ -1,7 +1,7 @@
#!py
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
@@ -9,30 +9,42 @@ import logging
import os
import pwd
import grp
import re
log = logging.getLogger(__name__)
PILLAR_ROOT = '/opt/so/saltstack/local/pillar/minions/'
_VMNAME_RE = re.compile(r'^[A-Za-z0-9._-]{1,253}$')
def run():
vm_name = data['kwargs']['name']
logging.error("createEmptyPillar reactor: vm_name: %s" % vm_name)
pillar_root = '/opt/so/saltstack/local/pillar/minions/'
vm_name = data.get('kwargs', {}).get('name', '')
if not _VMNAME_RE.match(str(vm_name)):
log.error("createEmptyPillar reactor: refusing unsafe vm_name=%r", vm_name)
return {}
log.info("createEmptyPillar reactor: vm_name: %s", vm_name)
pillar_files = ['adv_' + vm_name + '.sls', vm_name + '.sls']
try:
# Get socore user and group IDs
socore_uid = pwd.getpwnam('socore').pw_uid
socore_gid = grp.getgrnam('socore').gr_gid
pillar_root_real = os.path.realpath(PILLAR_ROOT)
for f in pillar_files:
full_path = pillar_root + f
if not os.path.exists(full_path):
# Create empty file
os.mknod(full_path)
# Set ownership to socore:socore
os.chown(full_path, socore_uid, socore_gid)
# Set mode to 644 (rw-r--r--)
os.chmod(full_path, 0o640)
logging.error("createEmptyPillar reactor: created %s with socore:socore ownership and mode 644" % f)
full_path = os.path.join(PILLAR_ROOT, f)
resolved = os.path.realpath(full_path)
if os.path.dirname(resolved) != pillar_root_real:
log.error("createEmptyPillar reactor: refusing path outside pillar root: %s", resolved)
continue
if os.path.exists(resolved):
continue
os.mknod(resolved)
os.chown(resolved, socore_uid, socore_gid)
os.chmod(resolved, 0o640)
log.info("createEmptyPillar reactor: created %s with socore:socore ownership and mode 0640", f)
except (KeyError, OSError) as e:
logging.error("createEmptyPillar reactor: Error setting ownership/permissions: %s" % str(e))
log.error("createEmptyPillar reactor: Error setting ownership/permissions: %s", e)
return {}
+33 -11
View File
@@ -1,18 +1,40 @@
#!py
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
remove_key:
wheel.key.delete:
- args:
- match: {{ data['name'] }}
import logging
import re
{{ data['name'] }}_pillar_clean:
runner.state.orchestrate:
- args:
- mods: orch.vm_pillar_clean
- pillar:
vm_name: {{ data['name'] }}
log = logging.getLogger(__name__)
{% do salt.log.info('deleteKey reactor: deleted minion key: %s' % data['name']) %}
_VMNAME_RE = re.compile(r'^[A-Za-z0-9._-]{1,253}$')
def run():
name = data.get('name', '')
if not _VMNAME_RE.match(str(name)):
log.error("deleteKey reactor: refusing unsafe name=%r", name)
return {}
log.info("deleteKey reactor: deleted minion key: %s", name)
return {
'remove_key': {
'wheel.key.delete': [
{'args': [
{'match': name},
]},
],
},
'%s_pillar_clean' % name: {
'runner.state.orchestrate': [
{'args': [
{'mods': 'orch.vm_pillar_clean'},
{'pillar': {'vm_name': name}},
]},
],
},
}
-27
View File
@@ -1,27 +0,0 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Fires for every event tagged 'so/pillar/changed'. Source of those events
# is the pg_notify_pillar engine on the salt-master, which in turn drains
# so_pillar.change_queue (populated by the AFTER trigger on
# so_pillar.pillar_entry — see 008_change_notify.sql).
#
# All routing logic — which pillar paths reload which services on which
# targets — lives in orch.so_pillar_reload so it stays editable as one
# YAML table without touching reactor wiring.
{% set payload = data.get('data', {}) %}
{% do salt.log.info('so_pillar_changed reactor: %s' % payload) %}
so_pillar_dispatch_reload:
runner.state.orchestrate:
- args:
- mods: orch.so_pillar_reload
- pillar:
so_pillar_change:
scope: {{ payload.get('scope') | json }}
role_name: {{ payload.get('role_name') | json }}
minion_id: {{ payload.get('minion_id') | json }}
changes: {{ payload.get('changes', []) | json }}
@@ -1,200 +0,0 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# -*- coding: utf-8 -*-
"""
pg_notify_pillar Salt master engine that bridges so_pillar.change_queue
into the Salt event bus.
Architecture (see 008_change_notify.sql):
pillar_entry -- AFTER trigger --> change_queue (durable)
+ pg_notify('so_pillar_change') (wakeup)
|
LISTEN <-- this engine <-+
SELECT/UPDATE change_queue
|
fire_event('so/pillar/changed', ...)
|
reactor matches tag --> orch
Why a queue + notify rather than just notify: pg_notify is fire-and-forget
within a session. If the engine is down or the LISTEN connection is broken
when a write happens, the notification is lost forever. The change_queue
lets us recover on (re)connect, we drain everything still flagged
processed_at IS NULL.
Debounce: bulk operations (so-pillar-import, fresh installs) can fire
hundreds of notifications per second. The engine collects whatever lands in
a short window and emits one event per (scope, role, minion) tuple so the
reactor isn't stampeded.
"""
import json
import logging
import os
import select
import time
import salt.utils.event
log = logging.getLogger(__name__)
__virtualname__ = 'pg_notify_pillar'
DEFAULT_CHANNEL = 'so_pillar_change'
DEFAULT_DEBOUNCE_MS = 500
DEFAULT_RECONNECT_BACKOFF = 5
DEFAULT_BACKLOG_INTERVAL = 30
DEFAULT_BATCH_LIMIT = 500
EVENT_TAG = 'so/pillar/changed'
def __virtual__():
try:
import psycopg2 # noqa: F401
return __virtualname__
except ImportError:
return False, 'pg_notify_pillar engine requires psycopg2'
def start(dsn=None,
host='127.0.0.1',
port=5432,
dbname='securityonion',
user='so_pillar_master',
password=None,
channel=DEFAULT_CHANNEL,
debounce_ms=DEFAULT_DEBOUNCE_MS,
reconnect_backoff=DEFAULT_RECONNECT_BACKOFF,
backlog_interval=DEFAULT_BACKLOG_INTERVAL,
batch_limit=DEFAULT_BATCH_LIMIT,
password_file=None):
"""
Run the change-queue bridge until the master shuts the engine down.
Either pass a full ``dsn`` string, or supply discrete kwargs. The
password may also be read from ``password_file`` (mode 0400) so the
engine config in ``/etc/salt/master.d/`` doesn't have to embed it
inline only the file path.
"""
import psycopg2
import psycopg2.extensions
if dsn is None:
if password is None and password_file:
try:
with open(password_file, 'r') as fh:
password = fh.read().strip()
except (IOError, OSError) as exc:
log.error('pg_notify_pillar: cannot read password_file %s: %s',
password_file, exc)
return
dsn = _build_dsn(host=host, port=port, dbname=dbname,
user=user, password=password)
bus = salt.utils.event.get_master_event(
__opts__, __opts__['sock_dir'], listen=False)
log.info('pg_notify_pillar: starting (channel=%s debounce=%dms)',
channel, debounce_ms)
while True:
conn = None
try:
conn = psycopg2.connect(dsn)
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute('LISTEN {0}'.format(channel))
log.info('pg_notify_pillar: connected; LISTEN %s', channel)
_drain(cur, bus, batch_limit)
while True:
ready, _, _ = select.select([conn], [], [], backlog_interval)
if not ready:
_drain(cur, bus, batch_limit)
continue
conn.poll()
_consume_notifies(conn)
if debounce_ms > 0:
time.sleep(debounce_ms / 1000.0)
conn.poll()
_consume_notifies(conn)
_drain(cur, bus, batch_limit)
except Exception as exc: # psycopg2.Error subclasses + OS errors
log.error('pg_notify_pillar: %s; reconnecting in %ds',
exc, reconnect_backoff)
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
time.sleep(reconnect_backoff)
def _build_dsn(host, port, dbname, user, password):
parts = ['host={0}'.format(host),
'port={0}'.format(port),
'dbname={0}'.format(dbname),
'user={0}'.format(user)]
if password:
parts.append('password={0}'.format(password))
return ' '.join(parts)
def _consume_notifies(conn):
# We don't use the payload directly — the queue table is the source of
# truth, and draining it covers any notifications we missed. So just
# discard them; their presence already proved there's something to drain.
while conn.notifies:
conn.notifies.pop(0)
def _drain(cur, bus, batch_limit):
"""Mark unprocessed change_queue rows processed and emit one event per
(scope, role_name, minion_id) group. SKIP LOCKED so multiple masters
sharing a Postgres don't double-process."""
cur.execute("""
UPDATE so_pillar.change_queue
SET processed_at = now()
WHERE id IN (
SELECT id FROM so_pillar.change_queue
WHERE processed_at IS NULL
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT %s)
RETURNING id, scope, role_name, minion_id, pillar_path, op
""", (batch_limit,))
rows = cur.fetchall()
if not rows:
return
groups = {}
for row_id, scope, role_name, minion_id, pillar_path, op in rows:
key = (scope, role_name, minion_id)
groups.setdefault(key, []).append({
'queue_id': row_id,
'pillar_path': pillar_path,
'op': op,
})
for (scope, role_name, minion_id), changes in groups.items():
payload = {
'scope': scope,
'role_name': role_name,
'minion_id': minion_id,
'changes': changes,
}
log.debug('pg_notify_pillar: firing %s for %s',
EVENT_TAG, payload)
bus.fire_event(payload, EVENT_TAG)
-20
View File
@@ -1,27 +1,7 @@
engines_dirs:
- /etc/salt/engines
# All salt-master engines must be declared in this single file.
# Salt's master.d/*.conf merge replaces top-level lists rather than
# concatenating them, so a sibling .conf with its own `engines:` list
# would silently overwrite this one (only the last loaded file's list
# would survive). Anything new — including postsalt's pg_notify_pillar
# engine, gated on postgres:so_pillar:enabled below — gets appended
# here under the same `engines:` key.
engines:
{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %}
- pg_notify_pillar:
host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }}
port: {{ pillar.get('postgres', {}).get('port', 5432) }}
dbname: securityonion
user: so_pillar_master
password: {{ pillar['secrets']['pillar_master_pass'] }}
channel: so_pillar_change
debounce_ms: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_debounce_ms', 500) }}
reconnect_backoff: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_reconnect_backoff', 5) }}
backlog_interval: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_backlog_interval', 30) }}
batch_limit: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_batch_limit', 500) }}
{% endif %}
- checkmine:
interval: 60
- pillarWatch:
-3
View File
@@ -14,8 +14,6 @@
include:
- salt.minion
- salt.master.ext_pillar_postgres
- salt.master.pg_notify_pillar_engine
{% if 'vrt' in salt['pillar.get']('features', []) %}
- salt.cloud
- salt.cloud.reactor_config_hypervisor
@@ -63,7 +61,6 @@ engines_config:
file.managed:
- name: /etc/salt/master.d/engines.conf
- source: salt://salt/files/engines.conf
- template: jinja
# update the bootstrap script when used for salt-cloud
salt_bootstrap_cloud:
-46
View File
@@ -1,46 +0,0 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Drops /etc/salt/master.d/ext_pillar_postgres.conf so the salt-master loads
# pillar overlays from the so_pillar.* schema in so-postgres alongside the
# on-disk SLS pillar tree. Gated on the postgres:so_pillar:enabled feature
# flag (default false) so the file only appears once the schema is deployed
# and the importer has run at least once.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if 'salt.master' in allowed_states %}
{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %}
ext_pillar_postgres_config:
file.managed:
- name: /etc/salt/master.d/ext_pillar_postgres.conf
- source: salt://salt/master/files/ext_pillar_postgres.conf.jinja
- template: jinja
- mode: '0640'
- user: root
- group: salt
- watch_in:
- service: salt_master_service
{% else %}
# When the flag is off make sure any previously-deployed config is removed
# so a rollback flips behavior cleanly.
ext_pillar_postgres_config_absent:
file.absent:
- name: /etc/salt/master.d/ext_pillar_postgres.conf
- watch_in:
- service: salt_master_service
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}
@@ -1,38 +0,0 @@
# /etc/salt/master.d/ext_pillar_postgres.conf
# Rendered by salt/salt/master/ext_pillar_postgres.sls.
# Reads the so_pillar.* schema in so-postgres and overlays it onto SLS pillar.
# SLS still renders first (ext_pillar_first: False) so bootstrap and mine-driven
# pillars work before Postgres is reachable; PG values overlay/override on top.
postgres:
host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }}
port: {{ pillar.get('postgres', {}).get('port', 5432) }}
db: securityonion
user: so_pillar_master
pass: {{ pillar['secrets']['pillar_master_pass'] }}
ext_pillar_first: False
pillar_source_merging_strategy: smart
pillar_merge_lists: False
pillar_cache: True
pillar_cache_backend: disk
pillar_cache_ttl: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('pillar_cache_ttl', 60) }}
# List form (not mapping form) so result rows merge into the pillar root rather
# than under a named subtree. Verified against salt/pillar/sql_base.py: list
# entries pass root=None to enter_root() which sets self.focus = self.result.
ext_pillar:
- postgres:
- query: "SELECT data FROM so_pillar.v_pillar_global WHERE %s IS NOT NULL ORDER BY sort_key, pillar_path"
as_json: True
ignore_null: True
- query: "SELECT data FROM so_pillar.v_pillar_role WHERE minion_id = %s ORDER BY sort_key, pillar_path"
as_json: True
ignore_null: True
- query: "SELECT data FROM so_pillar.v_pillar_minion WHERE minion_id = %s ORDER BY sort_key, pillar_path"
as_json: True
ignore_null: True
- query: "SELECT data FROM so_pillar.fn_pillar_secrets(%s)"
as_json: True
ignore_null: True
@@ -1,12 +0,0 @@
# /etc/salt/master.d/so_pillar_reactor.conf
# Wires the so/pillar/changed event tag — emitted by the pg_notify_pillar
# engine — to the so_pillar_changed reactor, which dispatches to
# orch.so_pillar_reload.
#
# Lives in its own file (rather than appended to reactor_hypervisor.conf)
# so the postgres:so_pillar:enabled flag can flip it on/off independently
# of hypervisor reactor wiring.
reactor:
- 'so/pillar/changed':
- /opt/so/saltstack/default/salt/reactor/so_pillar_changed.sls
@@ -1,80 +0,0 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Deploys the pg_notify_pillar engine module + its reactor config so the
# salt-master subscribes to so_pillar.change_queue and republishes changes
# on the salt event bus as so/pillar/changed. Reactor (so_pillar_changed.sls)
# matches that tag and dispatches the appropriate orch.
#
# The actual `engines:` declaration lives in salt/salt/files/engines.conf
# (jinja-rendered, also gated on postgres:so_pillar:enabled). It has to live
# in a single file because salt's master.d/*.conf merge replaces top-level
# lists rather than concatenating them — splitting `engines:` across multiple
# .conf files leaves only one loaded.
#
# Gated on the same postgres:so_pillar:enabled flag as the schema and
# ext_pillar config so the three components flip together.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if 'salt.master' in allowed_states %}
{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %}
pg_notify_pillar_engine_module:
file.managed:
- name: /etc/salt/engines/pg_notify_pillar.py
- source: salt://salt/engines/master/pg_notify_pillar.py
- mode: '0644'
- user: root
- group: root
- makedirs: True
- watch_in:
- service: salt_master_service
pg_notify_pillar_reactor_config:
file.managed:
- name: /etc/salt/master.d/so_pillar_reactor.conf
- source: salt://salt/master/files/so_pillar_reactor.conf
- mode: '0644'
- user: root
- group: root
- watch_in:
- service: salt_master_service
{% else %}
# When the flag flips off, peel everything back so a rollback returns to
# pure-disk pillar with no orphan engine churning on a dead listen socket.
pg_notify_pillar_engine_module_absent:
file.absent:
- name: /etc/salt/engines/pg_notify_pillar.py
- watch_in:
- service: salt_master_service
pg_notify_pillar_engine_config_absent:
# No-op now: the engine config used to live in master.d/pg_notify_pillar_engine.conf
# but was folded into engines.conf to work around salt's master.d list-replace
# merge. Keep this state alive (no-op test.nop) so any old installs that
# still have the file get it cleaned up.
file.absent:
- name: /etc/salt/master.d/pg_notify_pillar_engine.conf
- watch_in:
- service: salt_master_service
pg_notify_pillar_reactor_config_absent:
file.absent:
- name: /etc/salt/master.d/so_pillar_reactor.conf
- watch_in:
- service: salt_master_service
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}
+137 -1
View File
@@ -117,6 +117,121 @@ transformations:
- type: logsource
product: linux
service: auth
# Maps M365 audit rules to Elastic Agent O365 integration logs
- id: m365_audit_field_mappings
type: field_name_mapping
mapping:
Operation: event.action
ResultStatus: event.outcome
ApplicationId: o365.audit.ApplicationId
ObjectId: o365.audit.ObjectId
RequestType: o365.audit.RequestType
rule_conditions:
- type: logsource
product: m365
service: audit
- id: m365_audit_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: audit
# Maps M365 exchange rules to Elastic Agent O365 integration logs
- id: m365_exchange_field_mappings
type: field_name_mapping
mapping:
eventSource: event.provider
eventName: event.action
status: event.outcome
rule_conditions:
- type: logsource
product: m365
service: exchange
- id: m365_exchange_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: exchange
# Maps M365 threat_management rules to Elastic Agent O365 integration logs
- id: m365_threat_management_field_mappings
type: field_name_mapping
mapping:
eventSource: event.provider
eventName: event.action
status: event.outcome
rule_conditions:
- type: logsource
product: m365
service: threat_management
- id: m365_threat_management_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: threat_management
# Maps M365 threat_detection rules to Elastic Agent O365 integration logs
- id: m365_threat_detection_field_mappings
type: field_name_mapping
mapping:
eventSource: event.provider
eventName: event.action
status: event.outcome
rule_conditions:
- type: logsource
product: m365
service: threat_detection
- id: m365_threat_detection_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: threat_detection
# Maps FortiGate event rules to Elastic Agent Fortinet integration logs
- id: fortigate_event_field_mappings
type: field_name_mapping
mapping:
action: fortinet.firewall.action
cfgpath: fortinet.firewall.cfgpath
cfgobj: fortinet.firewall.cfgobj
cfgattr: fortinet.firewall.cfgattr
devname: observer.name
devid: observer.serial_number
logid: event.code
type: fortinet.firewall.type
subtype: fortinet.firewall.subtype
level: log.level
vd: fortinet.firewall.vd
logdesc: fortinet.firewall.desc
user: user.name
ui: fortinet.firewall.ui
cfgtid: fortinet.firewall.cfgtid
msg: message
rule_conditions:
- type: logsource
product: fortigate
service: event
- id: fortigate_event_add-fields
type: add_condition
conditions:
event.dataset: 'fortinet_fortigate.log'
event.module: 'fortinet_fortigate'
rule_conditions:
- type: logsource
product: fortigate
service: event
# event.code should always be a string
- id: convert_event_code_to_string
type: convert_type
@@ -126,15 +241,36 @@ transformations:
fields:
- event.code
# Maps process_creation rules to endpoint process creation logs
# This is an OS-agnostic mapping, to account for logs that don't specify source OS
- id: endpoint_process_create_windows_add-fields
type: add_condition
conditions:
event.category: 'process'
event.type: 'start'
host.os.type: 'windows'
rule_conditions:
- type: logsource
category: process_creation
product: windows
- id: endpoint_process_create_macos_add-fields
type: add_condition
conditions:
event.category: 'process'
event.type: 'start'
host.os.type: 'macos'
rule_conditions:
- type: logsource
category: process_creation
product: macos
- id: endpoint_process_create_linux_add-fields
type: add_condition
conditions:
event.category: 'process'
event.type: 'start'
host.os.type: 'linux'
rule_conditions:
- type: logsource
category: process_creation
product: linux
# Maps file_event rules to endpoint file creation logs
# This is an OS-agnostic mapping, to account for logs that don't specify source OS
- id: endpoint_file_create_add-fields
+1 -1
View File
@@ -261,7 +261,7 @@ strelka:
priority: 5
options:
limit: 1000
'ScanLNK':
'ScanLnk':
- positive:
flavors:
- 'lnk_file'
+1 -1
View File
@@ -99,7 +99,7 @@ strelka:
'ScanJpeg': *scannerOptions
'ScanJson': *scannerOptions
'ScanLibarchive': *scannerOptions
'ScanLNK': *scannerOptions
'ScanLnk': *scannerOptions
'ScanLsb': *scannerOptions
'ScanLzma': *scannerOptions
'ScanMacho': *scannerOptions
+1 -1
View File
@@ -1,6 +1,6 @@
telegraf:
enabled: False
output: BOTH
output: INFLUXDB
config:
interval: '30s'
metric_batch_size: 1000
+27 -2
View File
@@ -119,7 +119,7 @@ base:
- kafka
- pcap.cleanup
'*_manager or *_managerhype and G@saltversion:{{saltversion}} and not I@node_data:False':
'*_manager and G@saltversion:{{saltversion}} and not I@node_data:False':
- match: compound
- salt.master
- registry
@@ -146,6 +146,32 @@ base:
- stig
- kafka
'*_managerhype and G@saltversion:{{saltversion}} and not I@node_data:False':
- match: compound
- salt.master
- registry
- nginx
- influxdb
- postgres
- strelka.manager
- soc
- kratos
- hydra
- firewall
- manager
- sensoroni
- telegraf
- backup.config_backup
- elasticsearch
- logstash
- redis
- elastic-fleet-package-registry
- kibana
- elastalert
- utility
- elasticfleet
- kafka
'*_managerhype and I@features:vrt and G@saltversion:{{saltversion}}':
- match: compound
- manager.hypervisor
@@ -286,7 +312,6 @@ base:
- libvirt
- libvirt.images
- elasticfleet.install_agent_grid
- stig
'*_desktop and G@saltversion:{{saltversion}}':
- sensoroni
+61 -65
View File
@@ -745,6 +745,56 @@ configure_network_sensor() {
return $err
}
configure_management_bond() {
local bond_name="bond1"
local bond_mode=${MBOND_MODE:-active-backup}
info "Setting up $bond_name management interface with mode $bond_mode"
if [[ ${#MBNICS[@]} -eq 0 ]]; then
error "[ERROR] No management bond NICs were selected."
fail_setup
fi
nmcli -t -f NAME con show | grep -Fxq "$bond_name"
local found_int=$?
if [[ $found_int != 0 ]]; then
nmcli con add type bond ifname "$bond_name" con-name "$bond_name" mode "$bond_mode" -- \
ipv6.method ignore \
connection.autoconnect yes >> "$setup_log" 2>&1
else
nmcli con mod "$bond_name" \
bond.options "mode=$bond_mode" \
ipv6.method ignore \
connection.autoconnect yes >> "$setup_log" 2>&1
fi
local err=0
for MBNIC in "${MBNICS[@]}"; do
local slave_name="$bond_name-slave-$MBNIC"
nmcli -t -f NAME con show | grep -Fxq "$slave_name"
found_int=$?
if [[ $found_int != 0 ]]; then
nmcli con add type ethernet ifname "$MBNIC" con-name "$slave_name" master "$bond_name" -- \
connection.autoconnect yes >> "$setup_log" 2>&1
else
nmcli con mod "$slave_name" \
connection.master "$bond_name" \
connection.slave-type bond \
connection.autoconnect yes >> "$setup_log" 2>&1
fi
nmcli con up "$slave_name" >> "$setup_log" 2>&1
local ret=$?
[[ $ret -eq 0 ]] || err=$ret
done
return $err
}
configure_hyper_bridge() {
info "Setting up hypervisor bridge"
info "Checking $MNIC ipv4.method is auto or manual"
@@ -999,6 +1049,11 @@ filter_unused_nics() {
grep_string="$grep_string\|$BONDNIC"
done
fi
if [[ $MBNICS ]]; then
for BONDNIC in "${MBNICS[@]}"; do
grep_string="$grep_string\|$BONDNIC"
done
fi
# Finally, set filtered_nics to any NICs we aren't using (and ignore interfaces that aren't of use)
filtered_nics=$(ip link | awk -F: '$0 !~ "lo|vir|veth|br|docker|wl|^[^0-9]"{print $2}' | grep -vwe "$grep_string" | sed 's/ //g' | sed -r 's/(.*)(\.[0-9]+)@\1/\1\2/g')
@@ -1057,11 +1112,6 @@ generate_passwords(){
POSTGRESPASS=$(get_random_value)
SOCSRVKEY=$(get_random_value 64)
IMPORTPASS=$(get_random_value)
# postsalt: salt-master connects to so_pillar.* as so_pillar_master, and the
# so-postgres container needs a symmetric key for pgcrypto-encrypted secrets.
# Both are generated here so they survive reinstall like the other secrets.
PILLARMASTERPASS=$(get_random_value)
SO_PILLAR_KEY=$(get_random_value 64)
}
generate_interface_vars() {
@@ -1393,7 +1443,7 @@ network_init() {
title "Initializing Network"
disable_ipv6
set_hostname
if [[ ( $is_iso || $is_desktop_iso ) ]]; then
if [[ $is_iso || $is_desktop_iso ]]; then
set_management_interface
fi
}
@@ -1876,66 +1926,8 @@ secrets_pillar(){
"secrets:"\
" import_pass: $IMPORTPASS"\
" influx_pass: $INFLUXPASS"\
" pillar_master_pass: $PILLARMASTERPASS"\
" postgres_pass: $POSTGRESPASS" > $local_salt_dir/pillar/secrets.sls
elif ! grep -q '^[[:space:]]*pillar_master_pass:' $local_salt_dir/pillar/secrets.sls; then
# Existing install pre-postsalt — append the new key without disturbing
# the values already on disk. Keys we already wrote stay; only the new
# pillar_master_pass is added.
info "Appending pillar_master_pass to existing Secrets Pillar"
if [ -z "$PILLARMASTERPASS" ]; then
PILLARMASTERPASS=$(get_random_value)
fi
printf ' pillar_master_pass: %s\n' "$PILLARMASTERPASS" >> $local_salt_dir/pillar/secrets.sls
fi
# postsalt: write the so_pillar pgcrypto master key to a 0400 file owned by
# root. The key itself is never read by Salt — schema_pillar.sls loads it
# into the so-postgres container via ALTER ROLE so_pillar_secret_owner SET
# so_pillar.master_key = '<key>'; the file just lets the value survive
# container restarts.
if [ ! -f /opt/so/conf/postgres/so_pillar.key ]; then
info "Generating so_pillar pgcrypto master key"
mkdir -p /opt/so/conf/postgres
if [ -z "$SO_PILLAR_KEY" ]; then
SO_PILLAR_KEY=$(get_random_value 64)
fi
# Subshell-scope the umask so it doesn't leak into subsequent so-setup
# (and salt-call) file writes. Without the (...) wrapper the umask 077
# persists for the rest of the install and every state-rendered config
# file under /opt/so/conf lands at 0600 — which breaks containers that
# bind-mount their config and run as a non-root user (the influxdb
# container, in particular, exits with "permission denied" on
# /conf/config.yaml after the gosu drop).
(
umask 077
printf '%s' "$SO_PILLAR_KEY" > /opt/so/conf/postgres/so_pillar.key
)
chmod 0400 /opt/so/conf/postgres/so_pillar.key
chown root:root /opt/so/conf/postgres/so_pillar.key
fi
}
# postsalt: flip postgres:so_pillar:enabled to True in the local pillar so
# the schema_pillar / ext_pillar_postgres / pg_notify_pillar engine states
# all activate during the install highstate. Without this the entire
# PG-canonical pillar stack short-circuits on its default-False gate and
# the install ends in legacy disk-pillar mode — defeating the point of
# being on postsalt at all. The companion enabled=False rollback just
# rewrites this file or removes the flag.
enable_so_pillar_postgres() {
local pillar_dir=/opt/so/saltstack/local/pillar/postgres
mkdir -p "$pillar_dir"
cat > "$pillar_dir/adv_postgres.sls" <<'EOPILLAR'
# postsalt: enable PG-canonical pillar mode. Generated by setup/so-functions
# during install. Flip to False here (or delete this file) to roll back to
# disk-pillar mode without wiping the so_pillar.* schema in so-postgres.
postgres:
so_pillar:
enabled: True
EOPILLAR
chown -R socore:socore "$pillar_dir"
chmod 0644 "$pillar_dir/adv_postgres.sls"
}
set_network_dev_status_list() {
@@ -2165,8 +2157,12 @@ set_initial_firewall_access() {
# Set up the management interface on the ISO
set_management_interface() {
title "Setting up the main interface"
if [[ $MNIC == "bond1" ]]; then
configure_management_bond || fail_setup
fi
if [ "$address_type" = 'DHCP' ]; then
logCmd "nmcli con mod $MNIC connection.autoconnect yes"
logCmd "nmcli con mod $MNIC connection.autoconnect yes ipv4.method auto"
logCmd "nmcli con up $MNIC"
logCmd "nmcli -p connection show $MNIC"
else
+1 -22
View File
@@ -795,31 +795,10 @@ if ! [[ -f $install_opt_file ]]; then
checkin_at_boot
set_initial_firewall_access
initialize_elasticsearch_indices "so-case so-casehistory so-assistant-session so-assistant-chat"
# run a final highstate before enabling scheduled highstates.
# run a final highstate before enabling scheduled highstates.
# this will ensure so-elasticsearch-ilm-policy-load and so-elasticsearch-templates-load have a chance to run after elasticfleet is setup
info "Running final highstate for setup"
logCmd "salt-call state.highstate -l info"
# postsalt: enable PG-canonical pillar mode now that the install is
# fully on disk. We can't flip the flag earlier — ext_pillar overlay
# would replace the elasticsearch subtree (and others) with what's
# in PG before the importer has run, dropping secrets-allowlisted
# subkeys like elasticsearch.auth.users.so_elastic_user.pass that
# elasticsearch.enabled.sls needs to render. Order:
# 1. drop adv_postgres.sls flipping the flag
# 2. refresh_pillar so the next state sees enabled=True
# 3. apply postgres.schema_pillar — deploys schema, ALTERs role
# passwords, installs psycopg2 into salt's bundled python,
# runs so-pillar-import, writes /opt/so/conf/so-yaml/mode=postgres
# 4. apply salt.master — re-renders engines.conf with the
# pg_notify_pillar engine block, drops master.d ext_pillar
# config, watch_in restarts salt-master, ext_pillar takes over
info "Enabling postsalt PG-canonical pillar mode"
enable_so_pillar_postgres
logCmd "salt-call saltutil.refresh_pillar"
logCmd "salt-call state.apply postgres.schema_pillar -l info"
logCmd "salt-call state.apply salt.master -l info"
logCmd "salt-call schedule.enable -linfo --local"
verify_setup
else
+83 -2
View File
@@ -845,18 +845,99 @@ whiptail_management_nic() {
[ -n "$TESTING" ] && return
filter_unused_nics
local management_nic_options=( "${nic_list_management[@]}" )
if [[ $is_iso || $is_desktop_iso ]]; then
management_nic_options+=( "BOND" "Configure a bonded management interface" )
fi
MNIC=$(whiptail --title "$whiptail_title" --menu "Please select the NIC you would like to use for management.\n\nUse the arrow keys to move around and the Enter key to select." 20 75 12 "${nic_list_management[@]}" 3>&1 1>&2 2>&3 )
MNIC=$(whiptail --title "$whiptail_title" --menu "Please select the NIC you would like to use for management.\n\nUse the arrow keys to move around and the Enter key to select." 20 75 12 "${management_nic_options[@]}" 3>&1 1>&2 2>&3 )
local exitstatus=$?
whiptail_check_exitstatus $exitstatus
while [ -z "$MNIC" ]
do
MNIC=$(whiptail --title "$whiptail_title" --menu "Please select the NIC you would like to use for management.\n\nUse the arrow keys to move around and the Enter key to select." 22 75 12 "${nic_list_management[@]}" 3>&1 1>&2 2>&3 )
MNIC=$(whiptail --title "$whiptail_title" --menu "Please select the NIC you would like to use for management.\n\nUse the arrow keys to move around and the Enter key to select." 22 75 12 "${management_nic_options[@]}" 3>&1 1>&2 2>&3 )
local exitstatus=$?
whiptail_check_exitstatus $exitstatus
done
if [[ $MNIC == "BOND" ]]; then
whiptail_management_bond
fi
}
whiptail_management_bond() {
[ -n "$TESTING" ] && return
MBOND_MODE=$(whiptail --title "$whiptail_title" --menu \
"Choose the bond mode for the management interface.\n\nThe management bond will be created as bond1." 20 75 7 \
"active-backup" "One active NIC with failover (recommended)" \
"balance-rr" "Round-robin transmit policy" \
"balance-xor" "Transmit based on selected hash policy" \
"broadcast" "Transmit everything on all slave interfaces" \
"802.3ad" "Dynamic link aggregation (requires switch support)" \
"balance-tlb" "Adaptive transmit load balancing" \
"balance-alb" "Adaptive load balancing" 3>&1 1>&2 2>&3)
local exitstatus=$?
whiptail_check_exitstatus $exitstatus
while [ -z "$MBOND_MODE" ]
do
MBOND_MODE=$(whiptail --title "$whiptail_title" --menu \
"Choose the bond mode for the management interface.\n\nThe management bond will be created as bond1." 20 75 7 \
"active-backup" "One active NIC with failover (recommended)" \
"balance-rr" "Round-robin transmit policy" \
"balance-xor" "Transmit based on selected hash policy" \
"broadcast" "Transmit everything on all slave interfaces" \
"802.3ad" "Dynamic link aggregation (requires switch support)" \
"balance-tlb" "Adaptive transmit load balancing" \
"balance-alb" "Adaptive load balancing" 3>&1 1>&2 2>&3)
local exitstatus=$?
whiptail_check_exitstatus $exitstatus
done
whiptail_management_bond_nics
MNIC="bond1"
export MBOND_MODE MNIC
}
whiptail_management_bond_nics() {
[ -n "$TESTING" ] && return
MBNICS=()
filter_unused_nics
MBNICS=$(whiptail --title "$whiptail_title" --checklist "Please add NICs to the Management Interface:" 20 75 12 "${nic_list[@]}" 3>&1 1>&2 2>&3)
local exitstatus=$?
whiptail_check_exitstatus $exitstatus
while [ -z "$MBNICS" ]
do
MBNICS=$(whiptail --title "$whiptail_title" --checklist "Please add NICs to the Management Interface:" 20 75 12 "${nic_list[@]}" 3>&1 1>&2 2>&3)
local exitstatus=$?
whiptail_check_exitstatus $exitstatus
done
MBNICS=$(echo "$MBNICS" | tr -d '"')
IFS=' ' read -ra MBNICS <<< "$MBNICS"
for bond_nic in "${MBNICS[@]}"; do
for dev_status in "${nmcli_dev_status_list[@]}"; do
if [[ $dev_status == "${bond_nic}:unmanaged" ]]; then
whiptail \
--title "$whiptail_title" \
--msgbox "$bond_nic is unmanaged by Network Manager. Please remove it from other network management tools then re-run setup." \
8 75
exit
fi
done
done
export MBNICS
}
whiptail_net_method() {
Binary file not shown.