# S7_snap7_Stremer_n_Recorder/backmanager.py

"""
Backend Manager - PLC S7-315 Streamer Watchdog Service
This script monitors backend health and automatically restarts the backend when needed.
It runs as a separate process and ensures the backend is always available.
Key features:
- Health monitoring every 30 seconds
- Automatic restart of failed backends
- Support for both development (main.py) and production (exe) environments
- Robust process management and cleanup
- Logging and status reporting
"""
import os
import sys
import time
import json
import psutil
import requests
import subprocess
import logging
from datetime import datetime
from typing import Optional, Dict, Any
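
# Third-party dependencies: psutil and requests (e.g. "pip install psutil
# requests"); everything else used here is standard library.
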
class BackendManager:
    """Manages backend lifecycle and health monitoring"""

    def __init__(
        self,
        check_interval: int = 30,
        health_timeout: float = 5.0,
        restart_delay: int = 10,
        max_restart_attempts: int = 3,
        restart_cooldown: int = 300,
    ):
        """
        Initialize the backend manager

        Args:
            check_interval: Health check interval in seconds (default: 30)
            health_timeout: HTTP request timeout in seconds (default: 5.0)
            restart_delay: Delay before restart attempt in seconds (default: 10)
            max_restart_attempts: Maximum consecutive restart attempts (default: 3)
            restart_cooldown: Cooldown period after max attempts in seconds (default: 300)
        """
        self.check_interval = check_interval
        self.health_timeout = health_timeout
        self.restart_delay = restart_delay
        self.max_restart_attempts = max_restart_attempts
        self.restart_cooldown = restart_cooldown

        # Configuration
        self.backend_port = 5050
        self.health_endpoint = "/api/health"
        self.base_url = f"http://localhost:{self.backend_port}"
        self.lock_file = "plc_streamer.lock"
        self.status_file = "backend_manager.status"

        # State tracking
        self.restart_count = 0
        self.last_restart_time = 0
        self.backend_process = None
        self.running = True

        # Setup logging
        self.setup_logging()

        # Detect environment
        self.is_packaged = getattr(sys, "frozen", False)

        self.log("[MAIN] Backend Manager initialized")
        self.log(f"[CONFIG] Check interval: {check_interval}s")
        self.log(
            f"[CONFIG] Environment: {'Packaged' if self.is_packaged else 'Development'}"
        )
        self.log("[CONFIG] Process separation: Independent cmd windows")

    def setup_logging(self):
        """Setup logging configuration"""
        log_format = "%(asctime)s [%(levelname)s] %(message)s"

        # Configure file handler with UTF-8 encoding
        file_handler = logging.FileHandler("backend_manager.log", encoding="utf-8")
        file_handler.setFormatter(logging.Formatter(log_format))

        # Configure console handler writing to stdout
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(logging.Formatter(log_format))

        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[file_handler, console_handler],
        )
        self.logger = logging.getLogger(__name__)

    def log(self, message: str, level: str = "INFO"):
        """Log message with appropriate level"""
        if level == "ERROR":
            self.logger.error(message)
        elif level == "WARN":
            self.logger.warning(message)
        else:
            self.logger.info(message)

    def get_backend_command(self) -> list:
        """Get the appropriate backend command for the current environment (legacy - kept for compatibility)"""
        if self.is_packaged:
            # In packaged environment, look for the exe next to this executable
            exe_path = os.path.join(
                os.path.dirname(sys.executable), "S7_Streamer_Logger.exe"
            )
            if os.path.exists(exe_path):
                return [exe_path]
            else:
                # Fallback to exe in current directory
                exe_path = "S7_Streamer_Logger.exe"
                return [exe_path]
        else:
            # In development environment, use the conda environment.
            # Try to detect if we're in the snap7v12 environment.
            conda_env_python = r"C:\Users\migue\miniconda3\envs\snap7v12\python.exe"
            if os.path.exists(conda_env_python):
                main_script = os.path.join(os.path.dirname(__file__), "main.py")
                return [conda_env_python, main_script]
            else:
                # Fallback to current python
                python_exe = sys.executable
                main_script = os.path.join(os.path.dirname(__file__), "main.py")
                return [python_exe, main_script]

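    # The health probe below is roughly equivalent to this manual check, with
    # the port and endpoint taken from the configuration in __init__:
    #
    #   curl -s http://localhost:5050/api/health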
    def is_backend_alive(self) -> bool:
        """Check if backend is responding to health checks"""
        try:
            response = requests.get(
                f"{self.base_url}{self.health_endpoint}", timeout=self.health_timeout
            )
            return 200 <= response.status_code < 300
        except requests.RequestException:
            # RequestException is the base class for ConnectionError, Timeout,
            # ConnectTimeout, etc., so one clause covers all transport errors
            return False
        except Exception as e:
            self.log(f"[ERROR] Unexpected error during health check: {e}", "ERROR")
            return False

    def get_backend_pid(self) -> Optional[int]:
        """Get backend PID from lock file"""
        try:
            if os.path.exists(self.lock_file):
                with open(self.lock_file, "r") as f:
                    return int(f.read().strip())
        except (ValueError, OSError):
            # ValueError: contents are not an integer; OSError covers
            # FileNotFoundError and other I/O failures
            pass
        return None

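    # Contract assumed from the read side above: the lock file holds a single
    # PID as plain text, written by the backend process itself.
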
    def is_backend_process_running(self, pid: int) -> bool:
        """Check if backend process is actually running"""
        try:
            if not psutil.pid_exists(pid):
                return False
            proc = psutil.Process(pid)
            cmdline = " ".join(proc.cmdline()).lower()
            # Check for backend signatures in the command line
            signatures = ["main.py", "s7_streamer_logger", "plc_streamer"]
            return any(sig in cmdline for sig in signatures)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            return False

    def cleanup_zombie_process(self, pid: int) -> bool:
        """Terminate zombie backend process"""
        try:
            if not psutil.pid_exists(pid):
                return True

            proc = psutil.Process(pid)
            self.log(f"[STOP] Terminating zombie process {pid} ({proc.name()})")

            # Try graceful termination first
            proc.terminate()
            try:
                proc.wait(timeout=10)
                self.log(f"[OK] Process {pid} terminated gracefully")
                return True
            except psutil.TimeoutExpired:
                # Force kill
                self.log(f"[FORCE] Force killing process {pid}")
                proc.kill()
                proc.wait(timeout=5)
                self.log(f"[KILL] Process {pid} force killed")
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return True
        except Exception as e:
            self.log(f"[ERROR] Error terminating process {pid}: {e}", "ERROR")
            return False

    def cleanup_lock_file(self):
        """Remove stale lock file"""
        try:
            if os.path.exists(self.lock_file):
                os.remove(self.lock_file)
                self.log(f"[OK] Removed lock file: {self.lock_file}")
        except Exception as e:
            self.log(f"[ERROR] Error removing lock file: {e}", "ERROR")

    def get_cmd_command(self) -> str:
        """Get Windows cmd command to launch backend in separate console window"""
        if self.is_packaged:
            # In packaged environment, launch exe in new cmd window
            exe_path = os.path.join(
                os.path.dirname(sys.executable), "S7_Streamer_Logger.exe"
            )
            if os.path.exists(exe_path):
                return f'start "S7_Streamer_Logger" "{exe_path}"'
            else:
                # Fallback to exe in current directory
                return 'start "S7_Streamer_Logger" "S7_Streamer_Logger.exe"'
        else:
            # In development environment, launch python script in new cmd window
            conda_env_python = r"C:\Users\migue\miniconda3\envs\snap7v12\python.exe"
            if os.path.exists(conda_env_python):
                main_script = os.path.join(os.path.dirname(__file__), "main.py")
                return f'start "PLC_Backend" "{conda_env_python}" "{main_script}"'
            else:
                # Fallback to current python
                python_exe = sys.executable
                main_script = os.path.join(os.path.dirname(__file__), "main.py")
                return f'start "PLC_Backend" "{python_exe}" "{main_script}"'

    def start_backend(self) -> bool:
        """Start the backend process in a separate Windows cmd console"""
        try:
            cmd_command = self.get_cmd_command()
            self.log(f"[START] Starting backend in separate cmd window: {cmd_command}")

            # Launch backend in a completely separate cmd window; shell=True is
            # required so cmd.exe interprets the "start" builtin
            self.backend_process = subprocess.Popen(
                cmd_command,
                cwd=os.path.dirname(__file__) if not self.is_packaged else None,
                shell=True,
            )
            self.log(
                f"[START] Backend launch command executed with PID: {self.backend_process.pid}"
            )

            # Wait a moment for the actual backend to start in its new window
            self.log(
                "[WAIT] Waiting 10 seconds for backend to initialize in separate window..."
            )
            time.sleep(10)

            # The subprocess.Popen PID is just the cmd launcher, not the actual
            # backend, so we verify health via HTTP instead of process tracking
            self.log("[OK] Backend launch completed, will verify via health check")
            return True
        except Exception as e:
            self.log(f"[ERROR] Error starting backend: {e}", "ERROR")
            return False

    def handle_backend_failure(self) -> bool:
        """Handle backend failure and attempt restart"""
        current_time = time.time()

        # Check if we're in the cooldown period
        if (current_time - self.last_restart_time) < self.restart_cooldown:
            time_left = self.restart_cooldown - (current_time - self.last_restart_time)
            self.log(f"[WAIT] In cooldown period, {int(time_left)}s remaining")
            return False

        # Check restart attempt limit
        if self.restart_count >= self.max_restart_attempts:
            self.log(
                f"[FAIL] Maximum restart attempts ({self.max_restart_attempts}) reached"
            )
            self.restart_count = 0
            self.last_restart_time = current_time
            return False

        # Cleanup existing processes
        backend_pid = self.get_backend_pid()
        if backend_pid and self.is_backend_process_running(backend_pid):
            self.log(f"[STOP] Cleaning up zombie backend process: {backend_pid}")
            self.cleanup_zombie_process(backend_pid)
        self.cleanup_lock_file()

        # Wait before restart
        self.log(
            f"[WAIT] Waiting {self.restart_delay}s before restart attempt {self.restart_count + 1}"
        )
        time.sleep(self.restart_delay)

        # Attempt restart
        self.restart_count += 1
        if self.start_backend():
            self.log(
                f"[OK] Backend restarted successfully (attempt {self.restart_count})"
            )
            self.restart_count = 0  # Reset counter on success
            return True
        else:
            self.log(
                f"[FAIL] Backend restart failed (attempt {self.restart_count})", "ERROR"
            )
            return False

    def update_status(self, status: str, details: Optional[Dict[str, Any]] = None):
        """Update status file with current state"""
        try:
            status_data = {
                "timestamp": datetime.now().isoformat(),
                "status": status,
                "restart_count": self.restart_count,
                "last_restart": self.last_restart_time,
                "backend_pid": self.get_backend_pid(),
                "manager_pid": os.getpid(),
                "details": details or {},
            }
            with open(self.status_file, "w") as f:
                json.dump(status_data, f, indent=2)
        except Exception as e:
            self.log(f"[ERROR] Error updating status file: {e}", "ERROR")

    def run(self):
        """Main monitoring loop"""
        self.log(f"[START] Backend Manager started (PID: {os.getpid()})")
        self.update_status("starting")

        while self.running:
            try:
                # Check backend health
                if self.is_backend_alive():
                    self.log("[OK] Backend is healthy")
                    self.update_status("healthy")
                    # Reset restart counter on successful health check
                    self.restart_count = 0
                else:
                    self.log("[WARN] Backend health check failed", "WARN")
                    self.update_status("unhealthy")

                    # Attempt to handle the failure
                    if self.handle_backend_failure():
                        self.update_status("restarted")
                    else:
                        self.update_status("failed")

                # Wait for next check
                time.sleep(self.check_interval)
            except KeyboardInterrupt:
                self.log("[SHUTDOWN] Received interrupt signal")
                self.running = False
                break
            except Exception as e:
                self.log(f"[ERROR] Unexpected error in main loop: {e}", "ERROR")
                self.update_status("error", {"error": str(e)})
                time.sleep(self.check_interval)

        self.shutdown()

    def shutdown(self):
        """Cleanup and shutdown"""
        self.log("[SHUTDOWN] Backend Manager shutting down")
        self.update_status("shutting_down")

        # Don't terminate any backend processes - they run independently in
        # their own cmd windows. The manager only monitors health; it doesn't
        # control the backend lifecycle directly.
        self.log(
            "[OK] Backend Manager stopped - backend continues running independently"
        )
        self.update_status("stopped")

def main():
    """Main entry point"""
    print("Backend Manager - PLC S7-315 Streamer Watchdog")
    print("=" * 50)

    try:
        manager = BackendManager()
        manager.run()
    except KeyboardInterrupt:
        print("\n[SHUTDOWN] Backend Manager interrupted by user")
    except Exception as e:
        print(f"[ERROR] Critical error: {e}")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())