#!/usr/bin/env python3

# Copyright 2014,2015,2016,2017,2018,2019,2020,2021 Security Onion Solutions, LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.

"""Enable, disable and list ML ("learn") modules stored in the manager's minion pillar."""

import signal
import sys
import os
import re
import subprocess
import argparse
import textwrap
from typing import List, Optional

import yaml

minion_pillar_dir = '/opt/so/saltstack/local/pillar/minions'

# Child process that should receive SIGINT when the user presses Ctrl-C.
# NOTE(review): nothing in this file assigns it yet, so the handler below only
# ever sees None; it is kept as an extension point.
salt_proc: Optional[subprocess.Popen] = None

# Temp store of modules, will likely be broken out into salt
learn_modules = [
    {
        'name': 'logscan',
        'enabled': False,
        'description': 'Scan log files against pre-trained models to alert on anomalies.'
    }
]


def sigint_handler(*_):
    """Handle Ctrl-C: forward SIGINT to ``salt_proc`` (if set) and exit with status 0."""
    print('Exiting gracefully on Ctrl-C')
    if salt_proc is not None:
        salt_proc.send_signal(signal.SIGINT)
    sys.exit(0)


def find_minion_pillar() -> str:
    """Return the path of the single manager-type pillar under ``minion_pillar_dir``.

    A manager-type pillar is a file whose name ends in ``_manager.sls``,
    ``_managersearch.sls``, ``_standalone.sls``, ``_import.sls`` or ``_eval.sls``.
    Exits with status 3 when none, or more than one, is found.
    """
    # Raw string: '\.' in a normal string literal is an invalid escape sequence.
    regex = r'^.*_(manager|managersearch|standalone|import|eval)\.sls$'

    result = [
        os.path.join(root, f_minion_id)
        for root, _, files in os.walk(minion_pillar_dir)
        for f_minion_id in files
        if re.search(regex, f_minion_id)
    ]

    if not result:
        print('Could not find manager-type pillar (eval, standalone, manager, managersearch, import). Are you running this script on the manager?', file=sys.stderr)
        sys.exit(3)

    if len(result) > 1:
        # Quote each path individually; joining the f-string of the whole list
        # would interleave ', ' between the characters of its repr.
        res_str = ', '.join(f'"{path}"' for path in result)
        print('(This should not happen, the system is in an error state if you see this message.)\n', file=sys.stderr)
        print('More than one manager-type pillar exists, minion id\'s listed below:', file=sys.stderr)
        print(f' {res_str}', file=sys.stderr)
        sys.exit(3)

    return result[0]


def read_pillar(pillar: str) -> dict:
    """Load the YAML pillar file at ``pillar`` and return it as a dict.

    Exits with status 3 if the file cannot be opened, cannot be parsed, is
    empty, or does not contain a mapping at the top level.
    """
    try:
        with open(pillar, 'r') as f:
            loaded_yaml = yaml.safe_load(f.read())
    except OSError:
        print(f'Could not open {pillar}', file=sys.stderr)
        sys.exit(3)
    except (yaml.YAMLError, UnicodeDecodeError):
        print(f'Could not parse {pillar}', file=sys.stderr)
        sys.exit(3)

    # An empty file loads as None; anything but a mapping is unusable because
    # the rest of the script calls dict methods on the result.
    if not isinstance(loaded_yaml, dict):
        print(f'Could not parse {pillar}', file=sys.stderr)
        sys.exit(3)
    return loaded_yaml


def write_pillar(pillar: str, content: dict):
    """Serialize ``content`` as block-style YAML into the file at ``pillar``.

    Exits with status 3 if the file cannot be opened or written.
    """
    try:
        with open(pillar, 'w') as f:
            return yaml.dump(content, f, default_flow_style=False)
    except OSError:
        print(f'Could not open {pillar}', file=sys.stderr)
        sys.exit(3)


def create_pillar_if_not_exist(pillar: str, content: dict):
    """Seed ``content['learn']['modules']`` with the defaults when it is missing.

    ``content`` is modified in place and written back to ``pillar`` only when
    the modules entry had to be created.
    """
    if content.get('learn') is None:
        content['learn'] = {}
    if content['learn'].get('modules') is None:
        # Copy the defaults so later edits to the pillar never alias the
        # module-level ``learn_modules`` list.
        content['learn']['modules'] = [dict(module) for module in learn_modules]
        write_pillar(pillar, content)


def apply(module_list: List):
    """Run ``salt-call state.apply learn.<module>`` for every name in ``module_list``.

    Returns 0 when every state applied cleanly, otherwise the return code of
    the last salt-call invocation that failed.
    """
    return_code = 0
    for module in module_list:
        salt_cmd = ['salt-call', 'state.apply', '-l', 'quiet', f'learn.{module}', 'queue=True']
        print(f'Applying salt state for {module} module...')
        cmd = subprocess.run(salt_cmd, stdout=subprocess.DEVNULL)
        if cmd.returncode != 0:
            print(f'[ERROR] Failed to apply salt state for {module}')
            return_code = cmd.returncode
    return return_code


