Version base con Pico.css

This commit is contained in:
Miguel 2025-07-19 12:17:44 +02:00
parent 81ade6f8f0
commit f276eb96c5
12 changed files with 2948 additions and 2459 deletions

View File

@ -1,5 +1,5 @@
---
alwaysApply: true
alwaysApply: false
---
You can use .doc\MemoriaDeEvolucion.md to obtain a context of the latest modifications and concepts about this project. I would like that with the important knowledge and important decisions acquired in each modification you add them to MemoriaDeEvolucion.md maintaining the style that we already have of simple text without too much code and a summarized semantic.

View File

@ -1755,8 +1755,114 @@
"udp_host": "127.0.0.1",
"udp_port": 9870
}
},
{
"timestamp": "2025-07-19T10:20:39.326530",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T10:21:08.778204",
"level": "error",
"event_type": "plc_connection_failed",
"message": "Failed to connect to PLC 10.1.33.11",
"details": {
"ip": "10.1.33.11",
"rack": 0,
"slot": 2
}
},
{
"timestamp": "2025-07-19T10:23:53.923405",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T10:26:00.730704",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T10:28:25.232935",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T10:45:16.831127",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T10:50:46.241841",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T10:54:12.806839",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T10:57:02.513632",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T11:01:07.447778",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T12:13:37.712539",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T12:13:43.277483",
"level": "error",
"event_type": "plc_connection_failed",
"message": "Failed to connect to PLC 10.1.33.11",
"details": {
"ip": "10.1.33.11",
"rack": 0,
"slot": 2
}
},
{
"timestamp": "2025-07-19T12:14:34.874959",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
},
{
"timestamp": "2025-07-19T12:16:12.281197",
"level": "info",
"event_type": "application_started",
"message": "Application initialization completed successfully",
"details": {}
}
],
"last_updated": "2025-07-18T16:14:48.203024",
"total_entries": 157
"last_updated": "2025-07-19T12:16:12.281197",
"total_entries": 171
}

33
core/__init__.py Normal file
View File

@ -0,0 +1,33 @@
"""
PLC S7-315 Data Streamer Core Module
This module provides a complete solution for PLC data streaming, CSV recording,
and real-time monitoring for Siemens S7-315 PLCs.
Classes:
PLCDataStreamer: Main orchestrator class
ConfigManager: Configuration and persistence management
PLCClient: PLC communication handling
DataStreamer: UDP streaming and CSV recording
EventLogger: Persistent event logging
InstanceManager: Single instance control and auto-recovery
"""
from .plc_data_streamer import PLCDataStreamer
from .config_manager import ConfigManager
from .plc_client import PLCClient
from .streamer import DataStreamer
from .event_logger import EventLogger
from .instance_manager import InstanceManager
__version__ = "2.0.0"
__author__ = "Industrial Automation Team"
__all__ = [
"PLCDataStreamer",
"ConfigManager",
"PLCClient",
"DataStreamer",
"EventLogger",
"InstanceManager",
]

View File

