#!/usr/bin/env python3

# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.

"""Prune stopped containers and old Security Onion ("so-*") Docker images.

For every SO image base name, the two most recent version groups are kept and
all older tags are removed with ``docker rmi -f``.
"""

import sys
import argparse
import re
import subprocess
import json
from packaging.version import Version, InvalidVersion
from itertools import groupby, chain

# Number of most-recent version groups preserved per image base name.
_VERSIONS_TO_KEEP = 2

# An image qualifies as a Security Onion image when the last path component
# of its name begins with "so-".
_SO_IMAGE_RE = re.compile(r'^.*\/so-[^\/]*$')

# Values that mean "no repository / no tag". The docker CLI prints the literal
# "<none>" for untagged images; empty/None are treated the same way.
_MISSING_FIELD_VALUES = (None, '', '<none>')


def get_image_name(string) -> str:
    """Return the ``name`` part of a ``name:version`` tag.

    Only the final colon-separated field is dropped, so a registry port
    (``host:5000/so-foo:1.0``) stays part of the name.
    """
    return ':'.join(string.split(':')[:-1])


def get_so_image_basename(string) -> str:
    """Return the text following the last ``/so-`` in the image name of a tag."""
    return get_image_name(string).split('/so-')[-1]


def get_image_version(string) -> str:
    """Return a sortable version string for a ``name:version`` tag.

    ``latest`` and unparseable versions containing a dash (likely dev builds)
    are mapped to very high semvers so they sort as newest. Any other
    unparseable version is returned unchanged; the caller will hit
    ``InvalidVersion`` when it tries to parse it.
    """
    ver = string.split(':')[-1]
    if ver == 'latest':
        # Version doesn't like "latest", so use a high semver
        return '99999.9.9'
    try:
        Version(ver)
    except InvalidVersion:
        # Also return a very high semver for any version with a dash in it
        # since it will likely be a dev version of some kind
        if '-' in ver:
            return '999999.9.9'
    return ver


def run_command(command, exit_on_error=True):
    """Run a command and return its standard output as text.

    Args:
        command: Either a shell command string (executed through the shell,
            as before) or an argv list (executed directly, without a shell,
            which avoids quoting/injection problems).
        exit_on_error: When true (the default), a non-zero exit status prints
            the failure to stderr and terminates the process with status 1.
            When false, ``subprocess.CalledProcessError`` is raised instead so
            the caller can decide how to proceed.

    Returns:
        The captured stdout of the command.

    Raises:
        subprocess.CalledProcessError: Non-zero exit status and
            ``exit_on_error`` is false.
    """
    process = subprocess.run(
        command,
        shell=isinstance(command, str),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if process.returncode != 0:
        if not exit_on_error:
            raise subprocess.CalledProcessError(
                process.returncode, command, output=process.stdout, stderr=process.stderr
            )
        shown = command if isinstance(command, str) else ' '.join(command)
        print(f"Error executing command: {shown}", file=sys.stderr)
        print(f"Error message: {process.stderr}", file=sys.stderr)
        sys.exit(1)
    return process.stdout


def _list_so_image_tags():
    """Return ``repository:tag`` strings for every tagged SO image known to docker."""
    images_json = run_command(['docker', 'images', '--format', '{{json .}}'])

    tags = []
    for line in images_json.splitlines():
        line = line.strip()
        if not line:
            continue
        img = json.loads(line)
        repository = img.get('Repository')
        version = img.get('Tag')
        # Skip dangling / untagged images: they have no usable version and
        # cannot be removed by "name:tag" anyway.
        if repository in _MISSING_FIELD_VALUES or version in _MISSING_FIELD_VALUES:
            continue
        tag = f"{repository}:{version}"
        # Filter to only SO images (base name begins with "so-")
        if _SO_IMAGE_RE.match(get_image_name(tag)):
            tags.append(tag)
    return tags


def _group_by_basename(tags):
    """Group tags into lists sharing the same SO image base name."""
    # groupby only merges adjacent items, so sort by the same projection first.
    ordered = sorted(tags, key=get_so_image_basename)
    return [list(group) for _, group in groupby(ordered, key=get_so_image_basename)]


def _prunable_tags(tags):
    """Return the tags of one image that fall outside the newest kept versions.

    Raises:
        InvalidVersion: A tag's version cannot be parsed.
    """
    ordered = sorted(tags, key=lambda t: Version(get_image_version(t)), reverse=True)
    # Group by version string, in case multiple images share the same version.
    version_groups = [list(group) for _, group in groupby(ordered, key=get_image_version)]
    return list(chain.from_iterable(version_groups[_VERSIONS_TO_KEEP:]))


def main(quiet):
    """Prune stopped containers, then remove outdated Security Onion images.

    Args:
        quiet: When truthy, suppress informational output (errors and
            warnings are still written to stderr).

    A failure to remove an individual image is reported and skipped; any
    other failure terminates the process with exit status 1.
    """
    try:
        # Prune old/stopped containers using docker CLI
        if not quiet:
            print('Pruning old containers')
        run_command(['docker', 'container', 'prune', '-f'])

        no_prunable = True
        for t_list in _group_by_basename(_list_so_image_tags()):
            try:
                for tag in _prunable_tags(t_list):
                    no_prunable = False
                    if not quiet:
                        print(f'Removing image {tag}')
                    try:
                        # Best effort: an image that is in use or already gone
                        # must not abort the rest of the prune.
                        run_command(['docker', 'rmi', '-f', tag], exit_on_error=False)
                    except subprocess.CalledProcessError as e:
                        print(f'Could not remove image {tag}, continuing...', file=sys.stderr)
                        if e.stderr:
                            print(f'Error message: {e.stderr.strip()}', file=sys.stderr)
            except InvalidVersion as e:
                print(f'so-{get_so_image_basename(t_list[0])}: {e}', file=sys.stderr)
                sys.exit(1)
            except Exception as e:
                print('Unhandled exception occurred:', file=sys.stderr)
                print(f'so-{get_so_image_basename(t_list[0])}: {e}', file=sys.stderr)
                sys.exit(1)

        if no_prunable and not quiet:
            print('No Security Onion images to prune')
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main_parser = argparse.ArgumentParser(add_help=False)
    main_parser.add_argument('-q', '--quiet', action='store_true', required=False)
    args = main_parser.parse_args(sys.argv[1:])
    main(args.quiet)