def check_apply(args: argparse.Namespace):
    """Apply salt states for ``args.modules`` if requested, otherwise ask first.

    ``--apply`` and ``--yes`` both apply without prompting. Without either
    flag the user is prompted; anything but an explicit ``y`` leaves the
    states unapplied. Returns the result of ``apply`` or 0 when nothing ran.
    """
    # argparse always defines ``yes`` (None when the flag is absent), so its
    # value has to be tested rather than its presence.
    if args.apply or getattr(args, 'yes', None):
        print('Configuration updated. Applying changes:')
        return apply(args.modules)

    message = 'Configuration updated. Would you like to apply your changes now? (y/N) '
    try:
        answer = input(message)
        while answer.lower() not in ('y', 'n', ''):
            answer = input(message)
    except EOFError:
        # No interactive stdin: fall back to the prompt's default answer (No).
        return 0

    if answer.lower() != 'y':
        return 0

    print('Applying changes:')
    return apply(args.modules)


def _module_entries(pillar_modules):
    """Return ``(name, entry)`` pairs for a list-of-dicts or name-keyed dict of modules."""
    if isinstance(pillar_modules, dict):
        return [(name, entry) for name, entry in pillar_modules.items() if isinstance(entry, dict)]
    return [(entry.get('name'), entry) for entry in pillar_modules or [] if isinstance(entry, dict)]


def enable_disable_modules(args, enable: bool):
    """Set ``enabled`` to ``enable`` for the modules named in ``args.modules``.

    The keyword ``all`` selects every module present in the pillar. The pillar
    file is rewritten when at least one module matched, then ``check_apply``
    decides whether the salt states are applied. Returns its exit code.
    """
    pillar_modules = (args.pillar_dict.get('learn') or {}).get('modules')
    entries = _module_entries(pillar_modules)

    if 'all' in args.modules:
        targets = [entry for _, entry in entries]
    else:
        by_name = dict(entries)
        targets = []
        for module in args.modules:
            if module in by_name:
                targets.append(by_name[module])
            else:
                print(f'[WARNING] Module {module} was not found in the pillar', file=sys.stderr)

    for entry in targets:
        entry['enabled'] = enable

    # Persist on disable as well as enable; entries are edited in place so the
    # pillar dict already reflects the change.
    if targets:
        write_pillar(args.pillar, args.pillar_dict)

    # NOTE(review): args.modules is handed to apply() unchanged, so 'all' maps
    # to a state named learn.all -- confirm that state exists.
    return check_apply(args)


def enable_modules(args):
    """Subcommand handler for ``enable``; returns the process exit code."""
    return enable_disable_modules(args, enable=True)


def disable_modules(args):
    """Subcommand handler for ``disable``; returns the process exit code."""
    return enable_disable_modules(args, enable=False)


def list_modules(*_):
    """Subcommand handler for ``list``: print each known module and its description."""
    print('Available ML modules:')
    for module in learn_modules:
        print(f' - { module["name"] } : {module["description"]}')
    return 0


def main():
    """Parse the command line and dispatch to the selected subcommand.

    Requires root (exit status 1 otherwise). The manager pillar is located,
    loaded and seeded with default modules before any subcommand runs. With
    no subcommand the help text is printed and the script exits with 0.
    """
    beta_str = 'BETA - SUBJECT TO CHANGE\n'

    apply_help = 'After ACTION the chosen modules, apply any necessary salt states.'
    enable_apply_help = apply_help.replace('ACTION', 'enabling')
    disable_apply_help = apply_help.replace('ACTION', 'disabling')
    yes_help = 'Accept apply prompt.'

    signal.signal(signal.SIGINT, sigint_handler)

    if os.geteuid() != 0:
        print('You must run this script as root', file=sys.stderr)
        sys.exit(1)

    main_parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter)

    subcommand_desc = textwrap.dedent(
        """\
        enable      Enable one or more ML modules.
        disable     Disable one or more ML modules.
        list        List all available ML modules.
        """
    )

    subparsers = main_parser.add_subparsers(title='commands', description=subcommand_desc, metavar='', dest='command')

    module_help_str = 'One or more ML modules, which can be listed using \'so-learn list\'. Use the keyword \'all\' to apply the action to all available modules.'

    enable = subparsers.add_parser('enable')
    enable.set_defaults(func=enable_modules)
    enable.add_argument('modules', metavar='ML_MODULES', nargs='+', help=module_help_str)
    enable.add_argument('--apply', action='store_const', const=True, required=False, help=enable_apply_help)
    enable.add_argument('--yes', '-y', action='store_const', const=True, required=False, help=yes_help)

    disable = subparsers.add_parser('disable')
    disable.set_defaults(func=disable_modules)
    disable.add_argument('modules', metavar='ML_MODULES', nargs='+', help=module_help_str)
    disable.add_argument('--apply', action='store_const', const=True, required=False, help=disable_apply_help)
    disable.add_argument('--yes', '-y', action='store_const', const=True, required=False, help=yes_help)

    # Named list_parser so the builtin ``list`` is not shadowed.
    list_parser = subparsers.add_parser('list')
    list_parser.set_defaults(func=list_modules)

    args = main_parser.parse_args(sys.argv[1:])
    args.pillar = find_minion_pillar()
    args.pillar_dict = read_pillar(args.pillar)
    create_pillar_if_not_exist(args.pillar, args.pillar_dict)

    if not hasattr(args, 'func'):
        # Every subparser sets ``func``; its absence means no subcommand was given.
        print(beta_str)
        main_parser.print_help()
        sys.exit(0)

    exit_code = args.func(args)
    sys.exit(exit_code)


if __name__ == '__main__':
    main()