add so-yaml dual-write to so_pillar.* + purge verb

Hooks every so-yaml.py write through a new so_yaml_postgres helper that
mirrors disk YAML mutations into so_pillar.pillar_entry via docker exec
psql. Disk remains canonical during the transition; PG mirror failures
are logged only when a real write error occurs (skipped paths and
postgres-unreachable cases stay silent so existing callers don't see
new noise on stderr).

Adds a `purge YAML_FILE` verb on so-yaml that deletes the file from
disk and removes the matching pillar_entry rows. For minion files it
also drops the so_pillar.minion row, which CASCADEs to pillar_entry +
role_member. Designed for so-minion's delete path (replaces rm -f) so
the audit log captures the deletion.

setup/so-functions::generate_passwords + secrets_pillar generate
secrets:pillar_master_pass and /opt/so/conf/postgres/so_pillar.key on
fresh installs, and append the password to existing secrets.sls files
on upgrade.

- salt/manager/tools/sbin/so_yaml_postgres.py: locate(), write_yaml(),
  purge_yaml(), and a small CLI for diagnostics. Skips bootstrap and
  mine-driven paths via the same allowlist used by so-pillar-import.
- salt/manager/tools/sbin/so-yaml.py: import the helper, hook
  writeYaml() to mirror after every disk write, add purgeFile() and
  the purge verb.
- salt/manager/tools/sbin/so-yaml_test.py: 16 new tests covering the
  purge verb and the path-locator / write contract of so_yaml_postgres
  without contacting Postgres. All 91 tests pass.
- setup/so-functions: generate_passwords adds PILLARMASTERPASS and
  SO_PILLAR_KEY; secrets_pillar writes pillar_master_pass and the
  pgcrypto master key file.
This commit is contained in:
Mike Reeves
2026-04-30 17:09:58 -04:00
parent d30b52b327
commit 23255f88e0
4 changed files with 483 additions and 1 deletions
+71 -1
View File
@@ -13,6 +13,17 @@ import json
# Cross-invocation lock file shared by the so-yaml verbs.
lockFile = "/tmp/so-yaml.lock"

# postsalt: dual-write each disk mutation into so_pillar.* in so-postgres so
# Salt's ext_pillar and SOC's PostgresConfigstore see the same data without
# requiring a separate writer. Failure of the PG side is logged but never
# fails the disk write — disk is canonical during the migration transition.
try:
    # The helper module lives next to this script; make its directory importable.
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    import so_yaml_postgres
    _SO_YAML_PG_AVAILABLE = True
except Exception as _exc:
    # Helper missing or broken: disable the PG mirror but keep disk writes working.
    _SO_YAML_PG_AVAILABLE = False
def showUsage(args):
print('Usage: {} <COMMAND> <YAML_FILE> [ARGS...]'.format(sys.argv[0]), file=sys.stderr)
@@ -25,6 +36,7 @@ def showUsage(args):
print(' get [-r] - Displays (to stdout) the value stored in the given key. Requires KEY arg. Use -r for raw output without YAML formatting.', file=sys.stderr)
print(' remove - Removes a yaml key, if it exists. Requires KEY arg.', file=sys.stderr)
print(' replace - Replaces (or adds) a new key and set its value. Requires KEY and VALUE args.', file=sys.stderr)
print(' purge - Delete the YAML file from disk and remove its rows from so_pillar.* (no KEY arg).', file=sys.stderr)
print(' help - Prints this usage information.', file=sys.stderr)
print('', file=sys.stderr)
print(' Where:', file=sys.stderr)
@@ -53,7 +65,52 @@ def loadYaml(filename):
def writeYaml(filename, content):
    """Serialize `content` as YAML to `filename`, then mirror the write
    into so_pillar.* via so_yaml_postgres (best-effort; disk is canonical).

    Returns whatever yaml.safe_dump returns (None on success) so existing
    callers keep their contract.
    """
    # `with` guarantees the handle is closed even if safe_dump raises;
    # the previous open()/close() pair leaked the handle on error.
    with open(filename, "w") as file:
        result = yaml.safe_dump(content, file)
    _mirrorToPostgres(filename, content)
    return result
