Implement the performance monitoring system and priority management for PLC recording

- Added PERFORMANCE_MONITORING.md, detailing the real-time performance monitoring system, including metrics, logs, and monitoring APIs.
- Added PRIORITY_SYSTEM.md, describing the priority architecture that guarantees CSV recording always has the highest priority.
- Implemented PerformanceMonitor in core/performance_monitor.py to record performance metrics, including read times, CPU usage, and errors.
- Added PriorityThreadManager in core/priority_manager.py to manage thread priorities and ensure recording operations are never interrupted.
- Implemented a rotating logging system in core/rotating_logger.py that provides automatic log-file rotation and cleanup of old log files.
This commit is contained in:
Miguel 2025-08-16 18:34:31 +02:00
parent 5cd3d61128
commit be2df781cf
12 changed files with 8157 additions and 930 deletions

PERFORMANCE_MONITORING.md

@@ -0,0 +1,235 @@
# Performance Monitoring System
## 🔍 Objective
Monitor the recording system's performance in **real time**, automatically detecting:
- **Lost points** caused by delays or read errors
- **Statistics every 10 seconds** with detailed metrics
- **Response times** for PLC, CSV, and UDP
- Average **CPU usage**
- Read/write **errors**
## 📊 Monitored Metrics
### 1. Point Statistics
- **Points Saved**: points successfully written to CSV
- **Points Lost**: points lost due to loop delays
- **Variables Saved**: total variables saved
- **UDP Points Sent**: points sent via UDP streaming
### 2. Timing and Performance
- **Read Time**: PLC read time per dataset
- **CSV Write Time**: time to write to the CSV file
- **Loop Delay**: loop delay vs. the expected interval
- **CPU Usage**: average CPU usage of the process
### 3. Errors and Failures
- **Read Errors**: PLC read errors
- **CSV Errors**: CSV write errors
- **UDP Errors**: UDP transmission errors
- **Consecutive Errors**: consecutive errors per dataset
## 🔥 Performance Logs
### Main Log (every 10 seconds)
```
📊 PERFORMANCE [2025-08-16 15:30:25] | Points: 120/10s (12.0/s) | Variables: 1800 | UDP: 80/10s | CPU: 15.3% | Delay: 0.002s±0.001s | Lost: 0 | Errors: R:0 C:0 U:0
```
**Breakdown**:
- `Points: 120/10s (12.0/s)`: 120 points saved in 10 s (12 points/second)
- `Variables: 1800`: total variables saved (120 points × 15 variables)
- `UDP: 80/10s`: 80 points sent via UDP
- `CPU: 15.3%`: average CPU usage of the process
- `Delay: 0.002s±0.001s`: average delay ± standard deviation
- `Lost: 0`: lost points
- `Errors: R:0 C:0 U:0`: Read/CSV/UDP errors
### Warning Logs
#### Lost Points
```
⚠️ DATA LOSS: 3 points lost in last 10s (avg delay: 0.150s)
⚠️ POINTS LOST: Dataset 'Fast' - 2 points lost (expected: 0.100s, actual: 0.350s, delay: 0.150s)
```
#### High CPU Usage
```
⚠️ HIGH CPU: 85.5% average CPU usage
```
#### Read Errors
```
⚠️ READ ERRORS: 5 read errors in last 10s
```
#### Timing Overrun
```
⏰ TIMING WARNING: Dataset 'Fast' loop overrun by 0.050s (read: 0.025s, csv: 0.020s, total: 0.150s)
```
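The overrun warning fires when one loop iteration takes longer than the configured sampling interval. A minimal sketch of that check (the function name `check_overrun` is illustrative, not from the module):

```python
def check_overrun(read_time: float, csv_time: float, total_time: float,
                  interval: float) -> float:
    """Return how far a loop iteration overran its sampling interval
    (0.0 when the iteration finished on time)."""
    overrun = total_time - interval
    if overrun > 0:
        print(f"⏰ TIMING WARNING: loop overrun by {overrun:.3f}s "
              f"(read: {read_time:.3f}s, csv: {csv_time:.3f}s, "
              f"total: {total_time:.3f}s)")
    return max(overrun, 0.0)

# The warning above: 0.150s total against a 0.100s interval → 0.050s overrun
print(round(check_overrun(0.025, 0.020, 0.150, 0.100), 3))
```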
## 🚨 Lost-Point Detection
### Detection Algorithm
```python
# Points are considered lost when:
actual_interval = current_time - last_read_time
expected_interval = dataset_sampling_interval
if actual_interval > expected_interval * 1.5:  # 50% tolerance
    lost_points = int((actual_interval - expected_interval) / expected_interval)
    # Log a warning with the details
```
### Detection Example
```
Configured dataset: interval = 0.1s (10 points/second)
Previous read: 15:30:25.000
Current read:  15:30:25.350 (0.350s late)
Lost points = (0.350 - 0.100) / 0.100 = 2.5 → 2 lost points
```
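The rule above can be exercised as a standalone function; this is a minimal sketch of the same arithmetic (`count_lost_points` is an illustrative name, not part of the module):

```python
def count_lost_points(actual_interval: float, expected_interval: float,
                      tolerance: float = 1.5) -> int:
    """Points are lost once the actual interval exceeds the expected
    interval by the tolerance factor (50% by default)."""
    if actual_interval > expected_interval * tolerance:
        return int((actual_interval - expected_interval) / expected_interval)
    return 0

# The example above: 0.350s observed vs. 0.100s expected → 2 lost points
print(count_lost_points(0.350, 0.100))
```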
## 📈 Monitoring APIs
### `/api/performance/current`
Current metrics (active 10 s window):
```json
{
"success": true,
"current_performance": {
"points_saved": 45,
"points_rate": 12.5,
"variables_saved": 675,
"udp_points": 30,
"points_lost": 0,
"cpu_avg": 15.2,
"delay_avg": 0.002,
"read_errors": 0,
"csv_errors": 0,
"udp_errors": 0
}
}
```
### `/api/performance/historical?windows=6`
Historical metrics (last N×10 s):
```json
{
"success": true,
"historical_performance": {
"windows": 6,
"duration_minutes": 1.0,
"total_points_saved": 720,
"total_variables_saved": 10800,
"total_udp_sent": 480,
"total_points_lost": 0,
"total_errors": 0,
"points_per_second": 12.0,
"loss_rate_percent": 0.0,
"average_cpu_percent": 14.8,
"average_delay_seconds": 0.002
}
}
```
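The derived fields `points_per_second` and `loss_rate_percent` come from the aggregated window totals; a minimal sketch of that arithmetic (names mirror the JSON fields above):

```python
def aggregate(total_points_saved: int, total_points_lost: int,
              windows: int, report_interval: float = 10.0) -> dict:
    """Derive the throughput fields returned by /api/performance/historical."""
    total_duration = windows * report_interval
    return {
        "points_per_second": total_points_saved / total_duration,
        # max(..., 1) guards against division by zero before any point is saved
        "loss_rate_percent": (total_points_lost
                              / max(total_points_saved + total_points_lost, 1)) * 100,
    }

# The 6-window example above: 720 points saved, 0 lost → 12.0 points/s, 0.0% loss
print(aggregate(720, 0, windows=6))
```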
### `/api/priority/status`
Includes the full set of performance statistics:
```json
{
"priority_protection": {
"performance_current": { ... },
"performance_historical": { ... }
}
}
```
## 🔧 Configuration and Usage
### Automatic Start
Monitoring starts automatically when:
```python
# When CSV recording starts:
streamer.start_csv_recording()
# - Starts the PerformanceMonitor
# - Configures the dataset intervals
# - Begins logging every 10s
```
### Interval Configuration
```python
# Configured automatically per dataset:
self.performance_monitor.set_dataset_interval(dataset_id, interval)
# Used to detect lost points
```
### Automatic Stop
```python
# When CSV recording stops:
streamer.stop_csv_recording()
# - Stops the PerformanceMonitor
# - Saves the final statistics
```
## 📋 Interpreting the Metrics
### ✅ Optimal Performance
- **Points Lost: 0**: no lost points
- **CPU < 50%**: moderate CPU usage
- **Delay < 10ms**: timing within tolerance
- **Errors: 0**: no read/write errors
### ⚠️ Degraded Performance
- **Points Lost > 0**: check the system load
- **CPU > 80%**: optimize the configuration
- **Delay > 50ms**: reduce the sampling frequency
- **Errors > 0**: check the PLC connection and file access
### 🚨 Critical Problems
- **Points Lost > 10% rate**: system overloaded
- **CPU > 95%**: insufficient resources
- **Consecutive Errors > 5**: communication failure
- **Read Time > interval**: PLC overloaded
## 🛠️ Troubleshooting
### High Lost-Point Rate
1. **Reduce the sampling frequency** of the datasets
2. **Optimize variables**: remove unnecessary variables
3. **Check the load** on the operating system
4. **Check the PLC connection**: network latency
### High CPU
1. **Check the active threads** in `/api/priority/status`
2. **Reduce the number of simultaneous datasets**
3. **Upgrade the hardware**: SSD, more RAM
4. **Close unnecessary applications**
### Read Errors
1. **Check the PLC status** in `/api/status`
2. **Check the variables**: correct addresses
3. **Check the network**: ping the PLC
4. **Verify that snap7.dll** is available
### CSV Delays
1. **Check the free disk space**
2. **Check the write permissions**
3. **Put the records directory on fast storage** (SSD)
4. **Reduce the CSV file size**
## 📊 Reference Metrics
### Typical System (Fast=0.1s, DAR=1.0s)
- **Points/second**: 11-13 (Fast: 10/s + DAR: 1/s)
- **CPU Usage**: 10-25% (hardware dependent)
- **Read Time**: 5-15ms per dataset
- **CSV Write**: 1-5ms per point
- **Delay**: normally < 10ms
### Optimized System
- **Loss Rate**: < 0.1%
- **CPU Usage**: < 20%
- **Average Delay**: < 5ms
- **Error Rate**: 0 errors/10s
The key is to keep **Points Lost = 0** and **CPU < 50%** to guarantee continuous, reliable recording.

PRIORITY_SYSTEM.md

@@ -0,0 +1,191 @@
# Priority System for PLC Recording
## 🔑 Main Goal
**CSV recording must have the HIGHEST PRIORITY** and must never be interrupted by API operations, plot processing, or any other secondary task.
## 🏗️ Priority Architecture
### Priority Levels
```
CRITICAL (10) 🔥 CSV Recording threads
HIGH (7) ⚡ PLC communication
NORMAL (5) 📡 UDP streaming, basic APIs
LOW (3) 📊 Historical data processing
BACKGROUND (1) 📈 Plot generation, file operations
```
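These levels map directly onto the `Priority` enum in `core/priority_manager.py`; because it is an `IntEnum`, levels compare numerically, so a scheduler can pick the most urgent task with a plain comparison:

```python
from enum import IntEnum

class Priority(IntEnum):
    """Thread priority levels - higher number = higher priority"""
    BACKGROUND = 1   # Plot generation, file operations
    LOW = 3          # Historical data processing
    NORMAL = 5       # UDP streaming, basic APIs
    HIGH = 7         # PLC communication
    CRITICAL = 10    # CSV Recording threads

# Numeric comparison lets schedulers order tasks by urgency
print(Priority.CRITICAL > Priority.HIGH)   # True
print(max(Priority).name)                  # CRITICAL
```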
### Implemented Components
#### 1. `PriorityThreadManager`
- **Role**: manages threads with different priorities
- **Location**: `core/priority_manager.py`
- **Features**:
- Recording threads run at CRITICAL priority
- API operations use thread pools at NORMAL/LOW priority
- Background tasks (plots, cleanup) run at BACKGROUND priority
- OS-level priorities are configured automatically (Windows/Linux)
#### 2. `RecordingProtector`
- **Role**: protects recording operations through rate limiting
- **Features**:
- Limits APIs to at most 10 requests/second while recording is active
- Prevents system overload during critical recording
- Monitors and reports protection metrics
#### 3. Thread Separation
- **Recording Threads**: run ONLY CSV operations
- **API Thread Pool**: handles HTTP requests at lower priority
- **Background Thread Pool**: processes non-critical tasks
## 🔥 Implemented Protections
### 1. Recording Thread Protection
```python
# Recording threads have:
- CRITICAL priority (the highest available in the OS)
- Non-daemon threads (they are never killed automatically)
- Controlled, safe shutdown
- Complete separation from UDP/API operations
```
### 2. API Rate Limiting
```python
# Protection against API overload:
- At most 10 requests/second while recording
- 2-second timeout for cache operations
- Automatic rate limiting with HTTP 429 responses
```
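A fixed-window limiter of this kind can be sketched as follows (an illustrative stand-in, not the actual `RecordingProtector` implementation):

```python
import time

class SimpleRateLimiter:
    """Fixed one-second window: allow at most `limit` calls per second,
    mirroring the 10 req/s cap applied while recording is active."""
    def __init__(self, limit: int = 10):
        self.limit = limit
        self.window_start = time.monotonic()
        self.count = 0

    def allow(self) -> bool:
        now = time.monotonic()
        if now - self.window_start >= 1.0:   # a new one-second window begins
            self.window_start = now
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False                          # caller should answer HTTP 429

limiter = SimpleRateLimiter(limit=10)
results = [limiter.allow() for _ in range(12)]
print(results.count(True), results.count(False))   # 10 allowed, 2 rejected
```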
### 3. Background Processing
```python
# Non-critical operations run in the background:
- UDP streaming to PlotJuggler
- Real-time plot updates
- CSV file cleanup
- Historical data processing
```
## 📊 Monitoring and Metrics
### Status Endpoint: `/api/priority/status`
```json
{
"success": true,
"priority_protection": {
"csv_recording_enabled": true,
"priority_stats": {
"active_recording_threads": 2,
"recording_thread_names": ["Fast:recording_fast", "DAR:recording_dar"],
"api_pool_active": 2,
"background_pool_active": 1
},
"recording_protection": {
"recording_protection_active": true,
"api_requests_this_second": 3,
"api_rate_limit": 10,
"time_until_reset": 0.7
}
}
}
```
### Priority Logs
```
🔥 CRITICAL PRIORITY: Dataset 'Fast' recording loop started (interval: 0.1s)
🔥 [2025-08-16 15:30:25.123] RECORDING Dataset 'Fast': 15 vars recorded
📡 Background: UDP sent 8 vars for dataset 'Fast'
📈 Background: Plot data updated for session abc123
```
## 🛡️ Protection Guarantees
### 1. Recording Never Stops
- **Thread isolation**: recording threads are fully separated
- **OS priority**: maximum priority at the operating-system level
- **Resource protection**: API rate limiting avoids resource contention
### 2. Graceful Degradation
- **API timeout**: APIs respond with an error if they exceed the time limit
- **Rate limiting**: APIs are throttled automatically while recording
- **Background processing**: non-critical tasks are deferred
### 3. System Monitoring
- **Real-time stats**: live metrics for threads and protection
- **Detailed logging**: dedicated logs for critical operations
- **Health checks**: endpoints to verify the system state
## 🚀 Production Usage
### Automatic Configuration
```python
# The system configures itself on initialization:
streamer = PLCDataStreamer()
# - PriorityThreadManager configured
# - RecordingProtector activated
# - Separate thread pools created
# - OS priorities configured
```
### Protected APIs
```python
# These APIs now use priority protection:
GET /api/datasets/{id}/variables/values # Rate limited
POST /api/csv/cleanup # Background processing
GET /api/priority/status # Monitoring
```
### Safe Shutdown
```python
# Shutdown respects priorities:
1. Stop CSV recording gracefully
2. Stop UDP streaming
3. Wait for recording threads (10s timeout)
4. Close API/background thread pools
5. Clean up resources
```
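The ordered shutdown can be sketched with plain threading primitives (a simplified illustration, not the actual `shutdown()` method):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def graceful_shutdown(stop_event: threading.Event,
                      recording_threads: list,
                      pools: list, timeout: float = 10.0):
    """Stop recording first, wait for its threads, then close the pools."""
    stop_event.set()                          # 1-2. signal recording/streaming to stop
    for t in recording_threads:               # 3. wait for recording threads
        t.join(timeout=timeout)
    for pool in pools:                        # 4. close API/background pools
        pool.shutdown(wait=True)
    # 5. remaining resources are released by the caller

stop = threading.Event()
worker = threading.Thread(target=stop.wait)   # stand-in for a recording loop
worker.start()
graceful_shutdown(stop, [worker], [ThreadPoolExecutor(max_workers=1)])
print(worker.is_alive())   # False
```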
## 📋 Operational Checklist
### ✅ Recording Protection Active
- [ ] CSV recording threads run at CRITICAL priority
- [ ] APIs are limited to 10 req/s while recording
- [ ] Background tasks do not block recording
- [ ] Logs show "🔥 CRITICAL PRIORITY"
### ✅ System Health
- [ ] `/api/priority/status` responds correctly
- [ ] Recording threads appear in the stats
- [ ] Rate limiting works (HTTP 429 when exceeded)
- [ ] Background tasks run without blocking
### ✅ Performance Monitoring
- [ ] Recording intervals stay stable
- [ ] No timeout errors during recording
- [ ] APIs respond within 2 seconds
- [ ] Cleanup runs in the background
## 🔧 Troubleshooting
### Slow Recording
1. Check `/api/priority/status`: are the critical threads active?
2. Check the logs: do "🔥 CRITICAL" messages appear?
3. Check the rate limiting: too many simultaneous API calls?
### Slow APIs
1. Check the rate limiting in `/api/priority/status`
2. Reduce the frontend's request frequency
3. Use the browser cache for static data
### Blocked Threads
1. Review the thread-pool stats
2. Verify that recording threads are not daemon threads
3. Check the shutdown order in the logs
## 📚 References
- `core/priority_manager.py`: full implementation of the system
- `core/streamer.py`: integration with the DataStreamer
- `main.py`: APIs protected with rate limiting
- `application_events.json`: priority event logs

File diff suppressed because it is too large


@@ -14,7 +14,7 @@
"id": "Fast",
"name": "Fast",
"prefix": "fast",
"sampling_interval": 0.62
"sampling_interval": 1
},
{
"enabled": true,

core/performance_monitor.py

@@ -0,0 +1,395 @@
"""
Performance Monitor for PLC Data Streamer
🔑 CORE PRINCIPLE: Real-time Performance Monitoring
=================================================
This module provides detailed monitoring of recording performance including:
- Points lost due to delays or read errors
- CPU usage statistics
- Recording and streaming throughput
- Error rates and timing analysis
Logs every 10 seconds with comprehensive statistics.
"""
import time
import threading
import psutil
from collections import deque, defaultdict
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
import statistics
@dataclass
class PerformanceMetrics:
"""Container for performance metrics over a time window"""
# Timing metrics
read_delays: List[float] = field(default_factory=list)
read_times: List[float] = field(default_factory=list)
csv_write_times: List[float] = field(default_factory=list)
# Throughput metrics
points_saved: int = 0
variables_saved: int = 0
udp_points_sent: int = 0
# Error metrics
read_errors: int = 0
csv_errors: int = 0
udp_errors: int = 0
points_lost: int = 0
# CPU metrics
cpu_samples: List[float] = field(default_factory=list)
# Timestamp tracking
start_time: float = field(default_factory=time.time)
def reset(self):
"""Reset all metrics for new measurement window"""
self.read_delays.clear()
self.read_times.clear()
self.csv_write_times.clear()
self.points_saved = 0
self.variables_saved = 0
self.udp_points_sent = 0
self.read_errors = 0
self.csv_errors = 0
self.udp_errors = 0
self.points_lost = 0
self.cpu_samples.clear()
self.start_time = time.time()
class PerformanceMonitor:
"""Real-time performance monitoring for PLC data streaming"""
def __init__(self, logger=None, event_logger=None, report_interval=10.0):
self.logger = logger
self.event_logger = event_logger
self.report_interval = report_interval
# Current metrics window
self.current_metrics = PerformanceMetrics()
# Historical data (keep last 60 windows = 10 minutes)
self.historical_metrics = deque(maxlen=60)
# Dataset-specific tracking
self.dataset_last_read = {} # dataset_id -> timestamp
self.dataset_expected_interval = {} # dataset_id -> interval
self.dataset_metrics = defaultdict(PerformanceMetrics)
# Monitoring thread
self.monitoring_active = False
self.monitor_thread = None
self.lock = threading.Lock()
# CPU monitoring
self.process = psutil.Process()
def start_monitoring(self):
"""Start performance monitoring thread"""
if self.monitoring_active:
return
self.monitoring_active = True
self.monitor_thread = threading.Thread(
target=self._monitoring_loop, name="performance_monitor", daemon=True
)
self.monitor_thread.start()
if self.logger:
self.logger.info("🔍 Performance monitoring started (10s intervals)")
def stop_monitoring(self):
"""Stop performance monitoring thread"""
self.monitoring_active = False
if self.monitor_thread and self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=2.0)
if self.logger:
self.logger.info("🔍 Performance monitoring stopped")
def record_dataset_read(
self,
dataset_id: str,
read_time: float,
variables_count: int,
success: bool = True,
delay: float = 0.0,
):
"""Record a dataset read operation"""
with self.lock:
current_time = time.time()
# Update dataset tracking
if dataset_id in self.dataset_last_read:
# Calculate actual interval vs expected
actual_interval = current_time - self.dataset_last_read[dataset_id]
expected_interval = self.dataset_expected_interval.get(dataset_id, 1.0)
# Check for lost points due to delays
if actual_interval > expected_interval * 1.5: # 50% tolerance
lost_points = int(
(actual_interval - expected_interval) / expected_interval
)
self.current_metrics.points_lost += lost_points
if self.logger:
self.logger.warning(
f"⚠️ POINTS LOST: Dataset '{dataset_id}' - {lost_points} points lost "
f"(expected: {expected_interval:.2f}s, actual: {actual_interval:.2f}s, "
f"delay: {delay:.3f}s)"
)
self.dataset_last_read[dataset_id] = current_time
# Record metrics
if success:
self.current_metrics.read_times.append(read_time)
self.current_metrics.points_saved += 1
self.current_metrics.variables_saved += variables_count
if delay > 0:
self.current_metrics.read_delays.append(delay)
else:
self.current_metrics.read_errors += 1
def record_csv_write(
self, dataset_id: str, write_time: float, success: bool = True
):
"""Record a CSV write operation"""
with self.lock:
if success:
self.current_metrics.csv_write_times.append(write_time)
else:
self.current_metrics.csv_errors += 1
def record_udp_send(self, variables_count: int, success: bool = True):
"""Record a UDP send operation"""
with self.lock:
if success:
self.current_metrics.udp_points_sent += variables_count
else:
self.current_metrics.udp_errors += 1
def set_dataset_interval(self, dataset_id: str, interval: float):
"""Set expected interval for a dataset"""
self.dataset_expected_interval[dataset_id] = interval
def _monitoring_loop(self):
"""Main monitoring loop that reports statistics every interval"""
while self.monitoring_active:
try:
# Sample CPU usage
with self.lock:
cpu_percent = self.process.cpu_percent()
self.current_metrics.cpu_samples.append(cpu_percent)
# Wait for report interval
time.sleep(self.report_interval)
if self.monitoring_active:
self._generate_report()
except Exception as e:
if self.logger:
self.logger.error(f"Error in performance monitoring loop: {e}")
time.sleep(1.0)
def _generate_report(self):
"""Generate and log performance report"""
with self.lock:
# Create copy of current metrics
metrics = PerformanceMetrics(
read_delays=self.current_metrics.read_delays.copy(),
read_times=self.current_metrics.read_times.copy(),
csv_write_times=self.current_metrics.csv_write_times.copy(),
points_saved=self.current_metrics.points_saved,
variables_saved=self.current_metrics.variables_saved,
udp_points_sent=self.current_metrics.udp_points_sent,
read_errors=self.current_metrics.read_errors,
csv_errors=self.current_metrics.csv_errors,
udp_errors=self.current_metrics.udp_errors,
points_lost=self.current_metrics.points_lost,
cpu_samples=self.current_metrics.cpu_samples.copy(),
start_time=self.current_metrics.start_time,
)
# Store in history
self.historical_metrics.append(metrics)
# Reset current metrics
self.current_metrics.reset()
# Generate report
self._log_performance_report(metrics)
def _log_performance_report(self, metrics: PerformanceMetrics):
"""Log detailed performance report"""
duration = time.time() - metrics.start_time
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Calculate statistics
stats = self._calculate_statistics(metrics, duration)
# Main performance log
if self.logger:
self.logger.info(
f"📊 PERFORMANCE [{timestamp}] | "
f"Points: {stats['points_saved']}/10s ({stats['points_rate']:.1f}/s) | "
f"Variables: {stats['variables_saved']} | "
f"UDP: {stats['udp_points']}/10s | "
f"CPU: {stats['cpu_avg']:.1f}% | "
f"Delay: {stats['delay_avg']:.3f}s±{stats['delay_std']:.3f}s | "
f"Lost: {stats['points_lost']} | "
f"Errors: R:{stats['read_errors']} C:{stats['csv_errors']} U:{stats['udp_errors']}"
)
# Warning logs for significant issues
if stats["points_lost"] > 0:
if self.logger:
self.logger.warning(
f"⚠️ DATA LOSS: {stats['points_lost']} points lost in last 10s "
f"(avg delay: {stats['delay_avg']:.3f}s)"
)
if stats["cpu_avg"] > 80:
if self.logger:
self.logger.warning(
f"⚠️ HIGH CPU: {stats['cpu_avg']:.1f}% average CPU usage"
)
if stats["read_errors"] > 0:
if self.logger:
self.logger.warning(
f"⚠️ READ ERRORS: {stats['read_errors']} read errors in last 10s"
)
# Log to event system
if self.event_logger:
self.event_logger.log_event(
"info",
"performance_report",
f"Performance report: {stats['points_saved']} points saved, "
f"{stats['points_lost']} lost, {stats['cpu_avg']:.1f}% CPU",
{
"duration": duration,
"points_saved": stats["points_saved"],
"points_rate": stats["points_rate"],
"variables_saved": stats["variables_saved"],
"udp_points_sent": stats["udp_points"],
"points_lost": stats["points_lost"],
"cpu_average": stats["cpu_avg"],
"cpu_max": stats["cpu_max"],
"delay_average": stats["delay_avg"],
"delay_max": stats["delay_max"],
"read_errors": stats["read_errors"],
"csv_errors": stats["csv_errors"],
"udp_errors": stats["udp_errors"],
"read_time_avg": stats["read_time_avg"],
"csv_write_time_avg": stats["csv_write_avg"],
},
)
def _calculate_statistics(
self, metrics: PerformanceMetrics, duration: float
) -> Dict[str, Any]:
"""Calculate performance statistics from metrics"""
# Points and rates
points_rate = metrics.points_saved / max(duration, 0.1)
# CPU statistics
cpu_avg = statistics.mean(metrics.cpu_samples) if metrics.cpu_samples else 0.0
cpu_max = max(metrics.cpu_samples) if metrics.cpu_samples else 0.0
# Delay statistics
delay_avg = statistics.mean(metrics.read_delays) if metrics.read_delays else 0.0
delay_std = (
statistics.stdev(metrics.read_delays)
if len(metrics.read_delays) > 1
else 0.0
)
delay_max = max(metrics.read_delays) if metrics.read_delays else 0.0
# Timing statistics
read_time_avg = (
statistics.mean(metrics.read_times) if metrics.read_times else 0.0
)
csv_write_avg = (
statistics.mean(metrics.csv_write_times) if metrics.csv_write_times else 0.0
)
return {
"points_saved": metrics.points_saved,
"points_rate": points_rate,
"variables_saved": metrics.variables_saved,
"udp_points": metrics.udp_points_sent,
"points_lost": metrics.points_lost,
"cpu_avg": cpu_avg,
"cpu_max": cpu_max,
"delay_avg": delay_avg,
"delay_std": delay_std,
"delay_max": delay_max,
"read_errors": metrics.read_errors,
"csv_errors": metrics.csv_errors,
"udp_errors": metrics.udp_errors,
"read_time_avg": read_time_avg,
"csv_write_avg": csv_write_avg,
}
def get_current_stats(self) -> Dict[str, Any]:
"""Get current performance statistics"""
with self.lock:
duration = time.time() - self.current_metrics.start_time
return self._calculate_statistics(self.current_metrics, duration)
def get_historical_stats(self, windows: int = 6) -> Dict[str, Any]:
"""Get historical performance statistics (last N windows)"""
with self.lock:
recent_metrics = list(self.historical_metrics)[-windows:]
if not recent_metrics:
return {}
# Aggregate statistics
total_points = sum(m.points_saved for m in recent_metrics)
total_variables = sum(m.variables_saved for m in recent_metrics)
total_udp = sum(m.udp_points_sent for m in recent_metrics)
total_lost = sum(m.points_lost for m in recent_metrics)
total_errors = sum(
m.read_errors + m.csv_errors + m.udp_errors for m in recent_metrics
)
# Average CPU
all_cpu = []
for m in recent_metrics:
all_cpu.extend(m.cpu_samples)
avg_cpu = statistics.mean(all_cpu) if all_cpu else 0.0
# Average delays
all_delays = []
for m in recent_metrics:
all_delays.extend(m.read_delays)
avg_delay = statistics.mean(all_delays) if all_delays else 0.0
total_duration = windows * self.report_interval
return {
"windows": windows,
"duration_minutes": total_duration / 60,
"total_points_saved": total_points,
"total_variables_saved": total_variables,
"total_udp_sent": total_udp,
"total_points_lost": total_lost,
"total_errors": total_errors,
"points_per_second": total_points / total_duration,
"loss_rate_percent": (total_lost / max(total_points + total_lost, 1)) * 100,
"average_cpu_percent": avg_cpu,
"average_delay_seconds": avg_delay,
}


@@ -13,6 +13,8 @@ try:
from .event_logger import EventLogger
from .instance_manager import InstanceManager
from .schema_manager import ConfigSchemaManager
from .priority_manager import PriorityThreadManager, RecordingProtector
from .performance_monitor import PerformanceMonitor
except ImportError:
# Fallback to absolute imports (when run directly)
from core.config_manager import ConfigManager
@@ -21,6 +23,8 @@ except ImportError:
from core.event_logger import EventLogger
from core.instance_manager import InstanceManager
from core.schema_manager import ConfigSchemaManager
from core.priority_manager import PriorityThreadManager, RecordingProtector
from core.performance_monitor import PerformanceMonitor
class PLCDataStreamer:
@@ -93,13 +97,10 @@ class PLCDataStreamer:
raise
def setup_logging(self):
"""Configure the logging system"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
)
self.logger = logging.getLogger(__name__)
"""Configure the logging system to use the rotating logger from main"""
# Use the backend logger configured in main.py
# This provides both console and rotating file logging
self.logger = logging.getLogger("backend")
# PLC Connection Methods
def connect_plc(self) -> bool:
@@ -169,8 +170,13 @@
return success
def disconnect_plc(self):
"""Disconnect from PLC and stop all recording/streaming"""
def disconnect_plc(self, manual_disconnect=True):
"""Disconnect from PLC and stop all recording/streaming
Args:
manual_disconnect: True if user manually disconnected (don't auto-reconnect),
False if shutdown/crash (should auto-reconnect)
"""
# Stop both CSV recording and UDP streaming
self.data_streamer.stop_csv_recording()
self.data_streamer.stop_udp_streaming()
@@ -184,20 +190,28 @@
if self.logger:
self.logger.warning(f"Error deactivating dataset {dataset_id}: {e}")
# Clear any tracked datasets for auto-resume since this is manual disconnect
self.data_streamer.datasets_before_disconnection.clear()
# 🔑 CRITICAL: Only clear auto-reconnect data if this is a MANUAL disconnect
if manual_disconnect:
# Clear any tracked datasets for auto-resume since this is manual disconnect
self.data_streamer.datasets_before_disconnection.clear()
# Save state indicating user manually disconnected (don't auto-reconnect)
self.config_manager.save_system_state(
connected=False, streaming=False, active_datasets=set()
)
disconnect_reason = "manually disconnected"
else:
# 🚀 PRESERVE auto-reconnect state for application restart
# Don't clear datasets_before_disconnection - let them auto-reconnect
# Don't save disconnected state - preserve previous "should connect" state
disconnect_reason = "application shutdown (will auto-reconnect on restart)"
# Disconnect from PLC
self.plc_client.disconnect()
self.config_manager.save_system_state(
connected=False, streaming=False, active_datasets=set()
)
self.event_logger.log_event(
"info",
"plc_disconnection",
f"Disconnected from PLC {self.config_manager.plc_config['ip']} (stopped recording and streaming)",
f"Disconnected from PLC {self.config_manager.plc_config['ip']} ({disconnect_reason})",
)
# Configuration Management Methods
@@ -500,7 +514,7 @@
# Status and Information Methods
def get_status(self) -> Dict[str, Any]:
"""Get current status of the application"""
"""Get current status of the application including priority information"""
status = {
"plc_connected": self.plc_client.is_connected(),
"streaming": self.data_streamer.is_streaming(),
@@ -509,6 +523,9 @@
"plc_config": self.config_manager.plc_config,
"datasets_count": len(self.config_manager.datasets),
"active_datasets_count": len(self.config_manager.active_datasets),
"active_datasets": list(
self.config_manager.active_datasets
), # Convert set to list for JSON
"total_variables": sum(
len(self.config_manager.get_dataset_variables(dataset_id))
for dataset_id in self.config_manager.datasets.keys()
@@ -522,6 +539,8 @@
# Add reconnection status information
"plc_reconnection": self.plc_client.get_reconnection_status(),
"plc_connection_info": self.plc_client.get_connection_info(),
# 🔑 PRIORITY MANAGEMENT STATUS
"priority_protection": self.data_streamer.get_streaming_stats(),
}
return status
@@ -750,3 +769,45 @@
# DEPRECATED: save_datasets() method removed
# Data is now saved directly from frontend via RJSF and API endpoints
# Use reload_dataset_configuration() to reload configuration when needed
def shutdown(self):
"""🔑 CRITICAL: Safely shutdown the entire PLC data streamer system"""
if self.logger:
self.logger.info("🔥 CRITICAL: Starting PLC Data Streamer shutdown...")
try:
# 1. Disconnect PLC first to stop all readings
if self.plc_client.is_connected():
self.plc_client.disconnect()
if self.logger:
self.logger.info("PLC disconnected")
# 2. Shutdown data streamer (this will handle all recording threads with priority)
if hasattr(self, "data_streamer") and self.data_streamer:
self.data_streamer.shutdown()
# 3. Release instance lock
if hasattr(self, "instance_manager") and self.instance_manager:
self.instance_manager.release_instance_lock()
if self.logger:
self.logger.info("Instance lock released")
if self.logger:
self.logger.info(
"🔥 CRITICAL: PLC Data Streamer shutdown completed successfully"
)
except Exception as e:
if self.logger:
self.logger.error(
f"🚨 CRITICAL ERROR: Error during PLC Data Streamer shutdown: {e}"
)
raise
def get_cached_dataset_values_safe(self, dataset_id: str):
"""🔑 RATE LIMITED: Get cached values with recording protection"""
return self.data_streamer.get_cached_dataset_values_safe(dataset_id)
def perform_csv_cleanup_safe(self):
"""🔑 BACKGROUND: Perform CSV cleanup with recording protection"""
self.data_streamer.perform_csv_cleanup_safe()

core/priority_manager.py

@@ -0,0 +1,342 @@
"""
Priority Manager for PLC Data Streamer
🔑 CORE PRINCIPLE: Recording Priority Protection
===============================================
This module ensures that CSV recording operations have the highest priority
and are never interrupted by API processing or visualization tasks.
Priority Levels:
- CRITICAL (10): CSV Recording threads
- HIGH (7): PLC communication
- NORMAL (5): UDP streaming, basic APIs
- LOW (3): Historical data processing
- BACKGROUND (1): Plot generation, file operations
"""
import threading
import os
import time
import psutil
from concurrent.futures import ThreadPoolExecutor
from queue import PriorityQueue
import queue
from typing import Callable, Any, Optional
from dataclasses import dataclass, field
from enum import IntEnum
class Priority(IntEnum):
"""Thread priority levels - higher number = higher priority"""
BACKGROUND = 1 # Plot generation, file operations
LOW = 3 # Historical data processing
NORMAL = 5 # UDP streaming, basic APIs
HIGH = 7 # PLC communication
CRITICAL = 10 # CSV Recording threads
@dataclass
class PriorityTask:
"""Task with priority for execution"""
priority: Priority
func: Callable
args: tuple = field(default_factory=tuple)
kwargs: dict = field(default_factory=dict)
callback: Optional[Callable] = None
task_id: Optional[str] = None
def __lt__(self, other):
# Higher priority number = higher priority (reversed for queue)
return self.priority > other.priority
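The inverted `__lt__` above is what makes a standard `queue.PriorityQueue` (which always pops the smallest item) hand back the highest-priority task first. A minimal, self-contained sketch of that ordering, trimmed to the fields the comparison needs:

```python
from dataclasses import dataclass
from enum import IntEnum
from queue import PriorityQueue
from typing import Callable, Optional

class Priority(IntEnum):
    BACKGROUND = 1
    LOW = 3
    NORMAL = 5
    HIGH = 7
    CRITICAL = 10

@dataclass
class PriorityTask:
    priority: Priority
    func: Callable
    task_id: Optional[str] = None

    def __lt__(self, other):
        # Inverted on purpose: PriorityQueue pops the "smallest" item,
        # so a higher Priority must compare as smaller.
        return self.priority > other.priority

q = PriorityQueue()
q.put(PriorityTask(Priority.LOW, print, task_id="plot"))
q.put(PriorityTask(Priority.CRITICAL, print, task_id="csv"))
q.put(PriorityTask(Priority.NORMAL, print, task_id="udp"))

order = [q.get().task_id for _ in range(3)]
print(order)  # ['csv', 'udp', 'plot']
```

The CSV recording task is dequeued first even though it was enqueued second, which is exactly the guarantee the priority system needs.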
class PriorityThreadManager:
"""Manages thread priorities to ensure recording operations have precedence"""
def __init__(self, logger=None):
self.logger = logger
self.recording_threads = {} # dataset_id -> thread
self.api_thread_pool = None
self.background_thread_pool = None
self._setup_thread_pools()
# System priority adjustment
self._adjust_process_priority()
def _setup_thread_pools(self):
"""Setup separate thread pools for different priority tasks"""
# API operations pool - lower priority
self.api_thread_pool = ThreadPoolExecutor(
max_workers=2, # Limited workers to not overwhelm recording
thread_name_prefix="api_worker",
)
# Background operations pool - lowest priority
self.background_thread_pool = ThreadPoolExecutor(
max_workers=1, # Single worker for background tasks
thread_name_prefix="background_worker",
)
if self.logger:
self.logger.info(
"Priority thread pools initialized (API: 2 workers, Background: 1 worker)"
)
def _adjust_process_priority(self):
"""Adjust current process priority to ensure recording performance"""
try:
current_process = psutil.Process()
# Set process priority to HIGH on Windows
if os.name == "nt": # Windows
current_process.nice(psutil.HIGH_PRIORITY_CLASS)
if self.logger:
self.logger.info(
"Process priority set to HIGH for recording optimization"
)
else: # Unix/Linux
current_process.nice(-10) # Higher priority (lower nice value)
if self.logger:
self.logger.info(
"Process nice value set to -10 for recording optimization"
)
except Exception as e:
if self.logger:
self.logger.warning(f"Could not adjust process priority: {e}")
def create_recording_thread(
self,
target: Callable,
args: tuple = (),
dataset_id: str = None,
name: str = None,
) -> threading.Thread:
"""Create a high-priority thread specifically for CSV recording"""
def priority_wrapper():
"""Wrapper that sets thread priority and executes target"""
try:
# Set thread priority to highest available
self._set_thread_priority(threading.current_thread(), Priority.CRITICAL)
if self.logger:
thread_name = threading.current_thread().name
self.logger.info(
f"Recording thread '{thread_name}' started with CRITICAL priority"
)
# Execute the actual recording function
target(*args)
except Exception as e:
if self.logger:
self.logger.error(f"Error in recording thread: {e}")
raise
# Create thread with high priority name
thread_name = name or (
f"recording_{dataset_id}" if dataset_id else "recording_thread"
)
thread = threading.Thread(
target=priority_wrapper,
name=thread_name,
daemon=False, # Recording threads should NOT be daemon
)
# Track recording threads
if dataset_id:
self.recording_threads[dataset_id] = thread
return thread
def submit_api_task(self, func: Callable, *args, **kwargs) -> Any:
"""Submit a task to the API thread pool (lower priority)"""
def priority_wrapper():
# Set lower priority for API operations
self._set_thread_priority(threading.current_thread(), Priority.NORMAL)
return func(*args, **kwargs)
return self.api_thread_pool.submit(priority_wrapper)
def submit_background_task(self, func: Callable, *args, **kwargs) -> Any:
"""Submit a task to the background thread pool (lowest priority)"""
def priority_wrapper():
# Set lowest priority for background operations
self._set_thread_priority(threading.current_thread(), Priority.BACKGROUND)
return func(*args, **kwargs)
return self.background_thread_pool.submit(priority_wrapper)
def _set_thread_priority(self, thread: threading.Thread, priority: Priority):
"""Set thread priority using OS-specific mechanisms"""
try:
if os.name == "nt": # Windows
import ctypes
# Windows thread priority constants
priority_map = {
Priority.CRITICAL: 2, # THREAD_PRIORITY_HIGHEST
Priority.HIGH: 1, # THREAD_PRIORITY_ABOVE_NORMAL
Priority.NORMAL: 0, # THREAD_PRIORITY_NORMAL
Priority.LOW: -1, # THREAD_PRIORITY_BELOW_NORMAL
Priority.BACKGROUND: -2, # THREAD_PRIORITY_LOWEST
}
# Get thread handle and set priority
kernel32 = ctypes.windll.kernel32
thread_handle = kernel32.GetCurrentThread()
kernel32.SetThreadPriority(thread_handle, priority_map[priority])
if self.logger:
self.logger.debug(
f"Thread '{thread.name}' priority set to {priority.name}"
)
else: # Unix/Linux
# On Unix systems, use nice values (approximate)
nice_map = {
Priority.CRITICAL: -15,
Priority.HIGH: -10,
Priority.NORMAL: 0,
Priority.LOW: 5,
Priority.BACKGROUND: 10,
}
try:
os.nice(nice_map[priority])
except PermissionError:
# Fall back to process-level priority if thread-level fails
pass
except Exception as e:
if self.logger:
self.logger.debug(f"Could not set thread priority: {e}")
def stop_recording_thread(self, dataset_id: str, timeout: float = 5.0):
"""Safely stop a recording thread"""
if dataset_id in self.recording_threads:
thread = self.recording_threads[dataset_id]
if thread.is_alive():
# Wait for thread to finish gracefully
thread.join(timeout=timeout)
if thread.is_alive():
if self.logger:
self.logger.warning(
f"Recording thread for dataset '{dataset_id}' did not stop gracefully"
)
else:
if self.logger:
self.logger.info(
f"Recording thread for dataset '{dataset_id}' stopped successfully"
)
del self.recording_threads[dataset_id]
def get_recording_thread_stats(self) -> dict:
"""Get statistics about recording threads"""
stats = {
"active_recording_threads": 0,
"recording_thread_names": [],
"api_pool_active": (
len(self.api_thread_pool._threads) if self.api_thread_pool else 0
),
"background_pool_active": (
len(self.background_thread_pool._threads)
if self.background_thread_pool
else 0
),
}
for dataset_id, thread in self.recording_threads.items():
if thread.is_alive():
stats["active_recording_threads"] += 1
stats["recording_thread_names"].append(f"{dataset_id}:{thread.name}")
return stats
def shutdown(self):
"""Shutdown all thread pools and wait for recording threads"""
if self.logger:
self.logger.info("Shutting down priority thread manager...")
# Stop API thread pools first (lower priority) - be aggressive
if self.api_thread_pool:
self.api_thread_pool.shutdown(wait=False, cancel_futures=True)
if self.background_thread_pool:
self.background_thread_pool.shutdown(wait=False, cancel_futures=True)
# Wait for recording threads to finish (highest priority) - shorter timeout
for dataset_id in list(self.recording_threads.keys()):
self.stop_recording_thread(dataset_id, timeout=2.0) # Reduced timeout
# Force clear all threads if they didn't stop
self.recording_threads.clear()
if self.logger:
self.logger.info("Priority thread manager shutdown complete")
class RecordingProtector:
"""Additional protection mechanisms for recording operations"""
def __init__(self, logger=None):
self.logger = logger
self.recording_active = False
self.api_request_count = 0
self.max_api_requests_per_second = 10 # Limit API requests
self.last_api_reset = time.time()
def start_recording_protection(self):
"""Enable recording protection mode"""
self.recording_active = True
if self.logger:
self.logger.info(
"Recording protection mode ENABLED - API throttling active"
)
def stop_recording_protection(self):
"""Disable recording protection mode"""
self.recording_active = False
if self.logger:
self.logger.info("Recording protection mode DISABLED")
def check_api_rate_limit(self) -> bool:
"""Check if API request should be rate-limited to protect recording"""
if not self.recording_active:
return True # No protection needed
current_time = time.time()
# Reset counter every second
if current_time - self.last_api_reset >= 1.0:
self.api_request_count = 0
self.last_api_reset = current_time
# Check rate limit
if self.api_request_count >= self.max_api_requests_per_second:
if self.logger:
self.logger.warning(
"API rate limit exceeded - protecting recording operations"
)
return False
self.api_request_count += 1
return True
def get_protection_stats(self) -> dict:
"""Get protection statistics"""
return {
"recording_protection_active": self.recording_active,
"api_requests_this_second": self.api_request_count,
"api_rate_limit": self.max_api_requests_per_second,
"time_until_reset": max(0, 1.0 - (time.time() - self.last_api_reset)),
}
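`check_api_rate_limit` implements a fixed-window counter: reset once per second, reject requests past the budget. A standalone sketch of the same scheme (class name and limits are illustrative, not the module's API):

```python
import time

class FixedWindowLimiter:
    """Minimal fixed-window rate limiter mirroring the throttle logic."""

    def __init__(self, max_per_second: int = 10):
        self.max_per_second = max_per_second
        self.count = 0
        self.window_start = time.time()

    def allow(self) -> bool:
        now = time.time()
        # Reset the counter once per one-second window
        if now - self.window_start >= 1.0:
            self.count = 0
            self.window_start = now
        # Reject once the per-window budget is exhausted
        if self.count >= self.max_per_second:
            return False
        self.count += 1
        return True

limiter = FixedWindowLimiter(max_per_second=10)
results = [limiter.allow() for _ in range(15)]
print(sum(results))  # 10 — requests beyond the per-second budget are rejected
```

Note the trade-off: a fixed window allows short bursts at window boundaries, which is acceptable here because the goal is only to bound API load while recording.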

core/rotating_logger.py Normal file
@@ -0,0 +1,203 @@
"""
Rotating Logger System
======================
Sistema de logging con rotación automática por líneas y limpieza de archivos antiguos.
Características:
- Archivos nombrados con fecha+hora de inicio
- Máximo 10,000 líneas por archivo
- Máximo 30 archivos en directorio
- Eliminación automática de archivos más antiguos
- Logs tanto en consola como en archivo
"""
import os
import logging
import glob
from datetime import datetime
from typing import Optional
class RotatingFileHandler(logging.Handler):
"""Custom rotating file handler que rota por número de líneas"""
def __init__(
self, log_dir: str = ".logs", max_lines: int = 10000, max_files: int = 30
):
super().__init__()
self.log_dir = log_dir
self.max_lines = max_lines
self.max_files = max_files
self.current_lines = 0
self.current_file = None
self.current_handler = None
# Create the log directory if it does not exist
os.makedirs(self.log_dir, exist_ok=True)
# Start the first log file
self._start_new_file()
def setFormatter(self, fmt):
"""Propagate the formatter to the active inner handler (fixes the first file, which is opened before setFormatter() is called)"""
super().setFormatter(fmt)
if self.current_handler:
self.current_handler.setFormatter(fmt)
def _start_new_file(self):
"""Start a new log file"""
# Close the previous handler if one exists
if self.current_handler:
self.current_handler.close()
# Generate the filename with a timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"backend_{timestamp}.log"
self.current_file = os.path.join(self.log_dir, filename)
# Create a new file handler
self.current_handler = logging.FileHandler(self.current_file, encoding="utf-8")
self.current_handler.setFormatter(self.formatter)
# Reset the line counter
self.current_lines = 0
# Clean up old files
self._cleanup_old_files()
print(f"📝 New log file started: {self.current_file}")
def _cleanup_old_files(self):
"""Delete the oldest log files when they exceed the limit"""
# Collect all log files
log_pattern = os.path.join(self.log_dir, "backend_*.log")
log_files = glob.glob(log_pattern)
# Sort by modification time (oldest first)
log_files.sort(key=os.path.getmtime)
# Remove files over the limit
while len(log_files) > self.max_files:
oldest_file = log_files.pop(0)
try:
os.remove(oldest_file)
print(f"🗑️ Removed old log file: {os.path.basename(oldest_file)}")
except OSError as e:
print(f"⚠️ Could not remove {oldest_file}: {e}")
def emit(self, record):
"""Emit a log record"""
if not self.current_handler:
return
# Write to the current file
self.current_handler.emit(record)
# Increment the line counter
self.current_lines += 1
# Rotate when the line limit is reached
if self.current_lines >= self.max_lines:
print(
f"🔄 Log rotation: {self.current_lines} lines reached, starting new file..."
)
self._start_new_file()
def close(self):
"""Close the handler"""
if self.current_handler:
self.current_handler.close()
super().close()
class DualLogger:
"""Logger que escribe tanto a consola como a archivo rotativo"""
def __init__(
self,
name: str = "backend",
log_dir: str = ".logs",
max_lines: int = 10000,
max_files: int = 30,
):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# Clear any existing handlers
self.logger.handlers.clear()
# Formatter
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# Rotating file handler
self.file_handler = RotatingFileHandler(log_dir, max_lines, max_files)
self.file_handler.setLevel(logging.INFO)
self.file_handler.setFormatter(formatter)
self.logger.addHandler(self.file_handler)
# Startup banner
self.logger.info("=" * 60)
self.logger.info("🚀 Backend logging system started")
self.logger.info(f"📁 Log directory: {os.path.abspath(log_dir)}")
self.logger.info(f"📊 Max lines per file: {max_lines:,}")
self.logger.info(f"📦 Max files retained: {max_files}")
self.logger.info("=" * 60)
def get_logger(self):
"""Obtener el logger configurado"""
return self.logger
def info(self, message: str):
"""Log info message"""
self.logger.info(message)
def error(self, message: str):
"""Log error message"""
self.logger.error(message)
def warning(self, message: str):
"""Log warning message"""
self.logger.warning(message)
def debug(self, message: str):
"""Log debug message"""
self.logger.debug(message)
def close(self):
"""Cerrar el logger"""
self.logger.info("⏹️ Logging system shutdown")
self.file_handler.close()
def setup_backend_logging(
log_dir: str = ".logs", max_lines: int = 10000, max_files: int = 30
) -> DualLogger:
"""
Configurar el sistema de logging del backend
Args:
log_dir: Directorio donde guardar los logs (default: .logs)
max_lines: Máximo de líneas por archivo (default: 10,000)
max_files: Máximo de archivos a retener (default: 30)
Returns:
DualLogger: Instancia del logger configurado
"""
return DualLogger("backend", log_dir, max_lines, max_files)
if __name__ == "__main__":
# Logging system self-test
logger = setup_backend_logging()
# Emit many logs to exercise rotation
for i in range(50):
logger.info(f"Test log message {i+1}")
if i % 10 == 9:
logger.warning(f"Warning message at iteration {i+1}")
logger.close()
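The rotation policy (rotate by line count, retain only the newest files) can be exercised in isolation. This sketch mirrors the logic with illustrative names and deliberately small limits:

```python
import glob
import os
import tempfile

class LineRotatingWriter:
    """Sketch of line-count rotation with retention (illustrative, not the module's API)."""

    def __init__(self, log_dir: str, max_lines: int = 5, max_files: int = 2):
        self.log_dir = log_dir
        self.max_lines = max_lines
        self.max_files = max_files
        self.lines = 0
        self.index = 0
        self.fh = None
        os.makedirs(log_dir, exist_ok=True)
        self._rotate()

    def _rotate(self):
        if self.fh:
            self.fh.close()
        self.index += 1
        path = os.path.join(self.log_dir, f"backend_{self.index:04d}.log")
        self.fh = open(path, "w", encoding="utf-8")
        self.lines = 0
        # Keep only the newest `max_files` files
        files = sorted(glob.glob(os.path.join(self.log_dir, "backend_*.log")))
        for old in files[: max(0, len(files) - self.max_files)]:
            os.remove(old)

    def write(self, msg: str):
        self.fh.write(msg + "\n")
        self.lines += 1
        if self.lines >= self.max_lines:
            self._rotate()

with tempfile.TemporaryDirectory() as d:
    w = LineRotatingWriter(d, max_lines=5, max_files=2)
    for i in range(23):
        w.write(f"line {i}")
    w.fh.close()
    remaining = sorted(os.listdir(d))
    print(remaining)  # ['backend_0004.log', 'backend_0005.log']
```

With 23 lines at 5 lines per file, four rotations occur and only the two newest files survive — the same guarantee the production defaults (10,000 lines, 30 files) give at scale.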

@@ -10,6 +10,8 @@ from datetime import datetime
from typing import Dict, Any, Optional, Set, List
from pathlib import Path
from .plot_manager import PlotManager
from .priority_manager import PriorityThreadManager, RecordingProtector, Priority
from .performance_monitor import PerformanceMonitor
def resource_path(relative_path):
@@ -45,12 +47,19 @@ class DataStreamer:
"""
def __init__(self, config_manager, plc_client, event_logger, logger=None):
"""Initialize data streamer"""
"""Initialize data streamer with priority management and performance monitoring"""
self.config_manager = config_manager
self.plc_client = plc_client
self.event_logger = event_logger
self.logger = logger
# 🔑 PRIORITY MANAGEMENT - Ensure recording has maximum priority
self.priority_manager = PriorityThreadManager(logger)
self.recording_protector = RecordingProtector(logger)
# 📊 PERFORMANCE MONITORING - Real-time performance tracking
self.performance_monitor = PerformanceMonitor(logger, event_logger, report_interval=10.0)
# UDP streaming setup
self.udp_socket = None
self.udp_streaming_enabled = False # 🔑 Independent UDP control
@@ -58,8 +67,8 @@ class DataStreamer:
# CSV recording state (automatic when PLC connected)
self.csv_recording_enabled = False
# Dataset streaming threads and files
self.dataset_threads = {} # dataset_id -> thread object
# Dataset streaming threads and files - now managed by priority system
self.dataset_threads = {} # dataset_id -> thread object (HIGH PRIORITY)
self.dataset_csv_files = {} # dataset_id -> file handle
self.dataset_csv_writers = {} # dataset_id -> csv writer
self.dataset_csv_hours = {} # dataset_id -> current hour
@@ -588,13 +597,16 @@ class DataStreamer:
)
def dataset_streaming_loop(self, dataset_id: str):
"""Streaming loop for a specific dataset - handles both CSV and UDP"""
"""🔑 HIGH PRIORITY: Streaming loop for CSV recording - CRITICAL PRIORITY THREAD with performance monitoring"""
dataset_info = self.config_manager.datasets[dataset_id]
interval = self.config_manager.get_dataset_sampling_interval(dataset_id)
# 📊 Register dataset with performance monitor
self.performance_monitor.set_dataset_interval(dataset_id, interval)
if self.logger:
self.logger.info(
f"Dataset '{dataset_info['name']}' loop started (interval: {interval}s)"
f"🔥 CRITICAL PRIORITY: Dataset '{dataset_info['name']}' recording loop started (interval: {interval}s)"
)
consecutive_errors = 0
@@ -604,103 +616,128 @@
dataset_id in self.config_manager.active_datasets
and self.plc_client.is_connected()
):
loop_start_time = time.time()
read_success = False
read_time = 0.0
csv_write_time = 0.0
variables_count = 0
try:
start_time = time.time()
# Read variables for this dataset (serialized across datasets)
dataset_variables = self.config_manager.get_dataset_variables(
dataset_id
)
# 📋 CRITICAL SECTION: PLC READ with timing and error tracking
dataset_variables = self.config_manager.get_dataset_variables(dataset_id)
variables_count = len(dataset_variables)
# Measure read operation time
read_start = time.time()
# Ensure entire dataset read is atomic w.r.t. other datasets
with self.plc_client.io_lock:
all_data = self.read_dataset_variables(
dataset_id, dataset_variables
)
all_data = self.read_dataset_variables(dataset_id, dataset_variables)
read_time = time.time() - read_start
read_success = bool(all_data)
if all_data:
consecutive_errors = 0
# 📝 CSV Recording: Always write if enabled (automatic)
# 📝 CSV Recording: ALWAYS FIRST - HIGHEST PRIORITY with timing
if self.csv_recording_enabled:
self.write_dataset_csv_data(dataset_id, all_data)
csv_start = time.time()
try:
self.write_dataset_csv_data(dataset_id, all_data)
csv_write_time = time.time() - csv_start
# 📊 Record successful CSV write
self.performance_monitor.record_csv_write(dataset_id, csv_write_time, success=True)
except Exception as csv_error:
csv_write_time = time.time() - csv_start
# 📊 Record CSV write error
self.performance_monitor.record_csv_write(dataset_id, csv_write_time, success=False)
if self.logger:
self.logger.error(f"🚨 CSV WRITE ERROR for dataset '{dataset_info['name']}': {csv_error}")
# 📡 UDP Streaming: Only if UDP streaming is enabled (manual)
# 📡 UDP Streaming: Lower priority - only if enabled
if self.udp_streaming_enabled:
# Get filtered data for streaming - only variables that are in streaming_variables list AND have streaming=true
streaming_variables = dataset_info.get(
"streaming_variables", []
# Use background thread for UDP to not block recording
self.priority_manager.submit_background_task(
self._handle_udp_streaming, dataset_id, dataset_info, all_data
)
dataset_vars_config = dataset_info.get("variables", {})
streaming_data = {
name: value
for name, value in all_data.items()
if name in streaming_variables
and dataset_vars_config.get(name, {}).get(
"streaming", False
)
}
# Send filtered data to PlotJuggler
if streaming_data:
self.send_to_plotjuggler(streaming_data)
# 📈 PLOT MANAGER: Update all active plot sessions with cache data
# 📈 PLOT MANAGER: Background priority - update plots without blocking
if self.plot_manager.get_active_sessions_count() > 0:
self.plot_manager.update_data(dataset_id, all_data)
# Log data
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
if self.logger:
csv_count = len(all_data) if self.csv_recording_enabled else 0
udp_count = 0
if self.udp_streaming_enabled:
streaming_variables = dataset_info.get(
"streaming_variables", []
)
dataset_vars_config = dataset_info.get("variables", {})
udp_count = len(
[
name
for name in all_data.keys()
if name in streaming_variables
and dataset_vars_config.get(name, {}).get(
"streaming", False
)
]
)
self.logger.info(
f"[{timestamp}] Dataset '{dataset_info['name']}': CSV: {csv_count} vars, UDP: {udp_count} vars"
self.priority_manager.submit_background_task(
self.plot_manager.update_data, dataset_id, all_data
)
else:
consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors:
self.event_logger.log_event(
"error",
"dataset_loop_error",
f"Multiple consecutive read failures for dataset '{dataset_info['name']}' ({consecutive_errors}). Stopping dataset.",
f"<EFBFBD> CRITICAL: Multiple consecutive read failures for dataset '{dataset_info['name']}' ({consecutive_errors}). Stopping dataset.",
{
"dataset_id": dataset_id,
"consecutive_errors": consecutive_errors,
"priority": "CRITICAL"
},
)
break
# Calculate timing and delay metrics
loop_end_time = time.time()
total_loop_time = loop_end_time - loop_start_time
expected_end_time = loop_start_time + interval
# Calculate delay (how much we're behind schedule)
delay = max(0.0, loop_end_time - expected_end_time)
# 📊 Record performance metrics
self.performance_monitor.record_dataset_read(
dataset_id=dataset_id,
read_time=read_time,
variables_count=variables_count,
success=read_success,
delay=delay
)
# Maintain sampling interval
elapsed = time.time() - start_time
sleep_time = max(0, interval - elapsed)
time.sleep(sleep_time)
sleep_time = max(0, interval - total_loop_time)
if sleep_time > 0:
time.sleep(sleep_time)
elif delay > interval * 0.1: # Log if delay is > 10% of interval
if self.logger:
self.logger.warning(
f"⏰ TIMING WARNING: Dataset '{dataset_info['name']}' loop overrun by {delay:.3f}s "
f"(read: {read_time:.3f}s, csv: {csv_write_time:.3f}s, total: {total_loop_time:.3f}s)"
)
except Exception as e:
consecutive_errors += 1
# 📊 Record read error
self.performance_monitor.record_dataset_read(
dataset_id=dataset_id,
read_time=read_time,
variables_count=variables_count,
success=False,
delay=0.0
)
self.event_logger.log_event(
"error",
"dataset_loop_error",
f"Error in dataset '{dataset_info['name']}' loop: {str(e)}",
f"🚨 CRITICAL ERROR: Dataset '{dataset_info['name']}' recording loop error: {str(e)}",
{
"dataset_id": dataset_id,
"error": str(e),
"consecutive_errors": consecutive_errors,
"priority": "CRITICAL",
"read_time": read_time,
"variables_count": variables_count
},
)
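The timing bookkeeping above reduces to: measure the work, sleep only the remainder of the interval, and record any overrun as `delay`. A minimal sketch of that pacing scheme (function and variable names are illustrative):

```python
import time

def paced_loop(work, interval, iterations=3):
    """Run `work` at a fixed interval; return the per-iteration overrun (delay)."""
    delays = []
    for _ in range(iterations):
        start = time.time()
        work()
        elapsed = time.time() - start
        # Overrun: how far the work ran past its time budget
        delays.append(max(0.0, elapsed - interval))
        # Sleep only the remainder of the interval, if any
        time.sleep(max(0.0, interval - elapsed))
    return delays

fast = paced_loop(lambda: time.sleep(0.01), interval=0.05)
slow = paced_loop(lambda: time.sleep(0.08), interval=0.05)
print(all(d == 0.0 for d in fast))  # work fits in the interval: no overrun
print(all(d > 0.0 for d in slow))   # work overruns every cycle
```

Persistent non-zero delays are exactly what the performance monitor turns into "points lost" and the `TIMING WARNING` log above.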
@@ -708,10 +745,11 @@
self.event_logger.log_event(
"error",
"dataset_loop_error",
f"Too many consecutive errors for dataset '{dataset_info['name']}'. Stopping dataset.",
f"🚨 CRITICAL FAILURE: Too many consecutive errors for dataset '{dataset_info['name']}'. Stopping dataset.",
{
"dataset_id": dataset_id,
"consecutive_errors": consecutive_errors,
"priority": "CRITICAL"
},
)
break
@@ -721,10 +759,47 @@
# 🔑 FIXED: Do NOT call stop_dataset_streaming from within the loop
# The thread will be cleaned up externally when needed
if self.logger:
self.logger.info(f"Dataset '{dataset_info['name']}' loop ended")
self.logger.info(f"🔥 CRITICAL: Dataset '{dataset_info['name']}' recording loop ended")
def _handle_udp_streaming(self, dataset_id: str, dataset_info: dict, all_data: dict):
"""Handle UDP streaming in background thread (lower priority) with performance tracking"""
try:
# Get filtered data for streaming - only variables that are in streaming_variables list AND have streaming=true
streaming_variables = dataset_info.get("streaming_variables", [])
dataset_vars_config = dataset_info.get("variables", {})
streaming_data = {
name: value
for name, value in all_data.items()
if name in streaming_variables
and dataset_vars_config.get(name, {}).get("streaming", False)
}
# Send filtered data to PlotJuggler
if streaming_data:
try:
self.send_to_plotjuggler(streaming_data)
# 📊 Record successful UDP send
self.performance_monitor.record_udp_send(len(streaming_data), success=True)
except Exception as udp_error:
# 📊 Record UDP send error
self.performance_monitor.record_udp_send(len(streaming_data), success=False)
if self.logger:
self.logger.warning(f"UDP streaming error for dataset '{dataset_info['name']}': {udp_error}")
if self.logger:
udp_count = len(streaming_data)
self.logger.debug(
f"📡 Background: UDP sent {udp_count} vars for dataset '{dataset_info['name']}'"
)
except Exception as e:
if self.logger:
self.logger.warning(f"UDP streaming error (non-critical): {e}")
def start_dataset_streaming(self, dataset_id: str):
"""Start streaming thread for a specific dataset"""
"""🔑 CRITICAL: Start HIGH PRIORITY recording thread for a specific dataset"""
if dataset_id not in self.config_manager.datasets:
return False
@@ -735,7 +810,7 @@
if self.logger:
dataset_name = self.config_manager.datasets[dataset_id]["name"]
self.logger.info(
f"Dataset '{dataset_name}' thread is already running, skipping creation"
f"🔥 CRITICAL: Dataset '{dataset_name}' recording thread is already running, skipping creation"
)
return True # Already running and alive
else:
@@ -743,15 +818,18 @@
if self.logger:
dataset_name = self.config_manager.datasets[dataset_id]["name"]
self.logger.info(
f"Cleaning up dead thread for dataset '{dataset_name}'"
f"🔥 CRITICAL: Cleaning up dead recording thread for dataset '{dataset_name}'"
)
del self.dataset_threads[dataset_id]
# Create and start thread for this dataset
thread = threading.Thread(
target=self.dataset_streaming_loop, args=(dataset_id,)
# 🔑 CRITICAL: Create HIGH PRIORITY thread for recording using priority manager
thread = self.priority_manager.create_recording_thread(
target=self.dataset_streaming_loop,
args=(dataset_id,),
dataset_id=dataset_id,
name=f"recording_{dataset_id}"
)
thread.daemon = True
self.dataset_threads[dataset_id] = thread
thread.start()
@@ -760,19 +838,19 @@
if self.logger:
self.logger.info(
f"Started streaming for dataset '{dataset_info['name']}' (interval: {interval}s)"
f"🔥 CRITICAL PRIORITY: Started recording thread for dataset '{dataset_info['name']}' (interval: {interval}s)"
)
return True
def stop_dataset_streaming(self, dataset_id: str):
"""Stop streaming thread for a specific dataset"""
"""🔑 CRITICAL: Stop recording thread for a specific dataset safely"""
if dataset_id in self.dataset_threads:
# The thread will detect this and stop
thread = self.dataset_threads[dataset_id]
# 🔑 FIXED: Check if we're not trying to join the current thread
if thread.is_alive() and thread != threading.current_thread():
thread.join(timeout=2)
del self.dataset_threads[dataset_id]
# Use priority manager to safely stop the recording thread
self.priority_manager.stop_recording_thread(dataset_id, timeout=5.0)
# Clean up the reference
if dataset_id in self.dataset_threads:
del self.dataset_threads[dataset_id]
# Close CSV file if open
if dataset_id in self.dataset_csv_files:
@@ -786,7 +864,7 @@
dataset_info = self.config_manager.datasets.get(dataset_id, {})
if self.logger:
self.logger.info(
f"Stopped streaming for dataset '{dataset_info.get('name', dataset_id)}'"
f"🔥 CRITICAL: Stopped recording thread for dataset '{dataset_info.get('name', dataset_id)}'"
)
def activate_dataset(self, dataset_id: str):
@@ -838,9 +916,9 @@
{"dataset_id": dataset_id},
)
# 🔑 NEW: CSV Recording Methods (Automatic)
# 🔑 NEW: CSV Recording Methods (Automatic - HIGHEST PRIORITY)
def start_csv_recording(self) -> bool:
"""Start automatic CSV recording for all datasets with variables"""
"""🔥 CRITICAL: Start automatic CSV recording with MAXIMUM PRIORITY and performance monitoring"""
if not self.plc_client.is_connected():
self.event_logger.log_event(
"error",
@@ -857,6 +935,12 @@
)
return False
# 🔥 ENABLE RECORDING PROTECTION MODE
self.recording_protector.start_recording_protection()
# 📊 START PERFORMANCE MONITORING
self.performance_monitor.start_monitoring()
# Activate all datasets that have variables for CSV recording
activated_count = 0
for dataset_id, dataset_info in self.config_manager.datasets.items():
@@ -876,6 +960,8 @@
"csv_recording_error",
"Cannot start CSV recording: No datasets with variables configured",
)
# Stop monitoring if no datasets activated
self.performance_monitor.stop_monitoring()
return False
self.csv_recording_enabled = True
@@ -888,17 +974,26 @@
self.event_logger.log_event(
"info",
"csv_recording_started",
f"CSV recording started: {activated_count} datasets activated",
f"🔥 CRITICAL PRIORITY: CSV recording started with MAXIMUM PRIORITY and performance monitoring: {activated_count} datasets activated",
{
"activated_datasets": activated_count,
"total_datasets": len(self.config_manager.datasets),
"priority": "CRITICAL",
"recording_protection": True,
"performance_monitoring": True
},
)
return True
def stop_csv_recording(self):
"""Stop CSV recording but keep dataset threads for potential UDP streaming"""
"""🔥 CRITICAL: Stop CSV recording safely with performance monitoring"""
self.csv_recording_enabled = False
# 🔥 DISABLE RECORDING PROTECTION MODE
self.recording_protector.stop_recording_protection()
# 📊 STOP PERFORMANCE MONITORING
self.performance_monitor.stop_monitoring()
# Close all CSV files but keep threads running
for dataset_id in list(self.dataset_csv_files.keys()):
@@ -919,7 +1014,11 @@
self.event_logger.log_event(
"info",
"csv_recording_stopped",
"CSV recording stopped (dataset threads continue for UDP streaming)",
"🔥 CRITICAL: CSV recording stopped (dataset threads continue for UDP streaming)",
{
"recording_protection": False,
"performance_monitoring": False
}
)
# 🔑 NEW: UDP Streaming Methods (Manual)
@@ -1146,14 +1245,60 @@
)
def get_streaming_stats(self) -> Dict[str, Any]:
"""Get streaming statistics"""
return {
"""Get streaming statistics including priority and performance information"""
stats = {
"streaming": self.streaming,
"active_datasets": len(self.config_manager.active_datasets),
"active_threads": len(self.dataset_threads),
"open_csv_files": len(self.dataset_csv_files),
"udp_socket_active": self.udp_socket is not None,
# 🔑 PRIORITY STATS
"priority_stats": self.priority_manager.get_recording_thread_stats(),
"recording_protection": self.recording_protector.get_protection_stats(),
"csv_recording_enabled": self.csv_recording_enabled,
# 📊 PERFORMANCE STATS
"performance_current": self.performance_monitor.get_current_stats(),
"performance_historical": self.performance_monitor.get_historical_stats(windows=6), # Last minute
}
return stats
def get_cached_dataset_values_safe(self, dataset_id: str):
"""🔑 RATE LIMITED: Get cached values with API rate limiting protection"""
# Check rate limit to protect recording operations
if not self.recording_protector.check_api_rate_limit():
return {
"success": False,
"error": "API rate limit exceeded - protecting recording operations",
"error_type": "rate_limited",
"message": "Too many API requests - recording operations have priority",
"retry_after": 1.0
}
# Use background thread for cache access to not block recording
future = self.priority_manager.submit_api_task(
self.get_cached_dataset_values, dataset_id
)
try:
return future.result(timeout=2.0) # 2 second timeout
except Exception as e:
return {
"success": False,
"error": f"API timeout or error: {str(e)}",
"error_type": "api_timeout",
"message": "API request timed out - recording operations have priority"
}
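The timeout guard relies on `concurrent.futures.Future.result(timeout=...)`: the caller stops waiting, but the worker thread is not killed and finishes on its own, so recording threads are never blocked by a slow API consumer. Illustrated standalone (names and timings are illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

pool = ThreadPoolExecutor(max_workers=1)

def slow_cache_lookup():
    time.sleep(0.5)  # simulate a lookup slower than the API budget
    return {"success": True}

future = pool.submit(slow_cache_lookup)
try:
    result = future.result(timeout=0.05)  # API-side time budget
except FutureTimeout:
    # The caller returns immediately with an error payload;
    # the worker keeps running in the background
    result = {"success": False, "error_type": "api_timeout"}

print(result["success"])  # False — the call timed out
pool.shutdown(wait=True)
```

The same pattern gives the `api_timeout` error shape returned above, while the pool's single worker bounds how much cache-lookup work can run concurrently.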
def perform_csv_cleanup_safe(self):
"""🔑 BACKGROUND: Perform CSV cleanup in background thread"""
if self.csv_recording_enabled:
# Don't run cleanup while recording is active - protect recording
if self.logger:
self.logger.info("Skipping CSV cleanup - recording is active (protecting recording operations)")
return
# Run cleanup in background thread
self.priority_manager.submit_background_task(self.perform_csv_cleanup)
def _on_plc_reconnected(self):
"""Callback method called when PLC reconnects successfully"""
@@ -1333,3 +1478,50 @@
)
else:
self.logger.warning("No active datasets found to track for auto-resume")
def shutdown(self):
"""🔑 CRITICAL: Safely shutdown all streaming operations with priority protection and performance monitoring"""
if self.logger:
self.logger.info("🔥 CRITICAL: Starting safe shutdown of data streamer...")
try:
# 1. Stop performance monitoring first
self.performance_monitor.stop_monitoring()
# 2. Stop CSV recording first (graceful stop)
if self.csv_recording_enabled:
self.stop_csv_recording()
# 3. Stop UDP streaming
if self.udp_streaming_enabled:
self.stop_udp_streaming()
# 4. Stop all dataset streaming threads using priority manager
active_datasets = list(self.dataset_threads.keys())
for dataset_id in active_datasets:
self.stop_dataset_streaming(dataset_id)
# 5. Shutdown priority manager (will wait for all recording threads)
self.priority_manager.shutdown()
# 6. Clear all cached data
self.clear_cached_values()
# 7. Close any remaining files
for dataset_id in list(self.dataset_csv_files.keys()):
try:
self.dataset_csv_files[dataset_id].close()
except Exception:
pass
self.dataset_csv_files.clear()
self.dataset_csv_writers.clear()
self.dataset_csv_hours.clear()
if self.logger:
self.logger.info("🔥 CRITICAL: Data streamer shutdown completed successfully")
except Exception as e:
if self.logger:
self.logger.error(f"🚨 CRITICAL ERROR: Error during data streamer shutdown: {e}")
raise
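Step 7 of the shutdown closes each CSV handle defensively, so one broken file cannot abort the rest of the sequence. A minimal sketch of that loop (names are illustrative):

```python
def close_all(csv_files: dict) -> None:
    """Close every open CSV handle; one failed close must not abort the rest."""
    for dataset_id in list(csv_files):
        try:
            csv_files[dataset_id].close()
        except Exception:
            pass  # shutdown continues even if a handle is already broken
    csv_files.clear()
```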

@@ -457,7 +457,14 @@ function StatusBar({ status, isConnected, isLeader }) {
status: 'info',
duration: 2000
})
// Coordinated polling will update the state automatically
// 🔥 IMMEDIATELY update frontend state after successful disconnect
if (result.success) {
setPlcConnected(false)
setStreaming(false)
setCsvRecording(false)
// Force refresh status to sync with backend
loadStatus()
}
} catch (error) {
toast({
title: '❌ Failed to disconnect PLC',

main.py

@@ -8,14 +8,25 @@ from flask import (
from flask_cors import CORS
import json
import time
import signal
import sys
from datetime import datetime, timedelta, timezone
import os
import sys
import logging
# Configure logging to show only errors for cleaner historical data logs
logging.basicConfig(level=logging.ERROR)
# Reduce Flask's request logging to ERROR level only
# 📝 ROTATING LOGGER SYSTEM
from core.rotating_logger import setup_backend_logging
# 📝 Setup rotating logger system
# This will log to both console and .logs/ directory with automatic rotation
backend_logger = setup_backend_logging(
log_dir=".logs", # Directory for log files
max_lines=10000, # Max lines per file
max_files=30 # Max files to retain
)
# Configure standard logging to use our rotating system
# Reduce Flask's request logging to ERROR level only (keep logs clean)
logging.getLogger('werkzeug').setLevel(logging.ERROR)
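The project's `core/rotating_logger` rotates by line count and prunes old files; the same idea can be approximated with the stdlib's size-based `RotatingFileHandler`. A hedged sketch (`setup_demo_logging` is a hypothetical analogue, not the commit's `setup_backend_logging`):

```python
import logging
import os
from logging.handlers import RotatingFileHandler


def setup_demo_logging(log_dir: str = ".logs",
                       max_bytes: int = 512 * 1024,
                       max_files: int = 30) -> logging.Logger:
    """Log to log_dir/backend.log, rotating by size and keeping max_files backups."""
    os.makedirs(log_dir, exist_ok=True)
    handler = RotatingFileHandler(os.path.join(log_dir, "backend.log"),
                                  maxBytes=max_bytes, backupCount=max_files)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logger = logging.getLogger("backend")
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger
```

The stdlib handler triggers rotation on file size rather than the `max_lines=10000` threshold used above, but the retention behavior (`backupCount` vs `max_files=30`) maps one-to-one.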
try:
@@ -467,16 +478,16 @@ def update_csv_config():
@app.route("/api/csv/cleanup", methods=["POST"])
def trigger_csv_cleanup():
"""Manually trigger CSV cleanup"""
"""🔑 BACKGROUND: Manually trigger CSV cleanup with recording protection"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
# Perform cleanup
streamer.streamer.perform_csv_cleanup()
# 🔑 PROTECTED: Perform cleanup with recording protection
streamer.perform_csv_cleanup_safe()
return jsonify(
{"success": True, "message": "CSV cleanup completed successfully"}
{"success": True, "message": "CSV cleanup queued in background (recording protection active)"}
)
except Exception as e:
@@ -582,8 +593,17 @@ def connect_plc():
@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
"""Disconnect from PLC"""
streamer.disconnect_plc()
return jsonify({"success": True, "message": "Disconnected from PLC"})
streamer.disconnect_plc(manual_disconnect=True) # User manually disconnected
# Return updated status to ensure frontend gets correct state
return jsonify({
"success": True,
"message": "Disconnected from PLC",
"status": {
"plc_connected": False,
"streaming": False,
"csv_recording": False
}
})
@app.route("/api/variables", methods=["POST"])
@@ -829,7 +849,7 @@ def get_streaming_variables():
@app.route("/api/datasets/<dataset_id>/variables/values", methods=["GET"])
def get_dataset_variable_values(dataset_id):
"""Get current values of all variables in a dataset - CACHE ONLY"""
"""🔑 RATE LIMITED: Get current values of all variables in a dataset - CACHE ONLY with recording protection"""
error_response = check_streamer_initialized()
if error_response:
return error_response
@@ -897,7 +917,7 @@ def get_dataset_variable_values(dataset_id):
}
)
# Get cached values - this is the ONLY source of data according to application principles
# Check if cached values are available
if not streamer.has_cached_values(dataset_id):
return jsonify(
{
@@ -918,8 +938,16 @@
}
)
# Get cached values (the ONLY valid source according to application design)
read_result = streamer.get_cached_dataset_values(dataset_id)
# 🔑 PROTECTED: Get cached values with rate limiting and recording protection
read_result = streamer.get_cached_dataset_values_safe(dataset_id)
# Handle rate limiting response
if not read_result.get("success", False) and read_result.get("error_type") == "rate_limited":
return jsonify(read_result), 429 # Too Many Requests
# Handle API timeout response
if not read_result.get("success", False) and read_result.get("error_type") == "api_timeout":
return jsonify(read_result), 503 # Service Unavailable
# Convert timestamp from ISO format to readable format for consistency
if read_result.get("timestamp"):
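The two `error_type` checks above amount to a small mapping from the streamer's structured errors to HTTP status codes; a hypothetical helper (not in the commit) that captures it:

```python
def status_for(result: dict) -> int:
    """Translate the streamer's error_type field into an HTTP status code."""
    if result.get("success", False):
        return 200
    return {
        "rate_limited": 429,  # Too Many Requests
        "api_timeout": 503,   # Service Unavailable
    }.get(result.get("error_type"), 500)
```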
@@ -2411,6 +2439,66 @@ def get_status():
return jsonify(streamer.get_status())
@app.route("/api/priority/status")
def get_priority_status():
"""🔑 Get detailed priority and recording protection status"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
priority_stats = streamer.data_streamer.get_streaming_stats()
return jsonify({
"success": True,
"priority_protection": priority_stats,
"message": "Priority protection ensures CSV recording has maximum priority over API operations"
})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/performance/current")
def get_current_performance():
"""📊 Get current performance metrics (last 10 seconds)"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
current_stats = streamer.data_streamer.performance_monitor.get_current_stats()
return jsonify({
"success": True,
"current_performance": current_stats,
"message": "Current performance metrics for active recording operations"
})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/performance/historical")
def get_historical_performance():
"""📊 Get historical performance metrics"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
# Get query parameters
windows = int(request.args.get('windows', 6)) # Default: last minute (6 * 10s)
windows = min(max(windows, 1), 60) # Limit between 1 and 60 windows
historical_stats = streamer.data_streamer.performance_monitor.get_historical_stats(windows)
return jsonify({
"success": True,
"historical_performance": historical_stats,
"windows_requested": windows,
"duration_minutes": windows * 10 / 60,
"message": f"Historical performance metrics for last {windows} windows ({windows*10} seconds)"
})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/plc/reconnection/status")
def get_plc_reconnection_status():
"""Get detailed PLC reconnection status"""
@@ -2723,15 +2811,34 @@ def graceful_shutdown():
print("\n⏹️ Performing graceful shutdown...")
try:
streamer.stop_streaming()
streamer.disconnect_plc()
# 🚀 PRESERVE auto-reconnect state: Don't clear reconnection data
streamer.disconnect_plc(manual_disconnect=False) # Keep auto-reconnect state
# 🔑 PRIORITY: Shutdown priority manager and performance monitor
if hasattr(streamer.data_streamer, 'priority_manager'):
streamer.data_streamer.priority_manager.shutdown()
if hasattr(streamer.data_streamer, 'performance_monitor'):
streamer.data_streamer.performance_monitor.stop_monitoring()
streamer.release_instance_lock()
# 📝 Close rotating logger system
backend_logger.close()
print("✅ Shutdown completed successfully")
except Exception as e:
print(f"⚠️ Error during shutdown: {e}")
def signal_handler(sig, frame):
"""Handle interrupt signals (Ctrl+C)"""
print(f"\n🛑 Received signal {sig}...")
graceful_shutdown()
sys.exit(0)
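The signal wiring above can be sketched as a single helper that routes both SIGINT and SIGTERM through the graceful-shutdown path (`install_signal_handlers` is an illustrative name):

```python
import signal
import sys


def install_signal_handlers(shutdown_fn) -> None:
    """Route SIGINT/SIGTERM (Ctrl+C, kill) through one graceful-shutdown path."""
    def handler(sig, frame):
        print(f"\n🛑 Received signal {sig}...")
        shutdown_fn()
        sys.exit(0)
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)
```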
def main():
"""Main application entry point with error handling and recovery"""
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
max_retries = 3
retry_count = 0
@@ -2745,7 +2852,7 @@ def main():
global streamer
# Start Flask application
app.run(debug=False, host="0.0.0.0", port=5050, use_reloader=False)
app.run(debug=False, host="0.0.0.0", port=5050, use_reloader=False, threaded=True)
# If we reach here, the server stopped normally
break

@@ -4,11 +4,10 @@
"should_stream": false,
"active_datasets": [
"DAR",
"Test",
"Fast"
]
},
"auto_recovery_enabled": true,
"last_update": "2025-08-16T16:41:43.854840",
"last_update": "2025-08-16T18:34:19.713675",
"plotjuggler_path": "C:\\Program Files\\PlotJuggler\\plotjuggler.exe"
}