#!/usr/bin/env python3 __version__ = '1.2.17' # -*- coding: utf-8 -*- """ SSD Power Cycling and Firmware Upgrade Tool for Vast Data Systems This tool safely power cycles all SSD drives in a Vast Data system. It can also upgrade SSD firmware when specified. It maps drives to their physical locations, cycles them one at a time, and ensures system health before proceeding to the next drive. """ import argparse import datetime import errno import json import logging import logging.handlers import os import signal import subprocess import sys import time import threading import queue import getpass import ipaddress import socket import requests from typing import Dict, List, Tuple, Optional, Any, Set, Union from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # Vast API imports try: from vapi.commander import Commander, TypeId, DeviceModifyResultCode from vapi.vtool import connect_leader from vproto.python_builder import GUID from vapi.vproto.control.imdb.system_vproto import RaidStateCode except Exception as e: print(f"Failed to import Vast API please run from a Vast Data container, bashdocker: {e}") sys.exit(1) # Constants DEFAULT_LOG_FILE = "/vast/log/drive_cycle.log" STATE_FILE = "/vast/log/drive_cycle_state.json" SSH_KEY_PATH = "/vast/deploy/ssh_key.pem" SSH_USER = "vastdata" # Default values for configurable parameters POWER_CYCLE_WAIT_TIME = 30 # seconds RAID_HEALTH_CHECK_INTERVAL = 30 # seconds RAID_HEALTH_CHECK_TIMEOUT = 3600 # seconds (1 hour) NVME_CLI_COMMAND = "/vast/data/bashdocker.sh sudo nvme_cli" NVME_LIST_COMMAND = f"{NVME_CLI_COMMAND} list --as-json" IPMI_BUS_MASTER = "sudo ipmitool raw 0x3c 0x33 0x00" UPLOAD_PATH = f"s3://vast-support/Customers" VMS_USER = 'support' VMS_PASSWORD = '654321' MGMT_VIP = '/vast/vman/mgmt-vip' IS_EBOX = os.path.exists('/vast/vman/physical_hw.flag') EBOX_POWER_CYCLE_WAIT_TIME = 1200 # seconds # Initialize loggers logger = 
logging.getLogger("drive_power_cycle") file_logger = logging.getLogger("drive_power_cycle.file_only") # Dashboard globals dashboard_lock = threading.Lock() message_queue = queue.Queue(maxsize=1000) DRIVE_TYPE = TypeId.DriveType NVRAM_MODE=False UPGRADE_MODE=False FAST_UPGRADE=False DRIVE_TITLE = "SSD" OPERATION = "Power Cycle" # Global firmware upgrade configuration fw_upgrade_model = None fw_file_path = None CONNECTION_LOST_ERRNOS = {errno.ECONNRESET, errno.EPIPE, errno.ECONNREFUSED} # PCI link reset via expose_drives.py (NVRAM recovery) PCI_LINK_RESET_ENABLED = True SUPPORTED_CHASSIS_FOR_EXPOSE = frozenset({ "CERES", "MAVERICKS", "2NIC", "SANMINA_VIKING", }) _expose_drives_path_cache: Dict[str, Optional[str]] = {} NVRAM_RECOVERY_RAID_ALLOWED_STATES = ("HEALTHY", "REBUILD", "REBALANCE") NON_NVRAM_RAID_ALLOWED_STATES = ("HEALTHY", "ENABLED") RAID_FAILED_STATE_MARKERS = ("FAILED", "ERROR", "OFFLINE", "FAULT") class ConnectionLostError(Exception): """Raised when the TCP connection to the cluster leader is lost. Callers should save state and stop; the operator can --resume once the network is stable. """ pass def _raise_if_connection_lost(exc: Exception) -> None: """Re-raise *exc* as ConnectionLostError if it is a TCP transport error.""" if isinstance(exc, (ConnectionResetError, BrokenPipeError, ConnectionRefusedError)): raise ConnectionLostError( f"Leader connection lost ({type(exc).__name__}): {exc}" ) from exc if isinstance(exc, OSError) and getattr(exc, 'errno', None) in CONNECTION_LOST_ERRNOS: raise ConnectionLostError( f"Leader connection lost (OSError errno={exc.errno}): {exc}" ) from exc # Utility to get enum name or fallback to str def get_enum_name(val): try: return val.name except AttributeError: return str(val) def log_to_file_only(message: str, level=logging.INFO) -> None: """ Log a message to the file only, not to the console. 
Args: message: The message to log level: The logging level (default: INFO) """ # Use dedicated file logger file_logger.log(level, message) def setup_logging(log_file: str = DEFAULT_LOG_FILE, verbose: bool = False) -> None: """ Set up logging configuration. Args: log_file: Path to the log file (default: DEFAULT_LOG_FILE) verbose: Whether to use verbose logging """ # Create the directory for the log file if it doesn't exist os.makedirs(os.path.dirname(log_file), exist_ok=True) # Configure logging log_level = logging.DEBUG if verbose else logging.INFO # Configure main logger logger.setLevel(logging.DEBUG) # Always set to DEBUG for file logging logger.propagate = False # Prevent double logging # Clear existing handlers for handler in logger.handlers[:]: logger.removeHandler(handler) # Create rotating file handler (20MB, 5 backups) file_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=20*1024*1024, backupCount=5 ) file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(file_formatter) file_handler.setLevel(logging.DEBUG) # Always log debug to file logger.addHandler(file_handler) # Create console handler console_handler = logging.StreamHandler() console_formatter = logging.Formatter('%(levelname)s: %(message)s') console_handler.setFormatter(console_formatter) console_handler.setLevel(log_level) logger.addHandler(console_handler) # Configure file-only logger file_logger.setLevel(logging.DEBUG) file_logger.propagate = False # Never propagate to console # Clear existing handlers for handler in file_logger.handlers[:]: file_logger.removeHandler(handler) # Create file handler for file_logger (shares the same file) file_only_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=20*1024*1024, backupCount=5 ) file_only_handler.setFormatter(file_formatter) file_only_handler.setLevel(logging.DEBUG) file_logger.addHandler(file_only_handler) logger.info(f"Logging initialized. 
Log file: {log_file}") def connect_to_commander() -> Optional[Commander]: """ Connect to the cluster leader and create a Commander object. This is used both for initial connection and for reconnecting after an ebox reboot (the leader may have moved to another node). Returns: Commander object or None if connection fails """ try: leader = connect_leader() logger.info("Successfully connected to cluster leader") except Exception as e: logger.exception(f"Failed connecting to leader: {e}") return None try: commander = Commander(leader) logger.info("Commander initialized successfully") return commander except Exception as e: logger.exception(f"Failed to initialize Commander: {e}") return None def load_state() -> Dict[str, Any]: """ Load state from state file. Returns: Dictionary with state information """ try: if os.path.exists(STATE_FILE): with open(STATE_FILE, 'r') as f: state = json.load(f) completed_drives = state.get("completed_drives", []) total_drives = state.get("total_drives", 0) # Log the full state to the file only, not the console log_to_file_only(f"Loaded state successfully: {len(completed_drives)}/{total_drives} drives completed") # Print a simplified version to console add_message(f"Loaded state successfully: {len(completed_drives)}/{total_drives} drives completed") return state else: logger.info(f"No state file found at {STATE_FILE}, creating new state") add_message("No existing state found, creating new state") return {} except Exception as e: logger.exception(f"Error loading state: {e}") add_message(f"Error loading state: {str(e)}") return {} def save_state(state: Dict[str, Any]) -> None: """ Save the current state to the state file. 
Args: state: Dictionary containing the state to save """ try: # Create directory if it doesn't exist os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) # Convert mapping to serializable format serializable_state = state.copy() if "mapping" in serializable_state: serializable_state["mapping"] = { k: v for k, v in serializable_state["mapping"].items() } with open(STATE_FILE, 'w') as f: json.dump(serializable_state, f, indent=2) logger.debug(f"Saved state: completed_drives {len(serializable_state['completed_drives'])} - current {serializable_state['current_drive']}") except Exception as e: logger.exception(f"Error saving state: {e}") def get_vms_ip() -> str: """ Read the VMS management VIP from the local file. Returns: VMS IP address string """ try: with open(MGMT_VIP, 'r') as f: return f.read().strip() except Exception as e: logger.exception(f"Failed to read VMS IP from {MGMT_VIP}: {e}") return "" def vms_api_request(method: str, endpoint: str, vms_ip: str, data: dict = None, timeout: int = 30, vms_user: str = None, vms_password: str = None) -> Optional[dict]: """ Make a VMS API request. Args: method: HTTP method (GET, PATCH, etc.) endpoint: API endpoint path (e.g. 
'eboxes/') vms_ip: VMS management IP address data: Optional JSON payload for PATCH/POST requests timeout: Request timeout in seconds vms_user: VMS username (defaults to global VMS_USER or args.user) vms_password: VMS password (defaults to global VMS_PASSWORD or args.password) Returns: Response JSON as dict, or None on failure """ if vms_user is None: vms_user = VMS_USER if vms_password is None: vms_password = VMS_PASSWORD url = f"https://{vms_ip}/api/{endpoint}" try: response = requests.request(method, url, auth=(vms_user, vms_password), json=data, verify=False, timeout=timeout) if response.status_code >= 400: logger.error(f"VMS API {method} {url} returned {response.status_code}: {response.text}") return None return response.json() except requests.exceptions.ConnectionError as e: logger.error(f"VMS API connection error for {url}: {e}") return None except requests.exceptions.Timeout: logger.error(f"VMS API request timed out for {url}") return None except Exception as e: logger.exception(f"VMS API request failed for {url}: {e}") return None def test_vms_api(vms_ip: str) -> bool: """ Test VMS API connectivity and authentication by calling GET /api/clusters/. Args: vms_ip: VMS management IP address Returns: True if API is reachable and credentials are valid """ logger.info(f"Testing VMS API connectivity to {vms_ip}...") result = vms_api_request("GET", "clusters/", vms_ip) if result is not None: logger.info("VMS API connectivity test passed") return True logger.error("VMS API connectivity test failed - check VMS IP and credentials") return False def get_vms_eboxes(vms_ip: str, verify_active: bool = False) -> List[Dict[str, Any]]: """ Fetch all eboxes from VMS API, optionally verifying they are all active and enabled. Args: vms_ip: VMS management IP address verify_active: If True, also verify all eboxes are ACTIVE and enabled Returns: List of ebox dictionaries, or empty list on failure. If verify_active is True and any ebox is not active/enabled, returns empty list. 
""" result = vms_api_request("GET", "eboxes/", vms_ip) if result is None: logger.error("Failed to fetch eboxes from VMS API") return [] if not isinstance(result, list): logger.error(f"Unexpected eboxes response format: {type(result)}") return [] logger.info(f"Found {len(result)} eboxes from VMS API") if verify_active: for ebox in result: ebox_name = ebox.get('name', 'unknown') ebox_state = ebox.get('state', 'UNKNOWN') enabled = ebox.get('enabled', False) if ebox_state != 'ACTIVE': logger.error(f"Ebox {ebox_name} is in state {ebox_state}, expected ACTIVE") return [] if not enabled: logger.error(f"Ebox {ebox_name} is not enabled") return [] return result def set_ebox_enabled(vms_ip: str, ebox_id: int, enabled: bool, dryrun: bool = False) -> bool: """ Enable or disable an ebox via VMS API. Args: vms_ip: VMS management IP address ebox_id: Ebox ID in VMS enabled: True to enable, False to disable dryrun: If True, only log without executing Returns: True if successful """ action = "Enabling" if enabled else "Disabling" logger.info(f"Dryrun {dryrun} | {action} ebox {ebox_id}") if dryrun: logger.info(f"[DRY RUN] Would {'enable' if enabled else 'disable'} ebox {ebox_id} via PATCH /api/eboxes/{ebox_id}/") return True result = vms_api_request("PATCH", f"eboxes/{ebox_id}/", vms_ip, data={"enabled": enabled}) if result is not None: logger.info(f"Successfully {'enabled' if enabled else 'disabled'} ebox {ebox_id}") return True logger.error(f"Failed to {'enable' if enabled else 'disable'} ebox {ebox_id}") return False def wait_for_ebox_state(vms_ip: str, ebox_id: int, target_enabled: bool, timeout_seconds: int = 600, dryrun: bool = False) -> bool: """ Wait for an ebox to reach the expected enabled/disabled state by polling the VMS API. 
Args: vms_ip: VMS management IP address ebox_id: Ebox ID in VMS target_enabled: Expected enabled state (True = enabled, False = disabled) timeout_seconds: Maximum wait time in seconds (default 10 minutes) dryrun: If True, skip waiting Returns: True if ebox reached the target state within timeout """ target_desc = "enabled" if target_enabled else "disabled" if dryrun: logger.info(f"[DRY RUN] Would wait for ebox {ebox_id} to become {target_desc}") return True logger.info(f"Waiting for ebox {ebox_id} to become {target_desc} (timeout: {timeout_seconds}s)...") add_message(f"Waiting for ebox {ebox_id} to become {target_desc}...") check_interval = 15 start_time = time.time() while time.time() - start_time < timeout_seconds: result = vms_api_request("GET", f"eboxes/{ebox_id}/", vms_ip) if result is not None: current_enabled = result.get('enabled', None) current_state = result.get('state', 'UNKNOWN') if current_enabled == target_enabled: elapsed = int(time.time() - start_time) logger.info(f"Ebox {ebox_id} is now {target_desc} (state: {current_state}, waited {elapsed}s)") add_message(f"Ebox {ebox_id} is now {target_desc}") return True remaining = timeout_seconds - int(time.time() - start_time) logger.info(f"Ebox {ebox_id}: enabled={current_enabled}, state={current_state}, waiting... ({remaining}s remaining)") add_message(f"Ebox {ebox_id} not yet {target_desc}... ({remaining}s remaining)") else: logger.warning(f"Failed to check ebox {ebox_id} state, retrying...") time.sleep(check_interval) logger.error(f"Ebox {ebox_id} did not become {target_desc} within {timeout_seconds}s") add_message(f"ERROR: Ebox {ebox_id} did not become {target_desc} within {timeout_seconds // 60} minutes") return False def get_local_ips() -> Set[str]: """ Get all IP addresses assigned to the local machine. 
Returns: Set of local IP address strings """ local_ips = set() try: hostname = socket.gethostname() for info in socket.getaddrinfo(hostname, None): ip = info[4][0] if ip and not ip.startswith('127.'): local_ips.add(ip) except Exception as e: logger.debug(f"Failed to get IPs via getaddrinfo: {e}") try: result = subprocess.run( "ip -4 addr show | grep -oP '(?<=inet\\s)\\d+(\\.\\d+){3}'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): ip = line.strip() if ip and not ip.startswith('127.'): local_ips.add(ip) except Exception as e: logger.debug(f"Failed to get IPs via ip addr: {e}") logger.info(f"Detected local IPs: {local_ips}") return local_ips def find_local_ebox(eboxes: List[Dict[str, Any]], local_ips: Set[str]) -> Optional[Dict[str, Any]]: """ Find which ebox the script is running on by matching local IPs to ebox dnode IPs. Args: eboxes: List of ebox dictionaries from VMS API local_ips: Set of local IP addresses Returns: The ebox dict if found, None otherwise """ for ebox in eboxes: dnode_ips = set() for dnode in ebox.get('dnode_containers', []): dnode_ip = dnode.get('ip', '') if dnode_ip: dnode_ips.add(dnode_ip) # Also check vm_ip (management IP) vm_ip = ebox.get('vm_ip', '') if vm_ip: dnode_ips.add(vm_ip) # Check mgmt_ip from dnodes for dnode in ebox.get('dnode_containers', []): mgmt_ip = dnode.get('mgmt_ip', '') if mgmt_ip: dnode_ips.add(mgmt_ip) if local_ips & dnode_ips: logger.info(f"Local ebox detected: {ebox.get('name', 'unknown')} (matching IPs: {local_ips & dnode_ips})") return ebox return None def exe_get(cmd, silent_errors=False): try: return subprocess.check_output(cmd, shell=True, stderr=subprocess.DEVNULL).decode('ascii') except Exception as e: if not silent_errors: logger.exception(f"General Exception! 
Unable to run {e}") raise Exception('Failed to ssh, CMD: {}'.format(cmd)) def fetch_cluster_and_customer_info(vms_ip, vms_user, vms_password): try: cluster_info = json.loads(exe_get(f"curl --connect-timeout 5 -s -k https://{vms_ip}/api/clusters/?fields=name,psnt -u {vms_user}:{vms_password}")) customer_name = json.loads(exe_get(f"curl --connect-timeout 5 -s -k https://{vms_ip}/api/callhomeconfigs/1/?fields=customer -u {vms_user}:{vms_password}")) except Exception as e: logger.exception(f"Failed getting cluster info {e}") cluster_info = None customer_name = None return cluster_info, customer_name def map_drives(commander: Commander) -> Dict[str, Dict[str, Any]]: """ Map drives to their physical locations (DBOX -> DNODE -> DRIVE -> SLOT). Args: commander: Commander object for interfacing with Vast system Returns: Dictionary mapping drive GUIDs to their physical locations """ logger.info("Mapping drives to physical locations...") # Get all objects needed for mapping # Note: We need to list all objects here as we're building a comprehensive mapping # and not just looking for a specific object try: drives = list(commander.list_objects(DRIVE_TYPE)) dboxes = list(commander.list_objects(TypeId.DBoxType)) dnodes = list(commander.list_objects(TypeId.DNodeType)) logger.info(f"Found {len(drives)} drives, {len(dboxes)} dboxes, and {len(dnodes)} dnodes") # Create lookup tables dbox_lookup = {dbox.base_proto.guid: dbox for dbox in dboxes} dnode_lookup = {dnode.base_proto.guid: dnode for dnode in dnodes} # For ebox, deduplicate dnodes by parent_guid - only one dnode per ebox is needed if IS_EBOX: dnode_by_dbox = {} for dnode in dnodes: parent_guid = dnode.base_proto.parent_guid if parent_guid not in dnode_by_dbox: dnode_by_dbox[parent_guid] = dnode dnodes = list(dnode_by_dbox.values()) logger.info(f"Ebox mode: using {len(dnodes)} unique dnodes (one per ebox)") # Get nvme drive information from all dnodes in parallel nvme_info_by_dnode = {} threads = [] thread_results = {} def 
get_nvme_info_thread(dnode): dnode_ip = dnode.base_node_proto.tcp_address.address dnode_guid = str(dnode.base_proto.guid) nvme_info = get_nvme_info_from_dnode(dnode_ip) thread_results[dnode_guid] = { "dnode": dnode, "nvme_info": nvme_info } # Start a thread for each dnode for dnode in dnodes: thread = threading.Thread(target=get_nvme_info_thread, args=(dnode,)) thread.start() threads.append(thread) # Wait for all threads to complete for thread in threads: thread.join() # Process the results for dnode_guid, result in thread_results.items(): dnode = result["dnode"] nvme_info = result["nvme_info"] dnode_ip = dnode.base_node_proto.tcp_address.address if nvme_info: nvme_info_by_dnode[dnode_guid] = nvme_info logger.info(f"Retrieved NVME info from dnode {dnode.base_proto.name} ({dnode_ip})") else: logger.warning(f"Failed to get NVME info from dnode {dnode.base_proto.name} ({dnode_ip})") # Map drives to their physical locations drive_mapping = {} for drive in drives: parent_guid = drive.base_proto.parent_guid parent_type = drive.base_proto.parent_type if NVRAM_MODE: if parent_type != TypeId.DNodeType or parent_guid not in dnode_lookup: logger.warning(f"NVRAM {drive.base_proto.guid} not in a DNode, skipping") continue dnode = dnode_lookup[parent_guid] dbox = dbox_lookup[dnode.base_proto.parent_guid] slot = drive.device_proto.pci_switch_slot drive_serial = drive.device_proto.serial chassis_type = str(dbox.chassis_type) if hasattr(dbox, 'chassis_type') else "UNKNOWN" active_dnode = dnode else: # Skip drives that don't have a parent or aren't in a DBox if parent_type != TypeId.DBoxType or parent_guid not in dbox_lookup: logger.warning(f"Drive {drive.base_proto.guid} not in a DBox, skipping") continue dbox = dbox_lookup[parent_guid] if IS_EBOX: # Ebox: use the single dnode we kept per dbox active_dnode = dnode_by_dbox.get(dbox.base_proto.guid) if not active_dnode: logger.warning(f"No dnode found for dbox {dbox.base_proto.name}, skipping drive {drive.base_proto.guid}") continue 
else: # Find the dnodes for this drive's dbox drive_dnodes = [ dnode for dnode in dnodes if dnode.base_proto.parent_guid == dbox.base_proto.guid ] if not drive_dnodes: logger.warning(f"No dnodes found for drive {drive.base_proto.guid}, skipping") continue # Get slot information drive_serial = drive.device_proto.serial # Find which dnode currently has this drive connected (active) active_dnode = None for dnode in drive_dnodes: dnode_guid = str(dnode.base_proto.guid) if dnode_guid in nvme_info_by_dnode: # Check if this drive's serial number appears in the nvme list for this dnode nvme_data = nvme_info_by_dnode[dnode_guid] if has_drive_with_serial(nvme_data, drive_serial): active_dnode = dnode logger.info(f"Drive {drive_serial} is currently connected to dnode {dnode.base_proto.name}") break if not active_dnode: logger.warning(f"Could not find active dnode for drive {drive_serial}, skipping") continue slot = drive.device_proto.pci_switch_slot drive_serial = drive.device_proto.serial # Get chassis type information for the dbox chassis_type = str(dbox.chassis_type) if hasattr(dbox, 'chassis_type') else "UNKNOWN" # For SANMINA chassis, find the bus master using IPMI command bus_master_dnode = None bus_master_dnode_ip = None if "SANMINA" in chassis_type.upper(): # Get all dnodes in this dbox dbox_dnodes = [] for dnode in dnodes: if dnode.base_proto.parent_guid == dbox.base_proto.guid: dbox_dnodes.append(dnode) # Check if we have a cached bus master for this dbox dbox_guid_str = str(dbox.base_proto.guid) cache_entry = dbox_bus_master_cache.get(dbox_guid_str) cache_valid = False # Check if cache entry exists and is less than 1 hour old if cache_entry and time.time() - cache_entry.get('timestamp', 0) < 3600: # 1 hour cache validity bus_master_dnode_ip = cache_entry.get('master_dnode_ip') # Find the dnode object that matches this IP for dnode in dbox_dnodes: if dnode.base_node_proto.tcp_address.address == bus_master_dnode_ip: bus_master_dnode = dnode cache_valid = True 
logger.info(f"Using cached bus master dnode {dnode.base_proto.name} ({bus_master_dnode_ip}) for SANMINA chassis dbox {dbox.base_proto.name}") break if not cache_valid: # For SANMINA chassis, we need to check which dnode is the bus master using an IPMI command for dnode in dbox_dnodes: dnode_ip = dnode.base_node_proto.tcp_address.address if check_dnode_is_sanmina_master(dnode_ip): bus_master_dnode = dnode bus_master_dnode_ip = dnode_ip logger.info(f"Found bus master dnode {dnode.base_proto.name} ({bus_master_dnode_ip}) for SANMINA chassis dbox {dbox.base_proto.name} using IPMI command") # Cache this result dbox_bus_master_cache[dbox_guid_str] = { 'master_dnode_ip': bus_master_dnode_ip, 'timestamp': time.time() } break if bus_master_dnode_ip is None and len(dbox_dnodes) > 0: logger.warning(f"Could not determine bus master for SANMINA chassis dbox {dbox.base_proto.name} using IPMI command, will use active dnode for power cycling") drive_mapping[str(drive.base_proto.guid)] = { "drive_guid": str(drive.base_proto.guid), "drive_serial": drive_serial, "drive_model": drive.device_proto.model, "dbox_guid": str(dbox.base_proto.guid), "dbox_name": dbox.base_proto.name, "dnode_guid": str(active_dnode.base_proto.guid), "dnode_name": active_dnode.base_proto.name, "dnode_ip": active_dnode.base_node_proto.tcp_address.address, "slot": slot, "state": str(drive.device_proto.state), "enabled": drive.device_proto.enabled, "chassis_type": chassis_type, "bus_master_dnode_ip": bus_master_dnode_ip, "insertion_time": drive.device_proto.insertion_time, "state_timestamp": drive.device_proto.state_timestamp, "attached_timestamp": drive.device_proto.attached_timestamp } logger.info(f"Successfully mapped {len(drive_mapping)} drives") return drive_mapping except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error mapping drives: {e}") return {} def check_raid_health(commander: Commander) -> bool: """ Check RAID health. 
""" global status_data try: # Use raid_status() instead of get_raid_state() which doesn't exist raids = commander.raid_status() # Extract state objects and use get_enum_name ssd_state_clean = get_enum_name(raids.ssd_state.state) nvram_state_clean = get_enum_name(raids.nvram_state.state) memory_state_clean = get_enum_name(raids.memory_state.state) rio_nvram_state_clean = get_enum_name(raids.rio_nvram_state.state) # Check if each component is healthy (HEALTHY or ENABLED) ssd_healthy = "HEALTHY" in ssd_state_clean.upper() nvram_healthy = "HEALTHY" in nvram_state_clean.upper() memory_healthy = "HEALTHY" in memory_state_clean.upper() rio_nvram_healthy = "HEALTHY" in rio_nvram_state_clean.upper() # Get previous states to detect changes previous_states = {} with dashboard_lock: if "raid_state" in status_data: previous_states = status_data["raid_state"].copy() # Update dashboard status with lock to prevent race conditions with dashboard_lock: if "raid_state" not in status_data: status_data["raid_state"] = {} status_data["raid_state"]["ssd"] = ssd_state_clean status_data["raid_state"]["nvram"] = nvram_state_clean status_data["raid_state"]["memory"] = memory_state_clean status_data["raid_state"]["rio_nvram"] = rio_nvram_state_clean # Log state changes and update dashboard messages current_states = { "ssd": ssd_state_clean, "nvram": nvram_state_clean, "memory": memory_state_clean, "rio_nvram": rio_nvram_state_clean } for component, state in current_states.items(): if component in previous_states and previous_states[component] != state: change_msg = f"RAID {component} state changed: {previous_states[component]} -> {state}" logger.info(change_msg) add_message(change_msg) if "REBUILD" in state.upper(): rebuild_msg = f"RAID {component} is now REBUILDING" logger.warning(rebuild_msg) add_message(rebuild_msg) try: message_queue.put_nowait(("update", None)) except queue.Full: pass logger.info(f"RAID status - SSD: {ssd_state_clean}, NVRAM: {nvram_state_clean}, Memory: 
{memory_state_clean}, RIO NVRAM: {rio_nvram_state_clean}") is_healthy = ssd_healthy and nvram_healthy and memory_healthy and rio_nvram_healthy if is_healthy: logger.info("All RAID states are healthy") add_message("RAID health check: All components HEALTHY") else: unhealthy_components = [] if not ssd_healthy: unhealthy_components.append(f"SSD: {ssd_state_clean}") if not nvram_healthy: unhealthy_components.append(f"NVRAM: {nvram_state_clean}") if not memory_healthy: unhealthy_components.append(f"Memory: {memory_state_clean}") if not rio_nvram_healthy: unhealthy_components.append(f"RIO: {rio_nvram_state_clean}") msg = f"RAID health check: Non-healthy components - {', '.join(unhealthy_components)}" logger.warning(msg) add_message(msg) return is_healthy except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error checking RAID health: {e}") return False def check_node_and_drive_failures(commander: Commander, skip_drives: List[str] = None) -> bool: """ Check for node and drive failures in the cluster. 
Args: commander: Commander object for interfacing with Vast system skip_drives: List of drive GUIDs to skip even if they're in failed state Returns: True if no failures were detected (or if all failed drives are in the skip list), False otherwise """ # Ensure we're using global status data global status_data try: # Check drives for failures - need to scan all drives to identify failures # This can't be replaced with get_object() as we need to check the status of all drives drives = list(commander.list_objects(DRIVE_TYPE)) # From the logs, we see drives with "DeviceState.ACTIVE" and "DeviceFailReason.NONE" are actually healthy failed_drives = [] healthy_drives = 0 total_drives = len(drives) for drive in drives: state_str = str(drive.device_proto.state) fail_reason_str = str(drive.device_proto.fail_reason) # Check if it's actually failed - a working drive is ACTIVE if "ACTIVE" not in state_str.upper(): failed_drives.append({ "guid": drive.base_proto.guid, "state": state_str, "reason": fail_reason_str }) else: healthy_drives += 1 # Update dashboard with the correct key names - use lock for thread safety with dashboard_lock: status_data["healthy_drives"] = healthy_drives status_data["total_drives"] = total_drives if failed_drives: # If we have a skip list, filter out drives that should be skipped actual_failed_drives = failed_drives if skip_drives: # Convert any GUIDs to strings for comparison skip_drive_strs = [str(guid) for guid in skip_drives] actual_failed_drives = [drive for drive in failed_drives if str(drive['guid']) not in skip_drive_strs] # Log which drives we're skipping skipped_drives = [drive for drive in failed_drives if str(drive['guid']) in skip_drive_strs] for drive in skipped_drives: logger.info(f"Skipping failed drive: {drive['guid']} (specified in --skip-failed-drives)") add_message(f"Skipping failed drive: {drive['guid']} (user-specified)") # Check for remaining failed drives after skipping if actual_failed_drives: logger.error(f"Found 
{len(actual_failed_drives)} failed drives") add_message(f"Found {len(actual_failed_drives)} failed drives out of {len(drives)}") for drive_info in actual_failed_drives: logger.error(f"Failed drive: {drive_info['guid']}, State: {drive_info['state']}, Reason: {drive_info['reason']}") return False else: # No remaining failed drives after skipping if len(failed_drives) > 0 and skip_drives: logger.info(f"All {len(failed_drives)} failed drives are in the skip list") add_message(f"All failed drives are in the skip list - proceeding") else: logger.info(f"All {len(drives)} drives are healthy") add_message(f"All {len(drives)} drives are healthy") return True else: logger.info(f"All {len(drives)} drives are healthy") add_message(f"All {len(drives)} drives are healthy") # Check dnodes for failures - need to scan all dnodes # This can't be replaced with get_object() as we need to check the status of all dnodes dnodes = list(commander.list_objects(TypeId.DNodeType)) failed_dnodes = [] healthy_dnodes = 0 for dnode in dnodes: # Healthy dnode has state ACTIVE and is enabled state_str = str(dnode.base_node_proto.state) enabled = dnode.base_node_proto.enabled # Check if it's actually failed - a working dnode is ACTIVE and enabled if "ACTIVE" in state_str.upper() and enabled: healthy_dnodes += 1 else: failed_dnodes.append({ "guid": dnode.base_proto.guid, "state": state_str, "enabled": enabled }) # Update dashboard with correct key names - use lock for thread safety with dashboard_lock: status_data["healthy_dnodes"] = healthy_dnodes status_data["total_dnodes"] = len(dnodes) if failed_dnodes: logger.error(f"Found {len(failed_dnodes)} failed dnodes") add_message(f"Found {len(failed_dnodes)} failed dnodes out of {len(dnodes)}") for dnode_info in failed_dnodes: logger.error(f"Failed dnode: {dnode_info['guid']}, State: {dnode_info['state']}, Enabled: {dnode_info['enabled']}") return False else: logger.info(f"All {len(dnodes)} dnodes are healthy") add_message(f"All {len(dnodes)} dnodes are 
healthy") logger.info("No node or drive failures detected") return True except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error checking for node and drive failures: {e}") add_message(f"Error checking node and drive failures: {str(e)}") return False def deactivate_drive(commander: Commander, drive_guid: str, dry_run: bool = False) -> bool: """ Deactivate a drive. Args: commander: Commander object for interfacing with Vast system drive_guid: GUID of the drive to deactivate dry_run: If True, only log actions without executing them Returns: True if successful, False otherwise """ try: logger.info(f"Deactivating drive {drive_guid}") # Update dashboard drive state with dashboard_lock: status_data["drive_state"][drive_guid] = "DEACTIVATING" if dry_run: logger.info(f"[DRY RUN] Would deactivate drive {drive_guid}") return True # Convert string GUID to GUID object try: # Try to create a GUID object from the string guid_obj = GUID(drive_guid) logger.debug(f"Successfully created GUID object from string: {drive_guid}") except Exception as guid_e: logger.exception(f"Failed to create GUID from string '{drive_guid}': {guid_e}") return False commander.drive_modify(guid_obj, enabled=False, force=True) logger.info(f"Drive {drive_guid} deactivated successfully") # Update dashboard drive state with dashboard_lock: status_data["drive_state"][drive_guid] = "INACTIVE" return True except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error deactivating drive {drive_guid}: {e}") return False def wait_for_drive_active(commander: Commander, drive_guid: str, timeout_seconds: int = 600, dry_run: bool = False) -> bool: """ Wait for a drive to transition to ACTIVE state, with timeout. 
Args: commander: Commander object for interfacing with Vast system drive_guid: GUID of the drive to check timeout_seconds: Maximum time to wait in seconds dry_run: If True, skip the wait and return True immediately Returns: True if drive becomes ACTIVE, False otherwise """ try: if dry_run: logger.info(f"[DRY RUN] Would wait for drive {drive_guid} to become ACTIVE") with dashboard_lock: status_data["drive_state"][drive_guid] = "ACTIVE" return True logger.info(f"Waiting for drive {drive_guid} to become ACTIVE (timeout: {timeout_seconds}s)...") # Update dashboard state with dashboard_lock: status_data["drive_state"][drive_guid] = "WAITING FOR ACTIVE" # Convert string GUID to GUID object for filtering try: guid_obj = GUID(drive_guid) except Exception as e: logger.error(f"Error converting GUID string to GUID object: {e}") return False start_time = time.time() while time.time() - start_time < timeout_seconds: try: # Get the specific drive directly by GUID drive = commander.get_object(DRIVE_TYPE, guid_obj) if drive: state_str = str(drive.device_proto.state) logger.debug(f"Drive {drive_guid} current state: {state_str}") # Update dashboard state with current state with dashboard_lock: if "ACTIVATING" in state_str.upper(): status_data["drive_state"][drive_guid] = "ACTIVATING" elif "ACTIVE" in state_str.upper(): status_data["drive_state"][drive_guid] = "ACTIVE" else: status_data["drive_state"][drive_guid] = get_enum_name(drive.device_proto.state) if "ACTIVE" in state_str.upper() and "ACTIVATING" not in state_str.upper(): logger.info(f"Drive {drive_guid} is now ACTIVE") with dashboard_lock: status_data["drive_state"][drive_guid] = "ACTIVE" return True elif "ACTIVATING" in state_str.upper(): logger.debug(f"Drive {drive_guid} is still in ACTIVATING state, waiting...") else: logger.warning(f"Drive {drive_guid} is in unexpected state: {state_str}") except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.warning(f"Could not find drive 
{drive_guid}: {e}") # Wait before checking again time.sleep(10) logger.error(f"Timeout waiting for drive {drive_guid} to become ACTIVE after {timeout_seconds} seconds") return False except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error waiting for drive to become active: {e}") return False def wait_for_drive_inactive(commander: Commander, drive_guid: str, timeout_seconds: int = 600, dry_run: bool = False) -> bool: """ Wait for a drive to transition to INACTIVE state, with timeout. Args: commander: Commander object for interfacing with Vast system drive_guid: GUID of the drive to check timeout_seconds: Maximum time to wait in seconds dry_run: If True, skip the wait and return True immediately Returns: True if drive becomes INACTIVE, False otherwise """ try: if dry_run: logger.info(f"[DRY RUN] Would wait for drive {drive_guid} to become INACTIVE") with dashboard_lock: status_data["drive_state"][drive_guid] = "INACTIVE" return True logger.info(f"Waiting for drive {drive_guid} to become INACTIVE (timeout: {timeout_seconds}s)...") # Update dashboard state with dashboard_lock: status_data["drive_state"][drive_guid] = "WAITING FOR INACTIVE" # Convert string GUID to GUID object for filtering try: guid_obj = GUID(drive_guid) except Exception as e: logger.error(f"Error converting GUID string to GUID object: {e}") return False start_time = time.time() while time.time() - start_time < timeout_seconds: try: # Get the specific drive directly by GUID drive = commander.get_object(DRIVE_TYPE, guid_obj) if drive: state_str = str(drive.device_proto.state) logger.debug(f"Drive {drive_guid} current state: {state_str}") # Update dashboard state with current state with dashboard_lock: if "DEACTIVATING" in state_str.upper(): status_data["drive_state"][drive_guid] = "DEACTIVATING" elif "INACTIVE" in state_str.upper(): status_data["drive_state"][drive_guid] = "INACTIVE" elif "FAILED" in state_str.upper(): 
status_data["drive_state"][drive_guid] = "FAILED" else: status_data["drive_state"][drive_guid] = get_enum_name(drive.device_proto.state) if "INACTIVE" in state_str.upper() or "FAILED" in state_str.upper(): logger.info(f"Drive {drive_guid} is now {state_str.upper()}") return True elif "DEACTIVATING" in state_str.upper(): logger.debug(f"Drive {drive_guid} is still in DEACTIVATING state, waiting...") else: logger.warning(f"Drive {drive_guid} is in unexpected state: {state_str}") except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.warning(f"Could not find drive {drive_guid}: {e}") # Wait before checking again time.sleep(10) logger.error(f"Timeout waiting for drive {drive_guid} to become INACTIVE after {timeout_seconds} seconds") return False except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error waiting for drive to become inactive: {e}") return False def _ssh_dnode_cmd(dnode_ip: str, remote_cmd: str, timeout: int = 30, dry_run: bool = False) -> Optional[subprocess.CompletedProcess]: """Run a single command on a DNode via SSH. 
Returns CompletedProcess or None on error.""" full_cmd = ( f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no " f"-o ConnectTimeout=10 {SSH_USER}@{dnode_ip} '{remote_cmd}'" ) if dry_run: logger.info(f"[DRY RUN] Would run on {dnode_ip}: {remote_cmd}") return subprocess.CompletedProcess(args=full_cmd, returncode=0, stdout="", stderr="") logger.info(f"Running on {dnode_ip}: {remote_cmd}") try: result = subprocess.run( full_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=timeout, ) if result.returncode != 0: logger.error(f"Command failed on {dnode_ip} (rc={result.returncode}): {result.stderr.strip()}") else: logger.debug(f"Command succeeded on {dnode_ip}: {result.stdout.strip()[:200]}") return result except subprocess.TimeoutExpired: logger.error(f"SSH command timed out ({timeout}s) on {dnode_ip}: {remote_cmd}") return None except Exception as e: logger.exception(f"Error running SSH command on {dnode_ip}: {e}") return None def _resolve_expose_drives_path(dnode_ip: str) -> Optional[str]: """Find the absolute path to expose_drives.py on a DNode. Caches confirmed results per IP: found path or confirmed-not-found (""). SSH transport failures are NOT cached so the next call retries the probe. 
""" cached = _expose_drives_path_cache.get(dnode_ip) if cached is not None: return cached if cached else None probe_cmd = ( "which expose_drives.py 2>/dev/null " "|| ls /usr/bin/expose_drives.py 2>/dev/null " "|| ls /usr/sbin/expose_drives.py 2>/dev/null " "|| ls /vast/utils/expose_drives.py 2>/dev/null " "|| ls /vast/data/expose_drives.py 2>/dev/null " '|| echo "NOT_FOUND"' ) result = _ssh_dnode_cmd(dnode_ip, probe_cmd, timeout=15) if result is None or result.returncode != 0: logger.warning(f"SSH probe failed for expose_drives.py on {dnode_ip} (rc={getattr(result, 'returncode', 'N/A')}, will retry next time)") return None path = result.stdout.strip().split('\n')[0].strip() if not path or path == "NOT_FOUND": _expose_drives_path_cache[dnode_ip] = "" logger.warning(f"expose_drives.py NOT found on {dnode_ip}") return None _expose_drives_path_cache[dnode_ip] = path logger.info(f"expose_drives.py found at {path} on {dnode_ip}") return path def _normalize_chassis_type(chassis_type) -> str: """Extract the short chassis name from DBoxChassisType enum or string representation.""" ct = str(chassis_type or "UNKNOWN").upper() for prefix in ("DBOXCHASSISTYPE.", "CHASSISTYPE."): if ct.startswith(prefix): ct = ct[len(prefix):] return ct def pci_link_reset_slot(dnode_ip: str, slot: int, dry_run: bool = False, chassis_type: str = None) -> bool: """Reset the PCI link for a single NVRAM slot using expose_drives.py. Follows the Kioxia recovery sequence: disable -> enable -> power x2 -> PERST x2 -> check (soft) Returns True if all steps succeeded (or were skipped due to guardrails). On failure, returns False but does NOT raise — the caller should fall through to the existing power_cycle_drive() path. 
""" slot = int(slot) normalized_chassis = _normalize_chassis_type(chassis_type) if normalized_chassis not in SUPPORTED_CHASSIS_FOR_EXPOSE: logger.info(f"PCI link reset skipped: chassis {chassis_type} ({normalized_chassis}) not in supported set {SUPPORTED_CHASSIS_FOR_EXPOSE}") add_message(f"PCI link reset skipped (chassis {normalized_chassis} not supported)") return True expose_path = "sudo expose_drives.py" if not dry_run: resolved = _resolve_expose_drives_path(dnode_ip) if not resolved: logger.warning(f"PCI link reset skipped: expose_drives.py not found on {dnode_ip}") add_message(f"PCI link reset skipped (expose_drives.py not found on {dnode_ip})") return True expose_path = f"sudo {resolved}" logger.info(f"Step 0: PCI link reset for slot {slot} on {dnode_ip} (chassis={normalized_chassis})") add_message(f"Step 0: PCI link reset for slot {slot} on {dnode_ip}...") steps = [ ("disable", 2, "Disabling PCI link"), ("enable", 5, "Re-enabling PCI link"), ("power", 10, "Power cycle 1/2"), ("power", 15, "Power cycle 2/2"), ("perst", 5, "PERST reset 1/2"), ("perst", 10, "PERST reset 2/2"), ] for cmd, settle_secs, description in steps: logger.info(f" PCI link reset [{description}]: {expose_path} -c {cmd} -s {slot}") add_message(f" [{description}] expose_drives.py -c {cmd} -s {slot}") result = _ssh_dnode_cmd( dnode_ip, f"{expose_path} -c {cmd} -s {slot}", timeout=30, dry_run=dry_run, ) if result is None or result.returncode != 0: logger.error( f"PCI link reset FAILED at [{description}] for slot {slot} on {dnode_ip} " f"- aborting Step 0, falling through to power_cycle_drive()" ) add_message(f"PCI link reset failed at [{description}] - will try power cycle instead") return False if not dry_run: logger.info(f" Settling {settle_secs}s after {cmd}...") time.sleep(settle_secs) check_result = _ssh_dnode_cmd( dnode_ip, f"{expose_path} -c check --json", timeout=30, dry_run=dry_run, ) if check_result and check_result.returncode == 0: logger.info(f"PCI link reset check output: 
{check_result.stdout.strip()[:500]}") else: logger.warning(f"PCI link reset: post-check failed (non-fatal)") logger.info(f"Step 0: PCI link reset completed for slot {slot} on {dnode_ip}") add_message(f"Step 0: PCI link reset completed for slot {slot}") return True def check_dnode_is_sanmina_master(dnode_ip: str) -> bool: """ Check if a dnode is the bus master in a SANMINA chassis using IPMI. The master will return "01 01" from the IPMI command. Args: dnode_ip: IP of the dnode to check Returns: True if the dnode is the master, False otherwise """ try: cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} '{IPMI_BUS_MASTER}'" logger.debug(f"Running command to check SANMINA bus master: {cmd}") result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) # The master dnode will return "01 01", non-master will return "01 00" if result.returncode == 0 and "01 01" in result.stdout: logger.info(f"Dnode {dnode_ip} is the SANMINA bus master (IPMI returned: {result.stdout.strip()})") return True elif result.returncode == 0: logger.debug(f"Dnode {dnode_ip} is NOT the SANMINA bus master (IPMI returned: {result.stdout.strip()})") return False else: logger.warning(f"Failed to check if dnode {dnode_ip} is SANMINA bus master: {result.stderr}") return False except Exception as e: logger.warning(f"Error checking if dnode {dnode_ip} is SANMINA bus master: {e}") return False def get_nvme_info_from_dnode(dnode_ip: str) -> Optional[Dict]: """ Get NVME drive information from a dnode via SSH. 
Args: dnode_ip: IP address of the dnode Returns: Dictionary containing parsed JSON output of NVME CLI command, or None if failed """ try: # Construct the SSH command ssh_cmd = [ "ssh", "-i", SSH_KEY_PATH, "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", f"{SSH_USER}@{dnode_ip}", NVME_LIST_COMMAND ] logger.debug(f"Running command on {dnode_ip}: {' '.join(ssh_cmd)}") # Execute the SSH command process = subprocess.run( ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, # Python 3.6 compatible alternative to text=True check=False # Don't raise exception on non-zero exit code ) if process.returncode != 0: logger.warning(f"Failed to get NVME info from {dnode_ip}: {process.stderr.strip()}") return None # Parse the JSON output try: nvme_data = json.loads(process.stdout) return nvme_data except json.JSONDecodeError as e: logger.warning(f"Failed to parse NVME info from {dnode_ip}: {e}") logger.debug(f"Raw output: {process.stdout}") return None except Exception as e: logger.error(f"Error getting NVME info from dnode {dnode_ip}: {e}") return None def has_drive_with_serial(nvme_data: Dict, serial: str) -> bool: """ Check if the NVME data contains a drive with the given serial number. Args: nvme_data: Dictionary containing NVME CLI output serial: Serial number to search for Returns: True if a drive with the given serial number was found, False otherwise """ try: # Check in 'drives' section if 'drives' in nvme_data: for drive in nvme_data['drives']: if drive.get('serial') == serial: return True # Also check in 'nvrams' section if 'nvrams' in nvme_data: for nvram in nvme_data['nvrams']: if nvram.get('serial') == serial: return True return False except Exception as e: logger.error(f"Error checking for drive {serial} in NVME data: {e}") return False def check_drive_attached(commander: Commander, drive_guid: str) -> bool: """ Check if a drive is attached to at least one dnode. 
Raises ConnectionLostError if the cluster leader connection is broken (instead of silently returning False, which previously caused the tool to misinterpret TCP errors as "drive not attached"). Args: commander: Commander object for interfacing with Vast system drive_guid: GUID of the drive to check Returns: True if the drive is attached to at least one dnode, False otherwise Raises: ConnectionLostError: if the leader TCP connection is down """ try: guid_obj = GUID(drive_guid) except Exception as e: logger.error(f"Error converting GUID string to GUID object: {e}") return False try: drive = commander.get_object(DRIVE_TYPE, guid_obj) attached = drive.device_proto.attached logger.info(f"Drive {drive_guid} attachment status: {attached}") if any(attached): logger.info(f"Drive {drive_guid} is attached to at least one dnode") return True else: logger.warning(f"Drive {drive_guid} is not attached to any dnode") return False except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.error(f"Drive {drive_guid} not found or error accessing it: {e}") return False def get_firmware_upgrade_params(drive_model: str) -> Optional[Dict[str, Any]]: """ Get firmware upgrade parameters for a specific drive model. 
Args: drive_model: The drive model string Returns: Dictionary with firmware upgrade parameters or None if unsupported """ # Mapping of drive models to their firmware upgrade parameters # Based on the ssd_fw_update.sh script fw_params = { # Intel Optane drives "SSDPE21K015TA": {"slot": 0, "action": 1, "xfer": "0x20000", "description": "Intel Optane P4800X 1.5TB"}, "SSDPD21K015TA": {"slot": 0, "action": 1, "xfer": "0x20000", "description": "Intel Optane D4800X 1.5TB"}, "SSDPE21D960GA": {"slot": 0, "action": 1, "xfer": "0x20000", "description": "Intel Optane 905p 960GB"}, "SSDPF21Q016TB": {"slot": 1, "action": 3, "description": "Intel Optane P5800X 1.6TB"}, "SSDPF21M016TA": {"slot": 1, "action": 3, "description": "Intel Optane P5820X 1.6TB"}, "SSDPF21M800GA": {"slot": 1, "action": 3, "description": "Intel Optane P5820X 800GB"}, # Solidigm (Intel) QLC drives "SSDPE2NV153T8": {"slot": 1, "action": 1, "description": "Solidigm (Intel) P4326 15.36TB QLC"}, "SSDPF2NV153TZ": {"slot": 0, "action": 1, "description": "Solidigm (Intel) P5316 U.2 15.36TB QLC"}, "SSDPFWNV153TZ": {"slot": 0, "action": 1, "description": "Solidigm (Intel) P5316 E1.L 15.36TB QLC"}, "SSDPF2NV307TZ": {"slot": 0, "action": 1, "description": "Solidigm (Intel) P5316 U.2 30TB QLC"}, "SSDPFWNV307TZ": {"slot": 0, "action": 1, "description": "Solidigm (Intel) P5316 E1.L 30TB QLC"}, # Solidigm P5336 series "SBFPFWBV153T": {"slot": None, "action": 1, "description": "Solidigm P5336 E1.L 15.36TB QLC"}, "SBFPFWBV307T": {"slot": None, "action": 1, "description": "Solidigm P5336 E1.L 30TB QLC"}, "SBFPFWBV614T": {"slot": None, "action": 1, "description": "Solidigm P5336 E1.L 61.4TB QLC"}, "SBFPF2BV153T": {"slot": None, "action": 1, "description": "Solidigm P5336 U.2 15.36TB QLC"}, "SBFPF2BV307T": {"slot": None, "action": 1, "description": "Solidigm P5336 U.2 30TB QLC"}, "SBFPF2BV614T": {"slot": None, "action": 1, "description": "Solidigm P5336 U.2 61.4TB QLC"}, "SBFPF2BV0P12": {"slot": None, "action": 1, 
"description": "Solidigm P5336 U.2 122.90TB QLC"}, # KIOXIA drives "KCM730T7P5xnETRI": {"slot": 1, "action": 3, "description": "KIOXIA 30TB CM7 QLC for HPE Raider"}, "KFL61HUL1T60": {"slot": 1, "action": 3, "description": "KIOXIA FL6 1.6TB NVRAM (ES)"}, "KFL6XHUL1T60": {"slot": 1, "action": 3, "description": "KIOXIA FL6 1.6TB NVRAM (CS)"}, "KFL61HUL800G": {"slot": 1, "action": 3, "description": "KIOXIA FL6 800G NVRAM (ES)"}, "KFL6XHUL800G": {"slot": 1, "action": 3, "description": "KIOXIA FL6 800G NVRAM (CS)"}, # Solidigm SCM drives "SSDPF2KX038TZ": {"slot": 0, "action": 3, "description": "Solidigm (Intel) P5510 800GB ES"}, "SSDPF2SQ800GZ": {"slot": 0, "action": 3, "description": "Solidigm P5810 800GB"}, "SSDPF2SQ016TZ": {"slot": 0, "action": 3, "description": "Solidigm P5810 1.6TB"}, # Micron drives "Micron_9300_MTFDHAL15T3TDP": {"slot": 2, "action": 3, "description": "Micron 9300 15.36TB"}, "MTFDKBN15T3TGR": {"slot": 2, "action": 1, "description": "Micron 6500 E1.L 15.36TB"}, "MTFDKBN30T7TGR": {"slot": 2, "action": 1, "description": "Micron 6500 E1.L 30.73TB"}, "MTFDKCC30T7TGR": {"slot": 2, "action": 1, "description": "Micron 6500 U.2 30.73TB"}, "MTFDKCC960TFR-1BC1ZHEYY": {"slot": 2, "action": 1, "description": "Micron XTR U.2 960.20GB"}, "MTFDKCC1T9TFR-1BC1ZHEYY": {"slot": 2, "action": 1, "description": "Micron XTR U.2 1.92TB"}, "MTFDLBN61T4THL-1BK1JABYY": {"slot": 2, "action": 1, "description": "Micron 6550 ION E1.L 61.44TB"}, "MTFDLBN61T4THL-ABK1JABYYES": {"slot": 2, "action": 1, "description": "Micron 6550 ION E1.L 61.44TB - Engineering Sample"}, "MTFDLAL61T4THL-1BK1JABYY": {"slot": 2, "action": 1, "description": "Micron 6550 ION U.2 61.44TB"}, "MTFDLAL61T4THL-1BK1JABYYES": {"slot": 2, "action": 1, "description": "Micron 6550 ION U.2 61.44TB - Engineering Sample"}, # Samsung drives "SAMSUNG MZEMO15THCLC-00AVT": {"slot": 0, "action": 1, "description": "SAMSUNG BM1743 E1.L 15.36TB"}, "SAMSUNG MZEMO61THCLF-00AVT": {"slot": 0, "action": 1, "description": "SAMSUNG 
BM1743 E1.L 61.45TB"}, # Western Digital drives "WUS5BB1A1E9ELE3": {"slot": 0, "action": 3, "description": "WD SN650 15.36TB E1.L"}, "SDS5EC0C1E9ELY3": {"slot": 0, "action": 3, "description": "Western Digital SN655 61TB E1.L"}, # Phison drives "PASCARI XX208H033T20Z3116T300": {"slot": 0, "action": 1, "description": "Phison PASCARI X200Z 3.2 TB"}, "PASCARI XX208H031T60Z318T1900": {"slot": 0, "action": 1, "description": "Phison PASCARI X200Z 1.6 TB"}, "PASCARI XX208H03800GZ314T0900": {"slot": 0, "action": 1, "description": "Phison PASCARI X200Z 800 GB"}, "PASCARI XX208H033T20Z3116T30A": {"slot": 0, "action": 1, "description": "Phison PASCARI X200Z 3.2 TB"}, "PASCARI XX208H031T60Z318T190A": {"slot": 0, "action": 1, "description": "Phison PASCARI X200Z 1.6 TB"}, "PASCARI XX208H03800GZ314T090A": {"slot": 0, "action": 1, "description": "Phison PASCARI X200Z 800 GB"}, } # Check for exact match first if drive_model in fw_params: return fw_params[drive_model] # Check for partial matches (for drive families) for model_pattern, params in fw_params.items(): if model_pattern in drive_model: return params # Check for SBFPF pattern (Solidigm P5336 series) if "SBFPF" in drive_model: return {"slot": None, "action": 1, "description": f"Solidigm P5336 drive ({drive_model})"} # Check for KFL6 pattern (KIOXIA FL6 series) if "KFL6" in drive_model: return {"slot": 1, "action": 3, "description": f"KIOXIA FL6 drive ({drive_model})"} # Check for SAMSUNG pattern if "SAMSUNG" in drive_model: return {"slot": 0, "action": 1, "description": f"Samsung drive ({drive_model})"} # Check for PASCARI pattern if "PASCARI" in drive_model: return {"slot": 0, "action": 1, "description": f"Phison PASCARI drive ({drive_model})"} return None def find_drive_nvme_device(dnode_ip: str, drive_serial: str) -> Optional[str]: """ Find the NVMe device path for a drive with a specific serial number on a dnode. This works even when the drive is deactivated/failed. 
Args: dnode_ip: IP address of the dnode drive_serial: Serial number of the drive to find Returns: NVMe device path (e.g., '/dev/nvme1') or None if not found """ try: # First try the normal nvme list approach nvme_info = get_nvme_info_from_dnode(dnode_ip) if nvme_info: # Search for the drive with matching serial in drives section for drive in nvme_info.get("drives", []): if drive.get("serial") == drive_serial: device_node = drive.get("path", "") if '/dev/nvme' in device_node: nvme_device = device_node.replace('/vast', '') logger.info(f"Found ssd drive {drive_serial} at device {nvme_device} on dnode {dnode_ip}") return nvme_device # Also check nvrams section for nvram in nvme_info.get("nvrams", []): if nvram.get("serial") == drive_serial: device_node = nvram.get("path", "") if '/dev/nvme' in device_node: nvme_device = device_node.replace('/vast', '') logger.info(f"Found nvram drive {drive_serial} at device {nvme_device} on dnode {dnode_ip}") return nvme_device # If not found via nvme list, try direct approach by checking all nvme devices # This handles cases where deactivated drives might not show up in nvme list but are still accessible logger.info(f"Drive {drive_serial} not found via nvme list, trying direct device enumeration") # Use a simpler command to list all nvme devices and check their serial numbers direct_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'for dev in /dev/nvme*; do if [[ $dev =~ /dev/nvme[0-9]+$ ]]; then echo \"Device: $dev\"; sudo nvme id-ctrl $dev 2>/dev/null | grep -E \"sn|fr\" || echo \"Failed to read $dev\"; echo \"---\"; fi; done'" logger.debug(f"Running direct device enumeration: {direct_cmd}") result = subprocess.run(direct_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if result.returncode == 0: output_lines = result.stdout.split('\n') current_device = None for line in output_lines: line = line.strip() if line.startswith("Device: /dev/nvme"): current_device = 
line.replace("Device: ", "") # Compare the parsed "sn : VALUE" field exactly; a substring test lets one serial match another that merely contains it elif line.startswith("sn") and line.partition(":")[2].strip() == str(drive_serial).strip(): logger.info(f"Found drive {drive_serial} at device {current_device} on dnode {dnode_ip} via direct enumeration") return current_device logger.warning(f"Drive with serial {drive_serial} not found on dnode {dnode_ip} via any method") return None except Exception as e: logger.exception(f"Error finding NVMe device for drive {drive_serial} on dnode {dnode_ip}: {e}") return None def probe_drive_nvme_visibility(dnode_ip: str, drive_serial: str) -> Tuple[str, Optional[str], str]: """Probe whether a drive serial is visible, missing, or unknown on a DNode. Returns: (status, device, reason), where status is one of: - "visible": probe succeeded and serial was found - "missing": probe succeeded and serial was not found - "unknown": probe failed, timed out, or output could not be trusted """ result = _ssh_dnode_cmd(dnode_ip, NVME_LIST_COMMAND, timeout=30) if result and result.returncode == 0: try: nvme_info = json.loads(result.stdout) for section in ("drives", "nvrams"): for device in nvme_info.get(section, []): if device.get("serial") == drive_serial: device_node = device.get("path", "") if "/dev/nvme" in device_node: nvme_device = device_node.replace("/vast", "") return "visible", nvme_device, f"found via nvme_cli {section}" except Exception as e: logger.warning(f"Could not parse nvme_cli list output from {dnode_ip}: {e}; trying direct enumeration") elif result is None: logger.warning(f"nvme_cli visibility probe timed out/failed on {dnode_ip}; trying direct enumeration") else: logger.warning(f"nvme_cli visibility probe failed on {dnode_ip} (rc={result.returncode}); trying direct enumeration: {result.stderr.strip()}") direct_cmd = ( 'found_any=0; read_ok=0; read_fail=0; ' 'for dev in /dev/nvme[0-9]*; do ' '[ -e "$dev" ] || continue; ' 'case "$dev" in *n[0-9]*) continue;; esac; ' 'found_any=$((found_any+1)); ' 'echo "Device: $dev"; ' 'id_out=$(sudo nvme id-ctrl "$dev" 2>/dev/null); id_rc=$?; ' 'if 
[ "$id_rc" -eq 0 ]; then ' 'read_ok=$((read_ok+1)); ' 'printf "%s\\n" "$id_out" | grep -E "sn|fr" || true; ' 'else ' 'read_fail=$((read_fail+1)); ' 'echo "ID_CTRL_FAILED: $dev"; ' 'fi; ' 'echo "---"; ' 'done; ' 'echo "__SUMMARY__:found=$found_any read_ok=$read_ok read_fail=$read_fail"' ) direct_result = _ssh_dnode_cmd(dnode_ip, direct_cmd, timeout=30) if direct_result is None: return "unknown", None, "direct NVMe enumeration timed out or failed" if direct_result.returncode != 0: return "unknown", None, f"direct NVMe enumeration failed rc={direct_result.returncode}: {direct_result.stderr.strip()}" current_device = None summary_found = False read_ok = 0 read_fail = 0 found_any = 0 for line in direct_result.stdout.splitlines(): line = line.strip() if line.startswith("Device: /dev/nvme"): current_device = line.replace("Device: ", "") elif line.startswith("sn") and drive_serial in line: return "visible", current_device, "found via direct NVMe enumeration" elif line.startswith("__SUMMARY__:"): summary_found = True for part in line.replace("__SUMMARY__:", "").split(): key, _, value = part.partition("=") try: parsed_value = int(value or 0) except ValueError: return "unknown", None, f"direct NVMe enumeration summary has invalid value: {part}" if key == "found": found_any = parsed_value elif key == "read_ok": read_ok = parsed_value elif key == "read_fail": read_fail = parsed_value if not summary_found: return "unknown", None, "direct NVMe enumeration summary missing" if found_any == 0: return "unknown", None, "direct NVMe enumeration found no NVMe controllers" if read_ok == 0: return "unknown", None, "direct NVMe enumeration could not read any controller identity" if read_fail > 0: return "unknown", None, f"direct NVMe enumeration had incomplete controller coverage: read_ok={read_ok}, read_fail={read_fail}" return "missing", None, "serial not found by nvme_cli or direct NVMe enumeration" def _state_matches_any(state: str, allowed_markers: Tuple[str, ...]) -> bool: """Return 
True when a normalized enum-like state matches any allowed marker.""" normalized = str(state or "UNKNOWN").upper() for marker in allowed_markers: marker = marker.upper() if marker in ("HEALTHY", "ENABLED"): if normalized == marker or normalized.endswith(f".{marker}"): return True elif marker in normalized: return True return False def _state_has_failure_marker(state: str) -> bool: """Return True when a RAID state clearly indicates a failed/unsafe condition.""" normalized = str(state or "UNKNOWN").upper() return any(marker in normalized for marker in RAID_FAILED_STATE_MARKERS) def _get_raid_state_snapshot(commander: Commander) -> Tuple[bool, Dict[str, str], str]: """Fetch and evaluate RAID state before a potentially destructive NVRAM recovery.""" try: raids = commander.raid_status() states = { "ssd": get_enum_name(raids.ssd_state.state), "nvram": get_enum_name(raids.nvram_state.state), "memory": get_enum_name(raids.memory_state.state), "rio_nvram": get_enum_name(raids.rio_nvram_state.state), } except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Could not read RAID state before NVRAM recovery: {e}") return False, {}, f"could not read RAID state: {e}" logger.info( "Pre-recovery RAID state - SSD: %s, NVRAM: %s, Memory: %s, RIO NVRAM: %s", states["ssd"], states["nvram"], states["memory"], states["rio_nvram"], ) add_message( f"Pre-recovery RAID state - SSD: {states['ssd']}, NVRAM: {states['nvram']}, " f"Memory: {states['memory']}, RIO: {states['rio_nvram']}" ) failed_components = [ f"{component}: {state}" for component, state in states.items() if _state_has_failure_marker(state) ] if failed_components: return False, states, f"RAID has failed components: {', '.join(failed_components)}" for component in ("ssd", "memory"): if not _state_matches_any(states[component], NON_NVRAM_RAID_ALLOWED_STATES): return False, states, f"RAID {component} state {states[component]} is not safe for recovery" for component in ("nvram", 
"rio_nvram"): if not _state_matches_any(states[component], NVRAM_RECOVERY_RAID_ALLOWED_STATES): return False, states, f"RAID {component} state {states[component]} is not safe for recovery" return True, states, "RAID state allows NVRAM recovery" def _get_drive_leader_snapshot(commander: Commander, drive_guid: str) -> Tuple[bool, Dict[str, Any], str]: """Fetch the target drive state from the leader before recovery.""" try: guid_obj = GUID(drive_guid) drive = commander.get_object(DRIVE_TYPE, guid_obj) except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Could not read drive {drive_guid} from leader before recovery: {e}") return False, {}, f"could not read drive from leader: {e}" device_proto = drive.device_proto attached = list(getattr(device_proto, "attached", []) or []) fail_reason = "NONE" if getattr(device_proto, "fail_reason", None): fail_reason = get_enum_name(device_proto.fail_reason) snapshot = { "state": get_enum_name(device_proto.state), "enabled": getattr(device_proto, "enabled", None), "attached": attached, "is_attached": any(attached), "fail_reason": fail_reason, "slot": getattr(device_proto, "pci_switch_slot", None), "state_timestamp": getattr(device_proto, "state_timestamp", None), "attached_timestamp": getattr(device_proto, "attached_timestamp", None), } logger.info( "Pre-recovery leader drive snapshot for %s - state: %s, enabled: %s, " "attached: %s, fail_reason: %s, slot: %s", drive_guid, snapshot["state"], snapshot["enabled"], snapshot["attached"], snapshot["fail_reason"], snapshot["slot"], ) add_message( f"Pre-recovery drive state: {snapshot['state']}, attached={snapshot['attached']}, " f"fail_reason={snapshot['fail_reason']}" ) return True, snapshot, "drive snapshot read" def _log_recent_dnode_reset_evidence(dnode_ip: str, drive_serial: str) -> None: """Log recent DNode kernel evidence around NVMe/controller resets for diagnosis.""" safe_serial = "".join(c for c in str(drive_serial or "") if 
c.isalnum() or c in ("_", "-", ".")) grep_pattern = "nvme|pcie|pcieport|reset|controller" if safe_serial: grep_pattern = f"{grep_pattern}|{safe_serial}" reset_log_cmd = ( f'(sudo journalctl -k --since "-10 minutes" --no-pager 2>/dev/null ' f'|| sudo dmesg -T 2>/dev/null) ' f'| grep -Ei "{grep_pattern}" | tail -50 || true' ) result = _ssh_dnode_cmd(dnode_ip, reset_log_cmd, timeout=30) if result is None: logger.warning(f"Could not collect recent DNode reset evidence from {dnode_ip}") return evidence = (result.stdout or "").strip() if evidence: logger.info(f"Recent DNode reset/NVMe evidence from {dnode_ip}:\n{evidence[:4000]}") else: logger.info(f"No recent DNode reset/NVMe evidence found on {dnode_ip}") def assess_nvram_recovery_need(commander: Commander, drive_guid: str, drive_info: Dict[str, Any], failure_context: str) -> Dict[str, Any]: """Decide whether NVRAM recovery is safe and actually needed after an FW failure. Recovery is intentionally limited to the cases the run logs showed we care about: the target NVRAM is not attached to the leader anymore, is not visible on the DNode, or is unstable/re-enumerating. If the drive is attached, visible, and stable, the script exits instead of issuing slot resets/power cycles for an unrelated FW error. 
""" decision = { "attempt_recovery": False, "hard_abort": True, "reason": "not assessed", } if not NVRAM_MODE: decision["hard_abort"] = False decision["reason"] = "not in NVRAM mode" return decision drive_serial = drive_info.get("drive_serial", "UNKNOWN") dnode_ip = drive_info.get("dnode_ip") logger.info("=" * 80) logger.info(f"Assessing whether NVRAM recovery is needed for {drive_guid} ({drive_serial})") logger.info(f"Failure context: {failure_context}") logger.info("=" * 80) add_message(f"Assessing NVRAM recovery need: {failure_context}") raid_ok, _raid_states, raid_reason = _get_raid_state_snapshot(commander) if not raid_ok: decision["reason"] = f"RAID state is unsafe: {raid_reason}" logger.error(f"{decision['reason']} - exiting without recovery") add_message(f"CRITICAL: {decision['reason']}") return decision leader_ok, leader_snapshot, leader_reason = _get_drive_leader_snapshot(commander, drive_guid) if not leader_ok: decision["reason"] = f"leader drive state unavailable: {leader_reason}" logger.error(f"{decision['reason']} - exiting without recovery") add_message(f"CRITICAL: {decision['reason']}") return decision if not dnode_ip or not drive_serial or drive_serial == "UNKNOWN": decision["reason"] = "missing dnode IP or drive serial for visibility check" logger.error(f"{decision['reason']} - exiting without recovery") add_message(f"CRITICAL: {decision['reason']}") return decision first_status, first_device, first_reason = probe_drive_nvme_visibility(dnode_ip, drive_serial) if first_status == "visible": logger.info(f"Pre-recovery DNode visibility check 1/2: {drive_serial} is visible at {first_device} ({first_reason})") elif first_status == "missing": logger.warning(f"Pre-recovery DNode visibility check 1/2: {drive_serial} is NOT visible on {dnode_ip} ({first_reason})") else: logger.error(f"Pre-recovery DNode visibility check 1/2 is UNKNOWN for {drive_serial} on {dnode_ip}: {first_reason}") logger.info("Waiting 10 seconds to confirm NVRAM device stability before 
deciding on recovery...") time.sleep(10) second_status, second_device, second_reason = probe_drive_nvme_visibility(dnode_ip, drive_serial) if second_status == "visible": logger.info(f"Pre-recovery DNode visibility check 2/2: {drive_serial} is visible at {second_device} ({second_reason})") elif second_status == "missing" and first_status == "visible": logger.warning(f"Pre-recovery DNode visibility check 2/2: {drive_serial} disappeared after being visible ({second_reason})") elif second_status == "missing": logger.warning(f"Pre-recovery DNode visibility check 2/2: {drive_serial} is still NOT visible on {dnode_ip} ({second_reason})") else: logger.error(f"Pre-recovery DNode visibility check 2/2 is UNKNOWN for {drive_serial} on {dnode_ip}: {second_reason}") _log_recent_dnode_reset_evidence(dnode_ip, drive_serial) attached = bool(leader_snapshot.get("is_attached")) probe_unknown = first_status == "unknown" or second_status == "unknown" missing_on_dnode = first_status == "missing" and second_status == "missing" unstable = bool( first_status == "visible" and ( second_status == "missing" or (second_status == "visible" and first_device != second_device) ) ) if probe_unknown: decision["reason"] = ( f"DNode visibility probe is unknown: first={first_status} ({first_reason}), " f"second={second_status} ({second_reason})" ) logger.error(f"{decision['reason']} - exiting without slot recovery") add_message(f"ERROR: {decision['reason']}") return decision if not attached or missing_on_dnode or unstable: reasons = [] if not attached: reasons.append("leader reports drive not attached") if missing_on_dnode: reasons.append("drive is not visible on DNode in either check") if unstable: reasons.append(f"drive re-enumerated during stability check ({first_device} -> {second_device})") decision["attempt_recovery"] = True decision["hard_abort"] = False decision["reason"] = "; ".join(reasons) logger.warning(f"NVRAM recovery is needed and RAID state allows it: {decision['reason']}") 
add_message(f"NVRAM recovery needed: {decision['reason']}") return decision decision["reason"] = ( f"recovery evidence is insufficient: attached={attached}, " f"first={first_status}/{first_device}, second={second_status}/{second_device}, " f"missing_on_dnode={missing_on_dnode}, unstable={unstable}" ) logger.error(f"{decision['reason']} - exiting without slot recovery") add_message(f"ERROR: {decision['reason']}") return decision def upgrade_drive_firmware(dnode_ip: str, drive_serial: str, drive_model: str, fw_file: str, dry_run: bool = False, target_fw_version: str = None, drive_info: dict = None, skip_fw_verify: bool = False) -> bool: """ Upgrade firmware on a specific drive. Args: dnode_ip: IP address of the dnode where the drive is connected drive_serial: Serial number of the drive drive_model: Model of the drive fw_file: Path to the firmware file dry_run: If True, only log actions without executing them target_fw_version: Expected firmware version after upgrade drive_info: Optional drive info dict to update with fw_revision skip_fw_verify: If True, skip post-upgrade firmware version verification (used in ebox flow where FW version only updates after power cycle) Returns: True if successful, False otherwise """ if drive_info is None: drive_info = {} try: logger.info(f"Starting firmware upgrade for drive {drive_serial} ({drive_model}) on dnode {dnode_ip}") add_message(f"Starting firmware upgrade for drive {drive_serial}") # Get firmware parameters for this drive model fw_params = get_firmware_upgrade_params(drive_model) if not fw_params: logger.error(f"Unsupported drive model for firmware upgrade: {drive_model}") add_message(f"ERROR: Unsupported drive model for firmware upgrade: {drive_model}") return False logger.info(f"Using firmware parameters for {fw_params['description']}") add_message(f"Drive type: {fw_params['description']}") if dry_run: logger.info(f"[DRY RUN] Would upgrade firmware on drive {drive_serial} using file {fw_file}") add_message(f"[DRY RUN] 
Would upgrade firmware on drive {drive_serial}") return True # Find the NVMe device for this drive # Note: When drive is deactivated, it should still be accessible via NVMe commands even if it shows as failed nvme_device = find_drive_nvme_device(dnode_ip, drive_serial) if not nvme_device: logger.error(f"Could not find NVMe device for drive {drive_serial} on dnode {dnode_ip}") add_message(f"ERROR: Could not find NVMe device for drive {drive_serial}") return False drive_info['nvme_device'] = nvme_device logger.info(f"Found drive at device {nvme_device}") add_message(f"Found drive at device {nvme_device}") fw_check_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'sudo nvme id-ctrl {nvme_device} | grep fr'" logger.debug(f"Checking firmware version: {fw_check_cmd}") check_result = subprocess.run(fw_check_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) found_version = None if check_result.returncode == 0: logger.info(f"Firmware version check before upgrade: {check_result.stdout}") add_message(f"Firmware version before upgrade: {check_result.stdout.strip()}") found_version = check_result.stdout.strip().splitlines()[0].split()[-1] drive_info['fw_revision'] = found_version else: logger.warning(f"Could not verify firmware version for drive {nvme_device} on {dnode_ip}: {check_result.stderr}") if target_fw_version and found_version and found_version == target_fw_version: logger.info(f"Drive {drive_serial} already at target firmware {target_fw_version} - skipping upgrade") add_message(f"Drive already at target FW {target_fw_version} - skipping") return True # Update the drive state state = load_state() current_drive_guid = state.get("current_drive") if current_drive_guid: with dashboard_lock: status_data["drive_state"][current_drive_guid] = "FIRMWARE UPGRADING" # Step 1: Copy firmware file to the dnode if it doesn't exist there dnode_fw_file = f"/tmp/{os.path.basename(fw_file)}" copy_cmd = f"scp -i 
{SSH_KEY_PATH} -o StrictHostKeyChecking=no {fw_file} {SSH_USER}@{dnode_ip}:{dnode_fw_file}" logger.debug(f"Copying firmware file to dnode: {copy_cmd}") copy_result = subprocess.run(copy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if copy_result.returncode != 0: logger.error(f"Failed to copy firmware file to dnode: {copy_result.stderr}") add_message(f"ERROR: Failed to copy firmware file to dnode: {copy_result.stderr}") return False logger.info(f"Successfully copied firmware file to {dnode_ip}:{dnode_fw_file}") add_message(f"Copied firmware file to dnode: {dnode_fw_file}") # Step 2: Set admin timeout to 30 seconds timeout_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'echo 30 | sudo tee /sys/module/nvme_core/parameters/admin_timeout'" logger.debug(f"Setting admin timeout: {timeout_cmd}") result = subprocess.run(timeout_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if result.returncode != 0: logger.warning(f"Failed to set admin timeout: {result.stderr}") else: logger.info("Set NVME admin timeout to 30 seconds") # Step 3: Download firmware add_message(f"Downloading firmware to drive {drive_serial}...") if current_drive_guid: with dashboard_lock: status_data["drive_state"][current_drive_guid] = "FIRMWARE DOWNLOADING" # Build fw-download command based on drive parameters, using the copied file on the dnode fw_download_cmd_parts = [ f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip}", f"'sudo nvme fw-download {nvme_device} --fw={dnode_fw_file}" ] # Add xfer parameter if specified if fw_params.get("xfer"): fw_download_cmd_parts[1] += f" --xfer={fw_params['xfer']}" fw_download_cmd_parts[1] += "'" fw_download_cmd = " ".join(fw_download_cmd_parts) logger.debug(f"Firmware download command: {fw_download_cmd}") download_result = subprocess.run(fw_download_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, 
universal_newlines=True) if download_result.returncode != 0: logger.error(f"Firmware download failed: {download_result.stderr}") add_message(f"ERROR: Firmware download failed: {download_result.stderr}") return False logger.info(f"Firmware download successful: {download_result.stdout}") add_message(f"Firmware download completed successfully") if not FAST_UPGRADE: # Step 4: Commit firmware add_message(f"Committing firmware to drive {drive_serial}...") if current_drive_guid: with dashboard_lock: status_data["drive_state"][current_drive_guid] = "FIRMWARE COMMITTING" # Build fw-commit command based on drive parameters fw_commit_cmd_parts = [ f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip}", f"'sudo nvme fw-commit {nvme_device}" ] # Add slot parameter if specified if fw_params.get("slot") is not None: fw_commit_cmd_parts[1] += f" --slot={fw_params['slot']}" # Add action parameter fw_commit_cmd_parts[1] += f" --action={fw_params['action']}" fw_commit_cmd_parts[1] += "'" fw_commit_cmd = " ".join(fw_commit_cmd_parts) logger.debug(f"Firmware commit command: {fw_commit_cmd}") commit_result = subprocess.run(fw_commit_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if commit_result.returncode != 0: if "Interrupted system call" in commit_result.stderr and fw_params.get('action') == 3: logger.info(f"Firmware commit returned 'Interrupted system call' - expected for action=3 (controller reset in progress)") add_message(f"Firmware commit: controller reset in progress (expected for action=3)") else: logger.error(f"Firmware commit failed: {commit_result.stderr}") add_message(f"ERROR: Firmware commit failed: {commit_result.stderr}") return False else: logger.info(f"Firmware commit successful: {commit_result.stdout}") add_message(f"Firmware commit completed successfully") else: add_message(f"Committing firmware to drive {drive_serial}...") if current_drive_guid: with dashboard_lock: 
status_data["drive_state"][current_drive_guid] = "FIRMWARE COMMITTING" # Build fw-commit command based on drive parameters fw_activate_cmd_parts = [ f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip}", f"'sudo nvme fw-activate {nvme_device} -a 3 -s 1'" ] fw_activate_cmd = " ".join(fw_activate_cmd_parts) logger.debug(f"Firmware commit command: {fw_activate_cmd}") activate_result = subprocess.run(fw_activate_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if activate_result.returncode != 0: if "Interrupted system call" in activate_result.stderr: logger.info(f"Firmware activate returned 'Interrupted system call' - expected for action=3 (controller reset in progress)") add_message(f"Firmware activate: controller reset in progress (expected for action=3)") else: logger.error(f"Firmware activate failed: {activate_result.stderr}") add_message(f"ERROR: Firmware activate failed: {activate_result.stderr}") return False # Step 5: Wait for firmware activation (some drives need time) # For NVRAM drives, we need significantly more time for stabilization if NVRAM_MODE: wait_time = 30 # 30 seconds for NVRAM drives logger.info(f"Waiting {wait_time} seconds for NVRAM firmware activation and stabilization...") add_message(f"Waiting {wait_time} seconds for NVRAM firmware activation...") time.sleep(wait_time) else: logger.info("Waiting 5 seconds for firmware activation...") add_message("Waiting 5 seconds for firmware activation...") time.sleep(5) # Step 6: Verify drive responsiveness (especially important for NVRAM) # After fw-activate, the NVMe controller may reset and the device path can # change or temporarily disappear. Re-discover the device by serial number. 
if NVRAM_MODE: logger.info("Re-discovering drive device path after firmware activation...") add_message("Re-discovering drive device path after firmware activation...") max_rediscovery_attempts = 5 rediscovery_wait = 15 new_nvme_device = None for attempt in range(1, max_rediscovery_attempts + 1): new_nvme_device = find_drive_nvme_device(dnode_ip, drive_serial) if new_nvme_device: if new_nvme_device != nvme_device: logger.info(f"Drive {drive_serial} re-enumerated at {new_nvme_device} (was {nvme_device})") break logger.warning(f"Drive {drive_serial} not yet visible (attempt {attempt}/{max_rediscovery_attempts}), waiting {rediscovery_wait}s...") add_message(f"Drive not yet visible (attempt {attempt}/{max_rediscovery_attempts}), waiting {rediscovery_wait}s...") if attempt < max_rediscovery_attempts: time.sleep(rediscovery_wait) if not new_nvme_device: logger.error(f"Drive {drive_serial} did not re-appear after firmware activation ({max_rediscovery_attempts} attempts)") add_message(f"ERROR: Drive {drive_serial} did not re-appear after firmware activation") return False nvme_device = new_nvme_device drive_info['nvme_device'] = nvme_device logger.info(f"Verifying drive responsiveness at {nvme_device}...") add_message(f"Verifying drive responsiveness at {nvme_device}...") try: # Check that drive is still accessible health_check_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'sudo nvme id-ctrl {nvme_device}'" health_result = subprocess.run(health_check_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=30) if health_result.returncode != 0: logger.error(f"Drive {drive_serial} not responsive at {nvme_device} after firmware upgrade: {health_result.stderr}") add_message(f"ERROR: Drive {drive_serial} not responsive after firmware upgrade") return False logger.info("Drive verified responsive after firmware upgrade") add_message("Drive responsive - health check passed") except 
subprocess.TimeoutExpired: logger.error(f"Drive {drive_serial} health check timed out after firmware upgrade") add_message(f"ERROR: Drive {drive_serial} health check timed out") return False except Exception as e: logger.error(f"Failed to verify drive responsiveness: {e}") add_message(f"ERROR: Failed to verify drive responsiveness: {str(e)}") return False # Step 7: Verify firmware version (skip in ebox flow - FW version only updates after power cycle) if skip_fw_verify: logger.info(f"Skipping post-upgrade FW verification for drive {drive_serial} (will verify after ebox power cycle)") add_message(f"Skipping FW verification (will verify after power cycle)") else: fw_check_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'sudo nvme id-ctrl {nvme_device} | grep fr'" logger.debug(f"Checking firmware version: {fw_check_cmd}") check_result = subprocess.run(fw_check_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if check_result.returncode == 0: logger.info(f"Firmware version check: {check_result.stdout}") add_message(f"Firmware version: {check_result.stdout.strip()}") found_version = check_result.stdout.strip().splitlines()[0].split()[-1] drive_info['fw_revision'] = found_version if target_fw_version and found_version.upper() != target_fw_version.strip().upper(): if NVRAM_MODE: logger.error(f"Firmware update failed: - Firmware version after activate is {check_result.stdout.strip()}, Target Version {target_fw_version}") add_message(f"ERROR: Firmware update failed: - Firmware version after activate is {check_result.stdout.strip()}, Target Version {target_fw_version}") return False else: logger.info(f"SSD firmware still shows {found_version} (expected {target_fw_version}) -- " f"this is normal for action=1 commits; FW activates after power cycle") add_message(f"SSD FW pending activation after power cycle (current: {found_version}, target: {target_fw_version})") else: logger.error(f"Could not verify 
firmware version for drive {nvme_device} on {dnode_ip}: {check_result.stderr}") return False # Step 7: Restore admin timeout restore_timeout_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'echo 65 | sudo tee /sys/module/nvme_core/parameters/admin_timeout'" logger.debug(f"Restoring admin timeout: {restore_timeout_cmd}") result = subprocess.run(restore_timeout_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if result.returncode != 0: logger.warning(f"Failed to restore admin timeout: {result.stderr}") else: logger.info("Restored NVME admin timeout to 65 seconds") # Update dashboard state if current_drive_guid: with dashboard_lock: status_data["drive_state"][current_drive_guid] = "FIRMWARE UPGRADED" # Step 8: Cleanup - remove firmware file from dnode cleanup_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'rm -f {dnode_fw_file}'" logger.debug(f"Cleaning up firmware file: {cleanup_cmd}") cleanup_result = subprocess.run(cleanup_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if cleanup_result.returncode != 0: logger.warning(f"Failed to cleanup firmware file: {cleanup_result.stderr}") else: logger.debug(f"Successfully cleaned up firmware file {dnode_fw_file} from dnode") logger.info(f"Firmware upgrade completed successfully for drive {drive_serial}") add_message(f"✅ Firmware upgrade completed for drive {drive_serial}") return True except Exception as e: logger.exception(f"Error upgrading firmware for drive {drive_serial}: {e}") add_message(f"ERROR: Firmware upgrade failed for drive {drive_serial}: {str(e)}") return False def recover_failed_drive(commander: Commander, drive_guid: str, dnode_ip: str, slot: int, drive_info: Dict[str, Any], dry_run: bool = False) -> bool: """ Attempt to recover a failed drive after firmware upgrade. This follows the recovery procedure from the support guide: 0. 
(NVRAM only) PCI link reset via expose_drives.py: disable -> enable -> power x2 -> PERST x2 1. Power cycle the drive via nvme_cli slot_power 2. Wait for drive to become active 3. Wait for NVRAM rebalance 4. Verify no new failures Args: commander: Commander object for interfacing with Vast system drive_guid: GUID of the failed drive dnode_ip: IP address of the dnode where the drive is connected slot: Slot number of the drive drive_info: Drive information dictionary dry_run: If True, only log actions without executing them Returns: True if recovery successful, False otherwise """ logger.info(f"=" * 80) logger.info(f"Attempting to recover failed drive {drive_guid} ({drive_info.get('drive_serial', 'Unknown')})") logger.info(f"=" * 80) add_message(f"Attempting to recover failed drive {drive_guid}...") try: # Step 0 (NVRAM only): Reset PCI link before power cycling if NVRAM_MODE and PCI_LINK_RESET_ENABLED: chassis_type = drive_info.get("chassis_type", "UNKNOWN") pci_ok = pci_link_reset_slot(dnode_ip, slot, dry_run=dry_run, chassis_type=str(chassis_type)) if not pci_ok: logger.warning( "PCI link reset failed for slot %d on %s — proceeding with power cycle anyway", slot, dnode_ip, ) add_message(f"WARNING: PCI link reset failed for slot {slot} - proceeding with power cycle") if dry_run: logger.info("[DRY RUN] Would attempt drive recovery (steps 1-5)") return True # Step 1: Power cycle the drive to reset it logger.info("Step 1: Power cycling drive to reset...") add_message("Step 1: Power cycling drive to reset...") chassis_info = { "chassis_type": drive_info.get("chassis_type", "UNKNOWN"), "bus_master_dnode_ip": drive_info.get("bus_master_dnode_ip") } if not power_cycle_drive(dnode_ip, slot, dry_run, chassis_info, wait_time=30): logger.error("Failed to power cycle drive during recovery") add_message("ERROR: Failed to power cycle drive during recovery") return False logger.info("Power cycle completed, waiting for drive to stabilize...") add_message("Power cycle completed, 
waiting for drive to stabilize...") time.sleep(10) # Give drive time to stabilize # Step 2: Activate the drive logger.info("Step 2: Activating drive...") add_message("Step 2: Activating drive...") if not activate_drive(commander, drive_guid, dnode_ip, slot, dry_run, drive_info, wait_time=30): logger.error("Failed to activate drive during recovery") add_message("ERROR: Failed to activate drive during recovery") return False logger.info("Drive activated successfully") add_message("Drive activated successfully") # Step 3: Wait for NVRAM rebalance (critical for NVRAM drives) if NVRAM_MODE: logger.info("Step 3: Waiting for NVRAM rebalance after recovery...") add_message("Step 3: Waiting for NVRAM rebalance...") if not wait_for_nvram_rebalance(commander, timeout_minutes=15): logger.error("NVRAM rebalance failed after recovery") add_message("ERROR: NVRAM rebalance failed after recovery") return False logger.info("NVRAM rebalance completed successfully") add_message("NVRAM rebalance completed successfully") # Step 4: Verify drive is healthy and no new failures logger.info("Step 4: Verifying drive health...") add_message("Step 4: Verifying drive health...") # Check drive state try: drives = commander.list_objects(DRIVE_TYPE) for drive in drives: if str(drive.base_proto.guid) == drive_guid: state_str = get_enum_name(drive.device_proto.state) if state_str.upper() != "ACTIVE": logger.error(f"Drive recovered but state is {state_str}, not ACTIVE") add_message(f"ERROR: Drive state is {state_str}, not ACTIVE") return False # Check for failure reason if hasattr(drive, 'device_proto') and getattr(drive.device_proto, 'fail_reason', None): fail_reason = get_enum_name(drive.device_proto.fail_reason) if fail_reason and fail_reason.upper() != "NONE": logger.error(f"Drive has failure reason: {fail_reason}") add_message(f"ERROR: Drive has failure reason: {fail_reason}") return False logger.info(f"Drive is ACTIVE with no failure reasons") add_message(f"✅ Drive recovered successfully - 
state: ACTIVE") break except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.error(f"Error checking drive state after recovery: {e}") add_message(f"ERROR: Failed to verify drive state: {str(e)}") return False # Step 5: Check RAID health logger.info("Step 5: Checking RAID health...") add_message("Step 5: Checking RAID health...") try: raids = commander.raid_status() nvram_state = get_enum_name(raids.nvram_state.state) if "HEALTHY" not in nvram_state.upper(): logger.error(f"NVRAM RAID state is {nvram_state} after recovery - recovery incomplete") add_message(f"ERROR: NVRAM RAID still unhealthy after recovery: {nvram_state}") return False else: logger.info(f"RAID health check passed - NVRAM state: {nvram_state}") add_message(f"RAID health check passed") except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.error(f"Error checking RAID health after recovery: {e}") add_message(f"ERROR: Could not verify RAID health: {str(e)}") return False logger.info(f"=" * 80) logger.info(f"Drive {drive_guid} recovered successfully") logger.info(f"=" * 80) add_message(f"✅ Drive {drive_guid} recovered successfully!") return True except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error during drive recovery: {e}") add_message(f"ERROR during drive recovery: {str(e)}") return False def wait_for_nvram_rebalance(commander: Commander, timeout_minutes: int = 15) -> bool: """ Wait for NVRAM rebalance to complete after firmware upgrade. This is critical for NVRAM drives - after firmware upgrade, the NVRAM subsystem needs time to rebalance before the drive can be safely brought back online. 
Args: commander: Commander object for interfacing with Vast system timeout_minutes: Maximum time to wait in minutes (default: 15) Returns: True if rebalance completed or not needed, False if timeout or error """ if not NVRAM_MODE: # Not in NVRAM mode, no need to wait return True logger.info(f"Waiting for NVRAM rebalance to complete (timeout: {timeout_minutes} minutes)...") add_message(f"Waiting for NVRAM rebalance (up to {timeout_minutes} min)...") start_time = time.time() timeout_seconds = timeout_minutes * 60 check_interval = 30 # Check every 30 seconds try: while True: elapsed = time.time() - start_time if elapsed >= timeout_seconds: logger.error(f"NVRAM rebalance wait timed out after {timeout_minutes} minutes") add_message(f"ERROR: NVRAM rebalance wait timed out after {timeout_minutes} minutes") return False # Check RAID health - if NVRAM is HEALTHY, rebalance is likely complete try: raids = commander.raid_status() nvram_state = get_enum_name(raids.nvram_state.state) if "HEALTHY" in nvram_state.upper(): elapsed_min = int(elapsed / 60) logger.info(f"NVRAM rebalance complete - NVRAM state is {nvram_state} (waited {elapsed_min} minutes)") add_message(f"NVRAM rebalance complete (waited {elapsed_min} min)") return True else: elapsed_min = int(elapsed / 60) remaining_min = timeout_minutes - elapsed_min logger.info(f"NVRAM state: {nvram_state}, waiting for rebalance... ({elapsed_min}/{timeout_minutes} min, {remaining_min} min remaining)") add_message(f"NVRAM rebalancing... 
({elapsed_min}/{timeout_minutes} min)") except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.warning(f"Error checking NVRAM state during rebalance wait: {e}") # Wait before next check time.sleep(check_interval) except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.error(f"Error during NVRAM rebalance wait: {e}") add_message(f"ERROR during NVRAM rebalance wait: {str(e)}") return False def activate_drive(commander: Commander, drive_guid: str, dnode_ip: str = None, slot: int = None, dry_run: bool = False, drive_info: Dict[str, Any] = None, wait_time: int = POWER_CYCLE_WAIT_TIME) -> bool: """ Activate a drive. Args: commander: Commander object for interfacing with Vast system drive_guid: GUID of the drive to activate dnode_ip: IP of the dnode (needed if power cycling is required) slot: Slot number of the drive (needed if power cycling is required) dry_run: If True, only log actions without executing them drive_info: Additional drive information including chassis type Returns: True if successful, False otherwise """ try: logger.info(f"Activating drive {drive_guid}") add_message(f"Activating drive {drive_guid}") # Update dashboard drive state with dashboard_lock: status_data["drive_state"][drive_guid] = "ACTIVATING" if dry_run: logger.info(f"[DRY RUN] Would activate drive {drive_guid}") return True # Check if the drive is attached to at least one dnode # Build chassis_info once for use in power cycling chassis_info = None if dnode_ip and slot: # Use drive_info if provided, otherwise create a basic chassis_info if drive_info: chassis_info = { "chassis_type": drive_info.get("chassis_type", "UNKNOWN"), "bus_master_dnode_ip": drive_info.get("bus_master_dnode_ip") } else: # Default to CERES if no drive_info is provided chassis_info = { "chassis_type": "UNKNOWN", "bus_master_dnode_ip": None } if not check_drive_attached(commander, drive_guid): if dnode_ip is not None and slot is not None: # Check if 
the drive is attached to dnode, if not, try power cycle again logger.warning(f"Drive {drive_guid} is not attached to any dnode, attempting to power cycle again") add_message(f"WARNING: Drive {drive_guid} is not attached, trying to power cycle again") if not power_cycle_drive(dnode_ip, slot, dry_run, chassis_info, wait_time): logger.error(f"Failed to power cycle drive {drive_guid} again") add_message(f"ERROR: Failed to power cycle drive {drive_guid} again") return False # Check if the drive is now attached if not check_drive_attached(commander, drive_guid): logger.error(f"Drive {drive_guid} is still not attached after second power cycle, giving up") add_message(f"ERROR: Drive {drive_guid} is still not attached after second power cycle") return False else: logger.error(f"Drive {drive_guid} is not attached to any dnode and cannot be power cycled (missing dnode_ip or slot)") add_message(f"ERROR: Drive {drive_guid} is not attached and missing dnode_ip/slot info") return False # Convert string GUID to GUID object try: guid_obj = GUID(drive_guid) except Exception as e: logger.error(f"Error converting GUID string to GUID object: {e}") add_message(f"ERROR: Failed to convert GUID: {str(e)}") return False # Retry logic for NOT_FOUND errors - drives may need time to reappear after FW upgrade max_retries = 5 retry_wait_seconds = 30 for attempt in range(max_retries): try: result = commander.drive_modify(guid_obj, enabled=True) if result.code == DeviceModifyResultCode.SUCCESS: break # Success, exit retry loop elif result.code == DeviceModifyResultCode.NOT_FOUND: if attempt < max_retries - 1: logger.warning(f"Drive {drive_guid} returned NOT_FOUND (attempt {attempt + 1}/{max_retries}), " f"waiting {retry_wait_seconds}s for drive to reappear...") add_message(f"⏳ Drive NOT_FOUND, waiting {retry_wait_seconds}s before retry ({attempt + 1}/{max_retries})...") time.sleep(retry_wait_seconds) # Try power cycling again if we have the info if dnode_ip is not None and slot is not None and 
attempt >= 1: logger.info(f"Attempting power cycle before retry {attempt + 2}") power_cycle_drive(dnode_ip, slot, dry_run, chassis_info, wait_time) time.sleep(10) # Additional wait after power cycle continue else: logger.error(f"Failed to activate drive {drive_guid}: {result.code} after {max_retries} attempts") add_message(f"ERROR: Failed to activate drive: {result.code} after {max_retries} retries") return False else: logger.error(f"Failed to activate drive {drive_guid}: {result.code}") add_message(f"ERROR: Failed to activate drive: {result.code}") return False except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) err_str = str(e) if "NOT_ATTACHED" in err_str or "NOT_FOUND" in err_str: if attempt < max_retries - 1: logger.warning(f"Drive {drive_guid} returned error (attempt {attempt + 1}/{max_retries}): {e}, " f"waiting {retry_wait_seconds}s for drive to reappear...") add_message(f"⏳ Drive error, waiting {retry_wait_seconds}s before retry ({attempt + 1}/{max_retries})...") time.sleep(retry_wait_seconds) if dnode_ip is not None and slot is not None and attempt >= 1: logger.info(f"Attempting power cycle before retry {attempt + 2}") power_cycle_drive(dnode_ip, slot, dry_run, chassis_info, wait_time) time.sleep(10) continue else: logger.error(f"Drive {drive_guid} error after {max_retries} attempts: {e}") add_message(f"ERROR: Drive {drive_guid} error after {max_retries} retries") return False else: raise logger.info(f"Drive {drive_guid} activated successfully") add_message(f"Successfully activated drive {drive_guid}") # Update dashboard drive state with dashboard_lock: status_data["drive_state"][drive_guid] = "ACTIVE" # Wait for drive to become ACTIVE if not wait_for_drive_active(commander, drive_guid, dry_run=dry_run): logger.error(f"Drive {drive_guid} failed to become ACTIVE after activation") add_message(f"ERROR: Drive {drive_guid} failed to become ACTIVE after activation") return False return True except ConnectionLostError: raise 
except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error activating drive: {e}") add_message(f"ERROR: Failed to activate drive: {str(e)}") return False def power_cycle_drive(dnode_ip: str, slot: int, dry_run: bool = False, chassis_info: Dict[str, Any] = None, wait_time: int = POWER_CYCLE_WAIT_TIME) -> bool: """ Power cycle a drive slot. Args: dnode_ip: IP of the dnode where the drive is connected slot: Slot number of the drive dry_run: If True, only log actions without executing them chassis_info: Dictionary with chassis information including type and bus master IP Returns: True if successful, False otherwise """ try: # Determine which dnode should be used for power cycling control_dnode_ip = dnode_ip chassis_type = "CERES" # Default to CERES if not specified if chassis_info: chassis_type = chassis_info.get("chassis_type", "UNKNOWN").upper() # For SANMINA chassis, use the bus master if available if "SANMINA" in chassis_type and chassis_info.get("bus_master_dnode_ip"): control_dnode_ip = chassis_info["bus_master_dnode_ip"] logger.info(f"Using SANMINA bus master {control_dnode_ip} for power cycling slot {slot}") logger.info(f"Power cycling drive at dnode {control_dnode_ip}, slot {slot} (chassis type: {chassis_type})") # Update the drive state if we have a current_drive in the state state = load_state() current_drive_guid = state.get("current_drive") if current_drive_guid: with dashboard_lock: status_data["drive_state"][current_drive_guid] = "POWER CYCLING" if dry_run: logger.info(f"[DRY RUN] Would power cycle drive at dnode {control_dnode_ip}, slot {slot}") logger.info(f"[DRY RUN] Would execute: 'ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{control_dnode_ip} '{NVME_CLI_COMMAND} slot_power {slot} OFF ; sleep {wait_time} ; {NVME_CLI_COMMAND} slot_power {slot} ON; sleep {wait_time*2}'") return True # Check the current power status check_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{control_dnode_ip} 
'{NVME_CLI_COMMAND} slot_power {slot} status'" logger.debug(f"Running command: {check_cmd}") status_result = subprocess.run(check_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if status_result.returncode != 0: logger.error(f"Failed to check power status: {status_result.stderr}") return False logger.debug(f"Power status before cycle: {status_result.stdout}") # Execute power off command off_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{control_dnode_ip} '{NVME_CLI_COMMAND} slot_power {slot} OFF'" logger.debug(f"Running command: {off_cmd}") off_result = subprocess.run(off_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if off_result.returncode != 0: logger.error(f"Failed to power off drive: {off_result.stderr}") return False # Update the dashboard state to show power off if current_drive_guid: with dashboard_lock: status_data["drive_state"][current_drive_guid] = "POWERED OFF" logger.debug(f"Power off result: {off_result.stdout}") logger.info(f"Drive at slot {slot} powered off, waiting {wait_time} seconds before power on...") add_message(f"Drive at slot {slot} powered off, waiting {wait_time} seconds...") # Sleep between power off and power on time.sleep(wait_time) # Execute power on command on_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{control_dnode_ip} '{NVME_CLI_COMMAND} slot_power {slot} ON'" logger.debug(f"Running command: {on_cmd}") on_result = subprocess.run(on_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if on_result.returncode != 0: logger.error(f"Failed to power on drive: {on_result.stderr}") return False # Update the dashboard state to show power on if current_drive_guid: with dashboard_lock: status_data["drive_state"][current_drive_guid] = "POWERED ON" logger.debug(f"Power on result: {on_result.stdout}") logger.info(f"Drive at slot {slot} powered on, waiting {wait_time*2} 
seconds for initialization...") add_message(f"Drive at slot {slot} powered on, waiting {wait_time*2} seconds for initialization...") # Sleep after power on to allow drive to initialize time.sleep(wait_time*2) # Verify power status after cycle verify_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{control_dnode_ip} '{NVME_CLI_COMMAND} slot_power {slot} status'" logger.debug(f"Running command: {verify_cmd}") verify_result = subprocess.run(verify_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) if verify_result.returncode != 0: logger.error(f"Failed to verify power status: {verify_result.stderr}") return False logger.debug(f"Power status after cycle: {verify_result.stdout}") logger.info(f"Successfully power cycled drive at dnode {control_dnode_ip}, slot {slot}") return True except Exception as e: logger.exception(f"Error power cycling drive: {e}") return False def check_leader_connection(commander: Commander) -> None: """Verify the leader connection is alive before starting a new drive operation. Raises ConnectionLostError if the connection is down. This should be called before deactivating a drive so we don't end up with a deactivated drive we can't reactivate. """ try: commander.raid_status() except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) raise ConnectionLostError(f"Leader health-check failed: {e}") from e def power_cycle_ebox(dnode_ip: str, dryrun: bool = False) -> bool: """ Power cycle an ebox by SSHing to it and running ipmitool power cycle. The command is sent to the local BMC which then power cycles the machine. 
Args: dnode_ip: IP address of the ebox's dnode to SSH into dryrun: If True, only log without executing Returns: True if the command was sent successfully """ logger.info(f"Dryrun {dryrun} | Power cycling ebox via dnode {dnode_ip}") add_message(f"Power cycling ebox via dnode {dnode_ip}") if dryrun: logger.info(f"[DRY RUN] Would SSH to {dnode_ip} and run 'sudo ipmitool power cycle'") return True cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no -o ConnectTimeout=10 {SSH_USER}@{dnode_ip} 'sudo ipmitool power cycle'" logger.info(f"Running: {cmd}") try: result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=30) if result.returncode == 0: logger.info(f"Successfully sent power cycle command to ebox at {dnode_ip}: {result.stdout.strip()}") add_message(f"Power cycle command sent to ebox at {dnode_ip}") return True else: logger.error(f"Failed to power cycle ebox at {dnode_ip}: {result.stderr}") add_message(f"ERROR: Failed to power cycle ebox at {dnode_ip}") return False except subprocess.TimeoutExpired: # The SSH connection may drop if the power cycle happens very quickly logger.warning(f"SSH command timed out for ebox at {dnode_ip} - this may be expected if the ebox rebooted quickly") add_message(f"Power cycle command sent to ebox at {dnode_ip} (SSH timed out, likely rebooting)") return True except Exception as e: logger.exception(f"Error power cycling ebox at {dnode_ip}: {e}") add_message(f"ERROR: Failed to power cycle ebox at {dnode_ip}: {e}") return False def wait_for_ebox_boot(dnode_ip: str, timeout_seconds: int = EBOX_POWER_CYCLE_WAIT_TIME, dryrun: bool = False) -> bool: """ Wait for an ebox to come back online after power cycle by polling SSH connectivity. 
Args: dnode_ip: IP address of the ebox's dnode timeout_seconds: Maximum wait time in seconds (default 20 minutes) dryrun: If True, skip waiting Returns: True if ebox is reachable via SSH within timeout """ if dryrun: logger.info(f"[DRY RUN] Would wait for ebox at {dnode_ip} to boot (timeout: {timeout_seconds}s)") return True start_time = time.time() initial_wait_seconds = 300 check_interval = 30 logger.info(f"Waiting for ebox at {dnode_ip} to boot (timeout: {timeout_seconds}s)...") add_message(f"Waiting for ebox at {dnode_ip} to boot (up to {timeout_seconds // 60} minutes)...") # Initial wait to allow the ebox to actually start rebooting logger.info(f"Waiting {initial_wait_seconds} seconds before starting SSH checks to allow reboot to begin...") time.sleep(initial_wait_seconds) logger.info(f"Initial wait complete, starting SSH connectivity checks every {check_interval}s...") while time.time() - start_time < timeout_seconds: elapsed = int(time.time() - start_time) remaining = timeout_seconds - elapsed try: cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no -o ConnectTimeout=10 -o BatchMode=yes {SSH_USER}@{dnode_ip} 'echo ok'" result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=20) if result.returncode == 0 and 'ok' in result.stdout: elapsed_min = elapsed // 60 logger.info(f"Ebox at {dnode_ip} is back online after {elapsed_min} minutes ({elapsed}s)") add_message(f"Ebox at {dnode_ip} is back online (waited {elapsed_min} min)") return True except Exception: pass logger.info(f"Ebox at {dnode_ip} not yet reachable via SSH, retrying in {check_interval}s... (elapsed: {elapsed}s, remaining: {remaining}s)") add_message(f"Waiting for ebox boot... 
({remaining}s remaining)") time.sleep(check_interval) total_elapsed = int(time.time() - start_time) logger.error(f"Ebox at {dnode_ip} did not come back online within {timeout_seconds}s (waited {total_elapsed}s)") add_message(f"ERROR: Ebox at {dnode_ip} did not boot within {timeout_seconds // 60} minutes") return False def set_ebox_dnodes_enabled(commander: Commander, ebox: Dict[str, Any], enabled: bool, dryrun: bool = False) -> bool: """ Enable or disable all dnodes in an ebox via commander. When disabling (enabled=False), for each dnode: 1. commander.disconnect_dnode_fabric(GUID(dnode_guid)) 2. commander.dnode_modify(GUID(dnode_guid), enabled=False) When enabling (enabled=True), for each dnode: 1. commander.dnode_modify(GUID(dnode_guid), enabled=True) Args: commander: Commander object for Vast API ebox: Ebox dictionary from VMS API enabled: True to enable dnodes, False to disable dryrun: If True, only log without executing Returns: True if all dnodes were successfully modified """ action = "Enabling" if enabled else "Deactivating" ebox_name = ebox.get('name', 'unknown') dnode_guids = [] for dnode in ebox.get('dnode_containers', []): guid = dnode.get('guid', '') if guid: dnode_guids.append(guid) if not dnode_guids: logger.error(f"No dnode GUIDs found for ebox {ebox_name}") return False logger.info(f"Dryrun {dryrun} | {action} {len(dnode_guids)} dnodes for ebox {ebox_name}") for dnode_guid in dnode_guids: logger.info(f"{action} dnode {dnode_guid} on ebox {ebox_name}") add_message(f"{action} dnode {dnode_guid}") if dryrun: if enabled: logger.info(f"[DRY RUN] Would dnode_modify(enabled=True) for {dnode_guid}") else: logger.info(f"[DRY RUN] Would disconnect_dnode_fabric and dnode_modify(enabled=False) for {dnode_guid}") continue try: guid_obj = GUID(dnode_guid) if not enabled: logger.info(f"Disconnecting dnode fabric for {dnode_guid}") commander.disconnect_dnode_fabric(guid_obj) commander.dnode_modify(guid_obj, enabled=enabled) logger.info(f"Successfully {'enabled' if 
enabled else 'deactivated'} dnode {dnode_guid}") time.sleep(30) except Exception as e: logger.exception(f"Failed to {'enable' if enabled else 'deactivate'} dnode {dnode_guid}: {e}") add_message(f"ERROR: Failed to {'enable' if enabled else 'deactivate'} dnode {dnode_guid}: {e}") return False return True def wait_for_ebox_dnodes_inactive(vms_ip: str, ebox: Dict[str, Any], ebox_drive_guids: List[str] = None, commander: Commander = None, timeout_seconds: int = 600, dryrun: bool = False) -> bool: """ Wait for all dnodes in an ebox to become INACTIVE via VMS API, and verify all relevant SSDs are in FAILED state via commander. Args: vms_ip: VMS management IP address ebox: Ebox dictionary from VMS API ebox_drive_guids: List of drive GUIDs belonging to this ebox to verify FAILED state commander: Commander object for checking drive states (required if ebox_drive_guids provided) timeout_seconds: Maximum wait time in seconds (default 10 minutes) dryrun: If True, skip waiting Returns: True if all dnodes are INACTIVE and all SSDs are FAILED within timeout """ ebox_name = ebox.get('name', 'unknown') ebox_id = ebox.get('id', 0) if dryrun: logger.info(f"[DRY RUN] Would wait for dnodes on ebox {ebox_name} to become INACTIVE and SSDs to become FAILED") return True logger.info(f"Waiting for dnodes on ebox {ebox_name} to become INACTIVE (timeout: {timeout_seconds}s)...") add_message(f"Waiting for ebox {ebox_name} dnodes to become INACTIVE...") check_interval = 15 start_time = time.time() dnodes_inactive = False while time.time() - start_time < timeout_seconds: # Check dnode states via VMS API if not dnodes_inactive: ebox_data = vms_api_request("GET", f"eboxes/{ebox_id}/", vms_ip) if ebox_data is None: logger.warning(f"Failed to fetch ebox {ebox_name} state, retrying...") time.sleep(check_interval) continue all_inactive = True for dnode in ebox_data.get('dnode_containers', []): dnode_state = dnode.get('state', 'UNKNOWN') if dnode_state != 'INACTIVE': all_inactive = False break if 
all_inactive: elapsed = int(time.time() - start_time) logger.info(f"All dnodes on ebox {ebox_name} are INACTIVE (waited {elapsed}s)") add_message(f"All dnodes on ebox {ebox_name} are INACTIVE") dnodes_inactive = True # If dnodes are inactive, verify SSD FAILED state via commander if dnodes_inactive: if not ebox_drive_guids or not commander: return True try: drives = list(commander.list_objects(DRIVE_TYPE)) all_failed = True for drive in drives: drive_guid = str(drive.base_proto.guid) if drive_guid in ebox_drive_guids: state_str = str(drive.device_proto.state) if "FAILED" not in state_str.upper() and "INACTIVE" not in state_str.upper(): all_failed = False logger.info(f"Drive {drive_guid} still in state {state_str}, waiting for FAILED...") break if all_failed: elapsed = int(time.time() - start_time) logger.info(f"All SSDs on ebox {ebox_name} are FAILED/INACTIVE (waited {elapsed}s)") add_message(f"All SSDs on ebox {ebox_name} confirmed FAILED") return True except Exception as e: logger.warning(f"Error checking drive states: {e}, retrying...") remaining = timeout_seconds - int(time.time() - start_time) status = "dnodes INACTIVE, waiting for SSDs to FAIL" if dnodes_inactive else "waiting for dnodes to go INACTIVE" logger.info(f"Ebox {ebox_name}: {status}... ({remaining}s remaining)") add_message(f"Ebox {ebox_name}: {status}... ({remaining}s remaining)") time.sleep(check_interval) if not dnodes_inactive: logger.error(f"Dnodes on ebox {ebox_name} did not all become INACTIVE within {timeout_seconds}s") add_message(f"ERROR: Dnodes on ebox {ebox_name} did not become INACTIVE in time") else: logger.error(f"SSDs on ebox {ebox_name} did not all become FAILED within {timeout_seconds}s") add_message(f"ERROR: SSDs on ebox {ebox_name} did not become FAILED in time") return False def scan_physical_nvme_drives(dnode_ip: str, fw_upgrade_model: str) -> List[Dict[str, str]]: """ Scan all physical NVMe devices on a dnode and return those matching the target model. 
Uses nvme id-ctrl to get serial, model, and firmware for each device. Args: dnode_ip: IP address of the dnode fw_upgrade_model: Drive model to filter by Returns: List of dicts with keys: serial, model, firmware, device """ matching_drives = [] remote_script = ( "for dev in /dev/nvme*; do " "if [[ $dev =~ /dev/nvme[0-9]+$ ]]; then " "echo DEVICE:$dev; " "sudo nvme id-ctrl $dev 2>/dev/null | grep -E 'sn|mn|fr' || echo FAILED; " "fi; done" ) scan_cmd = [ "ssh", "-i", SSH_KEY_PATH, "-o", "StrictHostKeyChecking=no", f"{SSH_USER}@{dnode_ip}", remote_script ] try: result = subprocess.run(scan_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=120) if result.returncode != 0 or not result.stdout.strip(): logger.warning(f"Could not scan physical NVMe devices on {dnode_ip}: returncode={result.returncode}, " f"stderr={result.stderr.strip()}") return matching_drives logger.debug(f"Physical scan raw output from {dnode_ip} ({len(result.stdout)} bytes):\n{result.stdout[:2000]}") current_device = None current_sn = None current_mn = None current_fr = None def process_device(): if current_device and current_sn and current_mn and current_fr: if fw_upgrade_model in current_mn.upper(): matching_drives.append({ 'serial': current_sn, 'model': current_mn, 'firmware': current_fr, 'device': current_device }) for line in result.stdout.splitlines(): line = line.strip() if line.startswith("DEVICE:"): process_device() current_device = line.split("DEVICE:", 1)[1] current_sn = None current_mn = None current_fr = None elif ":" in line: field_name = line.split(":")[0].strip() field_value = line.split(":", 1)[1].strip() if field_name == "sn": current_sn = field_value elif field_name == "mn": current_mn = field_value elif field_name == "fr": current_fr = field_value process_device() # Process last device except Exception as e: logger.warning(f"Error scanning physical NVMe devices on {dnode_ip}: {e}") logger.info(f"Physical scan on {dnode_ip}: found 
{len(matching_drives)} drives matching {fw_upgrade_model}") return matching_drives def check_ebox_drives_need_upgrade(dnode_ip: str, drives: List[Dict[str, Any]], target_fw_version: str, fw_upgrade_model: str) -> List[Dict[str, Any]]: """ Check which drives on an ebox need firmware upgrade by comparing current FW version to target. This is a read-only operation (SSH + nvme id-ctrl) so it always runs, even in dry-run mode. Args: dnode_ip: IP address of the ebox's dnode drives: List of drive info dicts for this ebox target_fw_version: Target firmware version to compare against fw_upgrade_model: Drive model to filter by Returns: List of drive info dicts that need firmware upgrade """ drives_needing_upgrade = [] for drive_info in drives: if fw_upgrade_model not in drive_info.get('drive_model', ''): continue drive_serial = drive_info.get('drive_serial', '') nvme_device = find_drive_nvme_device(dnode_ip, drive_serial) if not nvme_device: logger.warning(f"Could not find NVMe device for drive {drive_serial} on {dnode_ip}, including in upgrade list") drives_needing_upgrade.append(drive_info) continue # Check current firmware version fw_check_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'sudo nvme id-ctrl {nvme_device} | grep fr'" try: result = subprocess.run(fw_check_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=30) if result.returncode == 0 and result.stdout.strip(): current_fw = result.stdout.strip().splitlines()[0].split()[-1] drive_info['fw_revision'] = current_fw if current_fw.upper() == target_fw_version.strip().upper(): logger.info(f"Drive {drive_serial} already at target FW version {target_fw_version}, skipping") continue else: logger.info(f"Drive {drive_serial} FW version {current_fw} != target {target_fw_version}, needs upgrade") drives_needing_upgrade.append(drive_info) else: logger.warning(f"Could not read FW version for drive {drive_serial}, including in upgrade list") 
drives_needing_upgrade.append(drive_info) except Exception as e: logger.warning(f"Error checking FW version for drive {drive_serial}: {e}, including in upgrade list") drives_needing_upgrade.append(drive_info) # Also scan all physical NVMe devices to catch drives not in the API mapping known_serials = {d.get('drive_serial', '') for d in drives} physical_drives = scan_physical_nvme_drives(dnode_ip, fw_upgrade_model) for phys_drive in physical_drives: serial = phys_drive['serial'] if serial in known_serials: continue firmware = phys_drive['firmware'] device = phys_drive['device'] if firmware.strip().upper() != target_fw_version.strip().upper(): logger.warning( f"Unmapped drive {serial} at {device} on {dnode_ip} has FW {firmware} != target {target_fw_version}. " f"This drive is not in the API mapping but needs upgrade." ) drives_needing_upgrade.append({ 'drive_serial': serial, 'drive_model': phys_drive['model'], 'fw_revision': firmware, 'dnode_ip': dnode_ip, 'unmapped': True }) else: logger.info(f"Unmapped drive {serial} at {device} on {dnode_ip} already at target FW {target_fw_version}") return drives_needing_upgrade def verify_ebox_drives_upgraded(dnode_ip: str, drives: List[Dict[str, Any]], target_fw_version: str, fw_upgrade_model: str) -> Tuple[bool, List[str]]: """ Verify that all drives on an ebox have been upgraded to the target firmware version. Performs two verification passes: 1. Checks all drives from the API mapping that were scheduled for upgrade 2. 
Scans ALL physical NVMe devices on the dnode to catch drives not in the API mapping Args: dnode_ip: IP address of the ebox's dnode drives: List of drive info dicts that were upgraded target_fw_version: Expected firmware version fw_upgrade_model: Drive model to check Returns: Tuple of (all_upgraded: bool, failed_serials: list of serials that failed verification) """ failed_serials = [] verified_serials = set() # Pass 1: Verify drives from the API mapping for drive_info in drives: if fw_upgrade_model not in drive_info.get('drive_model', ''): continue drive_serial = drive_info.get('drive_serial', '') nvme_device = find_drive_nvme_device(dnode_ip, drive_serial) if not nvme_device: logger.warning(f"Could not find NVMe device for drive {drive_serial} on {dnode_ip} during verification") failed_serials.append(drive_serial) continue fw_check_cmd = f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no {SSH_USER}@{dnode_ip} 'sudo nvme id-ctrl {nvme_device} | grep fr'" try: result = subprocess.run(fw_check_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=30) if result.returncode == 0 and result.stdout.strip(): current_fw = result.stdout.strip().splitlines()[0].split()[-1] drive_info['fw_revision'] = current_fw if current_fw.upper() != target_fw_version.strip().upper(): logger.error(f"Drive {drive_serial} FW version {current_fw} != target {target_fw_version}") failed_serials.append(drive_serial) else: logger.info(f"Drive {drive_serial} verified at target FW version {target_fw_version}") verified_serials.add(drive_serial) else: logger.warning(f"Could not verify FW version for drive {drive_serial}") failed_serials.append(drive_serial) except Exception as e: logger.warning(f"Error verifying FW version for drive {drive_serial}: {e}") failed_serials.append(drive_serial) # Pass 2: Scan ALL physical NVMe devices on the dnode to catch drives not in the API mapping logger.info(f"Scanning all physical NVMe devices on {dnode_ip} for unmapped 
drives matching {fw_upgrade_model}...") physical_drives = scan_physical_nvme_drives(dnode_ip, fw_upgrade_model) for phys_drive in physical_drives: serial = phys_drive['serial'] if serial in verified_serials: continue firmware = phys_drive['firmware'] device = phys_drive['device'] if firmware.strip().upper() != target_fw_version.strip().upper(): logger.error( f"Unmapped drive {serial} at {device} on {dnode_ip} has FW {firmware} " f"!= target {target_fw_version} (model: {phys_drive['model']}). " f"This drive was not in the API mapping and was missed during upgrade." ) failed_serials.append(serial) else: logger.info(f"Unmapped drive {serial} at {device} already at target FW {target_fw_version}") all_upgraded = len(failed_serials) == 0 return all_upgraded, failed_serials def check_cluster_state(commander: Commander, force: bool = False, skip_denylist: bool = False) -> bool: """ Check if the cluster state is ONLINE and if any new denylist items have appeared. skip_denylist: If True, skip all denylist checks Args: commander: Commander object for interfacing with Vast system force: If True, skip denylist check Returns: True if cluster state is ONLINE and no new denylist items, False otherwise """ global status_data try: # Get the cluster object - there's only one system object cluster_objects = list(commander.list_objects(TypeId.SystemType)) if cluster_objects: system_guid = cluster_objects[0].base_proto.guid # Now use get_object for more efficient access cluster = commander.get_object(TypeId.SystemType, system_guid) cluster_state = cluster.state_info.state.name else: raise Exception("No system object found") # Get previous cluster state to detect changes previous_state = None with dashboard_lock: if "cluster_state" in status_data: previous_state = status_data["cluster_state"] # Check if cluster state has changed if previous_state and previous_state != cluster_state: change_msg = f"Cluster state changed: {previous_state} -> {cluster_state}" logger.warning(change_msg) 
add_message(change_msg) # Force dashboard update to show new cluster state try: message_queue.put_nowait(("update", None)) except queue.Full: pass logger.info(f"Cluster state: {cluster_state}") # Update dashboard with lock for thread safety with dashboard_lock: status_data["cluster_state"] = cluster_state # Denylist check (skip if skip_denylist) if not skip_denylist: # Load denylist snapshot from state state = load_state() initial_denylist = state.get("denylist_snapshot", None) if initial_denylist is not None: current_denylist = fetch_denylist_snapshot(commander) changes = compare_denylist_snapshots(initial_denylist, current_denylist) if changes: logger.error(f"New denylist items detected: {changes}") add_message(f"CRITICAL ERROR: New denylist items detected! Aborting.") add_message(json.dumps(changes, indent=2)) print_dashboard(0, 0, 0, None, [], full_redraw=True) sys.exit(2) if cluster_state == "ONLINE": add_message(f"Cluster state verified: {cluster_state}") return True else: error_msg = f"Cluster is not ONLINE. Current state: {cluster_state}" logger.error(error_msg) add_message(f"CRITICAL ERROR: {error_msg}") return False except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error checking cluster state: {e}") add_message(f"Error checking cluster state: {str(e)}") return False def wait_for_raid_health(commander: Commander, timeout: int = RAID_HEALTH_CHECK_TIMEOUT) -> bool: """ Wait for RAID to become healthy. 
Args: commander: Commander object for interfacing with Vast system timeout: Maximum time to wait in seconds Returns: True if RAID becomes healthy within timeout, False otherwise """ logger.info(f"Waiting for RAID to become healthy (timeout: {timeout}s)...") start_time = time.time() # Seed previous states from dashboard so we only log genuine transitions with dashboard_lock: if "raid_state" in status_data: previous_states = status_data["raid_state"].copy() else: previous_states = {} while time.time() - start_time < timeout: # Get current RAID status try: raids = commander.raid_status() # Extract state objects and convert them to strings, cleaning up state names current_states = { "ssd": get_enum_name(raids.ssd_state.state), "nvram": get_enum_name(raids.nvram_state.state), "memory": get_enum_name(raids.memory_state.state), "rio_nvram": get_enum_name(raids.rio_nvram_state.state) } # Update dashboard with current states with dashboard_lock: if "raid_state" not in status_data: status_data["raid_state"] = {} status_data["raid_state"].update(current_states) # Log state: summary on first check, per-component only on genuine changes if not previous_states: states_str = ", ".join(f"{comp}: {st}" for comp, st in current_states.items()) logger.info(f"RAID status - {states_str}") else: for component, state in current_states.items(): prev = previous_states.get(component) if prev and prev != state: logger.info(f"RAID {component} state changed: {prev} -> {state}") add_message(f"RAID {component} state changed: {prev} -> {state}") if "REBUILD" in state.upper(): logger.warning(f"RAID {component} is now REBUILDING") # Force dashboard update to show new RAID state try: message_queue.put_nowait(("update", None)) except queue.Full: pass # Update previous states previous_states = current_states.copy() # Check if all components are healthy all_healthy = all("HEALTHY" in state.upper() for state in current_states.values()) if all_healthy: logger.info("RAID is now healthy") add_message("RAID 
health check: All components HEALTHY") return True # Log current status states_str = ", ".join([f"{comp}: {state}" for comp, state in current_states.items()]) elapsed = time.time() - start_time remaining = timeout - elapsed logger.info(f"RAID not yet healthy: {states_str}. Elapsed: {int(elapsed)}s, Remaining: {int(remaining)}s") add_message(f"RAID not yet healthy: {states_str}. Remaining: {int(remaining)}s") except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.exception(f"Error checking RAID status: {e}") # Sleep before next check time.sleep(RAID_HEALTH_CHECK_INTERVAL*4 if IS_EBOX else RAID_HEALTH_CHECK_INTERVAL) logger.error(f"Timed out waiting for RAID to become healthy after {timeout}s") return False # Global variables for dashboard status status_data = { "messages": [], "last_message": "", "last_updated": "", "cluster_state": "Unknown", "cluster_name": "Unknown", "cluster_psnt": "N/A", "raid_state": { "ssd": "UNKNOWN", "nvram": "UNKNOWN", "memory": "UNKNOWN", "rio_nvram": "UNKNOWN" }, "healthy_drives": 0, "total_drives": 0, "healthy_dnodes": 0, "total_dnodes": 0, "drive_state": {} # Tracks current state of drives being processed } # Cache for dbox bus masters to avoid repeated IPMI checks # Structure: {dbox_guid: {"master_dnode_ip": ip, "timestamp": time.time()}} dbox_bus_master_cache = {} # Flag to control dashboard updates pause_dashboard_updates = False # Flag to indicate graceful shutdown is in progress graceful_shutdown = False # Global variable to store initial denylist snapshot initial_denylist_snapshot = None # Utility to format timestamps (epoch or ns) to readable string def format_timestamp(ts): if isinstance(ts, list): # For lists, join formatted values return ', '.join([format_timestamp(x) for x in ts if x]) if not ts or ts == 0: return "--" # Try to detect if it's in ns or s try: ts_int = int(ts) if ts_int > 1e12: # nanoseconds dt = datetime.datetime.fromtimestamp(ts_int / 1e9) else: # seconds dt = 
datetime.datetime.fromtimestamp(ts_int) return dt.strftime('%Y-%m-%d %H:%M:%S') except Exception: return str(ts) # Write a master report of all drives and their mapping/timestamps def write_drive_report(mapping: dict, completed_drives: list, power_cycle_times: dict, firmware_upgrade_times: dict = None): report_file = f"/vast/log/{DRIVE_TITLE.lower()}_{OPERATION.lower().replace(' ','_')}_report.txt" with open(report_file, 'w') as f: f.write(f"{DRIVE_TITLE} {OPERATION} and Firmware Upgrade Master Report\n") f.write(f"Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") headers = [ "#", "Drive GUID", "Serial", "Model", "DBox", "DNode", "DNode IP", "Slot", "Chassis", "State", "Enabled", "Insertion Time", "State TS", "Attached TS", "Power Cycle TS", "FW Upgrade TS", "FW Revision", "Completed" ] f.write("\t".join(headers) + "\n") for idx, (guid, info) in enumerate(sorted(mapping.items()), 1): insertion_time = format_timestamp(info.get('insertion_time', '--')) state_timestamp = format_timestamp(info.get('state_timestamp', '--')) attached_timestamp = format_timestamp(info.get('attached_timestamp', '--')) power_cycle_ts = format_timestamp(power_cycle_times.get(guid, '--')) fw_upgrade_ts = format_timestamp(firmware_upgrade_times.get(guid, '--') if firmware_upgrade_times else '--') completed = 'YES' if guid in completed_drives else 'NO' row = [ str(idx), guid, info.get('drive_serial', '--'), info.get('drive_model', '--'), info.get('dbox_name', '--'), info.get('dnode_name', '--'), info.get('dnode_ip', '--'), # Add dnode_ip to the report str(info.get('slot', '--')), info.get('chassis_type', '--'), get_enum_name(info.get('state', '--')), str(info.get('enabled', '--')), insertion_time, state_timestamp, attached_timestamp, power_cycle_ts, fw_upgrade_ts, info.get('fw_revision', '--'), completed ] f.write("\t".join(row) + "\n") f.write("\nLegend:\n") f.write("Insertion Time: time the drive was inserted to the slot\n") f.write("State TS: last state change 
timestamp\n") f.write("Attached TS: attached timestamp(s)\n") f.write("Power Cycle TS: time the drive was power cycled (completion)\n") f.write("FW Upgrade TS: time the drive firmware was upgraded (completion)\n") f.write("Completed: whether the drive was completed in this run\n") logger.info(f"Wrote drive master report to {report_file}") def add_message(msg: str) -> None: """Add a message to the dashboard message log with timestamp and thread safety. Args: msg: The message to add to the dashboard """ timestamp = datetime.datetime.now().strftime("%H:%M:%S") timestamped_msg = f"[{timestamp}] {msg}" # Update status_data messages with lock to prevent race conditions with dashboard_lock: status_data["last_message"] = timestamped_msg status_data["messages"].insert(0, timestamped_msg) # Keep only the last 15 messages if len(status_data["messages"]) > 15: status_data["messages"] = status_data["messages"][:15] # Also send to the message queue for the dashboard thread try: message_queue.put_nowait(("msg", msg)) except queue.Full: # Just ignore if queue is full pass def clear_terminal(): """Clear the terminal screen.""" # Don't clear if we have an error to display global status_data with dashboard_lock: if status_data.get("last_error"): return # Don't clear terminal when there's an error os.system('cls' if os.name == 'nt' else 'clear') def print_dashboard(completed: int, total: int, start_time: float, current_drive: Dict[str, Any] = None, completed_drives: List[str] = None, full_redraw: bool = False) -> None: """ Print dashboard with progress information and system status. 
Args: completed: Number of completed drives total: Total number of drives start_time: Start time of the operation current_drive: Currently processing drive completed_drives: List of completed drive GUIDs full_redraw: Whether to force a full redraw of the dashboard """ # Don't update the dashboard if updates are paused global pause_dashboard_updates if pause_dashboard_updates: return # Clear or reposition the cursor before rendering the dashboard to avoid duplicates if full_redraw: # Clear entire screen and move cursor to top-left sys.stdout.write("\033[2J\033[H") else: # Just move cursor to top-left and clear from cursor down sys.stdout.write("\033[H") # Temporarily raise console handler level to prevent log messages # from corrupting the dashboard during redraw console_handler = None original_level = None for handler in logger.handlers: if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler): console_handler = handler original_level = handler.level handler.setLevel(logging.CRITICAL) break try: # Clear the terminal to redraw the dashboard if full_redraw: clear_terminal() # Calculate progress percent = 0 if total > 0: percent = min(100, int((completed / total) * 100)) # Format the header padding = max(0, (80 - len(f"VastData {DRIVE_TITLE} {OPERATION} TOOL")) // 2) header = " " * padding + f"VastData {DRIVE_TITLE} {OPERATION} TOOL" + " " * padding # Format the progress bar bar_width = 50 filled_length = int(bar_width * percent / 100) bar = "█" * filled_length + "-" * (bar_width - filled_length) progress_bar = f"Progress ({completed}/{total}): [{bar}] {percent}%" # Format time info elapsed_time = 0 if start_time > 0: elapsed_time = int(time.time() - start_time) elapsed_str = str(datetime.timedelta(seconds=elapsed_time)) # Estimate remaining time remaining_str = "--:--:--" if percent > 0 and elapsed_time > 0: total_estimated_time = elapsed_time * 100 / percent remaining_seconds = total_estimated_time - elapsed_time remaining_str = 
str(datetime.timedelta(seconds=int(remaining_seconds))) time_info = f"Elapsed: {elapsed_str} | Remaining: {remaining_str}" # Format current drive info if current_drive: current_info = format_current_drive_info(current_drive) else: current_info = "" # No current drive # Get status data with lock to prevent race conditions with dashboard_lock: # Format other status components cluster = f"🌐 Cluster: {status_data['cluster_name']} | PSNT: {status_data['cluster_psnt']} | State: {status_data['cluster_state']}" raid = f"🛡️ RAID State - SSD: {status_data['raid_state']['ssd']}, NVRAM: {status_data['raid_state']['nvram']}, Memory: {status_data['raid_state']['memory']}, RIO: {status_data['raid_state']['rio_nvram']}" components = f"💾 Drives State: {status_data['healthy_drives']}/{status_data['total_drives']} healthy | 🖥️ DNodes: {status_data['healthy_dnodes']}/{status_data['total_dnodes']} healthy" # Format message section message_section = "\n".join(status_data["messages"]) # Format error section if there's an error error_section = "" if status_data.get("last_error"): error_section = f""" ┌─ ERROR ───────────────────────────────────────────────────────────┐ │ ❌ {status_data['last_error'][:60]:<60} │ │ Check log: {DEFAULT_LOG_FILE:<48} │ └───────────────────────────────────────────────────────────────────┘ """ # Create the dashboard dashboard = f""" {header} {progress_bar} {time_info} {current_info} {cluster} {raid} {components} {error_section} --- Recent Activity --- {message_section} """ # Print without logging sys.stdout.write(dashboard) sys.stdout.flush() finally: # Restore console handler level if console_handler and original_level is not None: console_handler.setLevel(original_level) def update_dashboard_thread() -> None: """Thread function to update the dashboard periodically. Processes messages from the message_queue and updates the dashboard with the latest progress information. 
Designed to avoid race conditions and to present a consistent UI even while logging is active. """ # Initial progress state progress = (0, 0, 0, None) # completed, total, start_time, current while True: try: # Drain queue non-blocking while True: try: kind, payload = message_queue.get_nowait() if kind == "msg": # Messages were already handled in add_message pass elif kind == "progress": # Update progress information progress = payload elif kind == "update": # Force a dashboard update without changing progress # This is used when RAID state changes pass except queue.Empty: break # Unpack progress information completed, total, start, current = progress # Update the dashboard print_dashboard(completed, total, start, current, full_redraw=True) sys.stdout.flush() # Sleep for a bit time.sleep(1) except Exception as e: # Don't crash the thread on error file_logger.error(f"Dashboard thread error: {str(e)}") time.sleep(5) # Longer delay on error def start_dashboard(): """Start the dashboard update thread.""" threading.Thread(target=update_dashboard_thread, daemon=True).start() def print_progress(completed: int, total: int, start_time: float, current_drive: Dict[str, Any], completed_drives: List[str]) -> None: """Update progress information in the dashboard and message queue. 
Args: completed: Number of completed drives total: Total number of drives to process start_time: Time when the process started current_drive: Current drive being processed completed_drives: List of completed drive GUIDs """ # Send progress to the message queue for the dashboard thread try: message_queue.put_nowait(("progress", (completed, total, start_time, current_drive))) except queue.Full: # Just ignore if queue is full pass # Also immediately update the dashboard print_dashboard(completed, total, start_time, current_drive, completed_drives) def check_initial_system_health(commander: Commander, force: bool = False, skip_denylist: bool = False, skip_drives: List[str] = None, skip_raid_check: bool = False) -> bool: """ Check initial system health before starting drive operations. Verifies cluster state, node/drive failures, and optionally RAID health. Args: commander: Commander object for Vast API force: If True, continue despite failures skip_denylist: If True, skip denylist checks skip_drives: List of drive GUIDs to skip when checking failures skip_raid_check: If True, skip RAID health check (used in ebox flow where RAID will be unhealthy while dnodes are deactivated) Returns: True if system is healthy (or force is True and we can continue) """ logger.info("Checking initial system health...") if not check_cluster_state(commander, force, skip_denylist): logger.error("Cluster is not ONLINE before starting") return False if not check_node_and_drive_failures(commander, skip_drives): logger.error("Node or drive failures detected before starting") if not force: logger.error("Use --force to continue anyway") return False logger.warning("Continuing despite node or drive failures due to --force flag") if skip_raid_check: logger.info("Skipping RAID health check (ebox mode)") else: if not check_raid_health(commander): logger.error("RAID is not healthy before starting") if not force: logger.error("Use --force to continue anyway") return False logger.warning("Continuing 
despite unhealthy RAID due to --force flag") return True def find_leader_and_vms_eboxes(eboxes: List[Dict[str, Any]]) -> Tuple[Optional[int], Optional[int]]: """ Find which eboxes host the cluster leader and VMS using the VMS API ebox data. Leader is identified by cnode_containers[].is_leader == True. VMS is identified by cnode_containers[].is_mgmt == True. Args: eboxes: List of ebox dictionaries from VMS API Returns: Tuple of (leader_ebox_id, vms_ebox_id), either may be None if not found """ leader_ebox_id = None vms_ebox_id = None for ebox in eboxes: ebox_id = ebox.get('id', None) ebox_name = ebox.get('name', 'unknown') for cnode in ebox.get('cnode_containers', []): if cnode.get('is_leader', False) and leader_ebox_id is None: leader_ebox_id = ebox_id logger.info(f"Leader ebox detected: {ebox_name} (cnode {cnode.get('name', 'unknown')}, ebox_id={ebox_id})") if cnode.get('is_mgmt', False) and vms_ebox_id is None: vms_ebox_id = ebox_id logger.info(f"VMS ebox detected: {ebox_name} (cnode {cnode.get('name', 'unknown')}, ebox_id={ebox_id})") if leader_ebox_id is None: logger.info("No leader cnode found in ebox data") if vms_ebox_id is None: logger.info("No VMS (is_mgmt) cnode found in ebox data") return leader_ebox_id, vms_ebox_id def order_ebox_groups(ebox_groups: Dict[int, Dict[str, Any]], leader_ebox_id: Optional[int], vms_ebox_id: Optional[int]) -> List[int]: """ Order ebox IDs for processing: regular eboxes first, leader ebox second-to-last, VMS ebox last. If leader and VMS are on the same ebox, that ebox goes last. 
Args: ebox_groups: Dict mapping ebox_id to ebox group info leader_ebox_id: Ebox ID hosting the leader, or None vms_ebox_id: Ebox ID hosting VMS, or None Returns: Ordered list of ebox IDs """ regular = [] for ebox_id in sorted(ebox_groups.keys()): if ebox_id == leader_ebox_id or ebox_id == vms_ebox_id: continue regular.append(ebox_id) ordered = regular if leader_ebox_id is not None and leader_ebox_id in ebox_groups and leader_ebox_id != vms_ebox_id: ordered.append(leader_ebox_id) if vms_ebox_id is not None and vms_ebox_id in ebox_groups: ordered.append(vms_ebox_id) return ordered def group_drives_by_ebox(drive_mapping: Dict[str, Dict[str, Any]], eboxes: List[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]: """ Group drives from drive_mapping by ebox, using dnode_ip to match ebox dnode containers. Args: drive_mapping: Drive GUID to drive info mapping eboxes: List of ebox dicts from VMS API Returns: Dict mapping ebox_id to {'ebox': ebox_dict, 'drives': [drive_info_dicts], 'dnode_ip': str} """ # Build a lookup: dnode_ip -> ebox ip_to_ebox = {} for ebox in eboxes: for dnode in ebox.get('dnode_containers', []): dnode_ip = dnode.get('ip', '') if dnode_ip: ip_to_ebox[dnode_ip] = ebox ebox_groups = {} unmatched_drives = [] for drive_guid, drive_info in drive_mapping.items(): dnode_ip = drive_info.get('dnode_ip', '') ebox = ip_to_ebox.get(dnode_ip, None) if ebox is None: unmatched_drives.append(drive_guid) continue ebox_id = ebox['id'] if ebox_id not in ebox_groups: # Use the first dnode IP we find for this ebox (for SSH commands) ebox_groups[ebox_id] = { 'ebox': ebox, 'drives': [], 'dnode_ip': dnode_ip } drive_info_with_guid = dict(drive_info) drive_info_with_guid['drive_guid'] = drive_guid ebox_groups[ebox_id]['drives'].append(drive_info_with_guid) if unmatched_drives: logger.warning(f"{len(unmatched_drives)} drives could not be matched to any ebox") return ebox_groups def cycle_ebox_drives(commander: Commander, drive_mapping: Dict[str, Dict[str, Any]], vms_ip: str, eboxes: 
List[Dict[str, Any]], force: bool = False, dryrun: bool = False, skip_denylist: bool = False, fw_upgrade_model: str = None, fw_file: str = None, target_fw_version: str = None, upload_reports: bool = False) -> bool: """ Upgrade firmware on ebox drives, processing one ebox at a time. For each ebox: 1. Check if any drives need upgrade (skip ebox if none) 2. Verify all eboxes are in active state 3. Deactivate ebox dnodes via commander and wait for INACTIVE + SSDs FAILED 4. Upgrade all relevant drives' firmware 5. Power cycle the ebox 6. Wait for the ebox to boot 7. Verify all drives have been upgraded 8. Re-enable ebox dnodes via commander 9. Wait for RAID to become healthy and verify cluster state Args: commander: Commander object for Vast API drive_mapping: Drive GUID to drive info mapping vms_ip: VMS management IP address eboxes: List of ebox dicts from VMS API force: Continue on errors dryrun: Simulate without making changes skip_denylist: Skip denylist checks fw_upgrade_model: Drive model to upgrade fw_file: Path to firmware file target_fw_version: Target firmware version upload_reports: Upload reports when done Returns: True if all eboxes processed successfully """ if dryrun: logger.info("[DRY RUN] Running ebox firmware upgrade in dry run mode") # Group drives by ebox ebox_groups = group_drives_by_ebox(drive_mapping, eboxes) if not ebox_groups: logger.error("No drives could be matched to any ebox") add_message("ERROR: No drives could be matched to any ebox") return False total_eboxes = len(ebox_groups) logger.info(f"Found {total_eboxes} eboxes to process") add_message(f"Found {total_eboxes} eboxes to process") # Determine ebox ordering: regular first, leader second-to-last, VMS last leader_ebox_id, vms_ebox_id = find_leader_and_vms_eboxes(eboxes) ordered_ebox_ids = order_ebox_groups(ebox_groups, leader_ebox_id, vms_ebox_id) order_parts = [] for eid in ordered_ebox_ids: name = ebox_groups[eid]['ebox'].get('name', f'ebox-{eid}') tags = [] if eid == leader_ebox_id: 
tags.append("leader") if eid == vms_ebox_id: tags.append("VMS") suffix = f" ({', '.join(tags)})" if tags else "" order_parts.append(f"{name}{suffix}") logger.info(f"Ebox processing order: {', '.join(order_parts)}") add_message(f"Processing order: {', '.join(order_parts)}") # Check initial system health # Skip RAID check in dry-run mode (we're not making changes, no need to block on transient RAID states) # and in normal mode (RAID will be checked after each ebox re-enable) if not check_initial_system_health(commander, force, skip_denylist, skip_raid_check=dryrun): return False # Process each ebox processed_eboxes = 0 skipped_eboxes = 0 for ebox_id in ordered_ebox_ids: ebox_group = ebox_groups[ebox_id] if graceful_shutdown: logger.info("Graceful shutdown in progress, stopping ebox processing") return False ebox = ebox_group['ebox'] ebox_drives = ebox_group['drives'] dnode_ip = ebox_group['dnode_ip'] ebox_name = ebox.get('name', f'ebox-{ebox_id}') logger.info(f"{'=' * 60}") logger.info(f"Processing ebox {ebox_name} (ID: {ebox_id}) with {len(ebox_drives)} drives") add_message(f"Processing ebox {ebox_name} ({len(ebox_drives)} drives)") # Step 1: Check if any drives need upgrade logger.info(f"Checking which drives on ebox {ebox_name} need firmware upgrade...") add_message(f"Checking firmware versions on ebox {ebox_name}...") drives_needing_upgrade = check_ebox_drives_need_upgrade(dnode_ip, ebox_drives, target_fw_version, fw_upgrade_model) if not drives_needing_upgrade: logger.info(f"No drives on ebox {ebox_name} need firmware upgrade, skipping entirely") add_message(f"Ebox {ebox_name}: all drives already at target FW version, skipping") skipped_eboxes += 1 continue logger.info(f"Ebox {ebox_name}: {len(drives_needing_upgrade)} of {len(ebox_drives)} drives need firmware upgrade") add_message(f"Ebox {ebox_name}: {len(drives_needing_upgrade)} drives need upgrade") # Step 2: Verify all eboxes are in active state logger.info("Refreshing ebox list and verifying all eboxes are 
active...") if not get_vms_eboxes(vms_ip, verify_active=True): logger.error("Not all eboxes are in ACTIVE state or failed to fetch ebox list") add_message("ERROR: Not all eboxes are in ACTIVE state") if not force: return False logger.warning("Continuing despite inactive eboxes due to --force flag") # Step 3: Deactivate ebox dnodes via commander and wait for INACTIVE + SSDs FAILED logger.info(f"Deactivating dnodes on ebox {ebox_name}...") add_message(f"Deactivating dnodes on ebox {ebox_name}...") ebox_drive_guids = [str(d.get('drive_guid', '')) for d in ebox_drives if d.get('drive_guid', '')] if not set_ebox_dnodes_enabled(commander, ebox, enabled=False, dryrun=dryrun): logger.error(f"Failed to deactivate dnodes on ebox {ebox_name}") add_message(f"ERROR: Failed to deactivate dnodes on ebox {ebox_name}") if not force: return False logger.warning("Continuing despite dnode deactivation failure due to --force flag") # Wait for dnodes to become INACTIVE and SSDs to become FAILED if not wait_for_ebox_dnodes_inactive(vms_ip, ebox, ebox_drive_guids=ebox_drive_guids, commander=commander, dryrun=dryrun): logger.error(f"Ebox {ebox_name} dnodes did not fully deactivate") add_message(f"ERROR: Ebox {ebox_name} dnodes did not fully deactivate") if not force: logger.info(f"Attempting to re-enable dnodes on ebox {ebox_name} after failure...") set_ebox_dnodes_enabled(commander, ebox, enabled=True, dryrun=dryrun) return False logger.warning("Continuing despite dnode deactivation timeout due to --force flag") # Step 4: Upgrade all relevant drives' firmware upgrade_failures = [] for drive_info in drives_needing_upgrade: if graceful_shutdown: logger.info("Graceful shutdown in progress") # Re-enable ebox dnodes before exiting set_ebox_dnodes_enabled(commander, ebox, enabled=True, dryrun=dryrun) return False drive_serial = drive_info.get('drive_serial', '') drive_model = drive_info.get('drive_model', '') logger.info(f"Upgrading firmware on drive {drive_serial} ({drive_model})") 
add_message(f"Upgrading firmware on drive {drive_serial}") if not upgrade_drive_firmware(dnode_ip, drive_serial, drive_model, fw_file, dryrun, target_fw_version, drive_info, skip_fw_verify=True): logger.error(f"Failed to upgrade firmware on drive {drive_serial}") add_message(f"ERROR: Failed to upgrade firmware on drive {drive_serial}") upgrade_failures.append(drive_serial) if not force: # Re-enable ebox dnodes before returning logger.info(f"Re-enabling dnodes on ebox {ebox_name} after failure...") set_ebox_dnodes_enabled(commander, ebox, enabled=True, dryrun=dryrun) return False logger.warning(f"Continuing despite firmware upgrade failure for {drive_serial} due to --force flag") if upgrade_failures: logger.warning(f"Firmware upgrade failed for {len(upgrade_failures)} drives: {upgrade_failures}") add_message(f"WARNING: {len(upgrade_failures)} drive upgrades failed") # Step 5: Power cycle the ebox logger.info(f"Power cycling ebox {ebox_name}...") add_message(f"Power cycling ebox {ebox_name}...") if not power_cycle_ebox(dnode_ip, dryrun): logger.error(f"Failed to power cycle ebox {ebox_name}") add_message(f"ERROR: Failed to power cycle ebox {ebox_name}") if not force: # Try to re-enable dnodes before returning set_ebox_dnodes_enabled(commander, ebox, enabled=True, dryrun=dryrun) return False logger.warning("Continuing despite power cycle failure due to --force flag") # Step 6: Wait for ebox to boot logger.info(f"Waiting for ebox {ebox_name} to boot...") add_message(f"Waiting for ebox {ebox_name} to boot...") if not wait_for_ebox_boot(dnode_ip, dryrun=dryrun): logger.error(f"Ebox {ebox_name} did not come back online") add_message(f"ERROR: Ebox {ebox_name} did not come back online") if not force: return False logger.warning("Continuing despite boot timeout due to --force flag") # Reconnect to leader after ebox reboot # The leader may have been on this ebox and moved to another node if not dryrun: logger.info("Reconnecting to cluster leader after ebox reboot...") 
add_message("Reconnecting to cluster leader...") new_commander = connect_to_commander() if new_commander: commander = new_commander else: logger.error("Failed to reconnect to commander after ebox reboot") if not force: return False logger.warning("Continuing despite reconnect failure due to --force flag") # Step 7: Verify all drives have been upgraded if not dryrun: logger.info(f"Verifying firmware versions on ebox {ebox_name}...") add_message(f"Verifying firmware versions on ebox {ebox_name}...") all_upgraded, failed_serials = verify_ebox_drives_upgraded(dnode_ip, drives_needing_upgrade, target_fw_version, fw_upgrade_model) if not all_upgraded: logger.error(f"Firmware verification failed for drives: {failed_serials}") add_message(f"ERROR: FW verification failed for {len(failed_serials)} drives") if not force: # Still re-enable ebox dnodes before returning set_ebox_dnodes_enabled(commander, ebox, enabled=True, dryrun=dryrun) return False logger.warning("Continuing despite verification failure due to --force flag") else: logger.info(f"All drives on ebox {ebox_name} verified at target FW version") add_message(f"All drives on ebox {ebox_name} verified at target FW version") # Step 8: Re-enable ebox dnodes via commander logger.info(f"Re-enabling dnodes on ebox {ebox_name}...") add_message(f"Re-enabling dnodes on ebox {ebox_name}...") if not set_ebox_dnodes_enabled(commander, ebox, enabled=True, dryrun=dryrun): logger.error(f"Failed to re-enable dnodes on ebox {ebox_name}") add_message(f"ERROR: Failed to re-enable dnodes on ebox {ebox_name}") if not force: return False logger.warning("Continuing despite dnode re-enable failure due to --force flag") # Step 9: Wait for RAID to become healthy and verify cluster state, since we reboot the ebox we have to wait for nvram rebalance if not dryrun: logger.info("Waiting for RAID to become healthy after dnode re-enable...") add_message("Waiting for RAID to become healthy...") if not wait_for_raid_health(commander, timeout=7200): 
logger.error("RAID did not become healthy after dnode re-enable") add_message("ERROR: RAID did not become healthy") if not force: return False logger.warning("Continuing despite unhealthy RAID due to --force flag") # Also verify cluster state if not check_cluster_state(commander, force, skip_denylist): logger.error("Cluster state check failed after dnode re-enable") if not force: return False logger.warning("Continuing despite cluster state check failure due to --force flag") processed_eboxes += 1 logger.info(f"Ebox {ebox_name} processed successfully ({processed_eboxes}/{total_eboxes})") add_message(f"Ebox {ebox_name} completed ({processed_eboxes}/{total_eboxes})") # Write reports and upload if requested if not dryrun: if next((d for d in drive_mapping.values() if d.get('fw_revision', None)), None): save_fw_report(drive_mapping) if upload_reports: logger.info("Getting info for upload...") customer_info = get_cluster(vms_user=VMS_USER, vms_password=VMS_PASSWORD) for file in ['/vast/log/drive_fw_revisions.report', '/vast/log/nvram_fw_upgrade_report.txt']: if os.path.exists(file): logger.info(f"Uploading file {file} to Vast support...") add_message(f"Uploading file {file} to Vast support...") upload_output(file, customer_info) logger.info(f"Ebox firmware upgrade complete: {processed_eboxes} processed, {skipped_eboxes} skipped (already up to date)") add_message(f"Complete: {processed_eboxes} eboxes upgraded, {skipped_eboxes} skipped") return True def cycle_drives(commander: Commander, force: bool = False, dry_run: bool = False, skip_drives: List[str] = None, wait_time: int = POWER_CYCLE_WAIT_TIME, skip_denylist: bool = False, fw_upgrade_model: str = None, fw_file: str = None, target_fw_version: str = None, upload_reports: bool = False) -> bool: """ Cycle all drives one by one. 
""" # Load state state = load_state() completed_drives = state.get("completed_drives", []) current_drive_guid = state.get("current_drive") start_time = state.get("start_time", time.time()) # --- Add power_cycle_times tracking --- power_cycle_times = state.get("power_cycle_times", {}) if dry_run: logger.info("[DRY RUN] Running in dry run mode - no actions will be executed") # Map drives if not already mapped drive_mapping = state.get("mapping", {}) if not drive_mapping: drive_mapping = map_drives(commander) if not drive_mapping: logger.error("Failed to map drives, exiting") return False if not dry_run: state["mapping"] = drive_mapping state["total_drives"] = len(drive_mapping) save_state(state) else: logger.info(f"[DRY RUN] Would save mapping of {len(drive_mapping)} drives to state file") # Get total number of drives total_drives = state.get("total_drives", len(drive_mapping)) # Check initial system health if not check_initial_system_health(commander, force, skip_denylist, skip_drives): return False # Check for node and drive failures if not check_node_and_drive_failures(commander, skip_drives): logger.error("Node or drive failures detected before starting") if not force: logger.error("Use --force to continue anyway") return False logger.warning("Continuing despite node or drive failures due to --force flag") # Check RAID health if not check_raid_health(commander): logger.error("RAID is not healthy before starting") if not force: logger.error("Use --force to continue anyway") return False logger.warning("Continuing despite unhealthy RAID due to --force flag") # --- Resume recovery: handle a drive that was interrupted mid-cycle --- if current_drive_guid and current_drive_guid not in completed_drives and current_drive_guid in drive_mapping: drive_info_resume = drive_mapping[current_drive_guid] logger.info(f"Detected interrupted drive {current_drive_guid} ({drive_info_resume['drive_serial']}) - checking its state before continuing") add_message(f"Checking interrupted 
drive {current_drive_guid} ({drive_info_resume['drive_serial']})...") try: guid_obj = GUID(current_drive_guid) drive_obj = commander.get_object(DRIVE_TYPE, guid_obj) drive_state = get_enum_name(drive_obj.device_proto.state).upper() logger.info(f"Interrupted drive {current_drive_guid} current state: {drive_state}") if "ACTIVE" in drive_state and "ACTIVATING" not in drive_state: logger.info(f"Drive {current_drive_guid} is already ACTIVE - marking as completed") add_message(f"Interrupted drive {current_drive_guid} recovered on its own (ACTIVE) - marking completed") if not dry_run: completed_drives.append(current_drive_guid) state["completed_drives"] = completed_drives save_state(state) elif "ACTIVATING" in drive_state: logger.info(f"Drive {current_drive_guid} is ACTIVATING - waiting for it to finish") add_message(f"Interrupted drive is ACTIVATING - waiting...") if wait_for_drive_active(commander, current_drive_guid, dry_run=dry_run): logger.info(f"Drive {current_drive_guid} became ACTIVE") if not dry_run: completed_drives.append(current_drive_guid) state["completed_drives"] = completed_drives save_state(state) else: logger.error(f"Drive {current_drive_guid} did not become ACTIVE") if not force: return False elif "FAILED" in drive_state or "INACTIVE" in drive_state: logger.info(f"Drive {current_drive_guid} is {drive_state} - attempting to reactivate") add_message(f"Interrupted drive is {drive_state} - attempting reactivation...") if activate_drive(commander, current_drive_guid, drive_info_resume.get("dnode_ip"), drive_info_resume.get("slot"), dry_run, drive_info_resume, wait_time): logger.info(f"Successfully reactivated interrupted drive {current_drive_guid}") add_message(f"Interrupted drive {current_drive_guid} reactivated successfully") if not dry_run: completed_drives.append(current_drive_guid) state["completed_drives"] = completed_drives save_state(state) else: logger.error(f"Failed to reactivate interrupted drive {current_drive_guid}") add_message(f"ERROR: Could 
not reactivate interrupted drive {current_drive_guid}") if not force: return False logger.warning("Continuing despite reactivation failure due to --force flag") else: logger.warning(f"Interrupted drive {current_drive_guid} is in unexpected state: {drive_state}") except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.warning(f"Could not check state of interrupted drive {current_drive_guid}: {e}") add_message(f"WARNING: Could not check interrupted drive state: {e}") # Print initial progress print_progress( len(completed_drives), total_drives, start_time, drive_mapping.get(current_drive_guid, None), completed_drives ) # Process each drive for drive_guid, drive_info in sorted(drive_mapping.items()): # Check for graceful shutdown flag if graceful_shutdown: logger.info("Graceful shutdown in progress, stopping drive processing") return False # Step 0: Check if the drive is already processed or should be skipped if drive_guid in completed_drives: logger.info(f"Drive {drive_guid} already processed, skipping") continue # Skip drives that are in the skip_drives list - ensure we're comparing strings if skip_drives and str(drive_guid) in [str(guid) for guid in skip_drives]: logger.info(f"Drive {drive_guid} is in the skip list, skipping power cycling") add_message(f"Skipping drive {drive_guid} ({drive_info['drive_serial']}) - in skip list") # Mark as completed so we don't try again if not dry_run: completed_drives.append(drive_guid) state["completed_drives"] = completed_drives save_state(state) continue # Pre-flight: verify leader connection is alive before touching this drive if not dry_run: check_leader_connection(commander) # Pre-flight: RAID must be healthy before starting work on any drive. # This is a hard gate -- not overridable by --force. 
if not dry_run: logger.info(f"Pre-flight RAID health check before processing drive {drive_guid}") add_message(f"Pre-flight RAID health check...") if not wait_for_raid_health(commander): logger.error("RAID is not healthy before starting next drive - stopping") add_message("CRITICAL: RAID not healthy - refusing to process next drive") return False # Update dashboard with current drive information add_message(f"Processing drive {drive_guid} ({drive_info['drive_serial']})") # Initialize drive state in the dashboard with dashboard_lock: status_data["drive_state"][drive_guid] = get_enum_name(drive_info.get("state", "UNKNOWN")) # Force a full dashboard redraw print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives, full_redraw=True) # Update current drive in state if not dry_run: state["current_drive"] = drive_guid save_state(state) else: logger.info(f"[DRY RUN] Would update current drive in state to {drive_guid}") logger.info(f"Processing drive {drive_guid} ({drive_info['drive_serial']})") print_progress( len(completed_drives), total_drives, start_time, drive_info, completed_drives ) # Step 1: Deactivate the drive print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not FAST_UPGRADE: add_message(f"Deactivating drive {drive_guid} ({drive_info['drive_serial']})") if not deactivate_drive(commander, drive_guid, dry_run): logger.error(f"Failed to deactivate drive {drive_guid}") add_message(f"ERROR: Failed to deactivate drive {drive_guid}") # Force dashboard update with error state with dashboard_lock: status_data["drive_state"][drive_guid] = "ERROR: DEACTIVATION FAILED" print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not force: return False logger.warning("Continuing despite deactivation failure due to --force flag") add_message("WARNING: Continuing despite deactivation failure due to --force flag") # Add explicit delay after deactivation to 
allow RAID state to update if not dry_run: if not FAST_UPGRADE: logger.info("Waiting 10 seconds for RAID state to update after drive deactivation...") add_message("Waiting 10 seconds for RAID state to update...") time.sleep(10) # Explicitly check RAID state to capture any changes logger.info("Explicitly checking RAID state after drive deactivation") add_message("Checking RAID state after drive deactivation...") raid_status = commander.raid_status() # Extract and clean state strings ssd_state = get_enum_name(raid_status.ssd_state.state) nvram_state = get_enum_name(raid_status.nvram_state.state) memory_state = get_enum_name(raid_status.memory_state.state) rio_nvram_state = get_enum_name(raid_status.rio_nvram_state.state) # Update dashboard with dashboard_lock: status_data["raid_state"]["ssd"] = ssd_state status_data["raid_state"]["nvram"] = nvram_state status_data["raid_state"]["memory"] = memory_state status_data["raid_state"]["rio_nvram"] = rio_nvram_state # Log state information logger.info(f"Post-deactivation RAID state - SSD: {ssd_state}, NVRAM: {nvram_state}, Memory: {memory_state}, RIO: {rio_nvram_state}") add_message(f"Current RAID state - SSD: {ssd_state}, NVRAM: {nvram_state}, Memory: {memory_state}, RIO: {rio_nvram_state}") # Force dashboard update to show new RAID state try: message_queue.put_nowait(("update", None)) except queue.Full: pass # Force a dashboard update print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives, full_redraw=True) # Check system state immediately after deactivation to capture state changes if not FAST_UPGRADE and not dry_run: logger.info("Checking system state after drive deactivation") # Check cluster state first check_cluster_state(commander, force, skip_denylist) # Then check RAID state check_raid_health(commander) # Force dashboard update after status checks print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) # Wait for drive to become inactive 
if not FAST_UPGRADE and not wait_for_drive_inactive(commander, drive_guid, timeout_seconds=600, dry_run=dry_run): logger.error(f"Drive {drive_guid} did not become INACTIVE after deactivation") add_message(f"ERROR: Drive {drive_guid} did not become INACTIVE after deactivation") # Force dashboard update with error state with dashboard_lock: status_data["drive_state"][drive_guid] = "ERROR: INACTIVE TIMEOUT" print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not force: return False logger.warning("Continuing despite inactive state failure due to --force flag") add_message("WARNING: Continuing despite inactive state failure due to --force flag") # Check system state again after drive becomes inactive if not dry_run: logger.info("Checking system state after drive became inactive") # Check cluster state first check_cluster_state(commander, force, skip_denylist) # Then check RAID state check_raid_health(commander) # Force dashboard update after status checks print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) # Step 2: Firmware upgrade (if enabled and drive model matches) - AFTER deactivation firmware_upgraded = False if fw_upgrade_model and fw_file and fw_upgrade_model in drive_info.get('drive_model', '').upper(): add_message(f"Upgrading firmware for drive {drive_guid} ({drive_info['drive_serial']}) - model matches {fw_upgrade_model}") print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) drive_info['fw_revision'] = 'UNKNOWN' if not upgrade_drive_firmware( drive_info["dnode_ip"], drive_info["drive_serial"], drive_info["drive_model"], fw_file, dry_run, target_fw_version, drive_info ): logger.error(f"Failed to upgrade firmware for drive {drive_guid}") add_message(f"ERROR: Failed to upgrade firmware for drive {drive_guid}") with dashboard_lock: status_data["drive_state"][drive_guid] = "ERROR: FIRMWARE UPGRADE FAILED" 
print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) fw_recovery_success = False if NVRAM_MODE and not dry_run: recovery_decision = assess_nvram_recovery_need( commander, drive_guid, drive_info, "firmware upgrade returned failure" ) if recovery_decision["hard_abort"]: logger.error(f"NVRAM recovery blocked: {recovery_decision['reason']}") add_message(f"CRITICAL: NVRAM recovery blocked - {recovery_decision['reason']}") return False if recovery_decision["attempt_recovery"]: logger.info(f"NVRAM firmware upgrade failed and recovery is needed: {recovery_decision['reason']}") add_message("Attempting NVRAM drive recovery (drive missing/unstable)...") fw_recovery_success = recover_failed_drive( commander, drive_guid, drive_info["dnode_ip"], drive_info["slot"], drive_info, dry_run ) if fw_recovery_success: logger.info(f"Successfully recovered drive {drive_guid} after firmware upgrade failure") add_message(f"Drive {drive_guid} recovered successfully after FW upgrade failure") firmware_upgraded = True with dashboard_lock: status_data["drive_state"][drive_guid] = "RECOVERED AFTER FW UPGRADE" if "firmware_upgrade_times" not in state: state["firmware_upgrade_times"] = {} state["firmware_upgrade_times"][drive_guid] = time.time() save_state(state) else: logger.error(f"Failed to recover drive {drive_guid} after firmware upgrade failure") add_message(f"ERROR: Failed to recover drive {drive_guid}") else: logger.error(f"NVRAM recovery not attempted: {recovery_decision['reason']}") add_message(f"ERROR: NVRAM recovery not attempted - {recovery_decision['reason']}") return False if not fw_recovery_success: if not force: return False logger.warning("Continuing despite firmware upgrade failure due to --force flag") add_message("WARNING: Continuing despite firmware upgrade failure due to --force flag") else: firmware_upgraded = True # Record firmware upgrade timestamp if not dry_run: if "firmware_upgrade_times" not in state: 
state["firmware_upgrade_times"] = {} state["firmware_upgrade_times"][drive_guid] = time.time() save_state(state) # Wait for NVRAM rebalance after firmware upgrade (CRITICAL for NVRAM drives) if NVRAM_MODE and not dry_run: logger.info("Firmware upgrade completed, waiting for NVRAM rebalance before proceeding...") add_message("Waiting for NVRAM rebalance after firmware upgrade...") print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not wait_for_nvram_rebalance(commander, timeout_minutes=15): logger.error(f"NVRAM rebalance wait failed for drive {drive_guid}") add_message(f"ERROR: NVRAM rebalance wait failed for drive {drive_guid}") with dashboard_lock: status_data["drive_state"][drive_guid] = "ERROR: NVRAM REBALANCE FAILED" print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not force: return False logger.warning("Continuing despite NVRAM rebalance failure due to --force flag") add_message("WARNING: Continuing despite NVRAM rebalance failure due to --force flag") # After rebalance, check if drive is in failed state (like in the ticket) logger.info("Checking drive state after NVRAM rebalance...") add_message("Checking drive state after rebalance...") try: drives = commander.list_objects(DRIVE_TYPE) drive_failed = False for drive in drives: if str(drive.base_proto.guid) == drive_guid: state_str = get_enum_name(drive.device_proto.state) if state_str.upper() == "FAILED": drive_failed = True fail_reason = "UNKNOWN" if hasattr(drive, 'device_proto') and getattr(drive.device_proto, 'fail_reason', None): fail_reason = get_enum_name(drive.device_proto.fail_reason) logger.warning(f"Drive {drive_guid} is in FAILED state after rebalance: {fail_reason}") add_message(f"⚠️ Drive {drive_guid} is FAILED after rebalance: {fail_reason}") break # If drive failed, attempt recovery if drive_failed: recovery_decision = assess_nvram_recovery_need( commander, drive_guid, drive_info, "drive FAILED 
after firmware upgrade rebalance" ) if recovery_decision["hard_abort"] or not recovery_decision["attempt_recovery"]: logger.error(f"NVRAM recovery blocked: {recovery_decision['reason']}") add_message(f"CRITICAL: NVRAM recovery blocked - {recovery_decision['reason']}") return False logger.info(f"Drive failed after firmware upgrade and rebalance - attempting recovery: {recovery_decision['reason']}") add_message("⚠️ Drive failed after FW upgrade - attempting recovery...") if recover_failed_drive(commander, drive_guid, drive_info["dnode_ip"], drive_info["slot"], drive_info, dry_run): logger.info(f"Successfully recovered drive {drive_guid}") add_message(f"✅ Drive {drive_guid} recovered successfully!") else: logger.error(f"Failed to recover drive {drive_guid}") add_message(f"❌ Failed to recover drive {drive_guid}") with dashboard_lock: status_data["drive_state"][drive_guid] = "ERROR: RECOVERY FAILED" print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not force: return False logger.warning("Continuing despite recovery failure due to --force flag") add_message("WARNING: Continuing despite recovery failure due to --force flag") else: logger.info(f"Drive {drive_guid} is healthy after rebalance") add_message(f"✅ Drive {drive_guid} is healthy after rebalance") except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.error(f"Error checking drive state after rebalance: {e}") add_message(f"ERROR: Failed to check drive state: {str(e)}") elif fw_upgrade_model and fw_upgrade_model not in drive_info.get('drive_model', ''): logger.info(f"Skipping firmware upgrade for drive {drive_guid} - model {drive_info.get('drive_model', 'Unknown')} does not match {fw_upgrade_model}") add_message(f"Skipping firmware upgrade for drive {drive_guid} - model mismatch") if not UPGRADE_MODE: # Step 3: Power cycle the drive add_message(f"Power cycling drive {drive_guid} ({drive_info['drive_serial']}) at slot 
{drive_info['slot']} on {drive_info['dnode_ip']}") print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) chassis_info = { "chassis_type": drive_info.get("chassis_type", "UNKNOWN"), "bus_master_dnode_ip": drive_info.get("bus_master_dnode_ip") } power_cycle_success = power_cycle_drive(drive_info["dnode_ip"], drive_info["slot"], dry_run, chassis_info, wait_time) if power_cycle_success: # Record the power cycle timestamp power_cycle_times[drive_guid] = time.time() if not dry_run: state["power_cycle_times"] = power_cycle_times save_state(state) if not power_cycle_success: logger.error(f"Failed to power cycle drive {drive_guid}") add_message(f"ERROR: Failed to power cycle drive {drive_guid}") with dashboard_lock: status_data["drive_state"][drive_guid] = "ERROR: POWER CYCLE FAILED" print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not force: return False logger.warning("Continuing despite power cycle failure due to --force flag") add_message("WARNING: Continuing despite power cycle failure due to --force flag") # Step 4: Activate the drive add_message(f"Activating drive {drive_guid} ({drive_info['drive_serial']})") print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not activate_drive(commander, drive_guid, drive_info["dnode_ip"], drive_info["slot"], dry_run, drive_info, wait_time): logger.error(f"Failed to activate drive {drive_guid}") add_message(f"ERROR: Failed to activate drive {drive_guid}") # Force dashboard update with error state with dashboard_lock: status_data["drive_state"][drive_guid] = "ERROR: ACTIVATION FAILED" print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) # For NVRAM drives with firmware upgrade, attempt recovery if NVRAM_MODE and firmware_upgraded and not dry_run: recovery_decision = assess_nvram_recovery_need( commander, drive_guid, drive_info, "activation failed after 
firmware upgrade" ) if recovery_decision["hard_abort"] or not recovery_decision["attempt_recovery"]: logger.error(f"NVRAM recovery blocked: {recovery_decision['reason']}") add_message(f"CRITICAL: NVRAM recovery blocked - {recovery_decision['reason']}") return False logger.info(f"Activation failed after firmware upgrade - attempting recovery: {recovery_decision['reason']}") add_message("⚠️ Activation failed - attempting recovery procedure...") if recover_failed_drive(commander, drive_guid, drive_info["dnode_ip"], drive_info["slot"], drive_info, dry_run): logger.info(f"Successfully recovered drive {drive_guid}") add_message(f"✅ Drive {drive_guid} recovered successfully!") # Continue with normal flow else: logger.error(f"Failed to recover drive {drive_guid}") add_message(f"❌ Failed to recover drive {drive_guid}") if not force: return False logger.warning("Continuing despite recovery failure due to --force flag") add_message("WARNING: Continuing despite recovery failure due to --force flag") else: if not force: return False logger.warning("Continuing despite activation failure due to --force flag") add_message("WARNING: Continuing despite activation failure due to --force flag") # Add explicit delay after activation to allow RAID state to update if not dry_run: logger.info("Waiting 120 seconds for RAID state to update after drive activation...") add_message("Waiting 120 seconds for RAID state to update...") time.sleep(120) # Explicitly check RAID state to capture any changes logger.info("Explicitly checking RAID state after drive activation") add_message("Checking RAID state after drive activation...") raid_status = commander.raid_status() # Extract and clean state strings ssd_state = get_enum_name(raid_status.ssd_state.state) nvram_state = get_enum_name(raid_status.nvram_state.state) memory_state = get_enum_name(raid_status.memory_state.state) rio_nvram_state = get_enum_name(raid_status.rio_nvram_state.state) # Update dashboard with dashboard_lock: 
status_data["raid_state"]["ssd"] = ssd_state status_data["raid_state"]["nvram"] = nvram_state status_data["raid_state"]["memory"] = memory_state status_data["raid_state"]["rio_nvram"] = rio_nvram_state # Log state information logger.info(f"Post-activation RAID state - SSD: {ssd_state}, NVRAM: {nvram_state}, Memory: {memory_state}, RIO: {rio_nvram_state}") add_message(f"Current RAID state - SSD: {ssd_state}, NVRAM: {nvram_state}, Memory: {memory_state}, RIO: {rio_nvram_state}") # Force dashboard update to show new RAID state try: message_queue.put_nowait(("update", None)) except queue.Full: pass # Force a dashboard update print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives, full_redraw=True) # Step 5: Check cluster state if not dry_run: add_message(f"Verifying cluster state after cycling drive {drive_guid}") print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not check_cluster_state(commander, force, skip_denylist): logger.error(f"Cluster is not ONLINE after cycling drive {drive_guid}") add_message(f"CRITICAL ERROR: Cluster is not ONLINE after cycling drive {drive_guid}") print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) return False add_message("Cluster state verified: ONLINE") else: logger.info(f"[DRY RUN] Would check if cluster is ONLINE after cycling drive {drive_guid}") # Step 6: Check for node and drive failures if not dry_run: if not check_node_and_drive_failures(commander, skip_drives): logger.error(f"Node or drive failures detected after cycling drive {drive_guid}") if not force: return False logger.warning("Continuing despite failures due to --force flag") else: logger.info(f"[DRY RUN] Would check for node and drive failures after cycling drive {drive_guid}") # Step 7: Wait for RAID to become healthy if not dry_run: add_message(f"Waiting for RAID to become healthy after cycling drive {drive_guid}") 
print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if not wait_for_raid_health(commander): logger.error(f"Timed out waiting for RAID to become healthy after cycling drive {drive_guid}") add_message(f"ERROR: Timed out waiting for RAID to become healthy after cycling drive {drive_guid}") print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives) if NVRAM_MODE: logger.error("NVRAM RAID health is a hard gate - cannot proceed with unhealthy RAID") add_message("CRITICAL: NVRAM RAID unhealthy - stopping") return False if not force: return False logger.warning("Continuing despite RAID health issues due to --force flag") add_message("WARNING: Continuing despite RAID health issues due to --force flag") else: add_message("RAID health verified: HEALTHY") else: logger.info(f"[DRY RUN] Would wait for RAID to become healthy after cycling drive {drive_guid}") # Post-cycle firmware verification (only when upgrading firmware) if fw_upgrade_model and fw_file and not dry_run and target_fw_version: logger.info(f"Verifying firmware version on drive {drive_guid} after power cycle...") add_message(f"Verifying firmware on {drive_info['drive_serial']} post-cycle...") dnode_ip_for_verify = drive_info.get('dnode_ip') drive_serial_for_verify = drive_info.get('drive_serial') if dnode_ip_for_verify and drive_serial_for_verify: nvme_dev = find_drive_nvme_device(dnode_ip_for_verify, drive_serial_for_verify) if not nvme_dev: logger.warning(f"Could not re-discover NVMe device for {drive_serial_for_verify} on {dnode_ip_for_verify} after power cycle") add_message(f"WARNING: Could not find drive {drive_serial_for_verify} after power cycle for FW verification") else: if nvme_dev != drive_info.get('nvme_device'): logger.info(f"Drive re-enumerated after power cycle: {drive_info.get('nvme_device')} -> {nvme_dev}") drive_info['nvme_device'] = nvme_dev fw_verify_cmd = ( f"ssh -i {SSH_KEY_PATH} -o StrictHostKeyChecking=no " 
f"{SSH_USER}@{dnode_ip_for_verify} " f"'sudo nvme id-ctrl {nvme_dev} | grep fr'" ) verify_result = subprocess.run( fw_verify_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) if verify_result.returncode == 0: post_fw = verify_result.stdout.strip().splitlines()[0].split()[-1] logger.info(f"Post-cycle firmware version: {post_fw}") add_message(f"Post-cycle firmware: {post_fw}") if post_fw == target_fw_version: logger.info(f"Firmware upgrade CONFIRMED: {post_fw} matches target {target_fw_version}") add_message(f"Firmware upgrade confirmed: {post_fw}") else: logger.warning(f"Firmware version mismatch: got {post_fw}, expected {target_fw_version}") add_message(f"WARNING: FW mismatch: {post_fw} != {target_fw_version}") else: logger.warning(f"Could not verify post-cycle firmware version: {verify_result.stderr}") else: logger.debug("Skipping post-cycle FW check - drive_serial or dnode_ip not available in drive_info") # Mark drive as completed completed_drives.append(drive_guid) add_message(f"✓ Successfully completed drive {drive_guid} ({drive_info['drive_serial']})") # Update the dashboard state to show completion with dashboard_lock: status_data["drive_state"][drive_guid] = "COMPLETED" # Force a full dashboard update after successfully completing a drive print_dashboard(len(completed_drives), total_drives, start_time, drive_info, completed_drives, full_redraw=True) # Update the state file if not dry_run: state["completed_drives"] = completed_drives save_state(state) else: logger.info(f"[DRY RUN] Would update state to mark drive {drive_guid} as completed") if dry_run: logger.info(f"[DRY RUN] Marked drive {drive_guid} as completed (simulated)") else: logger.info(f"Drive {drive_guid} completed successfully") # Update progress print_progress( len(completed_drives) + (1 if dry_run else 0), # Add 1 in dry run to simulate completion total_drives, start_time, drive_info, completed_drives ) logger.info(f"{'[DRY RUN] Would complete' if dry_run 
else 'Successfully completed'} drive {drive_guid} ({drive_info['drive_serial']})") if next((d for d in drive_mapping.values() if d.get('fw_revision', None)), None): save_fw_report(drive_mapping) # Clear current drive since all are done if not dry_run: state["current_drive"] = None save_state(state) # Write the master report at the end firmware_upgrade_times = state.get("firmware_upgrade_times", {}) write_drive_report(drive_mapping, completed_drives, power_cycle_times, firmware_upgrade_times) if upload_reports: logger.info(f"Getting info for upload...") customer_info = get_cluster(vms_user=args.vms, vms_password=args.password) for file in ['/vast/log/drive_fw_revisions.report', '/vast/log/nvram_fw_upgrade_report.txt']: if os.path.exists(file): logger.info(f"Uploading file {file} to Vast support...") add_message(f"Uploading file {file} to Vast support...") upload_output(file, customer_info) else: logger.info("[DRY RUN] Would clear current drive in state") # Final progress update print_progress( total_drives, total_drives, start_time, None, completed_drives ) logger.info(f"{'[DRY RUN] Would have' if dry_run else 'All drives successfully'} {OPERATION}d: {len(completed_drives)}/{total_drives}") return True def upload_output(output_file_path, customer_info): os.putenv("AWS_ACCESS_KEY_ID", "AKIARTN2ZMSY4TBYMPNM") os.putenv("AWS_SECRET_ACCESS_KEY", "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") output_name = f"{output_file_path.split('/')[-1]}-{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}" aws_upload = f"aws s3 cp {output_file_path} '{UPLOAD_PATH}/{customer_info['customer_name']}/{customer_info['cluster_name']}/drive_cycle/{output_name}'" exe_get(aws_upload) logger.info(f"Uploaded vnetmap output to '{UPLOAD_PATH}/{customer_info['customer_name']}/{customer_info['cluster_name']}/drive_cycle/{output_name}'") add_message(f"Uploaded vnetmap output to '{UPLOAD_PATH}/{customer_info['customer_name']}/{customer_info['cluster_name']}/drive_cycle/{output_name}'") def 
get_cluster(vms_user=VMS_USER, vms_password=VMS_PASSWORD, interactive=True): user_pass = '' user_cluster_name = '' user_customer_name = '' with open(MGMT_VIP, 'r') as mgmtvip: vms_ip = mgmtvip.read().strip() cluster_info, customer_name = fetch_cluster_and_customer_info(vms_ip, vms_user, vms_password) if isinstance(cluster_info, list): psnt = cluster_info[0].get('psnt', 'N/A') or 'N/A' return {'cluster_name': cluster_info[0]['name'], 'customer_name': customer_name['customer'], 'psnt': psnt} elif not cluster_info and not customer_name: logger.error(f"Was unable to reach {vms_ip} to get Info for Upload - please provide info below") add_message(f"Was unable to reach {vms_ip} to get Info for Upload - please provide info below") if not interactive: return {'cluster_name': None, 'customer_name': None, 'psnt': 'N/A'} add_message("Please enter customer name, then press Enter: ") user_cluster_name = input("Please enter cluster name: ") add_message("Please enter cluster name, then press Enter: ") user_customer_name = input("Please enter customer name: ") elif cluster_info['detail'].startswith("Invalid username/password"): logger.info(f"Failed to connect using VMS {vms_user} user to get info for Uploading reports, please enter password") add_message("Failed to connect using VMS {vms_user} user to get info for Uploading reports, please enter password") if not interactive: return {'cluster_name': None, 'customer_name': None, 'psnt': 'N/A'} add_message(f"Please enter VMS {vms_user} Password, then press Enter: ") user_pass = getpass.getpass() if user_pass: cluster_info, customer_name = fetch_cluster_and_customer_info(vms_ip, vms_user, user_pass) if isinstance(cluster_info, list): psnt = cluster_info[0].get('psnt', 'N/A') or 'N/A' return {'cluster_name': cluster_info[0]['name'], 'customer_name': customer_name['customer'], 'psnt': psnt} elif cluster_info['detail'].startswith("Invalid username/password"): logger.error(f"Failed to get cluster info") add_message(f"Failed to get cluster 
info") add_message("Please enter customer name, then press Enter: ") user_cluster_name = input("Please enter cluster name: ") add_message("Please enter cluster name, then press Enter: ") user_customer_name = input("Please enter customer name: ") else: logger.error(f"Failed to get cluster info") add_message(f"Failed to get cluster info") add_message("Please enter customer name, then press Enter: ") user_cluster_name = input("Please enter cluster name: ") add_message("Please enter cluster name, then press Enter: ") user_customer_name = input("Please enter customer name: ") if user_cluster_name and user_customer_name: cluster_name = user_cluster_name customer_name = user_customer_name else: return {'cluster_name': None, 'customer_name': None, 'psnt': 'N/A'} return {'cluster_name': cluster_name, 'customer_name': customer_name, 'psnt': 'N/A'} def get_user_confirmation(prompt_msg: str) -> bool: """ Get user confirmation with proper dashboard handling. Args: prompt_msg: The message to display as a prompt Returns: True if user confirmed, False otherwise """ global pause_dashboard_updates # Add message to the dashboard log add_message(prompt_msg) # Pause dashboard updates during the prompt pause_dashboard_updates = True # Make sure the dashboard is visible with the prompt # but no further updates will happen until we're done clear_terminal() print(prompt_msg) response = input("\nContinue? (y/n): ").lower() == 'y' # Resume dashboard updates pause_dashboard_updates = False if response: add_message("User confirmed, proceeding") else: add_message("Operation cancelled by user") return response def signal_handler(sig, frame): """ Signal handler for SIGINT (Ctrl+C). This handler allows the tool to exit gracefully by: 1. Saving the current state one more time 2. Displaying a summary of what was done 3. 
Instructing the user how to resume later """ global graceful_shutdown # Only process once to avoid handling re-entrant signals during shutdown if graceful_shutdown: return graceful_shutdown = True print("\n\n") logger.warning("Received interrupt signal (Ctrl+C). Preparing for graceful shutdown...") add_message("⚠️ Interrupted by user - preparing for graceful shutdown") # Load state to get up-to-date information try: state = load_state() completed_drives = state.get("completed_drives", []) total_drives = state.get("total_drives", 0) start_time = state.get("start_time", time.time()) current_drive = state.get("current_drive") elapsed = time.time() - start_time hours, remainder = divmod(elapsed, 3600) minutes, seconds = divmod(remainder, 60) # Print summary summary = f"\nSUMMARY OF INTERRUPTED RUN:\n" summary += f"- Run time: {int(hours):02d}h {int(minutes):02d}m {int(seconds):02d}s\n" summary += f"- Completed: {len(completed_drives)}/{total_drives} drives ({(len(completed_drives)/total_drives*100) if total_drives > 0 else 0:.1f}%)\n" if current_drive: summary += f"- Last drive in process: {current_drive}\n" # Mark the current drive as not completed in the state file # so it will be retried on resume if current_drive in completed_drives: completed_drives.remove(current_drive) state["completed_drives"] = completed_drives state["current_drive"] = None save_state(state) summary += "\nThe current state has been saved. You can resume this run later\n" summary += "by running the tool again with the same parameters.\n" print(summary) logger.info(summary) except Exception as e: logger.exception(f"Error during graceful shutdown: {e}") logger.info("Exiting after interrupt") sys.exit(130) # Standard exit code for SIGINT def parse_ip_list(ip_string): """Parse a string containing IPs, IP ranges, and bracketed IP ranges into a flat list of individual IPs. 
Supported formats: - Single IP: "10.10.128.111" - Comma-separated: "10.10.128.111,10.10.128.112" - Range format: "10.10.128.111-10.10.128.115" - Bracketed range: "10.10.128.[111-115]" - Mixed: "10.10.128.111,10.10.128.120-10.10.128.125" Raises: ValueError: If IP range format is malformed or invalid """ def expand_ip_range(ip_part): """Expand a range expression of IPs into individual IPs.""" if '[' in ip_part and ']' in ip_part: prefix = ip_part[:ip_part.index('[')] suffix = ip_part[ip_part.index(']') + 1:] range_part = ip_part[ip_part.index('[') + 1:ip_part.index(']')] if '-' in range_part: start, end = map(int, range_part.split('-')) return [f"{prefix}{i}{suffix}" for i in range(start, end + 1)] else: return [f"{prefix}{range_part}{suffix}"] return [ip_part] ip_list = [] for part in ip_string.split(','): part = part.strip() if not part: continue # Skip empty parts from trailing commas if '-' in part and '[' not in part: # Validate range format: must have exactly one hyphen with IPs on both sides range_parts = part.split('-') if len(range_parts) != 2: raise ValueError(f"Invalid IP range format '{part}': ranges must be in format 'START_IP-END_IP'") start_str, end_str = range_parts[0].strip(), range_parts[1].strip() if not start_str or not end_str: raise ValueError(f"Invalid IP range format '{part}': both start and end IPs must be specified") try: start_ip = ipaddress.ip_address(start_str) end_ip = ipaddress.ip_address(end_str) except ValueError as e: raise ValueError(f"Invalid IP address in range '{part}': {e}") if int(start_ip) > int(end_ip): raise ValueError( f"Invalid IP range '{part}': start IP ({start_str}) must be less than or equal to end IP ({end_str})") # Iterate through all IPs in the range (inclusive) for ip_int in range(int(start_ip), int(end_ip) + 1): ip_list.append(str(ipaddress.ip_address(ip_int))) elif '[' in part or ']' in part: ip_list.extend(expand_ip_range(part)) else: ip_list.append(part) return ip_list def get_password(): user_pass = 
getpass.getpass("Please provide VMS password: ") if user_pass: return user_pass else: raise Exception(f"No VMS password provided, exiting") def main() -> int: """ Main function for the SSD power cycling tool. Returns: Exit code (0 for success, non-zero for failure) """ global args, DRIVE_TYPE, NVRAM_MODE, DRIVE_TITLE, FAST_UPGRADE, UPGRADE_MODE, OPERATION, VMS_USER, VMS_PASSWORD, PCI_LINK_RESET_ENABLED # Register the signal handler for SIGINT signal.signal(signal.SIGINT, signal_handler) parser = argparse.ArgumentParser(description='VastData Drive Power Cycling And FW Upgrade Tool') parser.add_argument('--log-file', '--log', type=str, default=DEFAULT_LOG_FILE, help='Path to log file') parser.add_argument('--wait-time', type=int, default=POWER_CYCLE_WAIT_TIME, help=f'Wait time in seconds after powering on drives (default: {POWER_CYCLE_WAIT_TIME})') parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') parser.add_argument('--force', action='store_true', help='Continue on errors') parser.add_argument('--reset', action='store_true', help='Reset state and start from scratch') parser.add_argument('--resume', action='store_true', help='Resume from last state (default behavior if state exists)') parser.add_argument('--dry-run', action='store_true', help='Run without making any actual changes (simulates actions)') parser.add_argument('--save-mapping', type=str, metavar='FILE', help='Save drive mapping to specified file') parser.add_argument('--load-mapping', type=str, metavar='FILE', help='Load drive mapping from specified file') parser.add_argument('--list', action='store_true', help='List all drives and exit') parser.add_argument('--drives', type=str, metavar='GUID_LIST', help='Specify drives to cycle (comma-separated GUIDs or indices)') parser.add_argument('--skip-failed-drives', type=str, metavar='GUID_LIST', help='Specify drives to skip even if in failed state (comma-separated GUIDs)') parser.add_argument('--yes', '-y', 
action='store_true', help='Assume yes for all prompts') parser.add_argument('--skip-denylist-check', action='store_true', help='Skip all denylist checks') parser.add_argument('--fw-upgrade', type=str, metavar='DRIVE_MODEL', help='Enable firmware upgrade for drives matching this model (e.g., "SSDPFWNV153TZ")') parser.add_argument('--fw-file', type=str, metavar='FIRMWARE_FILE', help='Path to firmware file for upgrade') parser.add_argument('--target-fw-version', type=str, help='Target FW Version being upgraded to') parser.add_argument('--nvrams', action='store_true', help='Perform actions on NVRAMs instead of SSDs') parser.add_argument('--upgrade-only', action='store_true', help="Only run FW Upgrade, don't run cycle") parser.add_argument('--fast-fw-upgrade', action='store_true', help="For SSDs, run upgrade for SSD without restarting the nvme device") parser.add_argument('--upload-reports', action='store_true', help="Upload Drive reports to s3 when done") parser.add_argument('--disable-pci-link-reset', action='store_true', default=False, help="Disable the PCI link reset (expose_drives.py) step in NVRAM recovery. " "By default, PCI link reset is enabled when --nvrams is used.") parser.add_argument('--ebox-ips', action='store', dest='ips_ebox', type=parse_ip_list, help = 'List of IPs for Ebox. Supports comma-separated IPs, ranges, and bracketed ranges ' '(e.g., "10.10.128.111,10.10.128.120-10.10.128.125" or "10.10.128.[111-115]"). 
Defaults to None',) parser.add_argument( '-u', '--user', dest='user', default=VMS_USER, help=f'Manually specify VMS username' ) password_parser = parser.add_mutually_exclusive_group(required=False) password_parser.add_argument( '-p', '--password', dest='password', default=VMS_PASSWORD, help=f'Manually specify VMS password' ) password_parser.add_argument( '-P', '--ask-password', '--ask-pass', '--prompt-password', '--prompt-pass', dest='ask_password', action='store_true', default=False, help=f'Ask for password prompt instead of using default or provided password') args = parser.parse_args() if args.ask_password: args.password = get_password() # Initialize the dashboard status with default values #clear_terminal() # Clear the terminal before starting # Set up logging setup_logging(args.log_file, args.verbose) # Start the dashboard update thread start_dashboard() if args.nvrams: DRIVE_TYPE = TypeId.NVRAMType NVRAM_MODE = True DRIVE_TITLE = 'NVRAM' FAST_UPGRADE = True if args.disable_pci_link_reset: PCI_LINK_RESET_ENABLED = False logger.info("PCI link reset (expose_drives.py recovery step) DISABLED by --disable-pci-link-reset flag") if args.upgrade_only: UPGRADE_MODE = True OPERATION = 'FW Upgrade' if args.fast_fw_upgrade: FAST_UPGRADE = True # Log start logger.info("=" * 80) logger.info(f"Starting VastData {DRIVE_TITLE} Power Cycling and Firmware Upgrade Tool") logger.info(f"Version: {__version__}") logger.info(f"Parameters: log_file={args.log_file}, verbose={args.verbose}, force={args.force}, wait_time={args.wait_time}, " f"reset={args.reset}, resume={args.resume}, dry_run={args.dry_run}, fw_upgrade={args.fw_upgrade}, fw_file={args.fw_file}, " f"pci_link_reset={PCI_LINK_RESET_ENABLED}") add_message(f"VastData {DRIVE_TITLE} Power Cycling and Firmware Upgrade Tool started") # Validate conflicting arguments if args.reset and args.resume: error_msg = "Cannot use both --reset and --resume flags together" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 
if (args.nvrams and not args.upgrade_only) and not (args.upgrade_only and not args.fw_upgrade) : error_msg = "Cannot --nvrams without specifying --upgrade-only and giving upgrade model with --fw-upgrade" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Validate firmware upgrade arguments global fw_upgrade_model, fw_file_path if args.fw_upgrade or args.fw_file: if not args.fw_upgrade: error_msg = "--fw-upgrade is required when --fw-file is specified" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 if not args.fw_file: error_msg = "--fw-file is required when --fw-upgrade is specified" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 if not os.path.exists(args.fw_file): error_msg = f"Firmware file does not exist: {args.fw_file}" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 fw_upgrade_model = args.fw_upgrade.upper() fw_file_path = args.fw_file logger.info(f"Firmware upgrade enabled for model: {fw_upgrade_model}, firmware file: {fw_file_path}") add_message(f"Firmware upgrade enabled for model: {fw_upgrade_model}") add_message(f"Firmware file: {fw_file_path}") # Ebox-specific validations ebox_mode = False vms_ip = None vms_eboxes = [] local_ebox = None excluded_local_ebox_ip = None if IS_EBOX: # On ebox, only firmware upgrade is supported (not plain power cycle) if not fw_upgrade_model: error_msg = "Power cycling without firmware upgrade is not supported on ebox. 
Use --fw-upgrade and --fw-file to specify firmware upgrade" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Require --target-fw-version on ebox if not args.target_fw_version: error_msg = "--target-fw-version is required for ebox firmware upgrade (used to determine which drives need upgrading)" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Implicitly set upgrade-only mode for ebox UPGRADE_MODE = True OPERATION = 'Ebox FW Upgrade' ebox_mode = True # Get VMS IP and test API connectivity vms_ip = get_vms_ip() if not vms_ip: error_msg = "Failed to determine VMS management IP" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 logger.info(f"VMS IP: {vms_ip}") add_message(f"VMS IP: {vms_ip}") # Update VMS credentials from args if provided VMS_USER = args.user VMS_PASSWORD = args.password if not test_vms_api(vms_ip): error_msg = "VMS API connectivity/authentication test failed. Check VMS IP and credentials" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 add_message("VMS API connectivity test passed") # Fetch eboxes vms_eboxes = get_vms_eboxes(vms_ip) if not vms_eboxes: error_msg = "Failed to fetch eboxes from VMS API or no eboxes found" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 add_message(f"Found {len(vms_eboxes)} eboxes") # Detect local ebox local_ips = get_local_ips() local_ebox = find_local_ebox(vms_eboxes, local_ips) if local_ebox: local_ebox_name = local_ebox.get('name', 'unknown') local_ebox_dnode_ips = set() for dnode in local_ebox.get('dnode_containers', []): ip = dnode.get('ip', '') if ip: local_ebox_dnode_ips.add(ip) excluded_local_ebox_ip = local_ebox_dnode_ips logger.info(f"Local ebox detected: {local_ebox_name} (dnode IPs: {local_ebox_dnode_ips})") add_message(f"Local ebox: {local_ebox_name}") # Check if --ebox-ips explicitly includes the local ebox if args.ips_ebox: ebox_ips_set = set(args.ips_ebox) if ebox_ips_set & local_ebox_dnode_ips: error_msg = 
(f"Cannot upgrade the ebox the script is running on ({local_ebox_name}). " f"The specified --ebox-ips include the local ebox IP(s): {ebox_ips_set & local_ebox_dnode_ips}. " f"Run this script from a different ebox to upgrade this one.") logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Reset state if requested if args.reset and os.path.exists(STATE_FILE): logger.info("Resetting state file...") add_message("Resetting state file...") try: if args.dry_run: logger.info(f"[DRY RUN] Would delete state file {STATE_FILE}") add_message(f"[DRY RUN] Would delete state file") else: os.remove(STATE_FILE) logger.info("State file reset successfully") add_message("State file reset successfully") except Exception as e: error_msg = f"Failed to reset state file: {str(e)}" logger.exception(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Check if resuming if args.resume: if os.path.exists(STATE_FILE): logger.info("Resuming from previous state") add_message("Resuming from previous state") else: error_msg = "Cannot resume: No previous state found" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Connect to leader and initialize commander commander = connect_to_commander() if not commander: error_msg = "Failed to connect to leader or initialize commander, exiting" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 add_message("Successfully connected to Vast API") # Display cluster identification (PSNT and system name) try: customer_info = get_cluster(interactive=False) _cluster_name = customer_info.get('cluster_name') or 'Unknown' _cluster_psnt = customer_info.get('psnt') or 'N/A' except Exception as e: logger.warning(f"Failed to retrieve cluster identification: {e}") _cluster_name = 'Unknown' _cluster_psnt = 'N/A' logger.info(f"Cluster: {_cluster_name} | PSNT: {_cluster_psnt}") add_message(f"Cluster: {_cluster_name} | PSNT: {_cluster_psnt}") with dashboard_lock: status_data["cluster_name"] = _cluster_name 
status_data["cluster_psnt"] = _cluster_psnt # If --load-mapping is specified, load mapping from file if args.load_mapping: mapping_path = args.load_mapping add_message(f"Loading drive mapping from {mapping_path}") try: with open(mapping_path, 'r') as f: mapping = json.load(f) logger.info(f"Loaded mapping for {len(mapping)} drives") add_message(f"Successfully loaded mapping for {len(mapping)} drives") except Exception as e: error_msg = f"Error loading mapping: {str(e)}" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 else: try: # Get mapping of drives to dnodes add_message("Discovering drive mapping...") mapping = map_drives(commander) add_message(f"Found {len(mapping)} drives across all dnodes") except ConnectionLostError as e: logger.critical(f"Leader connection lost during drive discovery: {e}") logger.critical("Check network and retry.") add_message(f"CRITICAL: Leader connection lost during drive discovery - {e}") return 1 # If --save-mapping is specified, save mapping to file if args.save_mapping: mapping_path = args.save_mapping add_message(f"Saving mapping to {mapping_path}") try: with open(mapping_path, 'w') as f: json.dump(mapping, f, indent=2) add_message(f"Successfully saved mapping to {mapping_path}") except Exception as e: error_msg = f"Error saving mapping: {str(e)}" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # If --list is specified, list drives and exit if args.list: add_message("Listing all drives:") # Use the activity section of the dashboard to show the list drive_list = [] for i, (drive_guid, drive_info) in enumerate(sorted(mapping.items(), key=lambda x: x[1]['dbox_name'] + x[1]['dnode_name'])): drive_line = f"{i + 1}. 
{drive_info['dbox_name']}/{drive_info['dnode_name']}/slot {drive_info['slot']} - {drive_info['drive_serial']}" drive_list.append(drive_line) logger.info(drive_line) # Also log to file # Display in batches of 10 to avoid spamming the terminal for i in range(0, len(drive_list), 10): batch = drive_list[i:i+10] add_message("\n".join(batch)) if i + 10 < len(drive_list): time.sleep(1) # Brief pause between batches return 0 # Filter by firmware model FIRST (before --drives filtering) # This ensures accurate drive counts in confirmation prompts if fw_upgrade_model: original_count = len(mapping) mapping = { guid: info for guid, info in mapping.items() if fw_upgrade_model in info.get('drive_model', '').upper() } filtered_count = len(mapping) logger.info(f"Filtered drives by model {fw_upgrade_model}: {filtered_count}/{original_count} drives match") add_message(f"Filtered to {filtered_count} drives matching model {fw_upgrade_model} (from {original_count} total)") # If --drives is specified, filter mapping to only specified drives if args.drives: if IS_EBOX: logger.error("Filtering based on drives is not supported for EBOX, an entire EBOX needs to be upgraded at once") add_message("ERROR: Filtering based on drives is not supported for EBOX, an entire EBOX needs to be upgraded at once") return 1 drive_list = args.drives.split(',') add_message(f"Filtering drives to include only: {drive_list}") # Support both GUIDs and indices filtered_mapping = {} # First, try to interpret as GUIDs guid_filtered = False for drive_id in drive_list: drive_id = drive_id.strip() # Check if this is a GUID if drive_id in mapping: guid_filtered = True filtered_mapping[drive_id] = mapping[drive_id] logger.info(f"Found drive with GUID {drive_id}") # If no GUIDs matched, try to interpret as indices if not guid_filtered: # Check if the input is a range if '-' in args.drives and len(drive_list) == 1: try: start, end = args.drives.split('-') start, end = int(start), int(end) drive_indices = list(range(start, 
end + 1)) add_message(f"Using drive range: {start}-{end} (inclusive)") except ValueError: error_msg = f"Invalid drive range: {args.drives}" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 else: try: drive_indices = [int(d) for d in drive_list] add_message(f"Using specified drives: {drive_indices}") except ValueError: error_msg = f"Invalid input: {args.drives} - expected GUIDs or indices" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Filter drives to only include specified indices sorted_drives = sorted(mapping.items(), key=lambda x: x[1]['dbox_name'] + x[1]['dnode_name']) for idx in drive_indices: if 1 <= idx <= len(sorted_drives): drive_guid, drive_info = sorted_drives[idx - 1] filtered_mapping[drive_guid] = drive_info else: logger.warning(f"Drive index {idx} out of range (1-{len(sorted_drives)})") add_message(f"WARNING: Drive index {idx} out of range (1-{len(sorted_drives)})") mapping = filtered_mapping add_message(f"Filtered to {len(mapping)} drives to process") # Filter by ebox IPs if specified if args.ips_ebox: if not IS_EBOX: logger.error("--ebox-ips can only be used on ebox systems") add_message("ERROR: --ebox-ips can only be used on ebox systems") return 1 ebox_ips = set(args.ips_ebox) filtered_mapping = { guid: info for guid, info in mapping.items() if info['dnode_ip'] in ebox_ips } logger.info(f"Filtered by ebox IPs {ebox_ips}: {len(filtered_mapping)} of {len(mapping)} drives") add_message(f"Filtered to {len(filtered_mapping)} drives matching ebox IPs") mapping = filtered_mapping elif ebox_mode and excluded_local_ebox_ip: # Auto-exclude local ebox when no --ebox-ips specified original_count = len(mapping) mapping = { guid: info for guid, info in mapping.items() if info.get('dnode_ip', '') not in excluded_local_ebox_ip } excluded_count = original_count - len(mapping) if excluded_count > 0: local_ebox_name = local_ebox.get('name', 'unknown') if local_ebox else 'unknown' logger.info(f"Auto-excluded {excluded_count} 
drives from local ebox {local_ebox_name}") add_message(f"Excluded {excluded_count} drives from local ebox {local_ebox_name} (cannot upgrade self)") # Update state with all drives state = load_state() state["mapping"] = mapping state["total_drives"] = len(mapping) if "start_time" not in state: state["start_time"] = time.time() if "completed_drives" not in state: state["completed_drives"] = [] if "current_drive" not in state: state["current_drive"] = None save_state(state) # Save initial denylist snapshot if not args.skip_denylist_check: try: initial_snapshot = fetch_denylist_snapshot(commander) state["denylist_snapshot"] = initial_snapshot save_state(state) add_message("Initial denylist snapshot recorded") except ConnectionLostError as e: logger.critical(f"Leader connection lost during denylist snapshot: {e}") logger.critical("Check network and retry.") add_message(f"CRITICAL: Leader connection lost during denylist check - {e}") return 1 # Save initial state as a backup backup_file = f"{STATE_FILE}.{int(time.time())}.bak" try: with open(backup_file, 'w') as f: json.dump(state, f, indent=2) add_message(f"Created backup state file: {backup_file}") except Exception as e: warning_msg = f"Failed to save backup state: {str(e)}" logger.warning(warning_msg) add_message(f"WARNING: {warning_msg}") # Display the plan on dashboard completed_count = len(state.get("completed_drives", [])) remaining_count = len(mapping) - completed_count add_message(f"Plan: {remaining_count} drives to {OPERATION} ({completed_count} already completed)") # Exit if there are no drives to cycle if not mapping: error_msg = "No drives found, nothing to do" logger.error(error_msg) add_message(f"ERROR: {error_msg}") return 1 # Initialize the dashboard with current status try: check_cluster_state(commander, args.force, args.skip_denylist_check) check_raid_health(commander) except ConnectionLostError as e: logger.critical(f"Leader connection lost during initial health checks: {e}") logger.critical("Check 
network and retry.") add_message(f"CRITICAL: Leader connection lost during health checks - {e}") return 1 # Process skip-failed-drives list if provided skip_drives = [] if args.skip_failed_drives: skip_drives = [guid.strip() for guid in args.skip_failed_drives.split(',')] add_message(f"Will skip the following drives if failed: {skip_drives}") # Check for node and drive failures to identify actual drives to be processed try: no_failures = check_node_and_drive_failures(commander, skip_drives) except ConnectionLostError as e: logger.critical(f"Leader connection lost during failure check: {e}") logger.critical("Check network and retry.") add_message(f"CRITICAL: Leader connection lost during failure check - {e}") return 1 # Ask for confirmation with accurate count of drives to be processed if not args.yes and not args.dry_run: # Get the actual count of drives that will be processed (excluding skipped drives) actual_drive_count = remaining_count if skip_drives: # Always subtract skipped drives, regardless of failure state # Subtract the number of drives to skip from the total actual_drive_count = remaining_count - len(skip_drives) # In ebox mode, check actual FW versions to get accurate count of drives needing upgrade if ebox_mode and args.target_fw_version and vms_eboxes: logger.info("Pre-scanning ebox drives to determine accurate upgrade count...") add_message("Checking which drives actually need firmware upgrade...") ebox_groups = group_drives_by_ebox(mapping, vms_eboxes) drives_needing_count = 0 for ebox_id, ebox_group in ebox_groups.items(): needs_upgrade = check_ebox_drives_need_upgrade( ebox_group['dnode_ip'], ebox_group['drives'], args.target_fw_version, fw_upgrade_model ) drives_needing_count += len(needs_upgrade) actual_drive_count = drives_needing_count logger.info(f"Pre-scan complete: {drives_needing_count} drives actually need firmware upgrade") # Include initial denylist warning in confirmation prompt operation_description = "power cycle" if 
fw_upgrade_model and fw_file_path: operation_description = f"upgrade firmware and power cycle" if UPGRADE_MODE: operation_description = f"upgrade firmware" # Build clearer confirmation message with model information if fw_upgrade_model: drive_info_msg = f"{actual_drive_count} {fw_upgrade_model} drives" else: drive_info_msg = f"{actual_drive_count} drives" if not args.skip_denylist_check and state.get("denylist_snapshot"): snapshot = state["denylist_snapshot"] if any(snapshot[key].get(subkey) for key in snapshot for subkey in snapshot[key]): prompt_message = ("WARNING: initial denylist items detected at startup!\n" f"{json.dumps(snapshot, indent=2)}\n" f"About to {operation_description} {drive_info_msg}. Type 'y' to continue or any other key to abort.") else: prompt_message = f"About to {operation_description} {drive_info_msg}. Type 'y' to continue or any other key to abort." else: prompt_message = f"About to {operation_description} {drive_info_msg}. Type 'y' to continue or any other key to abort." 
if not get_user_confirmation(prompt_message): return 0 # If failures were detected and we're not using --force, exit if not no_failures and not args.force: error_msg = "Node or drive failures detected before starting" logger.error(error_msg) add_message(f"ERROR: {error_msg}") if args.skip_failed_drives: add_message("Some drives are still in failed state even after skipping specified drives") add_message("Use --force to continue anyway or update --skip-failed-drives with additional drive GUIDs") else: add_message("Use --force to continue anyway or --skip-failed-drives to skip specific failed drives") return 1 # Run the main logic if args.dry_run: logger.info("Running in dry-run mode - no changes will be made") add_message("Running in dry-run mode - no changes will be made") if ebox_mode: # Ebox firmware upgrade flow success = cycle_ebox_drives( commander, mapping, vms_ip, vms_eboxes, args.force, args.dry_run, args.skip_denylist_check, fw_upgrade_model, fw_file_path, args.target_fw_version, args.upload_reports ) else: try: success = cycle_drives(commander, args.force, args.dry_run, skip_drives, args.wait_time, args.skip_denylist_check, fw_upgrade_model, fw_file_path, args.target_fw_version, args.upload_reports) except ConnectionLostError as e: logger.critical("=" * 80) logger.critical(f"LEADER CONNECTION LOST: {e}") logger.critical("The TCP connection to the cluster leader was dropped.") logger.critical("State has been saved. Use --resume to continue once the network is stable.") logger.critical("=" * 80) add_message("=" * 80) add_message(f"CRITICAL: Leader connection lost - {e}") add_message("State saved. 
Run with --resume to continue when network is stable.") add_message("=" * 80) return 1 if success: operation_name = f"{DRIVE_TITLE} {OPERATION}" if fw_upgrade_model and fw_file_path: operation_name = f"{DRIVE_TITLE} firmware upgrade and power cycling" success_msg = f"{'[DRY RUN] ' if args.dry_run else ''}{operation_name} completed successfully" logger.info(success_msg) add_message(f"✅ {success_msg}") # Add firmware version validation warning for NVRAM upgrades if fw_upgrade_model and fw_file_path and NVRAM_MODE and not args.dry_run: logger.info("=" * 80) logger.info("IMPORTANT: Firmware version validation") logger.info("=" * 80) logger.info("Note: 'nvme list' may show cached firmware versions on some VastOS versions.") logger.info("For accurate verification, use: nvme id-ctrl /dev/nvmeXnY | grep ^fr") logger.info("The firmware versions in the report are from 'nvme id-ctrl' and are accurate.") logger.info("=" * 80) add_message("=" * 80) add_message("⚠️ IMPORTANT: Firmware Version Validation") add_message("=" * 80) add_message("Note: 'nvme list' may show cached firmware versions.") add_message("For accurate verification, use: nvme id-ctrl /dev/nvmeXnY | grep ^fr") add_message("The firmware versions in the report are accurate.") add_message("=" * 80) # Ebox: remind user about the excluded local ebox if ebox_mode and local_ebox and excluded_local_ebox_ip: local_ebox_name = local_ebox.get('name', 'unknown') local_ebox_ips_str = ', '.join(sorted(excluded_local_ebox_ip)) if not args.ips_ebox: # Local ebox was auto-excluded — user may not be aware logger.info("=" * 80) logger.info(f"IMPORTANT: The local ebox ({local_ebox_name}) was excluded from this run.") logger.info(f"To upgrade it, run this script from a different ebox with:") logger.info(f" --ebox-ips {local_ebox_ips_str}") logger.info("=" * 80) add_message("=" * 80) add_message(f"NOTE: Local ebox ({local_ebox_name}) was excluded.") add_message(f"To upgrade it, run from another ebox with: --ebox-ips 
{local_ebox_ips_str}") add_message("=" * 80) else: # User specified --ebox-ips — just a brief note, they know what they're doing logger.info(f"NOTE: Local ebox ({local_ebox_name}, IPs: {local_ebox_ips_str}) was not included in this run. " f"To upgrade it, run from a different ebox with: --ebox-ips {local_ebox_ips_str}") return 0 else: operation_name = f"{DRIVE_TITLE} {OPERATION}" if fw_upgrade_model and fw_file_path: operation_name = f"{DRIVE_TITLE} firmware upgrade and power cycling" error_msg = f"{'[DRY RUN] ' if args.dry_run else ''}{operation_name} failed" logger.error(error_msg) add_message(f"❌ {error_msg}") return 1 def selftest() -> int: """Self-test function for verifying the logging system and dashboard. This function is used for CI testing to ensure the logging and dashboard systems are working correctly after refactoring. Returns: 0 on success, non-zero on failure """ print("Starting self-test of logging and dashboard systems...") # Test 1: Verify logging setup with RotatingFileHandler try: # Set up logging with verbose=True test_log_file = "/tmp/drive_cycle_test.log" setup_logging(test_log_file, verbose=True) # Verify handlers found_rotating_handler = False for handler in logger.handlers: if isinstance(handler, logging.handlers.RotatingFileHandler): found_rotating_handler = True if handler.maxBytes != 20*1024*1024 or handler.backupCount != 5: print("ERROR: RotatingFileHandler doesn't have correct parameters") return 1 if not found_rotating_handler: print("ERROR: RotatingFileHandler not found") return 1 print("✓ Logging setup with RotatingFileHandler verified") except Exception as e: print(f"ERROR in test 1: {str(e)}") return 1 # Test 2: Verify dashboard updates work without exceptions try: # Initialize status_data for dashboard global status_data with dashboard_lock: status_data = { "cluster_state": "ONLINE", "cluster_name": "test-cluster", "cluster_psnt": "TEST-PSNT", "raid_state": { "ssd": "HEALTHY", "nvram": "HEALTHY", "memory": "HEALTHY", 
"rio_nvram": "HEALTHY" }, "healthy_drives": 10, "total_drives": 10, "healthy_dnodes": 4, "total_dnodes": 4, "messages": [], "last_message": "" } # Start dashboard in separate thread dashboard_thread = threading.Thread(target=update_dashboard_thread, daemon=True) dashboard_thread.start() # Simulate 3 dashboard updates for i in range(3): add_message(f"Test message {i+1}") try: message_queue.put_nowait(("progress", (i+1, 3, time.time(), {"test": f"drive_{i}"}))) except queue.Full: pass time.sleep(1) # Wait to see update print("✓ Dashboard updates verified") except Exception as e: print(f"ERROR in test 2: {str(e)}") return 1 print("All tests passed!") return 0 def format_current_drive_info(drive_info: Dict[str, Any]) -> str: """ Format the current drive information in a nice table format for the dashboard. Args: drive_info: Dictionary with drive information Returns: Formatted string with drive information """ if not drive_info: return "" # Extract key information serial = drive_info.get('drive_serial', 'Unknown') model = drive_info.get('drive_model', 'Unknown') dbox = drive_info.get('dbox_name', 'Unknown') dnode = drive_info.get('dnode_name', 'Unknown') dnode_ip = drive_info.get('dnode_ip', 'Unknown') slot = drive_info.get('slot', 'Unknown') state = drive_info.get('state', 'Unknown') enabled = drive_info.get('enabled', False) chassis_type = drive_info.get('chassis_type', 'Unknown') if hasattr(chassis_type, 'name'): chassis_type = chassis_type.name elif isinstance(chassis_type, str) and chassis_type.startswith('DBoxChassisType.'): chassis_type = chassis_type.replace('DBoxChassisType.', '') # Get a descriptive state that's easier to read state_str = "Unknown" if "ACTIVE" in str(state).upper(): state_str = "ACTIVE" if enabled else "ACTIVE (disabled)" elif "INACTIVE" in str(state).upper(): state_str = "INACTIVE" elif "ACTIVATING" in str(state).upper(): state_str = "ACTIVATING" elif "DEACTIVATING" in str(state).upper(): state_str = "DEACTIVATING" elif "FAILED" in 
str(state).upper(): state_str = "FAILED" else: # Use the original state if we don't recognize it state_str = get_enum_name(state) # Check global status_data for drive_state if available with dashboard_lock: if "drive_state" in status_data and drive_info.get('drive_guid') in status_data["drive_state"]: state_str = status_data["drive_state"][drive_info.get('drive_guid')] # Create a formatted table for the drive info lines = [ "┌─ Current Drive ───────────────────────────────────────────────────┐", f"│ Serial: {serial:<25} Model: {model:<25} │", f"│ DBox: {dbox:<50} │", f"│ DNode: {dnode:<24} Slot: {slot:<21} │", f"│ DNode IP: {dnode_ip:<23} Chassis: {chassis_type:<21} │", f"│ State: {state_str:<52} │", "└─────────────────────────────────────────────────────────────────────┘" ] return "\n".join(lines) def save_fw_report(drive_mapping): report = f"{'DBox':<30} {'DNode IP':<20} {'Drive Serial':<20} {'Drive model':<20} {'NVME Device':<15} {'FW Rev':<15}" for drive in drive_mapping.values(): if 'fw_revision' in drive: report += f"\n{drive['dbox_name']:<30} {drive['dnode_ip']:<20} {drive['drive_serial']:<20} {drive['drive_model']:<20} {drive.get('nvme_device', '--'):<15} {drive['fw_revision']:<15}" with open('/vast/log/drive_fw_revisions.report', 'w') as f: f.write(report) # Utility functions for denylist snapshot and comparison def fetch_denylist_snapshot(commander: Commander) -> Dict[str, Any]: """ Fetch the current snapshot of all denylists from the system. Returns a dict with all relevant denylist items, serialized for JSON. 
""" snapshot = {} try: ingest_deny = list(commander.list_objects(TypeId.IngestDenylistObjType))[0] maintenance_deny = list(commander.list_objects(TypeId.MaintenanceDenylistObjType))[0] peer_ip_deny = list(commander.list_objects(TypeId.PeerIPDenylistObjType))[0] proto_handle_deny = list(commander.list_objects(TypeId.ProtoHandleDenylistObjType))[0] # Extract and serialize the relevant lists snapshot['ingest_deny'] = { 'maintenance_items': [str(item) for item in getattr(ingest_deny.denylist, 'maintenance_items', [])], 'handles_items': [str(item) for item in getattr(ingest_deny.denylist, 'handles_items', [])], } snapshot['maintenance_deny'] = { 'items': [str(item) for item in getattr(maintenance_deny.denylist, 'items', [])], } snapshot['peer_ip_deny'] = { 'peer_items': [str(item) for item in getattr(peer_ip_deny.denylist, 'peer_items', [])], } snapshot['proto_handle_deny'] = { 'proto_handle_items': [str(item) for item in getattr(proto_handle_deny.denylist, 'proto_handle_items', [])], } except ConnectionLostError: raise except Exception as e: _raise_if_connection_lost(e) logger.warning(f"Could not fetch all denylists: {e}") return snapshot def compare_denylist_snapshots(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]: """ Compare two denylist snapshots. Returns a dict of new items found. """ changes = {} for key in new: changes[key] = {} for subkey in new[key]: old_set = set(old.get(key, {}).get(subkey, [])) new_set = set(new[key][subkey]) added = list(new_set - old_set) if added: changes[key][subkey] = added # Remove empty entries return {k: v for k, v in changes.items() if any(v.values())} if __name__ == "__main__": # If --selftest is passed, run the self-test instead of main if "--selftest" in sys.argv: sys.exit(selftest()) else: exit_code=main() time.sleep(2) sys.exit(exit_code)