Files
securityonion/salt/hypervisor/tools/sbin/so-qcow2-network-predictable
2025-06-04 16:57:59 -04:00

425 lines
16 KiB
Python

#!/usr/bin/python3
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
# https://securityonion.net/license; you may not use this file except in compliance with the
# Elastic License 2.0.
"""
Script for configuring network interface predictability in Security Onion VMs.
This script modifies the necessary files to ensure consistent network interface naming.
The script performs the following operations:
1. Modifies the BLS entry to set net.ifnames=1
2. Removes any existing persistent network rules
3. Updates GRUB configuration
**Usage:**
so-qcow2-network-predictable -n <domain_name> [-I <qcow2_image_path>]
**Options:**
-n, --name Domain name of the VM to configure
-I, --image (Optional) Path to the QCOW2 image. If not provided,
defaults to /nsm/libvirt/images/<domain_name>/<domain_name>.qcow2
**Examples:**
1. **Configure using domain name:**
```bash
so-qcow2-network-predictable -n sool9
```
This command will:
- Use default image path: /nsm/libvirt/images/sool9/sool9.qcow2
- Configure network interface predictability
2. **Configure using custom image path:**
```bash
so-qcow2-network-predictable -n sool9 -I /path/to/custom/image.qcow2
```
This command will:
- Use the specified image path
- Configure network interface predictability
**Notes:**
- The VM must not be running when executing this script
- Requires root privileges
- Will automatically find and modify the appropriate BLS entry
- Removes /etc/udev/rules.d/70-persistent-net.rules if it exists
- Updates GRUB configuration after changes
**Exit Codes:**
- 0: Success
- 1: General error (invalid arguments, file operations, etc.)
- 2: VM is running
- 3: Required files not found
- 4: Permission denied
**Logging:**
- Logs are written to /opt/so/log/hypervisor/so-qcow2-network-predictable.log
- Both file and console logging are enabled
- Log entries include:
- Timestamps
- Operation details
- Error messages
- Configuration changes
"""
import argparse
import guestfs
import glob
import libvirt
import logging
import os
import re
import sys
from so_logging_utils import setup_logging
# Module-wide logger for this script. setup_logging (from so_logging_utils)
# receives the logger name, the log file path, the INFO threshold and a
# "timestamp - level - message" record format.
logger = setup_logging(
    format_str='%(asctime)s - %(levelname)s - %(message)s',
    log_level=logging.INFO,
    log_file_path='/opt/so/log/hypervisor/so-qcow2-network-predictable.log',
    logger_name='so-qcow2-network-predictable',
)
def check_domain_status(domain_name):
    """
    Verify that the named libvirt domain exists and is not running.

    Args:
        domain_name (str): Name of the libvirt domain to check

    Returns:
        bool: Always True; every failure is reported by raising instead

    Raises:
        RuntimeError: If the connection to libvirt cannot be opened, the
            domain does not exist, the domain is running, or querying the
            domain fails for any other reason
    """
    # Opening the connection is handled on its own so that only genuine
    # connection failures are reported as such.
    try:
        conn = libvirt.open('qemu:///system')
    except libvirt.libvirtError as e:
        logger.error(f"Failed to connect to libvirt: {e}")
        raise RuntimeError(f"Failed to connect to libvirt: {e}") from e

    try:
        try:
            dom = conn.lookupByName(domain_name)
            is_running = dom.isActive()
        except libvirt.libvirtError as e:
            # Prefer the structured error code; the message text is kept as
            # a fallback so the previous detection still applies.
            domain_missing = (
                e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN
                or "no domain with matching name" in str(e)
            )
            if domain_missing:
                logger.error(f"Domain '{domain_name}' not found")
                raise RuntimeError(f"Domain '{domain_name}' not found") from e
            logger.error(f"Failed to query domain '{domain_name}': {e}")
            raise RuntimeError(f"Failed to query domain '{domain_name}': {e}") from e

        if is_running:
            logger.error(f"Domain '{domain_name}' is running - cannot modify configuration")
            raise RuntimeError(f"Domain '{domain_name}' must not be running")

        logger.info(f"Domain '{domain_name}' exists and is not running")
        return True
    finally:
        # The check itself is read-only, so a failure while closing the
        # connection is logged rather than masking the real outcome.
        try:
            conn.close()
        except libvirt.libvirtError as e:
            logger.warning(f"Error closing libvirt connection: {e}")
