""" Backend Manager - PLC S7-315 Streamer Watchdog Service This script monitors the backend health and automatically restarts it when needed. It runs as a separate process and ensures the backend is always available. Key features: - Health monitoring every 30 seconds - Automatic restart of failed backends - Support for both development (main.py) and production (exe) environments - Robust process management and cleanup - Logging and status reporting """ import os import sys import time import json import psutil import requests import subprocess import threading import logging from datetime import datetime from typing import Optional, Dict, Any class BackendManager: """Manages backend lifecycle and health monitoring""" def __init__( self, check_interval: int = 30, health_timeout: float = 5.0, restart_delay: int = 10, max_restart_attempts: int = 3, restart_cooldown: int = 300, ): """ Initialize the backend manager Args: check_interval: Health check interval in seconds (default: 30) health_timeout: HTTP request timeout in seconds (default: 5.0) restart_delay: Delay before restart attempt in seconds (default: 10) max_restart_attempts: Maximum consecutive restart attempts (default: 3) restart_cooldown: Cooldown period after max attempts in seconds (default: 300) """ self.check_interval = check_interval self.health_timeout = health_timeout self.restart_delay = restart_delay self.max_restart_attempts = max_restart_attempts self.restart_cooldown = restart_cooldown # Configuration self.backend_port = 5050 self.health_endpoint = "/api/health" self.base_url = f"http://localhost:{self.backend_port}" self.lock_file = "plc_streamer.lock" self.status_file = "backend_manager.status" # State tracking self.restart_count = 0 self.last_restart_time = 0 self.backend_process = None self.running = True # Setup logging self.setup_logging() # Detect environment self.is_packaged = getattr(sys, "frozen", False) self.log(f"[MAIN] Backend Manager initialized") self.log(f"[CONFIG] Check interval: {check_interval}s") self.log( f"[CONFIG] Environment: {'Packaged' if self.is_packaged else 'Development'}" ) self.log(f"[CONFIG] Process separation: Independent cmd windows") def setup_logging(self): """Setup logging configuration""" log_format = "%(asctime)s [%(levelname)s] %(message)s" # Configure file handler with UTF-8 encoding file_handler = logging.FileHandler("backend_manager.log", encoding="utf-8") file_handler.setFormatter(logging.Formatter(log_format)) # Configure console handler with UTF-8 encoding console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(logging.Formatter(log_format)) logging.basicConfig( level=logging.INFO, format=log_format, handlers=[file_handler, console_handler], ) self.logger = logging.getLogger(__name__) def log(self, message: str, level: str = "INFO"): """Log message with appropriate level""" if level == "ERROR": self.logger.error(message) elif level == "WARN": self.logger.warning(message) else: self.logger.info(message) def get_backend_command(self) -> list: """Get the appropriate backend command for current environment (legacy - kept for compatibility)""" if self.is_packaged: # In packaged environment, look for the exe exe_path = os.path.join( os.path.dirname(sys.executable), "S7_Streamer_Logger.exe" ) if os.path.exists(exe_path): return [exe_path] else: # Fallback to exe in current directory exe_path = "S7_Streamer_Logger.exe" return [exe_path] else: # In development environment, use conda environment # Try to detect if we're in snap7v12 environment conda_env_python = r"C:\Users\migue\miniconda3\envs\snap7v12\python.exe" if os.path.exists(conda_env_python): main_script = os.path.join(os.path.dirname(__file__), "main.py") return [conda_env_python, main_script] else: # Fallback to current python python_exe = sys.executable main_script = os.path.join(os.path.dirname(__file__), "main.py") return [python_exe, main_script] def is_backend_alive(self) -> bool: """Check if backend is responding to health checks""" try: response = requests.get( f"{self.base_url}{self.health_endpoint}", timeout=self.health_timeout ) return 200 <= response.status_code < 300 except ( requests.RequestException, requests.ConnectionError, requests.Timeout, requests.ConnectTimeout, ): return False except Exception as e: self.log(f"[ERROR] Unexpected error during health check: {e}", "ERROR") return False def get_backend_pid(self) -> Optional[int]: """Get backend PID from lock file""" try: if os.path.exists(self.lock_file): with open(self.lock_file, "r") as f: return int(f.read().strip()) except (ValueError, FileNotFoundError, IOError): pass return None def is_backend_process_running(self, pid: int) -> bool: """Check if backend process is actually running""" try: if not psutil.pid_exists(pid): return False proc = psutil.Process(pid) cmdline = " ".join(proc.cmdline()).lower() # Check for backend signatures signatures = ["main.py", "s7_streamer_logger", "plc_streamer"] return any(sig in cmdline for sig in signatures) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): return False def cleanup_zombie_process(self, pid: int) -> bool: """Terminate zombie backend process""" try: if not psutil.pid_exists(pid): return True proc = psutil.Process(pid) self.log(f"[STOP] Terminating zombie process {pid} ({proc.name()})") # Try graceful termination proc.terminate() try: proc.wait(timeout=10) self.log(f"[OK] Process {pid} terminated gracefully") return True except psutil.TimeoutExpired: # Force kill self.log(f"[FORCE] Force killing process {pid}") proc.kill() proc.wait(timeout=5) self.log(f"[KILL] Process {pid} force killed") return True except (psutil.NoSuchProcess, psutil.AccessDenied): return True except Exception as e: self.log(f"[ERROR] Error terminating process {pid}: {e}", "ERROR") return False def cleanup_lock_file(self): """Remove stale lock file""" try: if os.path.exists(self.lock_file): os.remove(self.lock_file) self.log(f"[OK] Removed lock file: {self.lock_file}") except Exception as e: self.log(f"[ERROR] Error removing lock file: {e}", "ERROR") def get_cmd_command(self) -> str: """Get Windows cmd command to launch backend in separate console window""" if self.is_packaged: # In packaged environment, launch exe in new cmd window exe_path = os.path.join( os.path.dirname(sys.executable), "S7_Streamer_Logger.exe" ) if os.path.exists(exe_path): return f'start "S7_Streamer_Logger" "{exe_path}"' else: # Fallback to exe in current directory return 'start "S7_Streamer_Logger" "S7_Streamer_Logger.exe"' else: # In development environment, launch python script in new cmd window conda_env_python = r"C:\Users\migue\miniconda3\envs\snap7v12\python.exe" if os.path.exists(conda_env_python): main_script = os.path.join(os.path.dirname(__file__), "main.py") return f'start "PLC_Backend" "{conda_env_python}" "{main_script}"' else: # Fallback to current python python_exe = sys.executable main_script = os.path.join(os.path.dirname(__file__), "main.py") return f'start "PLC_Backend" "{python_exe}" "{main_script}"' def start_backend(self) -> bool: """Start the backend process in a separate Windows cmd console""" try: cmd_command = self.get_cmd_command() self.log(f"[START] Starting backend in separate cmd window: {cmd_command}") # Launch backend in completely separate cmd window using shell command self.backend_process = subprocess.Popen( cmd_command, cwd=os.path.dirname(__file__) if not self.is_packaged else None, shell=True, # Use shell to properly handle the start command ) self.log( f"[START] Backend launch command executed with PID: {self.backend_process.pid}" ) # Wait a moment for the actual backend to start in its new window self.log( f"[WAIT] Waiting 10 seconds for backend to initialize in separate window..." ) time.sleep(10) # The subprocess.Popen PID is just the cmd launcher, not the actual backend # We'll verify health via HTTP instead of process tracking self.log(f"[OK] Backend launch completed, will verify via health check") return True except Exception as e: self.log(f"[ERROR] Error starting backend: {e}", "ERROR") return False def handle_backend_failure(self) -> bool: """Handle backend failure and attempt restart""" current_time = time.time() # Check if we're in cooldown period if (current_time - self.last_restart_time) < self.restart_cooldown: time_left = self.restart_cooldown - (current_time - self.last_restart_time) self.log(f"[WAIT] In cooldown period, {int(time_left)}s remaining") return False # Check restart attempt limit if self.restart_count >= self.max_restart_attempts: self.log( f"[FAIL] Maximum restart attempts ({self.max_restart_attempts}) reached" ) self.restart_count = 0 self.last_restart_time = current_time return False # Cleanup existing processes backend_pid = self.get_backend_pid() if backend_pid and self.is_backend_process_running(backend_pid): self.log(f"[STOP] Cleaning up zombie backend process: {backend_pid}") self.cleanup_zombie_process(backend_pid) self.cleanup_lock_file() # Wait before restart self.log( f"[WAIT] Waiting {self.restart_delay}s before restart attempt {self.restart_count + 1}" ) time.sleep(self.restart_delay) # Attempt restart self.restart_count += 1 if self.start_backend(): self.log( f"[OK] Backend restarted successfully (attempt {self.restart_count})" ) self.restart_count = 0 # Reset counter on success return True else: self.log( f"[FAIL] Backend restart failed (attempt {self.restart_count})", "ERROR" ) return False def update_status(self, status: str, details: Dict[str, Any] = None): """Update status file with current state""" try: status_data = { "timestamp": datetime.now().isoformat(), "status": status, "restart_count": self.restart_count, "last_restart": self.last_restart_time, "backend_pid": self.get_backend_pid(), "manager_pid": os.getpid(), "details": details or {}, } with open(self.status_file, "w") as f: json.dump(status_data, f, indent=2) except Exception as e: self.log(f"[ERROR] Error updating status file: {e}", "ERROR") def run(self): """Main monitoring loop""" self.log(f"[START] Backend Manager started (PID: {os.getpid()})") self.update_status("starting") while self.running: try: # Check backend health if self.is_backend_alive(): self.log(f"[OK] Backend is healthy") self.update_status("healthy") self.restart_count = ( 0 # Reset restart counter on successful health check ) else: self.log(f"[WARN] Backend health check failed", "WARN") self.update_status("unhealthy") # Attempt to handle the failure if self.handle_backend_failure(): self.update_status("restarted") else: self.update_status("failed") # Wait for next check time.sleep(self.check_interval) except KeyboardInterrupt: self.log(f"[SHUTDOWN] Received interrupt signal") self.running = False break except Exception as e: self.log(f"[ERROR] Unexpected error in main loop: {e}", "ERROR") self.update_status("error", {"error": str(e)}) time.sleep(self.check_interval) self.shutdown() def shutdown(self): """Cleanup and shutdown""" self.log(f"[SHUTDOWN] Backend Manager shutting down") self.update_status("shutting_down") # Don't terminate any backend processes - they run independently in their own cmd windows # The manager only monitors health, doesn't control the backend lifecycle directly self.log( f"[OK] Backend Manager stopped - backend continues running independently" ) self.update_status("stopped") def main(): """Main entry point""" print("Backend Manager - PLC S7-315 Streamer Watchdog") print("=" * 50) try: manager = BackendManager() manager.run() except KeyboardInterrupt: print("\n[SHUTDOWN] Backend Manager interrupted by user") except Exception as e: print(f"[ERROR] Critical error: {e}") return 1 return 0 if __name__ == "__main__": sys.exit(main())