diff --git a/salt/manager/tools/sbin/so-yaml.py b/salt/manager/tools/sbin/so-yaml.py index 5e17cee15..ea24cef95 100755 --- a/salt/manager/tools/sbin/so-yaml.py +++ b/salt/manager/tools/sbin/so-yaml.py @@ -13,10 +13,28 @@ import json lockFile = "/tmp/so-yaml.lock" -# postsalt: dual-write each disk mutation into so_pillar.* in so-postgres so -# Salt's ext_pillar and SOC's PostgresConfigstore see the same data without -# requiring a separate writer. Failure of the PG side is logged but never -# fails the disk write — disk is canonical during the migration transition. +# postsalt: so-yaml supports three backend modes for PG-managed pillar paths: +# +# dual — write disk + mirror to so_pillar.*. Reads from disk. +# Used during the migration transition when disk is still +# canonical and PG runs as a shadow. +# postgres — write to so_pillar.* only. Reads from so_pillar.*. No disk +# file is touched. The end state once cutover is complete. +# disk — disk only, no PG. Emergency rollback escape hatch. +# +# Bootstrap and mine-driven files (secrets.sls, ca/init.sls, */nodes.sls, +# top.sls, etc.) are always handled on disk regardless of mode — those paths +# are explicitly excluded by so_yaml_postgres.locate() raising SkipPath. +# +# Mode resolution: SO_YAML_BACKEND env var, then /opt/so/conf/so-yaml/mode, +# then default 'dual' (safe upgrade behavior — flipping to 'postgres' is +# done by schema_pillar.sls after the schema is in place and the importer +# has run at least once). + +MODE_FILE = "/opt/so/conf/so-yaml/mode" +VALID_MODES = ("dual", "postgres", "disk") +DEFAULT_MODE = "dual" + try: sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import so_yaml_postgres @@ -25,6 +43,35 @@ except Exception as _exc: _SO_YAML_PG_AVAILABLE = False +def _resolveBackendMode(): + env = os.environ.get("SO_YAML_BACKEND") + if env and env in VALID_MODES: + return env + try: + with open(MODE_FILE, "r") as fh: + value = fh.read().strip() + if value in VALID_MODES: + return value + except (IOError, OSError): + pass + return DEFAULT_MODE + + +_BACKEND_MODE = _resolveBackendMode() + + +def _isPgManaged(filename): + """True when so-yaml should route this file's reads/writes through + so_pillar.*. False for bootstrap/mine-driven files that always live on + disk, and for arbitrary YAML paths outside the pillar tree.""" + if not _SO_YAML_PG_AVAILABLE: + return False + try: + return so_yaml_postgres.is_pg_managed(filename) + except Exception: + return False + + def showUsage(args): print('Usage: {} [ARGS...]'.format(sys.argv[0]), file=sys.stderr) print(' General commands:', file=sys.stderr) @@ -39,6 +86,11 @@ def showUsage(args): print(' purge - Delete the YAML file from disk and remove its rows from so_pillar.* (no KEY arg).', file=sys.stderr) print(' help - Prints this usage information.', file=sys.stderr) print('', file=sys.stderr) + print(' Backend mode:', file=sys.stderr) + print(' Resolved from $SO_YAML_BACKEND, then /opt/so/conf/so-yaml/mode, default "dual".', file=sys.stderr) + print(' Valid values: dual | postgres | disk. Bootstrap pillar files (secrets, ca, *.nodes.sls)', file=sys.stderr) + print(' are always handled on disk regardless of mode.', file=sys.stderr) + print('', file=sys.stderr) print(' Where:', file=sys.stderr) print(' YAML_FILE - Path to the file that will be modified. Ex: /opt/so/conf/service/conf.yaml', file=sys.stderr) print(' KEY - YAML key, does not support \' or " characters at this time. 
Ex: level1.level2', file=sys.stderr) @@ -51,6 +103,24 @@ def showUsage(args): def loadYaml(filename): + """Load a YAML file's content as a dict. + + PG-canonical mode (`postgres`): for PG-managed paths, read from + so_pillar.pillar_entry. A missing row is treated as an empty dict so + that `replace`/`add` on a fresh path can populate it from scratch. + + Other modes / non-PG-managed paths: read from disk as today. + """ + if _BACKEND_MODE == "postgres" and _isPgManaged(filename): + try: + data = so_yaml_postgres.read_yaml(filename) + except so_yaml_postgres.SkipPath: + data = None + except Exception as e: + print(f"so-yaml: pg read failed for {filename}: {e}", file=sys.stderr) + sys.exit(1) + return data if data is not None else {} + try: with open(filename, "r") as file: content = file.read() @@ -64,10 +134,33 @@ def loadYaml(filename): def writeYaml(filename, content): + """Persist `content` for `filename`. + + PG-canonical mode + PG-managed path: write only to so_pillar.*. A PG + failure is fatal (no disk fallback) — caller must retry. + + Dual mode: write disk, then mirror to PG (failures are warnings). + + Disk mode or non-PG-managed path: write disk only. + """ + if _BACKEND_MODE == "postgres" and _isPgManaged(filename): + if not _SO_YAML_PG_AVAILABLE: + print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr) + sys.exit(1) + ok, msg = so_yaml_postgres.write_yaml( + filename, content, + reason="so-yaml " + " ".join(sys.argv[1:2])) + if not ok: + print(f"so-yaml: pg write failed for {filename}: {msg}", file=sys.stderr) + sys.exit(1) + return None + file = open(filename, "w") result = yaml.safe_dump(content, file) file.close() - _mirrorToPostgres(filename, content) + + if _BACKEND_MODE == "dual": + _mirrorToPostgres(filename, content) return result @@ -75,7 +168,8 @@ def _mirrorToPostgres(filename, content): """Best-effort dual-write of a YAML mutation into so_pillar.*. Skips files outside the PG-managed pillar surface (secrets.sls, elasticsearch/nodes.sls, etc.) and silently degrades when so-postgres - is unreachable. Disk write is canonical; this never raises. + is unreachable. Disk write is canonical in dual mode; this never + raises. Only real PG failures (`pg write failed: ...`) are logged so the common cases (skipped path, postgres not running) don't pollute @@ -92,9 +186,29 @@ def _mirrorToPostgres(filename, content): def purgeFile(filename): - """Delete a YAML file from disk and mirror the deletion into PG. - Idempotent: missing file → success. Mirrors so-yaml's other verbs - in tolerating a soft PG failure.""" + """Delete a YAML file from disk and remove the matching rows from + so_pillar.*. Idempotent — missing file/row counts as success. + + PG-canonical mode + PG-managed path: PG delete is canonical. If a stale + disk file from the dual-write era happens to still exist, it's removed + too as a cleanup courtesy. PG failure is fatal in this mode. 
+ + Dual / disk modes: remove disk first; PG cleanup is best-effort.""" + if _BACKEND_MODE == "postgres" and _isPgManaged(filename): + if not _SO_YAML_PG_AVAILABLE: + print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr) + return 1 + ok, msg = so_yaml_postgres.purge_yaml(filename, reason="so-yaml purge") + if not ok: + print(f"so-yaml: pg purge failed for {filename}: {msg}", file=sys.stderr) + return 1 + if os.path.exists(filename): + try: + os.remove(filename) + except Exception as e: + print(f"so-yaml: warn — could not remove stale disk file {filename}: {e}", file=sys.stderr) + return 0 + if os.path.exists(filename): try: os.remove(filename) @@ -102,7 +216,7 @@ def purgeFile(filename): print(f"Failed to remove {filename}: {e}", file=sys.stderr) return 1 - if _SO_YAML_PG_AVAILABLE: + if _BACKEND_MODE == "dual" and _SO_YAML_PG_AVAILABLE: try: ok, msg = so_yaml_postgres.purge_yaml(filename, reason="so-yaml purge") diff --git a/salt/manager/tools/sbin/so-yaml_test.py b/salt/manager/tools/sbin/so-yaml_test.py index 53aea8fdd..4eb017445 100644 --- a/salt/manager/tools/sbin/so-yaml_test.py +++ b/salt/manager/tools/sbin/so-yaml_test.py @@ -1106,3 +1106,214 @@ class TestSoYamlPostgres(unittest.TestCase): {"soc": {"foo": "bar"}}) self.assertFalse(ok) self.assertEqual(msg, "postgres unreachable") + + def test_is_pg_managed_true(self): + self.assertTrue(self.mod.is_pg_managed( + "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")) + self.assertTrue(self.mod.is_pg_managed( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")) + + def test_is_pg_managed_false_for_bootstrap(self): + self.assertFalse(self.mod.is_pg_managed( + "/opt/so/saltstack/local/pillar/secrets.sls")) + self.assertFalse(self.mod.is_pg_managed( + "/opt/so/saltstack/local/pillar/postgres/auth.sls")) + self.assertFalse(self.mod.is_pg_managed( + "/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls")) + + def test_read_yaml_unreachable(self): + with patch.object(self.mod, '_is_enabled', return_value=False): + self.assertIsNone(self.mod.read_yaml( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")) + + def test_read_yaml_skips_disk_only(self): + with patch.object(self.mod, '_is_enabled', return_value=True): + with self.assertRaises(self.mod.SkipPath): + self.mod.read_yaml( + "/opt/so/saltstack/local/pillar/secrets.sls") + + def test_read_yaml_returns_data(self): + with patch.object(self.mod, '_is_enabled', return_value=True): + with patch.object(self.mod, '_docker_psql', + return_value='{"soc": {"foo": "bar"}}\n'): + data = self.mod.read_yaml( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") + self.assertEqual(data, {"soc": {"foo": "bar"}}) + + def test_read_yaml_returns_none_when_no_row(self): + with patch.object(self.mod, '_is_enabled', return_value=True): + with patch.object(self.mod, '_docker_psql', return_value=''): + data = self.mod.read_yaml( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") + self.assertIsNone(data) + + def test_read_yaml_minion_query_shape(self): + captured = {} + + def fake_psql(sql): + captured['sql'] = sql + return '{"host": {"mainip": "10.0.0.1"}}' + + with patch.object(self.mod, '_is_enabled', return_value=True): + with patch.object(self.mod, '_docker_psql', side_effect=fake_psql): + data = self.mod.read_yaml( + "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls") + self.assertEqual(data, {"host": {"mainip": "10.0.0.1"}}) + self.assertIn("scope='minion'", captured['sql']) + self.assertIn("'h1_sensor'", captured['sql']) + self.assertIn("'minions.h1_sensor'", 
captured['sql']) + + def test_is_enabled_public_alias(self): + with patch.object(self.mod, '_is_enabled', return_value=True): + self.assertTrue(self.mod.is_enabled()) + with patch.object(self.mod, '_is_enabled', return_value=False): + self.assertFalse(self.mod.is_enabled()) + + +class TestSoYamlBackendMode(unittest.TestCase): + """Tests so-yaml's backend-mode resolution and PG-canonical routing + for read/write/purge. The PG calls themselves are stubbed; what we're + asserting is that the right backend is chosen for each (mode, path) + combination.""" + + def test_resolve_mode_env_overrides_file(self): + with patch.dict('os.environ', {'SO_YAML_BACKEND': 'postgres'}): + self.assertEqual(soyaml._resolveBackendMode(), 'postgres') + with patch.dict('os.environ', {'SO_YAML_BACKEND': 'disk'}): + self.assertEqual(soyaml._resolveBackendMode(), 'disk') + + def test_resolve_mode_invalid_env_falls_back(self): + with patch.dict('os.environ', {'SO_YAML_BACKEND': 'garbage'}, clear=False): + with patch('builtins.open', side_effect=IOError): + self.assertEqual(soyaml._resolveBackendMode(), 'dual') + + def test_resolve_mode_default_dual(self): + env = {k: v for k, v in __import__('os').environ.items() + if k != 'SO_YAML_BACKEND'} + with patch.dict('os.environ', env, clear=True): + with patch('builtins.open', side_effect=IOError): + self.assertEqual(soyaml._resolveBackendMode(), 'dual') + + def test_is_pg_managed_proxies(self): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + self.assertTrue(soyaml._isPgManaged( + "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")) + self.assertFalse(soyaml._isPgManaged( + "/opt/so/saltstack/local/pillar/secrets.sls")) + + def test_is_pg_managed_false_when_module_unavailable(self): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False): + self.assertFalse(soyaml._isPgManaged( + "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")) + + def test_load_yaml_postgres_mode_reads_pg(self): + with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', + return_value=True): + with patch.object(soyaml.so_yaml_postgres, 'read_yaml', + return_value={"a": 1}): + result = soyaml.loadYaml( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") + self.assertEqual(result, {"a": 1}) + + def test_load_yaml_postgres_mode_returns_empty_when_no_row(self): + with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', + return_value=True): + with patch.object(soyaml.so_yaml_postgres, 'read_yaml', + return_value=None): + result = soyaml.loadYaml( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls") + self.assertEqual(result, {}) + + def test_load_yaml_postgres_mode_reads_disk_for_bootstrap(self): + import tempfile, os as _os + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write("foo: bar\n") + tmp = f.name + try: + with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, + 'is_pg_managed', return_value=False): + result = soyaml.loadYaml(tmp) + self.assertEqual(result, {"foo": "bar"}) + finally: + _os.unlink(tmp) + + def test_write_yaml_postgres_mode_skips_disk(self): + import tempfile, os as _os + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + tmp = f.name + _os.unlink(tmp) 
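+        # The temp file is deleted up front on purpose: writeYaml in PG-canonical
+        # mode should never touch disk, so asserting below that the path still
+        # does not exist after the call proves no disk write happened.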
+ try: + with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', + return_value=True): + with patch.object(soyaml.so_yaml_postgres, 'write_yaml', + return_value=(True, 'ok')) as mock_w: + soyaml.writeYaml(tmp, {"x": 1}) + self.assertFalse(_os.path.exists(tmp)) + mock_w.assert_called_once() + finally: + if _os.path.exists(tmp): + _os.unlink(tmp) + + def test_write_yaml_postgres_mode_failure_is_fatal(self): + with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', + return_value=True): + with patch.object(soyaml.so_yaml_postgres, 'write_yaml', + return_value=(False, 'pg write failed: connection refused')): + with patch('sys.exit', new=MagicMock()) as sysmock: + with patch('sys.stderr', new=StringIO()) as mock_err: + soyaml.writeYaml( + "/opt/so/saltstack/local/pillar/soc/soc_soc.sls", + {"x": 1}) + sysmock.assert_called_with(1) + + def test_write_yaml_disk_mode_skips_pg(self): + import tempfile, os as _os + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + tmp = f.name + try: + with patch.object(soyaml, '_BACKEND_MODE', 'disk'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, 'write_yaml') as mock_w: + soyaml.writeYaml(tmp, {"x": 1}) + mock_w.assert_not_called() + with open(tmp) as f: + self.assertIn('x: 1', f.read()) + finally: + _os.unlink(tmp) + + def test_purge_postgres_mode_calls_pg_only(self): + import tempfile, os as _os + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + tmp = f.name + _os.unlink(tmp) + with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', + return_value=True): + with patch.object(soyaml.so_yaml_postgres, 'purge_yaml', + return_value=(True, 'ok')) as mock_p: + rc = soyaml.purgeFile(tmp) + self.assertEqual(rc, 0) + mock_p.assert_called_once() + + def test_purge_postgres_mode_failure_returns_nonzero(self): + with patch.object(soyaml, '_BACKEND_MODE', 'postgres'): + with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True): + with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed', + return_value=True): + with patch.object(soyaml.so_yaml_postgres, 'purge_yaml', + return_value=(False, 'pg purge failed: x')): + with patch('sys.stderr', new=StringIO()): + rc = soyaml.purgeFile( + "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls") + self.assertEqual(rc, 1) diff --git a/salt/manager/tools/sbin/so_yaml_postgres.py b/salt/manager/tools/sbin/so_yaml_postgres.py index dbf47b1ea..b23254a1a 100644 --- a/salt/manager/tools/sbin/so_yaml_postgres.py +++ b/salt/manager/tools/sbin/so_yaml_postgres.py @@ -69,6 +69,12 @@ class SkipPath(Exception): """Raised when a file path is intentionally not mirrored to PG.""" +def is_enabled(): + """Public alias for callers that want to probe PG reachability without + relying on a leading-underscore private name.""" + return _is_enabled() + + def _is_enabled(): """PG dual-write only fires if so-postgres is reachable. Cheap probe. Returns True when docker exec succeeds, False otherwise. 
We never @@ -149,6 +155,55 @@ def _conflict_target(scope): raise ValueError(f"unknown scope {scope!r}") +def is_pg_managed(path): + """True if this path maps to a so_pillar.* row (locate() succeeds). + Bootstrap and mine-driven files return False — they always live on + disk regardless of so-yaml's backend mode.""" + try: + locate(path) + return True + except SkipPath: + return False + + +def read_yaml(path): + """Return the content dict stored in so_pillar.pillar_entry for `path`, + or None when no row exists. Raises SkipPath when `path` is not part of + the PG-managed surface (caller should read disk in that case). + + Used by so-yaml.py PG-canonical mode so `replace`, `get`, etc. resolve + against the database rather than a stale (or absent) disk file.""" + if not _is_enabled(): + return None + scope, role, minion_id, pillar_path = locate(path) + + if scope == "minion": + sql = ("SELECT data FROM so_pillar.pillar_entry " + "WHERE scope='minion' " + f"AND minion_id={_pg_str(minion_id)} " + f"AND pillar_path={_pg_str(pillar_path)}") + elif scope == "role": + sql = ("SELECT data FROM so_pillar.pillar_entry " + "WHERE scope='role' " + f"AND role_name={_pg_str(role)} " + f"AND pillar_path={_pg_str(pillar_path)}") + else: + sql = ("SELECT data FROM so_pillar.pillar_entry " + "WHERE scope='global' " + f"AND pillar_path={_pg_str(pillar_path)}") + + try: + out = _docker_psql(sql).strip() + except Exception: + return None + if not out: + return None + try: + return json.loads(out) + except (ValueError, TypeError): + return None + + def write_yaml(path, content_dict, *, reason="so-yaml dual-write"): """Mirror the disk write at `path` (whose content was just rendered as `content_dict`) into so_pillar.pillar_entry. Best-effort: any failure diff --git a/salt/orch/so_pillar_reload.sls b/salt/orch/so_pillar_reload.sls new file mode 100644 index 000000000..90e7646b1 --- /dev/null +++ b/salt/orch/so_pillar_reload.sls @@ -0,0 +1,112 @@ +# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# https://securityonion.net/license; you may not use this file except in compliance with the +# Elastic License 2.0. + +# Driven by the so_pillar_changed reactor. Translates a so_pillar.pillar_entry +# change into (cache.clear_pillar -> saltutil.refresh_pillar -> state.apply) +# on the appropriate target. +# +# Routing rules live in the DISPATCH map below — one entry per +# (pillar_path prefix) -> (state sls, role grain). Add new services here +# rather than wiring more reactors. +# +# Idempotent: state.apply is idempotent; if the pillar value didn't actually +# change anything observable, the affected state runs a no-op. Bulk imports +# and replays are safe. + +{% set change = salt['pillar.get']('so_pillar_change', {}) %} +{% set scope = change.get('scope') %} +{% set role = change.get('role_name') %} +{% set minion = change.get('minion_id') %} +{% set changes = change.get('changes', []) %} + +{# (pillar_path prefix) -> {sls: , role: } + role is a grain value (e.g. 'so-sensor'), used to compute compound targets + when the change is global or role-scoped. 
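+   For example, a global or role-scoped 'soc.' change resolves to the
+   compound target
+   'I@role:so-manager or I@role:so-managersearch or I@role:so-managerhype or I@role:so-standalone'
+   with tgt_type 'compound'.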
#} +{% set DISPATCH = { + 'suricata.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, + 'sensor.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, + 'zeek.': {'sls': 'zeek.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, + 'stenographer.': {'sls': 'stenographer.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, + 'pcap.': {'sls': 'pcap.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, + 'logstash.': {'sls': 'logstash.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver']}, + 'redis.': {'sls': 'redis.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']}, + 'kafka.': {'sls': 'kafka.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver', 'so-searchnode']}, + 'elasticsearch.': {'sls': 'elasticsearch.config','roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-searchnode', 'so-heavynode', 'so-standalone']}, + 'kibana.': {'sls': 'kibana.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']}, + 'soc.': {'sls': 'soc.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']}, + 'telegraf.': {'sls': 'telegraf.config', 'roles': ['*']}, + 'fleet.': {'sls': 'fleet.config', 'roles': ['so-fleet']}, + 'strelka.': {'sls': 'strelka.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']}, +} %} + +{# Collect a deduplicated set of (sls, target_kind) actions. target_kind is + either 'minion:' (scope=minion) or 'roles:so-x,so-y' (scope=role/global). #} +{% set actions = {} %} + +{% for c in changes %} +{% set path = c.get('pillar_path', '') %} +{% for prefix, action in DISPATCH.items() %} +{% if path.startswith(prefix) %} +{% set sls = action['sls'] %} +{% if scope == 'minion' and minion %} +{% set key = sls ~ '|minion|' ~ minion %} +{% set _ = actions.update({key: {'sls': sls, 'tgt': minion, 'tgt_type': 'glob'}}) %} +{% else %} +{% set role_targets = action['roles'] %} +{% if '*' in role_targets %} +{% set tgt = '*' %} +{% set tgt_type = 'glob' %} +{% else %} +{% set tgt = ('I@role:' ~ role_targets|join(' or I@role:')) %} +{% set tgt_type = 'compound' %} +{% endif %} +{% set key = sls ~ '|' ~ tgt %} +{% set _ = actions.update({key: {'sls': sls, 'tgt': tgt, 'tgt_type': tgt_type}}) %} +{% endif %} +{% endif %} +{% endfor %} +{% endfor %} + +{% if actions %} + +{% for key, action in actions.items() %} +{% set safe_id = loop.index0 | string %} + +so_pillar_reload_clear_cache_{{ safe_id }}: + salt.runner: + - name: cache.clear_pillar + - tgt: '{{ action.tgt }}' + - tgt_type: '{{ action.tgt_type }}' + +so_pillar_reload_refresh_pillar_{{ safe_id }}: + salt.function: + - name: saltutil.refresh_pillar + - tgt: '{{ action.tgt }}' + - tgt_type: '{{ action.tgt_type }}' + - kwarg: + wait: True + - require: + - salt: so_pillar_reload_clear_cache_{{ safe_id }} + +so_pillar_reload_apply_state_{{ safe_id }}: + salt.state: + - tgt: '{{ action.tgt }}' + - tgt_type: '{{ action.tgt_type }}' + - sls: + - {{ action.sls }} + - queue: True + - require: + - salt: so_pillar_reload_refresh_pillar_{{ safe_id }} +{% endfor %} + +{% else %} + +{# No DISPATCH entry matched. Pillar still gets refreshed so any other states + read fresh values, but no service-specific reload is invoked. 
#} +so_pillar_reload_unmapped_path_noop: + test.nop + {% do salt.log.info('orch.so_pillar_reload: no dispatch match for %s' % changes) %} + +{% endif %} diff --git a/salt/postgres/files/schema/pillar/006_rls.sql b/salt/postgres/files/schema/pillar/006_rls.sql index dba890fc3..3b0da95ae 100644 --- a/salt/postgres/files/schema/pillar/006_rls.sql +++ b/salt/postgres/files/schema/pillar/006_rls.sql @@ -37,6 +37,16 @@ GRANT SELECT ON so_pillar.v_pillar_global, TO so_pillar_master; GRANT EXECUTE ON FUNCTION so_pillar.fn_pillar_secrets(text) TO so_pillar_master; +-- Engine reads + drains the change queue from the salt-master process. It +-- needs SELECT to find unprocessed rows and UPDATE to mark them processed. +-- The queue contains only locator metadata (no pillar data), so the master +-- role's existing privilege footprint is unchanged in practice. +GRANT SELECT, UPDATE ON so_pillar.change_queue TO so_pillar_master; +GRANT USAGE ON SEQUENCE so_pillar.change_queue_id_seq TO so_pillar_master; +-- Writer needs INSERT (the trigger runs as table owner, so this is just for +-- direct testing / manual replays from psql). +GRANT INSERT ON so_pillar.change_queue TO so_pillar_writer; + -- Writer needs CRUD on pillar_entry/minion/role_member plus access to seed tables. GRANT SELECT, INSERT, UPDATE, DELETE ON so_pillar.pillar_entry, diff --git a/salt/postgres/files/schema/pillar/008_change_notify.sql b/salt/postgres/files/schema/pillar/008_change_notify.sql new file mode 100644 index 000000000..7fe22ad18 --- /dev/null +++ b/salt/postgres/files/schema/pillar/008_change_notify.sql @@ -0,0 +1,77 @@ +-- pg_notify-driven change fan-out for so_pillar.pillar_entry. +-- +-- Two layers: +-- 1. so_pillar.change_queue — durable, drained by the salt-master +-- engine. Survives engine downtime, +-- de-duplicated by id, processed once. +-- 2. pg_notify('so_pillar_change') — wakeup signal. Payload is the +-- change_queue row id and locator +-- (no secret data — channels are +-- snoopable by anyone with LISTEN). +-- +-- The salt-master engine LISTENs on the channel for low-latency wakeup, +-- then SELECTs unprocessed change_queue rows so a missed notification +-- (engine restart, network blip) self-heals on the next event. + +CREATE TABLE IF NOT EXISTS so_pillar.change_queue ( + id bigserial PRIMARY KEY, + scope text NOT NULL, + role_name text, + minion_id text, + pillar_path text NOT NULL, + op text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')), + enqueued_at timestamptz NOT NULL DEFAULT now(), + processed_at timestamptz +); + +-- Hot index for the engine's drain query. +CREATE INDEX IF NOT EXISTS ix_change_queue_unprocessed + ON so_pillar.change_queue (id) + WHERE processed_at IS NULL; + +-- Retention index: pg_cron job in 007 sweeps processed rows older than 7d. +CREATE INDEX IF NOT EXISTS ix_change_queue_processed_at + ON so_pillar.change_queue (processed_at) + WHERE processed_at IS NOT NULL; + +CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_notify() + RETURNS trigger + LANGUAGE plpgsql +AS $$ +DECLARE + v_row record; + v_id bigint; +BEGIN + IF TG_OP = 'DELETE' THEN + v_row := OLD; + ELSE + v_row := NEW; + END IF; + + INSERT INTO so_pillar.change_queue + (scope, role_name, minion_id, pillar_path, op) + VALUES + (v_row.scope, v_row.role_name, v_row.minion_id, v_row.pillar_path, TG_OP) + RETURNING id INTO v_id; + + -- Payload is the queue id + locator only. Engine joins back to + -- pillar_entry if it needs the data — keeps secrets off the wire. 
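+    -- Illustrative payload shape (values below are examples, not real data):
+    --   {"queue_id": 42, "scope": "minion", "role_name": null,
+    --    "minion_id": "h1_sensor", "pillar_path": "minions.h1_sensor", "op": "UPDATE"}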
+ PERFORM pg_notify('so_pillar_change', json_build_object( + 'queue_id', v_id, + 'scope', v_row.scope, + 'role_name', v_row.role_name, + 'minion_id', v_row.minion_id, + 'pillar_path', v_row.pillar_path, + 'op', TG_OP + )::text); + + RETURN NULL; +END; +$$; + +DROP TRIGGER IF EXISTS tg_pillar_entry_notify ON so_pillar.pillar_entry; +CREATE TRIGGER tg_pillar_entry_notify + AFTER INSERT OR UPDATE OR DELETE + ON so_pillar.pillar_entry + FOR EACH ROW + EXECUTE FUNCTION so_pillar.fn_pillar_entry_notify(); diff --git a/salt/postgres/schema_pillar.sls b/salt/postgres/schema_pillar.sls index 22599a332..462855b82 100644 --- a/salt/postgres/schema_pillar.sls +++ b/salt/postgres/schema_pillar.sls @@ -54,6 +54,10 @@ so_pillar_postgres_wait_ready: {% do sql_files.append('007_drift_pgcron.sql') %} {% endif %} +# 008 always applies — pg_notify-driven change fan-out is what the salt-master +# pg_notify_pillar engine consumes. Without it reactor wiring sees no events. +{% do sql_files.append('008_change_notify.sql') %} + {% for sql_file in sql_files %} so_pillar_apply_{{ sql_file | replace('.', '_') }}: cmd.run: @@ -87,7 +91,7 @@ so_pillar_master_key_configure: exit 1 fi - require: - - cmd: so_pillar_apply_006_rls_sql + - cmd: so_pillar_apply_{{ sql_files[-1] | replace('.', '_') }} # Run the importer once after the schema is in place. Idempotent — re-runs # with no SLS edits produce zero row changes. @@ -97,6 +101,29 @@ so_pillar_initial_import: - require: - cmd: so_pillar_master_key_configure +# Flip so-yaml from dual-write to PG-canonical for managed paths now that +# the schema and importer are both in place. Bootstrap files (secrets.sls, +# postgres/auth.sls, ca/init.sls, *.nodes.sls, top.sls, ...) remain on disk +# regardless because so_yaml_postgres.locate() raises SkipPath for them. +so_pillar_so_yaml_mode_dir: + file.directory: + - name: /opt/so/conf/so-yaml + - user: socore + - group: socore + - mode: '0755' + - makedirs: True + +so_pillar_so_yaml_mode_postgres: + file.managed: + - name: /opt/so/conf/so-yaml/mode + - contents: postgres + - user: socore + - group: socore + - mode: '0644' + - require: + - file: so_pillar_so_yaml_mode_dir + - cmd: so_pillar_initial_import + {% else %} so_pillar_disabled_noop: diff --git a/salt/reactor/so_pillar_changed.sls b/salt/reactor/so_pillar_changed.sls new file mode 100644 index 000000000..5d8995e21 --- /dev/null +++ b/salt/reactor/so_pillar_changed.sls @@ -0,0 +1,27 @@ +# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# https://securityonion.net/license; you may not use this file except in compliance with the +# Elastic License 2.0. + +# Fires for every event tagged 'so/pillar/changed'. Source of those events +# is the pg_notify_pillar engine on the salt-master, which in turn drains +# so_pillar.change_queue (populated by the AFTER trigger on +# so_pillar.pillar_entry — see 008_change_notify.sql). +# +# All routing logic — which pillar paths reload which services on which +# targets — lives in orch.so_pillar_reload so it stays editable as one +# YAML table without touching reactor wiring. 
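+#
+# The engine groups queued rows per (scope, role_name, minion_id), so an
+# illustrative event payload (example values only) looks like:
+#   {'scope': 'minion', 'role_name': None, 'minion_id': 'h1_sensor',
+#    'changes': [{'queue_id': 42, 'pillar_path': 'minions.h1_sensor', 'op': 'UPDATE'}]}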
+ +{% set payload = data.get('data', {}) %} +{% do salt.log.info('so_pillar_changed reactor: %s' % payload) %} + +so_pillar_dispatch_reload: + runner.state.orchestrate: + - args: + - mods: orch.so_pillar_reload + - pillar: + so_pillar_change: + scope: {{ payload.get('scope') | json }} + role_name: {{ payload.get('role_name') | json }} + minion_id: {{ payload.get('minion_id') | json }} + changes: {{ payload.get('changes', []) | json }} diff --git a/salt/salt/engines/master/pg_notify_pillar.py b/salt/salt/engines/master/pg_notify_pillar.py new file mode 100644 index 000000000..42fb9836e --- /dev/null +++ b/salt/salt/engines/master/pg_notify_pillar.py @@ -0,0 +1,200 @@ +# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# https://securityonion.net/license; you may not use this file except in compliance with the +# Elastic License 2.0. + +# -*- coding: utf-8 -*- + +""" +pg_notify_pillar — Salt master engine that bridges so_pillar.change_queue +into the Salt event bus. + +Architecture (see 008_change_notify.sql): + pillar_entry -- AFTER trigger --> change_queue (durable) + + pg_notify('so_pillar_change') (wakeup) + | + LISTEN <-- this engine <-+ + SELECT/UPDATE change_queue + | + fire_event('so/pillar/changed', ...) + | + reactor matches tag --> orch + +Why a queue + notify rather than just notify: pg_notify is fire-and-forget +within a session. If the engine is down or the LISTEN connection is broken +when a write happens, the notification is lost forever. The change_queue +lets us recover — on (re)connect, we drain everything still flagged +processed_at IS NULL. + +Debounce: bulk operations (so-pillar-import, fresh installs) can fire +hundreds of notifications per second. The engine collects whatever lands in +a short window and emits one event per (scope, role, minion) tuple so the +reactor isn't stampeded. +""" + +import json +import logging +import os +import select +import time + +import salt.utils.event + +log = logging.getLogger(__name__) + +__virtualname__ = 'pg_notify_pillar' + +DEFAULT_CHANNEL = 'so_pillar_change' +DEFAULT_DEBOUNCE_MS = 500 +DEFAULT_RECONNECT_BACKOFF = 5 +DEFAULT_BACKLOG_INTERVAL = 30 +DEFAULT_BATCH_LIMIT = 500 + +EVENT_TAG = 'so/pillar/changed' + + +def __virtual__(): + try: + import psycopg2 # noqa: F401 + return __virtualname__ + except ImportError: + return False, 'pg_notify_pillar engine requires psycopg2' + + +def start(dsn=None, + host='127.0.0.1', + port=5432, + dbname='securityonion', + user='so_pillar_master', + password=None, + channel=DEFAULT_CHANNEL, + debounce_ms=DEFAULT_DEBOUNCE_MS, + reconnect_backoff=DEFAULT_RECONNECT_BACKOFF, + backlog_interval=DEFAULT_BACKLOG_INTERVAL, + batch_limit=DEFAULT_BATCH_LIMIT, + password_file=None): + """ + Run the change-queue bridge until the master shuts the engine down. + + Either pass a full ``dsn`` string, or supply discrete kwargs. The + password may also be read from ``password_file`` (mode 0400) so the + engine config in ``/etc/salt/master.d/`` doesn't have to embed it + inline — only the file path. 
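+
+    Minimal illustrative stanza (the password_file path here is an example,
+    not a shipped default):
+
+        engines:
+          - pg_notify_pillar:
+              user: so_pillar_master
+              password_file: /etc/salt/pg_notify_pillar.pass
+              channel: so_pillar_change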
+ """ + import psycopg2 + import psycopg2.extensions + + if dsn is None: + if password is None and password_file: + try: + with open(password_file, 'r') as fh: + password = fh.read().strip() + except (IOError, OSError) as exc: + log.error('pg_notify_pillar: cannot read password_file %s: %s', + password_file, exc) + return + dsn = _build_dsn(host=host, port=port, dbname=dbname, + user=user, password=password) + + bus = salt.utils.event.get_master_event( + __opts__, __opts__['sock_dir'], listen=False) + + log.info('pg_notify_pillar: starting (channel=%s debounce=%dms)', + channel, debounce_ms) + + while True: + conn = None + try: + conn = psycopg2.connect(dsn) + conn.set_isolation_level( + psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur = conn.cursor() + cur.execute('LISTEN {0}'.format(channel)) + log.info('pg_notify_pillar: connected; LISTEN %s', channel) + + _drain(cur, bus, batch_limit) + + while True: + ready, _, _ = select.select([conn], [], [], backlog_interval) + if not ready: + _drain(cur, bus, batch_limit) + continue + + conn.poll() + _consume_notifies(conn) + + if debounce_ms > 0: + time.sleep(debounce_ms / 1000.0) + conn.poll() + _consume_notifies(conn) + + _drain(cur, bus, batch_limit) + + except Exception as exc: # psycopg2.Error subclasses + OS errors + log.error('pg_notify_pillar: %s; reconnecting in %ds', + exc, reconnect_backoff) + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass + time.sleep(reconnect_backoff) + + +def _build_dsn(host, port, dbname, user, password): + parts = ['host={0}'.format(host), + 'port={0}'.format(port), + 'dbname={0}'.format(dbname), + 'user={0}'.format(user)] + if password: + parts.append('password={0}'.format(password)) + return ' '.join(parts) + + +def _consume_notifies(conn): + # We don't use the payload directly — the queue table is the source of + # truth, and draining it covers any notifications we missed. So just + # discard them; their presence already proved there's something to drain. + while conn.notifies: + conn.notifies.pop(0) + + +def _drain(cur, bus, batch_limit): + """Mark unprocessed change_queue rows processed and emit one event per + (scope, role_name, minion_id) group. 
SKIP LOCKED so multiple masters + sharing a Postgres don't double-process.""" + cur.execute(""" + UPDATE so_pillar.change_queue + SET processed_at = now() + WHERE id IN ( + SELECT id FROM so_pillar.change_queue + WHERE processed_at IS NULL + ORDER BY id + FOR UPDATE SKIP LOCKED + LIMIT %s) + RETURNING id, scope, role_name, minion_id, pillar_path, op + """, (batch_limit,)) + rows = cur.fetchall() + if not rows: + return + + groups = {} + for row_id, scope, role_name, minion_id, pillar_path, op in rows: + key = (scope, role_name, minion_id) + groups.setdefault(key, []).append({ + 'queue_id': row_id, + 'pillar_path': pillar_path, + 'op': op, + }) + + for (scope, role_name, minion_id), changes in groups.items(): + payload = { + 'scope': scope, + 'role_name': role_name, + 'minion_id': minion_id, + 'changes': changes, + } + log.debug('pg_notify_pillar: firing %s for %s', + EVENT_TAG, payload) + bus.fire_event(payload, EVENT_TAG) diff --git a/salt/salt/master.sls b/salt/salt/master.sls index 01a6a92a8..1ef11dfd5 100644 --- a/salt/salt/master.sls +++ b/salt/salt/master.sls @@ -15,6 +15,7 @@ include: - salt.minion - salt.master.ext_pillar_postgres + - salt.master.pg_notify_pillar_engine {% if 'vrt' in salt['pillar.get']('features', []) %} - salt.cloud - salt.cloud.reactor_config_hypervisor diff --git a/salt/salt/master/files/pg_notify_pillar_engine.conf.jinja b/salt/salt/master/files/pg_notify_pillar_engine.conf.jinja new file mode 100644 index 000000000..44c8482f6 --- /dev/null +++ b/salt/salt/master/files/pg_notify_pillar_engine.conf.jinja @@ -0,0 +1,20 @@ +# /etc/salt/master.d/pg_notify_pillar_engine.conf +# Rendered by salt/salt/master/pg_notify_pillar_engine.sls. +# +# Subscribes the salt-master to so_pillar.change_queue via LISTEN +# so_pillar_change. The engine drains queued changes and re-publishes +# them on the event bus as 'so/pillar/changed'. Reactor wiring is in +# so_pillar_reactor.conf. + +engines: + - pg_notify_pillar: + host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }} + port: {{ pillar.get('postgres', {}).get('port', 5432) }} + dbname: securityonion + user: so_pillar_master + password: {{ pillar['secrets']['pillar_master_pass'] }} + channel: so_pillar_change + debounce_ms: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_debounce_ms', 500) }} + reconnect_backoff: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_reconnect_backoff', 5) }} + backlog_interval: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_backlog_interval', 30) }} + batch_limit: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_batch_limit', 500) }} diff --git a/salt/salt/master/files/so_pillar_reactor.conf b/salt/salt/master/files/so_pillar_reactor.conf new file mode 100644 index 000000000..7239bbf43 --- /dev/null +++ b/salt/salt/master/files/so_pillar_reactor.conf @@ -0,0 +1,12 @@ +# /etc/salt/master.d/so_pillar_reactor.conf +# Wires the so/pillar/changed event tag — emitted by the pg_notify_pillar +# engine — to the so_pillar_changed reactor, which dispatches to +# orch.so_pillar_reload. +# +# Lives in its own file (rather than appended to reactor_hypervisor.conf) +# so the postgres:so_pillar:enabled flag can flip it on/off independently +# of hypervisor reactor wiring. 
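+#
+# To watch these events while troubleshooting, the standard event runner
+# works (illustrative invocation):
+#   salt-run state.event tagmatch='so/pillar/changed' pretty=True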
+ +reactor: + - 'so/pillar/changed': + - /opt/so/saltstack/default/salt/reactor/so_pillar_changed.sls diff --git a/salt/salt/master/pg_notify_pillar_engine.sls b/salt/salt/master/pg_notify_pillar_engine.sls new file mode 100644 index 000000000..b3b840263 --- /dev/null +++ b/salt/salt/master/pg_notify_pillar_engine.sls @@ -0,0 +1,81 @@ +# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# https://securityonion.net/license; you may not use this file except in compliance with the +# Elastic License 2.0. + +# Deploys the pg_notify_pillar engine module + its master.d config so the +# salt-master subscribes to so_pillar.change_queue and republishes changes +# on the salt event bus as so/pillar/changed. Reactor (so_pillar_changed.sls) +# matches that tag and dispatches the appropriate orch. +# +# Gated on the same postgres:so_pillar:enabled flag as the schema and +# ext_pillar config so the three components flip together. + +{% from 'allowed_states.map.jinja' import allowed_states %} +{% if sls.split('.')[0] in allowed_states %} + +{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %} + +pg_notify_pillar_engine_module: + file.managed: + - name: /etc/salt/engines/pg_notify_pillar.py + - source: salt://salt/engines/master/pg_notify_pillar.py + - mode: '0644' + - user: root + - group: root + - makedirs: True + - watch_in: + - service: salt_master_service + +pg_notify_pillar_engine_config: + file.managed: + - name: /etc/salt/master.d/pg_notify_pillar_engine.conf + - source: salt://salt/master/files/pg_notify_pillar_engine.conf.jinja + - template: jinja + - mode: '0640' + - user: root + - group: salt + - watch_in: + - service: salt_master_service + +pg_notify_pillar_reactor_config: + file.managed: + - name: /etc/salt/master.d/so_pillar_reactor.conf + - source: salt://salt/master/files/so_pillar_reactor.conf + - mode: '0644' + - user: root + - group: root + - watch_in: + - service: salt_master_service + +{% else %} + +# When the flag flips off, peel everything back so a rollback returns to +# pure-disk pillar with no orphan engine churning on a dead listen socket. +pg_notify_pillar_engine_module_absent: + file.absent: + - name: /etc/salt/engines/pg_notify_pillar.py + - watch_in: + - service: salt_master_service + +pg_notify_pillar_engine_config_absent: + file.absent: + - name: /etc/salt/master.d/pg_notify_pillar_engine.conf + - watch_in: + - service: salt_master_service + +pg_notify_pillar_reactor_config_absent: + file.absent: + - name: /etc/salt/master.d/so_pillar_reactor.conf + - watch_in: + - service: salt_master_service + +{% endif %} + +{% else %} + +{{sls}}_state_not_allowed: + test.fail_without_changes: + - name: {{sls}}_state_not_allowed + +{% endif %}