Replace so-firewall

This commit is contained in:
Mike Reeves
2022-09-20 11:11:19 -04:00
parent 512c044d80
commit 9fffe1b5fa
2 changed files with 87 additions and 519 deletions

View File

@@ -1,142 +1,11 @@
#!/usr/bin/env python3
#!/usr/bin/env bash
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
import ipaddress
import textwrap
import os
import subprocess
import sys
import argparse
import re
from lxml import etree as ET
from datetime import datetime as dt
from datetime import timezone as tz
LOCAL_SALT_DIR='/opt/so/saltstack/local'
VALID_ROLES = {
'a': { 'role': 'analyst','desc': 'Analyst - 80/tcp, 443/tcp' },
'b': { 'role': 'beats_endpoint', 'desc': 'Logstash Beat - 5044/tcp' },
'e': { 'role': 'elasticsearch_rest', 'desc': 'Elasticsearch REST API - 9200/tcp' },
'f': { 'role': 'strelka_frontend', 'desc': 'Strelka frontend - 57314/tcp' },
's': { 'role': 'syslog', 'desc': 'Syslog device - 514/tcp/udp' },
't': { 'role': 'elastic_agent_endpoint', 'desc': 'Elastic Agent endpoint - 8220/tcp,5055/tcp' }
}
def validate_ip_cidr(ip_cidr: str) -> bool:
try:
ipaddress.ip_address(ip_cidr)
except ValueError:
try:
ipaddress.ip_network(ip_cidr)
except ValueError:
return False
return True
def role_prompt() -> str:
print()
print('Choose the role for the IP or Range you would like to allow')
print()
for role in VALID_ROLES:
print(f'[{role}] - {VALID_ROLES[role]["desc"]}')
print()
role = input('Please enter your selection: ')
if role in VALID_ROLES.keys():
return VALID_ROLES[role]['role']
else:
print(f'Invalid role \'{role}\', please try again.', file=sys.stderr)
sys.exit(1)
def ip_prompt() -> str:
ip = input('Enter a single ip address or range to allow (ex: 10.10.10.10 or 10.10.0.0/16): ')
if validate_ip_cidr(ip):
return ip
else:
print(f'Invalid IP address or CIDR block \'{ip}\', please try again.', file=sys.stderr)
sys.exit(1)
def apply(role: str, ip: str) -> int:
firewall_cmd = ['so-firewall', 'includehost', role, ip]
salt_cmd = ['salt-call', 'state.apply', '-l', 'quiet', 'firewall', 'queue=True']
print(f'Adding {ip} to the {role} role. This can take a few seconds...')
cmd = subprocess.run(firewall_cmd)
if cmd.returncode == 0:
cmd = subprocess.run(salt_cmd, stdout=subprocess.DEVNULL)
else:
return cmd.returncode
def main():
if os.geteuid() != 0:
print('You must run this script as root', file=sys.stderr)
sys.exit(1)
main_parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent(f'''\
additional information:
To use this script in interactive mode call it with no arguments
'''
))
group = main_parser.add_argument_group(title='roles')
group.add_argument('-a', dest='roles', action='append_const', const=VALID_ROLES['a']['role'], help="Analyst - 80/tcp, 443/tcp")
group.add_argument('-b', dest='roles', action='append_const', const=VALID_ROLES['b']['role'], help="Logstash Beat - 5044/tcp")
group.add_argument('-e', dest='roles', action='append_const', const=VALID_ROLES['e']['role'], help="Elasticsearch REST API - 9200/tcp")
group.add_argument('-f', dest='roles', action='append_const', const=VALID_ROLES['f']['role'], help="Strelka frontend - 57314/tcp")
group.add_argument('-s', dest='roles', action='append_const', const=VALID_ROLES['s']['role'], help="Syslog device - 514/tcp/udp")
group.add_argument('-t', dest='roles', action='append_const', const=VALID_ROLES['t']['role'], help="Elastic Agent endpoint - 8220/tcp,5055/tcp")
ip_g = main_parser.add_argument_group(title='allow')
ip_g.add_argument('-i', help="IP or CIDR block to disallow connections from, requires at least one role argument", metavar='', dest='ip')
args = main_parser.parse_args(sys.argv[1:])
if args.roles is None:
role = role_prompt()
ip = ip_prompt()
try:
return_code = apply(role, ip)
except Exception as e:
print(f'Unexpected exception occurred: {e}', file=sys.stderr)
return_code = e.errno
sys.exit(return_code)
elif args.roles is not None and args.ip is None:
if os.environ.get('IP') is None:
main_parser.print_help()
sys.exit(1)
else:
args.ip = os.environ['IP']
if validate_ip_cidr(args.ip):
try:
for role in args.roles:
return_code = apply(role, args.ip)
if return_code > 0:
break
except Exception as e:
print(f'Unexpected exception occurred: {e}', file=sys.stderr)
return_code = e.errno
else:
print(f'Invalid IP address or CIDR block \'{args.ip}\', please try again.', file=sys.stderr)
return_code = 1
sys.exit(return_code)
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
sys.exit(1)
echo "Please use the Configuration section in SOC to allow hosts"
echo ""
echo "If you need command line options on adding hosts please run so-firewall"