#!/usr/bin/env python3

# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

import json
import os
import subprocess
import sys
import time

from rich import box
from rich.console import Console
from rich.table import Table

EXPECTED_CONTAINERS_FILE = "/opt/so/conf/so-status/so-status.conf"
HIGHSTATE_COMPLETE_FILE = "/opt/so/log/salt/lasthighstate"
UPTIME_FILE = "/proc/uptime"


def showUsage(options, args):
    """Print the usage text to stdout and exit with status 1.

    Both parameters are accepted for call-site compatibility and are not read.
    """
    print('Usage: {} [OPTIONS]'.format(sys.argv[0]))
    print(' Options:')
    print(' -h - Prints this usage information')
    print(' -q - Suppress output; useful for automation of exit code value')
    print(' -j - Output in JSON format')
    print(' -i - Consider the installation outcome regardless of whether the system appears healthy')
    print('')
    print(' Exit codes:')
    print(' 0 - Success, system appears to be running correctly')
    print(' 1 - Error, one or more subsystems are not running')
    print(' 2 - System is starting')
    print(' 99 - Installation in progress')
    print(' 100 - System installation encountered errors')
    sys.exit(1)


def fail(msg):
    """Print *msg* to stderr and exit with status 1."""
    print(msg, file=sys.stderr)
    sys.exit(1)


def check_installation_status(options, console):
    """Return the installation outcome code when ``-i`` was requested.

    Returns 100 if ``/root/failure`` exists, 0 if ``/root/success`` exists,
    and 99 if neither marker file is present. Without ``-i`` the check is
    skipped and 0 is returned. ``console`` is not used.
    """
    if "-i" not in options:
        return 0
    if os.path.isfile('/root/failure'):
        return 100
    if os.path.isfile('/root/success'):
        return 0
    return 99


def check_system_status(options, console):
    """Compare the last highstate completion time with the boot time.

    Returns 2 when the highstate marker file is older than the current boot,
    99 when the marker file or the uptime file cannot be read, and 0
    otherwise. Exits via ``fail`` if the uptime file does not contain exactly
    two whitespace-separated fields. Neither parameter is used.
    """
    try:
        highstate_end_time = os.path.getmtime(HIGHSTATE_COMPLETE_FILE)
        # Context manager so the file handle is released promptly.
        with open(UPTIME_FILE, "r") as uptime_file:
            uptime = uptime_file.readline().split()
    except OSError:
        return 99

    if len(uptime) != 2:
        fail("Unable to determine system uptime")

    # The first field is the number of seconds since boot.
    system_start_time = time.time() - float(uptime[0])
    if highstate_end_time < system_start_time:
        return 2
    return 0


def output(options, console, code, data):
    """Report the overall status code and per-container details.

    With ``-j`` a JSON summary is written to stdout. Otherwise, unless ``-q``
    is given, a message (codes 2, 99, 100) or a table of containers followed
    by a summary line (codes 0 and 1) is printed through ``console``.
    ``data`` is a list of dicts with ``Name``, ``Status`` and ``Details`` keys.
    """
    if "-j" in options:
        summary = {"status_code": code, "containers": data}
        print(json.dumps(summary))
        return
    if "-q" in options:
        return

    if code == 2:
        console.print(" [bold yellow]:hourglass: [bold white]System appears to be starting. No highstate has completed since the system was restarted.")
    elif code == 99:
        console.print(" [bold red]:exclamation: [bold white]Installation does not appear to be complete. A highstate has not fully completed.")
    elif code == 100:
        console.print(" [bold red]:exclamation: [bold white]Installation encountered errors.")
    else:
        table = Table(title="Security Onion Status", show_edge=False, safe_box=True, box=box.MINIMAL)
        table.add_column("Container", justify="right", style="white", no_wrap=True)
        table.add_column("Status", justify="left", style="green", no_wrap=True)
        table.add_column("Details", justify="right", style="cyan", no_wrap=True)

        # sorted() leaves the caller's list untouched.
        for container in sorted(data, key=lambda x: x['Name']):
            color = "[green]" if container['Status'] == "running" else "[bold red]"
            table.add_row(container['Name'], color + container['Status'], container['Details'])

        console.print()
        console.print(table)
        console.print()

        if code == 0:
            console.print(" [green]:heavy_check_mark: [bold white]This onion is ready to make your adversaries cry!")
        elif code == 1:
            console.print(" [bold red]:exclamation: [bold white]Check container logs and /opt/so/log for more details.")

    # NOTE(review): the original indentation was lost; this trailing blank
    # line is assumed to follow every non-quiet message - confirm.
    console.print()


