From a433e9524dcc0a4ddb6a48ee3a4d0f7678c457e1 Mon Sep 17 00:00:00 2001 From: Mike Reeves Date: Tue, 12 May 2026 16:05:55 -0400 Subject: [PATCH] Move onionconfig writes out of so-yaml --- salt/common/soup_scripts.sls | 14 + .../tools/sbin_jinja/so-elastic-fleet-setup | 2 + salt/kafka/disabled.sls | 7 +- salt/manager/tools/sbin/so-config.py | 448 ++++++++++++++++++ salt/manager/tools/sbin/so-config_test.py | 178 +++++++ salt/manager/tools/sbin/so-minion | 30 ++ salt/manager/tools/sbin/so-pillar-import | 329 ------------- salt/manager/tools/sbin/so-yaml.py | 166 +------ salt/manager/tools/sbin/so-yaml_test.py | 302 +----------- salt/manager/tools/sbin/so_yaml_postgres.py | 320 ------------- .../tools/sbin_jinja/so-elastic-fleet-reset | 3 + salt/orch/so_pillar_reload.sls | 112 ----- .../files/schema/pillar/001_schema.sql | 124 ----- .../files/schema/pillar/002_views.sql | 49 -- .../schema/pillar/003_history_trigger.sql | 120 ----- .../files/schema/pillar/004_secrets.sql | 130 ----- .../files/schema/pillar/005_seed_roles.sql | 39 -- salt/postgres/files/schema/pillar/006_rls.sql | 106 ----- .../files/schema/pillar/007_drift_pgcron.sql | 43 -- .../files/schema/pillar/008_change_notify.sql | 77 --- salt/postgres/init.sls | 1 - salt/postgres/schema_pillar.sls | 126 +---- salt/reactor/so_pillar_changed.sls | 27 -- salt/salt/engines/master/pg_notify_pillar.py | 200 -------- salt/salt/files/engines.conf | 6 +- salt/salt/master/ext_pillar_postgres.sls | 26 +- .../files/ext_pillar_postgres.conf.jinja | 38 -- .../files/pg_notify_pillar_engine.conf.jinja | 20 - salt/salt/master/files/so_pillar_reactor.conf | 12 - salt/salt/master/pg_notify_pillar_engine.sls | 50 +- salt/soc/enabled.sls | 23 + 31 files changed, 719 insertions(+), 2409 deletions(-) create mode 100755 salt/manager/tools/sbin/so-config.py create mode 100644 salt/manager/tools/sbin/so-config_test.py delete mode 100755 salt/manager/tools/sbin/so-pillar-import delete mode 100644 salt/manager/tools/sbin/so_yaml_postgres.py delete mode 100644 salt/orch/so_pillar_reload.sls delete mode 100644 salt/postgres/files/schema/pillar/001_schema.sql delete mode 100644 salt/postgres/files/schema/pillar/002_views.sql delete mode 100644 salt/postgres/files/schema/pillar/003_history_trigger.sql delete mode 100644 salt/postgres/files/schema/pillar/004_secrets.sql delete mode 100644 salt/postgres/files/schema/pillar/005_seed_roles.sql delete mode 100644 salt/postgres/files/schema/pillar/006_rls.sql delete mode 100644 salt/postgres/files/schema/pillar/007_drift_pgcron.sql delete mode 100644 salt/postgres/files/schema/pillar/008_change_notify.sql delete mode 100644 salt/reactor/so_pillar_changed.sls delete mode 100644 salt/salt/engines/master/pg_notify_pillar.py delete mode 100644 salt/salt/master/files/ext_pillar_postgres.conf.jinja delete mode 100644 salt/salt/master/files/pg_notify_pillar_engine.conf.jinja delete mode 100644 salt/salt/master/files/so_pillar_reactor.conf diff --git a/salt/common/soup_scripts.sls b/salt/common/soup_scripts.sls index 6b2f1551d..384e6627d 100644 --- a/salt/common/soup_scripts.sls +++ b/salt/common/soup_scripts.sls @@ -48,6 +48,13 @@ copy_so-yaml_manager_tools_sbin: - force: True - preserve: True +copy_so-config_manager_tools_sbin: + file.copy: + - name: /opt/so/saltstack/default/salt/manager/tools/sbin/so-config.py + - source: {{UPDATE_DIR}}/salt/manager/tools/sbin/so-config.py + - force: True + - preserve: True + copy_so-repo-sync_manager_tools_sbin: file.copy: - name: /opt/so/saltstack/default/salt/manager/tools/sbin/so-repo-sync @@ -97,6 +104,13 @@ copy_so-yaml_sbin: - force: True - preserve: True +copy_so-config_sbin: + file.copy: + - name: /usr/sbin/so-config.py + - source: {{UPDATE_DIR}}/salt/manager/tools/sbin/so-config.py + - force: True + - preserve: True + copy_so-repo-sync_sbin: file.copy: - name: /usr/sbin/so-repo-sync diff --git a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup index 5e0dc0c69..257177892 100755 --- a/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup +++ b/salt/elasticfleet/tools/sbin_jinja/so-elastic-fleet-setup @@ -232,6 +232,7 @@ printf '%s\n'\ " grid_enrollment_general: '$GRIDNODESENROLLMENTOKENGENERAL'"\ " grid_enrollment_heavy: '$GRIDNODESENROLLMENTOKENHEAVY'"\ "" >> "$pillar_file" +/usr/sbin/so-config.py import-file "$pillar_file" --note "so-elastic-fleet-setup" #Store Grid Nodes Enrollment token in Global pillar global_pillar_file=/opt/so/saltstack/local/pillar/global/soc_global.sls @@ -239,6 +240,7 @@ printf '%s\n'\ " fleet_grid_enrollment_token_general: '$GRIDNODESENROLLMENTOKENGENERAL'"\ " fleet_grid_enrollment_token_heavy: '$GRIDNODESENROLLMENTOKENHEAVY'"\ "" >> "$global_pillar_file" +/usr/sbin/so-config.py import-file "$global_pillar_file" --note "so-elastic-fleet-setup" # Call Elastic-Fleet Salt State printf "\nApplying elasticfleet state" diff --git a/salt/kafka/disabled.sls b/salt/kafka/disabled.sls index 79fd0c261..a6ec8d6f8 100644 --- a/salt/kafka/disabled.sls +++ b/salt/kafka/disabled.sls @@ -20,8 +20,11 @@ so-kafka_so-status.disabled: ensure_default_pipeline: cmd.run: - name: | - /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled False; + set -e + /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled False + /usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls replace kafka.enabled False --note "kafka.disabled" /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/global/soc_global.sls global.pipeline REDIS + /usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/global/soc_global.sls replace global.pipeline REDIS --note "kafka.disabled" {% endif %} {# If Kafka has never been manually enabled, the 'Kafka' user does not exist. In this case certs for Kafka should not exist since they'll be owned by uid 960 #} @@ -31,4 +34,4 @@ check_kafka_cert_{{cert}}: - name: /etc/pki/{{cert}} - onlyif: stat -c %U /etc/pki/{{cert}} | grep -q UNKNOWN - show_changes: False -{% endfor %} \ No newline at end of file +{% endfor %} diff --git a/salt/manager/tools/sbin/so-config.py b/salt/manager/tools/sbin/so-config.py new file mode 100755 index 000000000..7e62cb229 --- /dev/null +++ b/salt/manager/tools/sbin/so-config.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python3 + +# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# https://securityonion.net/license; you may not use this file except in compliance with the +# Elastic License 2.0. + +""" +so-config.py writes SOC/onionconfig settings to Postgres. + +so-yaml.py remains a YAML file editor. Call this tool when a pillar-backed +setting also needs to be reflected in the onionconfig database. +""" + +import argparse +import json +import os +from pathlib import Path +import subprocess +import sys + +import yaml + + +PILLAR_ROOT = Path(os.environ.get("SO_CONFIG_PILLAR_ROOT", "/opt/so/saltstack/local/pillar")) +DOCKER_CONTAINER = os.environ.get("SO_CONFIG_PG_CONTAINER", "so-postgres") +PG_DATABASE = os.environ.get("SO_CONFIG_PG_DATABASE", "securityonion") +PG_USER = os.environ.get("SO_CONFIG_PG_USER", "postgres") +DEFAULT_USER_ID = os.environ.get("SO_CONFIG_USER_ID", "so-config") + +EXCLUDE_BASENAMES = { + "secrets.sls", + "auth.sls", + "top.sls", +} +EXCLUDE_PATH_FRAGMENTS = ( + "/elasticsearch/nodes.sls", + "/redis/nodes.sls", + "/kafka/nodes.sls", + "/hypervisor/nodes.sls", + "/logstash/nodes.sls", + "/node_data/ips.sls", + "/postgres/auth.sls", + "/elasticsearch/auth.sls", + "/kibana/secrets.sls", +) + + +class SkipPath(Exception): + pass + + +def pg_str(value): + if value is None: + return "NULL" + return "'" + str(value).replace("'", "''") + "'" + + +def pg_jsonb(value): + return pg_str(json.dumps(value)) + "::jsonb" + + +def docker_psql(sql): + proc = subprocess.run( + ["docker", "exec", "-i", DOCKER_CONTAINER, + "psql", "-U", PG_USER, "-d", PG_DATABASE, + "-tA", "-q", "-v", "ON_ERROR_STOP=1"], + input=sql.encode(), + capture_output=True, + check=False, + timeout=60, + ) + if proc.returncode != 0: + sys.stderr.write(proc.stderr.decode(errors="replace")) + raise RuntimeError(f"docker exec psql failed with rc={proc.returncode}") + return proc.stdout.decode(errors="replace") + + +def schema_ready(): + sql = """ +SELECT to_regclass('public.settings') IS NOT NULL + AND to_regclass('public.audit_settings') IS NOT NULL; +""" + return docker_psql(sql).strip() == "t" + + +def cmd_wait_schema(args): + import time + + deadline = time.time() + args.timeout + while time.time() <= deadline: + if schema_ready(): + return 0 + time.sleep(args.interval) + print("so-config: onionconfig schema is not ready", file=sys.stderr) + return 1 + + +def upsert_setting(setting_id, value, *, node_id="", duplicated_from_id=None, + user_id=DEFAULT_USER_ID, note=None): + note = note or "so-config upsert" + sql = f""" +BEGIN; +WITH old_row AS ( + SELECT value + FROM settings + WHERE setting_id = {pg_str(setting_id)} + AND node_id = {pg_str(node_id)} + FOR UPDATE +), +upserted AS ( + INSERT INTO settings (setting_id, value, duplicated_from_id, node_id) + VALUES ({pg_str(setting_id)}, {pg_jsonb(value)}, {pg_str(duplicated_from_id)}, {pg_str(node_id)}) + ON CONFLICT (setting_id, node_id) DO UPDATE + SET value = EXCLUDED.value, + duplicated_from_id = EXCLUDED.duplicated_from_id + RETURNING value +) +INSERT INTO audit_settings (setting_id, node_id, user_id, old_value, new_value, note) +SELECT {pg_str(setting_id)}, + {pg_str(node_id)}, + {pg_str(user_id)}, + (SELECT value FROM old_row), + (SELECT value FROM upserted), + {pg_str(note)} + WHERE NOT EXISTS (SELECT 1 FROM old_row) + OR (SELECT value FROM old_row) IS DISTINCT FROM (SELECT value FROM upserted); +COMMIT; +""" + docker_psql(sql) + + +def delete_setting(setting_id, *, node_id="", user_id=DEFAULT_USER_ID, note=None): + note = note or "so-config delete" + sql = f""" +BEGIN; +WITH deleted AS ( + DELETE FROM settings + WHERE setting_id = {pg_str(setting_id)} + AND node_id = {pg_str(node_id)} + RETURNING value +) +INSERT INTO audit_settings (setting_id, node_id, user_id, old_value, new_value, note) +SELECT {pg_str(setting_id)}, {pg_str(node_id)}, {pg_str(user_id)}, value, NULL::jsonb, {pg_str(note)} + FROM deleted; +COMMIT; +""" + docker_psql(sql) + + +def delete_setting_prefix(setting_id, *, node_id="", user_id=DEFAULT_USER_ID, note=None): + if not setting_id: + raise ValueError("setting_id prefix cannot be empty") + note = note or "so-config delete-prefix" + sql = f""" +BEGIN; +WITH deleted AS ( + DELETE FROM settings + WHERE node_id = {pg_str(node_id)} + AND ( + setting_id = {pg_str(setting_id)} + OR substring(setting_id from 1 for char_length({pg_str(setting_id)}) + 1) = {pg_str(setting_id + ".")} + ) + RETURNING setting_id, value +) +INSERT INTO audit_settings (setting_id, node_id, user_id, old_value, new_value, note) +SELECT setting_id, {pg_str(node_id)}, {pg_str(user_id)}, value, NULL::jsonb, {pg_str(note)} + FROM deleted; +COMMIT; +""" + docker_psql(sql) + + +def purge_node(node_id, *, user_id=DEFAULT_USER_ID, note=None): + note = note or "so-config purge-node" + sql = f""" +BEGIN; +WITH deleted AS ( + DELETE FROM settings + WHERE node_id = {pg_str(node_id)} + RETURNING setting_id, value +) +INSERT INTO audit_settings (setting_id, node_id, user_id, old_value, new_value, note) +SELECT setting_id, {pg_str(node_id)}, {pg_str(user_id)}, value, NULL::jsonb, {pg_str(note)} + FROM deleted; +COMMIT; +""" + docker_psql(sql) + + +def parse_value(value, value_file=None): + if value_file: + with open(value_file, "r") as fh: + value = fh.read() + parsed = yaml.safe_load(value) + if parsed is None and value == "": + return "" + return parsed + + +def parse_yaml_file(path): + with open(path, "rb") as fh: + raw = fh.read() + if b"{%" in raw or b"{{" in raw: + raise SkipPath(f"{path}: Jinja-templated files stay disk-only") + if not raw.strip(): + return {} + parsed = yaml.safe_load(raw) + return parsed if parsed is not None else {} + + +def flatten(prefix, value): + if isinstance(value, dict): + for key, child in value.items(): + child_id = f"{prefix}.{key}" if prefix else str(key) + yield from flatten(child_id, child) + else: + yield prefix, value + + +def classify_pillar_path(path): + norm = Path(path).resolve() + norm_str = str(norm) + + if norm.name in EXCLUDE_BASENAMES: + raise SkipPath(f"{path}: excluded basename") + for fragment in EXCLUDE_PATH_FRAGMENTS: + if fragment in norm_str: + raise SkipPath(f"{path}: excluded path fragment {fragment}") + if norm.suffix != ".sls": + raise SkipPath(f"{path}: not an .sls file") + + parent = norm.parent.name + stem = norm.stem + + if parent == "minions": + if stem.startswith("adv_"): + return {"kind": "advanced", "setting_id": "advanced", "node_id": stem[4:]} + return {"kind": "normal", "node_id": stem} + + section = parent + if stem == f"soc_{section}": + return {"kind": "normal", "node_id": ""} + if stem == f"adv_{section}": + return {"kind": "advanced", "setting_id": f"{section}.advanced", "node_id": ""} + + raise SkipPath(f"{path}: not a SOC-managed pillar file") + + +def import_pillar_file(path, *, user_id=DEFAULT_USER_ID, note=None): + meta = classify_pillar_path(path) + note = note or f"so-config import-file {path}" + + if meta["kind"] == "advanced": + with open(path, "r") as fh: + upsert_setting(meta["setting_id"], fh.read(), node_id=meta["node_id"], + user_id=user_id, note=note) + return 1 + + data = parse_yaml_file(path) + if not isinstance(data, dict): + raise SkipPath(f"{path}: top-level YAML is not a map") + + count = 0 + for setting_id, value in flatten("", data): + upsert_setting(setting_id, value, node_id=meta["node_id"], + user_id=user_id, note=note) + count += 1 + return count + + +def iter_pillar_files(root): + root = Path(root) + if not root.is_dir(): + return + for path in sorted(root.rglob("*.sls")): + if path.is_file(): + yield path + + +def cmd_set(args): + upsert_setting(args.setting_id, parse_value(args.value, args.value_file), + node_id=args.node_id, + duplicated_from_id=args.duplicated_from_id, + user_id=args.user_id, + note=args.note) + return 0 + + +def cmd_delete(args): + delete_setting(args.setting_id, node_id=args.node_id, + user_id=args.user_id, note=args.note) + return 0 + + +def cmd_delete_prefix(args): + delete_setting_prefix(args.setting_id, node_id=args.node_id, + user_id=args.user_id, note=args.note) + return 0 + + +def cmd_purge_node(args): + purge_node(args.node_id, user_id=args.user_id, note=args.note) + return 0 + + +def cmd_import_file(args): + count = import_pillar_file(args.path, user_id=args.user_id, note=args.note) + print(f"imported {count} settings from {args.path}") + return 0 + + +def cmd_import_minion(args): + count = 0 + for name in (f"{args.node_id}.sls", f"adv_{args.node_id}.sls"): + path = PILLAR_ROOT / "minions" / name + if path.exists(): + count += import_pillar_file(path, user_id=args.user_id, note=args.note) + print(f"imported {count} settings for node {args.node_id}") + return 0 + + +def cmd_import_all(args): + count = 0 + skipped = 0 + for path in iter_pillar_files(args.root): + try: + count += import_pillar_file(path, user_id=args.user_id, note=args.note) + except SkipPath as exc: + skipped += 1 + if args.verbose: + print(f"skip: {exc}", file=sys.stderr) + print(f"imported {count} settings, skipped {skipped} files") + if args.state_file: + with open(args.state_file, "w") as fh: + fh.write("ok\n") + return 0 + + +def cmd_sync_yaml_mutation(args): + meta = classify_pillar_path(args.path) + note = args.note or f"so-config sync-yaml-mutation {args.operation} {args.path}" + + if meta["kind"] == "advanced": + import_pillar_file(args.path, user_id=args.user_id, note=note) + return 0 + + if args.operation in ("add", "replace"): + upsert_setting(args.key, parse_value(args.value, args.value_file), + node_id=meta["node_id"], + user_id=args.user_id, + note=note) + elif args.operation == "remove": + delete_setting_prefix(args.key, node_id=meta["node_id"], + user_id=args.user_id, note=note) + else: + raise ValueError(f"unsupported operation: {args.operation}") + return 0 + + +def build_parser(): + parser = argparse.ArgumentParser(description=__doc__) + sub = parser.add_subparsers(dest="command", required=True) + + p = sub.add_parser("wait-schema", help="wait for SOC-created onionconfig tables") + p.add_argument("--timeout", type=int, default=120) + p.add_argument("--interval", type=int, default=2) + p.set_defaults(func=cmd_wait_schema) + + p = sub.add_parser("set", help="upsert one setting") + p.add_argument("setting_id") + p.add_argument("value", nargs="?", default="") + p.add_argument("--value-file") + p.add_argument("--node-id", default="") + p.add_argument("--duplicated-from-id") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note") + p.set_defaults(func=cmd_set) + + p = sub.add_parser("delete", help="delete one setting") + p.add_argument("setting_id") + p.add_argument("--node-id", default="") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note") + p.set_defaults(func=cmd_delete) + + p = sub.add_parser("delete-prefix", help="delete one setting and all child settings") + p.add_argument("setting_id") + p.add_argument("--node-id", default="") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note") + p.set_defaults(func=cmd_delete_prefix) + + p = sub.add_parser("purge-node", help="delete all settings for one node") + p.add_argument("node_id") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note") + p.set_defaults(func=cmd_purge_node) + + p = sub.add_parser("import-file", help="import one SOC-managed pillar file") + p.add_argument("path") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note") + p.set_defaults(func=cmd_import_file) + + p = sub.add_parser("import-minion", help="import one minion's pillar files") + p.add_argument("node_id") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note") + p.set_defaults(func=cmd_import_minion) + + p = sub.add_parser("import-all", help="import all SOC-managed local pillar files") + p.add_argument("--root", default=str(PILLAR_ROOT)) + p.add_argument("--state-file") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note", default="so-config initial import") + p.add_argument("--verbose", action="store_true") + p.set_defaults(func=cmd_import_all) + + p = sub.add_parser("sync-yaml-mutation", + help="mirror one so-yaml add/replace/remove mutation to onionconfig") + p.add_argument("path") + p.add_argument("operation", choices=("add", "replace", "remove")) + p.add_argument("key") + p.add_argument("value", nargs="?", default="") + p.add_argument("--value-file") + p.add_argument("--user-id", default=DEFAULT_USER_ID) + p.add_argument("--note") + p.set_defaults(func=cmd_sync_yaml_mutation) + + return parser + + +def main(argv): + parser = build_parser() + args = parser.parse_args(argv) + try: + return args.func(args) + except SkipPath as exc: + print(f"skip: {exc}", file=sys.stderr) + return 2 + except Exception as exc: + print(f"so-config: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/salt/manager/tools/sbin/so-config_test.py b/salt/manager/tools/sbin/so-config_test.py new file mode 100644 index 000000000..0488ea08e --- /dev/null +++ b/salt/manager/tools/sbin/so-config_test.py @@ -0,0 +1,178 @@ +import importlib +import os +import tempfile +import unittest +from unittest.mock import patch + + +soconfig = importlib.import_module("so-config") + + +class TestSoConfigPathMapping(unittest.TestCase): + + def test_classify_global_soc(self): + meta = soconfig.classify_pillar_path( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") + self.assertEqual(meta["kind"], "normal") + self.assertEqual(meta["node_id"], "") + + def test_classify_global_advanced(self): + meta = soconfig.classify_pillar_path( + "/opt/so/saltstack/local/pillar/soc/adv_soc.sls") + self.assertEqual(meta["kind"], "advanced") + self.assertEqual(meta["setting_id"], "soc.advanced") + self.assertEqual(meta["node_id"], "") + + def test_classify_minion(self): + meta = soconfig.classify_pillar_path( + "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls") + self.assertEqual(meta["kind"], "normal") + self.assertEqual(meta["node_id"], "h1_sensor") + + def test_classify_minion_advanced(self): + meta = soconfig.classify_pillar_path( + "/opt/so/saltstack/local/pillar/minions/adv_h1_sensor.sls") + self.assertEqual(meta["kind"], "advanced") + self.assertEqual(meta["setting_id"], "advanced") + self.assertEqual(meta["node_id"], "h1_sensor") + + def test_classify_skips_bootstrap(self): + with self.assertRaises(soconfig.SkipPath): + soconfig.classify_pillar_path( + "/opt/so/saltstack/local/pillar/secrets.sls") + + +class TestSoConfigImport(unittest.TestCase): + + def test_flatten_keeps_lists_as_values(self): + flattened = dict(soconfig.flatten("", { + "host": {"mainip": "10.0.0.1"}, + "suricata": {"pcap": {"enabled": True}}, + "items": ["a", "b"], + })) + self.assertEqual(flattened["host.mainip"], "10.0.0.1") + self.assertEqual(flattened["suricata.pcap.enabled"], True) + self.assertEqual(flattened["items"], ["a", "b"]) + + def test_import_file_upserts_flattened_settings(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "h1_sensor.sls") + minions = os.path.join(tmp, "minions") + os.mkdir(minions) + path = os.path.join(minions, "h1_sensor.sls") + with open(path, "w") as fh: + fh.write("host:\n mainip: 10.0.0.1\nsuricata:\n enabled: true\n") + + calls = [] + with patch.object(soconfig, "upsert_setting", + side_effect=lambda *args, **kwargs: calls.append((args, kwargs))): + count = soconfig.import_pillar_file(path) + + self.assertEqual(count, 2) + self.assertIn((("host.mainip", "10.0.0.1"), {"node_id": "h1_sensor", "user_id": "so-config", "note": f"so-config import-file {path}"}), calls) + self.assertIn((("suricata.enabled", True), {"node_id": "h1_sensor", "user_id": "so-config", "note": f"so-config import-file {path}"}), calls) + + def test_import_advanced_file_upserts_raw_content(self): + with tempfile.TemporaryDirectory() as tmp: + minions = os.path.join(tmp, "minions") + os.mkdir(minions) + path = os.path.join(minions, "adv_h1_sensor.sls") + with open(path, "w") as fh: + fh.write("custom:\n raw: true\n") + + calls = [] + with patch.object(soconfig, "upsert_setting", + side_effect=lambda *args, **kwargs: calls.append((args, kwargs))): + count = soconfig.import_pillar_file(path) + + self.assertEqual(count, 1) + self.assertEqual(calls[0][0], ("advanced", "custom:\n raw: true\n")) + self.assertEqual(calls[0][1]["node_id"], "h1_sensor") + + +class TestSoConfigSql(unittest.TestCase): + + def test_schema_ready_checks_soc_tables(self): + captured = {} + with patch.object(soconfig, "docker_psql", + side_effect=lambda sql: captured.update({"sql": sql}) or "t\n"): + ready = soconfig.schema_ready() + + self.assertTrue(ready) + self.assertIn("to_regclass('public.settings')", captured["sql"]) + self.assertIn("to_regclass('public.audit_settings')", captured["sql"]) + + def test_set_writes_settings_and_audit(self): + captured = {} + with patch.object(soconfig, "docker_psql", + side_effect=lambda sql: captured.setdefault("sql", sql)): + soconfig.upsert_setting("host.mainip", "10.0.0.1", + node_id="h1_sensor", user_id="tester", note="unit") + + self.assertIn("INSERT INTO settings", captured["sql"]) + self.assertIn("INSERT INTO audit_settings", captured["sql"]) + self.assertIn("'host.mainip'", captured["sql"]) + self.assertIn("'h1_sensor'", captured["sql"]) + self.assertIn("'tester'", captured["sql"]) + + def test_purge_node_audits_deleted_rows(self): + captured = {} + with patch.object(soconfig, "docker_psql", + side_effect=lambda sql: captured.setdefault("sql", sql)): + soconfig.purge_node("h1_sensor", user_id="tester", note="unit") + + self.assertIn("DELETE FROM settings", captured["sql"]) + self.assertIn("WHERE node_id = 'h1_sensor'", captured["sql"]) + self.assertIn("INSERT INTO audit_settings", captured["sql"]) + + def test_delete_prefix_removes_children_and_audits(self): + captured = {} + with patch.object(soconfig, "docker_psql", + side_effect=lambda sql: captured.setdefault("sql", sql)): + soconfig.delete_setting_prefix("elasticfleet", node_id="h1_sensor", + user_id="tester", note="unit") + + self.assertIn("DELETE FROM settings", captured["sql"]) + self.assertIn("setting_id = 'elasticfleet'", captured["sql"]) + self.assertIn("'elasticfleet.'", captured["sql"]) + self.assertIn("INSERT INTO audit_settings", captured["sql"]) + + def test_sync_yaml_replace_uses_path_node_id(self): + with tempfile.TemporaryDirectory() as tmp: + minions = os.path.join(tmp, "minions") + os.mkdir(minions) + path = os.path.join(minions, "h1_sensor.sls") + open(path, "w").close() + + calls = [] + args = soconfig.build_parser().parse_args([ + "sync-yaml-mutation", path, "replace", "suricata.enabled", "true" + ]) + with patch.object(soconfig, "upsert_setting", + side_effect=lambda *a, **kw: calls.append((a, kw))): + soconfig.cmd_sync_yaml_mutation(args) + + self.assertEqual(calls[0][0], ("suricata.enabled", True)) + self.assertEqual(calls[0][1]["node_id"], "h1_sensor") + + def test_sync_yaml_remove_deletes_prefix(self): + with tempfile.TemporaryDirectory() as tmp: + minions = os.path.join(tmp, "minions") + os.mkdir(minions) + path = os.path.join(minions, "h1_sensor.sls") + open(path, "w").close() + + calls = [] + args = soconfig.build_parser().parse_args([ + "sync-yaml-mutation", path, "remove", "elasticfleet" + ]) + with patch.object(soconfig, "delete_setting_prefix", + side_effect=lambda *a, **kw: calls.append((a, kw))): + soconfig.cmd_sync_yaml_mutation(args) + + self.assertEqual(calls[0][0], ("elasticfleet",)) + self.assertEqual(calls[0][1]["node_id"], "h1_sensor") + + +if __name__ == "__main__": + unittest.main() diff --git a/salt/manager/tools/sbin/so-minion b/salt/manager/tools/sbin/so-minion index 86bab25e6..8a4244271 100755 --- a/salt/manager/tools/sbin/so-minion +++ b/salt/manager/tools/sbin/so-minion @@ -314,6 +314,24 @@ EOSQL fi } +function sync_minion_config_to_db() { + log "INFO" "Syncing minion config to onionconfig for $MINION_ID" + /usr/sbin/so-config.py import-minion "$MINION_ID" --note "so-minion $OPERATION" + if [ $? -ne 0 ]; then + log "ERROR" "Failed to sync minion config to onionconfig for $MINION_ID" + return 1 + fi +} + +function purge_minion_config_from_db() { + log "INFO" "Purging minion config from onionconfig for $MINION_ID" + /usr/sbin/so-config.py purge-node "$MINION_ID" --note "so-minion delete" + if [ $? -ne 0 ]; then + log "ERROR" "Failed to purge minion config from onionconfig for $MINION_ID" + return 1 + fi +} + # Create the minion file function ensure_socore_ownership() { log "INFO" "Setting socore ownership on minion files" @@ -1088,6 +1106,10 @@ case "$OPERATION" in log "ERROR" "Failed to setup minion files for $MINION_ID" exit 1 } + sync_minion_config_to_db || { + log "ERROR" "Failed to sync minion config to onionconfig for $MINION_ID" + exit 1 + } updateMineAndApplyStates || { log "ERROR" "Failed to update mine and apply states for $MINION_ID" exit 1 @@ -1108,12 +1130,20 @@ case "$OPERATION" in log "ERROR" "Failed to setup VM minion files for $MINION_ID" exit 1 } + sync_minion_config_to_db || { + log "ERROR" "Failed to sync VM minion config to onionconfig for $MINION_ID" + exit 1 + } log "INFO" "Successfully added VM minion $MINION_ID" ;; "delete") log "INFO" "Removing minion $MINION_ID" remove_postgres_telegraf_from_minion + purge_minion_config_from_db || { + log "ERROR" "Failed to purge minion config from onionconfig for $MINION_ID" + exit 1 + } deleteMinionFiles || { log "ERROR" "Failed to delete minion files for $MINION_ID" exit 1 diff --git a/salt/manager/tools/sbin/so-pillar-import b/salt/manager/tools/sbin/so-pillar-import deleted file mode 100755 index 2705753d8..000000000 --- a/salt/manager/tools/sbin/so-pillar-import +++ /dev/null @@ -1,329 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one -# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at -# https://securityonion.net/license; you may not use this file except in compliance with the -# Elastic License 2.0. - -""" -so-pillar-import — populate the so_pillar.* schema in so-postgres from the -on-disk Salt pillar tree. - -Reads /opt/so/saltstack/local/pillar/, decomposes each .sls file into a -(scope, role|minion_id, pillar_path, data) tuple, and UPSERTs it into -so_pillar.pillar_entry. Idempotent — re-running with no SLS edits produces -no version bumps because the audit trigger only writes a row when data -actually changes. - -Bootstrap and mine-driven files are skipped (see EXCLUDE_BASENAMES / -EXCLUDE_PREFIXES below). Files containing Jinja templates ({% or {{) are -also skipped — those stay disk-authoritative and ext_pillar_first: False -means they render before the PG overlay anyway. - -All SQL goes through `docker exec so-postgres psql` so no separate DSN -config is required at first-install time. Designed to be called by -salt/postgres/schema_pillar.sls (initial seed) and by salt/manager/tools/ -sbin/so-minion (per-minion sync on add/delete). -""" - -import argparse -import json -import os -import shlex -import subprocess -import sys -from pathlib import Path - -import yaml - - -PILLAR_LOCAL_ROOT = Path("/opt/so/saltstack/local/pillar") -PILLAR_DEFAULT_ROOT = Path("/opt/so/saltstack/default/pillar") -DOCKER_CONTAINER = "so-postgres" -PG_SUPERUSER = "postgres" -PG_DATABASE = "securityonion" - -# Files that must NEVER move to Postgres. These are read by Salt before -# Postgres is reachable, or contain renderer-time computed values (mine, etc.). -EXCLUDE_BASENAMES = { - "secrets.sls", - "auth.sls", # postgres/auth.sls bootstrap - "top.sls", -} -# Filename prefixes to skip — these are renderer-time computed pillars -# (Salt mine, file_exists guards, etc.) that have to stay on disk. -EXCLUDE_PATH_FRAGMENTS = ( - "/elasticsearch/nodes.sls", - "/redis/nodes.sls", - "/kafka/nodes.sls", - "/hypervisor/nodes.sls", - "/logstash/nodes.sls", - "/node_data/ips.sls", - "/postgres/auth.sls", - "/elasticsearch/auth.sls", - "/kibana/secrets.sls", -) - - -def log(level, msg): - print(f"[{level}] {msg}", file=sys.stderr) - - -def is_jinja_templated(content_bytes): - return b"{%" in content_bytes or b"{{" in content_bytes - - -def classify(path): - """Return (scope, role_name, minion_id, pillar_path) for a pillar file - or None to skip it. role_name is None for now — the importer leaves role - membership to the so_pillar.minion trigger and the salt/auth reactor.""" - rel_str = str(path) - if path.name in EXCLUDE_BASENAMES: - return None - for frag in EXCLUDE_PATH_FRAGMENTS: - if frag in rel_str: - return None - - # /local/pillar/minions/.sls or adv_.sls - if path.parent.name == "minions": - stem = path.stem # filename without .sls - if stem.startswith("adv_"): - mid = stem[4:] - return ("minion", None, mid, f"minions.adv_{mid}") - return ("minion", None, stem, f"minions.{stem}") - - # /local/pillar/
/.sls - if path.parent.parent == PILLAR_LOCAL_ROOT or path.parent.parent == PILLAR_DEFAULT_ROOT: - section = path.parent.name - stem = path.stem - # Only soc_
.sls and adv_
.sls are SOC-managed pillar - # surfaces. Other files (e.g. nodes.sls, auth.sls, *.token) are - # either covered by EXCLUDE_PATH_FRAGMENTS or are bootstrap surfaces - # we leave alone for now. - if stem.startswith("soc_") or stem.startswith("adv_"): - return ("global", None, None, f"{section}.{stem}") - return None - - return None - - -def parse_yaml_file(path): - with open(path, "rb") as f: - content = f.read() - if not content.strip(): - return {} - if is_jinja_templated(content): - return None - data = yaml.safe_load(content) - if data is None: - return {} - if not isinstance(data, dict): - return {"_raw": data} - return data - - -def derive_node_type(minion_id): - """Conventional Security Onion minion ids are _. Take the - last underscore-delimited token as the canonical role suffix.""" - parts = minion_id.rsplit("_", 1) - if len(parts) == 2: - return parts[1] - return None - - -def docker_psql(sql, *, db=PG_DATABASE, user=PG_SUPERUSER, on_error_stop=True, capture=True): - """Run sql via docker exec ... psql. Returns stdout as str.""" - args = [ - "docker", "exec", "-i", DOCKER_CONTAINER, - "psql", "-U", user, "-d", db, "-tA", "-q", - ] - if on_error_stop: - args += ["-v", "ON_ERROR_STOP=1"] - proc = subprocess.run( - args, input=sql.encode(), - capture_output=capture, check=False, - ) - if proc.returncode != 0: - sys.stderr.write(proc.stderr.decode(errors="replace")) - raise RuntimeError(f"docker exec psql failed (rc={proc.returncode})") - return proc.stdout.decode(errors="replace") - - -def upsert_minion(minion_id, node_type): - sql = ( - "INSERT INTO so_pillar.minion (minion_id, node_type) " - f"VALUES ({pg_str(minion_id)}, {pg_str(node_type) if node_type else 'NULL'}) " - "ON CONFLICT (minion_id) DO UPDATE SET node_type = EXCLUDED.node_type;" - ) - docker_psql(sql) - - -def delete_minion(minion_id): - """CASCADE removes pillar_entry + role_member rows.""" - sql = f"DELETE FROM so_pillar.minion WHERE minion_id = {pg_str(minion_id)};" - docker_psql(sql) - - -def upsert_pillar_entry(scope, role_name, minion_id, pillar_path, data, reason): - """Insert or update the row keyed by the partial unique index that - matches scope. Audit trigger handles history; versioning trigger bumps - version only when data changes.""" - data_json = json.dumps(data) - role_sql = pg_str(role_name) if role_name else "NULL" - minion_sql = pg_str(minion_id) if minion_id else "NULL" - reason_sql = pg_str(reason) - - if scope == "global": - conflict = "(pillar_path) WHERE scope='global'" - elif scope == "role": - conflict = "(role_name, pillar_path) WHERE scope='role'" - elif scope == "minion": - conflict = "(minion_id, pillar_path) WHERE scope='minion'" - else: - raise ValueError(f"unknown scope {scope!r}") - - sql = ( - "BEGIN;\n" - f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);\n" - f"INSERT INTO so_pillar.pillar_entry " - f"(scope, role_name, minion_id, pillar_path, data, change_reason) " - f"VALUES ({pg_str(scope)}, {role_sql}, {minion_sql}, {pg_str(pillar_path)}, {pg_jsonb(data_json)}, {reason_sql}) " - f"ON CONFLICT {conflict} DO UPDATE " - f"SET data = EXCLUDED.data, change_reason = EXCLUDED.change_reason;\n" - "COMMIT;\n" - ) - docker_psql(sql) - - -def pg_str(s): - """Escape a Python str for inclusion in literal SQL. Pillar content has - already been validated as YAML; we just need standard SQL escaping.""" - if s is None: - return "NULL" - return "'" + str(s).replace("'", "''") + "'" - - -def pg_jsonb(json_str): - return pg_str(json_str) + "::jsonb" - - -def walk_pillar_root(root, paths): - if not root.is_dir(): - return - for path in root.rglob("*.sls"): - if path.is_file(): - paths.append(path) - - -def import_minion(minion_id, node_type, dry_run, reason): - """Re-import every pillar file for a single minion.""" - if not minion_id: - raise ValueError("minion_id required for --scope minion") - - upsert_minion(minion_id, node_type) - log("INFO", f"Upserted minion row {minion_id} (node_type={node_type})") - - targets = [ - PILLAR_LOCAL_ROOT / "minions" / f"{minion_id}.sls", - PILLAR_LOCAL_ROOT / "minions" / f"adv_{minion_id}.sls", - ] - for path in targets: - if not path.exists(): - log("INFO", f" (no file at {path})") - continue - klass = classify(path) - if not klass: - log("INFO", f" skip {path} (excluded)") - continue - scope, role, mid, pillar_path = klass - data = parse_yaml_file(path) - if data is None: - log("WARN", f" skip {path} (Jinja-templated; stays disk-only)") - continue - if dry_run: - log("DRY", f" would upsert {scope}/{pillar_path} = {len(json.dumps(data))} bytes") - continue - upsert_pillar_entry(scope, role, mid, pillar_path, data, reason) - log("INFO", f" imported {scope}/{pillar_path}") - - -def import_all(dry_run, reason): - """Walk the entire local pillar tree and import every eligible file.""" - paths = [] - walk_pillar_root(PILLAR_LOCAL_ROOT, paths) - - imported = 0 - skipped = 0 - minions_seen = set() - - for path in sorted(paths): - klass = classify(path) - if not klass: - skipped += 1 - continue - scope, role, minion_id, pillar_path = klass - data = parse_yaml_file(path) - if data is None: - log("WARN", f"skip {path} (Jinja-templated; stays disk-only)") - skipped += 1 - continue - - if scope == "minion" and minion_id not in minions_seen: - node_type = derive_node_type(minion_id) - if not dry_run: - upsert_minion(minion_id, node_type) - minions_seen.add(minion_id) - - if dry_run: - log("DRY", f"would upsert {scope}/{pillar_path} ({len(json.dumps(data))} bytes)") - else: - upsert_pillar_entry(scope, role, minion_id, pillar_path, data, reason) - log("INFO", f"imported {scope}/{pillar_path}") - imported += 1 - - log("INFO", f"done: {imported} imported, {skipped} skipped") - - -def main(): - ap = argparse.ArgumentParser(description=__doc__) - ap.add_argument("--scope", choices=("global", "role", "minion", "all"), default="all") - ap.add_argument("--minion-id") - ap.add_argument("--node-type", help="override node_type for --scope minion (default: derived from minion_id)") - ap.add_argument("--delete", action="store_true", - help="With --scope minion, remove the minion row (and its pillar rows via CASCADE)") - ap.add_argument("--dry-run", action="store_true") - ap.add_argument("--diff", action="store_true", - help="(reserved) print structural diffs vs current DB content") - ap.add_argument("--yes", action="store_true", - help="Skip confirmation prompts (currently unused; reserved)") - ap.add_argument("--reason", default="so-pillar-import", - help="change_reason recorded in pillar_entry_history") - args = ap.parse_args() - - try: - if args.scope == "minion": - if not args.minion_id: - ap.error("--minion-id required when --scope minion") - if args.delete: - if args.dry_run: - log("DRY", f"would delete {args.minion_id}") - else: - delete_minion(args.minion_id) - log("INFO", f"deleted {args.minion_id}") - else: - node_type = args.node_type or derive_node_type(args.minion_id) - import_minion(args.minion_id, node_type, args.dry_run, args.reason) - elif args.scope == "all": - import_all(args.dry_run, args.reason) - else: - log("ERROR", f"--scope {args.scope} not yet implemented; use --scope all or --scope minion") - return 2 - except Exception as e: - log("ERROR", str(e)) - return 1 - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/salt/manager/tools/sbin/so-yaml.py b/salt/manager/tools/sbin/so-yaml.py index ea24cef95..51b5a5492 100755 --- a/salt/manager/tools/sbin/so-yaml.py +++ b/salt/manager/tools/sbin/so-yaml.py @@ -13,64 +13,6 @@ import json lockFile = "/tmp/so-yaml.lock" -# postsalt: so-yaml supports three backend modes for PG-managed pillar paths: -# -# dual — write disk + mirror to so_pillar.*. Reads from disk. -# Used during the migration transition when disk is still -# canonical and PG runs as a shadow. -# postgres — write to so_pillar.* only. Reads from so_pillar.*. No disk -# file is touched. The end state once cutover is complete. -# disk — disk only, no PG. Emergency rollback escape hatch. -# -# Bootstrap and mine-driven files (secrets.sls, ca/init.sls, */nodes.sls, -# top.sls, etc.) are always handled on disk regardless of mode — those paths -# are explicitly excluded by so_yaml_postgres.locate() raising SkipPath. -# -# Mode resolution: SO_YAML_BACKEND env var, then /opt/so/conf/so-yaml/mode, -# then default 'dual' (safe upgrade behavior — flipping to 'postgres' is -# done by schema_pillar.sls after the schema is in place and the importer -# has run at least once). - -MODE_FILE = "/opt/so/conf/so-yaml/mode" -VALID_MODES = ("dual", "postgres", "disk") -DEFAULT_MODE = "dual" - -try: - sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - import so_yaml_postgres - _SO_YAML_PG_AVAILABLE = True -except Exception as _exc: - _SO_YAML_PG_AVAILABLE = False - - -def _resolveBackendMode(): - env = os.environ.get("SO_YAML_BACKEND") - if env and env in VALID_MODES: - return env - try: - with open(MODE_FILE, "r") as fh: - value = fh.read().strip() - if value in VALID_MODES: - return value - except (IOError, OSError): - pass - return DEFAULT_MODE - - -_BACKEND_MODE = _resolveBackendMode() - - -def _isPgManaged(filename): - """True when so-yaml should route this file's reads/writes through - so_pillar.*. False for bootstrap/mine-driven files that always live on - disk, and for arbitrary YAML paths outside the pillar tree.""" - if not _SO_YAML_PG_AVAILABLE: - return False - try: - return so_yaml_postgres.is_pg_managed(filename) - except Exception: - return False - def showUsage(args): print('Usage: {} [ARGS...]'.format(sys.argv[0]), file=sys.stderr) @@ -83,14 +25,9 @@ def showUsage(args): print(' get [-r] - Displays (to stdout) the value stored in the given key. Requires KEY arg. Use -r for raw output without YAML formatting.', file=sys.stderr) print(' remove - Removes a yaml key, if it exists. Requires KEY arg.', file=sys.stderr) print(' replace - Replaces (or adds) a new key and set its value. Requires KEY and VALUE args.', file=sys.stderr) - print(' purge - Delete the YAML file from disk and remove its rows from so_pillar.* (no KEY arg).', file=sys.stderr) + print(' purge - Delete the YAML file from disk (no KEY arg).', file=sys.stderr) print(' help - Prints this usage information.', file=sys.stderr) print('', file=sys.stderr) - print(' Backend mode:', file=sys.stderr) - print(' Resolved from $SO_YAML_BACKEND, then /opt/so/conf/so-yaml/mode, default "dual".', file=sys.stderr) - print(' Valid values: dual | postgres | disk. Bootstrap pillar files (secrets, ca, *.nodes.sls)', file=sys.stderr) - print(' are always handled on disk regardless of mode.', file=sys.stderr) - print('', file=sys.stderr) print(' Where:', file=sys.stderr) print(' YAML_FILE - Path to the file that will be modified. Ex: /opt/so/conf/service/conf.yaml', file=sys.stderr) print(' KEY - YAML key, does not support \' or " characters at this time. Ex: level1.level2', file=sys.stderr) @@ -103,24 +40,6 @@ def showUsage(args): def loadYaml(filename): - """Load a YAML file's content as a dict. - - PG-canonical mode (`postgres`): for PG-managed paths, read from - so_pillar.pillar_entry. A missing row is treated as an empty dict so - that `replace`/`add` on a fresh path can populate it from scratch. - - Other modes / non-PG-managed paths: read from disk as today. - """ - if _BACKEND_MODE == "postgres" and _isPgManaged(filename): - try: - data = so_yaml_postgres.read_yaml(filename) - except so_yaml_postgres.SkipPath: - data = None - except Exception as e: - print(f"so-yaml: pg read failed for {filename}: {e}", file=sys.stderr) - sys.exit(1) - return data if data is not None else {} - try: with open(filename, "r") as file: content = file.read() @@ -134,96 +53,20 @@ def loadYaml(filename): def writeYaml(filename, content): - """Persist `content` for `filename`. - - PG-canonical mode + PG-managed path: write only to so_pillar.*. A PG - failure is fatal (no disk fallback) — caller must retry. - - Dual mode: write disk, then mirror to PG (failures are warnings). - - Disk mode or non-PG-managed path: write disk only. - """ - if _BACKEND_MODE == "postgres" and _isPgManaged(filename): - if not _SO_YAML_PG_AVAILABLE: - print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr) - sys.exit(1) - ok, msg = so_yaml_postgres.write_yaml( - filename, content, - reason="so-yaml " + " ".join(sys.argv[1:2])) - if not ok: - print(f"so-yaml: pg write failed for {filename}: {msg}", file=sys.stderr) - sys.exit(1) - return None - file = open(filename, "w") result = yaml.safe_dump(content, file) file.close() - - if _BACKEND_MODE == "dual": - _mirrorToPostgres(filename, content) return result -def _mirrorToPostgres(filename, content): - """Best-effort dual-write of a YAML mutation into so_pillar.*. Skips - files outside the PG-managed pillar surface (secrets.sls, - elasticsearch/nodes.sls, etc.) and silently degrades when so-postgres - is unreachable. Disk write is canonical in dual mode; this never - raises. - - Only real PG failures (`pg write failed: ...`) are logged so the - common cases (skipped path, postgres not running) don't pollute - stderr.""" - if not _SO_YAML_PG_AVAILABLE: - return - try: - ok, msg = so_yaml_postgres.write_yaml(filename, content, - reason="so-yaml " + " ".join(sys.argv[1:2])) - if not ok and msg.startswith("pg write failed"): - print(f"so-yaml: {msg}", file=sys.stderr) - except Exception as e: # pragma: no cover — defensive: never break disk write - print(f"so-yaml: pg mirror exception: {e}", file=sys.stderr) - - def purgeFile(filename): - """Delete a YAML file from disk and remove the matching rows from - so_pillar.*. Idempotent — missing file/row counts as success. - - PG-canonical mode + PG-managed path: PG delete is canonical. If a stale - disk file from the dual-write era happens to still exist, it's removed - too as a cleanup courtesy. PG failure is fatal in this mode. - - Dual / disk modes: remove disk first; PG cleanup is best-effort.""" - if _BACKEND_MODE == "postgres" and _isPgManaged(filename): - if not _SO_YAML_PG_AVAILABLE: - print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr) - return 1 - ok, msg = so_yaml_postgres.purge_yaml(filename, reason="so-yaml purge") - if not ok: - print(f"so-yaml: pg purge failed for {filename}: {msg}", file=sys.stderr) - return 1 - if os.path.exists(filename): - try: - os.remove(filename) - except Exception as e: - print(f"so-yaml: warn — could not remove stale disk file {filename}: {e}", file=sys.stderr) - return 0 - + """Delete a YAML file from disk. Idempotent; missing files are success.""" if os.path.exists(filename): try: os.remove(filename) except Exception as e: print(f"Failed to remove {filename}: {e}", file=sys.stderr) return 1 - - if _BACKEND_MODE == "dual" and _SO_YAML_PG_AVAILABLE: - try: - ok, msg = so_yaml_postgres.purge_yaml(filename, - reason="so-yaml purge") - if not ok and msg.startswith("pg purge failed"): - print(f"so-yaml: {msg}", file=sys.stderr) - except Exception as e: - print(f"so-yaml: pg purge exception: {e}", file=sys.stderr) return 0 @@ -543,10 +386,7 @@ def get(args): def purge(args): - """purge YAML_FILE — delete the file from disk and remove the matching - rows from so_pillar.* in so-postgres. Used by so-minion's delete path - (in place of `rm -f`) so the audit log captures the deletion and - role_member rows get cleaned up via FK CASCADE on so_pillar.minion.""" + """purge YAML_FILE - delete the file from disk.""" if len(args) != 1: print('Missing filename arg', file=sys.stderr) showUsage(None) diff --git a/salt/manager/tools/sbin/so-yaml_test.py b/salt/manager/tools/sbin/so-yaml_test.py index 4eb017445..431372ba1 100644 --- a/salt/manager/tools/sbin/so-yaml_test.py +++ b/salt/manager/tools/sbin/so-yaml_test.py @@ -1007,9 +1007,7 @@ class TestPurge(unittest.TestCase): filename = "/tmp/so-yaml_test_purge.yaml" with open(filename, "w") as f: f.write("key: value\n") - # Disable PG mirror so the test doesn't shell out to docker. - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False): - rc = soyaml.purge([filename]) + rc = soyaml.purge([filename]) self.assertEqual(rc, 0) import os as _os self.assertFalse(_os.path.exists(filename)) @@ -1019,301 +1017,5 @@ class TestPurge(unittest.TestCase): import os as _os if _os.path.exists(filename): _os.remove(filename) - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False): - rc = soyaml.purge([filename]) + rc = soyaml.purge([filename]) self.assertEqual(rc, 0) - - -class TestSoYamlPostgres(unittest.TestCase): - """Tests the path-locator and write/purge contract of the dual-write - backend module without actually contacting Postgres.""" - - def setUp(self): - import importlib - self.mod = importlib.import_module("so_yaml_postgres") - - def test_locate_global_soc(self): - scope, role, mid, path = self.mod.locate( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") - self.assertEqual(scope, "global") - self.assertIsNone(role) - self.assertIsNone(mid) - self.assertEqual(path, "soc.soc_soc") - - def test_locate_global_advanced(self): - scope, role, mid, path = self.mod.locate( - "/opt/so/saltstack/local/pillar/soc/adv_soc.sls") - self.assertEqual(scope, "global") - self.assertEqual(path, "soc.adv_soc") - - def test_locate_minion(self): - scope, role, mid, path = self.mod.locate( - "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls") - self.assertEqual(scope, "minion") - self.assertEqual(mid, "h1_sensor") - self.assertEqual(path, "minions.h1_sensor") - - def test_locate_minion_advanced(self): - scope, role, mid, path = self.mod.locate( - "/opt/so/saltstack/local/pillar/minions/adv_h1_sensor.sls") - self.assertEqual(scope, "minion") - self.assertEqual(mid, "h1_sensor") - self.assertEqual(path, "minions.adv_h1_sensor") - - def test_locate_skip_secrets(self): - with self.assertRaises(self.mod.SkipPath): - self.mod.locate("/opt/so/saltstack/local/pillar/secrets.sls") - - def test_locate_skip_postgres_auth(self): - with self.assertRaises(self.mod.SkipPath): - self.mod.locate("/opt/so/saltstack/local/pillar/postgres/auth.sls") - - def test_locate_skip_mine_driven(self): - with self.assertRaises(self.mod.SkipPath): - self.mod.locate("/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls") - - def test_locate_skip_top(self): - with self.assertRaises(self.mod.SkipPath): - self.mod.locate("/opt/so/saltstack/local/pillar/top.sls") - - def test_locate_skip_unrelated(self): - with self.assertRaises(self.mod.SkipPath): - self.mod.locate("/etc/hostname") - - def test_pg_str_escapes(self): - self.assertEqual(self.mod._pg_str("a'b"), "'a''b'") - self.assertEqual(self.mod._pg_str(None), "NULL") - - def test_conflict_target(self): - self.assertIn("scope='global'", self.mod._conflict_target("global")) - self.assertIn("scope='role'", self.mod._conflict_target("role")) - self.assertIn("scope='minion'", self.mod._conflict_target("minion")) - with self.assertRaises(ValueError): - self.mod._conflict_target("bogus") - - def test_write_yaml_skips_disk_only_path(self): - with patch.object(self.mod, '_is_enabled', return_value=True): - ok, msg = self.mod.write_yaml( - "/opt/so/saltstack/local/pillar/secrets.sls", - {"secrets": {"foo": "bar"}}) - self.assertFalse(ok) - self.assertIn("disk-only", msg) - - def test_write_yaml_unreachable(self): - with patch.object(self.mod, '_is_enabled', return_value=False): - ok, msg = self.mod.write_yaml( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls", - {"soc": {"foo": "bar"}}) - self.assertFalse(ok) - self.assertEqual(msg, "postgres unreachable") - - def test_is_pg_managed_true(self): - self.assertTrue(self.mod.is_pg_managed( - "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")) - self.assertTrue(self.mod.is_pg_managed( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")) - - def test_is_pg_managed_false_for_bootstrap(self): - self.assertFalse(self.mod.is_pg_managed( - "/opt/so/saltstack/local/pillar/secrets.sls")) - self.assertFalse(self.mod.is_pg_managed( - "/opt/so/saltstack/local/pillar/postgres/auth.sls")) - self.assertFalse(self.mod.is_pg_managed( - "/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls")) - - def test_read_yaml_unreachable(self): - with patch.object(self.mod, '_is_enabled', return_value=False): - self.assertIsNone(self.mod.read_yaml( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")) - - def test_read_yaml_skips_disk_only(self): - with patch.object(self.mod, '_is_enabled', return_value=True): - with self.assertRaises(self.mod.SkipPath): - self.mod.read_yaml( - "/opt/so/saltstack/local/pillar/secrets.sls") - - def test_read_yaml_returns_data(self): - with patch.object(self.mod, '_is_enabled', return_value=True): - with patch.object(self.mod, '_docker_psql', - return_value='{"soc": {"foo": "bar"}}\n'): - data = self.mod.read_yaml( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") - self.assertEqual(data, {"soc": {"foo": "bar"}}) - - def test_read_yaml_returns_none_when_no_row(self): - with patch.object(self.mod, '_is_enabled', return_value=True): - with patch.object(self.mod, '_docker_psql', return_value=''): - data = self.mod.read_yaml( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") - self.assertIsNone(data) - - def test_read_yaml_minion_query_shape(self): - captured = {} - - def fake_psql(sql): - captured['sql'] = sql - return '{"host": {"mainip": "10.0.0.1"}}' - - with patch.object(self.mod, '_is_enabled', return_value=True): - with patch.object(self.mod, '_docker_psql', side_effect=fake_psql): - data = self.mod.read_yaml( - "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls") - self.assertEqual(data, {"host": {"mainip": "10.0.0.1"}}) - self.assertIn("scope='minion'", captured['sql']) - self.assertIn("'h1_sensor'", captured['sql']) - self.assertIn("'minions.h1_sensor'", captured['sql']) - - def test_is_enabled_public_alias(self): - with patch.object(self.mod, '_is_enabled', return_value=True): - self.assertTrue(self.mod.is_enabled()) - with patch.object(self.mod, '_is_enabled', return_value=False): - self.assertFalse(self.mod.is_enabled()) - - -class TestSoYamlBackendMode(unittest.TestCase): - """Tests so-yaml's backend-mode resolution and PG-canonical routing - for read/write/purge. The PG calls themselves are stubbed; what we're - asserting is that the right backend is chosen for each (mode, path) - combination.""" - - def test_resolve_mode_env_overrides_file(self): - with patch.dict('os.environ', {'SO_YAML_BACKEND': 'postgres'}): - self.assertEqual(soyaml._resolveBackendMode(), 'postgres') - with patch.dict('os.environ', {'SO_YAML_BACKEND': 'disk'}): - self.assertEqual(soyaml._resolveBackendMode(), 'disk') - - def test_resolve_mode_invalid_env_falls_back(self): - with patch.dict('os.environ', {'SO_YAML_BACKEND': 'garbage'}, clear=False): - with patch('builtins.open', side_effect=IOError): - self.assertEqual(soyaml._resolveBackendMode(), 'dual') - - def test_resolve_mode_default_dual(self): - env = {k: v for k, v in __import__('os').environ.items() - if k != 'SO_YAML_BACKEND'} - with patch.dict('os.environ', env, clear=True): - with patch('builtins.open', side_effect=IOError): - self.assertEqual(soyaml._resolveBackendMode(), 'dual') - - def test_is_pg_managed_proxies(self): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - self.assertTrue(soyaml._isPgManaged( - "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")) - self.assertFalse(soyaml._isPgManaged( - "/opt/so/saltstack/local/pillar/secrets.sls")) - - def test_is_pg_managed_false_when_module_unavailable(self): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False): - self.assertFalse(soyaml._isPgManaged( - "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")) - - def test_load_yaml_postgres_mode_reads_pg(self): - with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', - return_value=True): - with patch.object(soyaml.so_yaml_postgres, 'read_yaml', - return_value={"a": 1}): - result = soyaml.loadYaml( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") - self.assertEqual(result, {"a": 1}) - - def test_load_yaml_postgres_mode_returns_empty_when_no_row(self): - with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', - return_value=True): - with patch.object(soyaml.so_yaml_postgres, 'read_yaml', - return_value=None): - result = soyaml.loadYaml( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") - self.assertEqual(result, {}) - - def test_load_yaml_postgres_mode_reads_disk_for_bootstrap(self): - import tempfile, os as _os - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: - f.write("foo: bar\n") - tmp = f.name - try: - with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, - 'is_pg_managed', return_value=False): - result = soyaml.loadYaml(tmp) - self.assertEqual(result, {"foo": "bar"}) - finally: - _os.unlink(tmp) - - def test_write_yaml_postgres_mode_skips_disk(self): - import tempfile, os as _os - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: - tmp = f.name - _os.unlink(tmp) - try: - with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', - return_value=True): - with patch.object(soyaml.so_yaml_postgres, 'write_yaml', - return_value=(True, 'ok')) as mock_w: - soyaml.writeYaml(tmp, {"x": 1}) - self.assertFalse(_os.path.exists(tmp)) - mock_w.assert_called_once() - finally: - if _os.path.exists(tmp): - _os.unlink(tmp) - - def test_write_yaml_postgres_mode_failure_is_fatal(self): - with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', - return_value=True): - with patch.object(soyaml.so_yaml_postgres, 'write_yaml', - return_value=(False, 'pg write failed: connection refused')): - with patch('sys.exit', new=MagicMock()) as sysmock: - with patch('sys.stderr', new=StringIO()) as mock_err: - soyaml.writeYaml( - "/opt/so/saltstack/local/pillar/soc/soc_soc.sls", - {"x": 1}) - sysmock.assert_called_with(1) - - def test_write_yaml_disk_mode_skips_pg(self): - import tempfile, os as _os - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: - tmp = f.name - try: - with patch.object(soyaml, '_BACKEND_MODE', 'disk'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, 'write_yaml') as mock_w: - soyaml.writeYaml(tmp, {"x": 1}) - mock_w.assert_not_called() - with open(tmp) as f: - self.assertIn('x: 1', f.read()) - finally: - _os.unlink(tmp) - - def test_purge_postgres_mode_calls_pg_only(self): - import tempfile, os as _os - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: - tmp = f.name - _os.unlink(tmp) - with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', - return_value=True): - with patch.object(soyaml.so_yaml_postgres, 'purge_yaml', - return_value=(True, 'ok')) as mock_p: - rc = soyaml.purgeFile(tmp) - self.assertEqual(rc, 0) - mock_p.assert_called_once() - - def test_purge_postgres_mode_failure_returns_nonzero(self): - with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): - with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): - with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', - return_value=True): - with patch.object(soyaml.so_yaml_postgres, 'purge_yaml', - return_value=(False, 'pg purge failed: x')): - with patch('sys.stderr', new=StringIO()): - rc = soyaml.purgeFile( - "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls") - self.assertEqual(rc, 1) diff --git a/salt/manager/tools/sbin/so_yaml_postgres.py b/salt/manager/tools/sbin/so_yaml_postgres.py deleted file mode 100644 index b23254a1a..000000000 --- a/salt/manager/tools/sbin/so_yaml_postgres.py +++ /dev/null @@ -1,320 +0,0 @@ -# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one -# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at -# https://securityonion.net/license; you may not use this file except in compliance with the -# Elastic License 2.0. - -""" -so_yaml_postgres — Postgres-backed dual-write helpers for so-yaml.py. - -so-yaml.py writes YAML pillar files on disk; this module mirrors those -writes into so_pillar.* in so-postgres so ext_pillar and the SOC -PostgresConfigstore see the same data. During the postsalt transition -disk is canonical; PG writes are best-effort and never fail the disk -operation. - -Connection: shells out to `docker exec so-postgres psql -U postgres -d -securityonion`. Same pattern so-pillar-import uses; avoids needing a -separate DSN config at install time. Performance is fine because so-yaml -is invoked from infrequent code paths (setup scripts, so-minion, -so-firewall); SOC's hot path uses the in-process pgxpool in -PostgresConfigstore, not so-yaml. - -Path-to-row mapping mirrors PostgresConfigstore.locateSetting in -securityonion-soc: - - /opt/so/saltstack/local/pillar/
/soc_
.sls - -> scope=global, pillar_path=
.soc_
- /opt/so/saltstack/local/pillar/
/adv_
.sls - -> scope=global, pillar_path=
.adv_
- /opt/so/saltstack/local/pillar/minions/.sls - -> scope=minion, minion_id=, pillar_path=minions. - /opt/so/saltstack/local/pillar/minions/adv_.sls - -> scope=minion, minion_id=, pillar_path=minions.adv_ - -Files outside that mapping (notably secrets.sls, postgres/auth.sls, -elasticsearch/nodes.sls, etc.) are skipped — they stay disk-only forever -or render dynamically and don't belong in PG. -""" - -import json -import os -import shlex -import subprocess -import sys - -DOCKER_CONTAINER = os.environ.get("SO_PILLAR_PG_CONTAINER", "so-postgres") -PG_DATABASE = os.environ.get("SO_PILLAR_PG_DATABASE", "securityonion") -PG_USER = os.environ.get("SO_PILLAR_PG_USER", "postgres") - -# File paths whose mutations stay disk-only forever. Mirrors EXCLUDE_* -# in so-pillar-import. -DISK_ONLY_PATHS = ( - "/opt/so/saltstack/local/pillar/secrets.sls", - "/opt/so/saltstack/local/pillar/postgres/auth.sls", - "/opt/so/saltstack/local/pillar/elasticsearch/auth.sls", - "/opt/so/saltstack/local/pillar/kibana/secrets.sls", -) -DISK_ONLY_FRAGMENTS = ( - "/elasticsearch/nodes.sls", - "/redis/nodes.sls", - "/kafka/nodes.sls", - "/hypervisor/nodes.sls", - "/logstash/nodes.sls", - "/node_data/ips.sls", - "/top.sls", -) - - -class SkipPath(Exception): - """Raised when a file path is intentionally not mirrored to PG.""" - - -def is_enabled(): - """Public alias for callers that want to probe PG reachability without - relying on a leading-underscore private name.""" - return _is_enabled() - - -def _is_enabled(): - """PG dual-write only fires if so-postgres is reachable. Cheap probe. - Returns True when docker exec succeeds, False otherwise. We never - want a PG hiccup to fail a disk write on a manager whose Postgres is - momentarily unreachable.""" - try: - proc = subprocess.run( - ["docker", "exec", DOCKER_CONTAINER, - "pg_isready", "-h", "127.0.0.1", "-U", PG_USER, "-q"], - capture_output=True, timeout=5, check=False, - ) - return proc.returncode == 0 - except (FileNotFoundError, subprocess.TimeoutExpired, OSError): - return False - - -def locate(path): - """Translate a so-yaml file path to (scope, role_name, minion_id, pillar_path). - Raises SkipPath when the file is not part of the PG-managed surface.""" - norm = os.path.normpath(path) - - if norm in DISK_ONLY_PATHS: - raise SkipPath(f"{path}: explicit disk-only allowlist") - for frag in DISK_ONLY_FRAGMENTS: - if frag in norm: - raise SkipPath(f"{path}: matches disk-only fragment {frag}") - - parent = os.path.basename(os.path.dirname(norm)) - grandparent = os.path.basename(os.path.dirname(os.path.dirname(norm))) - name = os.path.basename(norm) - if not name.endswith(".sls"): - raise SkipPath(f"{path}: not a .sls file") - stem = name[:-4] - - if parent == "minions": - if stem.startswith("adv_"): - mid = stem[4:] - return ("minion", None, mid, f"minions.adv_{mid}") - return ("minion", None, stem, f"minions.{stem}") - - # /local/pillar/
/.sls - if grandparent == "pillar" and parent and parent != "": - if stem.startswith("soc_") or stem.startswith("adv_"): - return ("global", None, None, f"{parent}.{stem}") - raise SkipPath(f"{path}:
/{stem}.sls is not a soc_/adv_ file") - - raise SkipPath(f"{path}: unrecognised pillar layout") - - -def _pg_str(s): - if s is None: - return "NULL" - return "'" + str(s).replace("'", "''") + "'" - - -def _docker_psql(sql): - """Run sql via docker exec ... psql. Returns stdout. Caller catches - exceptions and downgrades to a warning.""" - proc = subprocess.run( - ["docker", "exec", "-i", DOCKER_CONTAINER, - "psql", "-U", PG_USER, "-d", PG_DATABASE, - "-tA", "-q", "-v", "ON_ERROR_STOP=1"], - input=sql.encode(), capture_output=True, check=False, timeout=30, - ) - if proc.returncode != 0: - raise RuntimeError(proc.stderr.decode(errors="replace") or - f"docker exec psql exit {proc.returncode}") - return proc.stdout.decode(errors="replace") - - -def _conflict_target(scope): - if scope == "global": - return "(pillar_path) WHERE scope='global'" - if scope == "role": - return "(role_name, pillar_path) WHERE scope='role'" - if scope == "minion": - return "(minion_id, pillar_path) WHERE scope='minion'" - raise ValueError(f"unknown scope {scope!r}") - - -def is_pg_managed(path): - """True if this path maps to a so_pillar.* row (locate() succeeds). - Bootstrap and mine-driven files return False — they always live on - disk regardless of so-yaml's backend mode.""" - try: - locate(path) - return True - except SkipPath: - return False - - -def read_yaml(path): - """Return the content dict stored in so_pillar.pillar_entry for `path`, - or None when no row exists. Raises SkipPath when `path` is not part of - the PG-managed surface (caller should read disk in that case). - - Used by so-yaml.py PG-canonical mode so `replace`, `get`, etc. resolve - against the database rather than a stale (or absent) disk file.""" - if not _is_enabled(): - return None - scope, role, minion_id, pillar_path = locate(path) - - if scope == "minion": - sql = ("SELECT data FROM so_pillar.pillar_entry " - "WHERE scope='minion' " - f"AND minion_id={_pg_str(minion_id)} " - f"AND pillar_path={_pg_str(pillar_path)}") - elif scope == "role": - sql = ("SELECT data FROM so_pillar.pillar_entry " - "WHERE scope='role' " - f"AND role_name={_pg_str(role)} " - f"AND pillar_path={_pg_str(pillar_path)}") - else: - sql = ("SELECT data FROM so_pillar.pillar_entry " - "WHERE scope='global' " - f"AND pillar_path={_pg_str(pillar_path)}") - - try: - out = _docker_psql(sql).strip() - except Exception: - return None - if not out: - return None - try: - return json.loads(out) - except (ValueError, TypeError): - return None - - -def write_yaml(path, content_dict, *, reason="so-yaml dual-write"): - """Mirror the disk write at `path` (whose content was just rendered as - `content_dict`) into so_pillar.pillar_entry. Best-effort: any failure - is swallowed so the caller (so-yaml.py) does not see it as a fatal.""" - if not _is_enabled(): - return False, "postgres unreachable" - try: - scope, role, minion_id, pillar_path = locate(path) - except SkipPath as e: - return False, str(e) - - data_json = json.dumps(content_dict if content_dict is not None else {}) - role_sql = _pg_str(role) - minion_sql = _pg_str(minion_id) - reason_sql = _pg_str(reason) - conflict = _conflict_target(scope) - - sql_parts = [] - if scope == "minion": - # FK requires the minion row before pillar_entry can reference it. - sql_parts.append( - f"INSERT INTO so_pillar.minion (minion_id) VALUES ({minion_sql}) " - "ON CONFLICT (minion_id) DO NOTHING;" - ) - sql_parts.append( - "BEGIN;\n" - f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);\n" - "INSERT INTO so_pillar.pillar_entry " - "(scope, role_name, minion_id, pillar_path, data, change_reason) " - f"VALUES ({_pg_str(scope)}, {role_sql}, {minion_sql}, " - f"{_pg_str(pillar_path)}, {_pg_str(data_json)}::jsonb, {reason_sql}) " - f"ON CONFLICT {conflict} DO UPDATE " - "SET data = EXCLUDED.data, change_reason = EXCLUDED.change_reason;\n" - "COMMIT;\n" - ) - - try: - _docker_psql("\n".join(sql_parts)) - except Exception as e: - return False, f"pg write failed: {e}" - return True, "ok" - - -def purge_yaml(path, *, reason="so-yaml purge"): - """Mirror the disk file deletion at `path` by deleting the matching - pillar_entry rows. For minion files also deletes the so_pillar.minion - row (CASCADE removes pillar_entry + role_member rows).""" - if not _is_enabled(): - return False, "postgres unreachable" - try: - scope, role, minion_id, pillar_path = locate(path) - except SkipPath as e: - return False, str(e) - - reason_sql = _pg_str(reason) - parts = ["BEGIN;", - f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);"] - - if scope == "minion": - # If both .sls and adv_.sls are gone the trigger / CASCADE - # cleans up role_member; otherwise we just remove this one row. - parts.append( - f"DELETE FROM so_pillar.pillar_entry " - f"WHERE scope='minion' AND minion_id={_pg_str(minion_id)} " - f"AND pillar_path={_pg_str(pillar_path)};" - ) - parts.append( - f"DELETE FROM so_pillar.minion WHERE minion_id={_pg_str(minion_id)} " - "AND NOT EXISTS (SELECT 1 FROM so_pillar.pillar_entry " - f"WHERE minion_id={_pg_str(minion_id)});" - ) - else: - parts.append( - f"DELETE FROM so_pillar.pillar_entry " - f"WHERE scope={_pg_str(scope)} AND pillar_path={_pg_str(pillar_path)};" - ) - - parts.append("COMMIT;") - - try: - _docker_psql("\n".join(parts)) - except Exception as e: - return False, f"pg purge failed: {e}" - return True, "ok" - - -# CLI for diagnostics. Not exercised by so-yaml.py itself. -def _main(argv): - import argparse - ap = argparse.ArgumentParser() - ap.add_argument("op", choices=("locate", "ping")) - ap.add_argument("path", nargs="?") - args = ap.parse_args(argv) - - if args.op == "ping": - ok = _is_enabled() - print("ok" if ok else "unreachable") - return 0 if ok else 1 - if args.op == "locate": - if not args.path: - ap.error("locate requires PATH") - try: - scope, role, minion_id, pillar_path = locate(args.path) - print(f"scope={scope} role={role} minion_id={minion_id} pillar_path={pillar_path}") - return 0 - except SkipPath as e: - print(f"SKIP: {e}", file=sys.stderr) - return 2 - - return 1 - - -if __name__ == "__main__": - sys.exit(_main(sys.argv[1:])) diff --git a/salt/manager/tools/sbin_jinja/so-elastic-fleet-reset b/salt/manager/tools/sbin_jinja/so-elastic-fleet-reset index 1e32268da..c52d1cdcb 100644 --- a/salt/manager/tools/sbin_jinja/so-elastic-fleet-reset +++ b/salt/manager/tools/sbin_jinja/so-elastic-fleet-reset @@ -33,8 +33,11 @@ so-elastic-fleet-stop --force status "Deleting Fleet Data from Pillars..." so-yaml.py remove /opt/so/saltstack/local/pillar/minions/{{ GLOBALS.minion_id }}.sls elasticfleet +/usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/minions/{{ GLOBALS.minion_id }}.sls remove elasticfleet --note "so-elastic-fleet-reset" so-yaml.py remove /opt/so/saltstack/local/pillar/global/soc_global.sls global.fleet_grid_enrollment_token_general +/usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/global/soc_global.sls remove global.fleet_grid_enrollment_token_general --note "so-elastic-fleet-reset" so-yaml.py remove /opt/so/saltstack/local/pillar/global/soc_global.sls global.fleet_grid_enrollment_token_heavy +/usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/global/soc_global.sls remove global.fleet_grid_enrollment_token_heavy --note "so-elastic-fleet-reset" status "Restarting Kibana..." so-kibana-restart --force diff --git a/salt/orch/so_pillar_reload.sls b/salt/orch/so_pillar_reload.sls deleted file mode 100644 index 90e7646b1..000000000 --- a/salt/orch/so_pillar_reload.sls +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one -# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at -# https://securityonion.net/license; you may not use this file except in compliance with the -# Elastic License 2.0. - -# Driven by the so_pillar_changed reactor. Translates a so_pillar.pillar_entry -# change into (cache.clear_pillar -> saltutil.refresh_pillar -> state.apply) -# on the appropriate target. -# -# Routing rules live in the DISPATCH map below — one entry per -# (pillar_path prefix) -> (state sls, role grain). Add new services here -# rather than wiring more reactors. -# -# Idempotent: state.apply is idempotent; if the pillar value didn't actually -# change anything observable, the affected state runs a no-op. Bulk imports -# and replays are safe. - -{% set change = salt['pillar.get']('so_pillar_change', {}) %} -{% set scope = change.get('scope') %} -{% set role = change.get('role_name') %} -{% set minion = change.get('minion_id') %} -{% set changes = change.get('changes', []) %} - -{# (pillar_path prefix) -> {sls: , role: } - role is a grain value (e.g. 'so-sensor'), used to compute compound targets - when the change is global or role-scoped. #} -{% set DISPATCH = { - 'suricata.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, - 'sensor.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, - 'zeek.': {'sls': 'zeek.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, - 'stenographer.': {'sls': 'stenographer.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, - 'pcap.': {'sls': 'pcap.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, - 'logstash.': {'sls': 'logstash.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver']}, - 'redis.': {'sls': 'redis.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']}, - 'kafka.': {'sls': 'kafka.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver', 'so-searchnode']}, - 'elasticsearch.': {'sls': 'elasticsearch.config','roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-searchnode', 'so-heavynode', 'so-standalone']}, - 'kibana.': {'sls': 'kibana.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']}, - 'soc.': {'sls': 'soc.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']}, - 'telegraf.': {'sls': 'telegraf.config', 'roles': ['*']}, - 'fleet.': {'sls': 'fleet.config', 'roles': ['so-fleet']}, - 'strelka.': {'sls': 'strelka.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, -} %} - -{# Collect a deduplicated set of (sls, target_kind) actions. target_kind is - either 'minion:' (scope=minion) or 'roles:so-x,so-y' (scope=role/global). #} -{% set actions = {} %} - -{% for c in changes %} -{% set path = c.get('pillar_path', '') %} -{% for prefix, action in DISPATCH.items() %} -{% if path.startswith(prefix) %} -{% set sls = action['sls'] %} -{% if scope == 'minion' and minion %} -{% set key = sls ~ '|minion|' ~ minion %} -{% set _ = actions.update({key: {'sls': sls, 'tgt': minion, 'tgt_type': 'glob'}}) %} -{% else %} -{% set role_targets = action['roles'] %} -{% if '*' in role_targets %} -{% set tgt = '*' %} -{% set tgt_type = 'glob' %} -{% else %} -{% set tgt = ('I@role:' ~ role_targets|join(' or I@role:')) %} -{% set tgt_type = 'compound' %} -{% endif %} -{% set key = sls ~ '|' ~ tgt %} -{% set _ = actions.update({key: {'sls': sls, 'tgt': tgt, 'tgt_type': tgt_type}}) %} -{% endif %} -{% endif %} -{% endfor %} -{% endfor %} - -{% if actions %} - -{% for key, action in actions.items() %} -{% set safe_id = loop.index0 | string %} - -so_pillar_reload_clear_cache_{{ safe_id }}: - salt.runner: - - name: cache.clear_pillar - - tgt: '{{ action.tgt }}' - - tgt_type: '{{ action.tgt_type }}' - -so_pillar_reload_refresh_pillar_{{ safe_id }}: - salt.function: - - name: saltutil.refresh_pillar - - tgt: '{{ action.tgt }}' - - tgt_type: '{{ action.tgt_type }}' - - kwarg: - wait: True - - require: - - salt: so_pillar_reload_clear_cache_{{ safe_id }} - -so_pillar_reload_apply_state_{{ safe_id }}: - salt.state: - - tgt: '{{ action.tgt }}' - - tgt_type: '{{ action.tgt_type }}' - - sls: - - {{ action.sls }} - - queue: True - - require: - - salt: so_pillar_reload_refresh_pillar_{{ safe_id }} -{% endfor %} - -{% else %} - -{# No DISPATCH entry matched. Pillar still gets refreshed so any other states - read fresh values, but no service-specific reload is invoked. #} -so_pillar_reload_unmapped_path_noop: - test.nop - {% do salt.log.info('orch.so_pillar_reload: no dispatch match for %s' % changes) %} - -{% endif %} diff --git a/salt/postgres/files/schema/pillar/001_schema.sql b/salt/postgres/files/schema/pillar/001_schema.sql deleted file mode 100644 index 28aa545e5..000000000 --- a/salt/postgres/files/schema/pillar/001_schema.sql +++ /dev/null @@ -1,124 +0,0 @@ --- so_pillar schema: queryable, versioned, audited pillar config store. --- Replaces flat-file Salt pillar consumed via salt.pillar.postgres ext_pillar. --- Idempotent. Run via salt/postgres/schema_pillar.sls inside the so-postgres container. - -CREATE SCHEMA IF NOT EXISTS so_pillar; - -CREATE TABLE IF NOT EXISTS so_pillar.scope ( - scope_kind text PRIMARY KEY, - precedence int NOT NULL, - description text -); - -INSERT INTO so_pillar.scope(scope_kind, precedence, description) VALUES - ('global', 100, 'Applies to every minion'), - ('role', 200, 'Applies to minions whose minion_id matches a top.sls compound role match'), - ('minion', 300, 'Applies only to a single minion (per-minion overlay)') -ON CONFLICT (scope_kind) DO NOTHING; - -CREATE TABLE IF NOT EXISTS so_pillar.role ( - role_name text PRIMARY KEY, - match_kind text NOT NULL CHECK (match_kind IN ('compound','grain','glob','list')), - match_expr text NOT NULL, - description text -); - -CREATE TABLE IF NOT EXISTS so_pillar.minion ( - minion_id text PRIMARY KEY, - node_type text, - hostname text, - extra_roles text[] NOT NULL DEFAULT '{}', - created_at timestamptz NOT NULL DEFAULT now(), - updated_at timestamptz NOT NULL DEFAULT now() -); - -CREATE TABLE IF NOT EXISTS so_pillar.role_member ( - role_name text NOT NULL REFERENCES so_pillar.role(role_name) ON DELETE CASCADE, - minion_id text NOT NULL REFERENCES so_pillar.minion(minion_id) ON DELETE CASCADE, - source text NOT NULL DEFAULT 'computed' CHECK (source IN ('computed','manual','imported')), - PRIMARY KEY (role_name, minion_id) -); - -CREATE INDEX IF NOT EXISTS ix_role_member_minion ON so_pillar.role_member(minion_id); - --- pillar_entry holds the actual data. as_json=True ext_pillar reads `data` directly. -CREATE TABLE IF NOT EXISTS so_pillar.pillar_entry ( - id bigserial PRIMARY KEY, - scope text NOT NULL REFERENCES so_pillar.scope(scope_kind), - role_name text REFERENCES so_pillar.role(role_name) ON DELETE CASCADE, - minion_id text REFERENCES so_pillar.minion(minion_id) ON DELETE CASCADE, - pillar_path text NOT NULL, - data jsonb NOT NULL, - is_secret boolean NOT NULL DEFAULT false, - sort_key int NOT NULL DEFAULT 0, - version int NOT NULL DEFAULT 1, - updated_at timestamptz NOT NULL DEFAULT now(), - updated_by text NOT NULL DEFAULT current_user, - change_reason text, - CONSTRAINT pillar_entry_scope_target CHECK ( - (scope='global' AND role_name IS NULL AND minion_id IS NULL) - OR (scope='role' AND role_name IS NOT NULL AND minion_id IS NULL) - OR (scope='minion' AND role_name IS NULL AND minion_id IS NOT NULL) - ), - -- Reserved namespaces that MUST stay rendered from SLS (mine-driven). Nothing - -- under these prefixes is allowed in the database; the merge logic relies on - -- ext_pillar leaving these subtrees alone. - CONSTRAINT pillar_entry_reserved_paths CHECK ( - pillar_path NOT LIKE 'elasticsearch.nodes%' - AND pillar_path NOT LIKE 'redis.nodes%' - AND pillar_path NOT LIKE 'kafka.nodes%' - AND pillar_path NOT LIKE 'hypervisor.nodes%' - AND pillar_path NOT LIKE 'logstash.nodes%' - AND pillar_path NOT LIKE 'node_data.ips%' - ) -); - -CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_global ON so_pillar.pillar_entry(pillar_path) - WHERE scope = 'global'; -CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_role ON so_pillar.pillar_entry(role_name, pillar_path) - WHERE scope = 'role'; -CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_minion ON so_pillar.pillar_entry(minion_id, pillar_path) - WHERE scope = 'minion'; - -CREATE INDEX IF NOT EXISTS ix_pillar_entry_minion_hot ON so_pillar.pillar_entry(minion_id) - WHERE scope = 'minion'; -CREATE INDEX IF NOT EXISTS ix_pillar_entry_role_hot ON so_pillar.pillar_entry(role_name) - WHERE scope = 'role'; - --- Append-only audit log for every change to pillar_entry. No FK to entry so DELETE --- history survives the row removal. -CREATE TABLE IF NOT EXISTS so_pillar.pillar_entry_history ( - history_id bigserial PRIMARY KEY, - entry_id bigint, - op text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')), - scope text NOT NULL, - role_name text, - minion_id text, - pillar_path text NOT NULL, - old_data jsonb, - new_data jsonb, - is_secret boolean, - version int, - changed_at timestamptz NOT NULL DEFAULT now(), - changed_by text NOT NULL DEFAULT current_user, - change_reason text -); - -CREATE INDEX IF NOT EXISTS ix_pillar_history_entry ON so_pillar.pillar_entry_history(entry_id, changed_at DESC); -CREATE INDEX IF NOT EXISTS ix_pillar_history_minion ON so_pillar.pillar_entry_history(minion_id, changed_at DESC); -CREATE INDEX IF NOT EXISTS ix_pillar_history_role ON so_pillar.pillar_entry_history(role_name, changed_at DESC); - --- Drift watch — populated by a pg_cron job that re-renders the on-disk SLS files --- and compares them to pillar_entry. Cleared once cutover completes. -CREATE TABLE IF NOT EXISTS so_pillar.drift_log ( - id bigserial PRIMARY KEY, - scope text NOT NULL, - role_name text, - minion_id text, - pillar_path text NOT NULL, - disk_data jsonb, - db_data jsonb, - detected_at timestamptz NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS ix_drift_log_detected ON so_pillar.drift_log(detected_at DESC); diff --git a/salt/postgres/files/schema/pillar/002_views.sql b/salt/postgres/files/schema/pillar/002_views.sql deleted file mode 100644 index b5cffdaaf..000000000 --- a/salt/postgres/files/schema/pillar/002_views.sql +++ /dev/null @@ -1,49 +0,0 @@ --- Views consumed by the Salt master's salt.pillar.postgres ext_pillar with --- as_json=True. Each view exposes data ordered by (sort_key, pillar_path) so --- the deep-merge in ext_pillar resolves precedence deterministically. --- --- ext_pillar always binds exactly one parameter to the query: (minion_id,). --- Master-config queries reference these views and add WHERE clauses, e.g.: --- SELECT data FROM so_pillar.v_pillar_role WHERE minion_id = %s --- SELECT data FROM so_pillar.v_pillar_minion WHERE minion_id = %s --- For v_pillar_global the binding is satisfied with `WHERE %s IS NOT NULL`. - -CREATE OR REPLACE VIEW so_pillar.v_pillar_global AS - SELECT pillar_path, sort_key, data - FROM so_pillar.pillar_entry - WHERE scope = 'global' - AND is_secret = false - ORDER BY sort_key, pillar_path; - --- Role view exposes minion_id so the master-config WHERE clause can filter to --- the rows that apply to the requesting minion. JOIN to role_member fans out --- one row per (role assignment, pillar entry) tuple. -CREATE OR REPLACE VIEW so_pillar.v_pillar_role AS - SELECT rm.minion_id, - pe.role_name, - pe.pillar_path, - pe.sort_key, - pe.data - FROM so_pillar.pillar_entry pe - JOIN so_pillar.role_member rm ON rm.role_name = pe.role_name - WHERE pe.scope = 'role' - AND pe.is_secret = false; - -CREATE OR REPLACE VIEW so_pillar.v_pillar_minion AS - SELECT minion_id, - pillar_path, - sort_key, - data - FROM so_pillar.pillar_entry - WHERE scope = 'minion' - AND is_secret = false; - --- v_pillar_secrets is filled in by 004_secrets.sql once pgcrypto is available; --- placeholder here returns no rows so initial schema deploy succeeds even on a --- container that has not yet loaded pgcrypto. -CREATE OR REPLACE VIEW so_pillar.v_pillar_secrets AS - SELECT NULL::text AS minion_id, - NULL::text AS pillar_path, - NULL::int AS sort_key, - '{}'::jsonb AS data - WHERE false; diff --git a/salt/postgres/files/schema/pillar/003_history_trigger.sql b/salt/postgres/files/schema/pillar/003_history_trigger.sql deleted file mode 100644 index 941c78d13..000000000 --- a/salt/postgres/files/schema/pillar/003_history_trigger.sql +++ /dev/null @@ -1,120 +0,0 @@ --- Audit trigger: every INSERT/UPDATE/DELETE on so_pillar.pillar_entry writes a --- row to pillar_entry_history. Captures the actor (current_user), reason --- (passed via SET LOCAL so_pillar.change_reason), and full before/after data. - -CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_audit() RETURNS trigger -LANGUAGE plpgsql AS $fn$ -DECLARE - v_reason text := current_setting('so_pillar.change_reason', true); -BEGIN - IF (TG_OP = 'INSERT') THEN - INSERT INTO so_pillar.pillar_entry_history( - entry_id, op, scope, role_name, minion_id, pillar_path, - old_data, new_data, is_secret, version, changed_by, change_reason) - VALUES (NEW.id, 'INSERT', NEW.scope, NEW.role_name, NEW.minion_id, NEW.pillar_path, - NULL, NEW.data, NEW.is_secret, NEW.version, NEW.updated_by, v_reason); - RETURN NEW; - ELSIF (TG_OP = 'UPDATE') THEN - IF OLD.data IS DISTINCT FROM NEW.data - OR OLD.is_secret IS DISTINCT FROM NEW.is_secret THEN - INSERT INTO so_pillar.pillar_entry_history( - entry_id, op, scope, role_name, minion_id, pillar_path, - old_data, new_data, is_secret, version, changed_by, change_reason) - VALUES (NEW.id, 'UPDATE', NEW.scope, NEW.role_name, NEW.minion_id, NEW.pillar_path, - OLD.data, NEW.data, NEW.is_secret, NEW.version, NEW.updated_by, v_reason); - END IF; - RETURN NEW; - ELSIF (TG_OP = 'DELETE') THEN - INSERT INTO so_pillar.pillar_entry_history( - entry_id, op, scope, role_name, minion_id, pillar_path, - old_data, new_data, is_secret, version, changed_by, change_reason) - VALUES (OLD.id, 'DELETE', OLD.scope, OLD.role_name, OLD.minion_id, OLD.pillar_path, - OLD.data, NULL, OLD.is_secret, OLD.version, current_user, v_reason); - RETURN OLD; - END IF; - RETURN NULL; -END -$fn$; - -DROP TRIGGER IF EXISTS pillar_entry_audit ON so_pillar.pillar_entry; -CREATE TRIGGER pillar_entry_audit - AFTER INSERT OR UPDATE OR DELETE ON so_pillar.pillar_entry - FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_pillar_entry_audit(); - --- updated_at + version maintenance: bump version on every UPDATE that changes data. -CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_versioning() RETURNS trigger -LANGUAGE plpgsql AS $fn$ -BEGIN - IF (TG_OP = 'UPDATE') THEN - IF OLD.data IS DISTINCT FROM NEW.data - OR OLD.is_secret IS DISTINCT FROM NEW.is_secret THEN - NEW.version := OLD.version + 1; - NEW.updated_at := now(); - ELSE - NEW.version := OLD.version; - NEW.updated_at := OLD.updated_at; - END IF; - END IF; - RETURN NEW; -END -$fn$; - -DROP TRIGGER IF EXISTS pillar_entry_versioning ON so_pillar.pillar_entry; -CREATE TRIGGER pillar_entry_versioning - BEFORE UPDATE ON so_pillar.pillar_entry - FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_pillar_entry_versioning(); - --- Recompute role_member rows for a minion based on node_type. --- Compound matchers in pillar/top.sls are pure suffix patterns of the form --- '*_' plus the special multi-role 'manager/managersearch/managerhype' --- bucket. node_type is split on common dashes/underscores; any token that --- matches a known role_name produces a role_member row. -CREATE OR REPLACE FUNCTION so_pillar.fn_recompute_role_members(p_minion_id text) -RETURNS void LANGUAGE plpgsql AS $fn$ -DECLARE - v_node_type text; - v_extra text[]; - v_role text; -BEGIN - SELECT node_type, extra_roles INTO v_node_type, v_extra - FROM so_pillar.minion WHERE minion_id = p_minion_id; - - IF v_node_type IS NULL THEN - RETURN; - END IF; - - DELETE FROM so_pillar.role_member - WHERE minion_id = p_minion_id AND source = 'computed'; - - -- Main role from node_type. - IF EXISTS (SELECT 1 FROM so_pillar.role WHERE role_name = lower(v_node_type)) THEN - INSERT INTO so_pillar.role_member(role_name, minion_id, source) - VALUES (lower(v_node_type), p_minion_id, 'computed') - ON CONFLICT DO NOTHING; - END IF; - - -- Extra roles supplied by the importer / reactor for compound matchers - -- that need to apply multiple buckets (e.g. managersearch also gets the - -- 'manager' bucket per top.sls line 36 grouping). - FOREACH v_role IN ARRAY COALESCE(v_extra, '{}'::text[]) LOOP - IF EXISTS (SELECT 1 FROM so_pillar.role WHERE role_name = v_role) THEN - INSERT INTO so_pillar.role_member(role_name, minion_id, source) - VALUES (v_role, p_minion_id, 'computed') - ON CONFLICT DO NOTHING; - END IF; - END LOOP; -END -$fn$; - -CREATE OR REPLACE FUNCTION so_pillar.fn_minion_after_change() RETURNS trigger -LANGUAGE plpgsql AS $fn$ -BEGIN - PERFORM so_pillar.fn_recompute_role_members(COALESCE(NEW.minion_id, OLD.minion_id)); - RETURN COALESCE(NEW, OLD); -END -$fn$; - -DROP TRIGGER IF EXISTS minion_role_sync ON so_pillar.minion; -CREATE TRIGGER minion_role_sync - AFTER INSERT OR UPDATE OF node_type, extra_roles ON so_pillar.minion - FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_minion_after_change(); diff --git a/salt/postgres/files/schema/pillar/004_secrets.sql b/salt/postgres/files/schema/pillar/004_secrets.sql deleted file mode 100644 index 299556d55..000000000 --- a/salt/postgres/files/schema/pillar/004_secrets.sql +++ /dev/null @@ -1,130 +0,0 @@ --- pgcrypto-backed secret storage for pillar_entry rows where is_secret = true. --- The plaintext value is encrypted with a symmetric key held in a server-side --- GUC (so_pillar.master_key) which is set per-role via ALTER ROLE so the key --- never touches a flat file readable by Salt itself. - -CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public; - --- Encrypt a JSONB value using the configured master key. Stored as a JSONB --- envelope {"_enc": ""} so the same column type is reused. -CREATE OR REPLACE FUNCTION so_pillar.fn_encrypt_jsonb(p_value jsonb) -RETURNS jsonb LANGUAGE plpgsql AS $fn$ -DECLARE - v_key text := current_setting('so_pillar.master_key', true); -BEGIN - IF v_key IS NULL OR v_key = '' THEN - RAISE EXCEPTION 'so_pillar.master_key GUC not configured'; - END IF; - RETURN jsonb_build_object( - '_enc', - encode(pgp_sym_encrypt(p_value::text, v_key), 'base64') - ); -END -$fn$; - --- Decrypt the envelope produced by fn_encrypt_jsonb. SECURITY DEFINER so callers --- with no direct access to pgcrypto/master_key can still pull plaintext via the --- v_pillar_secrets view. -CREATE OR REPLACE FUNCTION so_pillar.fn_decrypt_jsonb(p_envelope jsonb) -RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER AS $fn$ -DECLARE - v_key text := current_setting('so_pillar.master_key', true); - v_ct text; -BEGIN - IF v_key IS NULL OR v_key = '' THEN - RAISE EXCEPTION 'so_pillar.master_key GUC not configured'; - END IF; - v_ct := p_envelope->>'_enc'; - IF v_ct IS NULL THEN - RETURN p_envelope; -- not encrypted; pass through - END IF; - RETURN pgp_sym_decrypt(decode(v_ct, 'base64'), v_key)::jsonb; -END -$fn$; - -REVOKE ALL ON FUNCTION so_pillar.fn_decrypt_jsonb(jsonb) FROM PUBLIC; - --- Secrets view consumed by ext_pillar. Decrypts at the boundary so Salt sees --- plaintext JSONB. Filters the rows to those that apply to the requesting --- minion via current_setting, since views can't take parameters and ext_pillar --- can only bind one parameter per query. --- --- Master-config query: SELECT data FROM so_pillar.v_pillar_secrets WHERE %s IS NOT NULL --- The %s satisfies the bound parameter; the view itself reads the minion_id --- from a session GUC set by a small wrapper function (see fn_pillar_secrets). -CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_secrets(p_minion_id text) -RETURNS TABLE(data jsonb) -LANGUAGE sql STABLE SECURITY DEFINER AS $fn$ - SELECT so_pillar.fn_decrypt_jsonb(pe.data) - FROM so_pillar.pillar_entry pe - WHERE pe.is_secret = true - AND ( pe.scope = 'global' - OR (pe.scope = 'role' - AND pe.role_name IN ( - SELECT role_name FROM so_pillar.role_member - WHERE minion_id = p_minion_id)) - OR (pe.scope = 'minion' AND pe.minion_id = p_minion_id)) - ORDER BY pe.sort_key, pe.pillar_path; -$fn$; - --- Replace the placeholder view from 002 with a parameterised version. Master --- config query becomes: --- SELECT data FROM so_pillar.fn_pillar_secrets(%s) AS s -DROP VIEW IF EXISTS so_pillar.v_pillar_secrets; -CREATE OR REPLACE VIEW so_pillar.v_pillar_secrets AS - SELECT NULL::text AS minion_id, - NULL::text AS pillar_path, - NULL::int AS sort_key, - '{}'::jsonb AS data - WHERE false; -COMMENT ON VIEW so_pillar.v_pillar_secrets IS - 'Deprecated placeholder; use SELECT data FROM so_pillar.fn_pillar_secrets(minion_id) instead'; - --- Convenience helper for so-yaml.py and the importer to set a secret without --- ever exposing the master_key to the caller. SECURITY DEFINER means the --- caller does not need read access to so_pillar.master_key. -CREATE OR REPLACE FUNCTION so_pillar.fn_set_secret( - p_scope text, - p_role_name text, - p_minion_id text, - p_pillar_path text, - p_value jsonb, - p_change_reason text DEFAULT NULL -) RETURNS bigint LANGUAGE plpgsql SECURITY DEFINER AS $fn$ -DECLARE - v_envelope jsonb := so_pillar.fn_encrypt_jsonb(p_value); - v_id bigint; -BEGIN - PERFORM set_config('so_pillar.change_reason', - COALESCE(p_change_reason, 'fn_set_secret'), - true); - - INSERT INTO so_pillar.pillar_entry( - scope, role_name, minion_id, pillar_path, data, is_secret, change_reason) - VALUES (p_scope, p_role_name, p_minion_id, p_pillar_path, v_envelope, true, p_change_reason) - ON CONFLICT (pillar_path) WHERE scope='global' DO UPDATE - SET data = EXCLUDED.data, is_secret = true, change_reason = EXCLUDED.change_reason - RETURNING id INTO v_id; - - IF v_id IS NULL THEN - UPDATE so_pillar.pillar_entry - SET data = v_envelope, is_secret = true, change_reason = p_change_reason - WHERE scope = p_scope - AND COALESCE(role_name,'') = COALESCE(p_role_name,'') - AND COALESCE(minion_id,'') = COALESCE(p_minion_id,'') - AND pillar_path = p_pillar_path - RETURNING id INTO v_id; - - IF v_id IS NULL THEN - INSERT INTO so_pillar.pillar_entry( - scope, role_name, minion_id, pillar_path, data, is_secret, change_reason) - VALUES (p_scope, p_role_name, p_minion_id, p_pillar_path, v_envelope, true, p_change_reason) - RETURNING id INTO v_id; - END IF; - END IF; - - RETURN v_id; -END -$fn$; - -REVOKE ALL ON FUNCTION so_pillar.fn_set_secret(text,text,text,text,jsonb,text) FROM PUBLIC; diff --git a/salt/postgres/files/schema/pillar/005_seed_roles.sql b/salt/postgres/files/schema/pillar/005_seed_roles.sql deleted file mode 100644 index f66c14e59..000000000 --- a/salt/postgres/files/schema/pillar/005_seed_roles.sql +++ /dev/null @@ -1,39 +0,0 @@ --- Seed the so_pillar.role table with the role buckets defined in pillar/top.sls. --- The match_expr column preserves the original Salt compound expression purely --- as documentation; PG-side membership is materialised in role_member. --- Idempotent: ON CONFLICT lets re-application leave existing rows untouched. - -INSERT INTO so_pillar.role(role_name, match_kind, match_expr, description) VALUES - ('manager', 'compound', '*_manager or *_managersearch or *_managerhype', - 'Manager-class node. Includes managersearch and managerhype subtypes.'), - ('managersearch', 'compound', '*_managersearch', - 'Combined manager + searchnode role.'), - ('managerhype', 'compound', '*_managerhype', - 'Combined manager + hypervisor role.'), - ('sensor', 'compound', '*_sensor', - 'Sensor node running zeek/suricata/strelka.'), - ('eval', 'compound', '*_eval', - 'Single-node evaluation install (manager + sensor + storage on one host).'), - ('standalone', 'compound', '*_standalone', - 'Single-node production install (no distributed cluster).'), - ('heavynode', 'compound', '*_heavynode', - 'Distributed manager node carrying logstash + ES.'), - ('idh', 'compound', '*_idh', - 'Intrusion-detection-honeypot node.'), - ('searchnode', 'compound', '*_searchnode', - 'Distributed Elasticsearch search node.'), - ('receiver', 'compound', '*_receiver', - 'Kafka receiver node.'), - ('import', 'compound', '*_import', - 'Single-node import-only install.'), - ('fleet', 'compound', '*_fleet', - 'Elastic Fleet server node.'), - ('hypervisor', 'compound', '*_hypervisor', - 'Hypervisor host (libvirt). Hosts VM minions.'), - ('desktop', 'compound', '*_desktop', - 'Desktop minion (no firewall/nginx pillars apply).'), - ('not_desktop', 'compound', '* and not *_desktop', - 'Pseudo-role; matches every minion that is not a desktop. Used for global firewall/nginx.'), - ('libvirt', 'grain', 'salt-cloud:driver:libvirt', - 'Pseudo-role; matches any minion with grain salt-cloud.driver = libvirt.') -ON CONFLICT (role_name) DO NOTHING; diff --git a/salt/postgres/files/schema/pillar/006_rls.sql b/salt/postgres/files/schema/pillar/006_rls.sql deleted file mode 100644 index 3b0da95ae..000000000 --- a/salt/postgres/files/schema/pillar/006_rls.sql +++ /dev/null @@ -1,106 +0,0 @@ --- Roles + Row-Level Security policies for the so_pillar schema. --- Three roles: --- so_pillar_master — connected by salt-master ext_pillar. Read-only. --- RLS forces it to skip is_secret rows; reads --- encrypted secrets only via fn_pillar_secrets(). --- so_pillar_writer — connected by so-yaml dual-write and the SOC --- PostgresConfigstore. Read+write on pillar_entry, --- minion, role_member. --- so_pillar_secret_owner — owns the master encryption key GUC; sole role --- allowed to call fn_set_secret directly. Other --- writers reach this function only via grants. --- --- The existing app role so_postgres_user (created by init-users.sh) is granted --- INTO so_pillar_writer so SOC keeps using its existing connection but inherits --- pillar-write capability. - -DO $$ -BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_master') THEN - CREATE ROLE so_pillar_master NOLOGIN; - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_writer') THEN - CREATE ROLE so_pillar_writer NOLOGIN; - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_secret_owner') THEN - CREATE ROLE so_pillar_secret_owner NOLOGIN; - END IF; -END -$$; - -GRANT USAGE ON SCHEMA so_pillar TO so_pillar_master, so_pillar_writer, so_pillar_secret_owner; - --- Read access for ext_pillar through the views only. -GRANT SELECT ON so_pillar.v_pillar_global, - so_pillar.v_pillar_role, - so_pillar.v_pillar_minion - TO so_pillar_master; -GRANT EXECUTE ON FUNCTION so_pillar.fn_pillar_secrets(text) TO so_pillar_master; - --- Engine reads + drains the change queue from the salt-master process. It --- needs SELECT to find unprocessed rows and UPDATE to mark them processed. --- The queue contains only locator metadata (no pillar data), so the master --- role's existing privilege footprint is unchanged in practice. -GRANT SELECT, UPDATE ON so_pillar.change_queue TO so_pillar_master; -GRANT USAGE ON SEQUENCE so_pillar.change_queue_id_seq TO so_pillar_master; --- Writer needs INSERT (the trigger runs as table owner, so this is just for --- direct testing / manual replays from psql). -GRANT INSERT ON so_pillar.change_queue TO so_pillar_writer; - --- Writer needs CRUD on pillar_entry/minion/role_member plus access to seed tables. -GRANT SELECT, INSERT, UPDATE, DELETE - ON so_pillar.pillar_entry, - so_pillar.minion, - so_pillar.role_member - TO so_pillar_writer; -GRANT SELECT ON so_pillar.role, so_pillar.scope TO so_pillar_writer; -GRANT SELECT, INSERT, UPDATE, DELETE ON so_pillar.drift_log TO so_pillar_writer; -GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA so_pillar TO so_pillar_writer; -GRANT SELECT ON so_pillar.pillar_entry_history TO so_pillar_writer; - --- Secret owner can call fn_set_secret directly; writer goes through it via the --- function's SECURITY DEFINER attribute, which executes as the function owner. -GRANT EXECUTE ON FUNCTION so_pillar.fn_set_secret(text,text,text,text,jsonb,text) - TO so_pillar_writer, so_pillar_secret_owner; - --- so_postgres_user (SOC's existing app user, created by init-users.sh) inherits --- writer privilege so the PostgresConfigstore in SOC can mutate pillars without --- a second connection pool. Inheritance is per-PG default (NOINHERIT must be --- explicit), so this just works. -DO $$ -BEGIN - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = current_setting('so_pillar.app_role', true)) - THEN - EXECUTE format('GRANT so_pillar_writer TO %I', - current_setting('so_pillar.app_role', true)); - ELSIF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_postgres_user') THEN - GRANT so_pillar_writer TO so_postgres_user; - END IF; -END -$$; - --- RLS on pillar_entry: master sees only non-secret rows. Writer sees all --- (it must, to UPDATE secret rows when so-yaml replaces them). Secret rows --- still require fn_decrypt_jsonb to read plaintext. -ALTER TABLE so_pillar.pillar_entry ENABLE ROW LEVEL SECURITY; -ALTER TABLE so_pillar.pillar_entry FORCE ROW LEVEL SECURITY; - -DROP POLICY IF EXISTS pillar_entry_master_read ON so_pillar.pillar_entry; -DROP POLICY IF EXISTS pillar_entry_writer_all ON so_pillar.pillar_entry; -DROP POLICY IF EXISTS pillar_entry_owner_all ON so_pillar.pillar_entry; - -CREATE POLICY pillar_entry_master_read ON so_pillar.pillar_entry - FOR SELECT TO so_pillar_master - USING (NOT is_secret); - -CREATE POLICY pillar_entry_writer_all ON so_pillar.pillar_entry - FOR ALL TO so_pillar_writer - USING (true) - WITH CHECK (true); - -CREATE POLICY pillar_entry_owner_all ON so_pillar.pillar_entry - FOR ALL TO so_pillar_secret_owner - USING (true) - WITH CHECK (true); - --- minion / role_member do not need RLS — they hold no secrets. diff --git a/salt/postgres/files/schema/pillar/007_drift_pgcron.sql b/salt/postgres/files/schema/pillar/007_drift_pgcron.sql deleted file mode 100644 index b19b65cf8..000000000 --- a/salt/postgres/files/schema/pillar/007_drift_pgcron.sql +++ /dev/null @@ -1,43 +0,0 @@ --- Drift detection + retention via pg_cron. Optional — the schema_pillar.sls --- state guards this file behind the postgres:so_pillar:drift_check_enabled --- pillar flag because pg_cron may not be loaded on every install. - -CREATE EXTENSION IF NOT EXISTS pg_cron; - --- Retention: trim pillar_entry_history older than a year. Adjustable via the --- so_pillar.history_retention_days GUC (default 365 if unset). -CREATE OR REPLACE FUNCTION so_pillar.fn_history_retain() -RETURNS void LANGUAGE plpgsql AS $fn$ -DECLARE - v_days int := COALESCE(current_setting('so_pillar.history_retention_days', true)::int, 365); -BEGIN - DELETE FROM so_pillar.pillar_entry_history - WHERE changed_at < (now() - (v_days::text || ' days')::interval); -END -$fn$; - --- Drift retention: keep two weeks of drift_log. -CREATE OR REPLACE FUNCTION so_pillar.fn_drift_retain() -RETURNS void LANGUAGE plpgsql AS $fn$ -BEGIN - DELETE FROM so_pillar.drift_log - WHERE detected_at < (now() - interval '14 days'); -END -$fn$; - --- pg_cron schedules (idempotent — unschedule any existing same-named job first). -DO $$ -DECLARE - v_jobid bigint; -BEGIN - SELECT jobid INTO v_jobid FROM cron.job WHERE jobname = 'so_pillar_history_retain'; - IF v_jobid IS NOT NULL THEN PERFORM cron.unschedule(v_jobid); END IF; - PERFORM cron.schedule('so_pillar_history_retain', '15 3 * * *', - 'SELECT so_pillar.fn_history_retain();'); - - SELECT jobid INTO v_jobid FROM cron.job WHERE jobname = 'so_pillar_drift_retain'; - IF v_jobid IS NOT NULL THEN PERFORM cron.unschedule(v_jobid); END IF; - PERFORM cron.schedule('so_pillar_drift_retain', '20 3 * * *', - 'SELECT so_pillar.fn_drift_retain();'); -END -$$; diff --git a/salt/postgres/files/schema/pillar/008_change_notify.sql b/salt/postgres/files/schema/pillar/008_change_notify.sql deleted file mode 100644 index 7fe22ad18..000000000 --- a/salt/postgres/files/schema/pillar/008_change_notify.sql +++ /dev/null @@ -1,77 +0,0 @@ --- pg_notify-driven change fan-out for so_pillar.pillar_entry. --- --- Two layers: --- 1. so_pillar.change_queue — durable, drained by the salt-master --- engine. Survives engine downtime, --- de-duplicated by id, processed once. --- 2. pg_notify('so_pillar_change') — wakeup signal. Payload is the --- change_queue row id and locator --- (no secret data — channels are --- snoopable by anyone with LISTEN). --- --- The salt-master engine LISTENs on the channel for low-latency wakeup, --- then SELECTs unprocessed change_queue rows so a missed notification --- (engine restart, network blip) self-heals on the next event. - -CREATE TABLE IF NOT EXISTS so_pillar.change_queue ( - id bigserial PRIMARY KEY, - scope text NOT NULL, - role_name text, - minion_id text, - pillar_path text NOT NULL, - op text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')), - enqueued_at timestamptz NOT NULL DEFAULT now(), - processed_at timestamptz -); - --- Hot index for the engine's drain query. -CREATE INDEX IF NOT EXISTS ix_change_queue_unprocessed - ON so_pillar.change_queue (id) - WHERE processed_at IS NULL; - --- Retention index: pg_cron job in 007 sweeps processed rows older than 7d. -CREATE INDEX IF NOT EXISTS ix_change_queue_processed_at - ON so_pillar.change_queue (processed_at) - WHERE processed_at IS NOT NULL; - -CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_notify() - RETURNS trigger - LANGUAGE plpgsql -AS $$ -DECLARE - v_row record; - v_id bigint; -BEGIN - IF TG_OP = 'DELETE' THEN - v_row := OLD; - ELSE - v_row := NEW; - END IF; - - INSERT INTO so_pillar.change_queue - (scope, role_name, minion_id, pillar_path, op) - VALUES - (v_row.scope, v_row.role_name, v_row.minion_id, v_row.pillar_path, TG_OP) - RETURNING id INTO v_id; - - -- Payload is the queue id + locator only. Engine joins back to - -- pillar_entry if it needs the data — keeps secrets off the wire. - PERFORM pg_notify('so_pillar_change', json_build_object( - 'queue_id', v_id, - 'scope', v_row.scope, - 'role_name', v_row.role_name, - 'minion_id', v_row.minion_id, - 'pillar_path', v_row.pillar_path, - 'op', TG_OP - )::text); - - RETURN NULL; -END; -$$; - -DROP TRIGGER IF EXISTS tg_pillar_entry_notify ON so_pillar.pillar_entry; -CREATE TRIGGER tg_pillar_entry_notify - AFTER INSERT OR UPDATE OR DELETE - ON so_pillar.pillar_entry - FOR EACH ROW - EXECUTE FUNCTION so_pillar.fn_pillar_entry_notify(); diff --git a/salt/postgres/init.sls b/salt/postgres/init.sls index 93fa8b103..2e3c9ffb7 100644 --- a/salt/postgres/init.sls +++ b/salt/postgres/init.sls @@ -8,7 +8,6 @@ include: {% if PGMERGED.enabled %} - postgres.enabled - - postgres.schema_pillar {% else %} - postgres.disabled {% endif %} diff --git a/salt/postgres/schema_pillar.sls b/salt/postgres/schema_pillar.sls index 462855b82..74d07914b 100644 --- a/salt/postgres/schema_pillar.sls +++ b/salt/postgres/schema_pillar.sls @@ -5,132 +5,12 @@ {% from 'allowed_states.map.jinja' import allowed_states %} {% if sls.split('.')[0] in allowed_states %} -{% from 'vars/globals.map.jinja' import GLOBALS %} -# Deploys the so_pillar schema (tables, views, audit triggers, secrets, -# RLS, pg_cron retention) inside the so-postgres container. Idempotent — -# every CREATE / GRANT is wrapped in IF NOT EXISTS / ON CONFLICT or DO -# blocks so re-running the state is a no-op when the schema is current. -# -# Gated on the postgres:so_pillar:enabled feature flag (default false). -# Flip to true once the postsalt branch is ready to bring ext_pillar live. - -include: - - postgres.enabled - -{% set so_pillar_enabled = salt['pillar.get']('postgres:so_pillar:enabled', False) %} -{% if so_pillar_enabled %} - -{% set drift_enabled = salt['pillar.get']('postgres:so_pillar:drift_check_enabled', False) %} -{% set schema_dir = '/opt/so/saltstack/default/salt/postgres/files/schema/pillar' %} - -# Wait for postgres to actually accept TCP connections. Same idiom as -# telegraf_users.sls. The docker_container.running state returns earlier than -# the database is ready on first init. -so_pillar_postgres_wait_ready: - cmd.run: - - name: | - for i in $(seq 1 60); do - if docker exec so-postgres pg_isready -h 127.0.0.1 -U postgres -q 2>/dev/null; then - exit 0 - fi - sleep 2 - done - echo "so-postgres did not accept TCP connections within 120s" >&2 - exit 1 - - require: - - docker_container: so-postgres - -{% set sql_files = [ - '001_schema.sql', - '002_views.sql', - '003_history_trigger.sql', - '004_secrets.sql', - '005_seed_roles.sql', - '006_rls.sql', -] %} - -{% if drift_enabled %} -{% do sql_files.append('007_drift_pgcron.sql') %} -{% endif %} - -# 008 always applies — pg_notify-driven change fan-out is what the salt-master -# pg_notify_pillar engine consumes. Without it reactor wiring sees no events. -{% do sql_files.append('008_change_notify.sql') %} - -{% for sql_file in sql_files %} -so_pillar_apply_{{ sql_file | replace('.', '_') }}: - cmd.run: - - name: | - docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d securityonion \ - < {{ schema_dir }}/{{ sql_file }} - - require: - - cmd: so_pillar_postgres_wait_ready -{% if not loop.first %} - - cmd: so_pillar_apply_{{ sql_files[loop.index0 - 1] | replace('.', '_') }} -{% endif %} -{% endfor %} - -# Set the master encryption key GUC on the secret-owner role. The key itself -# is generated by setup/so-functions::secrets_pillar() (extended for postsalt) -# and lives in /opt/so/conf/postgres/so_pillar.key (mode 0400) — never read by -# Salt itself; the value flows into PG via ALTER ROLE so it sits only in the -# server's role catalog. -so_pillar_master_key_configure: - cmd.run: - - name: | - if [ -r /opt/so/conf/postgres/so_pillar.key ]; then - KEY="$(< /opt/so/conf/postgres/so_pillar.key)" - docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d securityonion <&2 - exit 1 - fi - - require: - - cmd: so_pillar_apply_{{ sql_files[-1] | replace('.', '_') }} - -# Run the importer once after the schema is in place. Idempotent — re-runs -# with no SLS edits produce zero row changes. -so_pillar_initial_import: - cmd.run: - - name: /usr/sbin/so-pillar-import --yes --reason 'schema_pillar.sls initial import' - - require: - - cmd: so_pillar_master_key_configure - -# Flip so-yaml from dual-write to PG-canonical for managed paths now that -# the schema and importer are both in place. Bootstrap files (secrets.sls, -# postgres/auth.sls, ca/init.sls, *.nodes.sls, top.sls, ...) remain on disk -# regardless because so_yaml_postgres.locate() raises SkipPath for them. -so_pillar_so_yaml_mode_dir: - file.directory: - - name: /opt/so/conf/so-yaml - - user: socore - - group: socore - - mode: '0755' - - makedirs: True - -so_pillar_so_yaml_mode_postgres: - file.managed: - - name: /opt/so/conf/so-yaml/mode - - contents: postgres - - user: socore - - group: socore - - mode: '0644' - - require: - - file: so_pillar_so_yaml_mode_dir - - cmd: so_pillar_initial_import - -{% else %} - -so_pillar_disabled_noop: +# Deprecated: the old so_pillar schema has been replaced by SOC-owned +# onionconfig tables. SOC creates its schema on first startup. +postgres_schema_pillar_deprecated: test.nop -{% endif %} - {% else %} {{sls}}_state_not_allowed: diff --git a/salt/reactor/so_pillar_changed.sls b/salt/reactor/so_pillar_changed.sls deleted file mode 100644 index 5d8995e21..000000000 --- a/salt/reactor/so_pillar_changed.sls +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one -# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at -# https://securityonion.net/license; you may not use this file except in compliance with the -# Elastic License 2.0. - -# Fires for every event tagged 'so/pillar/changed'. Source of those events -# is the pg_notify_pillar engine on the salt-master, which in turn drains -# so_pillar.change_queue (populated by the AFTER trigger on -# so_pillar.pillar_entry — see 008_change_notify.sql). -# -# All routing logic — which pillar paths reload which services on which -# targets — lives in orch.so_pillar_reload so it stays editable as one -# YAML table without touching reactor wiring. - -{% set payload = data.get('data', {}) %} -{% do salt.log.info('so_pillar_changed reactor: %s' % payload) %} - -so_pillar_dispatch_reload: - runner.state.orchestrate: - - args: - - mods: orch.so_pillar_reload - - pillar: - so_pillar_change: - scope: {{ payload.get('scope') | json }} - role_name: {{ payload.get('role_name') | json }} - minion_id: {{ payload.get('minion_id') | json }} - changes: {{ payload.get('changes', []) | json }} diff --git a/salt/salt/engines/master/pg_notify_pillar.py b/salt/salt/engines/master/pg_notify_pillar.py deleted file mode 100644 index 42fb9836e..000000000 --- a/salt/salt/engines/master/pg_notify_pillar.py +++ /dev/null @@ -1,200 +0,0 @@ -# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one -# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at -# https://securityonion.net/license; you may not use this file except in compliance with the -# Elastic License 2.0. - -# -*- coding: utf-8 -*- - -""" -pg_notify_pillar — Salt master engine that bridges so_pillar.change_queue -into the Salt event bus. - -Architecture (see 008_change_notify.sql): - pillar_entry -- AFTER trigger --> change_queue (durable) - + pg_notify('so_pillar_change') (wakeup) - | - LISTEN <-- this engine <-+ - SELECT/UPDATE change_queue - | - fire_event('so/pillar/changed', ...) - | - reactor matches tag --> orch - -Why a queue + notify rather than just notify: pg_notify is fire-and-forget -within a session. If the engine is down or the LISTEN connection is broken -when a write happens, the notification is lost forever. The change_queue -lets us recover — on (re)connect, we drain everything still flagged -processed_at IS NULL. - -Debounce: bulk operations (so-pillar-import, fresh installs) can fire -hundreds of notifications per second. The engine collects whatever lands in -a short window and emits one event per (scope, role, minion) tuple so the -reactor isn't stampeded. -""" - -import json -import logging -import os -import select -import time - -import salt.utils.event - -log = logging.getLogger(__name__) - -__virtualname__ = 'pg_notify_pillar' - -DEFAULT_CHANNEL = 'so_pillar_change' -DEFAULT_DEBOUNCE_MS = 500 -DEFAULT_RECONNECT_BACKOFF = 5 -DEFAULT_BACKLOG_INTERVAL = 30 -DEFAULT_BATCH_LIMIT = 500 - -EVENT_TAG = 'so/pillar/changed' - - -def __virtual__(): - try: - import psycopg2 # noqa: F401 - return __virtualname__ - except ImportError: - return False, 'pg_notify_pillar engine requires psycopg2' - - -def start(dsn=None, - host='127.0.0.1', - port=5432, - dbname='securityonion', - user='so_pillar_master', - password=None, - channel=DEFAULT_CHANNEL, - debounce_ms=DEFAULT_DEBOUNCE_MS, - reconnect_backoff=DEFAULT_RECONNECT_BACKOFF, - backlog_interval=DEFAULT_BACKLOG_INTERVAL, - batch_limit=DEFAULT_BATCH_LIMIT, - password_file=None): - """ - Run the change-queue bridge until the master shuts the engine down. - - Either pass a full ``dsn`` string, or supply discrete kwargs. The - password may also be read from ``password_file`` (mode 0400) so the - engine config in ``/etc/salt/master.d/`` doesn't have to embed it - inline — only the file path. - """ - import psycopg2 - import psycopg2.extensions - - if dsn is None: - if password is None and password_file: - try: - with open(password_file, 'r') as fh: - password = fh.read().strip() - except (IOError, OSError) as exc: - log.error('pg_notify_pillar: cannot read password_file %s: %s', - password_file, exc) - return - dsn = _build_dsn(host=host, port=port, dbname=dbname, - user=user, password=password) - - bus = salt.utils.event.get_master_event( - __opts__, __opts__['sock_dir'], listen=False) - - log.info('pg_notify_pillar: starting (channel=%s debounce=%dms)', - channel, debounce_ms) - - while True: - conn = None - try: - conn = psycopg2.connect(dsn) - conn.set_isolation_level( - psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = conn.cursor() - cur.execute('LISTEN {0}'.format(channel)) - log.info('pg_notify_pillar: connected; LISTEN %s', channel) - - _drain(cur, bus, batch_limit) - - while True: - ready, _, _ = select.select([conn], [], [], backlog_interval) - if not ready: - _drain(cur, bus, batch_limit) - continue - - conn.poll() - _consume_notifies(conn) - - if debounce_ms > 0: - time.sleep(debounce_ms / 1000.0) - conn.poll() - _consume_notifies(conn) - - _drain(cur, bus, batch_limit) - - except Exception as exc: # psycopg2.Error subclasses + OS errors - log.error('pg_notify_pillar: %s; reconnecting in %ds', - exc, reconnect_backoff) - finally: - if conn is not None: - try: - conn.close() - except Exception: - pass - time.sleep(reconnect_backoff) - - -def _build_dsn(host, port, dbname, user, password): - parts = ['host={0}'.format(host), - 'port={0}'.format(port), - 'dbname={0}'.format(dbname), - 'user={0}'.format(user)] - if password: - parts.append('password={0}'.format(password)) - return ' '.join(parts) - - -def _consume_notifies(conn): - # We don't use the payload directly — the queue table is the source of - # truth, and draining it covers any notifications we missed. So just - # discard them; their presence already proved there's something to drain. - while conn.notifies: - conn.notifies.pop(0) - - -def _drain(cur, bus, batch_limit): - """Mark unprocessed change_queue rows processed and emit one event per - (scope, role_name, minion_id) group. SKIP LOCKED so multiple masters - sharing a Postgres don't double-process.""" - cur.execute(""" - UPDATE so_pillar.change_queue - SET processed_at = now() - WHERE id IN ( - SELECT id FROM so_pillar.change_queue - WHERE processed_at IS NULL - ORDER BY id - FOR UPDATE SKIP LOCKED - LIMIT %s) - RETURNING id, scope, role_name, minion_id, pillar_path, op - """, (batch_limit,)) - rows = cur.fetchall() - if not rows: - return - - groups = {} - for row_id, scope, role_name, minion_id, pillar_path, op in rows: - key = (scope, role_name, minion_id) - groups.setdefault(key, []).append({ - 'queue_id': row_id, - 'pillar_path': pillar_path, - 'op': op, - }) - - for (scope, role_name, minion_id), changes in groups.items(): - payload = { - 'scope': scope, - 'role_name': role_name, - 'minion_id': minion_id, - 'changes': changes, - } - log.debug('pg_notify_pillar: firing %s for %s', - EVENT_TAG, payload) - bus.fire_event(payload, EVENT_TAG) diff --git a/salt/salt/files/engines.conf b/salt/salt/files/engines.conf index 8192ee201..e853597e3 100644 --- a/salt/salt/files/engines.conf +++ b/salt/salt/files/engines.conf @@ -17,7 +17,7 @@ engines: to: 'KAFKA': - cmd.run: - cmd: /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled True + cmd: /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled True && /usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls replace kafka.enabled True --note "pillarWatch global.pipeline" - cmd.run: cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver or G@role:so-searchnode' saltutil.kill_all_jobs - cmd.run: @@ -28,7 +28,7 @@ engines: to: 'REDIS': - cmd.run: - cmd: /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled False + cmd: /usr/sbin/so-yaml.py replace /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.enabled False && /usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls replace kafka.enabled False --note "pillarWatch global.pipeline" - cmd.run: cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver or G@role:so-searchnode' saltutil.kill_all_jobs - cmd.run: @@ -66,5 +66,5 @@ engines: - cmd.run: cmd: salt -C 'G@role:so-standalone or G@role:so-manager or G@role:so-managersearch or G@role:so-receiver' state.apply kafka.disabled,kafka.reset - cmd.run: - cmd: /usr/sbin/so-yaml.py remove /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.reset + cmd: /usr/sbin/so-yaml.py remove /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls kafka.reset && /usr/sbin/so-config.py sync-yaml-mutation /opt/so/saltstack/local/pillar/kafka/soc_kafka.sls remove kafka.reset --note "pillarWatch kafka.reset" interval: 10 diff --git a/salt/salt/master/ext_pillar_postgres.sls b/salt/salt/master/ext_pillar_postgres.sls index 9ed071f38..46e096d94 100644 --- a/salt/salt/master/ext_pillar_postgres.sls +++ b/salt/salt/master/ext_pillar_postgres.sls @@ -3,40 +3,18 @@ # https://securityonion.net/license; you may not use this file except in compliance with the # Elastic License 2.0. -# Drops /etc/salt/master.d/ext_pillar_postgres.conf so the salt-master loads -# pillar overlays from the so_pillar.* schema in so-postgres alongside the -# on-disk SLS pillar tree. Gated on the postgres:so_pillar:enabled feature -# flag (default false) so the file only appears once the schema is deployed -# and the importer has run at least once. +# Deprecated. SOC/onionconfig owns the settings database now; this state only +# removes the old so_pillar ext_pillar config if it was previously deployed. {% from 'allowed_states.map.jinja' import allowed_states %} {% if sls.split('.')[0] in allowed_states %} -{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %} - -ext_pillar_postgres_config: - file.managed: - - name: /etc/salt/master.d/ext_pillar_postgres.conf - - source: salt://salt/master/files/ext_pillar_postgres.conf.jinja - - template: jinja - - mode: '0640' - - user: root - - group: salt - - watch_in: - - service: salt_master_service - -{% else %} - -# When the flag is off make sure any previously-deployed config is removed -# so a rollback flips behavior cleanly. ext_pillar_postgres_config_absent: file.absent: - name: /etc/salt/master.d/ext_pillar_postgres.conf - watch_in: - service: salt_master_service -{% endif %} - {% else %} {{sls}}_state_not_allowed: diff --git a/salt/salt/master/files/ext_pillar_postgres.conf.jinja b/salt/salt/master/files/ext_pillar_postgres.conf.jinja deleted file mode 100644 index 4e3f0f2d3..000000000 --- a/salt/salt/master/files/ext_pillar_postgres.conf.jinja +++ /dev/null @@ -1,38 +0,0 @@ -# /etc/salt/master.d/ext_pillar_postgres.conf -# Rendered by salt/salt/master/ext_pillar_postgres.sls. -# Reads the so_pillar.* schema in so-postgres and overlays it onto SLS pillar. -# SLS still renders first (ext_pillar_first: False) so bootstrap and mine-driven -# pillars work before Postgres is reachable; PG values overlay/override on top. - -postgres: - host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }} - port: {{ pillar.get('postgres', {}).get('port', 5432) }} - db: securityonion - user: so_pillar_master - pass: {{ pillar['secrets']['pillar_master_pass'] }} - -ext_pillar_first: False -pillar_source_merging_strategy: smart -pillar_merge_lists: False - -pillar_cache: True -pillar_cache_backend: disk -pillar_cache_ttl: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('pillar_cache_ttl', 60) }} - -# List form (not mapping form) so result rows merge into the pillar root rather -# than under a named subtree. Verified against salt/pillar/sql_base.py: list -# entries pass root=None to enter_root() which sets self.focus = self.result. -ext_pillar: - - postgres: - - query: "SELECT data FROM so_pillar.v_pillar_global WHERE %s IS NOT NULL ORDER BY sort_key, pillar_path" - as_json: True - ignore_null: True - - query: "SELECT data FROM so_pillar.v_pillar_role WHERE minion_id = %s ORDER BY sort_key, pillar_path" - as_json: True - ignore_null: True - - query: "SELECT data FROM so_pillar.v_pillar_minion WHERE minion_id = %s ORDER BY sort_key, pillar_path" - as_json: True - ignore_null: True - - query: "SELECT data FROM so_pillar.fn_pillar_secrets(%s)" - as_json: True - ignore_null: True diff --git a/salt/salt/master/files/pg_notify_pillar_engine.conf.jinja b/salt/salt/master/files/pg_notify_pillar_engine.conf.jinja deleted file mode 100644 index 44c8482f6..000000000 --- a/salt/salt/master/files/pg_notify_pillar_engine.conf.jinja +++ /dev/null @@ -1,20 +0,0 @@ -# /etc/salt/master.d/pg_notify_pillar_engine.conf -# Rendered by salt/salt/master/pg_notify_pillar_engine.sls. -# -# Subscribes the salt-master to so_pillar.change_queue via LISTEN -# so_pillar_change. The engine drains queued changes and re-publishes -# them on the event bus as 'so/pillar/changed'. Reactor wiring is in -# so_pillar_reactor.conf. - -engines: - - pg_notify_pillar: - host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }} - port: {{ pillar.get('postgres', {}).get('port', 5432) }} - dbname: securityonion - user: so_pillar_master - password: {{ pillar['secrets']['pillar_master_pass'] }} - channel: so_pillar_change - debounce_ms: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_debounce_ms', 500) }} - reconnect_backoff: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_reconnect_backoff', 5) }} - backlog_interval: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_backlog_interval', 30) }} - batch_limit: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_batch_limit', 500) }} diff --git a/salt/salt/master/files/so_pillar_reactor.conf b/salt/salt/master/files/so_pillar_reactor.conf deleted file mode 100644 index 7239bbf43..000000000 --- a/salt/salt/master/files/so_pillar_reactor.conf +++ /dev/null @@ -1,12 +0,0 @@ -# /etc/salt/master.d/so_pillar_reactor.conf -# Wires the so/pillar/changed event tag — emitted by the pg_notify_pillar -# engine — to the so_pillar_changed reactor, which dispatches to -# orch.so_pillar_reload. -# -# Lives in its own file (rather than appended to reactor_hypervisor.conf) -# so the postgres:so_pillar:enabled flag can flip it on/off independently -# of hypervisor reactor wiring. - -reactor: - - 'so/pillar/changed': - - /opt/so/saltstack/default/salt/reactor/so_pillar_changed.sls diff --git a/salt/salt/master/pg_notify_pillar_engine.sls b/salt/salt/master/pg_notify_pillar_engine.sls index b3b840263..753fd0232 100644 --- a/salt/salt/master/pg_notify_pillar_engine.sls +++ b/salt/salt/master/pg_notify_pillar_engine.sls @@ -3,55 +3,13 @@ # https://securityonion.net/license; you may not use this file except in compliance with the # Elastic License 2.0. -# Deploys the pg_notify_pillar engine module + its master.d config so the -# salt-master subscribes to so_pillar.change_queue and republishes changes -# on the salt event bus as so/pillar/changed. Reactor (so_pillar_changed.sls) -# matches that tag and dispatches the appropriate orch. -# -# Gated on the same postgres:so_pillar:enabled flag as the schema and -# ext_pillar config so the three components flip together. +# Deprecated. SOC/onionconfig owns the settings database now; this state only +# removes the old so_pillar notify engine and reactor config if previously +# deployed. {% from 'allowed_states.map.jinja' import allowed_states %} {% if sls.split('.')[0] in allowed_states %} -{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %} - -pg_notify_pillar_engine_module: - file.managed: - - name: /etc/salt/engines/pg_notify_pillar.py - - source: salt://salt/engines/master/pg_notify_pillar.py - - mode: '0644' - - user: root - - group: root - - makedirs: True - - watch_in: - - service: salt_master_service - -pg_notify_pillar_engine_config: - file.managed: - - name: /etc/salt/master.d/pg_notify_pillar_engine.conf - - source: salt://salt/master/files/pg_notify_pillar_engine.conf.jinja - - template: jinja - - mode: '0640' - - user: root - - group: salt - - watch_in: - - service: salt_master_service - -pg_notify_pillar_reactor_config: - file.managed: - - name: /etc/salt/master.d/so_pillar_reactor.conf - - source: salt://salt/master/files/so_pillar_reactor.conf - - mode: '0644' - - user: root - - group: root - - watch_in: - - service: salt_master_service - -{% else %} - -# When the flag flips off, peel everything back so a rollback returns to -# pure-disk pillar with no orphan engine churning on a dead listen socket. pg_notify_pillar_engine_module_absent: file.absent: - name: /etc/salt/engines/pg_notify_pillar.py @@ -70,8 +28,6 @@ pg_notify_pillar_reactor_config_absent: - watch_in: - service: salt_master_service -{% endif %} - {% else %} {{sls}}_state_not_allowed: diff --git a/salt/soc/enabled.sls b/salt/soc/enabled.sls index 1805bacaf..4ec9cf10c 100644 --- a/salt/soc/enabled.sls +++ b/salt/soc/enabled.sls @@ -100,6 +100,29 @@ so-soc: - file: socusersroles - file: socclientsroles +onionconfig_initial_import: + cmd.run: + - name: | + set -e + SOCONFIG=/usr/sbin/so-config.py + if [ ! -x "$SOCONFIG" ]; then + SOCONFIG=/opt/so/saltstack/default/salt/manager/tools/sbin/so-config.py + fi + for i in $(seq 1 60); do + if docker exec so-postgres pg_isready -h 127.0.0.1 -U postgres -q >/dev/null 2>&1 \ + && curl -fsS --connect-timeout 2 http://{{ DOCKERMERGED.containers['so-soc'].ip }}:9822/ >/dev/null 2>&1; then + "$SOCONFIG" wait-schema --timeout 120 + "$SOCONFIG" import-all --state-file /opt/so/state/onionconfig_initial_import.done + exit 0 + fi + sleep 2 + done + echo "so-soc or so-postgres did not become ready within 120s" >&2 + exit 1 + - unless: test -f /opt/so/state/onionconfig_initial_import.done + - require: + - docker_container: so-soc + delete_so-soc_so-status.disabled: file.uncomment: - name: /opt/so/conf/so-status/so-status.conf