@ -0,0 +1,495 @@
import json
import os
import sys
from datetime import datetime
from typing import Dict, Any, Optional, List, Set
from pathlib import Path
def resource_path(relative_path):
"""Get absolute path to resource, works for dev and for PyInstaller"""
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
# Not running in a bundle
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class ConfigManager:
"""Manages all configuration persistence and validation"""
def __init__(self, logger=None):
"""Initialize configuration manager"""
self.logger = logger
# Configuration file paths
self.config_file = resource_path("plc_config.json")
self.datasets_file = resource_path("plc_datasets.json")
self.state_file = resource_path("system_state.json")
# Default configurations
self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
self.udp_config = {"host": "127.0.0.1", "port": 9870}
self.sampling_interval = 0.1
# Datasets management
self.datasets = {} # Dictionary of dataset_id -> dataset_config
self.active_datasets = set() # Set of active dataset IDs
self.current_dataset_id = None # Currently selected dataset for editing
# System state for auto-recovery
self.last_state = {
"should_connect": False,
"should_stream": False,
"active_datasets": [],
}
self.auto_recovery_enabled = True
# Load all configurations
self.load_all_configurations()
def load_all_configurations(self):
"""Load all configuration files"""
self.load_configuration()
self.load_datasets()
self.sync_streaming_variables()
self.load_system_state()
if self.logger:
self.logger.info("All configurations loaded successfully")
def load_configuration(self):
"""Load PLC and UDP configuration from JSON file"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, "r") as f:
config = json.load(f)
self.plc_config = config.get("plc_config", self.plc_config)
self.udp_config = config.get("udp_config", self.udp_config)
self.sampling_interval = config.get(
"sampling_interval", self.sampling_interval
)
if self.logger:
self.logger.info(f"Configuration loaded from {self.config_file}")
else:
if self.logger:
self.logger.info("No configuration file found, using defaults")
except Exception as e:
if self.logger:
self.logger.error(f"Error loading configuration: {e}")
def save_configuration(self):
"""Save PLC and UDP configuration to JSON file"""
try:
config = {
"plc_config": self.plc_config,
"udp_config": self.udp_config,
"sampling_interval": self.sampling_interval,
}
with open(self.config_file, "w") as f:
json.dump(config, f, indent=4)
if self.logger:
self.logger.info(f"Configuration saved to {self.config_file}")
except Exception as e:
if self.logger:
self.logger.error(f"Error saving configuration: {e}")
def load_datasets(self):
"""Load datasets configuration from JSON file"""
try:
if os.path.exists(self.datasets_file):
with open(self.datasets_file, "r") as f:
datasets_data = json.load(f)
self.datasets = datasets_data.get("datasets", {})
self.active_datasets = set(datasets_data.get("active_datasets", []))
self.current_dataset_id = datasets_data.get("current_dataset_id")
# Validate current_dataset_id exists
if (
self.current_dataset_id
and self.current_dataset_id not in self.datasets
):
self.current_dataset_id = None
# Set default current dataset if none selected
if not self.current_dataset_id and self.datasets:
self.current_dataset_id = next(iter(self.datasets.keys()))
if self.logger:
self.logger.info(
f"Datasets loaded from {self.datasets_file}: {len(self.datasets)} datasets, {len(self.active_datasets)} active"
)
else:
if self.logger:
self.logger.info(
"No datasets file found, starting with empty datasets"
)
except Exception as e:
if self.logger:
self.logger.error(f"Error loading datasets: {e}")
def save_datasets(self):
"""Save datasets configuration to JSON file"""
try:
datasets_data = {
"datasets": self.datasets,
"active_datasets": list(self.active_datasets),
"current_dataset_id": self.current_dataset_id,
"version": "1.0",
"last_update": datetime.now().isoformat(),
}
with open(self.datasets_file, "w") as f:
json.dump(datasets_data, f, indent=4)
if self.logger:
self.logger.info(
f"Datasets configuration saved to {self.datasets_file}"
)
except Exception as e:
if self.logger:
self.logger.error(f"Error saving datasets: {e}")
def sync_streaming_variables(self):
"""Synchronize streaming variables configuration"""
try:
sync_needed = False
for dataset_id, dataset_info in self.datasets.items():
streaming_vars = dataset_info.get("streaming_variables", [])
variables_config = dataset_info.get("variables", {})
for var_name in streaming_vars:
if var_name in variables_config:
if not variables_config[var_name].get("streaming", False):
variables_config[var_name]["streaming"] = True
sync_needed = True
if self.logger:
self.logger.info(
f"Synchronized streaming flag for variable '{var_name}' in dataset '{dataset_id}'"
)
# Ensure variables not in streaming list have streaming=false
for var_name, var_config in variables_config.items():
if var_name not in streaming_vars and var_config.get(
"streaming", False
):
var_config["streaming"] = False
sync_needed = True
if self.logger:
self.logger.info(
f"Disabled streaming flag for variable '{var_name}' in dataset '{dataset_id}'"
)
if sync_needed:
self.save_datasets()
if self.logger:
self.logger.info("Streaming variables configuration synchronized")
except Exception as e:
if self.logger:
self.logger.error(f"Error synchronizing streaming variables: {e}")
def load_system_state(self):
"""Load system state from JSON file"""
try:
if os.path.exists(self.state_file):
with open(self.state_file, "r") as f:
state_data = json.load(f)
self.last_state = state_data.get("last_state", self.last_state)
self.auto_recovery_enabled = state_data.get(
"auto_recovery_enabled", True
)
if self.logger:
self.logger.info(f"System state loaded from {self.state_file}")
else:
if self.logger:
self.logger.info(
"No system state file found, starting with defaults"
)
except Exception as e:
if self.logger:
self.logger.error(f"Error loading system state: {e}")
def save_system_state(self, connected=False, streaming=False, active_datasets=None):
"""Save current system state to JSON file"""
try:
state_data = {
"last_state": {
"should_connect": connected,
"should_stream": streaming,
"active_datasets": list(active_datasets or []),
},
"auto_recovery_enabled": self.auto_recovery_enabled,
"last_update": datetime.now().isoformat(),
}
with open(self.state_file, "w") as f:
json.dump(state_data, f, indent=4)
if self.logger:
self.logger.debug("System state saved")
except Exception as e:
if self.logger:
self.logger.error(f"Error saving system state: {e}")
# PLC Configuration Methods
def update_plc_config(self, ip: str, rack: int, slot: int):
"""Update PLC configuration"""
old_config = self.plc_config.copy()
self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
self.save_configuration()
return {"old_config": old_config, "new_config": self.plc_config}
def update_udp_config(self, host: str, port: int):
"""Update UDP configuration"""
old_config = self.udp_config.copy()
self.udp_config = {"host": host, "port": port}
self.save_configuration()
return {"old_config": old_config, "new_config": self.udp_config}
def update_sampling_interval(self, interval: float):
"""Update sampling interval"""
old_interval = self.sampling_interval
self.sampling_interval = interval
self.save_configuration()
return {"old_interval": old_interval, "new_interval": interval}
# Dataset Management Methods
def create_dataset(
self, dataset_id: str, name: str, prefix: str, sampling_interval: float = None
):
"""Create a new dataset"""
if dataset_id in self.datasets:
raise ValueError(f"Dataset '{dataset_id}' already exists")
new_dataset = {
"name": name,
"prefix": prefix,
"variables": {},
"streaming_variables": [],
"sampling_interval": sampling_interval,
"enabled": False,
"created": datetime.now().isoformat(),
}
self.datasets[dataset_id] = new_dataset
# Set as current dataset if it's the first one
if not self.current_dataset_id:
self.current_dataset_id = dataset_id
self.save_datasets()
return new_dataset
def delete_dataset(self, dataset_id: str):
"""Delete a dataset"""
if dataset_id not in self.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
dataset_info = self.datasets[dataset_id].copy()
del self.datasets[dataset_id]
# Remove from active datasets if present
self.active_datasets.discard(dataset_id)
# Update current dataset if this was selected
if self.current_dataset_id == dataset_id:
self.current_dataset_id = (
next(iter(self.datasets.keys())) if self.datasets else None
)
self.save_datasets()
return dataset_info
def get_current_dataset(self):
"""Get the currently selected dataset"""
if self.current_dataset_id and self.current_dataset_id in self.datasets:
return self.datasets[self.current_dataset_id]
return None
def get_dataset_variables(self, dataset_id: str):
"""Get variables for a specific dataset"""
if dataset_id in self.datasets:
return self.datasets[dataset_id].get("variables", {})
return {}
def get_dataset_sampling_interval(self, dataset_id: str):
"""Get sampling interval for a dataset (falls back to global if not set)"""
if dataset_id in self.datasets:
dataset_interval = self.datasets[dataset_id].get("sampling_interval")
return (
dataset_interval
if dataset_interval is not None
else self.sampling_interval
)
return self.sampling_interval
def add_variable_to_dataset(
self,
dataset_id: str,
name: str,
area: str,
db: int,
offset: int,
var_type: str,
bit: int = None,
streaming: bool = False,
):
"""Add a variable to a specific dataset"""
if dataset_id not in self.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
# Validate area and type
area = area.lower()
if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]:
raise ValueError(f"Unsupported area type: {area}")
valid_types = [
"real",
"int",
"bool",
"dint",
"word",
"byte",
"uint",
"udint",
"sint",
"usint",
]
if var_type not in valid_types:
raise ValueError(f"Invalid data type: {var_type}")
# Create variable configuration
var_config = {
"area": area,
"offset": offset,
"type": var_type,
"streaming": streaming,
}
if area == "db":
var_config["db"] = db
if area in ["e", "a", "mb"] or (area == "db" and bit is not None):
var_config["bit"] = bit
# Add to dataset
self.datasets[dataset_id]["variables"][name] = var_config
# Update streaming variables list if streaming is enabled
if streaming:
if name not in self.datasets[dataset_id]["streaming_variables"]:
self.datasets[dataset_id]["streaming_variables"].append(name)
self.save_datasets()
return var_config
def remove_variable_from_dataset(self, dataset_id: str, name: str):
"""Remove a variable from a specific dataset"""
if dataset_id not in self.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
if name not in self.datasets[dataset_id]["variables"]:
raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'")
var_config = self.datasets[dataset_id]["variables"][name].copy()
del self.datasets[dataset_id]["variables"][name]
# Remove from streaming variables if present
if name in self.datasets[dataset_id]["streaming_variables"]:
self.datasets[dataset_id]["streaming_variables"].remove(name)
self.save_datasets()
return var_config
def toggle_variable_streaming(self, dataset_id: str, name: str, enabled: bool):
"""Toggle streaming for a variable in a dataset"""
if dataset_id not in self.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
if name not in self.datasets[dataset_id]["variables"]:
raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'")
# Update the individual variable streaming flag
self.datasets[dataset_id]["variables"][name]["streaming"] = enabled
# Update the streaming variables list
if enabled:
if name not in self.datasets[dataset_id]["streaming_variables"]:
self.datasets[dataset_id]["streaming_variables"].append(name)
else:
if name in self.datasets[dataset_id]["streaming_variables"]:
self.datasets[dataset_id]["streaming_variables"].remove(name)
self.save_datasets()
def activate_dataset(self, dataset_id: str):
"""Mark a dataset as active"""
if dataset_id not in self.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
self.active_datasets.add(dataset_id)
self.datasets[dataset_id]["enabled"] = True
self.save_datasets()
def deactivate_dataset(self, dataset_id: str):
"""Mark a dataset as inactive"""
if dataset_id not in self.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
self.active_datasets.discard(dataset_id)
self.datasets[dataset_id]["enabled"] = False
self.save_datasets()
def get_status(self):
"""Get configuration status"""
total_variables = sum(
len(dataset["variables"]) for dataset in self.datasets.values()
)
# Count only variables that are in streaming_variables list AND have streaming=true
total_streaming_vars = 0
for dataset in self.datasets.values():
streaming_vars = dataset.get("streaming_variables", [])
variables_config = dataset.get("variables", {})
active_streaming_vars = [
var
for var in streaming_vars
if variables_config.get(var, {}).get("streaming", False)
]
total_streaming_vars += len(active_streaming_vars)
return {
"plc_config": self.plc_config,
"udp_config": self.udp_config,
"datasets_count": len(self.datasets),
"active_datasets_count": len(self.active_datasets),
"total_variables": total_variables,
"total_streaming_variables": total_streaming_vars,
"streaming_variables_count": total_streaming_vars,
"sampling_interval": self.sampling_interval,
"current_dataset_id": self.current_dataset_id,
"datasets": {
dataset_id: {
"name": info["name"],
"prefix": info["prefix"],
"variables_count": len(info["variables"]),
"streaming_count": len(
[
var
for var in info.get("streaming_variables", [])
if info.get("variables", {})
.get(var, {})
.get("streaming", False)
]
),
"sampling_interval": info.get("sampling_interval"),
"enabled": info.get("enabled", False),
"active": dataset_id in self.active_datasets,
}
for dataset_id, info in self.datasets.items()
},
}

165
core/event_logger.py Normal file
View File

@ -0,0 +1,165 @@
import json
import os
import sys
from datetime import datetime
from typing import Dict, Any, List
def resource_path(relative_path):
"""Get absolute path to resource, works for dev and for PyInstaller"""
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
# Not running in a bundle
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class EventLogger:
"""Handles persistent event logging and retrieval"""
def __init__(self, logger=None, max_entries=1000):
"""Initialize event logger"""
self.logger = logger
self.max_entries = max_entries
self.events_log_file = resource_path("application_events.json")
self.events_log = []
# Load existing events
self.load_events_log()
def load_events_log(self):
"""Load events log from JSON file"""
try:
if os.path.exists(self.events_log_file):
with open(self.events_log_file, "r", encoding="utf-8") as f:
data = json.load(f)
self.events_log = data.get("events", [])
# Limit log size on load
if len(self.events_log) > self.max_entries:
self.events_log = self.events_log[-self.max_entries :]
if self.logger:
self.logger.info(
f"Events log loaded: {len(self.events_log)} entries"
)
else:
self.events_log = []
if self.logger:
self.logger.info(
"No events log file found, starting with empty log"
)
except Exception as e:
if self.logger:
self.logger.error(f"Error loading events log: {e}")
self.events_log = []
def save_events_log(self):
"""Save events log to JSON file"""
try:
log_data = {
"events": self.events_log,
"last_updated": datetime.now().isoformat(),
"total_entries": len(self.events_log),
}
with open(self.events_log_file, "w", encoding="utf-8") as f:
json.dump(log_data, f, indent=2, ensure_ascii=False)
except Exception as e:
if self.logger:
self.logger.error(f"Error saving events log: {e}")
def log_event(
self, level: str, event_type: str, message: str, details: Dict[str, Any] = None
):
"""Add an event to the persistent log"""
try:
event = {
"timestamp": datetime.now().isoformat(),
"level": level, # info, warning, error
"event_type": event_type, # connection, disconnection, error, config_change, etc.
"message": message,
"details": details or {},
}
self.events_log.append(event)
# Limit log size
if len(self.events_log) > self.max_entries:
self.events_log = self.events_log[-self.max_entries :]
# Save to file
self.save_events_log()
# Also log to regular logger if available
if self.logger:
if level == "error":
self.logger.error(f"[{event_type}] {message}")
elif level == "warning":
self.logger.warning(f"[{event_type}] {message}")
else:
self.logger.info(f"[{event_type}] {message}")
except Exception as e:
if self.logger:
self.logger.error(f"Error adding event to log: {e}")
def get_recent_events(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Get recent events from the log"""
return self.events_log[-limit:] if self.events_log else []
def get_events_by_level(self, level: str, limit: int = 50) -> List[Dict[str, Any]]:
"""Get events filtered by level"""
filtered_events = [
event for event in self.events_log if event.get("level") == level
]
return filtered_events[-limit:] if filtered_events else []
def get_events_by_type(
self, event_type: str, limit: int = 50
) -> List[Dict[str, Any]]:
"""Get events filtered by type"""
filtered_events = [
event for event in self.events_log if event.get("event_type") == event_type
]
return filtered_events[-limit:] if filtered_events else []
def clear_events_log(self):
"""Clear all events from the log"""
self.events_log = []
self.save_events_log()
if self.logger:
self.logger.info("Events log cleared")
def get_log_stats(self) -> Dict[str, Any]:
"""Get statistics about the events log"""
if not self.events_log:
return {
"total_events": 0,
"levels": {},
"types": {},
"oldest_event": None,
"newest_event": None,
}
levels = {}
types = {}
for event in self.events_log:
level = event.get("level", "unknown")
event_type = event.get("event_type", "unknown")
levels[level] = levels.get(level, 0) + 1
types[event_type] = types.get(event_type, 0) + 1
return {
"total_events": len(self.events_log),
"levels": levels,
"types": types,
"oldest_event": (
self.events_log[0].get("timestamp") if self.events_log else None
),
"newest_event": (
self.events_log[-1].get("timestamp") if self.events_log else None
),
}