def _index_containers(listing):
    """Map each container name to its raw entry from ``ps --format json``.

    Two layouts are accepted: a single JSON array whose entries carry a list
    under ``Names`` (the original code labels this "Podman format"), and one
    JSON object per line with a string under ``Names`` ("Docker format").
    The first entry seen for a name wins. Raises ``ValueError`` when a line
    in the per-line layout is not valid JSON.
    """
    try:
        parsed = json.loads(listing)
    except ValueError:
        # Several objects on separate lines (or empty output) is not one
        # JSON document; fall through to per-line parsing.
        parsed = None

    if isinstance(parsed, list):
        entries = parsed
    elif isinstance(parsed, dict):
        # Per-line layout with exactly one container parses as a single object.
        entries = [parsed]
    else:
        entries = [json.loads(line) for line in listing.splitlines() if len(line) > 2]

    index = {}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        name = entry.get('Names')
        if isinstance(name, list):
            # Only the first name of an entry is compared, as before.
            name = name[0] if name else None
        if isinstance(name, str):
            index.setdefault(name, entry)
    return index


def check_container_status(options, console):
    """Compare the containers reported by the container CLI with the expected list.

    Returns ``(code, container_list)``. ``code`` is 1 when any expected
    container is absent or not in the ``running`` state, otherwise 0.
    ``container_list`` holds one ``{"Name", "Status", "Details"}`` dict per
    expected container, in file order. Exits via ``fail`` when the CLI cannot
    be run, returns a non-zero status, or produces unparseable output.
    Neither parameter is used.
    """
    cli = "docker"
    try:
        proc = subprocess.run([cli, 'ps', '--format', 'json'], stdout=subprocess.PIPE, encoding="utf-8")
    except OSError:
        # E.g. the CLI binary is not installed or not executable.
        fail("Container system error; unable to obtain container process statuses")
    if proc.returncode != 0:
        fail("Container system error; unable to obtain container process statuses")

    # Parse the listing once instead of once per expected container.
    try:
        reported = _index_containers(proc.stdout)
    except ValueError:
        fail("Container system error; unable to parse container process statuses")

    code = 0
    container_list = []
    with open(EXPECTED_CONTAINERS_FILE, "r") as expected_file:
        for line in expected_file:
            name = line.strip()
            # Blank lines and comments do not name a container.
            if not name or name.startswith('#'):
                continue

            details = {"Name": name, "Status": "missing", "Details": ""}
            entry = reported.get(name)
            if entry is not None:
                details['Status'] = entry['State']
                details['Details'] = entry['Status']
            if details['Status'] != "running":
                code = 1
            container_list.append(details)

    return code, container_list


def check_status(options, console):
    """Run the checks in order, print the result, and return the status code.

    Each later check runs only while the code is still 0: installation
    status, then system status, then container status.
    """
    container_list = []
    code = check_installation_status(options, console)
    if code == 0:
        code = check_system_status(options, console)
    if code == 0:
        code, container_list = check_container_status(options, console)
    output(options, console, code, container_list)
    return code


def main():
    """Parse command-line flags, require root, and exit with the status code."""
    # Only arguments starting with "-" are treated as options.
    options = [arg for arg in sys.argv[1:] if arg.startswith("-")]

    if "-h" in options or "--help" in options or "-?" in options:
        showUsage(options, None)

    # Effective UID, the same value `id -u` prints, without a subprocess.
    if os.geteuid() != 0:
        fail("This program must be run as root")

    console = Console()
    sys.exit(check_status(options, console))


if __name__ == "__main__":
    main()