mirror of
https://github.com/Security-Onion-Solutions/securityonion.git
synced 2025-12-06 09:12:45 +01:00
425 lines
16 KiB
Python
425 lines
16 KiB
Python
#!/usr/bin/python3
|
|
|
|
# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
|
|
# or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
|
|
# https://securityonion.net/license; you may not use this file except in compliance with the
|
|
# Elastic License 2.0.
|
|
|
|
"""
|
|
Script for configuring network interface predictability in Security Onion VMs.
|
|
This script modifies the necessary files to ensure consistent network interface naming.
|
|
|
|
The script performs the following operations:
|
|
1. Modifies the BLS entry to set net.ifnames=1
|
|
2. Removes any existing persistent network rules
|
|
3. Updates GRUB configuration
|
|
|
|
**Usage:**
|
|
so-qcow2-network-predictable -n <domain_name> [-I <qcow2_image_path>]
|
|
|
|
**Options:**
|
|
-n, --name Domain name of the VM to configure
|
|
-I, --image (Optional) Path to the QCOW2 image. If not provided,
|
|
defaults to /nsm/libvirt/images/<domain_name>/<domain_name>.qcow2
|
|
|
|
**Examples:**
|
|
|
|
1. **Configure using domain name:**
|
|
```bash
|
|
so-qcow2-network-predictable -n sool9
|
|
```
|
|
This command will:
|
|
- Use default image path: /nsm/libvirt/images/sool9/sool9.qcow2
|
|
- Configure network interface predictability
|
|
|
|
2. **Configure using custom image path:**
|
|
```bash
|
|
so-qcow2-network-predictable -n sool9 -I /path/to/custom/image.qcow2
|
|
```
|
|
This command will:
|
|
- Use the specified image path
|
|
- Configure network interface predictability
|
|
|
|
**Notes:**
|
|
- The VM must not be running when executing this script
|
|
- Requires root privileges
|
|
- Will automatically find and modify the appropriate BLS entry
|
|
- Removes /etc/udev/rules.d/70-persistent-net.rules if it exists
|
|
- Updates GRUB configuration after changes
|
|
|
|
**Exit Codes:**
|
|
- 0: Success
|
|
- 1: General error (invalid arguments, file operations, etc.)
|
|
- 2: VM is running
|
|
- 3: Required files not found
|
|
- 4: Permission denied
|
|
|
|
**Logging:**
|
|
- Logs are written to /opt/so/log/hypervisor/so-qcow2-network-predictable.log
|
|
- Both file and console logging are enabled
|
|
- Log entries include:
|
|
- Timestamps
|
|
- Operation details
|
|
- Error messages
|
|
- Configuration changes
|
|
"""
|
|
|
|
import argparse
|
|
import guestfs
|
|
import glob
|
|
import libvirt
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from so_logging_utils import setup_logging
|
|
|
|
# Set up logging
|
|
logger = setup_logging(
|
|
logger_name='so-qcow2-network-predictable',
|
|
log_file_path='/opt/so/log/hypervisor/so-qcow2-network-predictable.log',
|
|
log_level=logging.INFO,
|
|
format_str='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
def check_domain_status(domain_name):
|
|
"""
|
|
Check if the specified domain exists and is not running.
|
|
|
|
Args:
|
|
domain_name (str): Name of the libvirt domain to check
|
|
|
|
Returns:
|
|
bool: True if domain exists and is not running, False otherwise
|
|
|
|
Raises:
|
|
RuntimeError: If domain is running or connection to libvirt fails
|
|
"""
|
|
try:
|
|
conn = libvirt.open('qemu:///system')
|
|
try:
|
|
dom = conn.lookupByName(domain_name)
|
|
is_running = dom.isActive()
|
|
if is_running:
|
|
logger.error(f"Domain '{domain_name}' is running - cannot modify configuration")
|
|
raise RuntimeError(f"Domain '{domain_name}' must not be running")
|
|
logger.info(f"Domain '{domain_name}' exists and is not running")
|
|
return True
|
|
except libvirt.libvirtError as e:
|
|
if "no domain with matching name" in str(e):
|
|
logger.error(f"Domain '{domain_name}' not found")
|
|
raise RuntimeError(f"Domain '{domain_name}' not found")
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
except libvirt.libvirtError as e:
|
|
logger.error(f"Failed to connect to libvirt: {e}")
|
|
raise RuntimeError(f"Failed to connect to libvirt: {e}")
|
|
|
|
def modify_bls_entry(g):
|
|
"""
|
|
Find and modify the BLS entry to set net.ifnames=1.
|
|
|
|
Args:
|
|
g: Mounted guestfs handle
|
|
|
|
Returns:
|
|
bool: True if successful, False if no changes needed
|
|
|
|
Raises:
|
|
RuntimeError: If BLS entry cannot be found or modified
|
|
"""
|
|
bls_dir = "/boot/loader/entries"
|
|
logger.info(f"Checking BLS directory: {bls_dir}")
|
|
if g.is_dir(bls_dir):
|
|
logger.info("BLS directory exists")
|
|
else:
|
|
logger.info("Listing /boot contents:")
|
|
try:
|
|
boot_contents = g.ls("/boot")
|
|
logger.info(f"/boot contains: {boot_contents}")
|
|
if g.is_dir("/boot/loader"):
|
|
logger.info("Listing /boot/loader contents:")
|
|
loader_contents = g.ls("/boot/loader")
|
|
logger.info(f"/boot/loader contains: {loader_contents}")
|
|
except Exception as e:
|
|
logger.error(f"Error listing /boot contents: {e}")
|
|
raise RuntimeError(f"BLS directory not found: {bls_dir}")
|
|
|
|
# Find BLS entry file
|
|
entries = g.glob_expand(f"{bls_dir}/*.conf")
|
|
logger.info(f"Found BLS entries: {entries}")
|
|
if not entries:
|
|
logger.error("No BLS entry files found")
|
|
raise RuntimeError("No BLS entry files found")
|
|
|
|
# Use the first entry found
|
|
bls_file = entries[0]
|
|
logger.info(f"Found BLS entry file: {bls_file}")
|
|
|
|
try:
|
|
logger.info(f"Reading BLS file contents from: {bls_file}")
|
|
content = g.read_file(bls_file).decode('utf-8')
|
|
logger.info("Current BLS file content:")
|
|
logger.info("---BEGIN BLS CONTENT---")
|
|
logger.info(content)
|
|
logger.info("---END BLS CONTENT---")
|
|
|
|
lines = content.splitlines()
|
|
modified = False
|
|
|
|
for i, line in enumerate(lines):
|
|
if line.startswith('options '):
|
|
logger.info(f"Found options line: {line}")
|
|
|
|
# First remove any existing net.ifnames parameters (both =0 and =1)
|
|
new_line = re.sub(r'\s*net\.ifnames=[01]\s*', ' ', line)
|
|
# Also remove any quoted versions
|
|
new_line = re.sub(r'\s*"net\.ifnames=[01]"\s*', ' ', new_line)
|
|
# Clean up multiple spaces
|
|
new_line = re.sub(r'\s+', ' ', new_line).strip()
|
|
|
|
# Now add net.ifnames=1 at the end
|
|
new_line = f"{new_line} net.ifnames=1"
|
|
|
|
if new_line != line:
|
|
lines[i] = new_line
|
|
modified = True
|
|
logger.info(f"Updated options line. New line: {new_line}")
|
|
break
|
|
|
|
if modified:
|
|
new_content = '\n'.join(lines) + '\n'
|
|
logger.info("New BLS file content:")
|
|
logger.info("---BEGIN NEW BLS CONTENT---")
|
|
logger.info(new_content)
|
|
logger.info("---END NEW BLS CONTENT---")
|
|
g.write(bls_file, new_content.encode('utf-8'))
|
|
logger.info("Successfully updated BLS entry")
|
|
return True
|
|
|
|
logger.info("No changes needed for BLS entry")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to modify BLS entry: {e}")
|
|
raise RuntimeError(f"Failed to modify BLS entry: {e}")
|
|
|
|
def remove_persistent_net_rules(g):
|
|
"""
|
|
Remove the persistent network rules file if it exists.
|
|
|
|
Args:
|
|
g: Mounted guestfs handle
|
|
|
|
Returns:
|
|
bool: True if file was removed, False if it didn't exist
|
|
"""
|
|
rules_file = "/etc/udev/rules.d/70-persistent-net.rules"
|
|
logger.info(f"Checking for persistent network rules file: {rules_file}")
|
|
try:
|
|
if g.is_file(rules_file):
|
|
logger.info("Found persistent network rules file, removing...")
|
|
g.rm(rules_file)
|
|
logger.info(f"Successfully removed persistent network rules file: {rules_file}")
|
|
return True
|
|
logger.info("No persistent network rules file found")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Failed to remove persistent network rules: {e}")
|
|
raise RuntimeError(f"Failed to remove persistent network rules: {e}")
|
|
|
|
def update_grub_config(g):
|
|
"""
|
|
Update GRUB configuration.
|
|
|
|
Args:
|
|
g: Mounted guestfs handle
|
|
|
|
Raises:
|
|
RuntimeError: If GRUB update fails
|
|
"""
|
|
try:
|
|
# First, read the current grubenv to get the existing kernelopts
|
|
logger.info("Reading current grubenv...")
|
|
grubenv_content = g.read_file('/boot/grub2/grubenv').decode('utf-8')
|
|
logger.info("Current grubenv content:")
|
|
logger.info(grubenv_content)
|
|
|
|
# Extract current kernelopts
|
|
kernelopts_match = re.search(r'^kernelopts="([^"]+)"', grubenv_content, re.MULTILINE)
|
|
if kernelopts_match:
|
|
current_kernelopts = kernelopts_match.group(1)
|
|
logger.info(f"Current kernelopts: {current_kernelopts}")
|
|
|
|
# Remove any existing net.ifnames parameters
|
|
new_kernelopts = re.sub(r'\s*net\.ifnames=[01]\s*', ' ', current_kernelopts)
|
|
# Clean up multiple spaces
|
|
new_kernelopts = re.sub(r'\s+', ' ', new_kernelopts).strip()
|
|
# Add net.ifnames=1
|
|
new_kernelopts = f"{new_kernelopts} net.ifnames=1"
|
|
|
|
logger.info(f"New kernelopts: {new_kernelopts}")
|
|
|
|
# Update grubenv with the new kernelopts
|
|
logger.info("Setting kernelopts with net.ifnames=1...")
|
|
output_editenv = g.command(['grub2-editenv', '-', 'set', f'kernelopts={new_kernelopts}'])
|
|
logger.info("grub2-editenv output:")
|
|
logger.info(output_editenv)
|
|
else:
|
|
# If we can't find existing kernelopts, use the default
|
|
logger.warning("Could not find existing kernelopts, using default")
|
|
output_editenv = g.command(['grub2-editenv', '-', 'set', 'kernelopts=console=tty0 no_timer_check biosdevname=0 resume=/dev/mapper/vg_main-lv_swap rd.lvm.lv=vg_main/lv_root rd.lvm.lv=vg_main/lv_swap net.ifnames=1 crashkernel=1G-64G:448M,64G-:512M'])
|
|
logger.info("grub2-editenv output:")
|
|
logger.info(output_editenv)
|
|
|
|
logger.info("Updating grubby with net.ifnames=1...")
|
|
# First remove any existing net.ifnames arguments
|
|
output_grubby_remove = g.command(['grubby', '--update-kernel=ALL', '--remove-args=net.ifnames=0 net.ifnames=1'])
|
|
logger.info("grubby remove output:")
|
|
logger.info(output_grubby_remove)
|
|
|
|
# Then add net.ifnames=1
|
|
output_grubby_add = g.command(['grubby', '--update-kernel=ALL', '--args=net.ifnames=1'])
|
|
logger.info("grubby add output:")
|
|
logger.info(output_grubby_add)
|
|
|
|
logger.info("Updating GRUB configuration...")
|
|
output_mkconfig = g.command(['grub2-mkconfig', '-o', '/boot/grub2/grub.cfg'])
|
|
logger.info("GRUB update output:")
|
|
logger.info(output_mkconfig)
|
|
logger.info("Successfully updated GRUB configuration")
|
|
except Exception as e:
|
|
logger.error(f"Failed to update GRUB configuration: {e}")
|
|
raise RuntimeError(f"Failed to update GRUB configuration: {e}")
|
|
|
|
def configure_network_predictability(domain_name, image_path=None):
|
|
"""
|
|
Configure network interface predictability for a VM.
|
|
|
|
Args:
|
|
domain_name (str): Name of the domain to configure
|
|
image_path (str, optional): Path to the QCOW2 image
|
|
|
|
Raises:
|
|
RuntimeError: If configuration fails
|
|
"""
|
|
# Check domain status
|
|
check_domain_status(domain_name)
|
|
|
|
# Use default image path if none provided
|
|
if not image_path:
|
|
image_path = f"/nsm/libvirt/images/{domain_name}/{domain_name}.qcow2"
|
|
|
|
if not os.path.exists(image_path):
|
|
logger.error(f"Image file not found: {image_path}")
|
|
raise RuntimeError(f"Image file not found: {image_path}")
|
|
|
|
if not os.access(image_path, os.R_OK | os.W_OK):
|
|
logger.error(f"Permission denied: Cannot access image file {image_path}")
|
|
raise RuntimeError(f"Permission denied: Cannot access image file {image_path}")
|
|
|
|
logger.info(f"Configuring network predictability for domain: {domain_name}")
|
|
logger.info(f"Using image: {image_path}")
|
|
|
|
g = guestfs.GuestFS(python_return_dict=True)
|
|
try:
|
|
logger.info("Initializing guestfs...")
|
|
g.set_network(False)
|
|
g.selinux = False
|
|
g.add_drive_opts(image_path, format="qcow2")
|
|
g.launch()
|
|
|
|
logger.info("Inspecting operating system...")
|
|
roots = g.inspect_os()
|
|
if not roots:
|
|
raise RuntimeError("No operating system found in image")
|
|
|
|
root = roots[0]
|
|
logger.info(f"Found root filesystem: {root}")
|
|
logger.info(f"Operating system type: {g.inspect_get_type(root)}")
|
|
logger.info(f"Operating system distro: {g.inspect_get_distro(root)}")
|
|
logger.info(f"Operating system major version: {g.inspect_get_major_version(root)}")
|
|
logger.info(f"Operating system minor version: {g.inspect_get_minor_version(root)}")
|
|
|
|
logger.info("Getting mount points...")
|
|
mountpoints = g.inspect_get_mountpoints(root)
|
|
logger.info(f"Found mount points: {mountpoints}")
|
|
logger.info("Converting mount points to sortable list...")
|
|
# Convert dictionary to list of tuples
|
|
mountpoints = list(mountpoints.items())
|
|
logger.info(f"Converted mount points: {mountpoints}")
|
|
logger.info("Sorting mount points by path length for proper mount order...")
|
|
mountpoints.sort(key=lambda m: len(m[0]))
|
|
logger.info(f"Mount order will be: {[mp[0] for mp in mountpoints]}")
|
|
|
|
for mp_path, mp_device in mountpoints:
|
|
try:
|
|
logger.info(f"Attempting to mount {mp_device} at {mp_path}")
|
|
g.mount(mp_device, mp_path)
|
|
logger.info(f"Successfully mounted {mp_device} at {mp_path}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not mount {mp_device} at {mp_path}: {str(e)}")
|
|
# Continue with other mounts
|
|
|
|
# Perform configuration steps
|
|
bls_modified = modify_bls_entry(g)
|
|
rules_removed = remove_persistent_net_rules(g)
|
|
|
|
if bls_modified or rules_removed:
|
|
update_grub_config(g)
|
|
logger.info("Network predictability configuration completed successfully")
|
|
else:
|
|
logger.info("No changes were necessary")
|
|
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to configure network predictability: {e}")
|
|
finally:
|
|
try:
|
|
logger.info("Unmounting all filesystems...")
|
|
g.umount_all()
|
|
logger.info("Successfully unmounted all filesystems")
|
|
except Exception as e:
|
|
logger.warning(f"Error unmounting filesystems: {e}")
|
|
g.close()
|
|
|
|
def parse_arguments():
|
|
"""Parse command line arguments."""
|
|
parser = argparse.ArgumentParser(
|
|
description="Configure network interface predictability for Security Onion VMs"
|
|
)
|
|
parser.add_argument("-n", "--name", required=True,
|
|
help="Domain name of the VM to configure")
|
|
parser.add_argument("-I", "--image",
|
|
help="Path to the QCOW2 image (optional)")
|
|
return parser.parse_args()
|
|
|
|
def main():
|
|
"""Main entry point for the script."""
|
|
try:
|
|
args = parse_arguments()
|
|
configure_network_predictability(args.name, args.image)
|
|
sys.exit(0)
|
|
except RuntimeError as e:
|
|
if "must not be running" in str(e):
|
|
logger.error(str(e))
|
|
sys.exit(2)
|
|
elif "not found" in str(e):
|
|
logger.error(str(e))
|
|
sys.exit(3)
|
|
elif "Permission denied" in str(e):
|
|
logger.error(str(e))
|
|
sys.exit(4)
|
|
else:
|
|
logger.error(str(e))
|
|
sys.exit(1)
|
|
except KeyboardInterrupt:
|
|
logger.error("Operation cancelled by user")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error: {e}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|