Compare commits

...

7 Commits

Author SHA1 Message Date
Josh Brower 125610ed42 Additional test coverage 2026-05-12 10:11:22 -04:00
Josh Brower 306b0af4d0 Initial commit 2026-05-12 09:55:06 -04:00
Josh Brower 006ac31109 Merge pull request #15579 from marcopedrinazzi/3/dev
New Sigma rules pipeline mapping for M365 and Fortigate
2026-05-11 21:03:53 +02:00
Josh Brower 49a643fff4 Merge pull request #15875 from Security-Onion-Solutions/3/sigma-fp-os
proc_creation per OS type
2026-05-08 15:13:14 +02:00
Josh Brower e1d830da76 proc_creation per OS type 2026-05-08 09:11:24 -04:00
Josh Brower e847c46129 Merge pull request #15872 from Security-Onion-Solutions/3/soc-logs
cleanup status code
2026-05-07 19:01:24 +02:00
Marco Pedrinazzi d7e971a0fc m365 and fortigate mappings sigma 2026-03-11 14:49:40 +01:00
3 changed files with 1106 additions and 1 deletion
+381
View File
@@ -0,0 +1,381 @@
#!/usr/bin/env python3
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
# Imports detection overrides (e.g. from so-detections-backup) into the so-detection
# index. Reads <publicId>.<ext> files (NDJSON, one override per line) from a source
# directory, looks up the matching detection by publicId+engine, validates each
# override against the same rules SOC enforces, dedupes against existing overrides
# (operational fields only), and appends new ones.
import argparse
import ipaddress
import json
import os
import re
import sys
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
import urllib3
# Elasticsearch runs locally with a self-signed certificate; verification is
# disabled on the session, so silence the per-request InsecureRequestWarning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Default Elasticsearch index/alias holding SOC detections.
DEFAULT_INDEX = "so-detection"
# File containing the 'user = "<name>:<password>"' basic-auth credentials.
AUTH_FILE = "/opt/so/conf/elasticsearch/curl.config"
# Local Elasticsearch endpoint.
ES_URL = "https://localhost:9200"
# Engines we know how to handle and the file extension the backup script writes.
ENGINES = {
    "suricata": "txt",
}
# Standard Suricata variables that ship with Security Onion. Anything else
# referenced in an override is "custom" and the user needs to make sure it
# exists in SOC Config before the override will function.
BUILTIN_SURICATA_VARS = {
    "$HOME_NET", "$EXTERNAL_NET",
    "$HTTP_SERVERS", "$DNS_SERVERS", "$SQL_SERVERS", "$SMTP_SERVERS",
    "$TELNET_SERVERS", "$AIM_SERVERS", "$DC_SERVERS", "$MODBUS_SERVER",
    "$MODBUS_CLIENT", "$ENIP_CLIENT", "$ENIP_SERVER",
    "$HTTP_PORTS", "$SHELLCODE_PORTS", "$ORACLE_PORTS", "$SSH_PORTS",
    "$FTP_PORTS", "$FILE_DATA_PORTS",
}
# Matches Suricata-style variable references such as $HOME_NET.
VAR_PATTERN = re.compile(r"\$[A-Z_][A-Z0-9_]*")
# Canonical valid values, per securityonion-soc/model/detection.go.
SURICATA_OVERRIDE_TYPES = {"suppress", "threshold", "modify"}
SUPPRESS_TRACKS = {"by_src", "by_dst", "by_either"}
THRESHOLD_TRACKS = {"by_src", "by_dst", "by_both"}
THRESHOLD_TYPES = {"limit", "threshold", "both"}
# Shown by confirm_proceed() before any import begins.
STALE_WARNING = """\
WARNING: so-detections-backup does not remove backup files when overrides are
deleted via the Security Onion web UI. As a result, files in the source
directory may represent overrides that were intentionally deleted and should
NOT be re-imported.
Before continuing, verify that the source directory reflects the overrides you
actually want imported. Remove any files corresponding to overrides you previously deleted.
"""
def make_session(auth_file):
    """Build an authenticated requests.Session from the SO curl config.

    Scans auth_file for the first line of the form `user = "name:password"`
    and uses it for HTTP basic auth. TLS verification is disabled (local
    self-signed certificate). Raises RuntimeError when no credential line
    is present.
    """
    with open(auth_file, "r") as fh:
        for raw in fh:
            if not raw.startswith("user ="):
                continue
            value = raw.split("=", 1)[1].strip().replace('"', "")
            user, _, password = value.partition(":")
            sess = requests.Session()
            sess.auth = HTTPBasicAuth(user, password)
            sess.headers.update({"Content-Type": "application/json"})
            sess.verify = False
            return sess
    raise RuntimeError(f"Could not find 'user =' line in {auth_file}")
def find_detection(session, index, public_id, engine):
    """Look up a detection document by publicId + engine.

    Returns (doc_id, doc_index, existing_overrides), or (None, None, None)
    when nothing matches. For a found document, existing_overrides is
    always a list (a missing/null overrides field becomes []).
    """
    body = {
        "query": {"bool": {"must": [
            {"term": {"so_detection.publicId": public_id}},
            {"term": {"so_detection.engine": engine}},
        ]}},
        # Size 2 lets us detect the "shouldn't happen" multi-match case below.
        "size": 2,
    }
    response = session.get(f"{ES_URL}/{index}/_search", json=body)
    response.raise_for_status()
    hits = response.json().get("hits", {}).get("hits", [])
    if not hits:
        return None, None, None
    if len(hits) > 1:
        # Shouldn't happen — publicId is unique per engine — but flag it.
        print(f" WARN: {len(hits)} detections matched publicId={public_id} engine={engine}; using first")
    first = hits[0]
    overrides = first["_source"].get("so_detection", {}).get("overrides") or []
    return first["_id"], first["_index"], overrides
