Files
securityonion/salt/common/tools/sbin/so-status

168 lines
5.4 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
import json
import os
from rich import box
from rich.console import Console
from rich.table import Table
import subprocess
import sys
import time
EXPECTED_CONTAINERS_FILE = "/opt/so/conf/so-status/so-status.conf"
HIGHSTATE_COMPLETE_FILE = "/opt/so/log/salt/lasthighstate"
UPTIME_FILE = "/proc/uptime"
def showUsage(options, args):
print('Usage: {} [OPTIONS]'.format(sys.argv[0]))
print(' Options:')
print(' -h - Prints this usage information')
print(' -q - Suppress output; useful for automation of exit code value')
print(' -j - Output in JSON format')
print('')
print(' Exit codes:')
print(' 0 - Success, system appears to be running correctly')
print(' 1 - Error, one or more subsystems are not running')
print(' 2 - System is starting')
print(' 99 - Installation in progress')
sys.exit(1)
def fail(msg):
print(msg, file=sys.stderr)
sys.exit(1)
def check_system_status(options, console):
code = 0
highstate_end_time = 0
try:
highstate_end_time = os.path.getmtime(HIGHSTATE_COMPLETE_FILE)
uptime_file = open(UPTIME_FILE, "r")
uptime_contents = uptime_file.readline()
uptime = uptime_contents.split()
if len(uptime) != 2:
fail("Unable to determine system uptime")
system_start_time = time.time() - float(uptime[0])
if highstate_end_time < system_start_time:
code = 2
except OSError:
code = 99
return code
def output(options, console, code, data):
if "-j" in options:
summary = { "status_code": code, "containers": data }
print(json.dumps(summary))
elif "-q" not in options:
if code == 2:
console.print(" [bold yellow]:hourglass: [bold white]System appears to be starting. No highstate has completed since the system was restarted.")
elif code == 99:
console.print(" [bold red]:exclamation: [bold white]Installation does not appear to be complete. A highstate has not fully completed.")
else:
table = Table(title = "Security Onion Status", show_edge = False, safe_box = True, box = box.MINIMAL)
table.add_column("Container", justify="right", style="white", no_wrap=True)
table.add_column("Status", justify="left", style="green", no_wrap=True)
table.add_column("Details", justify="right", style="cyan", no_wrap=True)
data.sort(key = lambda x: x['Name'])
for container in data:
color = "[green]"
if container['Status'] != "running":
color = "[bold red]"
table.add_row(container['Name'], color + container['Status'], container['Details'])
console.print()
console.print(table)
console.print()
if code == 0:
console.print(" [green]:heavy_check_mark: [bold white]This onion is ready to make your adversaries cry!")
elif code == 1:
console.print(" [bold red]:exclamation: [bold white]Check container logs and /opt/so/log for more details.")
console.print()
def check_container_status(options, console):
code = 0
cli = "docker"
proc = subprocess.run([cli, 'ps', '--format', '{{json .}}'], stdout=subprocess.PIPE, encoding="utf-8")
if proc.returncode != 0:
fail("Container system error; unable to obtain container process statuses")
container_list = []
expected_file = open(EXPECTED_CONTAINERS_FILE, "r")
for container in expected_file:
if container.startswith('#'):
continue
container = container.strip()
exists = False
details = { "Name": container, "Status": "missing", "Details": ""}
try:
# Podman format
containers_data = json.loads(proc.stdout)
for item in containers_data:
if item['Names'][0] == container:
details['Status'] = item['State']
details['Details'] = item['Status']
container_list.append(details)
exists = True
if item['State'] != "running":
code = 1
break
except:
# Docker format
for line in proc.stdout.splitlines():
if len(line) > 2:
item = json.loads(line)
if item['Names'] == container:
details['Status'] = item['State']
details['Details'] = item['Status']
container_list.append(details)
exists = True
if item['State'] != "running":
code = 1
break
if not exists:
container_list.append(details)
code = 1
return code, container_list
def check_status(options, console):
container_list = []
code = check_system_status(options, console)
if code == 0:
code, container_list = check_container_status(options, console)
output(options, console, code, container_list)
return code
def main():
options = []
args = sys.argv[1:]
for option in args:
if option.startswith("-"):
options.append(option)
args.remove(option)
if len(args) != 0 or "-h" in options:
showUsage(options, None)
if os.environ["USER"] != "root":
fail("This program must be run as root")
console = Console()
sys.exit(check_status(options, console))
if __name__ == "__main__":
main()