feat: Implement robust backend instance management with HTTP health checks and PID verification

- Added InstanceManager class for managing backend instances.
- Introduced double HTTP health checks with configurable intervals.
- Implemented PID-based verification and cleanup of stale processes.
- Enhanced instance initialization and cleanup processes.
- Updated main.py to utilize the new instance management system.
- Modified system_state.json to reflect changes in active datasets and last update timestamp.
This commit is contained in:
Miguel 2025-08-22 14:13:07 +02:00
parent 88a6b805be
commit ee6918445e
4 changed files with 2594 additions and 2931 deletions

File diff suppressed because it is too large Load Diff

126
main.py
View File

@ -10,6 +10,7 @@ import json
import time
import signal
import sys
import requests # For HTTP health checks
from datetime import datetime, timedelta, timezone
import os
import logging
@ -50,56 +51,53 @@ from core.historical_cache import HistoricalDataCache
from utils.json_manager import JSONManager, SchemaManager
from utils.symbol_loader import SymbolLoader
from utils.symbol_processor import SymbolProcessor
from utils.instance_manager import InstanceManager
def check_for_running_instance_early():
def check_backend_instance_robust(port: int = 5050, lock_file: str = "plc_streamer.lock"):
"""
Optional early check for running instance before initializing PLCDataStreamer.
This provides faster feedback to the user without going through full initialization.
🔒 ROBUST INSTANCE CHECK - HTTP + PID based verification
This function provides a more reliable way to detect existing backend instances:
1. Double HTTP health check with 5-second interval
2. PID verification and zombie process cleanup
3. Automatic lock file management
Args:
port: Backend server port (default: 5050)
lock_file: Lock file path (default: "plc_streamer.lock")
Returns:
Tuple[bool, str]: (can_proceed, message)
- can_proceed: True if this instance can start safely
- message: Detailed status message
"""
import psutil
lock_file = "plc_streamer.lock"
if not os.path.exists(lock_file):
return True # No lock file, safe to proceed
print("🔍 Starting robust backend instance verification...")
try:
with open(lock_file, "r") as f:
old_pid = int(f.read().strip())
if psutil.pid_exists(old_pid):
proc = psutil.Process(old_pid)
cmdline = " ".join(proc.cmdline())
# Check if it's really our application
if (
("main.py" in cmdline and "S7_snap7_Stremer_n_Log" in cmdline)
or ("plc_streamer" in cmdline.lower())
or ("PLCDataStreamer" in cmdline)
):
print(f"🚫 Another instance is already running (PID: {old_pid})")
print(f"📋 Process: {proc.name()}")
print(f"💻 Command: {cmdline}")
return False
# Process not running or different process, remove stale lock
os.remove(lock_file)
print(f"🧹 Removed stale lock file")
return True
except (ValueError, psutil.NoSuchProcess, psutil.AccessDenied, FileNotFoundError):
# Invalid or inaccessible, remove lock file if exists
if os.path.exists(lock_file):
try:
os.remove(lock_file)
print(f"🧹 Removed invalid lock file")
except:
pass
return True
# Initialize instance manager
instance_manager = InstanceManager(port=port, lock_file=lock_file)
# Perform comprehensive instance check
can_proceed, message = instance_manager.check_and_handle_existing_instance()
if can_proceed:
print(f"{message}")
print("🔒 Initializing new backend instance...")
# Create lock file for this instance
if not instance_manager.initialize_instance():
return False, "❌ Failed to create instance lock file"
return True, "✅ Backend instance ready to start"
else:
print(f"🚫 {message}")
return False, message
except Exception as e:
print(f"⚠️ Error checking instance: {e}")
return True # On error, allow to proceed
error_msg = f"❌ Error during instance verification: {e}"
print(error_msg)
return False, error_msg
app = Flask(__name__)
@ -3197,7 +3195,7 @@ def stream_status():
def graceful_shutdown():
"""Perform graceful shutdown"""
"""Perform graceful shutdown with robust instance cleanup"""
print("\n⏹️ Performing graceful shutdown...")
try:
if streamer is not None:
@ -3223,6 +3221,26 @@ def graceful_shutdown():
else:
print("⚠️ Streamer not initialized, skipping shutdown steps")
# 🔒 ROBUST CLEANUP: Use instance manager for reliable lock file cleanup
print("🧹 Cleaning up instance lock file...")
try:
instance_manager = InstanceManager(port=5050, lock_file="plc_streamer.lock")
if instance_manager.cleanup_instance():
print("✅ Instance lock file cleaned up successfully")
else:
print("⚠️ Warning: Instance lock file cleanup had issues")
except Exception as cleanup_error:
print(f"⚠️ Error during instance cleanup: {cleanup_error}")
# Fallback to direct file removal
try:
import os
lock_file = "plc_streamer.lock"
if os.path.exists(lock_file):
os.remove(lock_file)
print(f"🧹 Emergency cleanup: Removed lock file directly")
except:
pass # Silent fail for emergency cleanup
print("📝 Closing rotating logger system...")
# 📝 Close rotating logger system
backend_logger.close()
@ -3928,16 +3946,19 @@ if __name__ == "__main__":
print(f"🚀 Starting PLC S7-315 Streamer & Logger...")
print(f"🐍 Process PID: {os.getpid()}")
# 🔍 OPTIONAL: Early check for existing instance (faster feedback)
# Comment out the next 4 lines if you prefer the full error handling in PLCDataStreamer
if not check_for_running_instance_early():
print("❌ Startup aborted due to existing instance")
# input("Press Enter to exit...")
# 🔒 ROBUST INSTANCE CHECK - HTTP + PID based verification
print("=" * 60)
can_proceed, check_message = check_backend_instance_robust(port=5050)
print("=" * 60)
if not can_proceed:
print(f"❌ Startup aborted: {check_message}")
print("💡 Tip: If you believe this is an error, check Task Manager for python.exe processes")
# input("\nPress Enter to exit...")
sys.exit(1)
try:
# Initialize streamer instance with instance check
print("✅ No conflicting instances found (early check)")
# Initialize streamer instance
print("🔧 Initializing PLCDataStreamer...")
streamer = PLCDataStreamer()
@ -3946,6 +3967,7 @@ if __name__ == "__main__":
historical_cache = HistoricalDataCache(backend_logger)
print("✅ Backend initialization complete")
print(f"🌐 Starting Flask server on port 5050...")
main()
except RuntimeError as e:

