From ec2fc0a5f2740aee9d2a67b47aef9518dda09ed5 Mon Sep 17 00:00:00 2001 From: Josh Patterson Date: Sat, 15 Feb 2025 18:56:04 -0500 Subject: [PATCH] change locking method --- .../engines/master/virtual_node_manager.py | 302 ++++++++---------- 1 file changed, 137 insertions(+), 165 deletions(-) diff --git a/salt/salt/engines/master/virtual_node_manager.py b/salt/salt/engines/master/virtual_node_manager.py index 80e49813a..5cce857b4 100644 --- a/salt/salt/engines/master/virtual_node_manager.py +++ b/salt/salt/engines/master/virtual_node_manager.py @@ -19,7 +19,7 @@ Usage: base_path: /opt/so/saltstack/local/salt/hypervisor/hosts Options: - interval: Time in seconds between processing cycles (default: 30) + interval: Time in seconds between engine runs (managed by salt-master, default: 30) base_path: Base directory containing hypervisor configurations (default: /opt/so/saltstack/local/salt/hypervisor/hosts) Memory values in VM configuration should be specified in GB. These values @@ -55,47 +55,52 @@ State Files: - : Active VM configuration and status - _failed: Failed VM creation details - _invalidHW: Invalid hardware request details - - Lock Files: - - .lock: Prevents concurrent processing of VMs - - Contains VM name and timestamp - - Automatically removed after processing - Notes: - Requires 'hvn' feature license - Uses hypervisor's sosmodel grain for hardware capabilities - Hardware allocation based on model-specific configurations - All created files maintain socore ownership - Comprehensive logging for troubleshooting - - Lock files prevent concurrent processing + - Single engine-wide lock prevents concurrent instances + - Lock remains if error occurs (requires admin intervention) Description: - The engine operates in the following phases: + The engine operates in the following phases: - 1. License Validation - - Verifies 'hvn' feature is licensed - - Prevents operation if license is invalid + 1. 
Engine Lock Acquisition + - Acquires single engine-wide lock + - Prevents multiple instances from running + - Lock remains until clean shutdown or error - 2. Configuration Processing - - Reads nodes file from each hypervisor directory - - Validates configuration parameters - - Compares against existing VM tracking files + 2. License Validation + - Verifies 'hvn' feature is licensed + - Prevents operation if license is invalid - 3. Hardware Allocation - - Retrieves hypervisor model from grains cache - - Loads model-specific hardware capabilities - - Validates hardware requests against model limits - - Converts hardware indices to PCI IDs - - Ensures proper type handling for hardware indices - - Creates state tracking files with socore ownership + 3. Configuration Processing + - Reads nodes file from each hypervisor directory + - Validates configuration parameters + - Compares against existing VM tracking files - 4. VM Provisioning - - Creates lock file to prevent concurrent operations - - Executes so-salt-cloud with validated configuration - - Handles network setup (static/DHCP) - - Configures hardware passthrough with converted PCI IDs - - Updates VM state tracking - - Removes lock file after completion + 4. Hardware Allocation + - Retrieves hypervisor model from grains cache + - Loads model-specific hardware capabilities + - Validates hardware requests against model limits + - Converts hardware indices to PCI IDs + - Ensures proper type handling for hardware indices + - Creates state tracking files with socore ownership + + 5. 
VM Provisioning + - Executes so-salt-cloud with validated configuration + - Handles network setup (static/DHCP) + - Configures hardware passthrough with converted PCI IDs + - Updates VM state tracking + + Lock Management: + - Lock acquired at engine start + - Released only on clean shutdown + - Remains if error occurs + - Admin must restart service to clear lock + - Error-level logging for lock issues Exit Codes: 0: Success @@ -129,6 +134,7 @@ import salt.config import salt.runner from typing import Dict, List, Optional, Tuple, Any from datetime import datetime +from threading import Lock # Get socore uid/gid SOCORE_UID = pwd.getpwnam('socore').pw_uid @@ -150,6 +156,9 @@ VALID_ROLES = ['sensor', 'searchnode', 'idh', 'receiver', 'heavynode', 'fleet'] LICENSE_PATH = '/opt/so/saltstack/local/pillar/soc/license.sls' DEFAULTS_PATH = '/opt/so/saltstack/default/salt/hypervisor/defaults.yaml' +# Single engine-wide lock for virtual node manager +engine_lock = Lock() + def read_json_file(file_path: str) -> Any: """ Read and parse a JSON file. 
@@ -225,53 +234,20 @@ def convert_pci_id(pci_id: str) -> str: log.error("Failed to convert PCI ID %s: %s", pci_id, str(e)) raise -def create_lock_file(hypervisor_path: str, vm_name: str) -> bool: - """Create .lock file for VM processing.""" - lock_file = os.path.join(hypervisor_path, '.lock') - try: - if os.path.exists(lock_file): - log.warning("Lock file already exists at %s", lock_file) - return False - write_json_file(lock_file, { - 'vm': vm_name, - 'timestamp': datetime.now().isoformat() - }) - return True - except Exception as e: - log.error("Failed to create lock file: %s", str(e)) - return False - -def remove_lock_file(hypervisor_path: str) -> None: - """Remove .lock file after processing.""" - lock_file = os.path.join(hypervisor_path, '.lock') - try: - if os.path.exists(lock_file): - os.remove(lock_file) - except Exception as e: - log.error("Failed to remove lock file: %s", str(e)) - -def is_locked(hypervisor_path: str) -> bool: - """Check if hypervisor directory is locked.""" - return os.path.exists(os.path.join(hypervisor_path, '.lock')) - def get_hypervisor_model(hypervisor: str) -> str: """Get sosmodel from hypervisor grains.""" - log.info(hypervisor) #MOD try: # Get cached grains using Salt runner grains = runner.cmd( 'cache.grains', [f'{hypervisor}_*', 'glob'] ) - log.info(grains) #MOD if not grains: raise ValueError(f"No grains found for hypervisor {hypervisor}") # Get the first minion ID that matches our hypervisor minion_id = next(iter(grains.keys())) - log.info(minion_id) #MOD model = grains[minion_id].get('sosmodel') - log.info(model) #MOD if not model: raise ValueError(f"No sosmodel grain found for hypervisor {hypervisor}") @@ -452,7 +428,7 @@ def mark_invalid_hardware(hypervisor_path: str, vm_name: str, config: dict, erro def validate_hvn_license() -> bool: """Check if the license file exists and contains required values.""" if not os.path.exists(LICENSE_PATH): - log.error("LICENSE: License file not found at %s", LICENSE_PATH) + 
log.error("License file not found at %s", LICENSE_PATH) return False try: @@ -460,14 +436,14 @@ def validate_hvn_license() -> bool: license_data = yaml.safe_load(f) if not license_data: - log.error("LICENSE: Empty or invalid license file") + log.error("Empty or invalid license file") return False license_id = license_data.get('license_id') features = license_data.get('features', []) if not license_id: - log.error("LICENSE: No license_id found in license file") + log.error("No license_id found in license file") return False if 'hvn' not in features: @@ -476,29 +452,35 @@ def validate_hvn_license() -> bool: "for more information about purchasing a license to enable this feature.") return False - log.info("LICENSE: License validation successful") + log.info("License validation successful") return True except Exception as e: - log.error("LICENSE: Error reading license file: %s", str(e)) + log.error("Error reading license file: %s", str(e)) return False def process_vm_creation(hypervisor_path: str, vm_config: dict) -> None: - """Process a single VM creation request.""" + """ + Process a single VM creation request. + + This function handles the creation of a new VM, including hardware validation, + resource allocation, and provisioning. All operations are protected by the + engine-wide lock that is acquired at engine start. 
+ + Args: + hypervisor_path: Path to the hypervisor directory + vm_config: Dictionary containing VM configuration + """ # Get the actual hypervisor name (last directory in path) hypervisor = os.path.basename(hypervisor_path) vm_name = f"{vm_config['hostname']}_{vm_config['role']}" try: - # Create lock file - if not create_lock_file(hypervisor_path, vm_name): - return - # Get hypervisor model and capabilities model = get_hypervisor_model(hypervisor) model_config = load_hardware_defaults(model) - # Validate hardware request + # Initial hardware validation is_valid, errors = validate_hardware_request(model_config, vm_config) if not is_valid: mark_invalid_hardware(hypervisor_path, vm_name, vm_config, errors) @@ -506,97 +488,55 @@ def process_vm_creation(hypervisor_path: str, vm_config: dict) -> None: # Check hardware availability if not check_hardware_availability(hypervisor_path, vm_name): - mark_vm_failed(os.path.join(hypervisor_path, vm_name), 3, - "Requested hardware is already in use") + mark_vm_failed(os.path.join(hypervisor_path, vm_name), 3, + "Requested hardware is already in use") return # Create tracking file create_vm_tracking_file(hypervisor_path, vm_name, vm_config) - # Build so-salt-cloud command - log.debug("Building so-salt-cloud command for VM %s", vm_name) + # Build and execute so-salt-cloud command cmd = ['so-salt-cloud', '-p', f'sool9-{hypervisor}', vm_name] - log.debug("Base command: %s", ' '.join(cmd)) # Add network configuration if vm_config['network_mode'] == 'static4': - log.debug("Adding static network configuration") cmd.extend(['--static4', '--ip4', vm_config['ip4'], '--gw4', vm_config['gw4']]) if 'dns4' in vm_config: cmd.extend(['--dns4', vm_config['dns4']]) if 'search4' in vm_config: cmd.extend(['--search4', vm_config['search4']]) else: - log.debug("Using DHCP network configuration") cmd.append('--dhcp4') # Add hardware configuration if 'cpu' in vm_config: - log.debug("Adding CPU configuration: %s cores", vm_config['cpu']) 
cmd.extend(['-c', str(vm_config['cpu'])]) if 'memory' in vm_config: memory_mib = int(vm_config['memory']) * 1024 - log.debug("Adding memory configuration: %sGB (%sMiB)", vm_config['memory'], memory_mib) cmd.extend(['-m', str(memory_mib)]) - # Add PCI devices with proper conversion + # Add PCI devices for hw_type in ['disk', 'copper', 'sfp']: if hw_type in vm_config and vm_config[hw_type]: - log.debug("Processing %s hardware configuration", hw_type) indices = [int(x) for x in str(vm_config[hw_type]).split(',')] - log.debug("Requested %s indices: %s", hw_type, indices) for idx in indices: - try: - log.debug("Looking up PCI ID for %s index %d in model config", hw_type, idx) - log.debug("Model config for %s: %s", hw_type, model_config['hardware'][hw_type]) - - # Convert all keys to integers for comparison - hw_config = {int(k): v for k, v in model_config['hardware'][hw_type].items()} - log.debug("Converted hardware config: %s", hw_config) - - pci_id = hw_config[idx] - log.debug("Found PCI ID for %s index %d: %s", hw_type, idx, pci_id) - converted_pci_id = convert_pci_id(pci_id) - log.debug("Converted PCI ID from %s to %s", pci_id, converted_pci_id) - cmd.extend(['-P', converted_pci_id]) - except Exception as e: - log.error("Failed to process PCI ID for %s index %d: %s", hw_type, idx, str(e)) - log.error("Hardware config keys: %s, looking for index: %s", - list(model_config['hardware'][hw_type].keys()), idx) - raise + hw_config = {int(k): v for k, v in model_config['hardware'][hw_type].items()} + pci_id = hw_config[idx] + converted_pci_id = convert_pci_id(pci_id) + cmd.extend(['-P', converted_pci_id]) - # Execute so-salt-cloud - log.info("Executing command: %s", ' '.join(cmd)) - try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - log.debug("Command completed successfully") - log.debug("Command stdout: %s", result.stdout) - if result.stderr: - log.warning("Command stderr (non-fatal): %s", result.stderr) - except subprocess.CalledProcessError 
as e: - error_msg = f"so-salt-cloud execution failed (code {e.returncode})" - if e.stdout: - log.error("Command stdout: %s", e.stdout) - if e.stderr: - log.error("Command stderr: %s", e.stderr) - error_msg = f"{error_msg}: {e.stderr}" - log.error(error_msg) - mark_vm_failed(os.path.join(hypervisor_path, vm_name), 4, error_msg) - raise + # Execute command + result = subprocess.run(cmd, capture_output=True, text=True, check=True) # Update tracking file status tracking_file = os.path.join(hypervisor_path, vm_name) data = read_json_file(tracking_file) data['status'] = 'running' write_json_file(tracking_file, data) - log.info("Successfully updated VM status to running") except subprocess.CalledProcessError as e: error_msg = f"so-salt-cloud execution failed (code {e.returncode})" - if e.stdout: - log.error("Command stdout: %s", e.stdout) if e.stderr: - log.error("Command stderr: %s", e.stderr) error_msg = f"{error_msg}: {e.stderr}" log.error(error_msg) mark_vm_failed(os.path.join(hypervisor_path, vm_name), 4, error_msg) @@ -604,24 +544,28 @@ def process_vm_creation(hypervisor_path: str, vm_config: dict) -> None: except Exception as e: error_msg = f"VM creation failed: {str(e)}" log.error(error_msg) - # If we haven't created the tracking file yet, create a failed one if not os.path.exists(os.path.join(hypervisor_path, vm_name)): mark_vm_failed(os.path.join(hypervisor_path, f"{vm_name}_failed"), 4, error_msg) - finally: - remove_lock_file(hypervisor_path) + raise def process_vm_deletion(hypervisor_path: str, vm_name: str) -> None: - """Process a single VM deletion request.""" + """ + Process a single VM deletion request. + + This function handles the deletion of an existing VM. All operations are protected + by the engine-wide lock that is acquired at engine start. 
+ + Args: + hypervisor_path: Path to the hypervisor directory + vm_name: Name of the VM to delete + """ try: - if not create_lock_file(hypervisor_path, vm_name): - return - # Get the actual hypervisor name (last directory in path) hypervisor = os.path.basename(hypervisor_path) cmd = ['so-salt-cloud', '-p', f'sool9-{hypervisor}', vm_name, '-yd'] log.info("Executing: %s", ' '.join(cmd)) - result = subprocess.run(cmd, capture_output=True, text=True) + result = subprocess.run(cmd, capture_output=True, text=True, check=True) # Log command output if result.stdout: @@ -629,35 +573,39 @@ def process_vm_deletion(hypervisor_path: str, vm_name: str) -> None: if result.stderr: log.warning("Command stderr: %s", result.stderr) - # Check return code - if result.returncode != 0: - error_msg = f"so-salt-cloud deletion failed (code {result.returncode}): {result.stderr}" - log.error(error_msg) - raise subprocess.CalledProcessError( - result.returncode, cmd, - output=result.stdout, - stderr=result.stderr - ) - # Remove VM tracking file vm_file = os.path.join(hypervisor_path, vm_name) if os.path.exists(vm_file): os.remove(vm_file) log.info("Successfully removed VM tracking file") + except subprocess.CalledProcessError as e: + error_msg = f"so-salt-cloud deletion failed (code {e.returncode}): {e.stderr}" + log.error(error_msg) + raise except Exception as e: log.error("Error processing VM deletion: %s", str(e)) raise - finally: - remove_lock_file(hypervisor_path) def process_hypervisor(hypervisor_path: str) -> None: - """Process VM configurations for a single hypervisor.""" + """ + Process VM configurations for a single hypervisor. + + This function handles the processing of VM configurations for a hypervisor, + including creation of new VMs and deletion of removed VMs. All operations + are protected by the engine-wide lock that is acquired at engine start. + + The function performs the following steps: + 1. Reads and validates nodes configuration + 2. Identifies existing VMs + 3. 
Processes new VM creation requests
+    4. Handles VM deletions for removed configurations
+
+    Args:
+        hypervisor_path: Path to the hypervisor directory
+    """
     try:
-        if is_locked(hypervisor_path):
-            return
-
-        # Read nodes file
+        # Detection phase - no lock needed
         nodes_file = os.path.join(hypervisor_path, 'nodes')
         if not os.path.exists(nodes_file):
             return
@@ -666,7 +614,7 @@ def process_hypervisor(hypervisor_path: str) -> None:
         if not nodes_config:
             return
 
-        # Get existing VMs
+        # Get existing VMs - no lock needed
         existing_vms = set()
         for file_path in glob.glob(os.path.join(hypervisor_path, '*_*')):
             basename = os.path.basename(file_path)
@@ -684,6 +632,7 @@ def process_hypervisor(hypervisor_path: str) -> None:
             configured_vms.add(vm_name)
 
             if vm_name not in existing_vms:
+                # creation runs under the engine-wide lock acquired in start()
                 process_vm_creation(hypervisor_path, vm_config)
 
         # Process VM deletions
@@ -692,29 +641,52 @@ def process_hypervisor(hypervisor_path: str) -> None:
 
     except Exception as e:
         log.error("Failed to process hypervisor %s: %s", hypervisor_path, str(e))
+        raise
 
 def start(interval: int = DEFAULT_INTERVAL, base_path: str = DEFAULT_BASE_PATH) -> None:
     """
