Compare commits

...

4 Commits

Author SHA1 Message Date
Mike Reeves 3d11694d51 make so-yaml PG-canonical and add pillar-change reactor stack
Two coupled changes that together let so_pillar.* be the canonical
config store, with config edits driving service reloads automatically:

so-yaml PG-canonical mode
- Adds /opt/so/conf/so-yaml/mode (and SO_YAML_BACKEND env override) with
  three values: dual (legacy), postgres (PG-only for managed paths),
  disk (emergency rollback). Bootstrap files (secrets.sls, ca/init.sls,
  *.nodes.sls, top.sls, ...) stay disk-only regardless via the existing
  SkipPath allowlist in so_yaml_postgres.locate.
- loadYaml/writeYaml/purgeFile now route to so_pillar.* in postgres
  mode: replace/add/get all read+write the database with no disk file
  ever appearing. PG failure is fatal in postgres mode (no silent
  fallback); dual mode preserves the prior best-effort mirror.
- so_yaml_postgres gains read_yaml(path), is_pg_managed(path), and
  is_enabled() so so-yaml can answer "is this path PG-managed and is
  PG up" without reaching into private helpers.
- schema_pillar.sls writes /opt/so/conf/so-yaml/mode = postgres after
  the importer succeeds, so flipping postgres:so_pillar:enabled flips
  so-yaml's behavior in lockstep with the schema being live.

pg_notify-driven change fan-out
- 008_change_notify.sql adds so_pillar.change_queue + an AFTER trigger
  on pillar_entry that enqueues the locator and pg_notifies
  'so_pillar_change'. Queue is drained at-least-once so engine restarts
  don't lose events; pg_notify is just the wakeup signal.
- New salt-master engine pg_notify_pillar.py LISTENs on the channel,
  drains the queue with FOR UPDATE SKIP LOCKED, debounces bursts, and
  fires 'so/pillar/changed' events grouped by (scope, role, minion)
  (a sketch of the drain pattern follows this entry).
- Reactor so_pillar_changed.sls catches the tag and dispatches to
  orch.so_pillar_reload, which carries a DISPATCH map of pillar-path
  prefix -> (state sls, role grain set) so adding a new service to
  the auto-reload list is a one-line edit instead of a new reactor.
- Engine + reactor wiring is gated on the same postgres:so_pillar:enabled
  flag as the schema and ext_pillar config so the whole stack flips
  on/off together.

Tests: 21 new cases (112 total, all passing) covering mode resolution,
PG-managed detection, and PG-canonical read/write/purge routing with
the PG client stubbed.
2026-05-01 09:31:48 -04:00
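The engine itself is not among the file excerpts below. As a rough sketch only, the LISTEN-and-drain pattern described in the bullets above could look like the following, assuming psycopg2, a direct local DSN, and change_queue column names inferred from the commit message (the shipped engine may differ on all three):

import select
import psycopg2

conn = psycopg2.connect("dbname=securityonion user=postgres")
with conn.cursor() as cur:
    cur.execute("LISTEN so_pillar_change;")
conn.commit()  # LISTEN takes effect at commit

def fire_salt_event(rows):
    # Stand-in for emitting 'so/pillar/changed' grouped by (scope, role, minion).
    print("so/pillar/changed", rows)

def drain(conn):
    # Claim a batch, act on it, then delete it inside one transaction: a
    # crash before COMMIT re-delivers the rows (at-least-once). FOR UPDATE
    # SKIP LOCKED lets concurrent drains cooperate without double-delivery.
    with conn, conn.cursor() as cur:
        cur.execute(
            "SELECT id, scope, role_name, minion_id, pillar_path "
            "FROM so_pillar.change_queue "
            "ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 100")
        rows = cur.fetchall()
        if rows:
            fire_salt_event(rows)
            cur.execute("DELETE FROM so_pillar.change_queue WHERE id = ANY(%s)",
                        ([r[0] for r in rows],))
    return len(rows)

while True:
    while drain(conn):
        pass  # drain everything queued while the engine was down
    if select.select([conn], [], [], 5.0)[0]:
        conn.poll()  # pg_notify is only the wakeup signal; the queue is truth
        del conn.notifies[:]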
Mike Reeves 23255f88e0 add so-yaml dual-write to so_pillar.* + purge verb
Hooks every so-yaml.py write through a new so_yaml_postgres helper that
mirrors disk YAML mutations into so_pillar.pillar_entry via docker exec
psql. Disk remains canonical during the transition; PG mirror failures
are logged only when a real write error occurs (skipped paths and
postgres-unreachable cases stay silent so existing callers don't see
new noise on stderr).

Adds a `purge YAML_FILE` verb on so-yaml that deletes the file from
disk and removes the matching pillar_entry rows. For minion files it
also drops the so_pillar.minion row, which CASCADEs to pillar_entry +
role_member. Designed for so-minion's delete path (replaces rm -f) so
the audit log captures the deletion.

setup/so-functions::generate_passwords + secrets_pillar generate
secrets:pillar_master_pass and /opt/so/conf/postgres/so_pillar.key on
fresh installs, and append the password to existing secrets.sls files
on upgrade.

- salt/manager/tools/sbin/so_yaml_postgres.py: locate(), write_yaml(),
  purge_yaml(), and a small CLI for diagnostics. Skips bootstrap and
  mine-driven paths via the same allowlist used by so-pillar-import.
- salt/manager/tools/sbin/so-yaml.py: import the helper, hook
  writeYaml() to mirror after every disk write, add purgeFile() and
  the purge verb.
- salt/manager/tools/sbin/so-yaml_test.py: 16 new tests covering the
  purge verb and the path-locator / write contract of so_yaml_postgres
  without contacting Postgres. All 91 tests pass.
- setup/so-functions: generate_passwords adds PILLARMASTERPASS and
  SO_PILLAR_KEY; secrets_pillar writes pillar_master_pass and the
  pgcrypto master key file.
2026-04-30 17:09:58 -04:00
Mike Reeves d30b52b327 add so-pillar-import — seeds so_pillar.* from on-disk pillar tree
Idempotent importer that schema_pillar.sls runs once at the end of the
postgres state on first install, and that so-minion can call per-minion on
add/delete. UPSERTs into so_pillar.pillar_entry; the audit trigger handles
versioning so re-runs without SLS edits produce no version bumps.

Connects via docker exec so-postgres psql, so no DSN config is required
at first-install time. Skips bootstrap files (secrets.sls, postgres/
auth.sls, etc.), mine-driven nodes.sls files, and any file containing
Jinja templates — those stay disk-authoritative and ext_pillar_first:
False means they render before the PG overlay.

Auto-syncs to /usr/sbin via the existing manager_sbin file.recurse.
2026-04-30 16:34:05 -04:00
Mike Reeves 3fad895d6a add so_pillar schema + ext_pillar wiring (postsalt foundation)
Lays the database-backed pillar foundation for the postsalt branch. Salt
continues to read on-disk SLS first; the new ext_pillar config overlays
values from the so_pillar.* schema in so-postgres.

- salt/postgres/files/schema/pillar/00{1..7}_*.sql: idempotent DDL for
  scope/role/role_member/minion/pillar_entry/pillar_entry_history/
  drift_log, secret pgcrypto helpers, RLS, pg_cron retention.
- salt/postgres/schema_pillar.sls: applies the SQL files inside the
  so-postgres container after it's healthy, configures the master_key
  GUC, and runs so-pillar-import once. Gated on
  postgres:so_pillar:enabled feature flag (default false).
- salt/salt/master/ext_pillar_postgres.{sls,conf.jinja}: drops
  /etc/salt/master.d/ext_pillar_postgres.conf with list-form ext_pillar
  queries (global/role/minion/secrets) and ext_pillar_first: False so
  bootstrap pillars on disk render before the PG overlay.
- salt/postgres/init.sls + salt/salt/master.sls: include the new states.

Both new state branches are guarded so a default install with the flag
off is a no-op.
2026-04-30 16:30:57 -04:00
24 changed files with 2559 additions and 1 deletion
+329
@@ -0,0 +1,329 @@
#!/usr/bin/env python3
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
"""
so-pillar-import — populate the so_pillar.* schema in so-postgres from the
on-disk Salt pillar tree.
Reads /opt/so/saltstack/local/pillar/, decomposes each .sls file into a
(scope, role|minion_id, pillar_path, data) tuple, and UPSERTs it into
so_pillar.pillar_entry. Idempotent — re-running with no SLS edits produces
no version bumps because the audit trigger only writes a row when data
actually changes.
Bootstrap and mine-driven files are skipped (see EXCLUDE_BASENAMES /
EXCLUDE_PREFIXES below). Files containing Jinja templates ({% or {{) are
also skipped — those stay disk-authoritative and ext_pillar_first: False
means they render before the PG overlay anyway.
All SQL goes through `docker exec so-postgres psql` so no separate DSN
config is required at first-install time. Designed to be called by
salt/postgres/schema_pillar.sls (initial seed) and by salt/manager/tools/
sbin/so-minion (per-minion sync on add/delete).
"""
import argparse
import json
import os
import shlex
import subprocess
import sys
from pathlib import Path
import yaml
PILLAR_LOCAL_ROOT = Path("/opt/so/saltstack/local/pillar")
PILLAR_DEFAULT_ROOT = Path("/opt/so/saltstack/default/pillar")
DOCKER_CONTAINER = "so-postgres"
PG_SUPERUSER = "postgres"
PG_DATABASE = "securityonion"
# Files that must NEVER move to Postgres. These are read by Salt before
# Postgres is reachable, or contain renderer-time computed values (mine, etc.).
EXCLUDE_BASENAMES = {
"secrets.sls",
"auth.sls", # postgres/auth.sls bootstrap
"top.sls",
}
# Path fragments to skip — these are renderer-time computed pillars
# (Salt mine, file_exists guards, etc.) that have to stay on disk.
EXCLUDE_PATH_FRAGMENTS = (
"/elasticsearch/nodes.sls",
"/redis/nodes.sls",
"/kafka/nodes.sls",
"/hypervisor/nodes.sls",
"/logstash/nodes.sls",
"/node_data/ips.sls",
"/postgres/auth.sls",
"/elasticsearch/auth.sls",
"/kibana/secrets.sls",
)
def log(level, msg):
print(f"[{level}] {msg}", file=sys.stderr)
def is_jinja_templated(content_bytes):
return b"{%" in content_bytes or b"{{" in content_bytes
def classify(path):
"""Return (scope, role_name, minion_id, pillar_path) for a pillar file
or None to skip it. role_name is None for now — the importer leaves role
membership to the so_pillar.minion trigger and the salt/auth reactor."""
rel_str = str(path)
if path.name in EXCLUDE_BASENAMES:
return None
for frag in EXCLUDE_PATH_FRAGMENTS:
if frag in rel_str:
return None
# /local/pillar/minions/<id>.sls or adv_<id>.sls
if path.parent.name == "minions":
stem = path.stem # filename without .sls
if stem.startswith("adv_"):
mid = stem[4:]
return ("minion", None, mid, f"minions.adv_{mid}")
return ("minion", None, stem, f"minions.{stem}")
# /local/pillar/<section>/<file>.sls
if path.parent.parent == PILLAR_LOCAL_ROOT or path.parent.parent == PILLAR_DEFAULT_ROOT:
section = path.parent.name
stem = path.stem
# Only soc_<section>.sls and adv_<section>.sls are SOC-managed pillar
# surfaces. Other files (e.g. nodes.sls, auth.sls, *.token) are
# either covered by EXCLUDE_PATH_FRAGMENTS or are bootstrap surfaces
# we leave alone for now.
if stem.startswith("soc_") or stem.startswith("adv_"):
return ("global", None, None, f"{section}.{stem}")
return None
return None
def parse_yaml_file(path):
with open(path, "rb") as f:
content = f.read()
if not content.strip():
return {}
if is_jinja_templated(content):
return None
data = yaml.safe_load(content)
if data is None:
return {}
if not isinstance(data, dict):
return {"_raw": data}
return data
def derive_node_type(minion_id):
"""Conventional Security Onion minion ids are <host>_<role>. Take the
last underscore-delimited token as the canonical role suffix."""
parts = minion_id.rsplit("_", 1)
if len(parts) == 2:
return parts[1]
return None
def docker_psql(sql, *, db=PG_DATABASE, user=PG_SUPERUSER, on_error_stop=True, capture=True):
"""Run sql via docker exec ... psql. Returns stdout as str."""
args = [
"docker", "exec", "-i", DOCKER_CONTAINER,
"psql", "-U", user, "-d", db, "-tA", "-q",
]
if on_error_stop:
args += ["-v", "ON_ERROR_STOP=1"]
proc = subprocess.run(
args, input=sql.encode(),
capture_output=capture, check=False,
)
if proc.returncode != 0:
sys.stderr.write(proc.stderr.decode(errors="replace"))
raise RuntimeError(f"docker exec psql failed (rc={proc.returncode})")
return proc.stdout.decode(errors="replace")
def upsert_minion(minion_id, node_type):
sql = (
"INSERT INTO so_pillar.minion (minion_id, node_type) "
f"VALUES ({pg_str(minion_id)}, {pg_str(node_type) if node_type else 'NULL'}) "
"ON CONFLICT (minion_id) DO UPDATE SET node_type = EXCLUDED.node_type;"
)
docker_psql(sql)
def delete_minion(minion_id):
"""CASCADE removes pillar_entry + role_member rows."""
sql = f"DELETE FROM so_pillar.minion WHERE minion_id = {pg_str(minion_id)};"
docker_psql(sql)
def upsert_pillar_entry(scope, role_name, minion_id, pillar_path, data, reason):
"""Insert or update the row keyed by the partial unique index that
matches scope. Audit trigger handles history; versioning trigger bumps
version only when data changes."""
data_json = json.dumps(data)
role_sql = pg_str(role_name) if role_name else "NULL"
minion_sql = pg_str(minion_id) if minion_id else "NULL"
reason_sql = pg_str(reason)
if scope == "global":
conflict = "(pillar_path) WHERE scope='global'"
elif scope == "role":
conflict = "(role_name, pillar_path) WHERE scope='role'"
elif scope == "minion":
conflict = "(minion_id, pillar_path) WHERE scope='minion'"
else:
raise ValueError(f"unknown scope {scope!r}")
sql = (
"BEGIN;\n"
f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);\n"
f"INSERT INTO so_pillar.pillar_entry "
f"(scope, role_name, minion_id, pillar_path, data, change_reason) "
f"VALUES ({pg_str(scope)}, {role_sql}, {minion_sql}, {pg_str(pillar_path)}, {pg_jsonb(data_json)}, {reason_sql}) "
f"ON CONFLICT {conflict} DO UPDATE "
f"SET data = EXCLUDED.data, change_reason = EXCLUDED.change_reason;\n"
"COMMIT;\n"
)
docker_psql(sql)
def pg_str(s):
"""Escape a Python str for inclusion in literal SQL. Pillar content has
already been validated as YAML; we just need standard SQL escaping."""
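    # Doubling single quotes is sufficient because standard_conforming_strings
    # defaults to on in PostgreSQL, so backslashes are not escape characters.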
if s is None:
return "NULL"
return "'" + str(s).replace("'", "''") + "'"
def pg_jsonb(json_str):
return pg_str(json_str) + "::jsonb"
def walk_pillar_root(root, paths):
if not root.is_dir():
return
for path in root.rglob("*.sls"):
if path.is_file():
paths.append(path)
def import_minion(minion_id, node_type, dry_run, reason):
"""Re-import every pillar file for a single minion."""
if not minion_id:
raise ValueError("minion_id required for --scope minion")
    if dry_run:
        log("DRY", f"would upsert minion row {minion_id} (node_type={node_type})")
    else:
        upsert_minion(minion_id, node_type)
        log("INFO", f"Upserted minion row {minion_id} (node_type={node_type})")
targets = [
PILLAR_LOCAL_ROOT / "minions" / f"{minion_id}.sls",
PILLAR_LOCAL_ROOT / "minions" / f"adv_{minion_id}.sls",
]
for path in targets:
if not path.exists():
log("INFO", f" (no file at {path})")
continue
klass = classify(path)
if not klass:
log("INFO", f" skip {path} (excluded)")
continue
scope, role, mid, pillar_path = klass
data = parse_yaml_file(path)
if data is None:
log("WARN", f" skip {path} (Jinja-templated; stays disk-only)")
continue
if dry_run:
log("DRY", f" would upsert {scope}/{pillar_path} = {len(json.dumps(data))} bytes")
continue
upsert_pillar_entry(scope, role, mid, pillar_path, data, reason)
log("INFO", f" imported {scope}/{pillar_path}")
def import_all(dry_run, reason):
"""Walk the entire local pillar tree and import every eligible file."""
paths = []
walk_pillar_root(PILLAR_LOCAL_ROOT, paths)
imported = 0
skipped = 0
minions_seen = set()
for path in sorted(paths):
klass = classify(path)
if not klass:
skipped += 1
continue
scope, role, minion_id, pillar_path = klass
data = parse_yaml_file(path)
if data is None:
log("WARN", f"skip {path} (Jinja-templated; stays disk-only)")
skipped += 1
continue
if scope == "minion" and minion_id not in minions_seen:
node_type = derive_node_type(minion_id)
if not dry_run:
upsert_minion(minion_id, node_type)
minions_seen.add(minion_id)
if dry_run:
log("DRY", f"would upsert {scope}/{pillar_path} ({len(json.dumps(data))} bytes)")
else:
upsert_pillar_entry(scope, role, minion_id, pillar_path, data, reason)
log("INFO", f"imported {scope}/{pillar_path}")
imported += 1
log("INFO", f"done: {imported} imported, {skipped} skipped")
def main():
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--scope", choices=("global", "role", "minion", "all"), default="all")
ap.add_argument("--minion-id")
ap.add_argument("--node-type", help="override node_type for --scope minion (default: derived from minion_id)")
ap.add_argument("--delete", action="store_true",
help="With --scope minion, remove the minion row (and its pillar rows via CASCADE)")
ap.add_argument("--dry-run", action="store_true")
ap.add_argument("--diff", action="store_true",
help="(reserved) print structural diffs vs current DB content")
ap.add_argument("--yes", action="store_true",
help="Skip confirmation prompts (currently unused; reserved)")
ap.add_argument("--reason", default="so-pillar-import",
help="change_reason recorded in pillar_entry_history")
args = ap.parse_args()
try:
if args.scope == "minion":
if not args.minion_id:
ap.error("--minion-id required when --scope minion")
if args.delete:
if args.dry_run:
log("DRY", f"would delete {args.minion_id}")
else:
delete_minion(args.minion_id)
log("INFO", f"deleted {args.minion_id}")
else:
node_type = args.node_type or derive_node_type(args.minion_id)
import_minion(args.minion_id, node_type, args.dry_run, args.reason)
elif args.scope == "all":
import_all(args.dry_run, args.reason)
else:
log("ERROR", f"--scope {args.scope} not yet implemented; use --scope all or --scope minion")
return 2
except Exception as e:
log("ERROR", str(e))
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
+185 -1
@@ -13,6 +13,64 @@ import json
lockFile = "/tmp/so-yaml.lock"
# postsalt: so-yaml supports three backend modes for PG-managed pillar paths:
#
# dual — write disk + mirror to so_pillar.*. Reads from disk.
# Used during the migration transition when disk is still
# canonical and PG runs as a shadow.
# postgres — write to so_pillar.* only. Reads from so_pillar.*. No disk
# file is touched. The end state once cutover is complete.
# disk — disk only, no PG. Emergency rollback escape hatch.
#
# Bootstrap and mine-driven files (secrets.sls, ca/init.sls, */nodes.sls,
# top.sls, etc.) are always handled on disk regardless of mode — those paths
# are explicitly excluded by so_yaml_postgres.locate() raising SkipPath.
#
# Mode resolution: SO_YAML_BACKEND env var, then /opt/so/conf/so-yaml/mode,
# then default 'dual' (safe upgrade behavior — flipping to 'postgres' is
# done by schema_pillar.sls after the schema is in place and the importer
# has run at least once).
MODE_FILE = "/opt/so/conf/so-yaml/mode"
VALID_MODES = ("dual", "postgres", "disk")
DEFAULT_MODE = "dual"
try:
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import so_yaml_postgres
_SO_YAML_PG_AVAILABLE = True
except Exception as _exc:
_SO_YAML_PG_AVAILABLE = False
def _resolveBackendMode():
env = os.environ.get("SO_YAML_BACKEND")
if env and env in VALID_MODES:
return env
try:
with open(MODE_FILE, "r") as fh:
value = fh.read().strip()
if value in VALID_MODES:
return value
except (IOError, OSError):
pass
return DEFAULT_MODE
_BACKEND_MODE = _resolveBackendMode()
def _isPgManaged(filename):
"""True when so-yaml should route this file's reads/writes through
so_pillar.*. False for bootstrap/mine-driven files that always live on
disk, and for arbitrary YAML paths outside the pillar tree."""
if not _SO_YAML_PG_AVAILABLE:
return False
try:
return so_yaml_postgres.is_pg_managed(filename)
except Exception:
return False
def showUsage(args):
print('Usage: {} <COMMAND> <YAML_FILE> [ARGS...]'.format(sys.argv[0]), file=sys.stderr)
@@ -25,8 +83,14 @@ def showUsage(args):
print(' get [-r] - Displays (to stdout) the value stored in the given key. Requires KEY arg. Use -r for raw output without YAML formatting.', file=sys.stderr)
print(' remove - Removes a yaml key, if it exists. Requires KEY arg.', file=sys.stderr)
print(' replace - Replaces (or adds) a new key and set its value. Requires KEY and VALUE args.', file=sys.stderr)
print(' purge - Delete the YAML file from disk and remove its rows from so_pillar.* (no KEY arg).', file=sys.stderr)
print(' help - Prints this usage information.', file=sys.stderr)
print('', file=sys.stderr)
print(' Backend mode:', file=sys.stderr)
print(' Resolved from $SO_YAML_BACKEND, then /opt/so/conf/so-yaml/mode, default "dual".', file=sys.stderr)
print(' Valid values: dual | postgres | disk. Bootstrap pillar files (secrets, ca, *.nodes.sls)', file=sys.stderr)
print(' are always handled on disk regardless of mode.', file=sys.stderr)
print('', file=sys.stderr)
print(' Where:', file=sys.stderr)
print(' YAML_FILE - Path to the file that will be modified. Ex: /opt/so/conf/service/conf.yaml', file=sys.stderr)
print(' KEY - YAML key, does not support \' or " characters at this time. Ex: level1.level2', file=sys.stderr)
@@ -39,6 +103,24 @@ def showUsage(args):
def loadYaml(filename):
"""Load a YAML file's content as a dict.
PG-canonical mode (`postgres`): for PG-managed paths, read from
so_pillar.pillar_entry. A missing row is treated as an empty dict so
that `replace`/`add` on a fresh path can populate it from scratch.
Other modes / non-PG-managed paths: read from disk as today.
"""
if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
try:
data = so_yaml_postgres.read_yaml(filename)
except so_yaml_postgres.SkipPath:
data = None
except Exception as e:
print(f"so-yaml: pg read failed for {filename}: {e}", file=sys.stderr)
sys.exit(1)
return data if data is not None else {}
try:
with open(filename, "r") as file:
content = file.read()
@@ -52,8 +134,97 @@ def loadYaml(filename):
def writeYaml(filename, content):
"""Persist `content` for `filename`.
PG-canonical mode + PG-managed path: write only to so_pillar.*. A PG
failure is fatal (no disk fallback) — caller must retry.
Dual mode: write disk, then mirror to PG (failures are warnings).
Disk mode or non-PG-managed path: write disk only.
"""
if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
if not _SO_YAML_PG_AVAILABLE:
print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr)
sys.exit(1)
ok, msg = so_yaml_postgres.write_yaml(
filename, content,
reason="so-yaml " + " ".join(sys.argv[1:2]))
if not ok:
print(f"so-yaml: pg write failed for {filename}: {msg}", file=sys.stderr)
sys.exit(1)
return None
file = open(filename, "w")
result = yaml.safe_dump(content, file)
file.close()
if _BACKEND_MODE == "dual":
_mirrorToPostgres(filename, content)
return result
def _mirrorToPostgres(filename, content):
"""Best-effort dual-write of a YAML mutation into so_pillar.*. Skips
files outside the PG-managed pillar surface (secrets.sls,
elasticsearch/nodes.sls, etc.) and silently degrades when so-postgres
is unreachable. Disk write is canonical in dual mode; this never
raises.
Only real PG failures (`pg write failed: ...`) are logged so the
common cases (skipped path, postgres not running) don't pollute
stderr."""
if not _SO_YAML_PG_AVAILABLE:
return
try:
ok, msg = so_yaml_postgres.write_yaml(filename, content,
reason="so-yaml " + " ".join(sys.argv[1:2]))
if not ok and msg.startswith("pg write failed"):
print(f"so-yaml: {msg}", file=sys.stderr)
except Exception as e: # pragma: no cover — defensive: never break disk write
print(f"so-yaml: pg mirror exception: {e}", file=sys.stderr)
def purgeFile(filename):
"""Delete a YAML file from disk and remove the matching rows from
so_pillar.*. Idempotent — missing file/row counts as success.
PG-canonical mode + PG-managed path: PG delete is canonical. If a stale
disk file from the dual-write era happens to still exist, it's removed
too as a cleanup courtesy. PG failure is fatal in this mode.
Dual / disk modes: remove disk first; PG cleanup is best-effort."""
if _BACKEND_MODE == "postgres" and _isPgManaged(filename):
if not _SO_YAML_PG_AVAILABLE:
print("so-yaml: PG-canonical mode requires so_yaml_postgres module", file=sys.stderr)
return 1
ok, msg = so_yaml_postgres.purge_yaml(filename, reason="so-yaml purge")
if not ok:
print(f"so-yaml: pg purge failed for {filename}: {msg}", file=sys.stderr)
return 1
if os.path.exists(filename):
try:
os.remove(filename)
except Exception as e:
print(f"so-yaml: warn — could not remove stale disk file {filename}: {e}", file=sys.stderr)
return 0
if os.path.exists(filename):
try:
os.remove(filename)
except Exception as e:
print(f"Failed to remove {filename}: {e}", file=sys.stderr)
return 1
if _BACKEND_MODE == "dual" and _SO_YAML_PG_AVAILABLE:
try:
ok, msg = so_yaml_postgres.purge_yaml(filename,
reason="so-yaml purge")
if not ok and msg.startswith("pg purge failed"):
print(f"so-yaml: {msg}", file=sys.stderr)
except Exception as e:
print(f"so-yaml: pg purge exception: {e}", file=sys.stderr)
return 0
def appendItem(content, key, listItem):
@@ -371,6 +542,18 @@ def get(args):
return 0
def purge(args):
"""purge YAML_FILE — delete the file from disk and remove the matching
rows from so_pillar.* in so-postgres. Used by so-minion's delete path
(in place of `rm -f`) so the audit log captures the deletion and
role_member rows get cleaned up via FK CASCADE on so_pillar.minion."""
if len(args) != 1:
print('Missing filename arg', file=sys.stderr)
showUsage(None)
return 1
return purgeFile(args[0])
def main():
args = sys.argv[1:]
@@ -388,6 +571,7 @@ def main():
"get": get,
"remove": remove,
"replace": replace,
"purge": purge,
}
code = 1
+326
@@ -991,3 +991,329 @@ class TestLoadYaml(unittest.TestCase):
soyaml.loadYaml("/tmp/so-yaml_test-unreadable.yaml")
sysmock.assert_called_with(1)
self.assertIn("Error reading file", mock_stderr.getvalue())
class TestPurge(unittest.TestCase):
def test_purge_missing_arg(self):
# showUsage calls sys.exit(1); patch it like the other tests do.
with patch('sys.exit', new=MagicMock()):
with patch('sys.stderr', new=StringIO()) as mock_stderr:
rc = soyaml.purge([])
self.assertEqual(rc, 1)
self.assertIn("Missing filename", mock_stderr.getvalue())
def test_purge_existing_file(self):
filename = "/tmp/so-yaml_test_purge.yaml"
with open(filename, "w") as f:
f.write("key: value\n")
# Disable PG mirror so the test doesn't shell out to docker.
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
rc = soyaml.purge([filename])
self.assertEqual(rc, 0)
import os as _os
self.assertFalse(_os.path.exists(filename))
def test_purge_missing_file_idempotent(self):
filename = "/tmp/so-yaml_test_purge_missing.yaml"
import os as _os
if _os.path.exists(filename):
_os.remove(filename)
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
rc = soyaml.purge([filename])
self.assertEqual(rc, 0)
class TestSoYamlPostgres(unittest.TestCase):
"""Tests the path-locator and write/purge contract of the dual-write
backend module without actually contacting Postgres."""
def setUp(self):
import importlib
self.mod = importlib.import_module("so_yaml_postgres")
def test_locate_global_soc(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(scope, "global")
self.assertIsNone(role)
self.assertIsNone(mid)
self.assertEqual(path, "soc.soc_soc")
def test_locate_global_advanced(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/soc/adv_soc.sls")
self.assertEqual(scope, "global")
self.assertEqual(path, "soc.adv_soc")
def test_locate_minion(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
self.assertEqual(scope, "minion")
self.assertEqual(mid, "h1_sensor")
self.assertEqual(path, "minions.h1_sensor")
def test_locate_minion_advanced(self):
scope, role, mid, path = self.mod.locate(
"/opt/so/saltstack/local/pillar/minions/adv_h1_sensor.sls")
self.assertEqual(scope, "minion")
self.assertEqual(mid, "h1_sensor")
self.assertEqual(path, "minions.adv_h1_sensor")
def test_locate_skip_secrets(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/secrets.sls")
def test_locate_skip_postgres_auth(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/postgres/auth.sls")
def test_locate_skip_mine_driven(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls")
def test_locate_skip_top(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/opt/so/saltstack/local/pillar/top.sls")
def test_locate_skip_unrelated(self):
with self.assertRaises(self.mod.SkipPath):
self.mod.locate("/etc/hostname")
def test_pg_str_escapes(self):
self.assertEqual(self.mod._pg_str("a'b"), "'a''b'")
self.assertEqual(self.mod._pg_str(None), "NULL")
def test_conflict_target(self):
self.assertIn("scope='global'", self.mod._conflict_target("global"))
self.assertIn("scope='role'", self.mod._conflict_target("role"))
self.assertIn("scope='minion'", self.mod._conflict_target("minion"))
with self.assertRaises(ValueError):
self.mod._conflict_target("bogus")
def test_write_yaml_skips_disk_only_path(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
ok, msg = self.mod.write_yaml(
"/opt/so/saltstack/local/pillar/secrets.sls",
{"secrets": {"foo": "bar"}})
self.assertFalse(ok)
self.assertIn("disk-only", msg)
def test_write_yaml_unreachable(self):
with patch.object(self.mod, '_is_enabled', return_value=False):
ok, msg = self.mod.write_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls",
{"soc": {"foo": "bar"}})
self.assertFalse(ok)
self.assertEqual(msg, "postgres unreachable")
def test_is_pg_managed_true(self):
self.assertTrue(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
self.assertTrue(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls"))
def test_is_pg_managed_false_for_bootstrap(self):
self.assertFalse(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/secrets.sls"))
self.assertFalse(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/postgres/auth.sls"))
self.assertFalse(self.mod.is_pg_managed(
"/opt/so/saltstack/local/pillar/elasticsearch/nodes.sls"))
def test_read_yaml_unreachable(self):
with patch.object(self.mod, '_is_enabled', return_value=False):
self.assertIsNone(self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls"))
def test_read_yaml_skips_disk_only(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
with self.assertRaises(self.mod.SkipPath):
self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/secrets.sls")
def test_read_yaml_returns_data(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
with patch.object(self.mod, '_docker_psql',
return_value='{"soc": {"foo": "bar"}}\n'):
data = self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(data, {"soc": {"foo": "bar"}})
def test_read_yaml_returns_none_when_no_row(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
with patch.object(self.mod, '_docker_psql', return_value=''):
data = self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertIsNone(data)
def test_read_yaml_minion_query_shape(self):
captured = {}
def fake_psql(sql):
captured['sql'] = sql
return '{"host": {"mainip": "10.0.0.1"}}'
with patch.object(self.mod, '_is_enabled', return_value=True):
with patch.object(self.mod, '_docker_psql', side_effect=fake_psql):
data = self.mod.read_yaml(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
self.assertEqual(data, {"host": {"mainip": "10.0.0.1"}})
self.assertIn("scope='minion'", captured['sql'])
self.assertIn("'h1_sensor'", captured['sql'])
self.assertIn("'minions.h1_sensor'", captured['sql'])
def test_is_enabled_public_alias(self):
with patch.object(self.mod, '_is_enabled', return_value=True):
self.assertTrue(self.mod.is_enabled())
with patch.object(self.mod, '_is_enabled', return_value=False):
self.assertFalse(self.mod.is_enabled())
class TestSoYamlBackendMode(unittest.TestCase):
"""Tests so-yaml's backend-mode resolution and PG-canonical routing
for read/write/purge. The PG calls themselves are stubbed; what we're
asserting is that the right backend is chosen for each (mode, path)
combination."""
def test_resolve_mode_env_overrides_file(self):
with patch.dict('os.environ', {'SO_YAML_BACKEND': 'postgres'}):
self.assertEqual(soyaml._resolveBackendMode(), 'postgres')
with patch.dict('os.environ', {'SO_YAML_BACKEND': 'disk'}):
self.assertEqual(soyaml._resolveBackendMode(), 'disk')
def test_resolve_mode_invalid_env_falls_back(self):
with patch.dict('os.environ', {'SO_YAML_BACKEND': 'garbage'}, clear=False):
with patch('builtins.open', side_effect=IOError):
self.assertEqual(soyaml._resolveBackendMode(), 'dual')
def test_resolve_mode_default_dual(self):
env = {k: v for k, v in __import__('os').environ.items()
if k != 'SO_YAML_BACKEND'}
with patch.dict('os.environ', env, clear=True):
with patch('builtins.open', side_effect=IOError):
self.assertEqual(soyaml._resolveBackendMode(), 'dual')
def test_is_pg_managed_proxies(self):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
self.assertTrue(soyaml._isPgManaged(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
self.assertFalse(soyaml._isPgManaged(
"/opt/so/saltstack/local/pillar/secrets.sls"))
def test_is_pg_managed_false_when_module_unavailable(self):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', False):
self.assertFalse(soyaml._isPgManaged(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls"))
def test_load_yaml_postgres_mode_reads_pg(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'read_yaml',
return_value={"a": 1}):
result = soyaml.loadYaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(result, {"a": 1})
def test_load_yaml_postgres_mode_returns_empty_when_no_row(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'read_yaml',
return_value=None):
result = soyaml.loadYaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls")
self.assertEqual(result, {})
def test_load_yaml_postgres_mode_reads_disk_for_bootstrap(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write("foo: bar\n")
tmp = f.name
try:
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres,
'is_pg_managed', return_value=False):
result = soyaml.loadYaml(tmp)
self.assertEqual(result, {"foo": "bar"})
finally:
_os.unlink(tmp)
def test_write_yaml_postgres_mode_skips_disk(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
tmp = f.name
_os.unlink(tmp)
try:
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'write_yaml',
return_value=(True, 'ok')) as mock_w:
soyaml.writeYaml(tmp, {"x": 1})
self.assertFalse(_os.path.exists(tmp))
mock_w.assert_called_once()
finally:
if _os.path.exists(tmp):
_os.unlink(tmp)
def test_write_yaml_postgres_mode_failure_is_fatal(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'write_yaml',
return_value=(False, 'pg write failed: connection refused')):
with patch('sys.exit', new=MagicMock()) as sysmock:
with patch('sys.stderr', new=StringIO()) as mock_err:
soyaml.writeYaml(
"/opt/so/saltstack/local/pillar/soc/soc_soc.sls",
{"x": 1})
sysmock.assert_called_with(1)
def test_write_yaml_disk_mode_skips_pg(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
tmp = f.name
try:
with patch.object(soyaml, '_BACKEND_MODE', 'disk'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'write_yaml') as mock_w:
soyaml.writeYaml(tmp, {"x": 1})
mock_w.assert_not_called()
with open(tmp) as f:
self.assertIn('x: 1', f.read())
finally:
_os.unlink(tmp)
def test_purge_postgres_mode_calls_pg_only(self):
import tempfile, os as _os
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
tmp = f.name
_os.unlink(tmp)
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'purge_yaml',
return_value=(True, 'ok')) as mock_p:
rc = soyaml.purgeFile(tmp)
self.assertEqual(rc, 0)
mock_p.assert_called_once()
def test_purge_postgres_mode_failure_returns_nonzero(self):
with patch.object(soyaml, '_BACKEND_MODE', 'postgres'):
with patch.object(soyaml, '_SO_YAML_PG_AVAILABLE', True):
with patch.object(soyaml.so_yaml_postgres, 'is_pg_managed',
return_value=True):
with patch.object(soyaml.so_yaml_postgres, 'purge_yaml',
return_value=(False, 'pg purge failed: x')):
with patch('sys.stderr', new=StringIO()):
rc = soyaml.purgeFile(
"/opt/so/saltstack/local/pillar/minions/h1_sensor.sls")
self.assertEqual(rc, 1)
+320
@@ -0,0 +1,320 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
"""
so_yaml_postgres: Postgres-backed dual-write helpers for so-yaml.py.
so-yaml.py writes YAML pillar files on disk; this module mirrors those
writes into so_pillar.* in so-postgres so ext_pillar and the SOC
PostgresConfigstore see the same data. During the postsalt transition
disk is canonical; PG writes are best-effort and never fail the disk
operation.
Connection: shells out to `docker exec so-postgres psql -U postgres -d
securityonion`. Same pattern so-pillar-import uses; avoids needing a
separate DSN config at install time. Performance is fine because so-yaml
is invoked from infrequent code paths (setup scripts, so-minion,
so-firewall); SOC's hot path uses the in-process pgxpool in
PostgresConfigstore, not so-yaml.
Path-to-row mapping mirrors PostgresConfigstore.locateSetting in
securityonion-soc:
/opt/so/saltstack/local/pillar/<section>/soc_<section>.sls
-> scope=global, pillar_path=<section>.soc_<section>
/opt/so/saltstack/local/pillar/<section>/adv_<section>.sls
-> scope=global, pillar_path=<section>.adv_<section>
/opt/so/saltstack/local/pillar/minions/<id>.sls
-> scope=minion, minion_id=<id>, pillar_path=minions.<id>
/opt/so/saltstack/local/pillar/minions/adv_<id>.sls
-> scope=minion, minion_id=<id>, pillar_path=minions.adv_<id>
Files outside that mapping (notably secrets.sls, postgres/auth.sls,
elasticsearch/nodes.sls, etc.) are skipped; they stay disk-only forever
or render dynamically and don't belong in PG.
"""
import json
import os
import shlex
import subprocess
import sys
DOCKER_CONTAINER = os.environ.get("SO_PILLAR_PG_CONTAINER", "so-postgres")
PG_DATABASE = os.environ.get("SO_PILLAR_PG_DATABASE", "securityonion")
PG_USER = os.environ.get("SO_PILLAR_PG_USER", "postgres")
# File paths whose mutations stay disk-only forever. Mirrors EXCLUDE_*
# in so-pillar-import.
DISK_ONLY_PATHS = (
"/opt/so/saltstack/local/pillar/secrets.sls",
"/opt/so/saltstack/local/pillar/postgres/auth.sls",
"/opt/so/saltstack/local/pillar/elasticsearch/auth.sls",
"/opt/so/saltstack/local/pillar/kibana/secrets.sls",
)
DISK_ONLY_FRAGMENTS = (
"/elasticsearch/nodes.sls",
"/redis/nodes.sls",
"/kafka/nodes.sls",
"/hypervisor/nodes.sls",
"/logstash/nodes.sls",
"/node_data/ips.sls",
"/top.sls",
)
class SkipPath(Exception):
"""Raised when a file path is intentionally not mirrored to PG."""
def is_enabled():
"""Public alias for callers that want to probe PG reachability without
relying on a leading-underscore private name."""
return _is_enabled()
def _is_enabled():
"""PG dual-write only fires if so-postgres is reachable. Cheap probe.
Returns True when docker exec succeeds, False otherwise. We never
want a PG hiccup to fail a disk write on a manager whose Postgres is
momentarily unreachable."""
try:
proc = subprocess.run(
["docker", "exec", DOCKER_CONTAINER,
"pg_isready", "-h", "127.0.0.1", "-U", PG_USER, "-q"],
capture_output=True, timeout=5, check=False,
)
return proc.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
return False
def locate(path):
"""Translate a so-yaml file path to (scope, role_name, minion_id, pillar_path).
Raises SkipPath when the file is not part of the PG-managed surface."""
norm = os.path.normpath(path)
if norm in DISK_ONLY_PATHS:
raise SkipPath(f"{path}: explicit disk-only allowlist")
for frag in DISK_ONLY_FRAGMENTS:
if frag in norm:
raise SkipPath(f"{path}: matches disk-only fragment {frag}")
parent = os.path.basename(os.path.dirname(norm))
grandparent = os.path.basename(os.path.dirname(os.path.dirname(norm)))
name = os.path.basename(norm)
if not name.endswith(".sls"):
raise SkipPath(f"{path}: not a .sls file")
stem = name[:-4]
if parent == "minions":
if stem.startswith("adv_"):
mid = stem[4:]
return ("minion", None, mid, f"minions.adv_{mid}")
return ("minion", None, stem, f"minions.{stem}")
# /local/pillar/<section>/<file>.sls
if grandparent == "pillar" and parent and parent != "":
if stem.startswith("soc_") or stem.startswith("adv_"):
return ("global", None, None, f"{parent}.{stem}")
raise SkipPath(f"{path}: <section>/{stem}.sls is not a soc_/adv_ file")
raise SkipPath(f"{path}: unrecognised pillar layout")
def _pg_str(s):
if s is None:
return "NULL"
return "'" + str(s).replace("'", "''") + "'"
def _docker_psql(sql):
"""Run sql via docker exec ... psql. Returns stdout. Caller catches
exceptions and downgrades to a warning."""
proc = subprocess.run(
["docker", "exec", "-i", DOCKER_CONTAINER,
"psql", "-U", PG_USER, "-d", PG_DATABASE,
"-tA", "-q", "-v", "ON_ERROR_STOP=1"],
input=sql.encode(), capture_output=True, check=False, timeout=30,
)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.decode(errors="replace") or
f"docker exec psql exit {proc.returncode}")
return proc.stdout.decode(errors="replace")
def _conflict_target(scope):
if scope == "global":
return "(pillar_path) WHERE scope='global'"
if scope == "role":
return "(role_name, pillar_path) WHERE scope='role'"
if scope == "minion":
return "(minion_id, pillar_path) WHERE scope='minion'"
raise ValueError(f"unknown scope {scope!r}")
def is_pg_managed(path):
"""True if this path maps to a so_pillar.* row (locate() succeeds).
Bootstrap and mine-driven files return False; they always live on
disk regardless of so-yaml's backend mode."""
try:
locate(path)
return True
except SkipPath:
return False
def read_yaml(path):
"""Return the content dict stored in so_pillar.pillar_entry for `path`,
or None when no row exists. Raises SkipPath when `path` is not part of
the PG-managed surface (caller should read disk in that case).
Used by so-yaml.py PG-canonical mode so `replace`, `get`, etc. resolve
against the database rather than a stale (or absent) disk file."""
if not _is_enabled():
return None
scope, role, minion_id, pillar_path = locate(path)
if scope == "minion":
sql = ("SELECT data FROM so_pillar.pillar_entry "
"WHERE scope='minion' "
f"AND minion_id={_pg_str(minion_id)} "
f"AND pillar_path={_pg_str(pillar_path)}")
elif scope == "role":
sql = ("SELECT data FROM so_pillar.pillar_entry "
"WHERE scope='role' "
f"AND role_name={_pg_str(role)} "
f"AND pillar_path={_pg_str(pillar_path)}")
else:
sql = ("SELECT data FROM so_pillar.pillar_entry "
"WHERE scope='global' "
f"AND pillar_path={_pg_str(pillar_path)}")
try:
out = _docker_psql(sql).strip()
except Exception:
return None
if not out:
return None
try:
return json.loads(out)
except (ValueError, TypeError):
return None
def write_yaml(path, content_dict, *, reason="so-yaml dual-write"):
"""Mirror the disk write at `path` (whose content was just rendered as
`content_dict`) into so_pillar.pillar_entry. Best-effort: any failure
is returned as (False, msg) rather than raised, so the caller never sees an exception."""
if not _is_enabled():
return False, "postgres unreachable"
try:
scope, role, minion_id, pillar_path = locate(path)
except SkipPath as e:
return False, str(e)
data_json = json.dumps(content_dict if content_dict is not None else {})
role_sql = _pg_str(role)
minion_sql = _pg_str(minion_id)
reason_sql = _pg_str(reason)
conflict = _conflict_target(scope)
sql_parts = []
if scope == "minion":
# FK requires the minion row before pillar_entry can reference it.
sql_parts.append(
f"INSERT INTO so_pillar.minion (minion_id) VALUES ({minion_sql}) "
"ON CONFLICT (minion_id) DO NOTHING;"
)
sql_parts.append(
"BEGIN;\n"
f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);\n"
"INSERT INTO so_pillar.pillar_entry "
"(scope, role_name, minion_id, pillar_path, data, change_reason) "
f"VALUES ({_pg_str(scope)}, {role_sql}, {minion_sql}, "
f"{_pg_str(pillar_path)}, {_pg_str(data_json)}::jsonb, {reason_sql}) "
f"ON CONFLICT {conflict} DO UPDATE "
"SET data = EXCLUDED.data, change_reason = EXCLUDED.change_reason;\n"
"COMMIT;\n"
)
try:
_docker_psql("\n".join(sql_parts))
except Exception as e:
return False, f"pg write failed: {e}"
return True, "ok"
def purge_yaml(path, *, reason="so-yaml purge"):
"""Mirror the disk file deletion at `path` by deleting the matching
pillar_entry rows. For minion files also deletes the so_pillar.minion
row (CASCADE removes pillar_entry + role_member rows)."""
if not _is_enabled():
return False, "postgres unreachable"
try:
scope, role, minion_id, pillar_path = locate(path)
except SkipPath as e:
return False, str(e)
reason_sql = _pg_str(reason)
parts = ["BEGIN;",
f"SELECT set_config('so_pillar.change_reason', {reason_sql}, true);"]
if scope == "minion":
# If both <id>.sls and adv_<id>.sls are gone the trigger / CASCADE
# cleans up role_member; otherwise we just remove this one row.
parts.append(
f"DELETE FROM so_pillar.pillar_entry "
f"WHERE scope='minion' AND minion_id={_pg_str(minion_id)} "
f"AND pillar_path={_pg_str(pillar_path)};"
)
parts.append(
f"DELETE FROM so_pillar.minion WHERE minion_id={_pg_str(minion_id)} "
"AND NOT EXISTS (SELECT 1 FROM so_pillar.pillar_entry "
f"WHERE minion_id={_pg_str(minion_id)});"
)
else:
parts.append(
f"DELETE FROM so_pillar.pillar_entry "
f"WHERE scope={_pg_str(scope)} AND pillar_path={_pg_str(pillar_path)};"
)
parts.append("COMMIT;")
try:
_docker_psql("\n".join(parts))
except Exception as e:
return False, f"pg purge failed: {e}"
return True, "ok"
# CLI for diagnostics. Not exercised by so-yaml.py itself.
def _main(argv):
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("op", choices=("locate", "ping"))
ap.add_argument("path", nargs="?")
args = ap.parse_args(argv)
if args.op == "ping":
ok = _is_enabled()
print("ok" if ok else "unreachable")
return 0 if ok else 1
if args.op == "locate":
if not args.path:
ap.error("locate requires PATH")
try:
scope, role, minion_id, pillar_path = locate(args.path)
print(f"scope={scope} role={role} minion_id={minion_id} pillar_path={pillar_path}")
return 0
except SkipPath as e:
print(f"SKIP: {e}", file=sys.stderr)
return 2
return 1
if __name__ == "__main__":
sys.exit(_main(sys.argv[1:]))
+112
@@ -0,0 +1,112 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Driven by the so_pillar_changed reactor. Translates a so_pillar.pillar_entry
# change into (cache.clear_pillar -> saltutil.refresh_pillar -> state.apply)
# on the appropriate target.
#
# Routing rules live in the DISPATCH map below — one entry per
# (pillar_path prefix) -> (state sls, role grain). Add new services here
# rather than wiring more reactors.
#
# Idempotent: state.apply is idempotent; if the pillar value didn't actually
# change anything observable, the affected state runs a no-op. Bulk imports
# and replays are safe.
{% set change = salt['pillar.get']('so_pillar_change', {}) %}
{% set scope = change.get('scope') %}
{% set role = change.get('role_name') %}
{% set minion = change.get('minion_id') %}
{% set changes = change.get('changes', []) %}
{# (pillar_path prefix) -> {sls: <state to apply>, role: <role grain that runs it>}
role is a grain value (e.g. 'so-sensor'), used to compute compound targets
when the change is global or role-scoped. #}
{% set DISPATCH = {
'suricata.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'sensor.': {'sls': 'suricata.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'zeek.': {'sls': 'zeek.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'stenographer.': {'sls': 'stenographer.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'pcap.': {'sls': 'pcap.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
'logstash.': {'sls': 'logstash.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver']},
'redis.': {'sls': 'redis.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
'kafka.': {'sls': 'kafka.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-receiver', 'so-searchnode']},
'elasticsearch.': {'sls': 'elasticsearch.config','roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-searchnode', 'so-heavynode', 'so-standalone']},
'kibana.': {'sls': 'kibana.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
'soc.': {'sls': 'soc.config', 'roles': ['so-manager', 'so-managersearch', 'so-managerhype', 'so-standalone']},
'telegraf.': {'sls': 'telegraf.config', 'roles': ['*']},
'fleet.': {'sls': 'fleet.config', 'roles': ['so-fleet']},
'strelka.': {'sls': 'strelka.config', 'roles': ['so-sensor', 'so-heavynode', 'so-standalone']},
} %}
{# Collect a deduplicated map of reload actions keyed by (sls, target). The
target is either a single minion id (scope=minion) or a compound role match
built from the DISPATCH roles list (scope=role/global). #}
{% set actions = {} %}
{% for c in changes %}
{% set path = c.get('pillar_path', '') %}
{% for prefix, action in DISPATCH.items() %}
{% if path.startswith(prefix) %}
{% set sls = action['sls'] %}
{% if scope == 'minion' and minion %}
{% set key = sls ~ '|minion|' ~ minion %}
{% set _ = actions.update({key: {'sls': sls, 'tgt': minion, 'tgt_type': 'glob'}}) %}
{% else %}
{% set role_targets = action['roles'] %}
{% if '*' in role_targets %}
{% set tgt = '*' %}
{% set tgt_type = 'glob' %}
{% else %}
        {% set tgt = ('G@role:' ~ role_targets|join(' or G@role:')) %}
{% set tgt_type = 'compound' %}
{% endif %}
{% set key = sls ~ '|' ~ tgt %}
{% set _ = actions.update({key: {'sls': sls, 'tgt': tgt, 'tgt_type': tgt_type}}) %}
{% endif %}
{% endif %}
{% endfor %}
{% endfor %}
{% if actions %}
{% for key, action in actions.items() %}
{% set safe_id = loop.index0 | string %}
so_pillar_reload_clear_cache_{{ safe_id }}:
salt.runner:
- name: cache.clear_pillar
- tgt: '{{ action.tgt }}'
- tgt_type: '{{ action.tgt_type }}'
so_pillar_reload_refresh_pillar_{{ safe_id }}:
salt.function:
- name: saltutil.refresh_pillar
- tgt: '{{ action.tgt }}'
- tgt_type: '{{ action.tgt_type }}'
- kwarg:
wait: True
- require:
- salt: so_pillar_reload_clear_cache_{{ safe_id }}
so_pillar_reload_apply_state_{{ safe_id }}:
salt.state:
- tgt: '{{ action.tgt }}'
- tgt_type: '{{ action.tgt_type }}'
- sls:
- {{ action.sls }}
- queue: True
- require:
- salt: so_pillar_reload_refresh_pillar_{{ safe_id }}
{% endfor %}
{% else %}
{# No DISPATCH entry matched. Pillar still gets refreshed so any other states
read fresh values, but no service-specific reload is invoked. #}
so_pillar_reload_unmapped_path_noop:
test.nop
{% do salt.log.info('orch.so_pillar_reload: no dispatch match for %s' % changes) %}
{% endif %}
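As a hedged sketch of the emit side this orchestration consumes (the engine file is not excerpted here): salt.utils.event is real Salt API, while the exact grouping is assumed from the commit message. The payload keys mirror what the Jinja above reads back out of pillar.

import salt.utils.event

def fire_change_event(opts, scope, role_name, minion_id, pillar_paths):
    # Emits the event the so_pillar_changed reactor matches on; the reactor
    # is then responsible for handing the payload to orch.so_pillar_reload.
    bus = salt.utils.event.get_master_event(opts, opts["sock_dir"], listen=False)
    bus.fire_event(
        {"scope": scope,
         "role_name": role_name,
         "minion_id": minion_id,
         "changes": [{"pillar_path": p} for p in pillar_paths]},
        tag="so/pillar/changed",
    )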
@@ -0,0 +1,124 @@
-- so_pillar schema: queryable, versioned, audited pillar config store.
-- Replaces flat-file Salt pillar consumed via salt.pillar.postgres ext_pillar.
-- Idempotent. Run via salt/postgres/schema_pillar.sls inside the so-postgres container.
CREATE SCHEMA IF NOT EXISTS so_pillar;
CREATE TABLE IF NOT EXISTS so_pillar.scope (
scope_kind text PRIMARY KEY,
precedence int NOT NULL,
description text
);
INSERT INTO so_pillar.scope(scope_kind, precedence, description) VALUES
('global', 100, 'Applies to every minion'),
('role', 200, 'Applies to minions whose minion_id matches a top.sls compound role match'),
('minion', 300, 'Applies only to a single minion (per-minion overlay)')
ON CONFLICT (scope_kind) DO NOTHING;
CREATE TABLE IF NOT EXISTS so_pillar.role (
role_name text PRIMARY KEY,
match_kind text NOT NULL CHECK (match_kind IN ('compound','grain','glob','list')),
match_expr text NOT NULL,
description text
);
CREATE TABLE IF NOT EXISTS so_pillar.minion (
minion_id text PRIMARY KEY,
node_type text,
hostname text,
extra_roles text[] NOT NULL DEFAULT '{}',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS so_pillar.role_member (
role_name text NOT NULL REFERENCES so_pillar.role(role_name) ON DELETE CASCADE,
minion_id text NOT NULL REFERENCES so_pillar.minion(minion_id) ON DELETE CASCADE,
source text NOT NULL DEFAULT 'computed' CHECK (source IN ('computed','manual','imported')),
PRIMARY KEY (role_name, minion_id)
);
CREATE INDEX IF NOT EXISTS ix_role_member_minion ON so_pillar.role_member(minion_id);
-- pillar_entry holds the actual data. as_json=True ext_pillar reads `data` directly.
CREATE TABLE IF NOT EXISTS so_pillar.pillar_entry (
id bigserial PRIMARY KEY,
scope text NOT NULL REFERENCES so_pillar.scope(scope_kind),
role_name text REFERENCES so_pillar.role(role_name) ON DELETE CASCADE,
minion_id text REFERENCES so_pillar.minion(minion_id) ON DELETE CASCADE,
pillar_path text NOT NULL,
data jsonb NOT NULL,
is_secret boolean NOT NULL DEFAULT false,
sort_key int NOT NULL DEFAULT 0,
version int NOT NULL DEFAULT 1,
updated_at timestamptz NOT NULL DEFAULT now(),
updated_by text NOT NULL DEFAULT current_user,
change_reason text,
CONSTRAINT pillar_entry_scope_target CHECK (
(scope='global' AND role_name IS NULL AND minion_id IS NULL)
OR (scope='role' AND role_name IS NOT NULL AND minion_id IS NULL)
OR (scope='minion' AND role_name IS NULL AND minion_id IS NOT NULL)
),
-- Reserved namespaces that MUST stay rendered from SLS (mine-driven). Nothing
-- under these prefixes is allowed in the database; the merge logic relies on
-- ext_pillar leaving these subtrees alone.
CONSTRAINT pillar_entry_reserved_paths CHECK (
pillar_path NOT LIKE 'elasticsearch.nodes%'
AND pillar_path NOT LIKE 'redis.nodes%'
AND pillar_path NOT LIKE 'kafka.nodes%'
AND pillar_path NOT LIKE 'hypervisor.nodes%'
AND pillar_path NOT LIKE 'logstash.nodes%'
AND pillar_path NOT LIKE 'node_data.ips%'
)
);
CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_global ON so_pillar.pillar_entry(pillar_path)
WHERE scope = 'global';
CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_role ON so_pillar.pillar_entry(role_name, pillar_path)
WHERE scope = 'role';
CREATE UNIQUE INDEX IF NOT EXISTS ux_pillar_entry_minion ON so_pillar.pillar_entry(minion_id, pillar_path)
WHERE scope = 'minion';
CREATE INDEX IF NOT EXISTS ix_pillar_entry_minion_hot ON so_pillar.pillar_entry(minion_id)
WHERE scope = 'minion';
CREATE INDEX IF NOT EXISTS ix_pillar_entry_role_hot ON so_pillar.pillar_entry(role_name)
WHERE scope = 'role';
-- Append-only audit log for every change to pillar_entry. No FK to entry so DELETE
-- history survives the row removal.
CREATE TABLE IF NOT EXISTS so_pillar.pillar_entry_history (
history_id bigserial PRIMARY KEY,
entry_id bigint,
op text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')),
scope text NOT NULL,
role_name text,
minion_id text,
pillar_path text NOT NULL,
old_data jsonb,
new_data jsonb,
is_secret boolean,
version int,
changed_at timestamptz NOT NULL DEFAULT now(),
changed_by text NOT NULL DEFAULT current_user,
change_reason text
);
CREATE INDEX IF NOT EXISTS ix_pillar_history_entry ON so_pillar.pillar_entry_history(entry_id, changed_at DESC);
CREATE INDEX IF NOT EXISTS ix_pillar_history_minion ON so_pillar.pillar_entry_history(minion_id, changed_at DESC);
CREATE INDEX IF NOT EXISTS ix_pillar_history_role ON so_pillar.pillar_entry_history(role_name, changed_at DESC);
-- Drift watch — populated by a pg_cron job that re-renders the on-disk SLS files
-- and compares them to pillar_entry. Cleared once cutover completes.
CREATE TABLE IF NOT EXISTS so_pillar.drift_log (
id bigserial PRIMARY KEY,
scope text NOT NULL,
role_name text,
minion_id text,
pillar_path text NOT NULL,
disk_data jsonb,
db_data jsonb,
detected_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_drift_log_detected ON so_pillar.drift_log(detected_at DESC);
@@ -0,0 +1,49 @@
-- Views consumed by the Salt master's salt.pillar.postgres ext_pillar with
-- as_json=True. Each view exposes data ordered by (sort_key, pillar_path) so
-- the deep-merge in ext_pillar resolves precedence deterministically.
--
-- ext_pillar always binds exactly one parameter to the query: (minion_id,).
-- Master-config queries reference these views and add WHERE clauses, e.g.:
-- SELECT data FROM so_pillar.v_pillar_role WHERE minion_id = %s
-- SELECT data FROM so_pillar.v_pillar_minion WHERE minion_id = %s
-- For v_pillar_global the binding is satisfied with `WHERE %s IS NOT NULL`.
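-- (A Python sketch after this file illustrates the resulting merge order.)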
CREATE OR REPLACE VIEW so_pillar.v_pillar_global AS
SELECT pillar_path, sort_key, data
FROM so_pillar.pillar_entry
WHERE scope = 'global'
AND is_secret = false
ORDER BY sort_key, pillar_path;
-- Role view exposes minion_id so the master-config WHERE clause can filter to
-- the rows that apply to the requesting minion. JOIN to role_member fans out
-- one row per (role assignment, pillar entry) tuple.
CREATE OR REPLACE VIEW so_pillar.v_pillar_role AS
SELECT rm.minion_id,
pe.role_name,
pe.pillar_path,
pe.sort_key,
pe.data
FROM so_pillar.pillar_entry pe
JOIN so_pillar.role_member rm ON rm.role_name = pe.role_name
WHERE pe.scope = 'role'
AND pe.is_secret = false;
CREATE OR REPLACE VIEW so_pillar.v_pillar_minion AS
SELECT minion_id,
pillar_path,
sort_key,
data
FROM so_pillar.pillar_entry
WHERE scope = 'minion'
AND is_secret = false;
-- v_pillar_secrets is filled in by 004_secrets.sql once pgcrypto is available;
-- placeholder here returns no rows so initial schema deploy succeeds even on a
-- container that has not yet loaded pgcrypto.
CREATE OR REPLACE VIEW so_pillar.v_pillar_secrets AS
SELECT NULL::text AS minion_id,
NULL::text AS pillar_path,
NULL::int AS sort_key,
'{}'::jsonb AS data
WHERE false;
@@ -0,0 +1,120 @@
-- Audit trigger: every INSERT/UPDATE/DELETE on so_pillar.pillar_entry writes a
-- row to pillar_entry_history. Captures the actor (current_user), reason
-- (passed via SET LOCAL so_pillar.change_reason), and full before/after data.
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_audit() RETURNS trigger
LANGUAGE plpgsql AS $fn$
DECLARE
v_reason text := current_setting('so_pillar.change_reason', true);
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO so_pillar.pillar_entry_history(
entry_id, op, scope, role_name, minion_id, pillar_path,
old_data, new_data, is_secret, version, changed_by, change_reason)
VALUES (NEW.id, 'INSERT', NEW.scope, NEW.role_name, NEW.minion_id, NEW.pillar_path,
NULL, NEW.data, NEW.is_secret, NEW.version, NEW.updated_by, v_reason);
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
IF OLD.data IS DISTINCT FROM NEW.data
OR OLD.is_secret IS DISTINCT FROM NEW.is_secret THEN
INSERT INTO so_pillar.pillar_entry_history(
entry_id, op, scope, role_name, minion_id, pillar_path,
old_data, new_data, is_secret, version, changed_by, change_reason)
VALUES (NEW.id, 'UPDATE', NEW.scope, NEW.role_name, NEW.minion_id, NEW.pillar_path,
OLD.data, NEW.data, NEW.is_secret, NEW.version, NEW.updated_by, v_reason);
END IF;
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO so_pillar.pillar_entry_history(
entry_id, op, scope, role_name, minion_id, pillar_path,
old_data, new_data, is_secret, version, changed_by, change_reason)
VALUES (OLD.id, 'DELETE', OLD.scope, OLD.role_name, OLD.minion_id, OLD.pillar_path,
OLD.data, NULL, OLD.is_secret, OLD.version, current_user, v_reason);
RETURN OLD;
END IF;
RETURN NULL;
END
$fn$;
DROP TRIGGER IF EXISTS pillar_entry_audit ON so_pillar.pillar_entry;
CREATE TRIGGER pillar_entry_audit
AFTER INSERT OR UPDATE OR DELETE ON so_pillar.pillar_entry
FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_pillar_entry_audit();
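-- The change_reason plumbing in practice: a minimal psql sketch (hypothetical
-- path and reason) showing how SET LOCAL scopes the reason to one transaction
-- so the audit trigger picks it up via current_setting:
--   BEGIN;
--   SET LOCAL so_pillar.change_reason = 'rotate soc api key';
--   UPDATE so_pillar.pillar_entry
--   SET data = '"new-value"'::jsonb
--   WHERE scope = 'global' AND pillar_path = 'soc.config';
--   COMMIT;
--   -- the matching history row now carries the reason:
--   SELECT op, change_reason FROM so_pillar.pillar_entry_history
--   WHERE pillar_path = 'soc.config' ORDER BY changed_at DESC LIMIT 1;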
-- updated_at + version maintenance: bump version on every UPDATE that changes data.
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_versioning() RETURNS trigger
LANGUAGE plpgsql AS $fn$
BEGIN
IF (TG_OP = 'UPDATE') THEN
IF OLD.data IS DISTINCT FROM NEW.data
OR OLD.is_secret IS DISTINCT FROM NEW.is_secret THEN
NEW.version := OLD.version + 1;
NEW.updated_at := now();
ELSE
NEW.version := OLD.version;
NEW.updated_at := OLD.updated_at;
END IF;
END IF;
RETURN NEW;
END
$fn$;
DROP TRIGGER IF EXISTS pillar_entry_versioning ON so_pillar.pillar_entry;
CREATE TRIGGER pillar_entry_versioning
BEFORE UPDATE ON so_pillar.pillar_entry
FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_pillar_entry_versioning();
-- Recompute role_member rows for a minion based on node_type.
-- Compound matchers in pillar/top.sls are pure suffix patterns of the form
-- '*_<rolename>' plus the special multi-role 'manager/managersearch/managerhype'
-- bucket. lower(node_type) is matched whole against role.role_name for the
-- main role; any additional buckets arrive via minion.extra_roles (set by the
-- importer or reactor) and produce one role_member row each.
CREATE OR REPLACE FUNCTION so_pillar.fn_recompute_role_members(p_minion_id text)
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE
v_node_type text;
v_extra text[];
v_role text;
BEGIN
SELECT node_type, extra_roles INTO v_node_type, v_extra
FROM so_pillar.minion WHERE minion_id = p_minion_id;
IF v_node_type IS NULL THEN
RETURN;
END IF;
DELETE FROM so_pillar.role_member
WHERE minion_id = p_minion_id AND source = 'computed';
-- Main role from node_type.
IF EXISTS (SELECT 1 FROM so_pillar.role WHERE role_name = lower(v_node_type)) THEN
INSERT INTO so_pillar.role_member(role_name, minion_id, source)
VALUES (lower(v_node_type), p_minion_id, 'computed')
ON CONFLICT DO NOTHING;
END IF;
-- Extra roles supplied by the importer / reactor for compound matchers
-- that need to apply multiple buckets (e.g. managersearch also gets the
-- 'manager' bucket per top.sls line 36 grouping).
FOREACH v_role IN ARRAY COALESCE(v_extra, '{}'::text[]) LOOP
IF EXISTS (SELECT 1 FROM so_pillar.role WHERE role_name = v_role) THEN
INSERT INTO so_pillar.role_member(role_name, minion_id, source)
VALUES (v_role, p_minion_id, 'computed')
ON CONFLICT DO NOTHING;
END IF;
END LOOP;
END
$fn$;
CREATE OR REPLACE FUNCTION so_pillar.fn_minion_after_change() RETURNS trigger
LANGUAGE plpgsql AS $fn$
BEGIN
-- The trigger below fires only on INSERT/UPDATE (never DELETE), so NEW is
-- always assigned; referencing OLD on INSERT would raise "record not assigned".
PERFORM so_pillar.fn_recompute_role_members(NEW.minion_id);
RETURN NEW;
END
$fn$;
DROP TRIGGER IF EXISTS minion_role_sync ON so_pillar.minion;
CREATE TRIGGER minion_role_sync
AFTER INSERT OR UPDATE OF node_type, extra_roles ON so_pillar.minion
FOR EACH ROW EXECUTE FUNCTION so_pillar.fn_minion_after_change();
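-- A hypothetical round trip through the trigger (assumes minion_id is the
-- primary key of so_pillar.minion and the seed roles from 005 are loaded):
--   INSERT INTO so_pillar.minion (minion_id, node_type, extra_roles)
--   VALUES ('mgr_managersearch', 'managersearch', ARRAY['manager'])
--   ON CONFLICT (minion_id) DO UPDATE
--   SET node_type = EXCLUDED.node_type, extra_roles = EXCLUDED.extra_roles;
--   -- minion_role_sync fired fn_recompute_role_members:
--   SELECT role_name, source FROM so_pillar.role_member
--   WHERE minion_id = 'mgr_managersearch';
--   -- expected: (managersearch, computed) and (manager, computed)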
@@ -0,0 +1,130 @@
-- pgcrypto-backed secret storage for pillar_entry rows where is_secret = true.
-- The plaintext value is encrypted with a symmetric key held in a server-side
-- GUC (so_pillar.master_key) which is set per-role via ALTER ROLE so the key
-- never touches a flat file readable by Salt itself.
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public;
-- Encrypt a JSONB value using the configured master key. Stored as a JSONB
-- envelope {"_enc": "<armored ciphertext>"} so the same column type is reused.
CREATE OR REPLACE FUNCTION so_pillar.fn_encrypt_jsonb(p_value jsonb)
RETURNS jsonb LANGUAGE plpgsql AS $fn$
DECLARE
v_key text := current_setting('so_pillar.master_key', true);
BEGIN
IF v_key IS NULL OR v_key = '' THEN
RAISE EXCEPTION 'so_pillar.master_key GUC not configured';
END IF;
RETURN jsonb_build_object(
'_enc',
encode(pgp_sym_encrypt(p_value::text, v_key), 'base64')
);
END
$fn$;
-- Decrypt the envelope produced by fn_encrypt_jsonb. SECURITY DEFINER so callers
-- with no direct access to pgcrypto/master_key can still pull plaintext via
-- fn_pillar_secrets() below.
CREATE OR REPLACE FUNCTION so_pillar.fn_decrypt_jsonb(p_envelope jsonb)
RETURNS jsonb LANGUAGE plpgsql SECURITY DEFINER AS $fn$
DECLARE
v_key text := current_setting('so_pillar.master_key', true);
v_ct text;
BEGIN
IF v_key IS NULL OR v_key = '' THEN
RAISE EXCEPTION 'so_pillar.master_key GUC not configured';
END IF;
v_ct := p_envelope->>'_enc';
IF v_ct IS NULL THEN
RETURN p_envelope; -- not encrypted; pass through
END IF;
RETURN pgp_sym_decrypt(decode(v_ct, 'base64'), v_key)::jsonb;
END
$fn$;
REVOKE ALL ON FUNCTION so_pillar.fn_decrypt_jsonb(jsonb) FROM PUBLIC;
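-- Round trip under a throwaway session key, just to illustrate the envelope
-- (real deployments set so_pillar.master_key per-role via ALTER ROLE, as
-- schema_pillar.sls does, never per-session from a client):
--   SET so_pillar.master_key = 'test-only-key';
--   SELECT so_pillar.fn_decrypt_jsonb(
--            so_pillar.fn_encrypt_jsonb('{"user":"soc","pass":"hunter2"}'::jsonb));
--   -- => {"user": "soc", "pass": "hunter2"}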
-- Secrets access path consumed by ext_pillar. Decrypts at the boundary so Salt
-- sees plaintext JSONB. Views can't take parameters, so the per-minion filter
-- lives in a set-returning function instead, and ext_pillar binds its single
-- parameter (the minion_id) directly into the function call.
--
-- Master-config query: SELECT data FROM so_pillar.fn_pillar_secrets(%s)
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_secrets(p_minion_id text)
RETURNS TABLE(data jsonb)
LANGUAGE sql STABLE SECURITY DEFINER AS $fn$
SELECT so_pillar.fn_decrypt_jsonb(pe.data)
FROM so_pillar.pillar_entry pe
WHERE pe.is_secret = true
AND ( pe.scope = 'global'
OR (pe.scope = 'role'
AND pe.role_name IN (
SELECT role_name FROM so_pillar.role_member
WHERE minion_id = p_minion_id))
OR (pe.scope = 'minion' AND pe.minion_id = p_minion_id))
ORDER BY pe.sort_key, pe.pillar_path;
$fn$;
-- Recreate the placeholder view from 002 unchanged and mark it deprecated so
-- existing consumers keep working; new consumers use the function. Master
-- config query becomes:
--   SELECT data FROM so_pillar.fn_pillar_secrets(%s) AS s
DROP VIEW IF EXISTS so_pillar.v_pillar_secrets;
CREATE OR REPLACE VIEW so_pillar.v_pillar_secrets AS
SELECT NULL::text AS minion_id,
NULL::text AS pillar_path,
NULL::int AS sort_key,
'{}'::jsonb AS data
WHERE false;
COMMENT ON VIEW so_pillar.v_pillar_secrets IS
'Deprecated placeholder; use SELECT data FROM so_pillar.fn_pillar_secrets(minion_id) instead';
-- Convenience helper for so-yaml.py and the importer to set a secret without
-- ever exposing the master_key to the caller. SECURITY DEFINER means the
-- caller does not need read access to so_pillar.master_key.
CREATE OR REPLACE FUNCTION so_pillar.fn_set_secret(
p_scope text,
p_role_name text,
p_minion_id text,
p_pillar_path text,
p_value jsonb,
p_change_reason text DEFAULT NULL
) RETURNS bigint LANGUAGE plpgsql SECURITY DEFINER AS $fn$
DECLARE
v_envelope jsonb := so_pillar.fn_encrypt_jsonb(p_value);
v_id bigint;
BEGIN
PERFORM set_config('so_pillar.change_reason',
COALESCE(p_change_reason, 'fn_set_secret'),
true);
-- Update-first upsert. ON CONFLICT can only target one of the three partial
-- unique indexes per statement, so an insert-first approach raises a unique
-- violation (rather than falling through) for role/minion scopes. Try the
-- UPDATE; only if no row matched, INSERT.
UPDATE so_pillar.pillar_entry
SET data = v_envelope, is_secret = true, change_reason = p_change_reason
WHERE scope = p_scope
AND COALESCE(role_name,'') = COALESCE(p_role_name,'')
AND COALESCE(minion_id,'') = COALESCE(p_minion_id,'')
AND pillar_path = p_pillar_path
RETURNING id INTO v_id;
IF v_id IS NULL THEN
INSERT INTO so_pillar.pillar_entry(
scope, role_name, minion_id, pillar_path, data, is_secret, change_reason)
VALUES (p_scope, p_role_name, p_minion_id, p_pillar_path, v_envelope, true, p_change_reason)
RETURNING id INTO v_id;
END IF;
RETURN v_id;
END
$fn$;
REVOKE ALL ON FUNCTION so_pillar.fn_set_secret(text,text,text,text,jsonb,text) FROM PUBLIC;
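-- Usage sketch (hypothetical minion and path): store a per-minion secret, then
-- read it back the same way ext_pillar does. Requires a session where
-- so_pillar.master_key is set (any of the ALTER ROLE'd roles, or SET it by hand
-- for testing):
--   SELECT so_pillar.fn_set_secret(
--     'minion', NULL, 'sensor1_sensor',
--     'sensor.api_key',
--     '"abc123"'::jsonb,
--     'initial provisioning');
--   SELECT data FROM so_pillar.fn_pillar_secrets('sensor1_sensor');
--   -- decrypted at the boundary; Salt sees plaintext JSONB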
@@ -0,0 +1,39 @@
-- Seed the so_pillar.role table with the role buckets defined in pillar/top.sls.
-- The match_expr column preserves the original Salt compound expression purely
-- as documentation; PG-side membership is materialised in role_member.
-- Idempotent: ON CONFLICT lets re-application leave existing rows untouched.
INSERT INTO so_pillar.role(role_name, match_kind, match_expr, description) VALUES
('manager', 'compound', '*_manager or *_managersearch or *_managerhype',
'Manager-class node. Includes managersearch and managerhype subtypes.'),
('managersearch', 'compound', '*_managersearch',
'Combined manager + searchnode role.'),
('managerhype', 'compound', '*_managerhype',
'Combined manager + hypervisor role.'),
('sensor', 'compound', '*_sensor',
'Sensor node running zeek/suricata/strelka.'),
('eval', 'compound', '*_eval',
'Single-node evaluation install (manager + sensor + storage on one host).'),
('standalone', 'compound', '*_standalone',
'Single-node production install (no distributed cluster).'),
('heavynode', 'compound', '*_heavynode',
'Distributed manager node carrying logstash + ES.'),
('idh', 'compound', '*_idh',
'Intrusion-detection-honeypot node.'),
('searchnode', 'compound', '*_searchnode',
'Distributed Elasticsearch search node.'),
('receiver', 'compound', '*_receiver',
'Kafka receiver node.'),
('import', 'compound', '*_import',
'Single-node import-only install.'),
('fleet', 'compound', '*_fleet',
'Elastic Fleet server node.'),
('hypervisor', 'compound', '*_hypervisor',
'Hypervisor host (libvirt). Hosts VM minions.'),
('desktop', 'compound', '*_desktop',
'Desktop minion (no firewall/nginx pillars apply).'),
('not_desktop', 'compound', '* and not *_desktop',
'Pseudo-role; matches every minion that is not a desktop. Used for global firewall/nginx.'),
('libvirt', 'grain', 'salt-cloud:driver:libvirt',
'Pseudo-role; matches any minion with grain salt-cloud.driver = libvirt.')
ON CONFLICT (role_name) DO NOTHING;
@@ -0,0 +1,106 @@
-- Roles + Row-Level Security policies for the so_pillar schema.
-- Three roles:
-- so_pillar_master — connected by salt-master ext_pillar. Read-only.
-- RLS forces it to skip is_secret rows; reads
-- encrypted secrets only via fn_pillar_secrets().
-- so_pillar_writer — connected by so-yaml dual-write and the SOC
-- PostgresConfigstore. Read+write on pillar_entry,
-- minion, role_member.
-- so_pillar_secret_owner — owns the master encryption key GUC; sole role
-- allowed to call fn_set_secret directly. Other
-- writers reach this function only via grants.
--
-- The existing app role so_postgres_user (created by init-users.sh) is granted
-- INTO so_pillar_writer so SOC keeps using its existing connection but inherits
-- pillar-write capability.
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_master') THEN
CREATE ROLE so_pillar_master NOLOGIN;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_writer') THEN
CREATE ROLE so_pillar_writer NOLOGIN;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_pillar_secret_owner') THEN
CREATE ROLE so_pillar_secret_owner NOLOGIN;
END IF;
END
$$;
GRANT USAGE ON SCHEMA so_pillar TO so_pillar_master, so_pillar_writer, so_pillar_secret_owner;
-- Read access for ext_pillar through the views only.
GRANT SELECT ON so_pillar.v_pillar_global,
so_pillar.v_pillar_role,
so_pillar.v_pillar_minion
TO so_pillar_master;
GRANT EXECUTE ON FUNCTION so_pillar.fn_pillar_secrets(text) TO so_pillar_master;
-- Engine reads + drains the change queue from the salt-master process. It
-- needs SELECT to find unprocessed rows and UPDATE to mark them processed.
-- The queue contains only locator metadata (no pillar data), so the master
-- role's existing privilege footprint is unchanged in practice.
GRANT SELECT, UPDATE ON so_pillar.change_queue TO so_pillar_master;
GRANT USAGE ON SEQUENCE so_pillar.change_queue_id_seq TO so_pillar_master;
-- Writer needs INSERT because the notify trigger function is not SECURITY
-- DEFINER: it runs with the privileges of whoever wrote to pillar_entry, so
-- every writer mutation must be able to enqueue.
GRANT INSERT ON so_pillar.change_queue TO so_pillar_writer;
-- Writer needs CRUD on pillar_entry/minion/role_member plus access to seed tables.
GRANT SELECT, INSERT, UPDATE, DELETE
ON so_pillar.pillar_entry,
so_pillar.minion,
so_pillar.role_member
TO so_pillar_writer;
GRANT SELECT ON so_pillar.role, so_pillar.scope TO so_pillar_writer;
GRANT SELECT, INSERT, UPDATE, DELETE ON so_pillar.drift_log TO so_pillar_writer;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA so_pillar TO so_pillar_writer;
-- The audit trigger likewise runs with the caller's privileges, so the writer
-- needs INSERT on the history table; SELECT stays for diffing and support.
GRANT SELECT, INSERT ON so_pillar.pillar_entry_history TO so_pillar_writer;
-- Secret owner can call fn_set_secret directly; writer goes through it via the
-- function's SECURITY DEFINER attribute, which executes as the function owner.
GRANT EXECUTE ON FUNCTION so_pillar.fn_set_secret(text,text,text,text,jsonb,text)
TO so_pillar_writer, so_pillar_secret_owner;
-- so_postgres_user (SOC's existing app user, created by init-users.sh) inherits
-- writer privilege so the PostgresConfigstore in SOC can mutate pillars without
-- a second connection pool. Role inheritance is the PostgreSQL default
-- (NOINHERIT must be requested explicitly), so the grant takes effect
-- immediately.
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = current_setting('so_pillar.app_role', true))
THEN
EXECUTE format('GRANT so_pillar_writer TO %I',
current_setting('so_pillar.app_role', true));
ELSIF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'so_postgres_user') THEN
GRANT so_pillar_writer TO so_postgres_user;
END IF;
END
$$;
-- RLS on pillar_entry: master sees only non-secret rows. Writer sees all
-- (it must, to UPDATE secret rows when so-yaml replaces them). Secret rows
-- still require fn_decrypt_jsonb to read plaintext.
ALTER TABLE so_pillar.pillar_entry ENABLE ROW LEVEL SECURITY;
ALTER TABLE so_pillar.pillar_entry FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS pillar_entry_master_read ON so_pillar.pillar_entry;
DROP POLICY IF EXISTS pillar_entry_writer_all ON so_pillar.pillar_entry;
DROP POLICY IF EXISTS pillar_entry_owner_all ON so_pillar.pillar_entry;
CREATE POLICY pillar_entry_master_read ON so_pillar.pillar_entry
FOR SELECT TO so_pillar_master
USING (NOT is_secret);
CREATE POLICY pillar_entry_writer_all ON so_pillar.pillar_entry
FOR ALL TO so_pillar_writer
USING (true)
WITH CHECK (true);
CREATE POLICY pillar_entry_owner_all ON so_pillar.pillar_entry
FOR ALL TO so_pillar_secret_owner
USING (true)
WITH CHECK (true);
-- minion / role_member do not need RLS — they hold no secrets.
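-- A quick check of the resulting privilege split from a superuser psql session
-- (SET ROLE simulates each grantee):
--   SET ROLE so_pillar_master;
--   SELECT count(*) FROM so_pillar.pillar_entry;    -- ERROR: permission denied (views only)
--   SELECT count(*) FROM so_pillar.v_pillar_global; -- OK: non-secret globals only
--   RESET ROLE;
--   SET ROLE so_pillar_writer;
--   SELECT count(*) FROM so_pillar.pillar_entry WHERE is_secret; -- OK: writer policy sees all rows
--   RESET ROLE;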
@@ -0,0 +1,43 @@
-- pg_cron retention sweeps for pillar_entry_history and drift_log. Optional —
-- the schema_pillar.sls state guards this file behind the
-- postgres:so_pillar:drift_check_enabled pillar flag because pg_cron may not
-- be loaded on every install.
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Retention: trim pillar_entry_history older than a year. Adjustable via the
-- so_pillar.history_retention_days GUC (default 365 if unset).
CREATE OR REPLACE FUNCTION so_pillar.fn_history_retain()
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE
v_days int := COALESCE(NULLIF(current_setting('so_pillar.history_retention_days', true), '')::int, 365);
BEGIN
DELETE FROM so_pillar.pillar_entry_history
WHERE changed_at < (now() - (v_days::text || ' days')::interval);
END
$fn$;
-- Drift retention: keep two weeks of drift_log.
CREATE OR REPLACE FUNCTION so_pillar.fn_drift_retain()
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
DELETE FROM so_pillar.drift_log
WHERE detected_at < (now() - interval '14 days');
END
$fn$;
-- pg_cron schedules (idempotent — unschedule any existing same-named job first).
DO $$
DECLARE
v_jobid bigint;
BEGIN
SELECT jobid INTO v_jobid FROM cron.job WHERE jobname = 'so_pillar_history_retain';
IF v_jobid IS NOT NULL THEN PERFORM cron.unschedule(v_jobid); END IF;
PERFORM cron.schedule('so_pillar_history_retain', '15 3 * * *',
'SELECT so_pillar.fn_history_retain();');
SELECT jobid INTO v_jobid FROM cron.job WHERE jobname = 'so_pillar_drift_retain';
IF v_jobid IS NOT NULL THEN PERFORM cron.unschedule(v_jobid); END IF;
PERFORM cron.schedule('so_pillar_drift_retain', '20 3 * * *',
'SELECT so_pillar.fn_drift_retain();');
END
$$;
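-- Two follow-ups an operator might run after this block. The ALTER DATABASE
-- form is one way to set the retention GUC fn_history_retain reads (assumes
-- the securityonion database name used elsewhere in this branch):
--   SELECT jobname, schedule, command
--   FROM cron.job
--   WHERE jobname LIKE 'so_pillar%';
--   ALTER DATABASE securityonion SET so_pillar.history_retention_days = '90';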
@@ -0,0 +1,77 @@
-- pg_notify-driven change fan-out for so_pillar.pillar_entry.
--
-- Two layers:
-- 1. so_pillar.change_queue — durable, drained by the salt-master
-- engine. Survives engine downtime;
-- de-duplicated by id, drained at-least-once.
-- 2. pg_notify('so_pillar_change') — wakeup signal. Payload is the
-- change_queue row id and locator
-- (no secret data — channels are
-- snoopable by anyone with LISTEN).
--
-- The salt-master engine LISTENs on the channel for low-latency wakeup,
-- then SELECTs unprocessed change_queue rows so a missed notification
-- (engine restart, network blip) self-heals on the next event.
CREATE TABLE IF NOT EXISTS so_pillar.change_queue (
id bigserial PRIMARY KEY,
scope text NOT NULL,
role_name text,
minion_id text,
pillar_path text NOT NULL,
op text NOT NULL CHECK (op IN ('INSERT','UPDATE','DELETE')),
enqueued_at timestamptz NOT NULL DEFAULT now(),
processed_at timestamptz
);
-- Hot index for the engine's drain query.
CREATE INDEX IF NOT EXISTS ix_change_queue_unprocessed
ON so_pillar.change_queue (id)
WHERE processed_at IS NULL;
-- Retention index: pg_cron job in 007 sweeps processed rows older than 7d.
CREATE INDEX IF NOT EXISTS ix_change_queue_processed_at
ON so_pillar.change_queue (processed_at)
WHERE processed_at IS NOT NULL;
CREATE OR REPLACE FUNCTION so_pillar.fn_pillar_entry_notify()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
v_row record;
v_id bigint;
BEGIN
IF TG_OP = 'DELETE' THEN
v_row := OLD;
ELSE
v_row := NEW;
END IF;
INSERT INTO so_pillar.change_queue
(scope, role_name, minion_id, pillar_path, op)
VALUES
(v_row.scope, v_row.role_name, v_row.minion_id, v_row.pillar_path, TG_OP)
RETURNING id INTO v_id;
-- Payload is the queue id + locator only. Engine joins back to
-- pillar_entry if it needs the data — keeps secrets off the wire.
PERFORM pg_notify('so_pillar_change', json_build_object(
'queue_id', v_id,
'scope', v_row.scope,
'role_name', v_row.role_name,
'minion_id', v_row.minion_id,
'pillar_path', v_row.pillar_path,
'op', TG_OP
)::text);
RETURN NULL;
END;
$$;
DROP TRIGGER IF EXISTS tg_pillar_entry_notify ON so_pillar.pillar_entry;
CREATE TRIGGER tg_pillar_entry_notify
AFTER INSERT OR UPDATE OR DELETE
ON so_pillar.pillar_entry
FOR EACH ROW
EXECUTE FUNCTION so_pillar.fn_pillar_entry_notify();
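-- A two-session psql sketch of the queue + notify pairing (hypothetical path;
-- session A is any observer, session B performs a write):
--   -- session A:
--   LISTEN so_pillar_change;
--   -- session B:
--   UPDATE so_pillar.pillar_entry
--   SET data = '"tweak"'::jsonb
--   WHERE scope = 'global' AND pillar_path = 'soc.config';
--   -- session A receives the locator payload on its next protocol
--   -- interaction; the durable copy is there regardless of who was listening:
--   SELECT id, op, pillar_path FROM so_pillar.change_queue
--   WHERE processed_at IS NULL ORDER BY id;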
@@ -8,6 +8,7 @@
include:
{% if PGMERGED.enabled %}
- postgres.enabled
- postgres.schema_pillar
{% else %}
- postgres.disabled
{% endif %}
@@ -0,0 +1,140 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
# Deploys the so_pillar schema (tables, views, audit triggers, secrets,
# RLS, pg_cron retention) inside the so-postgres container. Idempotent —
# every CREATE / GRANT is wrapped in IF NOT EXISTS / ON CONFLICT or DO
# blocks so re-running the state is a no-op when the schema is current.
#
# Gated on the postgres:so_pillar:enabled feature flag (default false).
# Flip to true once the postsalt branch is ready to bring ext_pillar live.
include:
- postgres.enabled
{% set so_pillar_enabled = salt['pillar.get']('postgres:so_pillar:enabled', False) %}
{% if so_pillar_enabled %}
{% set drift_enabled = salt['pillar.get']('postgres:so_pillar:drift_check_enabled', False) %}
{% set schema_dir = '/opt/so/saltstack/default/salt/postgres/files/schema/pillar' %}
# Wait for postgres to actually accept TCP connections. Same idiom as
# telegraf_users.sls: the docker_container.running state can return before the
# database is actually ready to accept connections on first init.
so_pillar_postgres_wait_ready:
cmd.run:
- name: |
for i in $(seq 1 60); do
if docker exec so-postgres pg_isready -h 127.0.0.1 -U postgres -q 2>/dev/null; then
exit 0
fi
sleep 2
done
echo "so-postgres did not accept TCP connections within 120s" >&2
exit 1
- require:
- docker_container: so-postgres
{% set sql_files = [
'001_schema.sql',
'002_views.sql',
'003_history_trigger.sql',
'004_secrets.sql',
'005_seed_roles.sql',
'006_rls.sql',
] %}
{% if drift_enabled %}
{% do sql_files.append('007_drift_pgcron.sql') %}
{% endif %}
# 008 always applies — pg_notify-driven change fan-out is what the salt-master
# pg_notify_pillar engine consumes. Without it reactor wiring sees no events.
{% do sql_files.append('008_change_notify.sql') %}
{% for sql_file in sql_files %}
so_pillar_apply_{{ sql_file | replace('.', '_') }}:
cmd.run:
- name: |
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d securityonion \
< {{ schema_dir }}/{{ sql_file }}
- require:
- cmd: so_pillar_postgres_wait_ready
{% if not loop.first %}
- cmd: so_pillar_apply_{{ sql_files[loop.index0 - 1] | replace('.', '_') }}
{% endif %}
{% endfor %}
# Set the master encryption key GUC on the secret-owner role. The key itself
# is generated by setup/so-functions::secrets_pillar() (extended for postsalt)
# and lives in /opt/so/conf/postgres/so_pillar.key (mode 0400) — never read by
# Salt itself; the value flows into PG via ALTER ROLE so it sits only in the
# server's role catalog.
so_pillar_master_key_configure:
cmd.run:
- name: |
if [ -r /opt/so/conf/postgres/so_pillar.key ]; then
KEY="$(cat /opt/so/conf/postgres/so_pillar.key)"
docker exec -i so-postgres psql -v ON_ERROR_STOP=1 -U postgres -d securityonion <<EOSQL
ALTER ROLE so_pillar_secret_owner SET so_pillar.master_key = '$KEY';
ALTER ROLE so_pillar_master SET so_pillar.master_key = '$KEY';
ALTER ROLE so_pillar_writer SET so_pillar.master_key = '$KEY';
EOSQL
else
echo "so_pillar.key not present yet; setup/so-functions must generate it before schema_pillar.sls" >&2
exit 1
fi
- require:
- cmd: so_pillar_apply_{{ sql_files[-1] | replace('.', '_') }}
# Run the importer once after the schema is in place. Idempotent — re-runs
# with no SLS edits produce zero row changes.
so_pillar_initial_import:
cmd.run:
- name: /usr/sbin/so-pillar-import --yes --reason 'schema_pillar.sls initial import'
- require:
- cmd: so_pillar_master_key_configure
# Flip so-yaml from dual-write to PG-canonical for managed paths now that
# the schema and importer are both in place. Bootstrap files (secrets.sls,
# postgres/auth.sls, ca/init.sls, *.nodes.sls, top.sls, ...) remain on disk
# regardless because so_yaml_postgres.locate() raises SkipPath for them.
so_pillar_so_yaml_mode_dir:
file.directory:
- name: /opt/so/conf/so-yaml
- user: socore
- group: socore
- mode: '0755'
- makedirs: True
so_pillar_so_yaml_mode_postgres:
file.managed:
- name: /opt/so/conf/so-yaml/mode
- contents: postgres
- user: socore
- group: socore
- mode: '0644'
- require:
- file: so_pillar_so_yaml_mode_dir
- cmd: so_pillar_initial_import
{% else %}
so_pillar_disabled_noop:
test.nop
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}
@@ -0,0 +1,27 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Fires for every event tagged 'so/pillar/changed'. Source of those events
# is the pg_notify_pillar engine on the salt-master, which in turn drains
# so_pillar.change_queue (populated by the AFTER trigger on
# so_pillar.pillar_entry — see 008_change_notify.sql).
#
# All routing logic — which pillar paths reload which services on which
# targets — lives in orch.so_pillar_reload so it stays editable as one
# YAML table without touching reactor wiring.
{# master-fired engine events carry the payload at the top level; minion-fired
   events nest it under 'data', so fall back to the event body itself. #}
{% set payload = data.get('data', data) %}
{% do salt.log.info('so_pillar_changed reactor: %s' % payload) %}
so_pillar_dispatch_reload:
runner.state.orchestrate:
- args:
- mods: orch.so_pillar_reload
- pillar:
so_pillar_change:
scope: {{ payload.get('scope') | json }}
role_name: {{ payload.get('role_name') | json }}
minion_id: {{ payload.get('minion_id') | json }}
changes: {{ payload.get('changes', []) | json }}
@@ -0,0 +1,200 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# -*- coding: utf-8 -*-
"""
pg_notify_pillar Salt master engine that bridges so_pillar.change_queue
into the Salt event bus.
Architecture (see 008_change_notify.sql):
pillar_entry --(AFTER trigger)--> change_queue (durable)
                                  + pg_notify('so_pillar_change') (wakeup)
                                            |
this engine: LISTEN <-----------------------+
             SELECT/UPDATE change_queue
                       |
             fire_event('so/pillar/changed', ...)
                       |
             reactor matches tag --> orch
Why a queue + notify rather than just notify: pg_notify is fire-and-forget
within a session. If the engine is down or the LISTEN connection is broken
when a write happens, the notification is lost forever. The change_queue
lets us recover: on (re)connect, we drain everything still flagged
processed_at IS NULL.
Debounce: bulk operations (so-pillar-import, fresh installs) can fire
hundreds of notifications per second. The engine collects whatever lands in
a short window and emits one event per (scope, role, minion) tuple so the
reactor isn't stampeded.
"""
import json
import logging
import os
import select
import time
import salt.utils.event
log = logging.getLogger(__name__)
__virtualname__ = 'pg_notify_pillar'
DEFAULT_CHANNEL = 'so_pillar_change'
DEFAULT_DEBOUNCE_MS = 500
DEFAULT_RECONNECT_BACKOFF = 5
DEFAULT_BACKLOG_INTERVAL = 30
DEFAULT_BATCH_LIMIT = 500
EVENT_TAG = 'so/pillar/changed'
def __virtual__():
try:
import psycopg2 # noqa: F401
return __virtualname__
except ImportError:
return False, 'pg_notify_pillar engine requires psycopg2'
def start(dsn=None,
host='127.0.0.1',
port=5432,
dbname='securityonion',
user='so_pillar_master',
password=None,
channel=DEFAULT_CHANNEL,
debounce_ms=DEFAULT_DEBOUNCE_MS,
reconnect_backoff=DEFAULT_RECONNECT_BACKOFF,
backlog_interval=DEFAULT_BACKLOG_INTERVAL,
batch_limit=DEFAULT_BATCH_LIMIT,
password_file=None):
"""
Run the change-queue bridge until the master shuts the engine down.
Either pass a full ``dsn`` string, or supply discrete kwargs. The
password may also be read from ``password_file`` (mode 0400) so the
engine config in ``/etc/salt/master.d/`` doesn't have to embed it
inline, only the file path.
"""
import psycopg2
import psycopg2.extensions
if dsn is None:
if password is None and password_file:
try:
with open(password_file, 'r') as fh:
password = fh.read().strip()
except (IOError, OSError) as exc:
log.error('pg_notify_pillar: cannot read password_file %s: %s',
password_file, exc)
return
dsn = _build_dsn(host=host, port=port, dbname=dbname,
user=user, password=password)
bus = salt.utils.event.get_master_event(
__opts__, __opts__['sock_dir'], listen=False)
log.info('pg_notify_pillar: starting (channel=%s debounce=%dms)',
channel, debounce_ms)
while True:
conn = None
try:
conn = psycopg2.connect(dsn)
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute('LISTEN {0}'.format(channel))
log.info('pg_notify_pillar: connected; LISTEN %s', channel)
_drain(cur, bus, batch_limit)
while True:
ready, _, _ = select.select([conn], [], [], backlog_interval)
if not ready:
_drain(cur, bus, batch_limit)
continue
conn.poll()
_consume_notifies(conn)
if debounce_ms > 0:
time.sleep(debounce_ms / 1000.0)
conn.poll()
_consume_notifies(conn)
_drain(cur, bus, batch_limit)
except Exception as exc: # psycopg2.Error subclasses + OS errors
log.error('pg_notify_pillar: %s; reconnecting in %ds',
exc, reconnect_backoff)
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
time.sleep(reconnect_backoff)
def _build_dsn(host, port, dbname, user, password):
parts = ['host={0}'.format(host),
'port={0}'.format(port),
'dbname={0}'.format(dbname),
'user={0}'.format(user)]
if password:
parts.append('password={0}'.format(password))
return ' '.join(parts)
def _consume_notifies(conn):
# We don't use the payload directly — the queue table is the source of
# truth, and draining it covers any notifications we missed. So just
# discard them; their presence already proved there's something to drain.
while conn.notifies:
conn.notifies.pop(0)
def _drain(cur, bus, batch_limit):
"""Mark unprocessed change_queue rows processed and emit one event per
(scope, role_name, minion_id) group. SKIP LOCKED so multiple masters
sharing a Postgres don't double-process."""
cur.execute("""
UPDATE so_pillar.change_queue
SET processed_at = now()
WHERE id IN (
SELECT id FROM so_pillar.change_queue
WHERE processed_at IS NULL
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT %s)
RETURNING id, scope, role_name, minion_id, pillar_path, op
""", (batch_limit,))
rows = cur.fetchall()
if not rows:
return
groups = {}
for row_id, scope, role_name, minion_id, pillar_path, op in rows:
key = (scope, role_name, minion_id)
groups.setdefault(key, []).append({
'queue_id': row_id,
'pillar_path': pillar_path,
'op': op,
})
for (scope, role_name, minion_id), changes in groups.items():
payload = {
'scope': scope,
'role_name': role_name,
'minion_id': minion_id,
'changes': changes,
}
log.debug('pg_notify_pillar: firing %s for %s',
EVENT_TAG, payload)
bus.fire_event(payload, EVENT_TAG)
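-- Since delivery is at-least-once, a manual replay from psql is safe: clear
-- processed_at and the engine's next periodic sweep (every backlog_interval
-- seconds, even with no notify) re-emits the rows. A hypothetical replay of
-- the last hour of soc.* changes:
--   UPDATE so_pillar.change_queue
--   SET processed_at = NULL
--   WHERE pillar_path LIKE 'soc.%'
--   AND processed_at > now() - interval '1 hour';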
@@ -14,6 +14,8 @@
include:
- salt.minion
- salt.master.ext_pillar_postgres
- salt.master.pg_notify_pillar_engine
{% if 'vrt' in salt['pillar.get']('features', []) %}
- salt.cloud
- salt.cloud.reactor_config_hypervisor
@@ -0,0 +1,46 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Drops /etc/salt/master.d/ext_pillar_postgres.conf so the salt-master loads
# pillar overlays from the so_pillar.* schema in so-postgres alongside the
# on-disk SLS pillar tree. Gated on the postgres:so_pillar:enabled feature
# flag (default false) so the file only appears once the schema is deployed
# and the importer has run at least once.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %}
ext_pillar_postgres_config:
file.managed:
- name: /etc/salt/master.d/ext_pillar_postgres.conf
- source: salt://salt/master/files/ext_pillar_postgres.conf.jinja
- template: jinja
- mode: '0640'
- user: root
- group: salt
- watch_in:
- service: salt_master_service
{% else %}
# When the flag is off make sure any previously-deployed config is removed
# so a rollback flips behavior cleanly.
ext_pillar_postgres_config_absent:
file.absent:
- name: /etc/salt/master.d/ext_pillar_postgres.conf
- watch_in:
- service: salt_master_service
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}
@@ -0,0 +1,38 @@
# /etc/salt/master.d/ext_pillar_postgres.conf
# Rendered by salt/salt/master/ext_pillar_postgres.sls.
# Reads the so_pillar.* schema in so-postgres and overlays it onto SLS pillar.
# SLS still renders first (ext_pillar_first: False) so bootstrap and mine-driven
# pillars work before Postgres is reachable; PG values overlay/override on top.
postgres:
host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }}
port: {{ pillar.get('postgres', {}).get('port', 5432) }}
db: securityonion
user: so_pillar_master
pass: {{ pillar['secrets']['pillar_master_pass'] }}
ext_pillar_first: False
pillar_source_merging_strategy: smart
pillar_merge_lists: False
pillar_cache: True
pillar_cache_backend: disk
pillar_cache_ttl: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('pillar_cache_ttl', 60) }}
# List form (not mapping form) so result rows merge into the pillar root rather
# than under a named subtree. Verified against salt/pillar/sql_base.py: list
# entries pass root=None to enter_root() which sets self.focus = self.result.
ext_pillar:
- postgres:
- query: "SELECT data FROM so_pillar.v_pillar_global WHERE %s IS NOT NULL ORDER BY sort_key, pillar_path"
as_json: True
ignore_null: True
- query: "SELECT data FROM so_pillar.v_pillar_role WHERE minion_id = %s ORDER BY sort_key, pillar_path"
as_json: True
ignore_null: True
- query: "SELECT data FROM so_pillar.v_pillar_minion WHERE minion_id = %s ORDER BY sort_key, pillar_path"
as_json: True
ignore_null: True
- query: "SELECT data FROM so_pillar.fn_pillar_secrets(%s)"
as_json: True
ignore_null: True
@@ -0,0 +1,20 @@
# /etc/salt/master.d/pg_notify_pillar_engine.conf
# Rendered by salt/salt/master/pg_notify_pillar_engine.sls.
#
# Subscribes the salt-master to so_pillar.change_queue via LISTEN
# so_pillar_change. The engine drains queued changes and re-publishes
# them on the event bus as 'so/pillar/changed'. Reactor wiring is in
# so_pillar_reactor.conf.
engines:
- pg_notify_pillar:
host: {{ pillar.get('postgres', {}).get('host', '127.0.0.1') }}
port: {{ pillar.get('postgres', {}).get('port', 5432) }}
dbname: securityonion
user: so_pillar_master
password: {{ pillar['secrets']['pillar_master_pass'] }}
channel: so_pillar_change
debounce_ms: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_debounce_ms', 500) }}
reconnect_backoff: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_reconnect_backoff', 5) }}
backlog_interval: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_backlog_interval', 30) }}
batch_limit: {{ pillar.get('postgres', {}).get('so_pillar', {}).get('engine_batch_limit', 500) }}
@@ -0,0 +1,12 @@
# /etc/salt/master.d/so_pillar_reactor.conf
# Wires the so/pillar/changed event tag — emitted by the pg_notify_pillar
# engine — to the so_pillar_changed reactor, which dispatches to
# orch.so_pillar_reload.
#
# Lives in its own file (rather than appended to reactor_hypervisor.conf)
# so the postgres:so_pillar:enabled flag can flip it on/off independently
# of hypervisor reactor wiring.
reactor:
- 'so/pillar/changed':
- /opt/so/saltstack/default/salt/reactor/so_pillar_changed.sls
@@ -0,0 +1,81 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Deploys the pg_notify_pillar engine module + its master.d config so the
# salt-master subscribes to so_pillar.change_queue and republishes changes
# on the salt event bus as so/pillar/changed. Reactor (so_pillar_changed.sls)
# matches that tag and dispatches the appropriate orch.
#
# Gated on the same postgres:so_pillar:enabled flag as the schema and
# ext_pillar config so the three components flip together.
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% if salt['pillar.get']('postgres:so_pillar:enabled', False) %}
pg_notify_pillar_engine_module:
file.managed:
- name: /etc/salt/engines/pg_notify_pillar.py
- source: salt://salt/engines/master/pg_notify_pillar.py
- mode: '0644'
- user: root
- group: root
- makedirs: True
- watch_in:
- service: salt_master_service
pg_notify_pillar_engine_config:
file.managed:
- name: /etc/salt/master.d/pg_notify_pillar_engine.conf
- source: salt://salt/master/files/pg_notify_pillar_engine.conf.jinja
- template: jinja
- mode: '0640'
- user: root
- group: salt
- watch_in:
- service: salt_master_service
pg_notify_pillar_reactor_config:
file.managed:
- name: /etc/salt/master.d/so_pillar_reactor.conf
- source: salt://salt/master/files/so_pillar_reactor.conf
- mode: '0644'
- user: root
- group: root
- watch_in:
- service: salt_master_service
{% else %}
# When the flag flips off, peel everything back so a rollback returns to
# pure-disk pillar with no orphan engine churning on a dead listen socket.
pg_notify_pillar_engine_module_absent:
file.absent:
- name: /etc/salt/engines/pg_notify_pillar.py
- watch_in:
- service: salt_master_service
pg_notify_pillar_engine_config_absent:
file.absent:
- name: /etc/salt/master.d/pg_notify_pillar_engine.conf
- watch_in:
- service: salt_master_service
pg_notify_pillar_reactor_config_absent:
file.absent:
- name: /etc/salt/master.d/so_pillar_reactor.conf
- watch_in:
- service: salt_master_service
{% endif %}
{% else %}
{{sls}}_state_not_allowed:
test.fail_without_changes:
- name: {{sls}}_state_not_allowed
{% endif %}
@@ -1057,6 +1057,11 @@ generate_passwords(){
POSTGRESPASS=$(get_random_value)
SOCSRVKEY=$(get_random_value 64)
IMPORTPASS=$(get_random_value)
# postsalt: salt-master connects to so_pillar.* as so_pillar_master, and the
# so-postgres container needs a symmetric key for pgcrypto-encrypted secrets.
# Both are generated here so they survive reinstall like the other secrets.
PILLARMASTERPASS=$(get_random_value)
SO_PILLAR_KEY=$(get_random_value 64)
}
generate_interface_vars() {
@@ -1853,7 +1858,34 @@ secrets_pillar(){
"secrets:"\
" import_pass: $IMPORTPASS"\
" influx_pass: $INFLUXPASS"\
" pillar_master_pass: $PILLARMASTERPASS"\
" postgres_pass: $POSTGRESPASS" > $local_salt_dir/pillar/secrets.sls
elif ! grep -q '^[[:space:]]*pillar_master_pass:' $local_salt_dir/pillar/secrets.sls; then
# Existing install pre-postsalt — append the new key without disturbing
# the values already on disk. Keys we already wrote stay; only the new
# pillar_master_pass is added.
info "Appending pillar_master_pass to existing Secrets Pillar"
if [ -z "$PILLARMASTERPASS" ]; then
PILLARMASTERPASS=$(get_random_value)
fi
printf ' pillar_master_pass: %s\n' "$PILLARMASTERPASS" >> $local_salt_dir/pillar/secrets.sls
fi
# postsalt: write the so_pillar pgcrypto master key to a 0400 file owned by
# root. The key itself is never read by Salt — schema_pillar.sls loads it
# into the so-postgres container via ALTER ROLE so_pillar_secret_owner SET
# so_pillar.master_key = '<key>'; the file just lets the value survive
# container restarts.
if [ ! -f /opt/so/conf/postgres/so_pillar.key ]; then
info "Generating so_pillar pgcrypto master key"
mkdir -p /opt/so/conf/postgres
if [ -z "$SO_PILLAR_KEY" ]; then
SO_PILLAR_KEY=$(get_random_value 64)
fi
umask 077
printf '%s' "$SO_PILLAR_KEY" > /opt/so/conf/postgres/so_pillar.key
chmod 0400 /opt/so/conf/postgres/so_pillar.key
chown root:root /opt/so/conf/postgres/so_pillar.key
fi
}