def update_overrides(session, doc_index, doc_id, overrides):
    """Replace a detection's full overrides array via the Elasticsearch
    partial-update API and return the parsed response body."""
    payload = {"doc": {"so_detection": {"overrides": overrides}}}
    response = session.post(f"{ES_URL}/{doc_index}/_update/{doc_id}", json=payload)
    response.raise_for_status()
    return response.json()
def dedupe_key(override):
    """Operational fields only, per Override.Equal() in detection.go.
    Excludes timestamps and isEnabled so re-imports don't appear unique.
    Returns None for an unrecognized type."""
    kind = override.get("type")
    operational_fields = {
        "suppress": ("track", "ip"),
        "threshold": ("thresholdType", "track", "count", "seconds"),
        "modify": ("regex", "value"),
    }
    fields = operational_fields.get(kind)
    if fields is None:
        return None
    return (kind,) + tuple(override.get(f) for f in fields)
def _validate_suricata_ip(ip):
if not ip:
return "ip cannot be empty"
if ip.startswith("$"):
return None
if ip.startswith("[") and ip.endswith("]"):
for part in ip[1:-1].split(","):
err = _validate_single_ip(part.strip())
if err:
return f"invalid IP in list: {err}"
return None
return _validate_single_ip(ip)
def _validate_single_ip(ip):
try:
if "/" in ip:
ipaddress.ip_network(ip, strict=False)
else:
ipaddress.ip_address(ip)
except ValueError:
return f"invalid IP/CIDR {ip!r}"
return None
def validate_override(override, engine):
    """Mirror Override.Validate() from securityonion-soc/model/detection.go.
    Returns None on success, an error string otherwise. `engine` is accepted
    for signature parity with the caller but is not currently consulted
    (only suricata overrides are supported)."""
    kind = override.get("type")
    if not kind:
        return "override type is required"
    if kind not in SURICATA_OVERRIDE_TYPES:
        return f"invalid type {kind!r}: must be one of {sorted(SURICATA_OVERRIDE_TYPES)}"
    # A field counts as "present" only when non-None, matching the Go model.
    all_fields = ("regex", "value", "thresholdType", "track", "ip", "count",
                  "seconds", "customFilter")
    present = {k for k in all_fields if override.get(k) is not None}
    if kind == "suppress":
        if not {"ip", "track"} <= present:
            return "suppress requires 'ip' and 'track'"
        if present & {"regex", "value", "thresholdType", "count", "seconds", "customFilter"}:
            return "suppress has unnecessary fields"
        track = override["track"]
        if track not in SUPPRESS_TRACKS:
            return f"invalid track {track!r}: must be one of {sorted(SUPPRESS_TRACKS)}"
        return _validate_suricata_ip(override["ip"])
    if kind == "threshold":
        if not {"thresholdType", "track", "count", "seconds"} <= present:
            return "threshold requires 'thresholdType', 'track', 'count', 'seconds'"
        if present & {"regex", "value", "customFilter"}:
            return "threshold has unnecessary fields"
        ttype = override["thresholdType"]
        if ttype not in THRESHOLD_TYPES:
            return f"invalid thresholdType {ttype!r}: must be one of {sorted(THRESHOLD_TYPES)}"
        track = override["track"]
        if track not in THRESHOLD_TRACKS:
            return f"invalid track {track!r}: must be one of {sorted(THRESHOLD_TRACKS)}"
        for name in ("count", "seconds"):
            value = override[name]
            if not isinstance(value, int) or value <= 0:
                return f"{name} must be a positive integer, got {value!r}"
        return None
    if kind == "modify":
        if not {"regex", "value"} <= present:
            return "modify requires 'regex' and 'value'"
        if present & {"thresholdType", "track", "count", "seconds", "customFilter"}:
            return "modify has unnecessary fields"
        try:
            re.compile(override["regex"])
        except re.error as e:
            return f"invalid regex: {e}"
        return None
def parse_overrides_file(path):
    """Parse a file written by so-detections-backup.py: NDJSON, one override
    per line. Blank lines are skipped, but line numbers always refer to
    positions in the original file. Returns a list of
    (override_dict, line_number) tuples. Raises json.JSONDecodeError on a
    malformed line and OSError when the file can't be read."""
    with open(path, "r") as fh:
        numbered = list(enumerate(fh, start=1))
    return [(json.loads(text.strip()), number)
            for number, text in numbered
            if text.strip()]
def describe(override):
    """Human-readable summary of the operational fields for a given override
    type; None for unknown types."""
    kind = override.get("type")
    if kind == "suppress":
        tail = f"track={override.get('track')} ip={override.get('ip')}"
    elif kind == "threshold":
        tail = (f"track={override.get('track')} "
                f"thresholdType={override.get('thresholdType')} "
                f"count={override.get('count')} seconds={override.get('seconds')}")
    elif kind == "modify":
        tail = f"regex={override.get('regex')!r}"
    else:
        return None
    return f"type={kind} {tail}"