227
core/instance_manager.py Normal file
View File

@ -0,0 +1,227 @@
import os
import atexit
import psutil
import time
from typing import Optional, Callable
class InstanceManager:
"""Manages single instance control and auto-recovery functionality"""
def __init__(self, logger=None, lock_filename="plc_streamer.lock"):
"""Initialize instance manager"""
self.logger = logger
self.lock_file = lock_filename
self.lock_fd = None
self._cleanup_registered = False
def acquire_instance_lock(self) -> bool:
"""Acquire lock to ensure single instance execution"""
try:
# Check if lock file exists
if os.path.exists(self.lock_file):
# Read PID from existing lock file
with open(self.lock_file, "r") as f:
try:
old_pid = int(f.read().strip())
# Check if process is still running
if psutil.pid_exists(old_pid):
# Get process info to verify it's our application
try:
proc = psutil.Process(old_pid)
cmdline = " ".join(proc.cmdline())
# More specific check - only block if it's really our application
if (
(
"main.py" in cmdline
and "S7_snap7_Stremer_n_Log" in cmdline
)
or ("plc_streamer" in cmdline.lower())
or ("PLCDataStreamer" in cmdline)
):
if self.logger:
self.logger.error(
f"Another instance is already running (PID: {old_pid})"
)
self.logger.error(f"Command line: {cmdline}")
return False
else:
# Different Python process, remove stale lock
if self.logger:
self.logger.info(
f"Found different Python process (PID: {old_pid}), removing stale lock"
)
os.remove(self.lock_file)
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Process doesn't exist or can't access, continue
pass
# Old process is dead, remove stale lock file
os.remove(self.lock_file)
if self.logger:
self.logger.info("Removed stale lock file")
except (ValueError, IOError):
# Invalid lock file, remove it
os.remove(self.lock_file)
if self.logger:
self.logger.info("Removed invalid lock file")
# Create new lock file with current PID
with open(self.lock_file, "w") as f:
f.write(str(os.getpid()))
# Register cleanup function only once
if not self._cleanup_registered:
atexit.register(self.release_instance_lock)
self._cleanup_registered = True
if self.logger:
self.logger.info(
f"Instance lock acquired: {self.lock_file} (PID: {os.getpid()})"
)
return True
except Exception as e:
if self.logger:
self.logger.error(f"Error acquiring instance lock: {e}")
return False
def release_instance_lock(self):
"""Release instance lock"""
try:
# Remove lock file
if os.path.exists(self.lock_file):
os.remove(self.lock_file)
if self.logger:
self.logger.info("Instance lock released")
except Exception as e:
if self.logger:
self.logger.error(f"Error releasing instance lock: {e}")
def is_process_running(self, pid: int) -> bool:
"""Check if a process with given PID is running"""
try:
return psutil.pid_exists(pid)
except Exception:
return False
def get_process_info(self, pid: int) -> Optional[dict]:
"""Get information about a process"""
try:
if not psutil.pid_exists(pid):
return None
proc = psutil.Process(pid)
return {
"pid": pid,
"name": proc.name(),
"cmdline": proc.cmdline(),
"status": proc.status(),
"create_time": proc.create_time(),
"memory_info": proc.memory_info()._asdict(),
"cpu_percent": proc.cpu_percent(),
}
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
return None
def attempt_auto_recovery(self, config_manager, plc_client, data_streamer) -> bool:
"""Attempt to restore previous system state"""
if not config_manager.auto_recovery_enabled:
if self.logger:
self.logger.info("Auto-recovery disabled, skipping state restoration")
return False
if self.logger:
self.logger.info("Attempting auto-recovery of previous state...")
recovery_success = False
try:
# Try to restore connection
if config_manager.last_state.get("should_connect", False):
if self.logger:
self.logger.info("Attempting to restore PLC connection...")
if plc_client.connect():
if self.logger:
self.logger.info("PLC connection restored successfully")
# Try to restore streaming if connection was successful
if config_manager.last_state.get("should_stream", False):
if self.logger:
self.logger.info("Attempting to restore streaming...")
# Setup UDP socket first
if not data_streamer.setup_udp_socket():
if self.logger:
self.logger.warning(
"Failed to setup UDP socket during auto-recovery"
)
return False
# Restore active datasets
restored_datasets = config_manager.last_state.get(
"active_datasets", []
)
activated_count = 0
for dataset_id in restored_datasets:
if dataset_id in config_manager.datasets:
try:
data_streamer.activate_dataset(dataset_id)
activated_count += 1
except Exception as e:
if self.logger:
self.logger.warning(
f"Failed to restore dataset {dataset_id}: {e}"
)
if activated_count > 0:
recovery_success = True
if self.logger:
self.logger.info(
f"Streaming restored successfully: {activated_count} datasets activated"
)
else:
if self.logger:
self.logger.warning(
"Failed to restore streaming: no datasets activated"
)
else:
recovery_success = True # Connection restored successfully
else:
if self.logger:
self.logger.warning("Failed to restore PLC connection")
else:
recovery_success = True # No connection was expected
except Exception as e:
if self.logger:
self.logger.error(f"Error during auto-recovery: {e}")
recovery_success = False
return recovery_success
def wait_for_safe_startup(self, delay_seconds: float = 1.0):
"""Wait for a safe startup delay to ensure previous instance cleanup"""
if delay_seconds > 0:
if self.logger:
self.logger.info(f"Waiting {delay_seconds}s for safe startup...")
time.sleep(delay_seconds)
def force_cleanup_stale_locks(self):
"""Force cleanup of stale lock files (use with caution)"""
try:
if os.path.exists(self.lock_file):
os.remove(self.lock_file)
if self.logger:
self.logger.info("Forced cleanup of lock file")
return True
return False
except Exception as e:
if self.logger:
self.logger.error(f"Error during forced cleanup: {e}")
return False

