mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2026-06-05 18:05:26 +02:00
Replace inotify pillar watch with postgres audit_settings beacon
The active-push feature detected pillar/settings changes via an inotify beacon on the manager watching /opt/so/saltstack/local/pillar. Replace that pillar watch with a custom salt beacon (pillar_db) that polls the SOC so_soc.audit_settings table on a monotonic id watermark, so changes made through SOC drive immediate pushes from the database instead of the files. The suricata/strelka rule inotify watches (and pyinotify) are kept unchanged, since rule-file edits are not recorded in audit_settings. - salt/_beacons/pillar_db.py: new beacon. Polls audit_settings via `docker exec so-postgres psql` (unix-socket trust auth), tracks the last processed id in /opt/so/state/pillar_db_watch.id, seeds to MAX(id) on first run (no history replay), and emits one event per new row. - salt/reactor/push_pillar.sls: consume setting_id/node_id from the beacon event instead of a file path. App = first dotted segment of setting_id, looked up in pillar_push_map.yaml. Empty node_id -> grid-wide actions as is; populated node_id -> the app's state(s) retargeted to that one node. - salt/manager/files/beacons_pushstate.conf.jinja: drop the pillar inotify block, add the pillar_db beacon (interval = push.drain_interval); keep the suricata/strelka inotify watches. - salt/salt/files/reactor_pushstate.conf: map salt/beacon/*/pillar_db/ audit_settings to push_pillar.sls; remove the pillar inotify reactor lines; keep suricata/strelka. The intent -> so-push-drainer -> orch.push_batch pipeline is unchanged. Verified end-to-end on a standalone: a grid-wide telegraf.output change re-applied telegraf fleetwide (container replaced), and a per-host ntp.config.servers change applied ntp to only that node.
This commit is contained in:
@@ -0,0 +1,142 @@
|
||||
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
|
||||
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
|
||||
# https://securityonion.net/license; you may not use this file except in compliance with the
|
||||
# Elastic License 2.0.
|
||||
|
||||
# Custom salt beacon that watches the SOC audit_settings table in postgres for
|
||||
# new settings changes and emits a beacon event per new row. This replaces the
|
||||
# inotify watch on /opt/so/saltstack/local/pillar -- instead of monitoring pillar
|
||||
# files on disk, we monitor the so_soc.audit_settings table that SOC writes to.
|
||||
#
|
||||
# Detection is poll-based with a monotonic `id` watermark persisted to
|
||||
# WATERMARK_FILE: each pass selects rows with id greater than the last id seen,
|
||||
# which makes it self-healing (a missed poll simply catches up on the next one).
|
||||
#
|
||||
# Each emitted event carries setting_id and node_id; the push_pillar reactor maps
|
||||
# setting_id -> app via pillar_push_map.yaml and writes a push intent, after which
|
||||
# the existing so-push-drainer / orch.push_batch pipeline takes over unchanged.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
WATERMARK_FILE = '/opt/so/state/pillar_db_watch.id'
|
||||
CONTAINER = 'so-postgres'
|
||||
DATABASE = 'so_soc'
|
||||
|
||||
# Unaligned, tuples-only psql output with a field separator that cannot appear in
|
||||
# an id/setting_id/node_id, so we can split each row reliably.
|
||||
FIELD_SEP = '\x1f'
|
||||
|
||||
|
||||
def __virtual__():
|
||||
return True
|
||||
|
||||
|
||||
def validate(config):
|
||||
return True, 'valid'
|
||||
|
||||
|
||||
def _read_watermark():
|
||||
# Returns the last processed id, or None if the watermark has not been seeded.
|
||||
try:
|
||||
with open(WATERMARK_FILE, 'r') as f:
|
||||
return int((f.read() or '').strip())
|
||||
except (IOError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _write_watermark(value):
|
||||
try:
|
||||
os.makedirs(os.path.dirname(WATERMARK_FILE), exist_ok=True)
|
||||
tmp = WATERMARK_FILE + '.tmp'
|
||||
with open(tmp, 'w') as f:
|
||||
f.write(str(int(value)))
|
||||
os.rename(tmp, WATERMARK_FILE)
|
||||
except OSError:
|
||||
log.exception('pillar_db beacon: failed to persist watermark to %s', WATERMARK_FILE)
|
||||
|
||||
|
||||
def _query(sql):
|
||||
# Run a query against so_soc inside the so-postgres container over the unix
|
||||
# socket (trust auth, no password). Returns stdout on success, or None on any
|
||||
# failure so the caller can no-op and retry on the next interval.
|
||||
cmd = [
|
||||
'docker', 'exec', CONTAINER,
|
||||
'psql', '-U', 'postgres', '-d', DATABASE,
|
||||
'-tA', '-F', FIELD_SEP, '-c', sql,
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
except subprocess.TimeoutExpired:
|
||||
log.warning('pillar_db beacon: psql timed out')
|
||||
return None
|
||||
except Exception:
|
||||
log.exception('pillar_db beacon: failed to exec psql')
|
||||
return None
|
||||
if result.returncode != 0:
|
||||
log.warning('pillar_db beacon: psql failed (rc=%s): %s',
|
||||
result.returncode, (result.stderr or '').strip())
|
||||
return None
|
||||
return result.stdout
|
||||
|
||||
|
||||
def beacon(config):
|
||||
retval = []
|
||||
|
||||
watermark = _read_watermark()
|
||||
|
||||
# First run / missing watermark: seed to the current MAX(id) and emit nothing
|
||||
# so we never replay the entire settings history into a fleetwide push.
|
||||
if watermark is None:
|
||||
seed = _query('SELECT COALESCE(MAX(id), 0) FROM audit_settings;')
|
||||
if seed is None:
|
||||
return retval # postgres not ready yet; retry next interval
|
||||
try:
|
||||
_write_watermark(int((seed or '0').strip() or 0))
|
||||
except ValueError:
|
||||
log.warning('pillar_db beacon: could not parse MAX(id) seed: %r', seed)
|
||||
return retval
|
||||
|
||||
rows = _query(
|
||||
"SELECT id, setting_id, COALESCE(node_id, '') FROM audit_settings "
|
||||
"WHERE id > %d ORDER BY id;" % watermark
|
||||
)
|
||||
if rows is None:
|
||||
return retval
|
||||
|
||||
max_id = watermark
|
||||
for line in rows.splitlines():
|
||||
# Do NOT str.strip() the whole line: Python treats the \x1f field
|
||||
# separator (and \x1c-\x1e) as whitespace, so stripping would eat an
|
||||
# empty trailing node_id field and make the row look malformed.
|
||||
if not line.strip():
|
||||
continue
|
||||
parts = line.split(FIELD_SEP)
|
||||
if len(parts) < 3:
|
||||
log.warning('pillar_db beacon: skipping malformed row: %r', line)
|
||||
continue
|
||||
try:
|
||||
row_id = int(parts[0])
|
||||
except ValueError:
|
||||
log.warning('pillar_db beacon: skipping row with non-int id: %r', line)
|
||||
continue
|
||||
setting_id = parts[1]
|
||||
node_id = parts[2]
|
||||
retval.append({
|
||||
'tag': 'audit_settings',
|
||||
'id': row_id,
|
||||
'setting_id': setting_id,
|
||||
'node_id': node_id,
|
||||
})
|
||||
if row_id > max_id:
|
||||
max_id = row_id
|
||||
|
||||
if max_id > watermark:
|
||||
_write_watermark(max_id)
|
||||
log.info('pillar_db beacon: emitted %d change(s), watermark %d -> %d',
|
||||
len(retval), watermark, max_id)
|
||||
|
||||
return retval
|
||||
@@ -1,4 +1,8 @@
|
||||
{% from 'global/map.jinja' import GLOBALMERGED %}
|
||||
beacons:
|
||||
pillar_db:
|
||||
- interval: {{ GLOBALMERGED.push.drain_interval }}
|
||||
- disable_during_state_run: True
|
||||
inotify:
|
||||
- disable_during_state_run: True
|
||||
- coalesce: True
|
||||
@@ -35,19 +39,3 @@ beacons:
|
||||
regex: True
|
||||
- '/\.#':
|
||||
regex: True
|
||||
/opt/so/saltstack/local/pillar:
|
||||
mask:
|
||||
- close_write
|
||||
- moved_to
|
||||
- delete
|
||||
recurse: True
|
||||
auto_add: True
|
||||
exclude:
|
||||
- '\.sw[a-z]$':
|
||||
regex: True
|
||||
- '~$':
|
||||
regex: True
|
||||
- '/4913$':
|
||||
regex: True
|
||||
- '/\.#':
|
||||
regex: True
|
||||
|
||||
@@ -1,17 +1,21 @@
|
||||
#!py
|
||||
|
||||
# Reactor invoked by the inotify beacon on pillar file changes under
|
||||
# /opt/so/saltstack/local/pillar/.
|
||||
# Reactor invoked by the pillar_db beacon when SOC records settings changes in
|
||||
# the so_soc.audit_settings table (see salt/_beacons/pillar_db.py). The beacon
|
||||
# emits one event per new row carrying setting_id and node_id.
|
||||
#
|
||||
# Two branches:
|
||||
# A) per-minion override under pillar/minions/<id>.sls or adv_<id>.sls
|
||||
# -> write an intent that runs state.highstate on just that minion.
|
||||
# B) shared app pillar (pillar/<app>/...) -> look up <app> in
|
||||
# pillar_push_map.yaml and write an intent with the entry's actions.
|
||||
# Two branches, keyed on node_id:
|
||||
# A) node_id populated -> the change is scoped to that one minion. Look up the
|
||||
# app in pillar_push_map.yaml and write an intent that runs the app's mapped
|
||||
# state(s) targeted to just that node.
|
||||
# B) node_id empty -> grid-wide app change. Look up the app in
|
||||
# pillar_push_map.yaml and write an intent with the entry's actions as-is.
|
||||
#
|
||||
# The app name is the first dotted segment of setting_id (e.g. "telegraf.output"
|
||||
# -> "telegraf"), which matches the pillar_push_map.yaml keys 1:1.
|
||||
#
|
||||
# Reactors never dispatch directly. The so-push-drainer schedule picks up
|
||||
# ready intents, dedupes across pending files, and dispatches orch.push_batch.
|
||||
# See plan /home/mreeves/.claude/plans/goofy-marinating-hummingbird.md.
|
||||
|
||||
import fcntl
|
||||
import json
|
||||
@@ -28,9 +32,6 @@ PENDING_DIR = '/opt/so/state/push_pending'
|
||||
LOCK_FILE = os.path.join(PENDING_DIR, '.lock')
|
||||
MAX_PATHS = 20
|
||||
|
||||
PILLAR_ROOT = '/opt/so/saltstack/local/pillar/'
|
||||
MINIONS_PREFIX = PILLAR_ROOT + 'minions/'
|
||||
|
||||
# The pillar_push_map.yaml is shipped via salt:// but the reactor runs on the
|
||||
# master, which mounts the default saltstack tree at this path.
|
||||
PUSH_MAP_PATH = '/opt/so/saltstack/default/salt/reactor/pillar_push_map.yaml'
|
||||
@@ -107,24 +108,25 @@ def _write_intent(key, actions, path):
|
||||
os.close(lock_fd)
|
||||
|
||||
|
||||
def _minion_id_from_path(path):
|
||||
# path is e.g. /opt/so/saltstack/local/pillar/minions/sensor1.sls
|
||||
# or /opt/so/saltstack/local/pillar/minions/adv_sensor1.sls
|
||||
filename = os.path.basename(path)
|
||||
if not filename.endswith('.sls'):
|
||||
def _app_from_setting(setting_id):
|
||||
# setting_id is e.g. 'telegraf.output' -> 'telegraf', 'ntp.config.servers' -> 'ntp'
|
||||
if not setting_id:
|
||||
return None
|
||||
stem = filename[:-4]
|
||||
if stem.startswith('adv_'):
|
||||
stem = stem[4:]
|
||||
return stem or None
|
||||
return setting_id.split('.', 1)[0] or None
|
||||
|
||||
|
||||
def _app_from_path(path):
|
||||
# path is e.g. /opt/so/saltstack/local/pillar/zeek/soc_zeek.sls -> 'zeek'
|
||||
remainder = path[len(PILLAR_ROOT):]
|
||||
if '/' not in remainder:
|
||||
return None
|
||||
return remainder.split('/', 1)[0] or None
|
||||
def _node_actions(entry, node_id):
|
||||
# Copy the app's mapped actions but retarget each one to the single node.
|
||||
# Preserves the state/highstate selection and any batch/batch_wait overrides.
|
||||
actions = []
|
||||
for action in entry:
|
||||
if not isinstance(action, dict):
|
||||
continue
|
||||
node_action = dict(action)
|
||||
node_action['tgt'] = node_id
|
||||
node_action['tgt_type'] = 'glob'
|
||||
actions.append(node_action)
|
||||
return actions
|
||||
|
||||
|
||||
def run():
|
||||
@@ -132,39 +134,43 @@ def run():
|
||||
LOG.info('push_pillar: push disabled, skipping')
|
||||
return {}
|
||||
|
||||
path = data.get('path', '') # noqa: F821 -- data provided by reactor
|
||||
if not path or not path.startswith(PILLAR_ROOT):
|
||||
LOG.debug('push_pillar: ignoring path outside pillar root: %s', path)
|
||||
return {}
|
||||
# The pillar_db beacon nests its payload under data['data']; fall back to the
|
||||
# top level so the reactor is robust to either shape.
|
||||
event = data.get('data', data) # noqa: F821 -- data provided by reactor
|
||||
setting_id = event.get('setting_id', '')
|
||||
node_id = (event.get('node_id') or '').strip()
|
||||
|
||||
# Branch A: per-minion override
|
||||
if path.startswith(MINIONS_PREFIX):
|
||||
minion_id = _minion_id_from_path(path)
|
||||
if not minion_id:
|
||||
LOG.debug('push_pillar: ignoring non-sls path under minions/: %s', path)
|
||||
return {}
|
||||
actions = [{'highstate': True, 'tgt': minion_id, 'tgt_type': 'glob'}]
|
||||
_write_intent('minion_{}'.format(minion_id), actions, path)
|
||||
LOG.info('push_pillar: per-minion intent updated for %s (path=%s)', minion_id, path)
|
||||
return {}
|
||||
|
||||
# Branch B: shared app pillar -> allowlist lookup
|
||||
app = _app_from_path(path)
|
||||
app = _app_from_setting(setting_id)
|
||||
if not app:
|
||||
LOG.debug('push_pillar: ignoring path with no app segment: %s', path)
|
||||
LOG.debug('push_pillar: ignoring event with no app segment: setting_id=%s', setting_id)
|
||||
return {}
|
||||
|
||||
push_map = _load_push_map()
|
||||
entry = push_map.get(app)
|
||||
if not entry:
|
||||
LOG.warning(
|
||||
'push_pillar: pillar dir "%s" is not in pillar_push_map.yaml; '
|
||||
'change will be picked up at the next scheduled highstate (path=%s)',
|
||||
app, path,
|
||||
'push_pillar: app "%s" is not in pillar_push_map.yaml; change will be '
|
||||
'picked up at the next scheduled highstate (setting_id=%s)',
|
||||
app, setting_id,
|
||||
)
|
||||
return {}
|
||||
|
||||
# Branch A: per-node change -> retarget the app's states to just that node.
|
||||
if node_id:
|
||||
actions = _node_actions(entry, node_id)
|
||||
if not actions:
|
||||
LOG.warning('push_pillar: no usable actions for app "%s" (setting_id=%s)', app, setting_id)
|
||||
return {}
|
||||
_write_intent(
|
||||
'node_{}_{}'.format(node_id, app), actions,
|
||||
'audit:{}@{}'.format(setting_id, node_id),
|
||||
)
|
||||
LOG.info('push_pillar: per-node intent updated for %s on %s (setting_id=%s)',
|
||||
app, node_id, setting_id)
|
||||
return {}
|
||||
|
||||
# Branch B: grid-wide app change -> use the map entry's actions as-is.
|
||||
actions = list(entry) # copy to avoid mutating the cache
|
||||
_write_intent('pillar_{}'.format(app), actions, path)
|
||||
LOG.info('push_pillar: app intent updated for %s (path=%s)', app, path)
|
||||
_write_intent('pillar_{}'.format(app), actions, 'audit:{}'.format(setting_id))
|
||||
LOG.info('push_pillar: app intent updated for %s (setting_id=%s)', app, setting_id)
|
||||
return {}
|
||||
|
||||
@@ -7,7 +7,5 @@ reactor:
|
||||
- salt://reactor/push_strelka.sls
|
||||
- 'salt/beacon/*/inotify//opt/so/saltstack/local/salt/strelka/rules/compiled/*':
|
||||
- salt://reactor/push_strelka.sls
|
||||
- 'salt/beacon/*/inotify//opt/so/saltstack/local/pillar':
|
||||
- salt://reactor/push_pillar.sls
|
||||
- 'salt/beacon/*/inotify//opt/so/saltstack/local/pillar/*':
|
||||
- 'salt/beacon/*/pillar_db/audit_settings':
|
||||
- salt://reactor/push_pillar.sls
|
||||
|
||||
Reference in New Issue
Block a user