change locking method

This commit is contained in:
Josh Patterson
2025-02-15 18:56:04 -05:00
parent ad54afe39a
commit ec2fc0a5f2

View File

@@ -19,7 +19,7 @@ Usage:
base_path: /opt/so/saltstack/local/salt/hypervisor/hosts base_path: /opt/so/saltstack/local/salt/hypervisor/hosts
Options: Options:
interval: Time in seconds between processing cycles (default: 30) interval: Time in seconds between engine runs (managed by salt-master, default: 30)
base_path: Base directory containing hypervisor configurations (default: /opt/so/saltstack/local/salt/hypervisor/hosts) base_path: Base directory containing hypervisor configurations (default: /opt/so/saltstack/local/salt/hypervisor/hosts)
Memory values in VM configuration should be specified in GB. These values Memory values in VM configuration should be specified in GB. These values
@@ -55,47 +55,52 @@ State Files:
- <vm_name>: Active VM configuration and status - <vm_name>: Active VM configuration and status
- <vm_name>_failed: Failed VM creation details - <vm_name>_failed: Failed VM creation details
- <vm_name>_invalidHW: Invalid hardware request details - <vm_name>_invalidHW: Invalid hardware request details
Lock Files:
- .lock: Prevents concurrent processing of VMs
- Contains VM name and timestamp
- Automatically removed after processing
Notes: Notes:
- Requires 'hvn' feature license - Requires 'hvn' feature license
- Uses hypervisor's sosmodel grain for hardware capabilities - Uses hypervisor's sosmodel grain for hardware capabilities
- Hardware allocation based on model-specific configurations - Hardware allocation based on model-specific configurations
- All created files maintain socore ownership - All created files maintain socore ownership
- Comprehensive logging for troubleshooting - Comprehensive logging for troubleshooting
- Lock files prevent concurrent processing - Single engine-wide lock prevents concurrent instances
- Lock remains if error occurs (requires admin intervention)
Description: Description:
The engine operates in the following phases: The engine operates in the following phases:
1. License Validation 1. Engine Lock Acquisition
- Verifies 'hvn' feature is licensed - Acquires single engine-wide lock
- Prevents operation if license is invalid - Prevents multiple instances from running
- Lock is released after a clean run and remains held if an error occurs
2. Configuration Processing 2. License Validation
- Reads nodes file from each hypervisor directory - Verifies 'hvn' feature is licensed
- Validates configuration parameters - Prevents operation if license is invalid
- Compares against existing VM tracking files
3. Hardware Allocation 3. Configuration Processing
- Retrieves hypervisor model from grains cache - Reads nodes file from each hypervisor directory
- Loads model-specific hardware capabilities - Validates configuration parameters
- Validates hardware requests against model limits - Compares against existing VM tracking files
- Converts hardware indices to PCI IDs
- Ensures proper type handling for hardware indices
- Creates state tracking files with socore ownership
4. VM Provisioning 4. Hardware Allocation
- Creates lock file to prevent concurrent operations - Retrieves hypervisor model from grains cache
- Executes so-salt-cloud with validated configuration - Loads model-specific hardware capabilities
- Handles network setup (static/DHCP) - Validates hardware requests against model limits
- Configures hardware passthrough with converted PCI IDs - Converts hardware indices to PCI IDs
- Updates VM state tracking - Ensures proper type handling for hardware indices
- Removes lock file after completion - Creates state tracking files with socore ownership
5. VM Provisioning
- Executes so-salt-cloud with validated configuration
- Handles network setup (static/DHCP)
- Configures hardware passthrough with converted PCI IDs
- Updates VM state tracking
Lock Management:
- Lock acquired at engine start
- Released only on clean shutdown
- Remains if error occurs
- Admin must restart service to clear lock
- Error-level logging for lock issues
Exit Codes: Exit Codes:
0: Success 0: Success
@@ -129,6 +134,7 @@ import salt.config
import salt.runner import salt.runner
from typing import Dict, List, Optional, Tuple, Any from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime from datetime import datetime
from threading import Lock
# Get socore uid/gid # Get socore uid/gid
SOCORE_UID = pwd.getpwnam('socore').pw_uid SOCORE_UID = pwd.getpwnam('socore').pw_uid
@@ -150,6 +156,9 @@ VALID_ROLES = ['sensor', 'searchnode', 'idh', 'receiver', 'heavynode', 'fleet']
LICENSE_PATH = '/opt/so/saltstack/local/pillar/soc/license.sls' LICENSE_PATH = '/opt/so/saltstack/local/pillar/soc/license.sls'
DEFAULTS_PATH = '/opt/so/saltstack/default/salt/hypervisor/defaults.yaml' DEFAULTS_PATH = '/opt/so/saltstack/default/salt/hypervisor/defaults.yaml'
# Single engine-wide lock for virtual node manager
engine_lock = Lock()
def read_json_file(file_path: str) -> Any: def read_json_file(file_path: str) -> Any:
""" """
Read and parse a JSON file. Read and parse a JSON file.
@@ -225,53 +234,20 @@ def convert_pci_id(pci_id: str) -> str:
log.error("Failed to convert PCI ID %s: %s", pci_id, str(e)) log.error("Failed to convert PCI ID %s: %s", pci_id, str(e))
raise raise
def create_lock_file(hypervisor_path: str, vm_name: str) -> bool:
"""Create .lock file for VM processing."""
lock_file = os.path.join(hypervisor_path, '.lock')
try:
if os.path.exists(lock_file):
log.warning("Lock file already exists at %s", lock_file)
return False
write_json_file(lock_file, {
'vm': vm_name,
'timestamp': datetime.now().isoformat()
})
return True
except Exception as e:
log.error("Failed to create lock file: %s", str(e))
return False
def remove_lock_file(hypervisor_path: str) -> None:
"""Remove .lock file after processing."""
lock_file = os.path.join(hypervisor_path, '.lock')
try:
if os.path.exists(lock_file):
os.remove(lock_file)
except Exception as e:
log.error("Failed to remove lock file: %s", str(e))
def is_locked(hypervisor_path: str) -> bool:
"""Check if hypervisor directory is locked."""
return os.path.exists(os.path.join(hypervisor_path, '.lock'))
def get_hypervisor_model(hypervisor: str) -> str: def get_hypervisor_model(hypervisor: str) -> str:
"""Get sosmodel from hypervisor grains.""" """Get sosmodel from hypervisor grains."""
log.info(hypervisor) #MOD
try: try:
# Get cached grains using Salt runner # Get cached grains using Salt runner
grains = runner.cmd( grains = runner.cmd(
'cache.grains', 'cache.grains',
[f'{hypervisor}_*', 'glob'] [f'{hypervisor}_*', 'glob']
) )
log.info(grains) #MOD
if not grains: if not grains:
raise ValueError(f"No grains found for hypervisor {hypervisor}") raise ValueError(f"No grains found for hypervisor {hypervisor}")
# Get the first minion ID that matches our hypervisor # Get the first minion ID that matches our hypervisor
minion_id = next(iter(grains.keys())) minion_id = next(iter(grains.keys()))
log.info(minion_id) #MOD
model = grains[minion_id].get('sosmodel') model = grains[minion_id].get('sosmodel')
log.info(model) #MOD
if not model: if not model:
raise ValueError(f"No sosmodel grain found for hypervisor {hypervisor}") raise ValueError(f"No sosmodel grain found for hypervisor {hypervisor}")
@@ -452,7 +428,7 @@ def mark_invalid_hardware(hypervisor_path: str, vm_name: str, config: dict, erro
def validate_hvn_license() -> bool: def validate_hvn_license() -> bool:
"""Check if the license file exists and contains required values.""" """Check if the license file exists and contains required values."""
if not os.path.exists(LICENSE_PATH): if not os.path.exists(LICENSE_PATH):
log.error("LICENSE: License file not found at %s", LICENSE_PATH) log.error("License file not found at %s", LICENSE_PATH)
return False return False
try: try:
@@ -460,14 +436,14 @@ def validate_hvn_license() -> bool:
license_data = yaml.safe_load(f) license_data = yaml.safe_load(f)
if not license_data: if not license_data:
log.error("LICENSE: Empty or invalid license file") log.error("Empty or invalid license file")
return False return False
license_id = license_data.get('license_id') license_id = license_data.get('license_id')
features = license_data.get('features', []) features = license_data.get('features', [])
if not license_id: if not license_id:
log.error("LICENSE: No license_id found in license file") log.error("No license_id found in license file")
return False return False
if 'hvn' not in features: if 'hvn' not in features:
@@ -476,29 +452,35 @@ def validate_hvn_license() -> bool:
"for more information about purchasing a license to enable this feature.") "for more information about purchasing a license to enable this feature.")
return False return False
log.info("LICENSE: License validation successful") log.info("License validation successful")
return True return True
except Exception as e: except Exception as e:
log.error("LICENSE: Error reading license file: %s", str(e)) log.error("Error reading license file: %s", str(e))
return False return False
def process_vm_creation(hypervisor_path: str, vm_config: dict) -> None: def process_vm_creation(hypervisor_path: str, vm_config: dict) -> None:
"""Process a single VM creation request.""" """
Process a single VM creation request.
This function handles the creation of a new VM, including hardware validation,
resource allocation, and provisioning. All operations are protected by the
engine-wide lock that is acquired at engine start.
Args:
hypervisor_path: Path to the hypervisor directory
vm_config: Dictionary containing VM configuration
"""
# Get the actual hypervisor name (last directory in path) # Get the actual hypervisor name (last directory in path)
hypervisor = os.path.basename(hypervisor_path) hypervisor = os.path.basename(hypervisor_path)
vm_name = f"{vm_config['hostname']}_{vm_config['role']}" vm_name = f"{vm_config['hostname']}_{vm_config['role']}"
try: try:
# Create lock file
if not create_lock_file(hypervisor_path, vm_name):
return
# Get hypervisor model and capabilities # Get hypervisor model and capabilities
model = get_hypervisor_model(hypervisor) model = get_hypervisor_model(hypervisor)
model_config = load_hardware_defaults(model) model_config = load_hardware_defaults(model)
# Validate hardware request # Initial hardware validation
is_valid, errors = validate_hardware_request(model_config, vm_config) is_valid, errors = validate_hardware_request(model_config, vm_config)
if not is_valid: if not is_valid:
mark_invalid_hardware(hypervisor_path, vm_name, vm_config, errors) mark_invalid_hardware(hypervisor_path, vm_name, vm_config, errors)
@@ -507,96 +489,54 @@ def process_vm_creation(hypervisor_path: str, vm_config: dict) -> None:
# Check hardware availability # Check hardware availability
if not check_hardware_availability(hypervisor_path, vm_name): if not check_hardware_availability(hypervisor_path, vm_name):
mark_vm_failed(os.path.join(hypervisor_path, vm_name), 3, mark_vm_failed(os.path.join(hypervisor_path, vm_name), 3,
"Requested hardware is already in use") "Requested hardware is already in use")
return return
# Create tracking file # Create tracking file
create_vm_tracking_file(hypervisor_path, vm_name, vm_config) create_vm_tracking_file(hypervisor_path, vm_name, vm_config)
# Build so-salt-cloud command # Build and execute so-salt-cloud command
log.debug("Building so-salt-cloud command for VM %s", vm_name)
cmd = ['so-salt-cloud', '-p', f'sool9-{hypervisor}', vm_name] cmd = ['so-salt-cloud', '-p', f'sool9-{hypervisor}', vm_name]
log.debug("Base command: %s", ' '.join(cmd))
# Add network configuration # Add network configuration
if vm_config['network_mode'] == 'static4': if vm_config['network_mode'] == 'static4':
log.debug("Adding static network configuration")
cmd.extend(['--static4', '--ip4', vm_config['ip4'], '--gw4', vm_config['gw4']]) cmd.extend(['--static4', '--ip4', vm_config['ip4'], '--gw4', vm_config['gw4']])
if 'dns4' in vm_config: if 'dns4' in vm_config:
cmd.extend(['--dns4', vm_config['dns4']]) cmd.extend(['--dns4', vm_config['dns4']])
if 'search4' in vm_config: if 'search4' in vm_config:
cmd.extend(['--search4', vm_config['search4']]) cmd.extend(['--search4', vm_config['search4']])
else: else:
log.debug("Using DHCP network configuration")
cmd.append('--dhcp4') cmd.append('--dhcp4')
# Add hardware configuration # Add hardware configuration
if 'cpu' in vm_config: if 'cpu' in vm_config:
log.debug("Adding CPU configuration: %s cores", vm_config['cpu'])
cmd.extend(['-c', str(vm_config['cpu'])]) cmd.extend(['-c', str(vm_config['cpu'])])
if 'memory' in vm_config: if 'memory' in vm_config:
memory_mib = int(vm_config['memory']) * 1024 memory_mib = int(vm_config['memory']) * 1024
log.debug("Adding memory configuration: %sGB (%sMiB)", vm_config['memory'], memory_mib)
cmd.extend(['-m', str(memory_mib)]) cmd.extend(['-m', str(memory_mib)])
# Add PCI devices with proper conversion # Add PCI devices
for hw_type in ['disk', 'copper', 'sfp']: for hw_type in ['disk', 'copper', 'sfp']:
if hw_type in vm_config and vm_config[hw_type]: if hw_type in vm_config and vm_config[hw_type]:
log.debug("Processing %s hardware configuration", hw_type)
indices = [int(x) for x in str(vm_config[hw_type]).split(',')] indices = [int(x) for x in str(vm_config[hw_type]).split(',')]
log.debug("Requested %s indices: %s", hw_type, indices)
for idx in indices: for idx in indices:
try: hw_config = {int(k): v for k, v in model_config['hardware'][hw_type].items()}
log.debug("Looking up PCI ID for %s index %d in model config", hw_type, idx) pci_id = hw_config[idx]
log.debug("Model config for %s: %s", hw_type, model_config['hardware'][hw_type]) converted_pci_id = convert_pci_id(pci_id)
cmd.extend(['-P', converted_pci_id])
# Convert all keys to integers for comparison # Execute command
hw_config = {int(k): v for k, v in model_config['hardware'][hw_type].items()} result = subprocess.run(cmd, capture_output=True, text=True, check=True)
log.debug("Converted hardware config: %s", hw_config)
pci_id = hw_config[idx]
log.debug("Found PCI ID for %s index %d: %s", hw_type, idx, pci_id)
converted_pci_id = convert_pci_id(pci_id)
log.debug("Converted PCI ID from %s to %s", pci_id, converted_pci_id)
cmd.extend(['-P', converted_pci_id])
except Exception as e:
log.error("Failed to process PCI ID for %s index %d: %s", hw_type, idx, str(e))
log.error("Hardware config keys: %s, looking for index: %s",
list(model_config['hardware'][hw_type].keys()), idx)
raise
# Execute so-salt-cloud
log.info("Executing command: %s", ' '.join(cmd))
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
log.debug("Command completed successfully")
log.debug("Command stdout: %s", result.stdout)
if result.stderr:
log.warning("Command stderr (non-fatal): %s", result.stderr)
except subprocess.CalledProcessError as e:
error_msg = f"so-salt-cloud execution failed (code {e.returncode})"
if e.stdout:
log.error("Command stdout: %s", e.stdout)
if e.stderr:
log.error("Command stderr: %s", e.stderr)
error_msg = f"{error_msg}: {e.stderr}"
log.error(error_msg)
mark_vm_failed(os.path.join(hypervisor_path, vm_name), 4, error_msg)
raise
# Update tracking file status # Update tracking file status
tracking_file = os.path.join(hypervisor_path, vm_name) tracking_file = os.path.join(hypervisor_path, vm_name)
data = read_json_file(tracking_file) data = read_json_file(tracking_file)
data['status'] = 'running' data['status'] = 'running'
write_json_file(tracking_file, data) write_json_file(tracking_file, data)
log.info("Successfully updated VM status to running")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
error_msg = f"so-salt-cloud execution failed (code {e.returncode})" error_msg = f"so-salt-cloud execution failed (code {e.returncode})"
if e.stdout:
log.error("Command stdout: %s", e.stdout)
if e.stderr: if e.stderr:
log.error("Command stderr: %s", e.stderr)
error_msg = f"{error_msg}: {e.stderr}" error_msg = f"{error_msg}: {e.stderr}"
log.error(error_msg) log.error(error_msg)
mark_vm_failed(os.path.join(hypervisor_path, vm_name), 4, error_msg) mark_vm_failed(os.path.join(hypervisor_path, vm_name), 4, error_msg)
@@ -604,24 +544,28 @@ def process_vm_creation(hypervisor_path: str, vm_config: dict) -> None:
except Exception as e: except Exception as e:
error_msg = f"VM creation failed: {str(e)}" error_msg = f"VM creation failed: {str(e)}"
log.error(error_msg) log.error(error_msg)
# If we haven't created the tracking file yet, create a failed one
if not os.path.exists(os.path.join(hypervisor_path, vm_name)): if not os.path.exists(os.path.join(hypervisor_path, vm_name)):
mark_vm_failed(os.path.join(hypervisor_path, f"{vm_name}_failed"), 4, error_msg) mark_vm_failed(os.path.join(hypervisor_path, f"{vm_name}_failed"), 4, error_msg)
finally: raise
remove_lock_file(hypervisor_path)
def process_vm_deletion(hypervisor_path: str, vm_name: str) -> None: def process_vm_deletion(hypervisor_path: str, vm_name: str) -> None:
"""Process a single VM deletion request.""" """
try: Process a single VM deletion request.
if not create_lock_file(hypervisor_path, vm_name):
return
This function handles the deletion of an existing VM. All operations are protected
by the engine-wide lock that is acquired at engine start.
Args:
hypervisor_path: Path to the hypervisor directory
vm_name: Name of the VM to delete
"""
try:
# Get the actual hypervisor name (last directory in path) # Get the actual hypervisor name (last directory in path)
hypervisor = os.path.basename(hypervisor_path) hypervisor = os.path.basename(hypervisor_path)
cmd = ['so-salt-cloud', '-p', f'sool9-{hypervisor}', vm_name, '-yd'] cmd = ['so-salt-cloud', '-p', f'sool9-{hypervisor}', vm_name, '-yd']
log.info("Executing: %s", ' '.join(cmd)) log.info("Executing: %s", ' '.join(cmd))
result = subprocess.run(cmd, capture_output=True, text=True) result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Log command output # Log command output
if result.stdout: if result.stdout:
@@ -629,35 +573,39 @@ def process_vm_deletion(hypervisor_path: str, vm_name: str) -> None:
if result.stderr: if result.stderr:
log.warning("Command stderr: %s", result.stderr) log.warning("Command stderr: %s", result.stderr)
# Check return code
if result.returncode != 0:
error_msg = f"so-salt-cloud deletion failed (code {result.returncode}): {result.stderr}"
log.error(error_msg)
raise subprocess.CalledProcessError(
result.returncode, cmd,
output=result.stdout,
stderr=result.stderr
)
# Remove VM tracking file # Remove VM tracking file
vm_file = os.path.join(hypervisor_path, vm_name) vm_file = os.path.join(hypervisor_path, vm_name)
if os.path.exists(vm_file): if os.path.exists(vm_file):
os.remove(vm_file) os.remove(vm_file)
log.info("Successfully removed VM tracking file") log.info("Successfully removed VM tracking file")
except subprocess.CalledProcessError as e:
error_msg = f"so-salt-cloud deletion failed (code {e.returncode}): {e.stderr}"
log.error(error_msg)
raise
except Exception as e: except Exception as e:
log.error("Error processing VM deletion: %s", str(e)) log.error("Error processing VM deletion: %s", str(e))
raise raise
finally:
remove_lock_file(hypervisor_path)
def process_hypervisor(hypervisor_path: str) -> None: def process_hypervisor(hypervisor_path: str) -> None:
"""Process VM configurations for a single hypervisor.""" """
try: Process VM configurations for a single hypervisor.
if is_locked(hypervisor_path):
return
# Read nodes file This function handles the processing of VM configurations for a hypervisor,
including creation of new VMs and deletion of removed VMs. All operations
are protected by the engine-wide lock that is acquired at engine start.
The function performs the following steps:
1. Reads and validates nodes configuration
2. Identifies existing VMs
3. Processes new VM creation requests
4. Handles VM deletions for removed configurations
Args:
hypervisor_path: Path to the hypervisor directory
"""
try:
# Detection phase - read the nodes configuration (engine-wide lock is already held)
nodes_file = os.path.join(hypervisor_path, 'nodes') nodes_file = os.path.join(hypervisor_path, 'nodes')
if not os.path.exists(nodes_file): if not os.path.exists(nodes_file):
return return
@@ -666,7 +614,7 @@ def process_hypervisor(hypervisor_path: str) -> None:
if not nodes_config: if not nodes_config:
return return
# Get existing VMs # Get existing VMs - no lock needed
existing_vms = set() existing_vms = set()
for file_path in glob.glob(os.path.join(hypervisor_path, '*_*')): for file_path in glob.glob(os.path.join(hypervisor_path, '*_*')):
basename = os.path.basename(file_path) basename = os.path.basename(file_path)
@@ -684,6 +632,7 @@ def process_hypervisor(hypervisor_path: str) -> None:
configured_vms.add(vm_name) configured_vms.add(vm_name)
if vm_name not in existing_vms: if vm_name not in existing_vms:
# process_vm_creation runs under the engine-wide lock acquired in start()
process_vm_creation(hypervisor_path, vm_config) process_vm_creation(hypervisor_path, vm_config)
# Process VM deletions # Process VM deletions
@@ -692,29 +641,52 @@ def process_hypervisor(hypervisor_path: str) -> None:
except Exception as e: except Exception as e:
log.error("Failed to process hypervisor %s: %s", hypervisor_path, str(e)) log.error("Failed to process hypervisor %s: %s", hypervisor_path, str(e))
raise
def start(interval: int = DEFAULT_INTERVAL, def start(interval: int = DEFAULT_INTERVAL,
base_path: str = DEFAULT_BASE_PATH) -> None: base_path: str = DEFAULT_BASE_PATH) -> None:
""" """
Main engine loop. Process virtual node configurations.
This function implements a single engine-wide lock to ensure only one
instance of the virtual node manager runs at a time. The lock is:
- Acquired at start
- Released after processing completes
- Maintained if error occurs
Args: Args:
interval: Time in seconds between processing cycles interval: Time in seconds between engine runs (managed by salt-master)
base_path: Base path containing hypervisor configurations base_path: Base path containing hypervisor configurations
Notes:
- Lock remains if engine encounters an error
- Admin must restart service to clear lock
- Error-level logging used for lock issues
""" """
log.info("Starting virtual node manager engine") log.info("Starting virtual node manager engine")
if not validate_hvn_license(): if not validate_hvn_license():
return return
while True: # Attempt to acquire engine lock
try: if not engine_lock.acquire(blocking=False):
# Process each hypervisor directory log.error("Another virtual node manager is already running")
for hypervisor_path in glob.glob(os.path.join(base_path, '*')): return
if os.path.isdir(hypervisor_path):
process_hypervisor(hypervisor_path)
except Exception as e: log.debug("Virtual node manager acquired engine lock")
log.error("Error in main engine loop: %s", str(e))
time.sleep(interval) try:
# Process each hypervisor directory
for hypervisor_path in glob.glob(os.path.join(base_path, '*')):
if os.path.isdir(hypervisor_path):
process_hypervisor(hypervisor_path)
# Clean shutdown - release lock
log.debug("Virtual node manager releasing engine lock")
engine_lock.release()
log.info("Virtual node manager completed successfully")
except Exception as e:
log.error("Error in virtual node manager - lock will remain until cleared: %s", str(e))
# Don't release lock on error - requires admin intervention
return