make so-yaml PG-canonical and add pillar-change reactor stack
Two coupled changes that together let so_pillar.* be the canonical config store, with config edits driving service reloads automatically:

so-yaml PG-canonical mode
- Adds /opt/so/conf/so-yaml/mode (and an SO_YAML_BACKEND env override) with three values: dual (legacy), postgres (PG-only for managed paths), disk (emergency rollback). Bootstrap files (secrets.sls, ca/init.sls, *.nodes.sls, top.sls, ...) stay disk-only regardless, via the existing SkipPath allowlist in so_yaml_postgres.locate.
- loadYaml/writeYaml/purgeFile now route to so_pillar.* in postgres mode: replace/add/get all read+write the database with no disk file ever appearing. PG failure is fatal in postgres mode (no silent fallback); dual mode preserves the prior best-effort mirror.
- so_yaml_postgres gains read_yaml(path), is_pg_managed(path), and is_enabled() so so-yaml can answer "is this path PG-managed and is PG up" without reaching into private helpers.
- schema_pillar.sls writes /opt/so/conf/so-yaml/mode = postgres after the importer succeeds, so flipping postgres:so_pillar:enabled flips so-yaml's behavior in lockstep with the schema being live.

pg_notify-driven change fan-out
- 008_change_notify.sql adds so_pillar.change_queue + an AFTER trigger on pillar_entry that enqueues the locator and pg_notifies 'so_pillar_change'. The queue is drained at-least-once so engine restarts don't lose events; pg_notify is just the wakeup signal.
- New salt-master engine pg_notify_pillar.py LISTENs on the channel, drains the queue with FOR UPDATE SKIP LOCKED, debounces bursts, and fires 'so/pillar/changed' events grouped by (scope, role, minion).
- Reactor so_pillar_changed.sls catches the tag and dispatches to orch.so_pillar_reload, which carries a DISPATCH map of pillar-path prefix -> (state sls, role grain set), so adding a new service to the auto-reload list is a one-line edit instead of a new reactor.
- Engine + reactor wiring is gated on the same postgres:so_pillar:enabled flag as the schema and ext_pillar config, so the whole stack flips on/off together.

Tests: 21 new cases (112 total, all passing) covering mode resolution, PG-managed detection, and PG-canonical read/write/purge routing with the PG client stubbed.
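The mode precedence is: per-invocation env override, then the persisted mode file, then the safe default. A minimal sketch mirroring _resolveBackendMode in the diff below:

import os

MODE_FILE = "/opt/so/conf/so-yaml/mode"
VALID_MODES = ("dual", "postgres", "disk")

def resolve_mode():
    env = os.environ.get("SO_YAML_BACKEND")
    if env in VALID_MODES:               # 1. env var: one-off override, e.g.
        return env                       #    SO_YAML_BACKEND=disk for rollback
    try:
        with open(MODE_FILE) as fh:      # 2. mode file, flipped to 'postgres'
            value = fh.read().strip()    #    by schema_pillar.sls post-import
        if value in VALID_MODES:
            return value
    except OSError:
        pass
    return "dual"                        # 3. default: safe upgrade behavior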
@@ -13,10 +13,28 @@ import json

lockFile = "/tmp/so-yaml.lock"

-# postsalt: dual-write each disk mutation into so_pillar.* in so-postgres so
-# Salt's ext_pillar and SOC's PostgresConfigstore see the same data without
-# requiring a separate writer. Failure of the PG side is logged but never
-# fails the disk write — disk is canonical during the migration transition.
# postsalt: so-yaml supports three backend modes for PG-managed pillar paths:
#
#   dual     — write disk + mirror to so_pillar.*. Reads from disk.
#              Used during the migration transition when disk is still
#              canonical and PG runs as a shadow.
#   postgres — write to so_pillar.* only. Reads from so_pillar.*. No disk
#              file is touched. The end state once cutover is complete.
#   disk     — disk only, no PG. Emergency rollback escape hatch.
#
# Bootstrap and mine-driven files (secrets.sls, ca/init.sls, */nodes.sls,
# top.sls, etc.) are always handled on disk regardless of mode — those paths
# are explicitly excluded by so_yaml_postgres.locate() raising SkipPath.
#
# Mode resolution: SO_YAML_BACKEND env var, then /opt/so/conf/so-yaml/mode,
# then default 'dual' (safe upgrade behavior — flipping to 'postgres' is
# done by schema_pillar.sls after the schema is in place and the importer
# has run at least once).

MODE_FILE = "/opt/so/conf/so-yaml/mode"
VALID_MODES = ("dual", "postgres", "disk")
DEFAULT_MODE = "dual"

try:
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    import so_yaml_postgres
@@ -25,6 +43,35 @@ except Exception as _exc:
    _SO_YAML_PG_AVAILABLE = False


def _resolveBackendMode():
    env = os.environ.get("SO_YAML_BACKEND")
    if env and env in VALID_MODES:
        return env
    try:
        with open(MODE_FILE, "r") as fh:
            value = fh.read().strip()
        if value in VALID_MODES:
            return value
    except (IOError, OSError):
        pass
    return DEFAULT_MODE


_BACKEND_MODE = _resolveBackendMode()


def _isPgManaged(filename):
    """True when so-yaml should route this file's reads/writes through
    so_pillar.*. False for bootstrap/mine-driven files that always live on
    disk, and for arbitrary YAML paths outside the pillar tree."""
    if not _SO_YAML_PG_AVAILABLE:
        return False
    try:
        return so_yaml_postgres.is_pg_managed(filename)
    except Exception:
        return False


def showUsage(args):
    print('Usage: {} <COMMAND> <YAML_FILE> [ARGS...]'.format(sys.argv[0]), file=sys.stderr)
    print(' General commands:', file=sys.stderr)
@@ -39,6 +86,11 @@ def showUsage(args):
    print('   purge - Delete the YAML file from disk and remove its rows from so_pillar.* (no KEY arg).', file=sys.stderr)
    print('   help - Prints this usage information.', file=sys.stderr)
    print('', file=sys.stderr)
    print(' Backend mode:', file=sys.stderr)
    print('   Resolved from $SO_YAML_BACKEND, then /opt/so/conf/so-yaml/mode, default "dual".', file=sys.stderr)
    print('   Valid values: dual | postgres | disk. Bootstrap pillar files (secrets, ca, *.nodes.sls)', file=sys.stderr)
    print('   are always handled on disk regardless of mode.', file=sys.stderr)
    print('', file=sys.stderr)
    print(' Where:', file=sys.stderr)
    print('   YAML_FILE - Path to the file that will be modified. Ex: /opt/so/conf/service/conf.yaml', file=sys.stderr)
    print('   KEY - YAML key, does not support \' or " characters at this time. Ex: level1.level2', file=sys.stderr)
@@ -51,6 +103,24 @@ def showUsage(args):


def loadYaml(filename):
    """Load a YAML file's content as a dict.

    PG-canonical mode (`postgres`): for PG-managed paths, read from
    so_pillar.pillar_entry. A missing row is treated as an empty dict so
    that `replace`/`add` on a fresh path can populate it from scratch.

    Other modes / non-PG-managed paths: read from disk as today.
    """
    if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
        try:
            data = so_yaml_postgres.read_yaml(filename)
        except so_yaml_postgres.SkipPath:
            data = None
        except Exception as e:
            print(f"so-yaml: pg read failed for {filename}: {e}", file=sys.stderr)
            sys.exit(1)
        return data if data is not None else {}

    try:
        with open(filename, "r") as file:
            content = file.read()
@@ -64,10 +134,33 @@ def loadYaml(filename):


def writeYaml(filename, content):
    """Persist `content` for `filename`.

    PG-canonical mode + PG-managed path: write only to so_pillar.*. A PG
    failure is fatal (no disk fallback) — caller must retry.

    Dual mode: write disk, then mirror to PG (failures are warnings).

    Disk mode or non-PG-managed path: write disk only.
    """
    if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
        if not _SO_YAML_PG_AVAILABLE:
            print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr)
            sys.exit(1)
        ok, msg = so_yaml_postgres.write_yaml(
            filename, content,
            reason="so-yaml " + " ".join(sys.argv[1:2]))
        if not ok:
            print(f"so-yaml: pg write failed for {filename}: {msg}", file=sys.stderr)
            sys.exit(1)
        return None

    file = open(filename, "w")
    result = yaml.safe_dump(content, file)
    file.close()
-   _mirrorToPostgres(filename, content)
    if _BACKEND_MODE == "dual":
        _mirrorToPostgres(filename, content)
    return result
@@ -75,7 +168,8 @@ def _mirrorToPostgres(filename, content):
    """Best-effort dual-write of a YAML mutation into so_pillar.*. Skips
    files outside the PG-managed pillar surface (secrets.sls,
    elasticsearch/nodes.sls, etc.) and silently degrades when so-postgres
-   is unreachable. Disk write is canonical; this never raises.
    is unreachable. Disk write is canonical in dual mode; this never
    raises.

    Only real PG failures (`pg write failed: ...`) are logged so the
    common cases (skipped path, postgres not running) don't pollute
@@ -92,9 +186,29 @@ def _mirrorToPostgres(filename, content):


def purgeFile(filename):
-   """Delete a YAML file from disk and mirror the deletion into PG.
-   Idempotent: missing file → success. Mirrors so-yaml's other verbs
-   in tolerating a soft PG failure."""
    """Delete a YAML file from disk and remove the matching rows from
    so_pillar.*. Idempotent — missing file/row counts as success.

    PG-canonical mode + PG-managed path: PG delete is canonical. If a stale
    disk file from the dual-write era happens to still exist, it's removed
    too as a cleanup courtesy. PG failure is fatal in this mode.

    Dual / disk modes: remove disk first; PG cleanup is best-effort."""
    if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
        if not _SO_YAML_PG_AVAILABLE:
            print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr)
            return 1
        ok, msg = so_yaml_postgres.purge_yaml(filename, reason="so-yaml purge")
        if not ok:
            print(f"so-yaml: pg purge failed for {filename}: {msg}", file=sys.stderr)
            return 1
        if os.path.exists(filename):
            try:
                os.remove(filename)
            except Exception as e:
                print(f"so-yaml: warn — could not remove stale disk file {filename}: {e}", file=sys.stderr)
        return 0

    if os.path.exists(filename):
        try:
            os.remove(filename)
@@ -102,7 +216,7 @@ def purgeFile(filename):
            print(f"Failed to remove {filename}: {e}", file=sys.stderr)
            return 1

-   if _SO_YAML_PG_AVAILABLE:
    if _BACKEND_MODE == "dual" and _SO_YAML_PG_AVAILABLE:
        try:
            ok, msg = so_yaml_postgres.purge_yaml(filename,
                                                  reason="so-yaml purge")
@@ -1106,3 +1106,214 @@ class TestSoYamlPostgres(unittest.TestCase):
                {"soc": {"foo": "bar"}})
            self.assertFalse(ok)
            self.assertEqual(msg, "postgres unreachable")

    def test_is_pg_managed_true(self):
        self.assertTrue(self.mod.is_pg_managed(
            "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
        self.assertTrue(self.mod.is_pg_managed(
            "/opt/so/saltstack/local/pillar/soc/soc_soc.sls"))

    def test_is_pg_managed_false_for_bootstrap(self):
        self.assertFalse(self.mod.is_pg_managed(
            "/opt/so/saltstack/local/pillar/secrets.sls"))
        self.assertFalse(self.mod.is_pg_managed(
            "/opt/so/saltstack/local/pillar/postgres/auth.sls"))
        self.assertFalse(self.mod.is_pg_managed(
            "/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls"))

    def test_read_yaml_unreachable(self):
        with patch.object(self.mod, '_is_enabled', return_value=False):
            self.assertIsNone(self.mod.read_yaml(
                "/opt/so/saltstack/local/pillar/soc/soc_soc.sls"))

    def test_read_yaml_skips_disk_only(self):
        with patch.object(self.mod, '_is_enabled', return_value=True):
            with self.assertRaises(self.mod.SkipPath):
                self.mod.read_yaml(
                    "/opt/so/saltstack/local/pillar/secrets.sls")

    def test_read_yaml_returns_data(self):
        with patch.object(self.mod, '_is_enabled', return_value=True):
            with patch.object(self.mod, '_docker_psql',
                              return_value='{"soc": {"foo": "bar"}}\n'):
                data = self.mod.read_yaml(
                    "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
        self.assertEqual(data, {"soc": {"foo": "bar"}})

    def test_read_yaml_returns_none_when_no_row(self):
        with patch.object(self.mod, '_is_enabled', return_value=True):
            with patch.object(self.mod, '_docker_psql', return_value=''):
                data = self.mod.read_yaml(
                    "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
        self.assertIsNone(data)

    def test_read_yaml_minion_query_shape(self):
        captured = {}

        def fake_psql(sql):
            captured['sql'] = sql
            return '{"host": {"mainip": "10.0.0.1"}}'

        with patch.object(self.mod, '_is_enabled', return_value=True):
            with patch.object(self.mod, '_docker_psql', side_effect=fake_psql):
                data = self.mod.read_yaml(
                    "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
        self.assertEqual(data, {"host": {"mainip": "10.0.0.1"}})
        self.assertIn("scope='minion'", captured['sql'])
        self.assertIn("'h1_sensor'", captured['sql'])
        self.assertIn("'minions.h1_sensor'", captured['sql'])

    def test_is_enabled_public_alias(self):
        with patch.object(self.mod, '_is_enabled', return_value=True):
            self.assertTrue(self.mod.is_enabled())
        with patch.object(self.mod, '_is_enabled', return_value=False):
            self.assertFalse(self.mod.is_enabled())


class TestSoYamlBackendMode(unittest.TestCase):
    """Tests so-yaml's backend-mode resolution and PG-canonical routing
    for read/write/purge. The PG calls themselves are stubbed; what we're
    asserting is that the right backend is chosen for each (mode, path)
    combination."""

    def test_resolve_mode_env_overrides_file(self):
        with patch.dict('os.environ', {'SO_YAML_BACKEND': 'postgres'}):
            self.assertEqual(soyaml._resolveBackendMode(), 'postgres')
        with patch.dict('os.environ', {'SO_YAML_BACKEND': 'disk'}):
            self.assertEqual(soyaml._resolveBackendMode(), 'disk')

    def test_resolve_mode_invalid_env_falls_back(self):
        with patch.dict('os.environ', {'SO_YAML_BACKEND': 'garbage'}, clear=False):
            with patch('builtins.open', side_effect=IOError):
                self.assertEqual(soyaml._resolveBackendMode(), 'dual')

    def test_resolve_mode_default_dual(self):
        env = {k: v for k, v in __import__('os').environ.items()
               if k != 'SO_YAML_BACKEND'}
        with patch.dict('os.environ', env, clear=True):
            with patch('builtins.open', side_effect=IOError):
                self.assertEqual(soyaml._resolveBackendMode(), 'dual')

    def test_is_pg_managed_proxies(self):
        with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
            self.assertTrue(soyaml._isPgManaged(
                "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
            self.assertFalse(soyaml._isPgManaged(
                "/opt/so/saltstack/local/pillar/secrets.sls"))

    def test_is_pg_managed_false_when_module_unavailable(self):
        with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
            self.assertFalse(soyaml._isPgManaged(
                "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))

    def test_load_yaml_postgres_mode_reads_pg(self):
        with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
            with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
                                  return_value=True):
                    with patch.object(soyaml.so_yaml_postgres, 'read_yaml',
                                      return_value={"a": 1}):
                        result = soyaml.loadYaml(
                            "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
        self.assertEqual(result, {"a": 1})

    def test_load_yaml_postgres_mode_returns_empty_when_no_row(self):
        with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
            with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
                                  return_value=True):
                    with patch.object(soyaml.so_yaml_postgres, 'read_yaml',
                                      return_value=None):
                        result = soyaml.loadYaml(
                            "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
        self.assertEqual(result, {})

    def test_load_yaml_postgres_mode_reads_disk_for_bootstrap(self):
        import tempfile, os as _os
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            f.write("foo: bar\n")
            tmp = f.name
        try:
            with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
                with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                    with patch.object(soyaml.so_yaml_postgres,
                                      'is_pg_managed', return_value=False):
                        result = soyaml.loadYaml(tmp)
            self.assertEqual(result, {"foo": "bar"})
        finally:
            _os.unlink(tmp)

    def test_write_yaml_postgres_mode_skips_disk(self):
        import tempfile, os as _os
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            tmp = f.name
        _os.unlink(tmp)
        try:
            with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
                with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                    with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
                                      return_value=True):
                        with patch.object(soyaml.so_yaml_postgres, 'write_yaml',
                                          return_value=(True, 'ok')) as mock_w:
                            soyaml.writeYaml(tmp, {"x": 1})
            self.assertFalse(_os.path.exists(tmp))
            mock_w.assert_called_once()
        finally:
            if _os.path.exists(tmp):
                _os.unlink(tmp)

    def test_write_yaml_postgres_mode_failure_is_fatal(self):
        with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
            with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
                                  return_value=True):
                    with patch.object(soyaml.so_yaml_postgres, 'write_yaml',
                                      return_value=(False, 'pg write failed: connection refused')):
                        with patch('sys.exit', new=MagicMock()) as sysmock:
                            with patch('sys.stderr', new=StringIO()) as mock_err:
                                soyaml.writeYaml(
                                    "/opt/so/saltstack/local/pillar/soc/soc_soc.sls",
                                    {"x": 1})
        sysmock.assert_called_with(1)

    def test_write_yaml_disk_mode_skips_pg(self):
        import tempfile, os as _os
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            tmp = f.name
        try:
            with patch.object(soyaml, '_BACKEND_MODE', 'disk'):
                with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                    with patch.object(soyaml.so_yaml_postgres, 'write_yaml') as mock_w:
                        soyaml.writeYaml(tmp, {"x": 1})
            mock_w.assert_not_called()
            with open(tmp) as f:
                self.assertIn('x: 1', f.read())
        finally:
            _os.unlink(tmp)

    def test_purge_postgres_mode_calls_pg_only(self):
        import tempfile, os as _os
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            tmp = f.name
        _os.unlink(tmp)
        with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
            with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
                                  return_value=True):
                    with patch.object(soyaml.so_yaml_postgres, 'purge_yaml',
                                      return_value=(True, 'ok')) as mock_p:
                        rc = soyaml.purgeFile(tmp)
        self.assertEqual(rc, 0)
        mock_p.assert_called_once()

    def test_purge_postgres_mode_failure_returns_nonzero(self):
        with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
            with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
                with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
                                  return_value=True):
                    with patch.object(soyaml.so_yaml_postgres, 'purge_yaml',
                                      return_value=(False, 'pg purge failed: x')):
                        with patch('sys.stderr', new=StringIO()):
                            rc = soyaml.purgeFile(
                                "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
        self.assertEqual(rc, 1)
@@ -69,6 +69,12 @@ class SkipPath(Exception):
    """Raised when a file path is intentionally not mirrored to PG."""


def is_enabled():
    """Public alias for callers that want to probe PG reachability without
    relying on a leading-underscore private name."""
    return _is_enabled()


def _is_enabled():
    """PG dual-write only fires if so-postgres is reachable. Cheap probe.
    Returns True when docker exec succeeds, False otherwise. We never
@@ -149,6 +155,55 @@ def _conflict_target(scope):
    raise ValueError(f"unknown scope {scope!r}")


def is_pg_managed(path):
    """True if this path maps to a so_pillar.* row (locate() succeeds).
    Bootstrap and mine-driven files return False — they always live on
    disk regardless of so-yaml's backend mode."""
    try:
        locate(path)
        return True
    except SkipPath:
        return False


def read_yaml(path):
    """Return the content dict stored in so_pillar.pillar_entry for `path`,
    or None when no row exists. Raises SkipPath when `path` is not part of
    the PG-managed surface (caller should read disk in that case).

    Used by so-yaml.py PG-canonical mode so `replace`, `get`, etc. resolve
    against the database rather than a stale (or absent) disk file."""
    if not _is_enabled():
        return None
    scope, role, minion_id, pillar_path = locate(path)

    if scope == "minion":
        sql = ("SELECT data FROM so_pillar.pillar_entry "
               "WHERE scope='minion' "
               f"AND minion_id={_pg_str(minion_id)} "
               f"AND pillar_path={_pg_str(pillar_path)}")
    elif scope == "role":
        sql = ("SELECT data FROM so_pillar.pillar_entry "
               "WHERE scope='role' "
               f"AND role_name={_pg_str(role)} "
               f"AND pillar_path={_pg_str(pillar_path)}")
    else:
        sql = ("SELECT data FROM so_pillar.pillar_entry "
               "WHERE scope='global' "
               f"AND pillar_path={_pg_str(pillar_path)}")

    try:
        out = _docker_psql(sql).strip()
    except Exception:
        return None
    if not out:
        return None
    try:
        return json.loads(out)
    except (ValueError, TypeError):
        return None


def write_yaml(path, content_dict, *, reason="so-yaml dual-write"):
    """Mirror the disk write at `path` (whose content was just rendered as
    `content_dict`) into so_pillar.pillar_entry. Best-effort: any failure
@@ -0,0 +1,112 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

# Driven by the so_pillar_changed reactor. Translates a so_pillar.pillar_entry
# change into (cache.clear_pillar -> saltutil.refresh_pillar -> state.apply)
# on the appropriate target.
#
# Routing rules live in the DISPATCH map below — one entry per
# (pillar_path prefix) -> (state sls, role grain set). Add new services here
# rather than wiring more reactors.
#
# Idempotent: state.apply is idempotent; if the pillar value didn't actually
# change anything observable, the affected state run is a no-op. Bulk imports
# and replays are safe.

{% set change = salt['pillar.get']('so_pillar_change', {}) %}
{% set scope = change.get('scope') %}
{% set role = change.get('role_name') %}
{% set minion = change.get('minion_id') %}
{% set changes = change.get('changes', []) %}

{# (pillar_path prefix) -> {sls: <state to apply>, roles: <role grains that run it>}
   roles holds grain values (e.g. 'so-sensor'), used to compute compound targets
   when the change is global or role-scoped. #}
{% set DISPATCH = {
    'suricata.':      {'sls': 'suricata.config',      'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
    'sensor.':        {'sls': 'suricata.config',      'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
    'zeek.':          {'sls': 'zeek.config',          'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
    'stenographer.':  {'sls': 'stenographer.config',  'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
    'pcap.':          {'sls': 'pcap.config',          'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
    'logstash.':      {'sls': 'logstash.config',      'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver']},
    'redis.':         {'sls': 'redis.config',         'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
    'kafka.':         {'sls': 'kafka.config',         'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver', 'so-searchnode']},
    'elasticsearch.': {'sls': 'elasticsearch.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-searchnode', 'so-heavynode', 'so-standalone']},
    'kibana.':        {'sls': 'kibana.config',        'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
    'soc.':           {'sls': 'soc.config',           'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
    'telegraf.':      {'sls': 'telegraf.config',      'roles': ['*']},
    'fleet.':         {'sls': 'fleet.config',         'roles': ['so-fleet']},
    'strelka.':       {'sls': 'strelka.config',       'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
} %}

{# Collect a deduplicated set of (sls, target_kind) actions. target_kind is
   either 'minion:<id>' (scope=minion) or 'roles:so-x,so-y' (scope=role/global). #}
{% set actions = {} %}

{% for c in changes %}
{% set path = c.get('pillar_path', '') %}
{% for prefix, action in DISPATCH.items() %}
{% if path.startswith(prefix) %}
{% set sls = action['sls'] %}
{% if scope == 'minion' and minion %}
{% set key = sls ~ '|minion|' ~ minion %}
{% set _ = actions.update({key: {'sls': sls, 'tgt': minion, 'tgt_type': 'glob'}}) %}
{% else %}
{% set role_targets = action['roles'] %}
{% if '*' in role_targets %}
{% set tgt = '*' %}
{% set tgt_type = 'glob' %}
{% else %}
{% set tgt = ('I@role:' ~ role_targets|join(' or I@role:')) %}
{% set tgt_type = 'compound' %}
{% endif %}
{% set key = sls ~ '|' ~ tgt %}
{% set _ = actions.update({key: {'sls': sls, 'tgt': tgt, 'tgt_type': tgt_type}}) %}
{% endif %}
{% endif %}
{% endfor %}
{% endfor %}

{% if actions %}

{% for key, action in actions.items() %}
{% set safe_id = loop.index0 | string %}

so_pillar_reload_clear_cache_{{ safe_id }}:
  salt.runner:
    - name: cache.clear_pillar
    - tgt: '{{ action.tgt }}'
    - tgt_type: '{{ action.tgt_type }}'

so_pillar_reload_refresh_pillar_{{ safe_id }}:
  salt.function:
    - name: saltutil.refresh_pillar
    - tgt: '{{ action.tgt }}'
    - tgt_type: '{{ action.tgt_type }}'
    - kwarg:
        wait: True
    - require:
      - salt: so_pillar_reload_clear_cache_{{ safe_id }}

so_pillar_reload_apply_state_{{ safe_id }}:
  salt.state:
    - tgt: '{{ action.tgt }}'
    - tgt_type: '{{ action.tgt_type }}'
    - sls:
      - {{ action.sls }}
    - queue: True
    - require:
      - salt: so_pillar_reload_refresh_pillar_{{ safe_id }}
{% endfor %}

{% else %}

{# No DISPATCH entry matched. Pillar still gets refreshed so any other states
   read fresh values, but no service-specific reload is invoked. #}
so_pillar_reload_unmapped_path_noop:
  test.nop
{% do salt.log.info('orch.so_pillar_reload: no dispatch match for %s' % changes) %}

{% endif %}
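As the comments above note, onboarding another service is one added DISPATCH row. Shown here as a Python-literal view of the Jinja map; the 'idstools.' prefix and idstools.config sls are hypothetical placeholders, not part of this commit:

DISPATCH = {
    # ... existing rows as above ...
    'idstools.': {'sls': 'idstools.config', 'roles': ['so-manager', 'so-standalone']},
}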
@@ -37,6 +37,16 @@ GRANT SELECT ON so_pillar.v_pillar_global,
  TO so_pillar_master;
GRANT EXECUTE ON FUNCTION so_pillar.fn_pillar_secrets(text) TO so_pillar_master;

-- Engine reads + drains the change queue from the salt-master process. It
-- needs SELECT to find unprocessed rows and UPDATE to mark them processed.
-- The queue contains only locator metadata (no pillar data), so the master
-- role's existing privilege footprint is unchanged in practice.
GRANT SELECT, UPDATE ON so_pillar.change_queue TO so_pillar_master;
GRANT USAGE ON SEQUENCE so_pillar.change_queue_id_seq TO so_pillar_master;
-- Writer needs INSERT (the trigger runs as table owner, so this is just for
-- direct testing / manual replays from psql).
GRANT INSERT ON so_pillar.change_queue TO so_pillar_writer;

-- Writer needs CRUD on pillar_entry/minion/role_member plus access to seed tables.
GRANT SELECT, INSERT, UPDATE, DELETE
  ON so_pillar.pillar_entry,
@@ -0,0 +1,77 @@
-- pg_notify-driven change fan-out for so_pillar.pillar_entry.
--
-- Two layers:
--   1. so_pillar.change_queue        — durable, drained by the salt-master
--                                      engine. Survives engine downtime,
--                                      de-duplicated by id, processed once.
--   2. pg_notify('so_pillar_change') — wakeup signal. Payload is the
--                                      change_queue row id and locator
--                                      (no secret data — channels are
--                                      snoopable by anyone with LISTEN).
--
-- The salt-master engine LISTENs on the channel for low-latency wakeup,
-- then SELECTs unprocessed change_queue rows so a missed notification
-- (engine restart, network blip) self-heals on the next event.

CREATE TABLE IF NOT EXISTS so_pillar.change_queue (
    id           bigserial PRIMARY KEY,
    scope        text NOT NULL,
    role_name    text,
    minion_id    text,
    pillar_path  text NOT NULL,
    op           text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')),
    enqueued_at  timestamptz NOT NULL DEFAULT now(),
    processed_at timestamptz
);

-- Hot index for the engine's drain query.
CREATE INDEX IF NOT EXISTS ix_change_queue_unprocessed
    ON so_pillar.change_queue (id)
    WHERE processed_at IS NULL;

-- Retention index: pg_cron job in 007 sweeps processed rows older than 7d.
CREATE INDEX IF NOT EXISTS ix_change_queue_processed_at
    ON so_pillar.change_queue (processed_at)
    WHERE processed_at IS NOT NULL;

CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_notify()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
    v_row record;
    v_id  bigint;
BEGIN
    IF TG_OP = 'DELETE' THEN
        v_row := OLD;
    ELSE
        v_row := NEW;
    END IF;

    INSERT INTO so_pillar.change_queue
        (scope, role_name, minion_id, pillar_path, op)
    VALUES
        (v_row.scope, v_row.role_name, v_row.minion_id, v_row.pillar_path, TG_OP)
    RETURNING id INTO v_id;

    -- Payload is the queue id + locator only. Engine joins back to
    -- pillar_entry if it needs the data — keeps secrets off the wire.
    PERFORM pg_notify('so_pillar_change', json_build_object(
        'queue_id',    v_id,
        'scope',       v_row.scope,
        'role_name',   v_row.role_name,
        'minion_id',   v_row.minion_id,
        'pillar_path', v_row.pillar_path,
        'op',          TG_OP
    )::text);

    RETURN NULL;
END;
$$;

DROP TRIGGER IF EXISTS tg_pillar_entry_notify ON so_pillar.pillar_entry;
CREATE TRIGGER tg_pillar_entry_notify
    AFTER INSERT OR UPDATE OR DELETE
    ON so_pillar.pillar_entry
    FOR EACH ROW
    EXECUTE FUNCTION so_pillar.fn_pillar_entry_notify();
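A hedged smoke test for the trigger + queue path above, using psycopg2 directly. It assumes local credentials for the so_pillar_master/so_pillar_writer roles are at hand, and the 'soc.config' row is a hypothetical example path:

import json
import select

import psycopg2
import psycopg2.extensions

# Listener side: LISTEN on the channel the trigger notifies.
listener = psycopg2.connect(
    "host=127.0.0.1 dbname=securityonion user=so_pillar_master")  # password omitted
listener.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
listener.cursor().execute("LISTEN so_pillar_change")

# Writer side: a no-op UPDATE still fires the row-level AFTER trigger.
writer = psycopg2.connect(
    "host=127.0.0.1 dbname=securityonion user=so_pillar_writer")
with writer, writer.cursor() as cur:
    cur.execute("UPDATE so_pillar.pillar_entry SET data = data "
                "WHERE scope = 'global' AND pillar_path = 'soc.config'")

# Wake up and print what arrived: queue_id + locator, no pillar data.
if select.select([listener], [], [], 5)[0]:
    listener.poll()
    for note in listener.notifies:
        print(json.loads(note.payload))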
@@ -54,6 +54,10 @@ so_pillar_postgres_wait_ready:
{% do sql_files.append('007_drift_pgcron.sql') %}
{% endif %}

# 008 always applies — pg_notify-driven change fan-out is what the salt-master
# pg_notify_pillar engine consumes. Without it the reactor wiring sees no events.
{% do sql_files.append('008_change_notify.sql') %}

{% for sql_file in sql_files %}
so_pillar_apply_{{ sql_file | replace('.', '_') }}:
  cmd.run:
@@ -87,7 +91,7 @@ so_pillar_master_key_configure:
          exit 1
        fi
    - require:
-     - cmd: so_pillar_apply_006_rls_sql
      - cmd: so_pillar_apply_{{ sql_files[-1] | replace('.', '_') }}

# Run the importer once after the schema is in place. Idempotent — re-runs
# with no SLS edits produce zero row changes.
@@ -97,6 +101,29 @@ so_pillar_initial_import:
    - require:
      - cmd: so_pillar_master_key_configure

# Flip so-yaml from dual-write to PG-canonical for managed paths now that
# the schema and importer are both in place. Bootstrap files (secrets.sls,
# postgres/auth.sls, ca/init.sls, *.nodes.sls, top.sls, ...) remain on disk
# regardless because so_yaml_postgres.locate() raises SkipPath for them.
so_pillar_so_yaml_mode_dir:
  file.directory:
    - name: /opt/so/conf/so-yaml
    - user: socore
    - group: socore
    - mode: '0755'
    - makedirs: True

so_pillar_so_yaml_mode_postgres:
  file.managed:
    - name: /opt/so/conf/so-yaml/mode
    - contents: postgres
    - user: socore
    - group: socore
    - mode: '0644'
    - require:
      - file: so_pillar_so_yaml_mode_dir
      - cmd: so_pillar_initial_import

{% else %}

so_pillar_disabled_noop:
@@ -0,0 +1,27 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

# Fires for every event tagged 'so/pillar/changed'. Source of those events
# is the pg_notify_pillar engine on the salt-master, which in turn drains
# so_pillar.change_queue (populated by the AFTER trigger on
# so_pillar.pillar_entry — see 008_change_notify.sql).
#
# All routing logic — which pillar paths reload which services on which
# targets — lives in orch.so_pillar_reload so it stays editable as one
# YAML table without touching reactor wiring.

{% set payload = data.get('data', {}) %}
{% do salt.log.info('so_pillar_changed reactor: %s' % payload) %}

so_pillar_dispatch_reload:
  runner.state.orchestrate:
    - args:
      - mods: orch.so_pillar_reload
      - pillar:
          so_pillar_change:
            scope: {{ payload.get('scope') | json }}
            role_name: {{ payload.get('role_name') | json }}
            minion_id: {{ payload.get('minion_id') | json }}
            changes: {{ payload.get('changes', []) | json }}
@@ -0,0 +1,200 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

# -*- coding: utf-8 -*-

"""
pg_notify_pillar — Salt master engine that bridges so_pillar.change_queue
into the Salt event bus.

Architecture (see 008_change_notify.sql):

    pillar_entry -- AFTER trigger --> change_queue (durable)
                                    + pg_notify('so_pillar_change') (wakeup)
                                            |
    LISTEN <-- this engine <----------------+
    SELECT/UPDATE change_queue
        |
    fire_event('so/pillar/changed', ...)
        |
    reactor matches tag --> orch

Why a queue + notify rather than just notify: pg_notify is fire-and-forget
within a session. If the engine is down or the LISTEN connection is broken
when a write happens, the notification is lost forever. The change_queue
lets us recover — on (re)connect, we drain everything still flagged
processed_at IS NULL.

Debounce: bulk operations (so-pillar-import, fresh installs) can fire
hundreds of notifications per second. The engine collects whatever lands in
a short window and emits one event per (scope, role, minion) tuple so the
reactor isn't stampeded.
"""

import json
import logging
import os
import select
import time

import salt.utils.event

log = logging.getLogger(__name__)

__virtualname__ = 'pg_notify_pillar'

DEFAULT_CHANNEL = 'so_pillar_change'
DEFAULT_DEBOUNCE_MS = 500
DEFAULT_RECONNECT_BACKOFF = 5
DEFAULT_BACKLOG_INTERVAL = 30
DEFAULT_BATCH_LIMIT = 500

EVENT_TAG = 'so/pillar/changed'


def __virtual__():
    try:
        import psycopg2  # noqa: F401
        return __virtualname__
    except ImportError:
        return False, 'pg_notify_pillar engine requires psycopg2'


def start(dsn=None,
          host='127.0.0.1',
          port=5432,
          dbname='securityonion',
          user='so_pillar_master',
          password=None,
          channel=DEFAULT_CHANNEL,
          debounce_ms=DEFAULT_DEBOUNCE_MS,
          reconnect_backoff=DEFAULT_RECONNECT_BACKOFF,
          backlog_interval=DEFAULT_BACKLOG_INTERVAL,
          batch_limit=DEFAULT_BATCH_LIMIT,
          password_file=None):
    """
    Run the change-queue bridge until the master shuts the engine down.

    Either pass a full ``dsn`` string, or supply discrete kwargs. The
    password may also be read from ``password_file`` (mode 0400) so the
    engine config in ``/etc/salt/master.d/`` doesn't have to embed it
    inline — only the file path.
    """
    import psycopg2
    import psycopg2.extensions

    if dsn is None:
        if password is None and password_file:
            try:
                with open(password_file, 'r') as fh:
                    password = fh.read().strip()
            except (IOError, OSError) as exc:
                log.error('pg_notify_pillar: cannot read password_file %s: %s',
                          password_file, exc)
                return
        dsn = _build_dsn(host=host, port=port, dbname=dbname,
                         user=user, password=password)

    bus = salt.utils.event.get_master_event(
        __opts__, __opts__['sock_dir'], listen=False)

    log.info('pg_notify_pillar: starting (channel=%s debounce=%dms)',
             channel, debounce_ms)

    while True:
        conn = None
        try:
            conn = psycopg2.connect(dsn)
            conn.set_isolation_level(
                psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            cur = conn.cursor()
            cur.execute('LISTEN {0}'.format(channel))
            log.info('pg_notify_pillar: connected; LISTEN %s', channel)

            _drain(cur, bus, batch_limit)

            while True:
                ready, _, _ = select.select([conn], [], [], backlog_interval)
                if not ready:
                    _drain(cur, bus, batch_limit)
                    continue

                conn.poll()
                _consume_notifies(conn)

                if debounce_ms > 0:
                    time.sleep(debounce_ms / 1000.0)
                    conn.poll()
                    _consume_notifies(conn)

                _drain(cur, bus, batch_limit)

        except Exception as exc:  # psycopg2.Error subclasses + OS errors
            log.error('pg_notify_pillar: %s; reconnecting in %ds',
                      exc, reconnect_backoff)
        finally:
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
        time.sleep(reconnect_backoff)


def _build_dsn(host, port, dbname, user, password):
    parts = ['host={0}'.format(host),
             'port={0}'.format(port),
             'dbname={0}'.format(dbname),
             'user={0}'.format(user)]
    if password:
        parts.append('password={0}'.format(password))
    return ' '.join(parts)


def _consume_notifies(conn):
    # We don't use the payload directly — the queue table is the source of
    # truth, and draining it covers any notifications we missed. So just
    # discard them; their presence already proved there's something to drain.
    while conn.notifies:
        conn.notifies.pop(0)


def _drain(cur, bus, batch_limit):
    """Mark unprocessed change_queue rows processed and emit one event per
    (scope, role_name, minion_id) group. SKIP LOCKED so multiple masters
    sharing a Postgres don't double-process."""
    cur.execute("""
        UPDATE so_pillar.change_queue
           SET processed_at = now()
         WHERE id IN (
               SELECT id FROM so_pillar.change_queue
                WHERE processed_at IS NULL
                ORDER BY id
                  FOR UPDATE SKIP LOCKED
                LIMIT %s)
     RETURNING id, scope, role_name, minion_id, pillar_path, op
    """, (batch_limit,))
    rows = cur.fetchall()
    if not rows:
        return

    groups = {}
    for row_id, scope, role_name, minion_id, pillar_path, op in rows:
        key = (scope, role_name, minion_id)
        groups.setdefault(key, []).append({
            'queue_id': row_id,
            'pillar_path': pillar_path,
            'op': op,
        })

    for (scope, role_name, minion_id), changes in groups.items():
        payload = {
            'scope': scope,
            'role_name': role_name,
            'minion_id': minion_id,
            'changes': changes,
        }
        log.debug('pg_notify_pillar: firing %s for %s',
                  EVENT_TAG, payload)
        bus.fire_event(payload, EVENT_TAG)
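For reference, each fired event carries one (scope, role_name, minion_id) group. The shape follows _drain above; the concrete values here are illustrative (the minion id is borrowed from the tests, the pillar path is a hypothetical example):

# tag: 'so/pillar/changed'
payload = {
    'scope': 'minion',
    'role_name': None,
    'minion_id': 'h1_sensor',
    'changes': [
        {'queue_id': 42, 'pillar_path': 'suricata.config.af-packet', 'op': 'UPDATE'},
    ],
}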
@@ -15,6 +15,7 @@
include:
  - salt.minion
  - salt.master.ext_pillar_postgres
  - salt.master.pg_notify_pillar_engine
{% if 'vrt' in salt['pillar.get']('features', []) %}
  - salt.cloud
  - salt.cloud.reactor_config_hypervisor
@@ -0,0 +1,20 @@
# /etc/salt/master.d/pg_notify_pillar_engine.conf
# Rendered by salt/salt/master/pg_notify_pillar_engine.sls.
#
# Subscribes the salt-master to so_pillar.change_queue via LISTEN
# so_pillar_change. The engine drains queued changes and re-publishes
# them on the event bus as 'so/pillar/changed'. Reactor wiring is in
# so_pillar_reactor.conf.

engines:
  - pg_notify_pillar:
      host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }}
      port: {{ pillar.get('postgres', {}).get('port', 5432) }}
      dbname: securityonion
      user: so_pillar_master
      password: {{ pillar['secrets']['pillar_master_pass'] }}
      channel: so_pillar_change
      debounce_ms: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_debounce_ms', 500) }}
      reconnect_backoff: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_reconnect_backoff', 5) }}
      backlog_interval: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_backlog_interval', 30) }}
      batch_limit: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_batch_limit', 500) }}
@@ -0,0 +1,12 @@
# /etc/salt/master.d/so_pillar_reactor.conf
# Wires the so/pillar/changed event tag — emitted by the pg_notify_pillar
# engine — to the so_pillar_changed reactor, which dispatches to
# orch.so_pillar_reload.
#
# Lives in its own file (rather than appended to reactor_hypervisor.conf)
# so the postgres:so_pillar:enabled flag can flip it on/off independently
# of hypervisor reactor wiring.

reactor:
  - 'so/pillar/changed':
    - /opt/so/saltstack/default/salt/reactor/so_pillar_changed.sls
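A quick way to confirm the wiring end to end is to watch the master event bus for the tag while editing a PG-managed pillar value. A sketch using Salt's Python API, run on the manager as a user that can read the master config:

import salt.config
import salt.utils.event

opts = salt.config.client_config('/etc/salt/master')
bus = salt.utils.event.get_event(
    'master', sock_dir=opts['sock_dir'], opts=opts, listen=True)
# Blocks up to 60s; returns None on timeout, else the grouped change payload.
evt = bus.get_event(wait=60, tag='so/pillar/changed')
print(evt)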
@@ -0,0 +1,81 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

# Deploys the pg_notify_pillar engine module + its master.d config so the
# salt-master subscribes to so_pillar.change_queue and republishes changes
# on the salt event bus as so/pillar/changed. The reactor (so_pillar_changed.sls)
# matches that tag and dispatches the appropriate orch.
#
# Gated on the same postgres:so_pillar:enabled flag as the schema and
# ext_pillar config so the three components flip together.

{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}

{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %}

pg_notify_pillar_engine_module:
  file.managed:
    - name: /etc/salt/engines/pg_notify_pillar.py
    - source: salt://salt/engines/master/pg_notify_pillar.py
    - mode: '0644'
    - user: root
    - group: root
    - makedirs: True
    - watch_in:
      - service: salt_master_service

pg_notify_pillar_engine_config:
  file.managed:
    - name: /etc/salt/master.d/pg_notify_pillar_engine.conf
    - source: salt://salt/master/files/pg_notify_pillar_engine.conf.jinja
    - template: jinja
    - mode: '0640'
    - user: root
    - group: salt
    - watch_in:
      - service: salt_master_service

pg_notify_pillar_reactor_config:
  file.managed:
    - name: /etc/salt/master.d/so_pillar_reactor.conf
    - source: salt://salt/master/files/so_pillar_reactor.conf
    - mode: '0644'
    - user: root
    - group: root
    - watch_in:
      - service: salt_master_service

{% else %}

# When the flag flips off, peel everything back so a rollback returns to
# pure-disk pillar with no orphan engine churning on a dead listen socket.
pg_notify_pillar_engine_module_absent:
  file.absent:
    - name: /etc/salt/engines/pg_notify_pillar.py
    - watch_in:
      - service: salt_master_service

pg_notify_pillar_engine_config_absent:
  file.absent:
    - name: /etc/salt/master.d/pg_notify_pillar_engine.conf
    - watch_in:
      - service: salt_master_service

pg_notify_pillar_reactor_config_absent:
  file.absent:
    - name: /etc/salt/master.d/so_pillar_reactor.conf
    - watch_in:
      - service: salt_master_service

{% endif %}

{% else %}

{{sls}}_state_not_allowed:
  test.fail_without_changes:
    - name: {{sls}}_state_not_allowed

{% endif %}