def modify_bls_entry(g):
    """
    Ensure the BLS entries under /boot/loader/entries set net.ifnames=1.

    Every ``*.conf`` file returned by the glob is processed (not only the
    first match). In each file the first line starting with ``options`` has
    any existing net.ifnames=0/1 token (bare or double-quoted) dropped and a
    single net.ifnames=1 appended; the file is written back only when that
    line actually changed.

    Args:
        g: Mounted guestfs handle

    Returns:
        bool: True if at least one entry was rewritten, False if no changes needed

    Raises:
        RuntimeError: If the BLS directory or entry files cannot be found,
            or an entry cannot be read or written
    """
    bls_dir = "/boot/loader/entries"
    logger.info(f"Checking BLS directory: {bls_dir}")
    if g.is_dir(bls_dir):
        logger.info("BLS directory exists")
    else:
        # Log what /boot does contain to make the failure easier to diagnose.
        logger.info("Listing /boot contents:")
        try:
            boot_contents = g.ls("/boot")
            logger.info(f"/boot contains: {boot_contents}")
            if g.is_dir("/boot/loader"):
                logger.info("Listing /boot/loader contents:")
                loader_contents = g.ls("/boot/loader")
                logger.info(f"/boot/loader contains: {loader_contents}")
        except Exception as e:
            logger.error(f"Error listing /boot contents: {e}")
        raise RuntimeError(f"BLS directory not found: {bls_dir}")

    entries = g.glob_expand(f"{bls_dir}/*.conf")
    logger.info(f"Found BLS entries: {entries}")
    if not entries:
        logger.error("No BLS entry files found")
        raise RuntimeError("No BLS entry files found")

    any_modified = False
    for bls_file in entries:
        try:
            if _update_bls_file(g, bls_file):
                any_modified = True
        except Exception as e:
            logger.error(f"Failed to modify BLS entry: {e}")
            raise RuntimeError(f"Failed to modify BLS entry: {e}") from e
    return any_modified


def _force_ifnames_on(options_line):
    """Return options_line without net.ifnames=0/1 tokens and with net.ifnames=1 appended."""
    # Working token by token removes quoted and bare forms alike and
    # collapses repeated whitespace in one pass.
    kept = [
        token for token in options_line.split()
        if not re.fullmatch(r'"?net\.ifnames=[01]"?', token)
    ]
    return ' '.join(kept + ['net.ifnames=1'])


def _update_bls_file(g, bls_file):
    """Rewrite the first options line of one BLS entry; return True if the file was written."""
    logger.info(f"Found BLS entry file: {bls_file}")
    logger.info(f"Reading BLS file contents from: {bls_file}")
    content = g.read_file(bls_file).decode('utf-8')
    logger.info("Current BLS file content:")
    logger.info("---BEGIN BLS CONTENT---")
    logger.info(content)
    logger.info("---END BLS CONTENT---")

    lines = content.splitlines()
    modified = False
    found_options = False
    for i, line in enumerate(lines):
        if not line.startswith(('options ', 'options\t')):
            continue
        found_options = True
        logger.info(f"Found options line: {line}")
        new_line = _force_ifnames_on(line)
        if new_line != line:
            lines[i] = new_line
            modified = True
            logger.info(f"Updated options line. New line: {new_line}")
        # Only the first options line of an entry is considered.
        break

    if not found_options:
        logger.warning(f"No options line found in BLS entry: {bls_file}")

    if not modified:
        logger.info("No changes needed for BLS entry")
        return False

    new_content = '\n'.join(lines) + '\n'
    logger.info("New BLS file content:")
    logger.info("---BEGIN NEW BLS CONTENT---")
    logger.info(new_content)
    logger.info("---END NEW BLS CONTENT---")
    g.write(bls_file, new_content.encode('utf-8'))
    logger.info("Successfully updated BLS entry")
    return True
def remove_persistent_net_rules(g):
    """
    Delete /etc/udev/rules.d/70-persistent-net.rules from the guest if present.

    Args:
        g: Mounted guestfs handle

    Returns:
        bool: True when the file existed and was removed, False when it was absent

    Raises:
        RuntimeError: If checking for or removing the file fails
    """
    target = "/etc/udev/rules.d/70-persistent-net.rules"
    logger.info(f"Checking for persistent network rules file: {target}")
    try:
        # Guard clause: nothing to do when the rules file is absent.
        if not g.is_file(target):
            logger.info("No persistent network rules file found")
            return False

        logger.info("Found persistent network rules file, removing...")
        g.rm(target)
        logger.info(f"Successfully removed persistent network rules file: {target}")
        return True
    except Exception as err:
        logger.error(f"Failed to remove persistent network rules: {err}")
        raise RuntimeError(f"Failed to remove persistent network rules: {err}")
