mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-06 09:12:45 +01:00
139 lines
4.5 KiB
Python
Executable File
139 lines
4.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
|
|
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
|
|
# https://securityonion.net/license; you may not use this file except in compliance with the
|
|
# Elastic License 2.0.
|
|
|
|
|
|
|
|
import ipaddress
|
|
import textwrap
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import argparse
|
|
import re
|
|
from lxml import etree as ET
|
|
from xml.dom import minidom
|
|
|
|
|
|
LOCAL_SALT_DIR='/opt/so/saltstack/local'
|
|
VALID_ROLES = {
|
|
'a': { 'role': 'analyst','desc': 'Analyst - 80/tcp, 443/tcp' },
|
|
'b': { 'role': 'beats_endpoint', 'desc': 'Logstash Beat - 5044/tcp' },
|
|
'e': { 'role': 'elasticsearch_rest', 'desc': 'Elasticsearch REST API - 9200/tcp' },
|
|
'f': { 'role': 'strelka_frontend', 'desc': 'Strelka frontend - 57314/tcp' },
|
|
's': { 'role': 'syslog', 'desc': 'Syslog device - 514/tcp/udp' },
|
|
}
|
|
|
|
|
|
def validate_ip_cidr(ip_cidr: str) -> bool:
|
|
try:
|
|
ipaddress.ip_address(ip_cidr)
|
|
except ValueError:
|
|
try:
|
|
ipaddress.ip_network(ip_cidr)
|
|
except ValueError:
|
|
return False
|
|
return True
|
|
|
|
|
|
def role_prompt() -> str:
|
|
print()
|
|
print('Choose the role for the IP or Range you would like to deny')
|
|
print()
|
|
for role in VALID_ROLES:
|
|
print(f'[{role}] - {VALID_ROLES[role]["desc"]}')
|
|
print()
|
|
role = input('Please enter your selection: ')
|
|
if role in VALID_ROLES.keys():
|
|
return VALID_ROLES[role]['role']
|
|
else:
|
|
print(f'Invalid role \'{role}\', please try again.', file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def ip_prompt() -> str:
|
|
ip = input('Enter a single ip address or range to deny (ex: 10.10.10.10 or 10.10.0.0/16): ')
|
|
if validate_ip_cidr(ip):
|
|
return ip
|
|
else:
|
|
print(f'Invalid IP address or CIDR block \'{ip}\', please try again.', file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def apply(role: str, ip: str) -> int:
|
|
firewall_cmd = ['so-firewall', 'excludehost', role, ip]
|
|
salt_cmd = ['salt-call', 'state.apply', '-l', 'quiet', 'firewall', 'queue=True']
|
|
print(f'Removing {ip} from the {role} role. This can take a few seconds...')
|
|
cmd = subprocess.run(firewall_cmd)
|
|
if cmd.returncode == 0:
|
|
cmd = subprocess.run(salt_cmd, stdout=subprocess.DEVNULL)
|
|
else:
|
|
return cmd.returncode
|
|
|
|
|
|
def main():
|
|
if os.geteuid() != 0:
|
|
print('You must run this script as root', file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
main_parser = argparse.ArgumentParser(
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog=textwrap.dedent(f'''\
|
|
additional information:
|
|
To use this script in interactive mode call it with no arguments
|
|
'''
|
|
))
|
|
|
|
group = main_parser.add_argument_group(title='roles')
|
|
group.add_argument('-a', dest='roles', action='append_const', const=VALID_ROLES['a']['role'], help="Analyst - 80/tcp, 443/tcp")
|
|
group.add_argument('-b', dest='roles', action='append_const', const=VALID_ROLES['b']['role'], help="Logstash Beat - 5044/tcp")
|
|
group.add_argument('-e', dest='roles', action='append_const', const=VALID_ROLES['e']['role'], help="Elasticsearch REST API - 9200/tcp")
|
|
group.add_argument('-f', dest='roles', action='append_const', const=VALID_ROLES['f']['role'], help="Strelka frontend - 57314/tcp")
|
|
group.add_argument('-s', dest='roles', action='append_const', const=VALID_ROLES['s']['role'], help="Syslog device - 514/tcp/udp")
|
|
|
|
ip_g = main_parser.add_argument_group(title='allow')
|
|
ip_g.add_argument('-i', help="IP or CIDR block to disallow connections from, requires at least one role argument", metavar='', dest='ip')
|
|
|
|
args = main_parser.parse_args(sys.argv[1:])
|
|
|
|
if args.roles is None:
|
|
role = role_prompt()
|
|
ip = ip_prompt()
|
|
try:
|
|
return_code = apply(role, ip)
|
|
except Exception as e:
|
|
print(f'Unexpected exception occurred: {e}', file=sys.stderr)
|
|
return_code = e.errno
|
|
sys.exit(return_code)
|
|
elif args.roles is not None and args.ip is None:
|
|
if os.environ.get('IP') is None:
|
|
main_parser.print_help()
|
|
sys.exit(1)
|
|
else:
|
|
args.ip = os.environ['IP']
|
|
|
|
if validate_ip_cidr(args.ip):
|
|
try:
|
|
for role in args.roles:
|
|
return_code = apply(role, args.ip)
|
|
if return_code > 0:
|
|
break
|
|
except Exception as e:
|
|
print(f'Unexpected exception occurred: {e}', file=sys.stderr)
|
|
return_code = e.errno
|
|
else:
|
|
print(f'Invalid IP address or CIDR block \'{args.ip}\', please try again.', file=sys.stderr)
|
|
return_code = 1
|
|
|
|
sys.exit(return_code)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
sys.exit(1)
|