View File

@ -4,11 +4,11 @@
"should_stream": false,
"active_datasets": [
"DAR",
"Fast",
"Test"
"Test",
"Fast"
]
},
"auto_recovery_enabled": true,
"last_update": "2025-08-22T12:14:57.462145",
"last_update": "2025-08-22T14:03:25.041057",
"plotjuggler_path": "C:\\Program Files\\PlotJuggler\\plotjuggler.exe"
}

283
utils/instance_manager.py Normal file
View File

@ -0,0 +1,283 @@
"""
🔒 Instance Manager - Robust backend instance control system
This module provides a reliable way to manage backend instances using:
1. HTTP health check on the backend port
2. PID-based verification and cleanup
3. Graceful termination of zombie processes
Key features:
- Double health check with 5-second intervals for reliability
- Automatic cleanup of stale lock files
- Force termination of unresponsive processes
- Best-effort PID lock file (no locking is used, so concurrent startups can still race)
"""
import os
import sys
import time
import json
import psutil
import requests
from typing import Optional, Tuple
class InstanceManager:
    """Manage the backend instance lifecycle and prevent duplicate executions.

    Detection combines an HTTP health probe on the backend port with a PID
    lock file. The check-then-create sequence is not atomic: two instances
    started at the same moment can both pass the check.
    """

    # Lower-case substrings looked for in a process command line.
    # NOTE(review): "main.py" alone matches any Python program with that
    # entry point, so a recycled PID could be mistaken for our backend.
    # The "stremer" spelling is the one the previous check in main.py used;
    # confirm which spelling the real project directory has.
    _BACKEND_SIGNATURES = (
        "main.py",
        "s7_snap7_streamer_n_log",
        "s7_snap7_stremer_n_log",
        "plc_streamer",
        "plcdatastreamer",
    )

    def __init__(self,
                 port: int = 5050,
                 lock_file: str = "plc_streamer.lock",
                 health_endpoint: str = "/api/health",
                 check_timeout: float = 3.0,
                 check_interval: float = 5.0):
        """
        Initialize the instance manager

        Args:
            port: Backend server port to check
            lock_file: Path to the PID lock file
            health_endpoint: HTTP endpoint for health checks
            check_timeout: Timeout for each HTTP request (seconds)
            check_interval: Time between double-checks (seconds)
        """
        self.port = port
        self.lock_file = lock_file
        self.health_endpoint = health_endpoint
        self.check_timeout = check_timeout
        self.check_interval = check_interval
        self.base_url = f"http://localhost:{port}"

    def is_backend_alive_http(self) -> bool:
        """
        Check if backend is alive via HTTP health check

        Returns:
            True if the health endpoint answers with a 2xx status,
            False on any other status or on any request failure
        """
        url = f"{self.base_url}{self.health_endpoint}"
        try:
            response = requests.get(url, timeout=self.check_timeout)
        except requests.RequestException:
            # Base class of ConnectionError, Timeout, ConnectTimeout, ...
            return False
        except Exception as e:
            print(f"⚠️ Unexpected error during health check: {e}")
            return False
        # Accept any successful HTTP response (200-299)
        return 200 <= response.status_code < 300

    def get_lock_file_pid(self) -> Optional[int]:
        """
        Read PID from lock file

        Returns:
            The PID if the lock file exists and holds a positive integer,
            None if the file is missing, unreadable, empty or invalid
        """
        try:
            with open(self.lock_file, "r") as f:
                pid = int(f.read().strip())
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError: empty or
            # non-numeric content (also covers undecodable bytes).
            return None
        # Zero and negative values are never a real backend PID.
        return pid if pid > 0 else None

    def is_process_our_backend(self, pid: int) -> bool:
        """
        Verify if the process with given PID is our backend application

        Args:
            pid: Process ID to check

        Returns:
            True if the process exists and its command line contains one of
            the backend signatures, False otherwise (including when the
            process cannot be inspected)
        """
        try:
            if not psutil.pid_exists(pid):
                return False
            cmdline = " ".join(psutil.Process(pid).cmdline()).lower()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            return False
        except Exception as e:
            print(f"⚠️ Error checking process {pid}: {e}")
            return False
        return any(sig in cmdline for sig in self._BACKEND_SIGNATURES)

    def terminate_process_safely(self, pid: int) -> bool:
        """
        Terminate a process, escalating from terminate() to kill()

        Args:
            pid: Process ID to terminate

        Returns:
            True if the process is gone afterwards, False if it could not be
            terminated (no permission, still alive after kill, other error)
            or if pid is the current process
        """
        # A stale lock may hold a recycled PID that now belongs to us.
        if pid == os.getpid():
            print(f"❌ Refusing to terminate the current process (PID: {pid})")
            return False

        try:
            if not psutil.pid_exists(pid):
                return True  # Already gone

            proc = psutil.Process(pid)
            print(f"🛑 Attempting to terminate process {pid} ({proc.name()})...")

            # Try graceful termination first, waiting up to 10 seconds
            proc.terminate()
            try:
                proc.wait(timeout=10)
            except psutil.TimeoutExpired:
                # Force kill if graceful didn't work; a second timeout here
                # falls through to the generic handler and reports failure.
                print(f"⚡ Force killing process {pid}...")
                proc.kill()
                proc.wait(timeout=5)
                print(f"💥 Process {pid} force killed")
            else:
                print(f"✅ Process {pid} terminated gracefully")
            return True

        except psutil.NoSuchProcess:
            return True  # Process disappeared while we were working on it
        except psutil.AccessDenied as e:
            # The process is still running; reporting success here would let
            # the caller delete its lock file and start a second backend.
            print(f"❌ Access denied terminating process {pid}: {e}")
            return False
        except Exception as e:
            print(f"❌ Error terminating process {pid}: {e}")
            return False

    def cleanup_lock_file(self) -> bool:
        """
        Remove the lock file unconditionally

        Returns:
            True if lock file was removed or didn't exist, False on error
        """
        try:
            os.remove(self.lock_file)
        except FileNotFoundError:
            # Nothing to remove (possibly removed concurrently).
            return True
        except Exception as e:
            print(f"❌ Error removing lock file: {e}")
            return False
        print(f"🧹 Removed lock file: {self.lock_file}")
        return True

    def create_lock_file(self) -> bool:
        """
        Create lock file with current process PID

        An existing lock file is overwritten.

        Returns:
            True if lock file was created successfully, False otherwise
        """
        pid = os.getpid()
        try:
            with open(self.lock_file, "w") as f:
                f.write(str(pid))
        except Exception as e:
            print(f"❌ Error creating lock file: {e}")
            return False
        print(f"🔒 Created lock file: {self.lock_file} (PID: {pid})")
        return True

    def check_and_handle_existing_instance(self) -> Tuple[bool, str]:
        """
        Main method: Check for existing instances and handle them

        Performs two HTTP health checks separated by check_interval seconds,
        then inspects the lock file and terminates a backend process that is
        still running but no longer answers HTTP.

        Returns:
            Tuple of (can_proceed, message)
            - can_proceed: True if this instance can start, False if should exit
            - message: Description of what happened
        """
        print("🔍 Checking for existing backend instances...")

        # Step 1: First HTTP health check
        print("📡 Performing first health check...")
        if self.is_backend_alive_http():
            return False, f"❌ Another backend is already running on port {self.port}"

        print(f"⏳ Waiting {self.check_interval} seconds for double-check...")
        time.sleep(self.check_interval)

        # Step 2: Second HTTP health check (double verification)
        print("📡 Performing second health check...")
        if self.is_backend_alive_http():
            return False, f"❌ Another backend is confirmed running on port {self.port}"

        print("✅ No active backend detected via HTTP")

        # Step 3: Check lock file and handle zombie processes
        lock_pid = self.get_lock_file_pid()
        if lock_pid is None:
            if os.path.exists(self.lock_file):
                # The file is there but holds no usable PID: drop it instead
                # of reporting that no lock file exists.
                print("🧹 Lock file is empty or invalid")
                self.cleanup_lock_file()
                return True, "✅ Cleaned up stale lock file"
            print("📝 No lock file found")
            return True, "✅ No existing instances detected"

        print(f"📋 Found lock file with PID: {lock_pid}")

        # Step 4: Verify if the process is actually another backend. A lock
        # naming this very process is stale too (recycled PID, or a lock this
        # process wrote itself) and must never lead to self-termination.
        if lock_pid == os.getpid() or not self.is_process_our_backend(lock_pid):
            print(f"🧹 PID {lock_pid} is not our backend process")
            self.cleanup_lock_file()
            return True, "✅ Cleaned up stale lock file"

        # Step 5: We have a zombie backend process - terminate it
        print(f"🧟 Found zombie backend process (PID: {lock_pid})")
        if not self.terminate_process_safely(lock_pid):
            return False, f"❌ Failed to cleanup zombie process (PID: {lock_pid})"

        self.cleanup_lock_file()
        print("🎯 Successfully cleaned up zombie backend")
        return True, "✅ Cleaned up zombie backend process"

    def initialize_instance(self) -> bool:
        """
        Initialize this instance (create lock file)

        Returns:
            True if initialization successful, False otherwise
        """
        return self.create_lock_file()

    def cleanup_instance(self) -> bool:
        """
        Cleanup this instance (remove its lock file)

        The lock file is left in place when it records the PID of a different
        process, so shutting down one process cannot drop another instance's
        lock. A missing or unparseable lock file is handled by
        cleanup_lock_file as before.

        Returns:
            True if there was nothing of ours left to clean up or the lock
            file was removed, False on error
        """
        lock_pid = self.get_lock_file_pid()
        if lock_pid is not None and lock_pid != os.getpid():
            print(f"⚠️ Lock file belongs to PID {lock_pid}; leaving it in place")
            return True
        return self.cleanup_lock_file()
def check_backend_instance(port: int = 5050,
                           lock_file: str = "plc_streamer.lock") -> Tuple[bool, str]:
    """
    Run the existing-instance check with a throwaway InstanceManager.

    Args:
        port: Backend server port
        lock_file: Lock file path

    Returns:
        Tuple of (can_proceed, message), exactly as returned by
        InstanceManager.check_and_handle_existing_instance
    """
    return InstanceManager(
        port=port,
        lock_file=lock_file,
    ).check_and_handle_existing_instance()
if __name__ == "__main__":
    # Manual smoke test: run the check with default settings and report it.
    ok, detail = InstanceManager().check_and_handle_existing_instance()
    print(f"\nResult: {detail}")
    print(f"Can proceed: {ok}")