def update_grub_config(g):
    """
    Update the guest's GRUB configuration so that net.ifnames=1 is set.

    Steps, all run through the guestfs handle:
    1. Read /boot/grub2/grubenv and rewrite its ``kernelopts`` value with any
       net.ifnames=0/1 token removed and net.ifnames=1 appended. The value is
       accepted both bare (``kernelopts=root=... ro``) and double-quoted. If
       no usable value is found, a built-in default command line is written.
    2. Use grubby to drop existing net.ifnames arguments from all kernels and
       add net.ifnames=1.
    3. Regenerate /boot/grub2/grub.cfg with grub2-mkconfig.

    Args:
        g: Mounted guestfs handle

    Raises:
        RuntimeError: If GRUB update fails
    """
    try:
        # First, read the current grubenv to get the existing kernelopts
        logger.info("Reading current grubenv...")
        grubenv_content = g.read_file('/boot/grub2/grubenv').decode('utf-8')
        logger.info("Current grubenv content:")
        logger.info(grubenv_content)

        current_kernelopts = _read_kernelopts(grubenv_content)
        if current_kernelopts is not None:
            logger.info(f"Current kernelopts: {current_kernelopts}")
            # Drop every existing net.ifnames token, then append the wanted
            # one; joining tokens also normalises repeated whitespace.
            kept = [
                token for token in current_kernelopts.split()
                if not re.fullmatch(r'"?net\.ifnames=[01]"?', token)
            ]
            new_kernelopts = ' '.join(kept + ['net.ifnames=1'])
            logger.info(f"New kernelopts: {new_kernelopts}")
            logger.info("Setting kernelopts with net.ifnames=1...")
            output_editenv = g.command(['grub2-editenv', '-', 'set', f'kernelopts={new_kernelopts}'])
            logger.info("grub2-editenv output:")
            logger.info(output_editenv)
        else:
            # If we can't find existing kernelopts, use the default
            logger.warning("Could not find existing kernelopts, using default")
            output_editenv = g.command(['grub2-editenv', '-', 'set', 'kernelopts=console=tty0 no_timer_check biosdevname=0 resume=/dev/mapper/vg_main-lv_swap rd.lvm.lv=vg_main/lv_root rd.lvm.lv=vg_main/lv_swap net.ifnames=1 crashkernel=1G-64G:448M,64G-:512M'])
            logger.info("grub2-editenv output:")
            logger.info(output_editenv)

        logger.info("Updating grubby with net.ifnames=1...")
        # First remove any existing net.ifnames arguments
        output_grubby_remove = g.command(['grubby', '--update-kernel=ALL', '--remove-args=net.ifnames=0 net.ifnames=1'])
        logger.info("grubby remove output:")
        logger.info(output_grubby_remove)
        # Then add net.ifnames=1
        output_grubby_add = g.command(['grubby', '--update-kernel=ALL', '--args=net.ifnames=1'])
        logger.info("grubby add output:")
        logger.info(output_grubby_add)

        logger.info("Updating GRUB configuration...")
        output_mkconfig = g.command(['grub2-mkconfig', '-o', '/boot/grub2/grub.cfg'])
        logger.info("GRUB update output:")
        logger.info(output_mkconfig)
        logger.info("Successfully updated GRUB configuration")
    except Exception as e:
        logger.error(f"Failed to update GRUB configuration: {e}")
        raise RuntimeError(f"Failed to update GRUB configuration: {e}") from e


def _read_kernelopts(grubenv_content):
    """Return the kernelopts value found in grubenv text, or None if absent or empty."""
    # Double-quoted form, matched exactly as before.
    quoted = re.search(r'^kernelopts="([^"]+)"', grubenv_content, re.MULTILINE)
    if quoted:
        return quoted.group(1)
    # Bare form: everything after '=' up to the end of that line.
    bare = re.search(r'^kernelopts=(.*)$', grubenv_content, re.MULTILINE)
    if bare:
        value = bare.group(1).strip().strip('"').strip()
        return value or None
    return None
