#!/usr/bin/python3 # Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one # or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at # https://securityonion.net/license; you may not use this file except in compliance with the # Elastic License 2.0. """ Script to modify hardware parameters of a KVM virtual machine. Usage: python so-kvm-modify-hardware.py -v [-c ] [-m ] [-p ] [-s] Example: python so-kvm-modify-hardware.py -v my_vm -c 4 -m 8192 -p 0000:00:1f.2 -s """ import argparse import sys import libvirt import logging import xml.etree.ElementTree as ET from so_vm_utils import start_vm, stop_vm from so_logging_utils import setup_logging def parse_arguments(): parser = argparse.ArgumentParser(description='Modify hardware parameters of a KVM virtual machine.') parser.add_argument('-v', '--vm', required=True, help='Name of the virtual machine to modify.') parser.add_argument('-c', '--cpu', type=int, help='Number of virtual CPUs to assign.') parser.add_argument('-m', '--memory', type=int, help='Amount of memory to assign in MiB.') parser.add_argument('-p', '--pci', help='PCI hardware ID to passthrough to the VM (e.g., 0000:00:1f.2).') parser.add_argument('-s', '--start', action='store_true', help='Start the VM after modification.') args = parser.parse_args() return args def modify_vm(dom, cpu_count, memory_amount, pci_id, logger): try: # Get the XML description of the VM xml_desc = dom.XMLDesc() root = ET.fromstring(xml_desc) # Modify CPU count if cpu_count is not None: vcpu_elem = root.find('./vcpu') if vcpu_elem is not None: vcpu_elem.text = str(cpu_count) logger.info(f"Set CPU count to {cpu_count}.") else: logger.error("Could not find element in XML.") sys.exit(1) # Modify memory amount if memory_amount is not None: memory_elem = root.find('./memory') current_memory_elem = root.find('./currentMemory') if memory_elem is not None and current_memory_elem is not None: memory_elem.text = str(memory_amount * 1024) # Convert MiB to KiB current_memory_elem.text = str(memory_amount * 1024) logger.info(f"Set memory to {memory_amount} MiB.") else: logger.error("Could not find elements in XML.") sys.exit(1) # Add PCI device passthrough if pci_id is not None: devices_elem = root.find('./devices') if devices_elem is not None: hostdev_elem = ET.SubElement(devices_elem, 'hostdev', attrib={ 'mode': 'subsystem', 'type': 'pci', 'managed': 'yes' }) source_elem = ET.SubElement(hostdev_elem, 'source') domain_id, bus_slot_func = pci_id.split(':') bus_slot, function = bus_slot_func.split('.') address_attrs = { 'domain': f'0x{domain_id}', 'bus': f'0x{bus_slot}', 'slot': f'0x{bus_slot}', 'function': f'0x{function}' } ET.SubElement(source_elem, 'address', attrib=address_attrs) logger.info(f"Added PCI device passthrough for {pci_id}.") else: logger.error("Could not find element in XML.") sys.exit(1) # Convert XML back to string new_xml_desc = ET.tostring(root, encoding='unicode') return new_xml_desc except Exception as e: logger.error(f"Failed to modify VM XML: {e}") sys.exit(1) def redefine_vm(conn, new_xml_desc, logger): try: conn.defineXML(new_xml_desc) logger.info("VM redefined with new hardware parameters.") except libvirt.libvirtError as e: logger.error(f"Failed to redefine VM: {e}") sys.exit(1) def main(): # Set up logging using the so_logging_utils library logger = setup_logging( logger_name='so-kvm-modify-hardware', log_file_path='/opt/so/log/hypervisor/so-kvm-modify-hardware.log', log_level=logging.INFO, format_str='%(asctime)s - %(levelname)s - %(message)s' ) try: args = parse_arguments() vm_name = args.vm cpu_count = args.cpu memory_amount = args.memory pci_id = args.pci start_vm_flag = args.start # Connect to libvirt try: conn = libvirt.open(None) except libvirt.libvirtError as e: logger.error(f"Failed to open connection to libvirt: {e}") sys.exit(1) # Stop VM if running dom = stop_vm(conn, vm_name, logger) # Modify VM XML new_xml_desc = modify_vm(dom, cpu_count, memory_amount, pci_id, logger) # Redefine VM redefine_vm(conn, new_xml_desc, logger) # Start VM if -s or --start argument is provided if start_vm_flag: dom = conn.lookupByName(vm_name) start_vm(dom, logger) else: logger.info("VM start flag not provided; VM will remain stopped.") # Close connection conn.close() except KeyboardInterrupt: logger.error("Operation cancelled by user.") sys.exit(1) except Exception as e: logger.error(f"An error occurred: {e}") sys.exit(1) if __name__ == '__main__': main()