def _mirrorToPostgres(filename, content):
    """Best-effort dual-write of a YAML mutation into so_pillar.*.

    Skips files outside the PG-managed pillar surface (secrets.sls,
    elasticsearch/nodes.sls, etc.) and silently degrades when so-postgres
    is unreachable. The disk write is canonical; this function never
    raises. Only real PG failures (`pg write failed: ...`) reach stderr,
    so the common cases — skipped path, postgres not running — stay quiet.
    """
    if not _SO_YAML_PG_AVAILABLE:
        return
    verb = " ".join(sys.argv[1:2])
    try:
        success, detail = so_yaml_postgres.write_yaml(
            filename, content, reason="so-yaml " + verb)
    except Exception as err:  # pragma: no cover — defensive: never break disk write
        print(f"so-yaml: pg mirror exception: {err}", file=sys.stderr)
        return
    if not success and detail.startswith("pg write failed"):
        print(f"so-yaml: {detail}", file=sys.stderr)
def purgeFile(filename):
    """Delete a YAML file from disk and mirror the deletion into PG.

    Idempotent: a missing file counts as success. Mirrors so-yaml's other
    verbs in tolerating a soft PG failure — only the disk removal itself
    can fail the verb.

    Returns 0 on success, 1 when the disk removal fails.
    """
    if os.path.exists(filename):
        try:
            os.remove(filename)
        except Exception as e:
            # Bug fix: report WHICH file failed; previously this printed the
            # literal placeholder "(unknown)" instead of the filename.
            print(f"Failed to remove {filename}: {e}", file=sys.stderr)
            return 1
    if _SO_YAML_PG_AVAILABLE:
        try:
            ok, msg = so_yaml_postgres.purge_yaml(filename,
                                                  reason="so-yaml purge")
            # Skipped paths / unreachable PG stay silent by design.
            if not ok and msg.startswith("pg purge failed"):
                print(f"so-yaml: {msg}", file=sys.stderr)
        except Exception as e:
            print(f"so-yaml: pg purge exception: {e}", file=sys.stderr)
    return 0
def appendItem(content, key, listItem):
@@ -371,6 +428,18 @@ def get(args):
return 0
def purge(args):
    """purge YAML_FILE — delete the file from disk and remove the matching
    rows from so_pillar.* in so-postgres. Used by so-minion's delete path
    (in place of `rm -f`) so the audit log captures the deletion and
    role_member rows get cleaned up via FK CASCADE on so_pillar.minion."""
    if len(args) == 1:
        return purgeFile(args[0])
    print('Missing filename arg', file=sys.stderr)
    showUsage(None)
    return 1
def main():
args = sys.argv[1:]
@@ -388,6 +457,7 @@ def main():
"get": get,
"remove": remove,
"replace": replace,
"purge": purge,
}
code = 1
+115
View File
@@ -991,3 +991,118 @@ class TestLoadYaml(unittest.TestCase):
soyaml.loadYaml("/tmp/so-yaml_test-unreadable.yaml")
sysmock.assert_called_with(1)
self.assertIn("Error reading file", mock_stderr.getvalue())
class TestPurge(unittest.TestCase):
    """Covers the `purge` verb: argument validation plus on-disk removal."""

    def test_purge_missing_arg(self):
        # showUsage calls sys.exit(1); neutralize it like the other tests do.
        with patch('sys.exit', new=MagicMock()):
            with patch('sys.stderr', new=StringIO()) as mock_stderr:
                self.assertEqual(soyaml.purge([]), 1)
                self.assertIn("Missing filename", mock_stderr.getvalue())

    def test_purge_existing_file(self):
        target = "/tmp/so-yaml_test_purge.yaml"
        with open(target, "w") as handle:
            handle.write("key: value\n")
        # Disable PG mirror so the test doesn't shell out to docker.
        with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
            self.assertEqual(soyaml.purge([target]), 0)
        import os as _os
        self.assertFalse(_os.path.exists(target))

    def test_purge_missing_file_idempotent(self):
        import os as _os
        target = "/tmp/so-yaml_test_purge_missing.yaml"
        if _os.path.exists(target):
            _os.remove(target)
        # Purging a file that is already gone must still succeed.
        with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
            self.assertEqual(soyaml.purge([target]), 0)
