#!/usr/bin/env python3 # Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one # or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at # https://securityonion.net/license; you may not use this file except in compliance with the # Elastic License 2.0. import ipaddress import textwrap import os import subprocess import sys import argparse import re from lxml import etree as ET from datetime import datetime as dt from datetime import timezone as tz LOCAL_SALT_DIR='/opt/so/saltstack/local' VALID_ROLES = { 'a': { 'role': 'analyst','desc': 'Analyst - 80/tcp, 443/tcp' }, 'b': { 'role': 'beats_endpoint', 'desc': 'Logstash Beat - 5044/tcp' }, 'e': { 'role': 'elasticsearch_rest', 'desc': 'Elasticsearch REST API - 9200/tcp' }, 'f': { 'role': 'strelka_frontend', 'desc': 'Strelka frontend - 57314/tcp' }, 's': { 'role': 'syslog', 'desc': 'Syslog device - 514/tcp/udp' }, 't': { 'role': 'elastic_agent_endpoint', 'desc': 'Elastic Agent endpoint - 8220/tcp,5055/tcp' } } def validate_ip_cidr(ip_cidr: str) -> bool: try: ipaddress.ip_address(ip_cidr) except ValueError: try: ipaddress.ip_network(ip_cidr) except ValueError: return False return True def role_prompt() -> str: print() print('Choose the role for the IP or Range you would like to allow') print() for role in VALID_ROLES: print(f'[{role}] - {VALID_ROLES[role]["desc"]}') print() role = input('Please enter your selection: ') if role in VALID_ROLES.keys(): return VALID_ROLES[role]['role'] else: print(f'Invalid role \'{role}\', please try again.', file=sys.stderr) sys.exit(1) def ip_prompt() -> str: ip = input('Enter a single ip address or range to allow (ex: 10.10.10.10 or 10.10.0.0/16): ') if validate_ip_cidr(ip): return ip else: print(f'Invalid IP address or CIDR block \'{ip}\', please try again.', file=sys.stderr) sys.exit(1) def apply(role: str, ip: str) -> int: firewall_cmd = ['so-firewall', 'includehost', role, ip] salt_cmd = ['salt-call', 'state.apply', '-l', 'quiet', 'firewall', 'queue=True'] print(f'Adding {ip} to the {role} role. This can take a few seconds...') cmd = subprocess.run(firewall_cmd) if cmd.returncode == 0: cmd = subprocess.run(salt_cmd, stdout=subprocess.DEVNULL) else: return cmd.returncode def main(): if os.geteuid() != 0: print('You must run this script as root', file=sys.stderr) sys.exit(1) main_parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent(f'''\ additional information: To use this script in interactive mode call it with no arguments ''' )) group = main_parser.add_argument_group(title='roles') group.add_argument('-a', dest='roles', action='append_const', const=VALID_ROLES['a']['role'], help="Analyst - 80/tcp, 443/tcp") group.add_argument('-b', dest='roles', action='append_const', const=VALID_ROLES['b']['role'], help="Logstash Beat - 5044/tcp") group.add_argument('-e', dest='roles', action='append_const', const=VALID_ROLES['e']['role'], help="Elasticsearch REST API - 9200/tcp") group.add_argument('-f', dest='roles', action='append_const', const=VALID_ROLES['f']['role'], help="Strelka frontend - 57314/tcp") group.add_argument('-s', dest='roles', action='append_const', const=VALID_ROLES['s']['role'], help="Syslog device - 514/tcp/udp") group.add_argument('-t', dest='roles', action='append_const', const=VALID_ROLES['t']['role'], help="Elastic Agent endpoint - 8220/tcp,5055/tcp") ip_g = main_parser.add_argument_group(title='allow') ip_g.add_argument('-i', help="IP or CIDR block to disallow connections from, requires at least one role argument", metavar='', dest='ip') args = main_parser.parse_args(sys.argv[1:]) if args.roles is None: role = role_prompt() ip = ip_prompt() try: return_code = apply(role, ip) except Exception as e: print(f'Unexpected exception occurred: {e}', file=sys.stderr) return_code = e.errno sys.exit(return_code) elif args.roles is not None and args.ip is None: if os.environ.get('IP') is None: main_parser.print_help() sys.exit(1) else: args.ip = os.environ['IP'] if validate_ip_cidr(args.ip): try: for role in args.roles: return_code = apply(role, args.ip) if return_code > 0: break except Exception as e: print(f'Unexpected exception occurred: {e}', file=sys.stderr) return_code = e.errno else: print(f'Invalid IP address or CIDR block \'{args.ip}\', please try again.', file=sys.stderr) return_code = 1 sys.exit(return_code) if __name__ == '__main__': try: main() except KeyboardInterrupt: sys.exit(1)