diff --git a/salt/_beacons/pillar_db.py b/salt/_beacons/pillar_db.py new file mode 100644 index 000000000..9022d9b87 --- /dev/null +++ b/salt/_beacons/pillar_db.py @@ -0,0 +1,142 @@ +# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one +# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at +# https://securityonion.net/license; you may not use this file except in compliance with the +# Elastic License 2.0. + +# Custom salt beacon that watches the SOC audit_settings table in postgres for +# new settings changes and emits a beacon event per new row. This replaces the +# inotify watch on /opt/so/saltstack/local/pillar -- instead of monitoring pillar +# files on disk, we monitor the so_soc.audit_settings table that SOC writes to. +# +# Detection is poll-based with a monotonic `id` watermark persisted to +# WATERMARK_FILE: each pass selects rows with id greater than the last id seen, +# which makes it self-healing (a missed poll simply catches up on the next one). +# +# Each emitted event carries setting_id and node_id; the push_pillar reactor maps +# setting_id -> app via pillar_push_map.yaml and writes a push intent, after which +# the existing so-push-drainer / orch.push_batch pipeline takes over unchanged. + +import logging +import os +import subprocess + +log = logging.getLogger(__name__) + +WATERMARK_FILE = '/opt/so/state/pillar_db_watch.id' +CONTAINER = 'so-postgres' +DATABASE = 'so_soc' + +# Unaligned, tuples-only psql output with a field separator that cannot appear in +# an id/setting_id/node_id, so we can split each row reliably. +FIELD_SEP = '\x1f' + + +def __virtual__(): + return True + + +def validate(config): + return True, 'valid' + + +def _read_watermark(): + # Returns the last processed id, or None if the watermark has not been seeded. + try: + with open(WATERMARK_FILE, 'r') as f: + return int((f.read() or '').strip()) + except (IOError, ValueError): + return None + + +def _write_watermark(value): + try: + os.makedirs(os.path.dirname(WATERMARK_FILE), exist_ok=True) + tmp = WATERMARK_FILE + '.tmp' + with open(tmp, 'w') as f: + f.write(str(int(value))) + os.rename(tmp, WATERMARK_FILE) + except OSError: + log.exception('pillar_db beacon: failed to persist watermark to %s', WATERMARK_FILE) + + +def _query(sql): + # Run a query against so_soc inside the so-postgres container over the unix + # socket (trust auth, no password). Returns stdout on success, or None on any + # failure so the caller can no-op and retry on the next interval. + cmd = [ + 'docker', 'exec', CONTAINER, + 'psql', '-U', 'postgres', '-d', DATABASE, + '-tA', '-F', FIELD_SEP, '-c', sql, + ] + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + except subprocess.TimeoutExpired: + log.warning('pillar_db beacon: psql timed out') + return None + except Exception: + log.exception('pillar_db beacon: failed to exec psql') + return None + if result.returncode != 0: + log.warning('pillar_db beacon: psql failed (rc=%s): %s', + result.returncode, (result.stderr or '').strip()) + return None + return result.stdout + + +def beacon(config): + retval = [] + + watermark = _read_watermark() + + # First run / missing watermark: seed to the current MAX(id) and emit nothing + # so we never replay the entire settings history into a fleetwide push. + if watermark is None: + seed = _query('SELECT COALESCE(MAX(id), 0) FROM audit_settings;') + if seed is None: + return retval # postgres not ready yet; retry next interval + try: + _write_watermark(int((seed or '0').strip() or 0)) + except ValueError: + log.warning('pillar_db beacon: could not parse MAX(id) seed: %r', seed) + return retval + + rows = _query( + "SELECT id, setting_id, COALESCE(node_id, '') FROM audit_settings " + "WHERE id > %d ORDER BY id;" % watermark + ) + if rows is None: + return retval + + max_id = watermark + for line in rows.splitlines(): + # Do NOT str.strip() the whole line: Python treats the \x1f field + # separator (and \x1c-\x1e) as whitespace, so stripping would eat an + # empty trailing node_id field and make the row look malformed. + if not line.strip(): + continue + parts = line.split(FIELD_SEP) + if len(parts) < 3: + log.warning('pillar_db beacon: skipping malformed row: %r', line) + continue + try: + row_id = int(parts[0]) + except ValueError: + log.warning('pillar_db beacon: skipping row with non-int id: %r', line) + continue + setting_id = parts[1] + node_id = parts[2] + retval.append({ + 'tag': 'audit_settings', + 'id': row_id, + 'setting_id': setting_id, + 'node_id': node_id, + }) + if row_id > max_id: + max_id = row_id + + if max_id > watermark: + _write_watermark(max_id) + log.info('pillar_db beacon: emitted %d change(s), watermark %d -> %d', + len(retval), watermark, max_id) + + return retval diff --git a/salt/manager/files/beacons_pushstate.conf.jinja b/salt/manager/files/beacons_pushstate.conf.jinja index 7eca37969..ce9259359 100644 --- a/salt/manager/files/beacons_pushstate.conf.jinja +++ b/salt/manager/files/beacons_pushstate.conf.jinja @@ -1,4 +1,8 @@ +{% from 'global/map.jinja' import GLOBALMERGED %} beacons: + pillar_db: + - interval: {{ GLOBALMERGED.push.drain_interval }} + - disable_during_state_run: True inotify: - disable_during_state_run: True - coalesce: True @@ -35,19 +39,3 @@ beacons: regex: True - '/\.#': regex: True - /opt/so/saltstack/local/pillar: - mask: - - close_write - - moved_to - - delete - recurse: True - auto_add: True - exclude: - - '\.sw[a-z]$': - regex: True - - '~$': - regex: True - - '/4913$': - regex: True - - '/\.#': - regex: True diff --git a/salt/reactor/push_pillar.sls b/salt/reactor/push_pillar.sls index 0e7fd40eb..06ee474df 100644 --- a/salt/reactor/push_pillar.sls +++ b/salt/reactor/push_pillar.sls @@ -1,17 +1,21 @@ #!py -# Reactor invoked by the inotify beacon on pillar file changes under -# /opt/so/saltstack/local/pillar/. +# Reactor invoked by the pillar_db beacon when SOC records settings changes in +# the so_soc.audit_settings table (see salt/_beacons/pillar_db.py). The beacon +# emits one event per new row carrying setting_id and node_id. # -# Two branches: -# A) per-minion override under pillar/minions/.sls or adv_.sls -# -> write an intent that runs state.highstate on just that minion. -# B) shared app pillar (pillar//...) -> look up in -# pillar_push_map.yaml and write an intent with the entry's actions. +# Two branches, keyed on node_id: +# A) node_id populated -> the change is scoped to that one minion. Look up the +# app in pillar_push_map.yaml and write an intent that runs the app's mapped +# state(s) targeted to just that node. +# B) node_id empty -> grid-wide app change. Look up the app in +# pillar_push_map.yaml and write an intent with the entry's actions as-is. +# +# The app name is the first dotted segment of setting_id (e.g. "telegraf.output" +# -> "telegraf"), which matches the pillar_push_map.yaml keys 1:1. # # Reactors never dispatch directly. The so-push-drainer schedule picks up # ready intents, dedupes across pending files, and dispatches orch.push_batch. -# See plan /home/mreeves/.claude/plans/goofy-marinating-hummingbird.md. import fcntl import json @@ -28,9 +32,6 @@ PENDING_DIR = '/opt/so/state/push_pending' LOCK_FILE = os.path.join(PENDING_DIR, '.lock') MAX_PATHS = 20 -PILLAR_ROOT = '/opt/so/saltstack/local/pillar/' -MINIONS_PREFIX = PILLAR_ROOT + 'minions/' - # The pillar_push_map.yaml is shipped via salt:// but the reactor runs on the # master, which mounts the default saltstack tree at this path. PUSH_MAP_PATH = '/opt/so/saltstack/default/salt/reactor/pillar_push_map.yaml' @@ -107,24 +108,25 @@ def _write_intent(key, actions, path): os.close(lock_fd) -def _minion_id_from_path(path): - # path is e.g. /opt/so/saltstack/local/pillar/minions/sensor1.sls - # or /opt/so/saltstack/local/pillar/minions/adv_sensor1.sls - filename = os.path.basename(path) - if not filename.endswith('.sls'): +def _app_from_setting(setting_id): + # setting_id is e.g. 'telegraf.output' -> 'telegraf', 'ntp.config.servers' -> 'ntp' + if not setting_id: return None - stem = filename[:-4] - if stem.startswith('adv_'): - stem = stem[4:] - return stem or None + return setting_id.split('.', 1)[0] or None -def _app_from_path(path): - # path is e.g. /opt/so/saltstack/local/pillar/zeek/soc_zeek.sls -> 'zeek' - remainder = path[len(PILLAR_ROOT):] - if '/' not in remainder: - return None - return remainder.split('/', 1)[0] or None +def _node_actions(entry, node_id): + # Copy the app's mapped actions but retarget each one to the single node. + # Preserves the state/highstate selection and any batch/batch_wait overrides. + actions = [] + for action in entry: + if not isinstance(action, dict): + continue + node_action = dict(action) + node_action['tgt'] = node_id + node_action['tgt_type'] = 'glob' + actions.append(node_action) + return actions def run(): @@ -132,39 +134,43 @@ def run(): LOG.info('push_pillar: push disabled, skipping') return {} - path = data.get('path', '') # noqa: F821 -- data provided by reactor - if not path or not path.startswith(PILLAR_ROOT): - LOG.debug('push_pillar: ignoring path outside pillar root: %s', path) - return {} + # The pillar_db beacon nests its payload under data['data']; fall back to the + # top level so the reactor is robust to either shape. + event = data.get('data', data) # noqa: F821 -- data provided by reactor + setting_id = event.get('setting_id', '') + node_id = (event.get('node_id') or '').strip() - # Branch A: per-minion override - if path.startswith(MINIONS_PREFIX): - minion_id = _minion_id_from_path(path) - if not minion_id: - LOG.debug('push_pillar: ignoring non-sls path under minions/: %s', path) - return {} - actions = [{'highstate': True, 'tgt': minion_id, 'tgt_type': 'glob'}] - _write_intent('minion_{}'.format(minion_id), actions, path) - LOG.info('push_pillar: per-minion intent updated for %s (path=%s)', minion_id, path) - return {} - - # Branch B: shared app pillar -> allowlist lookup - app = _app_from_path(path) + app = _app_from_setting(setting_id) if not app: - LOG.debug('push_pillar: ignoring path with no app segment: %s', path) + LOG.debug('push_pillar: ignoring event with no app segment: setting_id=%s', setting_id) return {} push_map = _load_push_map() entry = push_map.get(app) if not entry: LOG.warning( - 'push_pillar: pillar dir "%s" is not in pillar_push_map.yaml; ' - 'change will be picked up at the next scheduled highstate (path=%s)', - app, path, + 'push_pillar: app "%s" is not in pillar_push_map.yaml; change will be ' + 'picked up at the next scheduled highstate (setting_id=%s)', + app, setting_id, ) return {} + # Branch A: per-node change -> retarget the app's states to just that node. + if node_id: + actions = _node_actions(entry, node_id) + if not actions: + LOG.warning('push_pillar: no usable actions for app "%s" (setting_id=%s)', app, setting_id) + return {} + _write_intent( + 'node_{}_{}'.format(node_id, app), actions, + 'audit:{}@{}'.format(setting_id, node_id), + ) + LOG.info('push_pillar: per-node intent updated for %s on %s (setting_id=%s)', + app, node_id, setting_id) + return {} + + # Branch B: grid-wide app change -> use the map entry's actions as-is. actions = list(entry) # copy to avoid mutating the cache - _write_intent('pillar_{}'.format(app), actions, path) - LOG.info('push_pillar: app intent updated for %s (path=%s)', app, path) + _write_intent('pillar_{}'.format(app), actions, 'audit:{}'.format(setting_id)) + LOG.info('push_pillar: app intent updated for %s (setting_id=%s)', app, setting_id) return {} diff --git a/salt/salt/files/reactor_pushstate.conf b/salt/salt/files/reactor_pushstate.conf index ceab284e2..991c4d516 100644 --- a/salt/salt/files/reactor_pushstate.conf +++ b/salt/salt/files/reactor_pushstate.conf @@ -7,7 +7,5 @@ reactor: - salt://reactor/push_strelka.sls - 'salt/beacon/*/inotify//opt/so/saltstack/local/salt/strelka/rules/compiled/*': - salt://reactor/push_strelka.sls - - 'salt/beacon/*/inotify//opt/so/saltstack/local/pillar': - - salt://reactor/push_pillar.sls - - 'salt/beacon/*/inotify//opt/so/saltstack/local/pillar/*': + - 'salt/beacon/*/pillar_db/audit_settings': - salt://reactor/push_pillar.sls