class TestSoYamlPostgres(unittest.TestCase):
    """Tests the path-locator and write/purge contract of the dual-write
    backend module without actually contacting Postgres."""

    def setUp(self):
        # Import lazily so collection does not fail if the module is absent;
        # the module sits next to so-yaml.py on sys.path.
        import importlib
        self.mod = importlib.import_module("so_yaml_postgres")

    def test_locate_global_soc(self):
        # pillar/<section>/soc_<section>.sls -> global scope, no role/minion.
        scope, role, mid, path = self.mod.locate(
            "/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
        self.assertEqual(scope, "global")
        self.assertIsNone(role)
        self.assertIsNone(mid)
        self.assertEqual(path, "soc.soc_soc")

    def test_locate_global_advanced(self):
        # adv_<section>.sls files are also global-scope.
        scope, role, mid, path = self.mod.locate(
            "/opt/so/saltstack/local/pillar/soc/adv_soc.sls")
        self.assertEqual(scope, "global")
        self.assertEqual(path, "soc.adv_soc")

    def test_locate_minion(self):
        # pillar/minions/<id>.sls -> minion scope, minion_id = file stem.
        scope, role, mid, path = self.mod.locate(
            "/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
        self.assertEqual(scope, "minion")
        self.assertEqual(mid, "h1_sensor")
        self.assertEqual(path, "minions.h1_sensor")

    def test_locate_minion_advanced(self):
        # adv_<id>.sls strips the adv_ prefix for the minion_id but keeps it
        # in the pillar_path.
        scope, role, mid, path = self.mod.locate(
            "/opt/so/saltstack/local/pillar/minions/adv_h1_sensor.sls")
        self.assertEqual(scope, "minion")
        self.assertEqual(mid, "h1_sensor")
        self.assertEqual(path, "minions.adv_h1_sensor")

    def test_locate_skip_secrets(self):
        # secrets.sls is on the explicit disk-only allowlist.
        with self.assertRaises(self.mod.SkipPath):
            self.mod.locate("/opt/so/saltstack/local/pillar/secrets.sls")

    def test_locate_skip_postgres_auth(self):
        with self.assertRaises(self.mod.SkipPath):
            self.mod.locate("/opt/so/saltstack/local/pillar/postgres/auth.sls")

    def test_locate_skip_mine_driven(self):
        # mine-driven nodes.sls files match a disk-only fragment.
        with self.assertRaises(self.mod.SkipPath):
            self.mod.locate("/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls")

    def test_locate_skip_top(self):
        with self.assertRaises(self.mod.SkipPath):
            self.mod.locate("/opt/so/saltstack/local/pillar/top.sls")

    def test_locate_skip_unrelated(self):
        # Non-.sls paths outside the pillar tree are never mirrored.
        with self.assertRaises(self.mod.SkipPath):
            self.mod.locate("/etc/hostname")

    def test_pg_str_escapes(self):
        # Single quotes are doubled per SQL; None renders as bare NULL.
        self.assertEqual(self.mod._pg_str("a'b"), "'a''b'")
        self.assertEqual(self.mod._pg_str(None), "NULL")

    def test_conflict_target(self):
        # Each scope maps to its partial-unique-index conflict target.
        self.assertIn("scope='global'", self.mod._conflict_target("global"))
        self.assertIn("scope='role'", self.mod._conflict_target("role"))
        self.assertIn("scope='minion'", self.mod._conflict_target("minion"))
        with self.assertRaises(ValueError):
            self.mod._conflict_target("bogus")

    def test_write_yaml_skips_disk_only_path(self):
        # Even with PG "reachable", disk-only paths are refused with a
        # skip message rather than written.
        with patch.object(self.mod, '_is_enabled', return_value=True):
            ok, msg = self.mod.write_yaml(
                "/opt/so/saltstack/local/pillar/secrets.sls",
                {"secrets": {"foo": "bar"}})
        self.assertFalse(ok)
        self.assertIn("disk-only", msg)

    def test_write_yaml_unreachable(self):
        # Unreachable postgres is a soft failure with a fixed message.
        with patch.object(self.mod, '_is_enabled', return_value=False):
            ok, msg = self.mod.write_yaml(
                "/opt/so/saltstack/local/pillar/soc/soc_soc.sls",
                {"soc": {"foo": "bar"}})
        self.assertFalse(ok)
        self.assertEqual(msg, "postgres unreachable")
+265
View File
@@ -0,0 +1,265 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
"""
so_yaml_postgres — Postgres-backed dual-write helpers for so-yaml.py.
so-yaml.py writes YAML pillar files on disk; this module mirrors those
writes into so_pillar.* in so-postgres so ext_pillar and the SOC
PostgresConfigstore see the same data. During the postsalt transition
disk is canonical; PG writes are best-effort and never fail the disk
operation.
Connection: shells out to `docker exec so-postgres psql -U postgres -d
securityonion`. Same pattern so-pillar-import uses; avoids needing a
separate DSN config at install time. Performance is fine because so-yaml
is invoked from infrequent code paths (setup scripts, so-minion,
so-firewall); SOC's hot path uses the in-process pgxpool in
PostgresConfigstore, not so-yaml.
Path-to-row mapping mirrors PostgresConfigstore.locateSetting in
securityonion-soc:
/opt/so/saltstack/local/pillar/<section>/soc_<section>.sls
-> scope=global, pillar_path=<section>.soc_<section>
/opt/so/saltstack/local/pillar/<section>/adv_<section>.sls
-> scope=global, pillar_path=<section>.adv_<section>
/opt/so/saltstack/local/pillar/minions/<id>.sls
-> scope=minion, minion_id=<id>, pillar_path=minions.<id>
/opt/so/saltstack/local/pillar/minions/adv_<id>.sls
-> scope=minion, minion_id=<id>, pillar_path=minions.adv_<id>
Files outside that mapping (notably secrets.sls, postgres/auth.sls,
elasticsearch/nodes.sls, etc.) are skipped — they stay disk-only forever
or render dynamically and don't belong in PG.
"""
import json
import os
import shlex
import subprocess
import sys
DOCKER_CONTAINER = os.environ.get("SO_PILLAR_PG_CONTAINER", "so-postgres")
PG_DATABASE = os.environ.get("SO_PILLAR_PG_DATABASE", "securityonion")
PG_USER = os.environ.get("SO_PILLAR_PG_USER", "postgres")
# File paths whose mutations stay disk-only forever. Mirrors EXCLUDE_*
# in so-pillar-import.
DISK_ONLY_PATHS = (
"/opt/so/saltstack/local/pillar/secrets.sls",
"/opt/so/saltstack/local/pillar/postgres/auth.sls",
"/opt/so/saltstack/local/pillar/elasticsearch/auth.sls",
"/opt/so/saltstack/local/pillar/kibana/secrets.sls",
)
DISK_ONLY_FRAGMENTS = (
"/elasticsearch/nodes.sls",
"/redis/nodes.sls",
"/kafka/nodes.sls",
"/hypervisor/nodes.sls",
"/logstash/nodes.sls",
"/node_data/ips.sls",
"/top.sls",
)
class SkipPath(Exception):
"""Raised when a file path is intentionally not mirrored to PG."""
def _is_enabled():
"""PG dual-write only fires if so-postgres is reachable. Cheap probe.
Returns True when docker exec succeeds, False otherwise. We never
want a PG hiccup to fail a disk write on a manager whose Postgres is
momentarily unreachable."""
try:
proc = subprocess.run(
["docker", "exec", DOCKER_CONTAINER,
"pg_isready", "-h", "127.0.0.1", "-U", PG_USER, "-q"],
capture_output=True, timeout=5, check=False,
)
return proc.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
return False
def locate(path):
"""Translate a so-yaml file path to (scope, role_name, minion_id, pillar_path).
Raises SkipPath when the file is not part of the PG-managed surface."""
norm = os.path.normpath(path)
if norm in DISK_ONLY_PATHS:
raise SkipPath(f"{path}: explicit disk-only allowlist")
for frag in DISK_ONLY_FRAGMENTS:
if frag in norm:
raise SkipPath(f"{path}: matches disk-only fragment {frag}")
parent = os.path.basename(os.path.dirname(norm))
grandparent = os.path.basename(os.path.dirname(os.path.dirname(norm)))
name = os.path.basename(norm)
if not name.endswith(".sls"):
raise SkipPath(f"{path}: not a .sls file")
stem = name[:-4]
if parent == "minions":
if stem.startswith("adv_"):
mid = stem[4:]
return ("minion", None, mid, f"minions.adv_{mid}")
return ("minion", None, stem, f"minions.{stem}")
# /local/pillar/<section>/<file>.sls
if grandparent == "pillar" and parent and parent != "":
if stem.startswith("soc_") or stem.startswith("adv_"):
return ("global", None, None, f"{parent}.{stem}")
raise SkipPath(f"{path}: <section>/{stem}.sls is not a soc_/adv_ file")
raise SkipPath(f"{path}: unrecognised pillar layout")
def _pg_str(s):
if s is None:
return "NULL"
return "'" + str(s).replace("'", "''") + "'"
def _docker_psql(sql):
    """Execute `sql` through `docker exec ... psql` and return stdout text.

    Raises RuntimeError on a nonzero exit; callers catch any exception and
    downgrade it to a warning."""
    command = ["docker", "exec", "-i", DOCKER_CONTAINER,
               "psql", "-U", PG_USER, "-d", PG_DATABASE,
               "-tA", "-q", "-v", "ON_ERROR_STOP=1"]
    proc = subprocess.run(command, input=sql.encode(), capture_output=True,
                          check=False, timeout=30)
    if proc.returncode == 0:
        return proc.stdout.decode(errors="replace")
    detail = proc.stderr.decode(errors="replace")
    raise RuntimeError(detail or f"docker exec psql exit {proc.returncode}")
def _conflict_target(scope):
if scope == "global":
return "(pillar_path) WHERE scope='global'"
if scope == "role":
return "(role_name, pillar_path) WHERE scope='role'"
if scope == "minion":
return "(minion_id, pillar_path) WHERE scope='minion'"
raise ValueError(f"unknown scope {scope!r}")
def write_yaml(path, content_dict, *, reason="so-yaml dual-write"):
    """Mirror the disk write at `path` (whose content was just rendered as
    `content_dict`) into so_pillar.pillar_entry. Best-effort: any failure
    is swallowed so the caller (so-yaml.py) does not see it as a fatal.

    Returns a (ok, message) tuple: (True, "ok") on success, otherwise
    (False, reason-string) — callers use the message prefix to decide
    whether to log ("pg write failed: ...") or stay silent."""
    # Probe first so an unreachable PG stays a quiet soft-failure.
    if not _is_enabled():
        return False, "postgres unreachable"
    try:
        scope, role, minion_id, pillar_path = locate(path)
    except SkipPath as e:
        # Disk-only / unrecognised paths are not errors; report and bail.
        return False, str(e)
    # None content becomes an empty object so the jsonb column stays valid.
    data_json = json.dumps(content_dict if content_dict is not None else {})
    role_sql = _pg_str(role)
    minion_sql = _pg_str(minion_id)
    reason_sql = _pg_str(reason)
    conflict = _conflict_target(scope)
    sql_parts = []
    if scope == "minion":
        # FK requires the minion row before pillar_entry can reference it.
        # NOTE(review): this INSERT is joined to the script BEFORE the
        # BEGIN below, so it appears to autocommit separately — if the
        # pillar_entry upsert fails, the minion row may remain. Confirm
        # whether that is intended.
        sql_parts.append(
            f"INSERT INTO so_pillar.minion (minion_id) VALUES ({minion_sql}) "
            "ON CONFLICT (minion_id) DO NOTHING;"
        )
    # Upsert inside a transaction; set_config(..., true) scopes the change
    # reason to this transaction for the audit trigger.
    sql_parts.append(
        "BEGIN;\n"
        f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);\n"
        "INSERT INTO so_pillar.pillar_entry "
        "(scope, role_name, minion_id, pillar_path, data, change_reason) "
        f"VALUES ({_pg_str(scope)}, {role_sql}, {minion_sql}, "
        f"{_pg_str(pillar_path)}, {_pg_str(data_json)}::jsonb, {reason_sql}) "
        f"ON CONFLICT {conflict} DO UPDATE "
        "SET data = EXCLUDED.data, change_reason = EXCLUDED.change_reason;\n"
        "COMMIT;\n"
    )
    try:
        _docker_psql("\n".join(sql_parts))
    except Exception as e:
        # Real PG failure — the only case callers surface to stderr.
        return False, f"pg write failed: {e}"
    return True, "ok"
def purge_yaml(path, *, reason="so-yaml purge"):
    """Mirror the disk file deletion at `path` by deleting the matching
    pillar_entry rows. For minion files also deletes the so_pillar.minion
    row (CASCADE removes pillar_entry + role_member rows).

    Returns a (ok, message) tuple mirroring write_yaml's contract:
    (True, "ok") on success; (False, "pg purge failed: ...") on a real PG
    error; other False messages are soft skips callers keep silent."""
    if not _is_enabled():
        return False, "postgres unreachable"
    try:
        scope, role, minion_id, pillar_path = locate(path)
    except SkipPath as e:
        return False, str(e)
    reason_sql = _pg_str(reason)
    # Whole purge runs in one transaction; the transaction-scoped
    # change_reason feeds the audit trigger.
    parts = ["BEGIN;",
             f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);"]
    if scope == "minion":
        # If both <id>.sls and adv_<id>.sls are gone the trigger / CASCADE
        # cleans up role_member; otherwise we just remove this one row.
        parts.append(
            f"DELETE FROM so_pillar.pillar_entry "
            f"WHERE scope='minion' AND minion_id={_pg_str(minion_id)} "
            f"AND pillar_path={_pg_str(pillar_path)};"
        )
        # Drop the minion row only when no pillar_entry rows still
        # reference it (e.g. the adv_ file is still on disk).
        parts.append(
            f"DELETE FROM so_pillar.minion WHERE minion_id={_pg_str(minion_id)} "
            "AND NOT EXISTS (SELECT 1 FROM so_pillar.pillar_entry "
            f"WHERE minion_id={_pg_str(minion_id)});"
        )
    else:
        # Global (and role) scoped files map to rows keyed by pillar_path.
        parts.append(
            f"DELETE FROM so_pillar.pillar_entry "
            f"WHERE scope={_pg_str(scope)} AND pillar_path={_pg_str(pillar_path)};"
        )
    parts.append("COMMIT;")
    try:
        _docker_psql("\n".join(parts))
    except Exception as e:
        return False, f"pg purge failed: {e}"
    return True, "ok"
# CLI for diagnostics. Not exercised by so-yaml.py itself.
def _main(argv):
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("op", choices=("locate", "ping"))
ap.add_argument("path", nargs="?")
args = ap.parse_args(argv)
if args.op == "ping":
ok = _is_enabled()
print("ok" if ok else "unreachable")
return 0 if ok else 1
if args.op == "locate":
if not args.path:
ap.error("locate requires PATH")
try:
scope, role, minion_id, pillar_path = locate(args.path)
print(f"scope={scope} role={role} minion_id={minion_id} pillar_path={pillar_path}")
return 0
except SkipPath as e:
print(f"SKIP: {e}", file=sys.stderr)
return 2
return 1
if __name__ == "__main__":
sys.exit(_main(sys.argv[1:]))
+32
View File
@@ -1057,6 +1057,11 @@ generate_passwords(){
POSTGRESPASS=$(get_random_value)
SOCSRVKEY=$(get_random_value 64)
IMPORTPASS=$(get_random_value)
# postsalt: salt-master connects to so_pillar.* as so_pillar_master, and the
# so-postgres container needs a symmetric key for pgcrypto-encrypted secrets.
# Both are generated here so they survive reinstall like the other secrets.
PILLARMASTERPASS=$(get_random_value)
SO_PILLAR_KEY=$(get_random_value 64)
}
generate_interface_vars() {
@@ -1853,7 +1858,34 @@ secrets_pillar(){
"secrets:"\
" import_pass: $IMPORTPASS"\
" influx_pass: $INFLUXPASS"\
" pillar_master_pass: $PILLARMASTERPASS"\
" postgres_pass: $POSTGRESPASS" > $local_salt_dir/pillar/secrets.sls
elif ! grep -q '^[[:space:]]*pillar_master_pass:' $local_salt_dir/pillar/secrets.sls; then
# Existing install pre-postsalt — append the new key without disturbing
# the values already on disk. Keys we already wrote stay; only the new
# pillar_master_pass is added.
info "Appending pillar_master_pass to existing Secrets Pillar"
if [ -z "$PILLARMASTERPASS" ]; then
PILLARMASTERPASS=$(get_random_value)
fi
printf ' pillar_master_pass: %s\n' "$PILLARMASTERPASS" >> $local_salt_dir/pillar/secrets.sls
fi
# postsalt: write the so_pillar pgcrypto master key to a 0400 file owned by
# root. The key itself is never read by Salt — schema_pillar.sls loads it
# into the so-postgres container via ALTER ROLE so_pillar_secret_owner SET
# so_pillar.master_key = '<key>'; the file just lets the value survive
# container restarts.
if [ ! -f /opt/so/conf/postgres/so_pillar.key ]; then
info "Generating so_pillar pgcrypto master key"
mkdir -p /opt/so/conf/postgres
if [ -z "$SO_PILLAR_KEY" ]; then
SO_PILLAR_KEY=$(get_random_value 64)
fi
umask 077
printf '%s' "$SO_PILLAR_KEY" > /opt/so/conf/postgres/so_pillar.key
chmod 0400 /opt/so/conf/postgres/so_pillar.key
chown root:root /opt/so/conf/postgres/so_pillar.key
fi
}