def collect_custom_vars(override):
    """Return the set of non-builtin Suricata $VARIABLES referenced in any
    string-valued field of the override."""
    string_values = (v for v in override.values() if isinstance(v, str))
    referenced = {match
                  for value in string_values
                  for match in VAR_PATTERN.findall(value)}
    return referenced - BUILTIN_SURICATA_VARS
def parse_args():
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(
        description="Import detection overrides into the so-detection index.",
    )
    parser.add_argument(
        "--source", "-s", required=True,
        help="Source directory containing <publicId>.<ext> override files.")
    parser.add_argument(
        "--engine", "-e", default="suricata", choices=list(ENGINES.keys()),
        help="Detection engine (default: suricata).")
    parser.add_argument(
        "--dry-run", "-n", action="store_true",
        help="Print what would happen without writing to Elasticsearch.")
    parser.add_argument(
        "--no-import-note", action="store_true",
        help="Do not prepend '[Imported YYYY-MM-DD] ' to the override note.")
    parser.add_argument(
        "--index", "-i", default=DEFAULT_INDEX,
        help=f"Elasticsearch index to update (default: {DEFAULT_INDEX}).")
    return parser.parse_args()
def confirm_proceed(args):
    """Show the stale-backup warning. Dry-run prints it and continues. Real
    runs require the user typing 'yes' (case-insensitive) at the prompt."""
    print(STALE_WARNING)
    if args.dry_run:
        print("(dry-run: no acknowledgement required)\n")
        return True
    reply = input("Type 'yes' to acknowledge and continue: ")
    print()
    return reply.strip().lower() == "yes"
def main():
    """Entry point: import detection override files into Elasticsearch.

    For each <publicId>.<ext> file in --source: parse the NDJSON overrides,
    find the matching detection by publicId+engine, validate each override,
    skip duplicates of overrides already on the detection (operational
    fields only), optionally prefix the note with the import date, and
    write the merged list back (unless --dry-run). Exits 1 when any
    override was invalid or any request failed, else 0.
    """
    args = parse_args()
    if not os.path.isdir(args.source):
        print(f"ERROR: source directory not found: {args.source}", file=sys.stderr)
        sys.exit(1)
    extension = ENGINES[args.engine]
    files = sorted(f for f in os.listdir(args.source) if f.endswith(f".{extension}"))
    if not files:
        print(f"No *.{extension} files found in {args.source}")
        sys.exit(0)
    # Require explicit acknowledgement of the stale-backup caveat.
    if not confirm_proceed(args):
        print("Aborted.")
        sys.exit(1)
    session = make_session(AUTH_FILE)
    today = datetime.now().strftime("%Y-%m-%d")
    note_prefix = "" if args.no_import_note else f"[Imported {today}] "
    counts = {"added": 0, "skipped_dedupe": 0, "skipped_not_found": 0, "invalid": 0, "error": 0}
    custom_vars = set()
    mode = "DRY-RUN" if args.dry_run else "IMPORT"
    print(f"[{mode}] engine={args.engine} source={args.source} index={args.index}\n")
    for filename in files:
        public_id = os.path.splitext(filename)[0]
        path = os.path.join(args.source, filename)
        print(f"{public_id}:")
        try:
            new_overrides = parse_overrides_file(path)
        except (json.JSONDecodeError, OSError) as e:
            # Fix: the message previously printed a literal "(unknown)"
            # placeholder; name the offending file instead.
            print(f" ERROR: could not parse {filename}: {e}")
            counts["error"] += 1
            continue
        if not new_overrides:
            print(" SKIP: empty file")
            continue
        try:
            doc_id, doc_index, existing = find_detection(session, args.index, public_id, args.engine)
        except requests.HTTPError as e:
            print(f" ERROR: search failed: {e}")
            counts["error"] += 1
            continue
        if doc_id is None:
            print(f" WARN: no detection found for publicId={public_id} engine={args.engine}; skipping")
            counts["skipped_not_found"] += len(new_overrides)
            continue
        # Dedupe on operational fields only (see dedupe_key()).
        existing_keys = {dedupe_key(o) for o in existing}
        merged = list(existing)
        added_this_file = 0
        for override, line_no in new_overrides:
            err = validate_override(override, args.engine)
            if err:
                print(f" INVALID (line {line_no}): {err}")
                counts["invalid"] += 1
                continue
            custom_vars.update(collect_custom_vars(override))
            key = dedupe_key(override)
            if key in existing_keys:
                print(f" SKIP (line {line_no}): duplicate of existing override [{describe(override)}]")
                counts["skipped_dedupe"] += 1
                continue
            if note_prefix:
                # Copy before mutating so the parsed input stays untouched.
                override = dict(override)
                override["note"] = note_prefix + (override.get("note") or "")
            merged.append(override)
            existing_keys.add(key)
            added_this_file += 1
            print(f" ADD (line {line_no}): {describe(override)}")
        if added_this_file == 0:
            continue
        if args.dry_run:
            print(f" DRY-RUN: would update {doc_index}/{doc_id} "
                  f"({len(existing)} existing → {len(merged)} total)")
            counts["added"] += added_this_file
            continue
        try:
            update_overrides(session, doc_index, doc_id, merged)
            print(f" UPDATED {doc_index}/{doc_id} ({len(existing)} → {len(merged)})")
            counts["added"] += added_this_file
        except requests.HTTPError as e:
            print(f" ERROR: update failed: {e}")
            counts["error"] += 1
        print()
    print("=" * 60)
    print(f"Summary ({mode}):")
    print(f" Overrides added: {counts['added']}")
    print(f" Skipped (already present): {counts['skipped_dedupe']}")
    print(f" Skipped (no detection): {counts['skipped_not_found']}")
    print(f" Invalid (failed checks): {counts['invalid']}")
    print(f" Errors: {counts['error']}")
    if custom_vars:
        print()
        print("WARNING: detected custom Suricata variables in imported overrides:")
        for v in sorted(custom_vars):
            print(f" {v}")
        print("If any of these are not already defined in SOC Config (Suricata variables),")
        print("you must add them manually before the rules will function correctly.")
    sys.exit(0 if counts["error"] == 0 and counts["invalid"] == 0 else 1)
if __name__ == "__main__":
    main()
@@ -0,0 +1,588 @@
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
import importlib.util
import json
import os
import shutil
import sys
import tempfile
import unittest
from importlib.machinery import SourceFileLoader
from io import StringIO
from unittest.mock import MagicMock, patch
import requests
# The script has no .py extension; spec_from_file_location can't auto-detect a
# loader, so we hand it a SourceFileLoader explicitly. (load_module() is
# deprecated in 3.14 and slated for removal in 3.15.)
HERE = os.path.dirname(os.path.abspath(__file__))
# Extensionless script under test, expected alongside this test file.
SCRIPT = os.path.join(HERE, "so-detections-overrides-import")
_loader = SourceFileLoader("so_overrides_import", SCRIPT)
_spec = importlib.util.spec_from_loader("so_overrides_import", _loader)
soi = importlib.util.module_from_spec(_spec)
# Runs the script's module-level code once; tests access it via `soi`.
_loader.exec_module(soi)
class TestValidateSuppress(unittest.TestCase):
    """validate_override() behavior for 'suppress' overrides."""

    def _check(self, override):
        return soi.validate_override(override, "suricata")

    def test_valid(self):
        override = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}
        self.assertIsNone(self._check(override))

    def test_valid_var(self):
        override = {"type": "suppress", "track": "by_either", "ip": "$HOME_NET"}
        self.assertIsNone(self._check(override))

    def test_valid_cidr(self):
        override = {"type": "suppress", "track": "by_dst", "ip": "10.0.0.0/8"}
        self.assertIsNone(self._check(override))

    def test_valid_bracket_list(self):
        override = {"type": "suppress", "track": "by_src", "ip": "[1.2.3.4,10.0.0.0/8]"}
        self.assertIsNone(self._check(override))

    def test_missing_ip(self):
        self.assertIn("requires", self._check({"type": "suppress", "track": "by_src"}))

    def test_missing_track(self):
        self.assertIn("requires", self._check({"type": "suppress", "ip": "1.2.3.4"}))

    def test_invalid_track(self):
        override = {"type": "suppress", "track": "by_both", "ip": "1.2.3.4"}
        self.assertIn("invalid track", self._check(override))

    def test_invalid_ip(self):
        override = {"type": "suppress", "track": "by_src", "ip": "not-an-ip"}
        self.assertIn("invalid IP", self._check(override))

    def test_unnecessary_field(self):
        override = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4", "count": 5}
        self.assertIn("unnecessary fields", self._check(override))
class TestValidateThreshold(unittest.TestCase):
    """validate_override() behavior for 'threshold' overrides."""

    BASE = {"type": "threshold", "track": "by_src",
            "thresholdType": "limit", "count": 10, "seconds": 60}

    def _check(self, **changes):
        # Start from a known-valid override and apply per-test tweaks.
        return soi.validate_override({**self.BASE, **changes}, "suricata")

    def test_valid(self):
        self.assertIsNone(self._check())

    def test_valid_by_both(self):
        self.assertIsNone(self._check(track="by_both", thresholdType="both",
                                      count=1, seconds=1))

    def test_track_by_either_invalid(self):
        self.assertIn("invalid track", self._check(track="by_either"))

    def test_invalid_threshold_type(self):
        self.assertIn("invalid thresholdType", self._check(thresholdType="bogus"))

    def test_zero_count(self):
        self.assertIn("count", self._check(count=0))

    def test_negative_seconds(self):
        self.assertIn("seconds", self._check(seconds=-1))

    def test_missing_field(self):
        # A None value is treated the same as an absent key by validation.
        self.assertIn("requires", self._check(seconds=None))

    def test_unnecessary_field(self):
        self.assertIn("unnecessary fields", self._check(regex="foo"))
class TestValidateModify(unittest.TestCase):
    """validate_override() behavior for 'modify' overrides."""

    def _check(self, override):
        return soi.validate_override(override, "suricata")

    def test_valid(self):
        override = {"type": "modify", "regex": r"content:\"foo\"", "value": "content:bar"}
        self.assertIsNone(self._check(override))

    def test_invalid_regex(self):
        result = self._check({"type": "modify", "regex": "(unbalanced", "value": "x"})
        self.assertIn("invalid regex", result)

    def test_missing_value(self):
        result = self._check({"type": "modify", "regex": "x"})
        self.assertIn("requires", result)

    def test_unnecessary_field(self):
        result = self._check({"type": "modify", "regex": "x", "value": "y", "track": "by_src"})
        self.assertIn("unnecessary fields", result)
class TestValidateMisc(unittest.TestCase):
    """Type-level validation errors shared by all override kinds."""

    def test_unknown_type(self):
        override = {"type": "suppresss", "track": "by_src", "ip": "1.2.3.4"}
        self.assertIn("invalid type", soi.validate_override(override, "suricata"))

    def test_missing_type(self):
        result = soi.validate_override({"track": "by_src"}, "suricata")
        self.assertIn("type is required", result)
class TestValidateIP(unittest.TestCase):
    """_validate_suricata_ip(): addresses, CIDRs, variables, bracket lists."""

    def test_plain_ipv4(self):
        result = soi._validate_suricata_ip("1.2.3.4")
        self.assertIsNone(result)

    def test_plain_ipv6(self):
        result = soi._validate_suricata_ip("::1")
        self.assertIsNone(result)

    def test_cidr(self):
        result = soi._validate_suricata_ip("10.0.0.0/8")
        self.assertIsNone(result)

    def test_var(self):
        result = soi._validate_suricata_ip("$CONCOURSEWORKERS")
        self.assertIsNone(result)

    def test_bracket_list(self):
        result = soi._validate_suricata_ip("[1.2.3.4, 10.0.0.0/8]")
        self.assertIsNone(result)

    def test_bracket_list_bad_member(self):
        result = soi._validate_suricata_ip("[1.2.3.4,nope]")
        self.assertIn("invalid IP in list", result)

    def test_empty(self):
        result = soi._validate_suricata_ip("")
        self.assertIn("empty", result)

    def test_invalid(self):
        result = soi._validate_suricata_ip("999.999.999.999")
        self.assertIn("invalid", result)
class TestDedupeKey(unittest.TestCase):
    """dedupe_key() keys on operational fields only, per override type."""

    def _key(self, override):
        return soi.dedupe_key(override)

    def test_suppress(self):
        # count is irrelevant for suppress dedupe
        with_extra = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4", "count": 99}
        bare = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}
        self.assertEqual(self._key(with_extra), self._key(bare))

    def test_suppress_differs_on_ip(self):
        first = {"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}
        second = {"type": "suppress", "track": "by_src", "ip": "5.6.7.8"}
        self.assertNotEqual(self._key(first), self._key(second))

    def test_threshold(self):
        base = {"type": "threshold", "track": "by_src", "thresholdType": "limit",
                "count": 10, "seconds": 60}
        self.assertEqual(self._key({**base, "ip": "ignored"}), self._key(base))

    def test_threshold_differs_on_count(self):
        base = {"type": "threshold", "track": "by_src", "thresholdType": "limit",
                "count": 10, "seconds": 60}
        self.assertNotEqual(self._key(base), self._key({**base, "count": 20}))

    def test_modify(self):
        override = {"type": "modify", "regex": "x", "value": "y"}
        self.assertEqual(self._key(override), self._key(dict(override)))
class TestDescribe(unittest.TestCase):
    """describe() summaries should contain the operational field values."""

    def test_suppress(self):
        summary = soi.describe({"type": "suppress", "track": "by_src", "ip": "1.2.3.4"})
        for fragment in ("suppress", "by_src", "1.2.3.4"):
            self.assertIn(fragment, summary)

    def test_threshold_includes_count(self):
        summary = soi.describe({"type": "threshold", "track": "by_src",
                                "thresholdType": "limit", "count": 10, "seconds": 60})
        self.assertIn("count=10", summary)
        self.assertIn("seconds=60", summary)

    def test_modify(self):
        summary = soi.describe({"type": "modify", "regex": "foo"})
        self.assertIn("modify", summary)
        self.assertIn("foo", summary)
class TestParseOverridesFile(unittest.TestCase):
    """parse_overrides_file(): NDJSON parsing and line-number tracking."""

    def _write(self, content):
        tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False)
        with tmp:
            tmp.write(content)
        self.addCleanup(os.unlink, tmp.name)
        return tmp.name

    def test_single_line(self):
        path = self._write('{"type":"suppress","track":"by_src","ip":"1.2.3.4"}')
        parsed = soi.parse_overrides_file(path)
        self.assertEqual(len(parsed), 1)
        override, line_no = parsed[0]
        self.assertEqual(override["type"], "suppress")
        self.assertEqual(line_no, 1)

    def test_ndjson(self):
        path = self._write(
            '{"type":"suppress","track":"by_src","ip":"1.2.3.4"}\n'
            '{"type":"suppress","track":"by_dst","ip":"5.6.7.8"}\n'
        )
        parsed = soi.parse_overrides_file(path)
        self.assertEqual(len(parsed), 2)
        self.assertEqual(parsed[1][1], 2)

    def test_empty(self):
        self.assertEqual(soi.parse_overrides_file(self._write("")), [])

    def test_blank_lines_skipped(self):
        path = self._write('\n{"type":"suppress","track":"by_src","ip":"1.2.3.4"}\n\n')
        parsed = soi.parse_overrides_file(path)
        self.assertEqual(len(parsed), 1)
        # line number reflects original position
        self.assertEqual(parsed[0][1], 2)

    def test_invalid_raises(self):
        with self.assertRaises(json.JSONDecodeError):
            soi.parse_overrides_file(self._write("not json"))
class TestCollectCustomVars(unittest.TestCase):
    """collect_custom_vars() should surface only non-builtin $VARIABLES."""

    def test_finds_custom(self):
        found = soi.collect_custom_vars({"ip": "$CONCOURSEWORKERS"})
        self.assertEqual(found, {"$CONCOURSEWORKERS"})

    def test_filters_builtins(self):
        self.assertEqual(soi.collect_custom_vars({"ip": "$HOME_NET"}), set())

    def test_mixed(self):
        found = soi.collect_custom_vars({"ip": "[$HOME_NET,$MYNET]"})
        self.assertEqual(found, {"$MYNET"})

    def test_non_string_fields_ignored(self):
        found = soi.collect_custom_vars({"count": 10, "isEnabled": True})
        self.assertEqual(found, set())
class TestMakeSession(unittest.TestCase):
    """make_session(): credential parsing from the curl config file."""

    def _write(self, content):
        tmp = tempfile.NamedTemporaryFile(mode="w", delete=False)
        with tmp:
            tmp.write(content)
        self.addCleanup(os.unlink, tmp.name)
        return tmp.name

    def test_valid_auth_file(self):
        path = self._write('user = "admin:secret"\n')
        session = soi.make_session(path)
        self.assertEqual((session.auth.username, session.auth.password),
                         ("admin", "secret"))
        self.assertFalse(session.verify)

    def test_missing_user_line(self):
        path = self._write("# no user line here\n")
        with self.assertRaises(RuntimeError):
            soi.make_session(path)
class TestFindDetection(unittest.TestCase):
    """find_detection(): found, missing, malformed, and duplicate hits."""

    @staticmethod
    def _session_with_response(payload):
        response = MagicMock(**{"json.return_value": payload,
                                "raise_for_status.return_value": None})
        return MagicMock(**{"get.return_value": response})

    def test_found(self):
        payload = {"hits": {"hits": [{
            "_id": "abc", "_index": "so-detection",
            "_source": {"so_detection": {"overrides": [{"type": "suppress"}]}},
        }]}}
        session = self._session_with_response(payload)
        doc_id, idx, existing = soi.find_detection(session, "so-detection", "2049201", "suricata")
        self.assertEqual((doc_id, idx, len(existing)), ("abc", "so-detection", 1))

    def test_not_found(self):
        session = self._session_with_response({"hits": {"hits": []}})
        result = soi.find_detection(session, "so-detection", "x", "suricata")
        self.assertEqual(result, (None, None, None))

    def test_no_overrides_field(self):
        payload = {"hits": {"hits": [{
            "_id": "abc", "_index": "so-detection",
            "_source": {"so_detection": {}},
        }]}}
        session = self._session_with_response(payload)
        _, _, existing = soi.find_detection(session, "so-detection", "x", "suricata")
        self.assertEqual(existing, [])

    def test_multiple_hits_warns(self):
        hits = [{"_id": h, "_index": "i", "_source": {"so_detection": {"overrides": []}}}
                for h in ("a", "b")]
        session = self._session_with_response({"hits": {"hits": hits}})
        with patch("sys.stdout", new=StringIO()) as out:
            doc_id, _, _ = soi.find_detection(session, "i", "x", "suricata")
        self.assertEqual(doc_id, "a")
        self.assertIn("WARN", out.getvalue())
class TestUpdateOverrides(unittest.TestCase):
    """update_overrides() should POST the merged list to the _update API."""

    def test_posts_to_update_endpoint(self):
        response = MagicMock(**{"raise_for_status.return_value": None,
                                "json.return_value": {"result": "updated"}})
        session = MagicMock(**{"post.return_value": response})
        overrides = [{"type": "suppress"}]
        result = soi.update_overrides(session, "so-detection", "abc", overrides)
        self.assertEqual(result, {"result": "updated"})
        args, kwargs = session.post.call_args
        self.assertIn("/_update/abc", args[0])
        self.assertEqual(kwargs["json"]["doc"]["so_detection"]["overrides"], overrides)
class TestConfirmProceed(unittest.TestCase):
    """confirm_proceed() prompting behavior in dry-run and real modes."""

    def _run(self, dry_run, typed=""):
        # Input is patched even for dry-run; it is simply never consulted.
        args = MagicMock(dry_run=dry_run)
        with patch("sys.stdout", new=StringIO()), \
             patch("builtins.input", return_value=typed):
            return soi.confirm_proceed(args)

    def test_dry_run_skips_prompt(self):
        self.assertTrue(self._run(True))

    def test_yes_input(self):
        self.assertTrue(self._run(False, "yes"))

    def test_yes_input_case_insensitive(self):
        self.assertTrue(self._run(False, "YES"))

    def test_no_input_aborts(self):
        self.assertFalse(self._run(False, "no"))

    def test_empty_input_aborts(self):
        self.assertFalse(self._run(False, ""))
class TestParseArgs(unittest.TestCase):
    """Command-line parsing: defaults and full option coverage."""

    def _parse(self, argv):
        with patch.object(sys, "argv", ["cmd", *argv]):
            return soi.parse_args()

    def test_defaults(self):
        args = self._parse(["--source", "/some/path"])
        self.assertEqual(args.source, "/some/path")
        self.assertEqual(args.engine, "suricata")
        self.assertFalse(args.dry_run)
        self.assertFalse(args.no_import_note)
        self.assertEqual(args.index, soi.DEFAULT_INDEX)

    def test_all_options(self):
        args = self._parse(["-s", "/x", "-e", "suricata", "-n",
                            "--no-import-note", "-i", "alt-index"])
        self.assertEqual(args.source, "/x")
        self.assertTrue(args.dry_run)
        self.assertTrue(args.no_import_note)
        self.assertEqual(args.index, "alt-index")
class TestMain(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmpdir, ignore_errors=True)
# Stub make_session so tests don't need /opt/so/conf/elasticsearch/curl.config.
p = patch.object(soi, "make_session", return_value=MagicMock())
p.start()
self.addCleanup(p.stop)
def _write_file(self, public_id, overrides, ext="txt"):
"""Write an NDJSON override file. Entries may be dicts or raw strings (for malformed input)."""
path = os.path.join(self.tmpdir, f"{public_id}.{ext}")
with open(path, "w") as f:
for o in overrides:
f.write(o if isinstance(o, str) else json.dumps(o))
f.write("\n")
return path
def _run_main(self, *extra_argv, input_response="yes"):
"""Run main() with stdout/stderr captured and input mocked. Returns (stdout, stderr, exit_code)."""
argv = ["cmd", "--source", self.tmpdir, *extra_argv]
out, err = StringIO(), StringIO()
with patch.object(sys, "argv", argv), \
patch("sys.stdout", new=out), \
patch("sys.stderr", new=err), \
patch("builtins.input", return_value=input_response):
with self.assertRaises(SystemExit) as cm:
soi.main()
return out.getvalue(), err.getvalue(), cm.exception.code
def test_source_dir_missing(self):
argv = ["cmd", "--source", "/no/such/path/here"]
err = StringIO()
with patch.object(sys, "argv", argv), patch("sys.stderr", new=err):
with self.assertRaises(SystemExit) as cm:
soi.main()
self.assertEqual(cm.exception.code, 1)
self.assertIn("source directory not found", err.getvalue())
def test_no_files_found(self):
out, _, code = self._run_main()
self.assertEqual(code, 0)
self.assertIn("No *.txt files found", out)
def test_user_aborts(self):
self._write_file("1001", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main(input_response="no")
self.assertEqual(code, 1)
self.assertIn("Aborted", out)
def test_parse_error_increments_error(self):
# Malformed JSON line — parse_overrides_file raises JSONDecodeError.
self._write_file("1002", ["not json"])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 1) # invalid+error → non-zero
self.assertIn("could not parse", out)
self.assertIn("Errors: 1", out)
def test_empty_file_skipped(self):
# Blank lines only — parse_overrides_file returns []; main reports "empty file" and continues.
path = os.path.join(self.tmpdir, "1003.txt")
with open(path, "w") as f:
f.write("\n\n")
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
self.assertIn("empty file", out)
@patch.object(soi, "find_detection")
def test_search_http_error(self, mock_find):
mock_find.side_effect = requests.HTTPError("boom")
self._write_file("1004", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 1)
self.assertIn("search failed", out)
@patch.object(soi, "find_detection")
def test_no_detection_found(self, mock_find):
mock_find.return_value = (None, None, None)
self._write_file("1005", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
self.assertIn("no detection found", out)
self.assertIn("Skipped (no detection): 1", out)
@patch.object(soi, "find_detection")
def test_all_duplicates_no_update(self, mock_find):
existing = [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}]
mock_find.return_value = ("doc1", "so-detection", existing)
self._write_file("1006", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
self.assertIn("SKIP", out)
self.assertNotIn("DRY-RUN: would update", out) # added_this_file == 0 branch
@patch.object(soi, "update_overrides")
@patch.object(soi, "find_detection")
def test_happy_path_full(self, mock_find, mock_update):
# Exercises: ADD, dedupe SKIP, INVALID, note prefix, UPDATE, custom-vars warning, exit=1 (invalid present)
existing = [{"type": "suppress", "track": "by_src", "ip": "9.9.9.9"}]
mock_find.return_value = ("doc1", "so-detection", existing)
mock_update.return_value = {"result": "updated"}
self._write_file("1007", [
{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}, # ADD
{"type": "suppress", "track": "by_src", "ip": "9.9.9.9"}, # SKIP (dupe of existing)
{"type": "suppress", "track": "bogus", "ip": "1.2.3.4"}, # INVALID
{"type": "suppress", "track": "by_src", "ip": "$CONCOURSEWORKERS"}, # ADD + custom var
])
out, _, code = self._run_main()
self.assertEqual(code, 1) # one invalid -> non-zero
mock_update.assert_called_once()
merged = mock_update.call_args[0][3]
self.assertEqual(len(merged), 3) # 1 existing + 2 new
new_notes = [o.get("note", "") for o in merged if o.get("ip") in ("1.2.3.4", "$CONCOURSEWORKERS")]
self.assertTrue(all(n.startswith("[Imported ") for n in new_notes))
self.assertIn("ADD", out)
self.assertIn("SKIP", out)
self.assertIn("INVALID", out)
self.assertIn("UPDATED", out)
self.assertIn("$CONCOURSEWORKERS", out)
@patch.object(soi, "update_overrides")
@patch.object(soi, "find_detection")
def test_no_import_note_preserves_note(self, mock_find, mock_update):
mock_find.return_value = ("doc1", "so-detection", [])
mock_update.return_value = {"result": "updated"}
self._write_file("1008", [
{"type": "suppress", "track": "by_src", "ip": "1.2.3.4", "note": "original"},
])
_, _, code = self._run_main("--no-import-note")
self.assertEqual(code, 0)
merged = mock_update.call_args[0][3]
self.assertEqual(merged[0]["note"], "original") # no prefix applied
@patch.object(soi, "find_detection")
def test_dry_run_skips_update(self, mock_find):
mock_find.return_value = ("doc1", "so-detection", [])
self._write_file("1009", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
with patch.object(soi, "update_overrides") as mock_update:
out, _, code = self._run_main("--dry-run")
self.assertEqual(code, 0)
mock_update.assert_not_called()
self.assertIn("DRY-RUN: would update", out)
@patch.object(soi, "update_overrides")
@patch.object(soi, "find_detection")
def test_update_http_error(self, mock_find, mock_update):
mock_find.return_value = ("doc1", "so-detection", [])
mock_update.side_effect = requests.HTTPError("nope")
self._write_file("1010", [{"type": "suppress", "track": "by_src", "ip": "1.2.3.4"}])
out, _, code = self._run_main()
self.assertEqual(code, 1)
self.assertIn("update failed", out)
# Run the full test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
+137 -1
View File
@@ -117,6 +117,121 @@ transformations:
- type: logsource
product: linux
service: auth
# Maps M365 audit rules to Elastic Agent O365 integration logs
- id: m365_audit_field_mappings
type: field_name_mapping
mapping:
Operation: event.action
ResultStatus: event.outcome
ApplicationId: o365.audit.ApplicationId
ObjectId: o365.audit.ObjectId
RequestType: o365.audit.RequestType
rule_conditions:
- type: logsource
product: m365
service: audit
- id: m365_audit_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: audit
# Maps M365 exchange rules to Elastic Agent O365 integration logs
- id: m365_exchange_field_mappings
type: field_name_mapping
mapping:
eventSource: event.provider
eventName: event.action
status: event.outcome
rule_conditions:
- type: logsource
product: m365
service: exchange
- id: m365_exchange_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: exchange
# Maps M365 threat_management rules to Elastic Agent O365 integration logs
- id: m365_threat_management_field_mappings
type: field_name_mapping
mapping:
eventSource: event.provider
eventName: event.action
status: event.outcome
rule_conditions:
- type: logsource
product: m365
service: threat_management
- id: m365_threat_management_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: threat_management
# Maps M365 threat_detection rules to Elastic Agent O365 integration logs
- id: m365_threat_detection_field_mappings
type: field_name_mapping
mapping:
eventSource: event.provider
eventName: event.action
status: event.outcome
rule_conditions:
- type: logsource
product: m365
service: threat_detection
- id: m365_threat_detection_add-fields
type: add_condition
conditions:
event.dataset: 'o365.audit'
event.module: 'o365'
rule_conditions:
- type: logsource
product: m365
service: threat_detection
# Maps FortiGate event rules to Elastic Agent Fortinet integration logs
- id: fortigate_event_field_mappings
type: field_name_mapping
mapping:
action: fortinet.firewall.action
cfgpath: fortinet.firewall.cfgpath
cfgobj: fortinet.firewall.cfgobj
cfgattr: fortinet.firewall.cfgattr
devname: observer.name
devid: observer.serial_number
logid: event.code
type: fortinet.firewall.type
subtype: fortinet.firewall.subtype
level: log.level
vd: fortinet.firewall.vd
logdesc: fortinet.firewall.desc
user: user.name
ui: fortinet.firewall.ui
cfgtid: fortinet.firewall.cfgtid
msg: message
rule_conditions:
- type: logsource
product: fortigate
service: event
- id: fortigate_event_add-fields
type: add_condition
conditions:
event.dataset: 'fortinet_fortigate.log'
event.module: 'fortinet_fortigate'
rule_conditions:
- type: logsource
product: fortigate
service: event
# event.code should always be a string
- id: convert_event_code_to_string
type: convert_type
@@ -126,15 +241,36 @@ transformations:
fields:
- event.code
# Maps process_creation rules to endpoint process creation logs
# Mappings are applied per source OS (windows, macos, linux), scoping each rule
# to the matching host.os.type
- id: endpoint_process_create_windows_add-fields
type: add_condition
conditions:
event.category: 'process'
event.type: 'start'
host.os.type: 'windows'
rule_conditions:
- type: logsource
category: process_creation
product: windows
- id: endpoint_process_create_macos_add-fields
type: add_condition
conditions:
event.category: 'process'
event.type: 'start'
host.os.type: 'macos'
rule_conditions:
- type: logsource
category: process_creation
product: macos
- id: endpoint_process_create_linux_add-fields
type: add_condition
conditions:
event.category: 'process'
event.type: 'start'
host.os.type: 'linux'
rule_conditions:
- type: logsource
category: process_creation
product: linux
# Maps file_event rules to endpoint file creation logs
# This is an OS-agnostic mapping, to account for logs that don't specify source OS
- id: endpoint_file_create_add-fields