def configure_network_predictability(domain_name, image_path=None):
    """
    Apply the predictable-interface-name configuration to a VM disk image.

    The domain is first checked via check_domain_status, the image path is
    resolved and validated, then the image is opened with guestfs, its
    filesystems are mounted, and the BLS, udev-rules and GRUB steps are run.

    Args:
        domain_name (str): Name of the domain to configure
        image_path (str, optional): Path to the QCOW2 image; when empty,
            /nsm/libvirt/images/<domain_name>/<domain_name>.qcow2 is used

    Raises:
        RuntimeError: If configuration fails
    """
    check_domain_status(domain_name)

    # Fall back to the conventional per-domain image location.
    image_path = image_path or f"/nsm/libvirt/images/{domain_name}/{domain_name}.qcow2"

    if not os.path.exists(image_path):
        logger.error(f"Image file not found: {image_path}")
        raise RuntimeError(f"Image file not found: {image_path}")

    readable_and_writable = os.access(image_path, os.R_OK | os.W_OK)
    if not readable_and_writable:
        logger.error(f"Permission denied: Cannot access image file {image_path}")
        raise RuntimeError(f"Permission denied: Cannot access image file {image_path}")

    logger.info(f"Configuring network predictability for domain: {domain_name}")
    logger.info(f"Using image: {image_path}")

    g = guestfs.GuestFS(python_return_dict=True)
    try:
        logger.info("Initializing guestfs...")
        g.set_network(False)
        g.selinux = False
        g.add_drive_opts(image_path, format="qcow2")
        g.launch()

        logger.info("Inspecting operating system...")
        detected_roots = g.inspect_os()
        if not detected_roots:
            raise RuntimeError("No operating system found in image")
        root = detected_roots[0]
        logger.info(f"Found root filesystem: {root}")
        logger.info(f"Operating system type: {g.inspect_get_type(root)}")
        logger.info(f"Operating system distro: {g.inspect_get_distro(root)}")
        logger.info(f"Operating system major version: {g.inspect_get_major_version(root)}")
        logger.info(f"Operating system minor version: {g.inspect_get_minor_version(root)}")

        logger.info("Getting mount points...")
        mount_map = g.inspect_get_mountpoints(root)
        logger.info(f"Found mount points: {mount_map}")

        logger.info("Converting mount points to sortable list...")
        mount_pairs = list(mount_map.items())
        logger.info(f"Converted mount points: {mount_pairs}")

        # Shorter paths first, so a parent directory is mounted before
        # anything that lives beneath it.
        logger.info("Sorting mount points by path length for proper mount order...")
        mount_pairs = sorted(mount_pairs, key=lambda pair: len(pair[0]))
        logger.info(f"Mount order will be: {[path for path, _ in mount_pairs]}")

        for mp_path, mp_device in mount_pairs:
            try:
                logger.info(f"Attempting to mount {mp_device} at {mp_path}")
                g.mount(mp_device, mp_path)
                logger.info(f"Successfully mounted {mp_device} at {mp_path}")
            except Exception as mount_err:
                # A failed mount is only logged; remaining mounts are still tried.
                logger.warning(f"Could not mount {mp_device} at {mp_path}: {str(mount_err)}")

        # Both steps always run; GRUB is refreshed only if either changed something.
        bls_modified = modify_bls_entry(g)
        rules_removed = remove_persistent_net_rules(g)
        if not (bls_modified or rules_removed):
            logger.info("No changes were necessary")
        else:
            update_grub_config(g)
            logger.info("Network predictability configuration completed successfully")
    except Exception as err:
        raise RuntimeError(f"Failed to configure network predictability: {err}")
    finally:
        try:
            logger.info("Unmounting all filesystems...")
            g.umount_all()
            logger.info("Successfully unmounted all filesystems")
        except Exception as umount_err:
            logger.warning(f"Error unmounting filesystems: {umount_err}")
        g.close()
class _ExitOneArgumentParser(argparse.ArgumentParser):
    """ArgumentParser whose usage errors exit with status 1 instead of 2."""

    def error(self, message):
        # argparse's default error() exits with status 2, which this script
        # documents as "VM is running"; invalid arguments are documented as 1.
        self.print_usage(sys.stderr)
        self.exit(1, f"{self.prog}: error: {message}\n")


def parse_arguments(argv=None):
    """
    Parse command line arguments.

    Args:
        argv (list[str], optional): Argument list to parse. When None,
            argparse reads the arguments from sys.argv[1:].

    Returns:
        argparse.Namespace: Parsed arguments with ``name`` (required) and
        ``image`` (None when -I/--image is not given)

    Raises:
        SystemExit: With status 1 on invalid arguments, or status 0 after
            printing help
    """
    parser = _ExitOneArgumentParser(
        description="Configure network interface predictability for Security Onion VMs"
    )
    parser.add_argument("-n", "--name", required=True,
                        help="Domain name of the VM to configure")
    parser.add_argument("-I", "--image",
                        help="Path to the QCOW2 image (optional)")
    return parser.parse_args(argv)
def main():
    """
    Script entry point: parse arguments, run the configuration, and exit.

    Exit status is 0 on success. A RuntimeError is mapped to 2, 3 or 4 based
    on marker text in its message, and to 1 otherwise. KeyboardInterrupt and
    any other exception exit with 1.
    """
    # Checked in order; the first marker contained in the message wins.
    status_by_marker = (
        ("must not be running", 2),
        ("not found", 3),
        ("Permission denied", 4),
    )
    try:
        args = parse_arguments()
        configure_network_predictability(args.name, args.image)
        sys.exit(0)
    except RuntimeError as err:
        message = str(err)
        logger.error(message)
        status = next(
            (code for marker, code in status_by_marker if marker in message),
            1,
        )
        sys.exit(status)
    except KeyboardInterrupt:
        logger.error("Operation cancelled by user")
        sys.exit(1)
    except Exception as err:
        logger.error(f"Unexpected error: {err}")
        sys.exit(1)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()