File diff suppressed because it is too large Load Diff

530
core/plc_data_streamer.py Normal file
View File

@ -0,0 +1,530 @@
import logging
from typing import Dict, Any, Optional, List
import psutil
import os
import math
from datetime import datetime
try:
# Try relative imports first (when used as a package)
from .config_manager import ConfigManager
from .plc_client import PLCClient
from .streamer import DataStreamer
from .event_logger import EventLogger
from .instance_manager import InstanceManager
except ImportError:
# Fallback to absolute imports (when run directly)
from core.config_manager import ConfigManager
from core.plc_client import PLCClient
from core.streamer import DataStreamer
from core.event_logger import EventLogger
from core.instance_manager import InstanceManager
class PLCDataStreamer:
"""Main orchestrator class that coordinates all PLC streaming components"""
def __init__(self):
"""Initialize the PLC data streamer orchestrator"""
try:
# Setup logging first
self.setup_logging()
self.logger.info("Logging system initialized")
# Initialize all components step by step with error handling
self.logger.info("Initializing ConfigManager...")
self.config_manager = ConfigManager(self.logger)
self.logger.info("ConfigManager initialized successfully")
self.logger.info("Initializing EventLogger...")
self.event_logger = EventLogger(self.logger)
self.logger.info("EventLogger initialized successfully")
self.logger.info("Initializing PLCClient...")
self.plc_client = PLCClient(self.logger)
self.logger.info("PLCClient initialized successfully")
self.logger.info("Initializing DataStreamer...")
self.data_streamer = DataStreamer(
self.config_manager, self.plc_client, self.event_logger, self.logger
)
self.logger.info("DataStreamer initialized successfully")
self.logger.info("Initializing InstanceManager...")
self.instance_manager = InstanceManager(self.logger)
self.logger.info("InstanceManager initialized successfully")
# Acquire instance lock and attempt auto-recovery
self.logger.info("Acquiring instance lock...")
if self.instance_manager.acquire_instance_lock():
# Small delay to ensure previous instance has fully cleaned up
self.instance_manager.wait_for_safe_startup(1.0)
self.event_logger.log_event(
"info",
"application_started",
"Application initialization completed successfully",
)
self.logger.info("Attempting auto-recovery...")
self.attempt_auto_recovery()
self.logger.info("Initialization completed successfully")
else:
raise RuntimeError(
"Another instance of the application is already running"
)
except Exception as e:
if hasattr(self, "logger"):
self.logger.error(f"Error during initialization: {e}")
self.logger.exception("Full traceback:")
else:
print(f"Error during initialization: {e}")
import traceback
traceback.print_exc()
raise
def setup_logging(self):
"""Configure the logging system"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
)
self.logger = logging.getLogger(__name__)
# PLC Connection Methods
def connect_plc(self) -> bool:
"""Connect to PLC"""
success = self.plc_client.connect(
self.config_manager.plc_config["ip"],
self.config_manager.plc_config["rack"],
self.config_manager.plc_config["slot"],
)
if success:
self.config_manager.save_system_state(
connected=True,
streaming=self.data_streamer.is_streaming(),
active_datasets=self.data_streamer.get_active_datasets(),
)
self.event_logger.log_event(
"info",
"plc_connection",
f"Successfully connected to PLC {self.config_manager.plc_config['ip']}",
self.config_manager.plc_config,
)
else:
self.event_logger.log_event(
"error",
"plc_connection_failed",
f"Failed to connect to PLC {self.config_manager.plc_config['ip']}",
self.config_manager.plc_config,
)
return success
def disconnect_plc(self):
"""Disconnect from PLC"""
self.data_streamer.stop_streaming()
self.plc_client.disconnect()
self.config_manager.save_system_state(
connected=False, streaming=False, active_datasets=set()
)
self.event_logger.log_event(
"info",
"plc_disconnection",
f"Disconnected from PLC {self.config_manager.plc_config['ip']}",
)
# Configuration Methods
def update_plc_config(self, ip: str, rack: int, slot: int):
"""Update PLC configuration"""
config_details = self.config_manager.update_plc_config(ip, rack, slot)
self.event_logger.log_event(
"info",
"config_change",
f"PLC configuration updated: {ip}:{rack}/{slot}",
config_details,
)
def update_udp_config(self, host: str, port: int):
"""Update UDP configuration"""
config_details = self.config_manager.update_udp_config(host, port)
self.event_logger.log_event(
"info",
"config_change",
f"UDP configuration updated: {host}:{port}",
config_details,
)
def update_sampling_interval(self, interval: float):
"""Update sampling interval"""
config_details = self.config_manager.update_sampling_interval(interval)
self.event_logger.log_event(
"info",
"config_change",
f"Sampling interval updated: {interval}s",
config_details,
)
# Dataset Management Methods
def create_dataset(
self, dataset_id: str, name: str, prefix: str, sampling_interval: float = None
):
"""Create a new dataset"""
new_dataset = self.config_manager.create_dataset(
dataset_id, name, prefix, sampling_interval
)
self.event_logger.log_event(
"info",
"dataset_created",
f"Dataset created: {name} (prefix: {prefix})",
{
"dataset_id": dataset_id,
"name": name,
"prefix": prefix,
"sampling_interval": sampling_interval,
},
)
return new_dataset
def delete_dataset(self, dataset_id: str):
"""Delete a dataset"""
# Stop dataset if it's active
if dataset_id in self.config_manager.active_datasets:
self.data_streamer.deactivate_dataset(dataset_id)
dataset_info = self.config_manager.delete_dataset(dataset_id)
self.event_logger.log_event(
"info",
"dataset_deleted",
f"Dataset deleted: {dataset_info['name']}",
{"dataset_id": dataset_id, "dataset_info": dataset_info},
)
def activate_dataset(self, dataset_id: str):
"""Activate a dataset for streaming"""
if not self.plc_client.is_connected():
raise RuntimeError("Cannot activate dataset: PLC not connected")
self.data_streamer.activate_dataset(dataset_id)
def deactivate_dataset(self, dataset_id: str):
"""Deactivate a dataset"""
self.data_streamer.deactivate_dataset(dataset_id)
# Variable Management Methods
def add_variable_to_dataset(
self,
dataset_id: str,
name: str,
area: str,
db: int,
offset: int,
var_type: str,
bit: int = None,
streaming: bool = False,
):
"""Add a variable to a dataset"""
var_config = self.config_manager.add_variable_to_dataset(
dataset_id, name, area, db, offset, var_type, bit, streaming
)
# Create new CSV file if dataset is active and variables were modified
self.data_streamer.create_new_dataset_csv_file_for_variable_modification(
dataset_id
)
# Log the addition
area_description = self._get_area_description(area, db, offset, bit)
self.event_logger.log_event(
"info",
"variable_added",
f"Variable added to dataset '{self.config_manager.datasets[dataset_id]['name']}': {name} -> {area_description} ({var_type})",
{
"dataset_id": dataset_id,
"name": name,
"area": area,
"db": db if area == "db" else None,
"offset": offset,
"bit": bit,
"type": var_type,
"streaming": streaming,
},
)
return var_config
def remove_variable_from_dataset(self, dataset_id: str, name: str):
"""Remove a variable from a dataset"""
var_config = self.config_manager.remove_variable_from_dataset(dataset_id, name)
# Create new CSV file if dataset is active and variables were modified
self.data_streamer.create_new_dataset_csv_file_for_variable_modification(
dataset_id
)
self.event_logger.log_event(
"info",
"variable_removed",
f"Variable removed from dataset '{self.config_manager.datasets[dataset_id]['name']}': {name}",
{"dataset_id": dataset_id, "name": name, "removed_config": var_config},
)
return var_config
def toggle_variable_streaming(self, dataset_id: str, name: str, enabled: bool):
"""Toggle streaming for a variable in a dataset"""
self.config_manager.toggle_variable_streaming(dataset_id, name, enabled)
self.logger.info(
f"Dataset '{dataset_id}' variable {name} streaming: {'enabled' if enabled else 'disabled'}"
)
# Streaming Methods
def start_streaming(self) -> bool:
"""Start streaming"""
success = self.data_streamer.start_streaming()
return success
def stop_streaming(self):
"""Stop streaming"""
self.data_streamer.stop_streaming()
# Status and Information Methods
def get_status(self) -> Dict[str, Any]:
"""Get current status of the application"""
status = {
"plc_connected": self.plc_client.is_connected(),
"streaming": self.data_streamer.is_streaming(),
"csv_recording": self.data_streamer.is_csv_recording(),
"udp_config": self.config_manager.udp_config,
"plc_config": self.config_manager.plc_config,
"datasets_count": len(self.config_manager.datasets),
"active_datasets_count": len(self.config_manager.active_datasets),
"total_variables": sum(
len(dataset["variables"])
for dataset in self.config_manager.datasets.values()
),
"streaming_variables_count": sum(
len(dataset.get("streaming_variables", []))
for dataset in self.config_manager.datasets.values()
),
"sampling_interval": self.config_manager.sampling_interval,
"disk_space_info": self.get_disk_space_info(),
}
return status
def get_disk_space_info(self) -> Dict[str, Any]:
"""Get information about disk space usage and recording time estimates"""
try:
# Get the records directory path
records_path = "records"
if not os.path.exists(records_path):
os.makedirs(records_path)
# Get disk usage for the drive where records are stored
usage = psutil.disk_usage(os.path.abspath(records_path))
# Calculate average CSV file size (estimate based on active datasets)
avg_file_size_per_hour = self._estimate_csv_size_per_hour()
# Calculate recording time left (in hours)
hours_left = usage.free / (
avg_file_size_per_hour if avg_file_size_per_hour > 0 else 1024 * 1024
) # Default to 1MB if no estimate
days_left = hours_left / 24
# Format the time left
if hours_left > 48:
time_left = f"{days_left:.1f} days"
else:
time_left = f"{hours_left:.1f} hours"
# Format human readable sizes
free_space = self._format_size(usage.free)
total_space = self._format_size(usage.total)
used_space = self._format_size(usage.used)
return {
"free_space": free_space,
"total_space": total_space,
"used_space": used_space,
"percent_used": usage.percent,
"recording_time_left": time_left,
"avg_file_size_per_hour": self._format_size(avg_file_size_per_hour),
}
except Exception as e:
if hasattr(self, "logger"):
self.logger.error(f"Error calculating disk space: {e}")
return None
def _estimate_csv_size_per_hour(self) -> float:
"""Estimate CSV file size per hour based on active datasets and variables"""
try:
# Get active datasets
active_dataset_ids = self.config_manager.active_datasets
if not active_dataset_ids:
return 0
# Get CSV directory to check existing files
records_dir = "records"
if not os.path.exists(records_dir):
# If no records directory exists yet, make a rough estimate
return self._rough_size_estimate()
# Try to find actual CSV files to calculate average size
total_size = 0
file_count = 0
# Look at today's directory if it exists
today_dir = os.path.join(records_dir, datetime.now().strftime("%d-%m-%Y"))
if os.path.exists(today_dir):
for filename in os.listdir(today_dir):
if filename.endswith(".csv"):
file_path = os.path.join(today_dir, filename)
if os.path.isfile(file_path):
total_size += os.path.getsize(file_path)
file_count += 1
# If we found files, calculate the average size per hour
if file_count > 0:
avg_size = total_size / file_count
# Multiply by active datasets (if we have data from fewer datasets)
active_count = len(active_dataset_ids)
if active_count > file_count:
avg_size = (avg_size / file_count) * active_count
return avg_size
else:
# Fallback to rough estimate
return self._rough_size_estimate()
except Exception as e:
if hasattr(self, "logger"):
self.logger.error(f"Error estimating CSV size: {e}")
# Return a reasonable default (500KB per hour per dataset)
return 500 * 1024 * len(self.config_manager.active_datasets)
def _rough_size_estimate(self) -> float:
"""Make a rough estimate of CSV file size per hour"""
active_datasets = self.config_manager.active_datasets
total_vars = 0
# Count variables in active datasets
for dataset_id in active_datasets:
if dataset_id in self.config_manager.datasets:
total_vars += len(
self.config_manager.datasets[dataset_id].get("variables", {})
)
# Estimate based on:
# - Each variable produces one value per sampling interval
# - Each value with timestamp takes about 20 bytes on average
# - Sample interval in seconds
sampling_interval = self.config_manager.sampling_interval
if sampling_interval <= 0:
sampling_interval = 0.1 # Default
# Calculate bytes per hour
records_per_hour = 3600 / sampling_interval
bytes_per_hour = total_vars * records_per_hour * 20
# Add 10% overhead for CSV formatting
return bytes_per_hour * 1.1
def _format_size(self, size_bytes):
"""Format file size in a human-readable format"""
if size_bytes == 0:
return "0B"
size_names = ("B", "KB", "MB", "GB", "TB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{s} {size_names[i]}"
def get_datasets(self):
"""Get all datasets information"""
return {
"datasets": self.config_manager.datasets,
"active_datasets": list(self.config_manager.active_datasets),
"current_dataset_id": self.config_manager.current_dataset_id,
}
def get_dataset_variables(self, dataset_id: str):
"""Get variables for a specific dataset"""
return self.config_manager.get_dataset_variables(dataset_id)
def get_recent_events(self, limit: int = 50):
"""Get recent events from the log"""
return self.event_logger.get_recent_events(limit)
# Auto-recovery and Instance Management
def attempt_auto_recovery(self):
"""Attempt to restore previous system state"""
return self.instance_manager.attempt_auto_recovery(
self.config_manager, self.plc_client, self.data_streamer
)
def release_instance_lock(self):
"""Release instance lock"""
self.instance_manager.release_instance_lock()
# Utility Methods
def _get_area_description(
self, area: str, db: int = None, offset: int = 0, bit: int = None
) -> str:
"""Get area description for logging"""
area_descriptions = {
"db": (
f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}"
),
"mw": f"MW{offset}",
"m": f"M{offset}",
"pew": f"PEW{offset}",
"pe": f"PE{offset}",
"paw": f"PAW{offset}",
"pa": f"PA{offset}",
"e": f"E{offset}.{bit}",
"a": f"A{offset}.{bit}",
"mb": f"M{offset}.{bit}",
}
return area_descriptions.get(area.lower(), f"{area.upper()}{offset}")
# Properties for backward compatibility
@property
def datasets(self):
"""Get datasets (backward compatibility)"""
return self.config_manager.datasets
@property
def active_datasets(self):
"""Get active datasets (backward compatibility)"""
return self.config_manager.active_datasets
@property
def current_dataset_id(self):
"""Get current dataset ID (backward compatibility)"""
return self.config_manager.current_dataset_id
@current_dataset_id.setter
def current_dataset_id(self, value):
"""Set current dataset ID (backward compatibility)"""
self.config_manager.current_dataset_id = value
self.config_manager.save_datasets()
@property
def connected(self):
"""Get connection status (backward compatibility)"""
return self.plc_client.is_connected()
@property
def streaming(self):
"""Get streaming status (backward compatibility)"""
return self.data_streamer.is_streaming()
def save_datasets(self):
"""Save datasets (backward compatibility)"""
self.config_manager.save_datasets()

View File

@ -0,0 +1,605 @@
import json
import socket
import time
import threading
import csv
import os
import sys
from datetime import datetime
from typing import Dict, Any, Optional, Set
from pathlib import Path
def resource_path(relative_path):
"""Get absolute path to resource, works for dev and for PyInstaller"""
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
# Not running in a bundle
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class DataStreamer:
"""Handles data streaming, CSV recording, and dataset management"""
def __init__(self, config_manager, plc_client, event_logger, logger=None):
"""Initialize data streamer"""
self.config_manager = config_manager
self.plc_client = plc_client
self.event_logger = event_logger
self.logger = logger
# UDP streaming setup
self.udp_socket = None
# Streaming state
self.streaming = False
# Dataset streaming threads and files
self.dataset_threads = {} # dataset_id -> thread object
self.dataset_csv_files = {} # dataset_id -> file handle
self.dataset_csv_writers = {} # dataset_id -> csv writer
self.dataset_csv_hours = {} # dataset_id -> current hour
self.dataset_using_modification_files = {} # dataset_id -> bool
def setup_udp_socket(self) -> bool:
"""Setup UDP socket for PlotJuggler communication"""
try:
if self.udp_socket:
self.udp_socket.close()
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.logger:
self.logger.info(
f"UDP socket configured for {self.config_manager.udp_config['host']}:{self.config_manager.udp_config['port']}"
)
return True
except Exception as e:
if self.logger:
self.logger.error(f"Error configuring UDP socket: {e}")
return False
def send_to_plotjuggler(self, data: Dict[str, Any]):
"""Send data to PlotJuggler via UDP JSON"""
if not self.udp_socket:
return
try:
message = {"timestamp": time.time(), "data": data}
json_message = json.dumps(message)
self.udp_socket.sendto(
json_message.encode("utf-8"),
(
self.config_manager.udp_config["host"],
self.config_manager.udp_config["port"],
),
)
except Exception as e:
if self.logger:
self.logger.error(f"Error sending data to PlotJuggler: {e}")
def get_csv_directory_path(self) -> str:
"""Get the directory path for current day's CSV files"""
now = datetime.now()
day_folder = now.strftime("%d-%m-%Y")
return os.path.join("records", day_folder)
def ensure_csv_directory(self):
"""Create CSV directory structure if it doesn't exist"""
directory = self.get_csv_directory_path()
Path(directory).mkdir(parents=True, exist_ok=True)
def get_dataset_csv_file_path(
self, dataset_id: str, use_modification_timestamp: bool = False
) -> str:
"""Get the CSV file path for a specific dataset"""
if dataset_id not in self.config_manager.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
now = datetime.now()
prefix = self.config_manager.datasets[dataset_id]["prefix"]
if use_modification_timestamp:
time_suffix = now.strftime("%H_%M_%S")
filename = f"{prefix}_{time_suffix}.csv"
else:
hour = now.strftime("%H")
filename = f"{prefix}_{hour}.csv"
directory = self.get_csv_directory_path()
return os.path.join(directory, filename)
def setup_dataset_csv_file(self, dataset_id: str):
"""Setup CSV file for a specific dataset"""
current_hour = datetime.now().hour
# If we're using a modification file and the hour hasn't changed, keep using it
if (
self.dataset_using_modification_files.get(dataset_id, False)
and dataset_id in self.dataset_csv_hours
and self.dataset_csv_hours[dataset_id] == current_hour
and dataset_id in self.dataset_csv_files
):
return
# Check if we need to create a new file
if (
dataset_id not in self.dataset_csv_hours
or self.dataset_csv_hours[dataset_id] != current_hour
or dataset_id not in self.dataset_csv_files
):
# Close previous file if open
if dataset_id in self.dataset_csv_files:
self.dataset_csv_files[dataset_id].close()
# Create directory and file for current hour
self.ensure_csv_directory()
csv_path = self.get_dataset_csv_file_path(dataset_id)
# Check if file exists to determine if we need headers
file_exists = os.path.exists(csv_path)
self.dataset_csv_files[dataset_id] = open(
csv_path, "a", newline="", encoding="utf-8"
)
self.dataset_csv_writers[dataset_id] = csv.writer(
self.dataset_csv_files[dataset_id]
)
self.dataset_csv_hours[dataset_id] = current_hour
# Reset modification file flag when creating regular hourly file
self.dataset_using_modification_files[dataset_id] = False
# Write headers if it's a new file
dataset_variables = self.config_manager.get_dataset_variables(dataset_id)
if not file_exists and dataset_variables:
headers = ["timestamp"] + list(dataset_variables.keys())
self.dataset_csv_writers[dataset_id].writerow(headers)
self.dataset_csv_files[dataset_id].flush()
if self.logger:
self.logger.info(
f"CSV file created for dataset '{self.config_manager.datasets[dataset_id]['name']}': {csv_path}"
)
def write_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]):
"""Write data to CSV file for a specific dataset"""
if dataset_id not in self.config_manager.active_datasets:
return
try:
self.setup_dataset_csv_file(dataset_id)
if dataset_id in self.dataset_csv_writers:
# Create timestamp
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Create row with all variables for this dataset
dataset_variables = self.config_manager.get_dataset_variables(
dataset_id
)
row = [timestamp]
for var_name in dataset_variables.keys():
row.append(data.get(var_name, None))
self.dataset_csv_writers[dataset_id].writerow(row)
self.dataset_csv_files[dataset_id].flush()
except Exception as e:
if self.logger:
self.logger.error(
f"Error writing CSV data for dataset {dataset_id}: {e}"
)
def create_new_dataset_csv_file_for_variable_modification(self, dataset_id: str):
"""Create a new CSV file for a dataset when variables are modified during active recording"""
if dataset_id not in self.config_manager.active_datasets:
return
try:
# Close current file if open
if dataset_id in self.dataset_csv_files:
self.dataset_csv_files[dataset_id].close()
del self.dataset_csv_files[dataset_id]
del self.dataset_csv_writers[dataset_id]
if self.logger:
self.logger.info(
f"Closed previous CSV file for dataset '{self.config_manager.datasets[dataset_id]['name']}' due to variable modification"
)
# Create new file with modification timestamp
self.ensure_csv_directory()
csv_path = self.get_dataset_csv_file_path(
dataset_id, use_modification_timestamp=True
)
self.dataset_csv_files[dataset_id] = open(
csv_path, "w", newline="", encoding="utf-8"
)
self.dataset_csv_writers[dataset_id] = csv.writer(
self.dataset_csv_files[dataset_id]
)
# Mark that we're using a modification file and set current hour
self.dataset_using_modification_files[dataset_id] = True
self.dataset_csv_hours[dataset_id] = datetime.now().hour
# Write headers with new variable configuration
dataset_variables = self.config_manager.get_dataset_variables(dataset_id)
if dataset_variables:
headers = ["timestamp"] + list(dataset_variables.keys())
self.dataset_csv_writers[dataset_id].writerow(headers)
self.dataset_csv_files[dataset_id].flush()
dataset_name = self.config_manager.datasets[dataset_id]["name"]
if self.logger:
self.logger.info(
f"New CSV file created after variable modification for dataset '{dataset_name}': {csv_path}"
)
self.event_logger.log_event(
"info",
"dataset_csv_file_created",
f"New CSV file created after variable modification for dataset '{dataset_name}': {os.path.basename(csv_path)}",
{
"dataset_id": dataset_id,
"file_path": csv_path,
"variables_count": len(dataset_variables),
"reason": "variable_modification",
},
)
except Exception as e:
dataset_name = self.config_manager.datasets.get(dataset_id, {}).get(
"name", dataset_id
)
if self.logger:
self.logger.error(
f"Error creating new CSV file after variable modification for dataset '{dataset_name}': {e}"
)
self.event_logger.log_event(
"error",
"dataset_csv_error",
f"Failed to create new CSV file after variable modification for dataset '{dataset_name}': {str(e)}",
{"dataset_id": dataset_id, "error": str(e)},
)
def read_dataset_variables(
self, dataset_id: str, variables: Dict[str, Any]
) -> Dict[str, Any]:
"""Read all variables for a specific dataset"""
data = {}
for var_name, var_config in variables.items():
try:
value = self.plc_client.read_variable(var_config)
data[var_name] = value
except Exception as e:
if self.logger:
self.logger.warning(
f"Error reading variable {var_name} in dataset {dataset_id}: {e}"
)
data[var_name] = None
return data
def dataset_streaming_loop(self, dataset_id: str):
"""Streaming loop for a specific dataset"""
dataset_info = self.config_manager.datasets[dataset_id]
interval = self.config_manager.get_dataset_sampling_interval(dataset_id)
if self.logger:
self.logger.info(
f"Dataset '{dataset_info['name']}' streaming loop started (interval: {interval}s)"
)
consecutive_errors = 0
max_consecutive_errors = 5
while (
dataset_id in self.config_manager.active_datasets
and self.plc_client.is_connected()
):
try:
start_time = time.time()
# Read variables for this dataset
dataset_variables = self.config_manager.get_dataset_variables(
dataset_id
)
all_data = self.read_dataset_variables(dataset_id, dataset_variables)
if all_data:
consecutive_errors = 0
# Write to CSV (all variables)
self.write_dataset_csv_data(dataset_id, all_data)
# Get filtered data for streaming - only variables that are in streaming_variables list AND have streaming=true
streaming_variables = dataset_info.get("streaming_variables", [])
dataset_vars_config = dataset_info.get("variables", {})
streaming_data = {
name: value
for name, value in all_data.items()
if name in streaming_variables
and dataset_vars_config.get(name, {}).get("streaming", False)
}
# Send filtered data to PlotJuggler
if streaming_data:
self.send_to_plotjuggler(streaming_data)
# Log data
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
if self.logger:
self.logger.info(
f"[{timestamp}] Dataset '{dataset_info['name']}': CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
)
else:
consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors:
self.event_logger.log_event(
"error",
"dataset_streaming_error",
f"Multiple consecutive read failures for dataset '{dataset_info['name']}' ({consecutive_errors}). Stopping streaming.",
{
"dataset_id": dataset_id,
"consecutive_errors": consecutive_errors,
},
)
break
# Maintain sampling interval
elapsed = time.time() - start_time
sleep_time = max(0, interval - elapsed)
time.sleep(sleep_time)
except Exception as e:
consecutive_errors += 1
self.event_logger.log_event(
"error",
"dataset_streaming_error",
f"Error in dataset '{dataset_info['name']}' streaming loop: {str(e)}",
{
"dataset_id": dataset_id,
"error": str(e),
"consecutive_errors": consecutive_errors,
},
)
if consecutive_errors >= max_consecutive_errors:
self.event_logger.log_event(
"error",
"dataset_streaming_error",
f"Too many consecutive errors for dataset '{dataset_info['name']}'. Stopping streaming.",
{
"dataset_id": dataset_id,
"consecutive_errors": consecutive_errors,
},
)
break
time.sleep(1) # Wait before retry
# Clean up when exiting
self.stop_dataset_streaming(dataset_id)
if self.logger:
self.logger.info(f"Dataset '{dataset_info['name']}' streaming loop ended")
def start_dataset_streaming(self, dataset_id: str):
"""Start streaming thread for a specific dataset"""
if dataset_id not in self.config_manager.datasets:
return False
if dataset_id in self.dataset_threads:
return True # Already running
# Create and start thread for this dataset
thread = threading.Thread(
target=self.dataset_streaming_loop, args=(dataset_id,)
)
thread.daemon = True
self.dataset_threads[dataset_id] = thread
thread.start()
dataset_info = self.config_manager.datasets[dataset_id]
interval = self.config_manager.get_dataset_sampling_interval(dataset_id)
if self.logger:
self.logger.info(
f"Started streaming for dataset '{dataset_info['name']}' (interval: {interval}s)"
)
return True
def stop_dataset_streaming(self, dataset_id: str):
"""Stop streaming thread for a specific dataset"""
if dataset_id in self.dataset_threads:
# The thread will detect this and stop
thread = self.dataset_threads[dataset_id]
if thread.is_alive():
thread.join(timeout=2)
del self.dataset_threads[dataset_id]
# Close CSV file if open
if dataset_id in self.dataset_csv_files:
self.dataset_csv_files[dataset_id].close()
del self.dataset_csv_files[dataset_id]
del self.dataset_csv_writers[dataset_id]
del self.dataset_csv_hours[dataset_id]
# Reset modification file flag
self.dataset_using_modification_files.pop(dataset_id, None)
dataset_info = self.config_manager.datasets.get(dataset_id, {})
if self.logger:
self.logger.info(
f"Stopped streaming for dataset '{dataset_info.get('name', dataset_id)}'"
)
def activate_dataset(self, dataset_id: str):
"""Activate a dataset for streaming and CSV recording"""
if dataset_id not in self.config_manager.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
if not self.plc_client.is_connected():
raise RuntimeError("Cannot activate dataset: PLC not connected")
self.config_manager.activate_dataset(dataset_id)
# Start streaming thread for this dataset
self.start_dataset_streaming(dataset_id)
dataset_info = self.config_manager.datasets[dataset_id]
self.event_logger.log_event(
"info",
"dataset_activated",
f"Dataset activated: {dataset_info['name']}",
{
"dataset_id": dataset_id,
"variables_count": len(dataset_info["variables"]),
"streaming_count": len(dataset_info["streaming_variables"]),
"prefix": dataset_info["prefix"],
},
)
def deactivate_dataset(self, dataset_id: str):
"""Deactivate a dataset"""
if dataset_id not in self.config_manager.datasets:
raise ValueError(f"Dataset '{dataset_id}' does not exist")
self.config_manager.deactivate_dataset(dataset_id)
# Stop streaming thread for this dataset
self.stop_dataset_streaming(dataset_id)
dataset_info = self.config_manager.datasets[dataset_id]
self.event_logger.log_event(
"info",
"dataset_deactivated",
f"Dataset deactivated: {dataset_info['name']}",
{"dataset_id": dataset_id},
)
def start_streaming(self) -> bool:
"""Start data streaming - activates all datasets with variables"""
if not self.plc_client.is_connected():
self.event_logger.log_event(
"error", "streaming_error", "Cannot start streaming: PLC not connected"
)
return False
if not self.config_manager.datasets:
self.event_logger.log_event(
"error",
"streaming_error",
"Cannot start streaming: No datasets configured",
)
return False
if not self.setup_udp_socket():
self.event_logger.log_event(
"error",
"streaming_error",
"Cannot start streaming: UDP socket setup failed",
)
return False
# Activate all datasets that have variables
activated_count = 0
for dataset_id, dataset_info in self.config_manager.datasets.items():
if dataset_info.get("variables"):
try:
self.activate_dataset(dataset_id)
activated_count += 1
except Exception as e:
if self.logger:
self.logger.warning(
f"Failed to activate dataset {dataset_id}: {e}"
)
if activated_count == 0:
self.event_logger.log_event(
"error",
"streaming_error",
"Cannot start streaming: No datasets with variables configured",
)
return False
self.streaming = True
self.config_manager.save_system_state(
connected=self.plc_client.is_connected(),
streaming=True,
active_datasets=self.config_manager.active_datasets,
)
self.event_logger.log_event(
"info",
"streaming_started",
f"Multi-dataset streaming started: {activated_count} datasets activated",
{
"activated_datasets": activated_count,
"total_datasets": len(self.config_manager.datasets),
"udp_host": self.config_manager.udp_config["host"],
"udp_port": self.config_manager.udp_config["port"],
},
)
return True
def stop_streaming(self):
"""Stop streaming - deactivates all active datasets"""
self.streaming = False
# Stop all dataset streaming threads
active_datasets_copy = self.config_manager.active_datasets.copy()
for dataset_id in active_datasets_copy:
try:
self.deactivate_dataset(dataset_id)
except Exception as e:
if self.logger:
self.logger.warning(f"Error deactivating dataset {dataset_id}: {e}")
# Close UDP socket
if self.udp_socket:
self.udp_socket.close()
self.udp_socket = None
self.config_manager.save_system_state(
connected=self.plc_client.is_connected(),
streaming=False,
active_datasets=set(),
)
datasets_stopped = len(active_datasets_copy)
self.event_logger.log_event(
"info",
"streaming_stopped",
f"Multi-dataset streaming stopped: {datasets_stopped} datasets deactivated",
)
def is_streaming(self) -> bool:
"""Check if streaming is active"""
return self.streaming
def is_csv_recording(self) -> bool:
"""Check if CSV recording is active"""
return bool(self.dataset_csv_files) and self.streaming
def get_active_datasets(self) -> Set[str]:
"""Get set of currently active dataset IDs"""
return self.config_manager.active_datasets.copy()
def get_streaming_stats(self) -> Dict[str, Any]:
"""Get streaming statistics"""
return {
"streaming": self.streaming,
"active_datasets": len(self.config_manager.active_datasets),
"active_threads": len(self.dataset_threads),
"open_csv_files": len(self.dataset_csv_files),
"udp_socket_active": self.udp_socket is not None,
}

13
main.py
View File

@ -23,13 +23,14 @@ from pathlib import Path
import atexit
import psutil
import sys
from core.plc_client import PLCDataStreamer
from core import PLCDataStreamer
app = Flask(__name__)
app.secret_key = "plc_streamer_secret_key"
def resource_path(relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
"""Get absolute path to resource, works for dev and for PyInstaller"""
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
@ -57,6 +58,12 @@ def serve_image(filename):
return send_from_directory(".images", filename)
@app.route("/static/<path:filename>")
def serve_static(filename):
"""Serve static files (CSS, JS, etc.)"""
return send_from_directory("static", filename)
@app.route("/")
def index():
"""Main page"""
@ -739,7 +746,7 @@ def get_events():
{
"success": True,
"events": events,
"total_events": len(streamer.events_log),
"total_events": len(streamer.event_logger.events_log),
"showing": len(events),
}
)

4
static/css/pico.min.css vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff