Files
securityonion/salt/common/tools/sbin/so-rule
William Wernert 16d6e116fa Merge branch 'dev' into foxtrot
# Conflicts:
#	salt/idstools/init.sls
2021-03-17 11:52:54 -04:00

460 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright 2014,2015,2016,2017,2018,2019,2020,2021 Security Onion Solutions, LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Local exit codes:
- General error: 1
- Invalid argument: 2
- File error: 3
"""
import sys, os, subprocess, argparse, signal
import copy
import re
import textwrap
import yaml
# Directory that find_minion_pillar() walks when looking for the manager-type
# minion pillar file.
minion_pillar_dir = '/opt/so/saltstack/local/pillar/minions'
# Module-level slot read by sigint_handler(), which calls send_signal() on it
# whenever it is not None.
# NOTE(review): the add_rem_* functions assign check_apply()'s return value
# here, and check_apply()/apply() return integer exit codes rather than a
# process object -- confirm whether the annotation below is still accurate.
salt_proc: subprocess.CompletedProcess = None
def print_err(string: str):
    """Print ``string`` to standard error (with the usual trailing newline)."""
    err_stream = sys.stderr
    print(string, file=err_stream)
def check_apply(args: dict, prompt: bool = True):
    """Decide whether to apply the updated configuration, and do so if asked.

    ``args.apply`` being truthy applies immediately. Otherwise, when
    ``prompt`` is true, the user is asked (default answer: no) and the
    question is repeated until the reply is ``y``, ``n`` or empty.

    Returns the value of ``apply()`` when changes are applied, else ``0``.
    """
    if args.apply:
        print('Configuration updated. Applying changes:')
        return apply()

    if not prompt:
        return 0

    message = 'Configuration updated. Would you like to apply your changes now? (y/N) '
    while True:
        reply = input(message).lower()
        if reply in ('y', 'n', ''):
            break

    # An empty reply counts as "no".
    if reply != 'y':
        return 0

    print('Applying changes:')
    return apply()
def apply():
    """Sync the idstools config files via salt, then run the rule update.

    The rule update is only attempted when the sync command exits with 0.
    Returns the exit code of the last command that was run.
    """
    print('Syncing config files...')
    sync_result = subprocess.run(
        ['salt-call', 'state.apply', '-l', 'quiet', 'idstools.sync_files', 'queue=True'],
        stdout=subprocess.DEVNULL,
    )
    if sync_result.returncode != 0:
        return sync_result.returncode

    print('Updating rules...')
    update_result = subprocess.run(['so-rule-update'])
    return update_result.returncode
def find_minion_pillar() -> str:
    """Return the path of the single manager-type minion pillar file.

    Walks ``minion_pillar_dir`` looking for a file whose name ends in
    ``_manager.sls``, ``_managersearch.sls``, ``_standalone.sls``,
    ``_import.sls`` or ``_eval.sls``.

    Exits the program with status 3 (file error) when no such file, or more
    than one such file, is found.
    """
    # Raw string: "\." must reach the regex engine as an escaped dot.
    manager_pillar = re.compile(r'^.*_(manager|managersearch|standalone|import|eval)\.sls$')

    result = []
    for root, _, files in os.walk(minion_pillar_dir):
        result.extend(
            os.path.join(root, f_minion_id)
            for f_minion_id in files
            if manager_pillar.search(f_minion_id)
        )

    if not result:
        print_err('Could not find manager-type pillar (eval, standalone, manager, managersearch, import). Are you running this script on the manager?')
        sys.exit(3)

    if len(result) > 1:
        # Quote each path individually; joining the repr of the whole list
        # would interleave ", " between every single character.
        res_str = ', '.join(f'"{path}"' for path in result)
        print_err('(This should not happen, the system is in an error state if you see this message.)\n')
        print_err('More than one manager-type pillar exists, minion id\'s listed below:')
        print_err(f' {res_str}')
        sys.exit(3)

    return result[0]
def read_pillar(pillar: str):
    """Load the YAML pillar file at ``pillar`` and return its parsed content.

    Exits the program with status 3 (file error) after printing one message:
    ``Could not open ...`` when the file cannot be read, or
    ``Could not parse ...`` when it is not valid YAML or parses to nothing.
    """
    try:
        with open(pillar, 'r') as f:
            raw = f.read()
    except OSError:
        print_err(f'Could not open {pillar}')
        sys.exit(3)
    except UnicodeDecodeError:
        # The file was opened but its bytes are not decodable text.
        print_err(f'Could not parse {pillar}')
        sys.exit(3)

    # Parsing is kept out of the block above so that the exit for an
    # unparsable file is not intercepted and reported a second time as an
    # open failure.
    try:
        loaded_yaml = yaml.safe_load(raw)
    except yaml.YAMLError:
        loaded_yaml = None

    if loaded_yaml is None:
        print_err(f'Could not parse {pillar}')
        sys.exit(3)
    return loaded_yaml
def write_pillar(pillar: str, content: dict):
    """Write ``content`` to the pillar file at ``pillar`` as block-style YAML.

    Before writing, any of the ``disabled``, ``enabled`` and ``modify`` lists
    under ``content['idstools']['sids']`` that is present but empty is
    replaced with ``None`` (``content`` is modified in place). Keys that are
    absent are left absent.

    Returns whatever ``yaml.dump`` returns. Exits the program with status 3
    (file error) when the file cannot be opened or the content cannot be
    serialised.
    """
    sids = (content.get('idstools') or {}).get('sids') or {}
    for key in ('disabled', 'enabled', 'modify'):
        # .get() so that a pillar lacking one of the lists is not treated as
        # a file error.
        entries = sids.get(key)
        if entries is not None and len(entries) == 0:
            sids[key] = None

    try:
        with open(pillar, 'w') as f:
            return yaml.dump(content, f, default_flow_style=False)
    except OSError:
        print_err(f'Could not open {pillar}')
        sys.exit(3)
    except yaml.YAMLError:
        print_err(f'Could not write {pillar}')
        sys.exit(3)
def check_sid_pattern(sid_pattern: str):
    """Validate a SID argument, printing an error to stderr when it is bad.

    A pattern starting with ``re:`` is valid when the remainder compiles as
    a regular expression. Any other pattern must convert to a non-negative
    integer.

    Returns ``True`` when the pattern is valid, ``False`` otherwise.
    """
    if sid_pattern.startswith('re:'):
        if valid_regex(sid_pattern[3:]):
            return True
        print_err('Invalid regex pattern.')
        return False

    message = f'SID {sid_pattern} is not valid, did you forget the "re:" prefix for a regex pattern?'
    try:
        sid = int(sid_pattern)
    except (TypeError, ValueError):
        # Only conversion failures mean "not a SID"; anything else (e.g. a
        # KeyboardInterrupt) must not be swallowed here.
        print_err(message)
        return False

    if sid < 0:
        print_err(message)
        return False
    return True
def valid_regex(pattern: str):
    """Return ``True`` when ``pattern`` compiles as a regular expression."""
    try:
        re.compile(pattern)
    except re.error:
        return False
    else:
        return True
def sids_key_exists(pillar: dict, key: str):
    """Report whether ``key`` is present under ``pillar['idstools']['sids']``.

    A missing ``idstools`` or ``sids`` level is treated as an empty mapping.
    """
    idstools_section = pillar.get('idstools', {})
    sids_section = idstools_section.get('sids', {})
    return key in sids_section
def rem_from_sids(pillar: dict, key: str, val: str, optional = False):
    """Return a deep copy of ``pillar`` with ``val`` removed from a sids list.

    The list is ``pillar['idstools']['sids'][key]``. When that list is
    ``None`` or does not contain ``val`` the copy is returned unchanged, and
    a notice is printed unless ``optional`` is true. ``pillar`` itself is
    never modified.
    """
    updated = copy.deepcopy(pillar)
    entries = updated['idstools']['sids'][key]

    if entries is not None and val in entries:
        entries.remove(val)
    elif not optional:
        print(f'{val} already does not exist in {key}')

    return updated
def add_to_sids(pillar: dict, key: str, val: str, optional = False):
    """Return a deep copy of ``pillar`` with ``val`` appended to a sids list.

    The list is ``pillar['idstools']['sids'][key]``; a ``None`` value is
    replaced by a new list first. When ``val`` is already present nothing is
    appended, and a notice is printed unless ``optional`` is true.
    ``pillar`` itself is never modified.
    """
    updated = copy.deepcopy(pillar)
    sids = updated['idstools']['sids']

    if sids[key] is None:
        sids[key] = []
    entries = sids[key]

    if val not in entries:
        entries.append(val)
    elif not optional:
        print(f'{val} already exists in {key}')

    return updated
def add_rem_disabled(args: dict):
    """Add ``args.sid_pattern`` to, or remove it from, the disabled list.

    Returns 2 when the SID pattern is invalid. When the requested change
    leaves the disabled list as it was, nothing is written and
    ``check_apply`` is called without prompting. Otherwise, when adding, the
    pattern is also dropped from the enabled list and the user is offered
    the removal of every modify action that starts with the pattern; the
    pillar is then written and ``check_apply`` decides whether to apply.

    The value returned by ``check_apply`` is stored in the module-level
    ``salt_proc`` and returned.
    """
    global salt_proc

    sid_pattern = args.sid_pattern
    if not check_sid_pattern(sid_pattern):
        return 2

    pillar_dict = read_pillar(args.pillar)
    if not sids_key_exists(pillar_dict, 'disabled'):
        pillar_dict['idstools']['sids']['disabled'] = None

    change = rem_from_sids if args.remove else add_to_sids
    updated = change(pillar_dict, 'disabled', sid_pattern)

    # Nothing changed: skip the write and only apply when explicitly asked.
    if updated['idstools']['sids']['disabled'] == pillar_dict['idstools']['sids']['disabled']:
        salt_proc = check_apply(args, prompt=False)
        return salt_proc
    pillar_dict = updated

    if not args.remove:
        if sids_key_exists(pillar_dict, 'enabled'):
            pillar_dict = rem_from_sids(pillar_dict, 'enabled', sid_pattern, optional=True)

        modify = pillar_dict.get('idstools', {}).get('sids', {}).get('modify')
        if modify is not None:
            prefix = f'{sid_pattern} '
            rem_candidates = [action for action in modify if action.startswith(prefix)]
            if rem_candidates:
                question = f'The above modify actions contain {sid_pattern}. Would you like to remove them? (Y/n) '
                # Re-list the candidates before every prompt until the reply
                # is one of y / n / empty.
                while True:
                    for item in rem_candidates:
                        print(f' - {item}')
                    answer = input(question).lower()
                    if answer in ('y', 'n', ''):
                        break
                # An empty reply counts as "yes".
                if answer != 'n':
                    for item in rem_candidates:
                        modify.remove(item)
                    pillar_dict['idstools']['sids']['modify'] = modify

    write_pillar(pillar=args.pillar, content=pillar_dict)
    salt_proc = check_apply(args)
    return salt_proc
def list_disabled_rules(args: dict):
    """Print the disabled rules from the pillar at ``args.pillar``.

    Always returns 0.
    """
    sids = read_pillar(args.pillar).get('idstools', {}).get('sids', {})
    disabled = sids.get('disabled')

    if disabled is None:
        print('No rules disabled.')
    else:
        print('Disabled rules:')
        for rule in disabled:
            print(f' - {rule}')
    return 0
def add_rem_enabled(args: dict):
    """Add ``args.sid_pattern`` to, or remove it from, the enabled list.

    Returns 2 when the SID pattern is invalid. When the requested change
    leaves the enabled list as it was, nothing is written and
    ``check_apply`` is called without prompting. Otherwise, when adding, the
    pattern is also dropped from the disabled list; the pillar is then
    written and ``check_apply`` decides whether to apply.

    The value returned by ``check_apply`` is stored in the module-level
    ``salt_proc`` and returned.
    """
    global salt_proc

    sid_pattern = args.sid_pattern
    if not check_sid_pattern(sid_pattern):
        return 2

    pillar_dict = read_pillar(args.pillar)
    if not sids_key_exists(pillar_dict, 'enabled'):
        pillar_dict['idstools']['sids']['enabled'] = None

    change = rem_from_sids if args.remove else add_to_sids
    updated = change(pillar_dict, 'enabled', sid_pattern)

    # Nothing changed: skip the write and only apply when explicitly asked.
    if updated['idstools']['sids']['enabled'] == pillar_dict['idstools']['sids']['enabled']:
        salt_proc = check_apply(args, prompt=False)
        return salt_proc
    pillar_dict = updated

    # A rule that was just enabled must not stay in the disabled list.
    if not args.remove and sids_key_exists(pillar_dict, 'disabled'):
        pillar_dict = rem_from_sids(pillar_dict, 'disabled', sid_pattern, optional=True)

    write_pillar(pillar=args.pillar, content=pillar_dict)
    salt_proc = check_apply(args)
    return salt_proc
def list_enabled_rules(args: dict):
    """Print the explicitly enabled rules from the pillar at ``args.pillar``.

    Always returns 0.
    """
    sids = read_pillar(args.pillar).get('idstools', {}).get('sids', {})
    enabled = sids.get('enabled')

    if enabled is None:
        print('No rules explicitly enabled.')
    else:
        print('Enabled rules:')
        for rule in enabled:
            print(f' - {rule}')
    return 0
def add_rem_modify(args: dict):
    """Add or remove a modify action for ``args.sid_pattern``.

    The action is stored as ``<sid_pattern> "<search_term>" "<replace_term>"``
    in the modify list of the pillar at ``args.pillar``.

    Returns 2 when the SID pattern is invalid, or when an action is being
    added whose search term is not a valid regular expression. When the
    requested change leaves the modify list as it was, nothing is written
    and ``check_apply`` is called without prompting. Otherwise, when adding,
    the pattern is also dropped from the disabled list; the pillar is then
    written and ``check_apply`` decides whether to apply.

    The value returned by ``check_apply`` is stored in the module-level
    ``salt_proc`` and returned.
    """
    global salt_proc

    if not check_sid_pattern(args.sid_pattern):
        return 2

    # Reject bad search terms instead of only warning about them. Removal is
    # matched on the literal stored string, so it is left unrestricted: an
    # entry with a bad term that is already in the pillar can still be
    # removed.
    if not args.remove and not valid_regex(args.search_term):
        print_err('Search term is not a valid regex pattern.')
        return 2

    string_val = f'{args.sid_pattern} "{args.search_term}" "{args.replace_term}"'

    pillar_dict = read_pillar(args.pillar)
    if not sids_key_exists(pillar_dict, 'modify'):
        pillar_dict['idstools']['sids']['modify'] = None

    if args.remove:
        temp_pillar_dict = rem_from_sids(pillar_dict, 'modify', string_val)
    else:
        temp_pillar_dict = add_to_sids(pillar_dict, 'modify', string_val)

    # Nothing changed: skip the write and only apply when explicitly asked.
    if temp_pillar_dict['idstools']['sids']['modify'] == pillar_dict['idstools']['sids']['modify']:
        salt_proc = check_apply(args, prompt=False)
        return salt_proc
    pillar_dict = temp_pillar_dict

    # TODO: Determine if a rule should be removed from disabled if modified.
    if not args.remove and sids_key_exists(pillar_dict, 'disabled'):
        pillar_dict = rem_from_sids(pillar_dict, 'disabled', args.sid_pattern, optional=True)

    write_pillar(pillar=args.pillar, content=pillar_dict)
    salt_proc = check_apply(args)
    return salt_proc
def list_modified_rules(args: dict):
    """Print the modify actions from the pillar at ``args.pillar``.

    Always returns 0.
    """
    sids = read_pillar(args.pillar).get('idstools', {}).get('sids', {})
    modify = sids.get('modify')

    if modify is None:
        print('No rules currently modified.')
    else:
        print('Modified rules + modifications:')
        for rule in modify:
            print(f' - {rule}')
    return 0
def sigint_handler(*_):
    """Handle Ctrl-C: announce the exit, forward SIGINT if possible, exit 0.

    The handler arguments (signal number and frame) are ignored.
    """
    print('Exiting gracefully on Ctrl-C')
    # salt_proc may hold a plain exit code rather than a process object, so
    # only forward the signal when the value can actually receive one;
    # otherwise the handler itself would die with an AttributeError.
    forward = getattr(salt_proc, 'send_signal', None)
    if callable(forward):
        forward(signal.SIGINT)
    sys.exit(0)
def _add_sid_command(subparsers, name, func, sid_help, apply_help, remove=False, extra_args=()):
    """Register one add/remove sub-command that takes a SID pattern.

    ``extra_args`` is a sequence of ``(dest, metavar, help)`` positionals
    added after ``sid_pattern``. ``remove=True`` sets ``remove`` as a parser
    default so the handler can tell the two actions apart.
    """
    parser = subparsers.add_parser(name)
    if remove:
        parser.set_defaults(func=func, remove=True)
    else:
        parser.set_defaults(func=func)
    parser.add_argument('sid_pattern', metavar='SID|REGEX', help=sid_help)
    for dest, metavar, help_text in extra_args:
        parser.add_argument(dest, metavar=metavar, help=help_text)
    parser.add_argument('--apply', action='store_const', const=True, required=False, help=apply_help)
    return parser


def main():
    """Command-line entry point.

    Installs the SIGINT handler, requires root (exit 1 otherwise), builds
    the ``disabled`` / ``enabled`` / ``modify`` commands with their ``add``,
    ``remove`` and ``list`` actions, and exits with the selected handler's
    return value. When no action is given, the most specific help text is
    printed and the exit status is 0.
    """
    signal.signal(signal.SIGINT, sigint_handler)

    if os.geteuid() != 0:
        print_err('You must run this script as root')
        sys.exit(1)

    apply_help = 'After updating rule configuration, apply the idstools state.'
    sid_or_regex_help = 'A valid SID (ex: "4321") or regular expression pattern (ex: "re:heartbleed|spectre")'
    # Raw string: the backslash before "$" is part of the help text and is
    # not a valid escape sequence in a normal string literal.
    search_term_help = r'A quoted regex search term (ex: "\$EXTERNAL_NET")'
    replace_term_help = 'The text to replace the search term with'

    main_parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter)
    subcommand_desc = textwrap.dedent(
        """\
        disabled Manage and list disabled rules (add, remove, list)
        enabled Manage and list enabled rules (add, remove, list)
        modify Manage and list modified rules (add, remove, list)
        """
    )
    subparsers = main_parser.add_subparsers(title='commands', description=subcommand_desc, metavar='', dest='command')

    modify_args = (
        ('search_term', 'SEARCH_TERM', search_term_help),
        ('replace_term', 'REPLACE_TERM', replace_term_help),
    )
    # (command name, add/remove handler, list handler, extra positionals)
    command_specs = (
        ('disabled', add_rem_disabled, list_disabled_rules, ()),
        ('enabled', add_rem_enabled, list_enabled_rules, ()),
        ('modify', add_rem_modify, list_modified_rules, modify_args),
    )

    command_parsers = {}
    for name, change_func, list_func, extra_args in command_specs:
        command = subparsers.add_parser(name)
        actions = command.add_subparsers()
        _add_sid_command(actions, 'add', change_func, sid_or_regex_help, apply_help,
                         extra_args=extra_args)
        _add_sid_command(actions, 'remove', change_func, sid_or_regex_help, apply_help,
                         remove=True, extra_args=extra_args)
        actions.add_parser('list').set_defaults(func=list_func)
        command_parsers[name] = command

    # Begin parse + run
    args = main_parser.parse_args(sys.argv[1:])
    if not hasattr(args, 'remove'):
        args.remove = False

    if not hasattr(args, 'func'):
        # No action chosen: show help for the command if one was named,
        # otherwise the top-level help. This happens before the pillar
        # lookup so help works even where no manager pillar exists.
        command_parsers.get(args.command, main_parser).print_help()
        sys.exit(0)

    args.pillar = find_minion_pillar()
    sys.exit(args.func(args))
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()