#!/usr/bin/env python3 # Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one # or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at # https://securityonion.net/license; you may not use this file except in compliance with the # Elastic License 2.0. """ Script to modify hardware parameters of a KVM virtual machine. Usage: python so-kvm-modify-hardware.py -v [-c ] [-m ] [-p ] Example: python so-kvm-modify-hardware.py -v my_vm -c 4 -m 8192 -p 0000:00:1f.2 """ import argparse import logging import sys import time import libvirt import xml.etree.ElementTree as ET def setup_logging(): logger = logging.getLogger('so-kvm-modify-hardware') logger.setLevel(logging.INFO) # Create handlers c_handler = logging.StreamHandler() f_handler = logging.FileHandler('/opt/so/log/hypervisor/so-kvm-modify-hardware.log') c_handler.setLevel(logging.INFO) f_handler.setLevel(logging.INFO) # Create formatter and add it to handlers formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') c_handler.setFormatter(formatter) f_handler.setFormatter(formatter) # Add handlers to the logger logger.addHandler(c_handler) logger.addHandler(f_handler) return logger def parse_arguments(): parser = argparse.ArgumentParser(description='Modify hardware parameters of a KVM virtual machine.') parser.add_argument('-v', '--vm', required=True, help='Name of the virtual machine to modify.') parser.add_argument('-c', '--cpu', type=int, help='Number of virtual CPUs to assign.') parser.add_argument('-m', '--memory', type=int, help='Amount of memory to assign in MiB.') parser.add_argument('-p', '--pci', help='PCI hardware ID to passthrough to the VM (e.g., 0000:00:1f.2).') args = parser.parse_args() return args def stop_vm(conn, vm_name, logger): try: dom = conn.lookupByName(vm_name) if dom.isActive(): logger.info(f"Shutting down VM '{vm_name}'...") dom.shutdown() # Wait for the VM to shut down while dom.isActive(): time.sleep(1) logger.info(f"VM '{vm_name}' has been stopped.") else: logger.info(f"VM '{vm_name}' is already stopped.") return dom except libvirt.libvirtError as e: logger.error(f"Failed to stop VM '{vm_name}': {e}") sys.exit(1) def modify_vm(dom, cpu_count, memory_amount, pci_id, logger): try: # Get the XML description of the VM xml_desc = dom.XMLDesc() root = ET.fromstring(xml_desc) # Modify CPU count if cpu_count is not None: vcpu_elem = root.find('./vcpu') if vcpu_elem is not None: vcpu_elem.text = str(cpu_count) logger.info(f"Set CPU count to {cpu_count}.") else: logger.error("Could not find element in XML.") sys.exit(1) # Modify memory amount if memory_amount is not None: memory_elem = root.find('./memory') current_memory_elem = root.find('./currentMemory') if memory_elem is not None and current_memory_elem is not None: memory_elem.text = str(memory_amount * 1024) # Convert MiB to KiB current_memory_elem.text = str(memory_amount * 1024) logger.info(f"Set memory to {memory_amount} MiB.") else: logger.error("Could not find elements in XML.") sys.exit(1) # Add PCI device passthrough if pci_id is not None: devices_elem = root.find('./devices') if devices_elem is not None: hostdev_elem = ET.SubElement(devices_elem, 'hostdev', attrib={ 'mode': 'subsystem', 'type': 'pci', 'managed': 'yes' }) source_elem = ET.SubElement(hostdev_elem, 'source') domain_id, bus_slot_func = pci_id.split(':') bus_slot, function = bus_slot_func.split('.') address_attrs = { 'domain': f'0x{domain_id}', 'bus': f'0x{bus_slot}', 'slot': f'0x{bus_slot}', 'function': f'0x{function}' } ET.SubElement(source_elem, 'address', attrib=address_attrs) logger.info(f"Added PCI device passthrough for {pci_id}.") else: logger.error("Could not find element in XML.") sys.exit(1) # Convert XML back to string new_xml_desc = ET.tostring(root, encoding='unicode') return new_xml_desc except Exception as e: logger.error(f"Failed to modify VM XML: {e}") sys.exit(1) def redefine_vm(conn, new_xml_desc, logger): try: conn.defineXML(new_xml_desc) logger.info("VM redefined with new hardware parameters.") except libvirt.libvirtError as e: logger.error(f"Failed to redefine VM: {e}") sys.exit(1) def start_vm(dom, logger): try: dom.create() logger.info("VM started successfully.") except libvirt.libvirtError as e: logger.error(f"Failed to start VM: {e}") sys.exit(1) def main(): try: logger = setup_logging() args = parse_arguments() vm_name = args.vm cpu_count = args.cpu memory_amount = args.memory pci_id = args.pci # Connect to libvirt try: conn = libvirt.open(None) except libvirt.libvirtError as e: logger.error(f"Failed to open connection to libvirt: {e}") sys.exit(1) # Stop VM if running dom = stop_vm(conn, vm_name, logger) # Modify VM XML new_xml_desc = modify_vm(dom, cpu_count, memory_amount, pci_id, logger) # Redefine VM redefine_vm(conn, new_xml_desc, logger) # Start VM dom = conn.lookupByName(vm_name) start_vm(dom, logger) # Close connection conn.close() except KeyboardInterrupt: logger.error("Operation cancelled by user.") sys.exit(1) except Exception as e: logger.error(f"An error occurred: {e}") sys.exit(1) if __name__ == '__main__': main()