-    Main engine loop.
+    Process virtual node configurations.
+
+    This function implements a single engine-wide lock to ensure only one
+    instance of the virtual node manager runs at a time. 
The lock is: + - Acquired at start + - Released after processing completes + - Maintained if error occurs Args: - interval: Time in seconds between processing cycles + interval: Time in seconds between engine runs (managed by salt-master) base_path: Base path containing hypervisor configurations + + Notes: + - Lock remains if engine encounters an error + - Admin must restart service to clear lock + - Error-level logging used for lock issues """ log.info("Starting virtual node manager engine") if not validate_hvn_license(): return + + # Attempt to acquire engine lock + if not engine_lock.acquire(blocking=False): + log.error("Another virtual node manager is already running") + return - while True: - try: - # Process each hypervisor directory - for hypervisor_path in glob.glob(os.path.join(base_path, '*')): - if os.path.isdir(hypervisor_path): - process_hypervisor(hypervisor_path) - - except Exception as e: - log.error("Error in main engine loop: %s", str(e)) - - time.sleep(interval) + log.debug("Virtual node manager acquired engine lock") + + try: + # Process each hypervisor directory + for hypervisor_path in glob.glob(os.path.join(base_path, '*')): + if os.path.isdir(hypervisor_path): + process_hypervisor(hypervisor_path) + + # Clean shutdown - release lock + log.debug("Virtual node manager releasing engine lock") + engine_lock.release() + log.info("Virtual node manager completed successfully") + + except Exception as e: + log.error("Error in virtual node manager - lock will remain until cleared: %s", str(e)) + # Don't release lock on error - requires admin intervention + return