From 2cd0f6906904acdec429c57451765964bbcde851 Mon Sep 17 00:00:00 2001 From: m0duspwnens Date: Mon, 27 Jan 2025 16:40:10 -0500 Subject: [PATCH] watch and build --- .../engines/master/virtual_node_manager.py | 136 ++++++++++++------ 1 file changed, 94 insertions(+), 42 deletions(-) diff --git a/salt/salt/engines/master/virtual_node_manager.py b/salt/salt/engines/master/virtual_node_manager.py index a47c68f32..ed3d10d5a 100644 --- a/salt/salt/engines/master/virtual_node_manager.py +++ b/salt/salt/engines/master/virtual_node_manager.py @@ -97,8 +97,6 @@ log.setLevel(logging.DEBUG) DEFAULT_INTERVAL = 30 DEFAULT_BASE_PATH = '/opt/so/saltstack/local/salt/hypervisor/hosts' VALID_ROLES = ['sensor', 'searchnode', 'idh', 'receiver', 'heavynode', 'fleet'] -MAX_RETRIES = 3 -RETRY_DELAY = 5 LICENSE_PATH = '/opt/so/saltstack/local/pillar/soc/license.sls' def ensure_claimed_section(hw_config: dict) -> dict: @@ -191,17 +189,21 @@ class HardwareManager: log.error("Failed to write hypervisor configuration: %s", str(e)) raise - def get_pci_ids(self, hw_type: str, indices: List[int]) -> List[str]: + def get_pci_ids(self, hw_type: str, indices: Optional[List[int]]) -> List[str]: """ Look up PCI IDs for requested hardware indices. Args: hw_type: Type of hardware (disk, copper, sfp) - indices: List of hardware indices to look up + indices: List of hardware indices to look up, or None if no hardware requested Returns: List of PCI IDs """ + # Skip if no indices provided + if indices is None: + return [] + config = self.read_hypervisor_config() log.debug("Full config structure: %s", config) log.debug("Looking up indices %s for hardware type %s", indices, hw_type) @@ -342,6 +344,12 @@ def parse_add_file(file_path: str) -> Tuple[str, str, dict]: for field in required_fields: if field not in config: raise ValueError(f"Missing required field: {field}") + + # Validate hardware sections + for hw_type in ['disk', 'copper', 'sfp']: + if hw_type in config and config[hw_type] is not None: + if not isinstance(config[hw_type], list): + raise ValueError(f"{hw_type} must be a list when present") return hypervisor, role, config except Exception as e: @@ -405,6 +413,27 @@ def execute_salt_cloud(profile: str, hostname: str, role: str, config: dict, pci log.error("Failed to execute so-salt-cloud: %s", str(e)) raise +def release_hardware(hw_manager: HardwareManager, hw_type: str, indices: List[int]) -> None: + """ + Release claimed hardware back to free pool. + + Args: + hw_manager: HardwareManager instance + hw_type: Type of hardware (disk, copper, sfp) + indices: List of hardware indices to release + """ + config = hw_manager.read_hypervisor_config() + hw_config = config['hypervisor']['hardware'][hw_type] + + for idx in indices: + if idx in hw_config['claimed']: + pci_id = hw_config['claimed'][idx] + hw_config['free'][idx] = pci_id + del hw_config['claimed'][idx] + + hw_manager.write_hypervisor_config(config) + log.info("Released %s hardware indices: %s", hw_type, indices) + def process_add_file(file_path: str, base_path: str) -> None: """ Process a single add_* file for VM creation. @@ -418,39 +447,62 @@ def process_add_file(file_path: str, base_path: str) -> None: log.debug("Parsed config from add file: %s", config) hw_manager = HardwareManager(hypervisor, base_path) - # Collect all PCI IDs + # Phase 1: Collect all hardware information without claiming pci_ids = [] + hardware_to_claim = {} # Store hardware to claim for each type hardware_tracking = { 'cpu': config.get('cpu'), 'memory': config.get('memory') } - # Process each hardware type + # Validate all hardware first for hw_type in ['disk', 'copper', 'sfp']: - if hw_type in config: - indices = config[hw_type] + indices = config.get(hw_type) + if indices is not None: + # Validate indices are available before claiming hw_pci_ids = hw_manager.get_pci_ids(hw_type, indices) - pci_ids.extend(hw_pci_ids) - - # Track hardware allocation - hardware_tracking[hw_type] = [ - {'id': idx, 'pci': pci_id} - for idx, pci_id in zip(indices, hw_pci_ids) - ] - - # Claim the hardware + if hw_pci_ids: + hardware_to_claim[hw_type] = indices + pci_ids.extend(hw_pci_ids) + hardware_tracking[hw_type] = [ + {'id': idx, 'pci': pci_id} + for idx, pci_id in zip(indices, hw_pci_ids) + ] + + # Phase 2: Claim hardware only after all validation passes + try: + # Claim all hardware + for hw_type, indices in hardware_to_claim.items(): hw_manager.claim_hardware(hw_type, indices) - - # Create VM - execute_salt_cloud(hypervisor, config['hostname'], role, config, pci_ids) - - # Create hardware tracking file - hw_manager.create_vm_hardware_file(config['hostname'], role, hardware_tracking) - - # Clean up the add_* file - os.remove(file_path) - log.info("Successfully processed VM creation request: %s_%s", config['hostname'], role) - + + # Create VM + execute_salt_cloud(hypervisor, config['hostname'], role, config, pci_ids) + + # Create hardware tracking file + hw_manager.create_vm_hardware_file(config['hostname'], role, hardware_tracking) + + log.info("Successfully processed VM creation request: %s_%s", + config['hostname'], role) + + except Exception as e: + # If anything fails after claiming, release claimed hardware + log.error("Failed after hardware claim, attempting to release hardware: %s", str(e)) + for hw_type, indices in hardware_to_claim.items(): + try: + release_hardware(hw_manager, hw_type, indices) + except Exception as release_error: + log.error("Failed to release %s hardware %s: %s", + hw_type, indices, str(release_error)) + raise + + finally: + # Always clean up the add file + try: + os.remove(file_path) + log.info("Cleaned up add file: %s", file_path) + except Exception as e: + log.error("Failed to clean up add file: %s", str(e)) + except Exception as e: log.error("Failed to process add file %s: %s", file_path, str(e)) raise @@ -467,19 +519,10 @@ def monitor_add_files(base_path: str) -> None: try: for file_path in glob.glob(pattern): log.info("Found new VM request file: %s", file_path) - - for attempt in range(MAX_RETRIES): - try: - process_add_file(file_path, base_path) - break - except Exception as e: - if attempt < MAX_RETRIES - 1: - log.warning("Attempt %d failed, retrying in %d seconds: %s", - attempt + 1, RETRY_DELAY, str(e)) - time.sleep(RETRY_DELAY) - else: - log.error("All attempts failed for file %s", file_path) - raise + try: + process_add_file(file_path, base_path) + except Exception as e: + log.error("Failed to process file %s: %s", file_path, str(e)) except Exception as e: log.error("Error monitoring add files: %s", str(e)) raise @@ -498,9 +541,18 @@ def start(interval: int = DEFAULT_INTERVAL, if not validate_hvn_license(): return + processed_files = set() while True: try: - monitor_add_files(base_path) + for file_path in glob.glob(os.path.join(base_path, '*', 'add_*')): + if file_path not in processed_files: + try: + process_add_file(file_path, base_path) + processed_files.add(file_path) + except Exception as e: + log.error("Failed to process file %s: %s", file_path, str(e)) + # Still mark as processed to avoid retrying + processed_files.add(file_path) except Exception as e: log.error("Error in main engine loop: %s", str(e))