diff --git a/application_events.json b/application_events.json index 6addc8e..cfc5c03 100644 --- a/application_events.json +++ b/application_events.json @@ -1411,8 +1411,352 @@ "streaming_count": 3, "prefix": "dar" } + }, + { + "timestamp": "2025-07-18T15:11:14.294668", + "level": "info", + "event_type": "Application started", + "message": "Application initialization completed successfully", + "details": {} + }, + { + "timestamp": "2025-07-18T15:11:14.321092", + "level": "info", + "event_type": "plc_connection", + "message": "Successfully connected to PLC 10.1.33.11", + "details": { + "ip": "10.1.33.11", + "rack": 0, + "slot": 2 + } + }, + { + "timestamp": "2025-07-18T15:11:14.325574", + "level": "info", + "event_type": "dataset_activated", + "message": "Dataset activated: DAR", + "details": { + "dataset_id": "dar", + "variables_count": 4, + "streaming_count": 3, + "prefix": "dar" + } + }, + { + "timestamp": "2025-07-18T15:11:51.299652", + "level": "info", + "event_type": "dataset_deactivated", + "message": "Dataset deactivated: DAR", + "details": { + "dataset_id": "dar" + } + }, + { + "timestamp": "2025-07-18T15:11:51.303633", + "level": "info", + "event_type": "streaming_stopped", + "message": "Multi-dataset streaming stopped: 1 datasets deactivated", + "details": {} + }, + { + "timestamp": "2025-07-18T15:11:51.306353", + "level": "info", + "event_type": "plc_disconnection", + "message": "Disconnected from PLC 10.1.33.11", + "details": {} + }, + { + "timestamp": "2025-07-18T15:14:25.443703", + "level": "info", + "event_type": "Application started", + "message": "Application initialization completed successfully", + "details": {} + }, + { + "timestamp": "2025-07-18T15:14:32.201432", + "level": "error", + "event_type": "streaming_error", + "message": "Cannot start streaming: PLC not connected", + "details": {} + }, + { + "timestamp": "2025-07-18T15:14:38.523861", + "level": "info", + "event_type": "plc_connection", + "message": "Successfully connected to PLC 10.1.33.11", + "details": { + "ip": "10.1.33.11", + "rack": 0, + "slot": 2 + } + }, + { + "timestamp": "2025-07-18T15:14:41.291500", + "level": "info", + "event_type": "dataset_activated", + "message": "Dataset activated: DAR", + "details": { + "dataset_id": "dar", + "variables_count": 4, + "streaming_count": 3, + "prefix": "dar" + } + }, + { + "timestamp": "2025-07-18T15:14:41.294494", + "level": "info", + "event_type": "streaming_started", + "message": "Multi-dataset streaming started: 1 datasets activated", + "details": { + "activated_datasets": 1, + "total_datasets": 2, + "udp_host": "127.0.0.1", + "udp_port": 9870 + } + }, + { + "timestamp": "2025-07-18T16:03:07.780616", + "level": "info", + "event_type": "Application started", + "message": "Application initialization completed successfully", + "details": {} + }, + { + "timestamp": "2025-07-18T16:03:07.801197", + "level": "info", + "event_type": "plc_connection", + "message": "Successfully connected to PLC 10.1.33.11", + "details": { + "ip": "10.1.33.11", + "rack": 0, + "slot": 2 + } + }, + { + "timestamp": "2025-07-18T16:03:07.805174", + "level": "info", + "event_type": "dataset_activated", + "message": "Dataset activated: DAR", + "details": { + "dataset_id": "dar", + "variables_count": 4, + "streaming_count": 3, + "prefix": "dar" + } + }, + { + "timestamp": "2025-07-18T16:07:14.134957", + "level": "info", + "event_type": "dataset_csv_file_created", + "message": "New CSV file created after variable modification for dataset 'DAR': dar_16_07_14.csv", + "details": { + "dataset_id": 
"dar", + "file_path": "records\\18-07-2025\\dar_16_07_14.csv", + "variables_count": 3, + "reason": "variable_modification" + } + }, + { + "timestamp": "2025-07-18T16:07:14.138946", + "level": "info", + "event_type": "variable_removed", + "message": "Variable removed from dataset 'DAR': PEW300", + "details": { + "dataset_id": "dar", + "name": "PEW300", + "removed_config": { + "area": "pew", + "offset": 300, + "type": "word", + "streaming": false + } + } + }, + { + "timestamp": "2025-07-18T16:07:14.460648", + "level": "info", + "event_type": "dataset_csv_file_created", + "message": "New CSV file created after variable modification for dataset 'DAR': dar_16_07_14.csv", + "details": { + "dataset_id": "dar", + "file_path": "records\\18-07-2025\\dar_16_07_14.csv", + "variables_count": 4, + "reason": "variable_modification" + } + }, + { + "timestamp": "2025-07-18T16:07:14.465179", + "level": "info", + "event_type": "variable_added", + "message": "Variable added to dataset 'DAR': UR62_PEW -> PEW300 (word)", + "details": { + "dataset_id": "dar", + "name": "UR62_PEW", + "area": "pew", + "db": null, + "offset": 300, + "bit": null, + "type": "word", + "streaming": false + } + }, + { + "timestamp": "2025-07-18T16:07:19.834364", + "level": "info", + "event_type": "dataset_deactivated", + "message": "Dataset deactivated: DAR", + "details": { + "dataset_id": "dar" + } + }, + { + "timestamp": "2025-07-18T16:07:19.838351", + "level": "info", + "event_type": "streaming_stopped", + "message": "Multi-dataset streaming stopped: 1 datasets deactivated", + "details": {} + }, + { + "timestamp": "2025-07-18T16:07:50.834816", + "level": "info", + "event_type": "variable_added", + "message": "Variable added to dataset 'DAR': UR29_PEW -> PEW304 (word)", + "details": { + "dataset_id": "dar", + "name": "UR29_PEW", + "area": "pew", + "db": null, + "offset": 304, + "bit": null, + "type": "word", + "streaming": false + } + }, + { + "timestamp": "2025-07-18T16:08:15.313118", + "level": "info", + "event_type": "variable_removed", + "message": "Variable removed from dataset 'DAR': CTS306_Conditi", + "details": { + "dataset_id": "dar", + "name": "CTS306_Conditi", + "removed_config": { + "area": "db", + "offset": 18, + "type": "real", + "streaming": true, + "db": 2124 + } + } + }, + { + "timestamp": "2025-07-18T16:08:50.490326", + "level": "info", + "event_type": "variable_removed", + "message": "Variable removed from dataset 'DAR': UR62_Brix", + "details": { + "dataset_id": "dar", + "name": "UR62_Brix", + "removed_config": { + "area": "db", + "offset": 18, + "type": "real", + "streaming": true, + "db": 2122 + } + } + }, + { + "timestamp": "2025-07-18T16:08:50.750605", + "level": "info", + "event_type": "variable_added", + "message": "Variable added to dataset 'DAR': UR62_Brix -> DB1011.1296 (real)", + "details": { + "dataset_id": "dar", + "name": "UR62_Brix", + "area": "db", + "db": 1011, + "offset": 1296, + "bit": null, + "type": "real", + "streaming": false + } + }, + { + "timestamp": "2025-07-18T16:09:30.790847", + "level": "info", + "event_type": "variable_added", + "message": "Variable added to dataset 'DAR': UR29_Brix -> DB1011.1322 (real)", + "details": { + "dataset_id": "dar", + "name": "UR29_Brix", + "area": "db", + "db": 1011, + "offset": 1322, + "bit": null, + "type": "real", + "streaming": false + } + }, + { + "timestamp": "2025-07-18T16:14:37.452312", + "level": "info", + "event_type": "variable_added", + "message": "Variable added to dataset 'DAR': CTS306_PV -> DB1011.1328 (real)", + "details": { + "dataset_id": 
"dar", + "name": "CTS306_PV", + "area": "db", + "db": 1011, + "offset": 1328, + "bit": null, + "type": "real", + "streaming": false + } + }, + { + "timestamp": "2025-07-18T16:14:46.295741", + "level": "info", + "event_type": "config_change", + "message": "PLC configuration updated: 10.1.33.11:0/2", + "details": { + "old_config": { + "ip": "10.1.33.11", + "rack": 0, + "slot": 2 + }, + "new_config": { + "ip": "10.1.33.11", + "rack": 0, + "slot": 2 + } + } + }, + { + "timestamp": "2025-07-18T16:14:48.200032", + "level": "info", + "event_type": "dataset_activated", + "message": "Dataset activated: DAR", + "details": { + "dataset_id": "dar", + "variables_count": 6, + "streaming_count": 1, + "prefix": "dar" + } + }, + { + "timestamp": "2025-07-18T16:14:48.203024", + "level": "info", + "event_type": "streaming_started", + "message": "Multi-dataset streaming started: 1 datasets activated", + "details": { + "activated_datasets": 1, + "total_datasets": 2, + "udp_host": "127.0.0.1", + "udp_port": 9870 + } } ], - "last_updated": "2025-07-18T10:02:33.093596", - "total_entries": 128 + "last_updated": "2025-07-18T16:14:48.203024", + "total_entries": 157 } \ No newline at end of file diff --git a/core/config_manager.py b/core/config_manager.py new file mode 100644 index 0000000..e69de29 diff --git a/core/plc_client.py b/core/plc_client.py new file mode 100644 index 0000000..4d93f8c --- /dev/null +++ b/core/plc_client.py @@ -0,0 +1,1916 @@ +from flask import ( + Flask, + render_template, + request, + jsonify, + redirect, + url_for, + send_from_directory, +) +import snap7 +import snap7.util +import json +import socket +import time +import logging +import threading +from datetime import datetime +from typing import Dict, Any, Optional, List +import struct +import os +import csv +from pathlib import Path +import atexit +import psutil +import sys + +app = Flask(__name__) +app.secret_key = "plc_streamer_secret_key" + +def resource_path(relative_path): + """ Get absolute path to resource, works for dev and for PyInstaller """ + try: + # PyInstaller creates a temp folder and stores path in _MEIPASS + base_path = sys._MEIPASS + except Exception: + # Not running in a bundle + base_path = os.path.abspath(".") + + return os.path.join(base_path, relative_path) + + +class PLCDataStreamer: + def __init__(self): + """Initialize the PLC data streamer""" + # Configuration file paths + # Use resource_path to handle bundled and script paths correctly + self.config_file = resource_path("plc_config.json") + self.variables_file = resource_path("plc_variables.json") + self.datasets_file = resource_path("plc_datasets.json") + self.state_file = resource_path("system_state.json") + self.events_log_file = resource_path("application_events.json") + + # Default configuration + self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2} + self.udp_config = {"host": "127.0.0.1", "port": 9870} + + # Multiple datasets structure + self.datasets = {} # Dictionary of dataset_id -> dataset_config + self.active_datasets = set() # Set of active dataset IDs + self.current_dataset_id = None # Currently selected dataset for editing + + # Dataset streaming threads and files + self.dataset_threads = {} # dataset_id -> thread object + self.dataset_csv_files = {} # dataset_id -> file handle + self.dataset_csv_writers = {} # dataset_id -> csv writer + self.dataset_csv_hours = {} # dataset_id -> current hour + self.dataset_using_modification_files = ( + {} + ) # dataset_id -> bool (track modification files) + + # System states + self.plc = None + 
self.udp_socket = None + self.connected = False + self.streaming = False + self.stream_thread = None + self.sampling_interval = 0.1 + + # Auto-recovery settings + self.auto_recovery_enabled = True + self.last_state = { + "should_connect": False, + "should_stream": False, + "should_record_csv": False, + } + + # Single instance control + self.lock_file = "plc_streamer.lock" + self.lock_fd = None + + # Events log for persistent logging + self.events_log = [] + self.max_log_entries = 1000 # Maximum number of log entries to keep + + # Setup logging first + self.setup_logging() + + # Load configuration from files + self.load_configuration() + self.load_datasets() # Load multiple datasets configuration + self.sync_streaming_variables() # Synchronize streaming variables configuration + self.load_system_state() + self.load_events_log() + + # Acquire instance lock and attempt auto-recovery + if self.acquire_instance_lock(): + # Small delay to ensure previous instance has fully cleaned up + time.sleep(1) + self.log_event( + "info", + "Application started", + "Application initialization completed successfully", + ) + self.attempt_auto_recovery() + else: + raise RuntimeError("Another instance of the application is already running") + + def setup_logging(self): + """Configure the logging system""" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()], + ) + self.logger = logging.getLogger(__name__) + + def load_configuration(self): + """Load PLC and UDP configuration from JSON file""" + try: + if os.path.exists(self.config_file): + with open(self.config_file, "r") as f: + config = json.load(f) + self.plc_config = config.get("plc_config", self.plc_config) + self.udp_config = config.get("udp_config", self.udp_config) + self.sampling_interval = config.get( + "sampling_interval", self.sampling_interval + ) + self.logger.info(f"Configuration loaded from {self.config_file}") + else: + self.logger.info("No configuration file found, using defaults") + except Exception as e: + self.logger.error(f"Error loading configuration: {e}") + + def save_configuration(self): + """Save PLC and UDP configuration to JSON file""" + try: + config = { + "plc_config": self.plc_config, + "udp_config": self.udp_config, + "sampling_interval": self.sampling_interval, + } + with open(self.config_file, "w") as f: + json.dump(config, f, indent=4) + self.logger.info(f"Configuration saved to {self.config_file}") + except Exception as e: + self.logger.error(f"Error saving configuration: {e}") + + def load_datasets(self): + """Load datasets configuration from JSON file""" + try: + if os.path.exists(self.datasets_file): + with open(self.datasets_file, "r") as f: + datasets_data = json.load(f) + self.datasets = datasets_data.get("datasets", {}) + self.active_datasets = set(datasets_data.get("active_datasets", [])) + self.current_dataset_id = datasets_data.get("current_dataset_id") + + # Validate current_dataset_id exists + if ( + self.current_dataset_id + and self.current_dataset_id not in self.datasets + ): + self.current_dataset_id = None + + # Set default current dataset if none selected + if not self.current_dataset_id and self.datasets: + self.current_dataset_id = next(iter(self.datasets.keys())) + + self.logger.info( + f"Datasets loaded from {self.datasets_file}: {len(self.datasets)} datasets, {len(self.active_datasets)} active" + ) + else: + self.logger.info("No datasets file found, starting with empty datasets") + 
except Exception as e: + self.logger.error(f"Error loading datasets: {e}") + + def save_datasets(self): + """Save datasets configuration to JSON file""" + try: + datasets_data = { + "datasets": self.datasets, + "active_datasets": list(self.active_datasets), + "current_dataset_id": self.current_dataset_id, + "version": "1.0", + "last_update": datetime.now().isoformat(), + } + with open(self.datasets_file, "w") as f: + json.dump(datasets_data, f, indent=4) + self.logger.info(f"Datasets configuration saved to {self.datasets_file}") + except Exception as e: + self.logger.error(f"Error saving datasets: {e}") + + def sync_streaming_variables(self): + """Synchronize streaming variables configuration - ensure variables in streaming_variables list have streaming=true""" + try: + sync_needed = False + for dataset_id, dataset_info in self.datasets.items(): + streaming_vars = dataset_info.get("streaming_variables", []) + variables_config = dataset_info.get("variables", {}) + + for var_name in streaming_vars: + if var_name in variables_config: + # If variable is in streaming list but doesn't have streaming=true, fix it + if not variables_config[var_name].get("streaming", False): + variables_config[var_name]["streaming"] = True + sync_needed = True + self.logger.info( + f"Synchronized streaming flag for variable '{var_name}' in dataset '{dataset_id}'" + ) + + # Also ensure variables not in streaming list have streaming=false + for var_name, var_config in variables_config.items(): + if var_name not in streaming_vars and var_config.get( + "streaming", False + ): + var_config["streaming"] = False + sync_needed = True + self.logger.info( + f"Disabled streaming flag for variable '{var_name}' in dataset '{dataset_id}'" + ) + + if sync_needed: + self.save_datasets() + self.logger.info("Streaming variables configuration synchronized") + + except Exception as e: + self.logger.error(f"Error synchronizing streaming variables: {e}") + + def create_dataset( + self, dataset_id: str, name: str, prefix: str, sampling_interval: float = None + ): + """Create a new dataset""" + if dataset_id in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' already exists") + + new_dataset = { + "name": name, + "prefix": prefix, + "variables": {}, + "streaming_variables": [], + "sampling_interval": sampling_interval, + "enabled": False, + "created": datetime.now().isoformat(), + } + + self.datasets[dataset_id] = new_dataset + + # Set as current dataset if it's the first one + if not self.current_dataset_id: + self.current_dataset_id = dataset_id + + self.save_datasets() + + self.log_event( + "info", + "dataset_created", + f"Dataset created: {name} (prefix: {prefix})", + { + "dataset_id": dataset_id, + "name": name, + "prefix": prefix, + "sampling_interval": sampling_interval, + }, + ) + + def delete_dataset(self, dataset_id: str): + """Delete a dataset""" + if dataset_id not in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' does not exist") + + # Stop dataset if it's active + if dataset_id in self.active_datasets: + self.stop_dataset(dataset_id) + + dataset_info = self.datasets[dataset_id].copy() + del self.datasets[dataset_id] + + # Update current dataset if this was selected + if self.current_dataset_id == dataset_id: + self.current_dataset_id = ( + next(iter(self.datasets.keys())) if self.datasets else None + ) + + self.save_datasets() + + self.log_event( + "info", + "dataset_deleted", + f"Dataset deleted: {dataset_info['name']}", + {"dataset_id": dataset_id, "dataset_info": dataset_info}, + ) + + def 
get_current_dataset(self): + """Get the currently selected dataset""" + if self.current_dataset_id and self.current_dataset_id in self.datasets: + return self.datasets[self.current_dataset_id] + return None + + def get_dataset_variables(self, dataset_id: str): + """Get variables for a specific dataset""" + if dataset_id in self.datasets: + return self.datasets[dataset_id].get("variables", {}) + return {} + + def get_dataset_sampling_interval(self, dataset_id: str): + """Get sampling interval for a dataset (falls back to global if not set)""" + if dataset_id in self.datasets: + dataset_interval = self.datasets[dataset_id].get("sampling_interval") + return ( + dataset_interval + if dataset_interval is not None + else self.sampling_interval + ) + return self.sampling_interval + + def add_variable_to_dataset( + self, + dataset_id: str, + name: str, + area: str, + db: int, + offset: int, + var_type: str, + bit: int = None, + streaming: bool = False, + ): + """Add a variable to a specific dataset""" + if dataset_id not in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' does not exist") + + # Validate area and type (reuse existing validation logic) + area = area.lower() + if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]: + raise ValueError( + f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb" + ) + + valid_types = [ + "real", + "int", + "bool", + "dint", + "word", + "byte", + "uint", + "udint", + "sint", + "usint", + ] + if var_type not in valid_types: + raise ValueError( + f"Invalid data type: {var_type}. Supported: {', '.join(valid_types)}" + ) + + # Create variable configuration + var_config = { + "area": area, + "offset": offset, + "type": var_type, + "streaming": streaming, + } + + if area == "db": + var_config["db"] = db + if area in ["e", "a", "mb"] or (area == "db" and bit is not None): + var_config["bit"] = bit + + # Add to dataset + self.datasets[dataset_id]["variables"][name] = var_config + + # Update streaming variables list if streaming is enabled + if streaming: + if name not in self.datasets[dataset_id]["streaming_variables"]: + self.datasets[dataset_id]["streaming_variables"].append(name) + + self.save_datasets() + + # Create new CSV file if dataset is active and variables were modified + self.create_new_dataset_csv_file_for_variable_modification(dataset_id) + + # Log the addition + area_description = { + "db": ( + f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}" + ), + "mw": f"MW{offset}", + "m": f"M{offset}", + "pew": f"PEW{offset}", + "pe": f"PE{offset}", + "paw": f"PAW{offset}", + "pa": f"PA{offset}", + "e": f"E{offset}.{bit}", + "a": f"A{offset}.{bit}", + "mb": f"M{offset}.{bit}", + } + + self.log_event( + "info", + "variable_added", + f"Variable added to dataset '{self.datasets[dataset_id]['name']}': {name} -> {area_description[area]} ({var_type})", + { + "dataset_id": dataset_id, + "name": name, + "area": area, + "db": db if area == "db" else None, + "offset": offset, + "bit": bit, + "type": var_type, + "streaming": streaming, + }, + ) + + def remove_variable_from_dataset(self, dataset_id: str, name: str): + """Remove a variable from a specific dataset""" + if dataset_id not in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' does not exist") + + if name not in self.datasets[dataset_id]["variables"]: + raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'") + + var_config = self.datasets[dataset_id]["variables"][name].copy() + del 
self.datasets[dataset_id]["variables"][name] + + # Remove from streaming variables if present + if name in self.datasets[dataset_id]["streaming_variables"]: + self.datasets[dataset_id]["streaming_variables"].remove(name) + + self.save_datasets() + + # Create new CSV file if dataset is active and variables were modified + self.create_new_dataset_csv_file_for_variable_modification(dataset_id) + + self.log_event( + "info", + "variable_removed", + f"Variable removed from dataset '{self.datasets[dataset_id]['name']}': {name}", + {"dataset_id": dataset_id, "name": name, "removed_config": var_config}, + ) + + def toggle_variable_streaming(self, dataset_id: str, name: str, enabled: bool): + """Toggle streaming for a variable in a dataset""" + if dataset_id not in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' does not exist") + + if name not in self.datasets[dataset_id]["variables"]: + raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'") + + # Update the individual variable streaming flag + self.datasets[dataset_id]["variables"][name]["streaming"] = enabled + + # Update the streaming variables list + if enabled: + if name not in self.datasets[dataset_id]["streaming_variables"]: + self.datasets[dataset_id]["streaming_variables"].append(name) + else: + if name in self.datasets[dataset_id]["streaming_variables"]: + self.datasets[dataset_id]["streaming_variables"].remove(name) + + self.save_datasets() + + self.logger.info( + f"Dataset '{dataset_id}' variable {name} streaming: {'enabled' if enabled else 'disabled'}" + ) + + def activate_dataset(self, dataset_id: str): + """Activate a dataset for streaming and CSV recording""" + if dataset_id not in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' does not exist") + + if not self.connected: + raise RuntimeError("Cannot activate dataset: PLC not connected") + + self.active_datasets.add(dataset_id) + self.datasets[dataset_id]["enabled"] = True + self.save_datasets() + + # Start streaming thread for this dataset + self.start_dataset_streaming(dataset_id) + + dataset_info = self.datasets[dataset_id] + self.log_event( + "info", + "dataset_activated", + f"Dataset activated: {dataset_info['name']}", + { + "dataset_id": dataset_id, + "variables_count": len(dataset_info["variables"]), + "streaming_count": len(dataset_info["streaming_variables"]), + "prefix": dataset_info["prefix"], + }, + ) + + def deactivate_dataset(self, dataset_id: str): + """Deactivate a dataset""" + if dataset_id not in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' does not exist") + + self.active_datasets.discard(dataset_id) + self.datasets[dataset_id]["enabled"] = False + self.save_datasets() + + # Stop streaming thread for this dataset + self.stop_dataset_streaming(dataset_id) + + dataset_info = self.datasets[dataset_id] + self.log_event( + "info", + "dataset_deactivated", + f"Dataset deactivated: {dataset_info['name']}", + {"dataset_id": dataset_id}, + ) + + def start_dataset_streaming(self, dataset_id: str): + """Start streaming thread for a specific dataset""" + if dataset_id not in self.datasets: + return False + + if dataset_id in self.dataset_threads: + return True # Already running + + # Create and start thread for this dataset + thread = threading.Thread( + target=self.dataset_streaming_loop, args=(dataset_id,) + ) + thread.daemon = True + self.dataset_threads[dataset_id] = thread + thread.start() + + dataset_info = self.datasets[dataset_id] + interval = self.get_dataset_sampling_interval(dataset_id) + + self.logger.info( + 
f"Started streaming for dataset '{dataset_info['name']}' (interval: {interval}s)" + ) + return True + + def stop_dataset_streaming(self, dataset_id: str): + """Stop streaming thread for a specific dataset""" + if dataset_id in self.dataset_threads: + # The thread will detect this and stop + thread = self.dataset_threads[dataset_id] + if thread.is_alive(): + thread.join(timeout=2) + del self.dataset_threads[dataset_id] + + # Close CSV file if open + if dataset_id in self.dataset_csv_files: + self.dataset_csv_files[dataset_id].close() + del self.dataset_csv_files[dataset_id] + del self.dataset_csv_writers[dataset_id] + del self.dataset_csv_hours[dataset_id] + # Reset modification file flag + self.dataset_using_modification_files.pop(dataset_id, None) + + dataset_info = self.datasets.get(dataset_id, {}) + self.logger.info( + f"Stopped streaming for dataset '{dataset_info.get('name', dataset_id)}'" + ) + + def dataset_streaming_loop(self, dataset_id: str): + """Streaming loop for a specific dataset""" + dataset_info = self.datasets[dataset_id] + interval = self.get_dataset_sampling_interval(dataset_id) + + self.logger.info( + f"Dataset '{dataset_info['name']}' streaming loop started (interval: {interval}s)" + ) + + consecutive_errors = 0 + max_consecutive_errors = 5 + + while dataset_id in self.active_datasets and self.connected: + try: + start_time = time.time() + + # Read variables for this dataset + dataset_variables = self.get_dataset_variables(dataset_id) + all_data = self.read_dataset_variables(dataset_id, dataset_variables) + + if all_data: + consecutive_errors = 0 + + # Write to CSV (all variables) + self.write_dataset_csv_data(dataset_id, all_data) + + # Get filtered data for streaming - only variables that are in streaming_variables list AND have streaming=true + streaming_variables = dataset_info.get("streaming_variables", []) + dataset_vars_config = dataset_info.get("variables", {}) + streaming_data = { + name: value + for name, value in all_data.items() + if name in streaming_variables + and dataset_vars_config.get(name, {}).get("streaming", False) + } + + # Send filtered data to PlotJuggler + if streaming_data: + self.send_to_plotjuggler(streaming_data) + + # Log data + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + self.logger.info( + f"[{timestamp}] Dataset '{dataset_info['name']}': CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars" + ) + else: + consecutive_errors += 1 + if consecutive_errors >= max_consecutive_errors: + self.log_event( + "error", + "dataset_streaming_error", + f"Multiple consecutive read failures for dataset '{dataset_info['name']}' ({consecutive_errors}). Stopping streaming.", + { + "dataset_id": dataset_id, + "consecutive_errors": consecutive_errors, + }, + ) + break + + # Maintain sampling interval + elapsed = time.time() - start_time + sleep_time = max(0, interval - elapsed) + time.sleep(sleep_time) + + except Exception as e: + consecutive_errors += 1 + self.log_event( + "error", + "dataset_streaming_error", + f"Error in dataset '{dataset_info['name']}' streaming loop: {str(e)}", + { + "dataset_id": dataset_id, + "error": str(e), + "consecutive_errors": consecutive_errors, + }, + ) + + if consecutive_errors >= max_consecutive_errors: + self.log_event( + "error", + "dataset_streaming_error", + f"Too many consecutive errors for dataset '{dataset_info['name']}'. 
Stopping streaming.", + { + "dataset_id": dataset_id, + "consecutive_errors": consecutive_errors, + }, + ) + break + + time.sleep(1) # Wait before retry + + # Clean up when exiting + self.stop_dataset_streaming(dataset_id) + self.logger.info(f"Dataset '{dataset_info['name']}' streaming loop ended") + + def read_dataset_variables( + self, dataset_id: str, variables: Dict[str, Any] + ) -> Dict[str, Any]: + """Read all variables for a specific dataset""" + data = {} + + for var_name, var_config in variables.items(): + try: + value = self.read_variable(var_config) + data[var_name] = value + except Exception as e: + self.logger.warning( + f"Error reading variable {var_name} in dataset {dataset_id}: {e}" + ) + data[var_name] = None + + return data + + def get_dataset_csv_file_path( + self, dataset_id: str, use_modification_timestamp: bool = False + ) -> str: + """Get the CSV file path for a specific dataset""" + if dataset_id not in self.datasets: + raise ValueError(f"Dataset '{dataset_id}' does not exist") + + now = datetime.now() + prefix = self.datasets[dataset_id]["prefix"] + + if use_modification_timestamp: + time_suffix = now.strftime("%H_%M_%S") + filename = f"{prefix}_{time_suffix}.csv" + else: + hour = now.strftime("%H") + filename = f"{prefix}_{hour}.csv" + + directory = self.get_csv_directory_path() + return os.path.join(directory, filename) + + def setup_dataset_csv_file(self, dataset_id: str): + """Setup CSV file for a specific dataset""" + current_hour = datetime.now().hour + + # If we're using a modification file and the hour hasn't changed, keep using it + if ( + self.dataset_using_modification_files.get(dataset_id, False) + and dataset_id in self.dataset_csv_hours + and self.dataset_csv_hours[dataset_id] == current_hour + and dataset_id in self.dataset_csv_files + ): + return + + # Check if we need to create a new file + if ( + dataset_id not in self.dataset_csv_hours + or self.dataset_csv_hours[dataset_id] != current_hour + or dataset_id not in self.dataset_csv_files + ): + + # Close previous file if open + if dataset_id in self.dataset_csv_files: + self.dataset_csv_files[dataset_id].close() + + # Create directory and file for current hour + self.ensure_csv_directory() + csv_path = self.get_dataset_csv_file_path(dataset_id) + + # Check if file exists to determine if we need headers + file_exists = os.path.exists(csv_path) + + self.dataset_csv_files[dataset_id] = open( + csv_path, "a", newline="", encoding="utf-8" + ) + self.dataset_csv_writers[dataset_id] = csv.writer( + self.dataset_csv_files[dataset_id] + ) + self.dataset_csv_hours[dataset_id] = current_hour + + # Reset modification file flag when creating regular hourly file + self.dataset_using_modification_files[dataset_id] = False + + # Write headers if it's a new file + dataset_variables = self.get_dataset_variables(dataset_id) + if not file_exists and dataset_variables: + headers = ["timestamp"] + list(dataset_variables.keys()) + self.dataset_csv_writers[dataset_id].writerow(headers) + self.dataset_csv_files[dataset_id].flush() + self.logger.info( + f"CSV file created for dataset '{self.datasets[dataset_id]['name']}': {csv_path}" + ) + + def write_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]): + """Write data to CSV file for a specific dataset""" + if dataset_id not in self.active_datasets: + return + + try: + self.setup_dataset_csv_file(dataset_id) + + if dataset_id in self.dataset_csv_writers: + # Create timestamp + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + # Create row with 
all variables for this dataset + dataset_variables = self.get_dataset_variables(dataset_id) + row = [timestamp] + for var_name in dataset_variables.keys(): + row.append(data.get(var_name, None)) + + self.dataset_csv_writers[dataset_id].writerow(row) + self.dataset_csv_files[dataset_id].flush() + + except Exception as e: + self.logger.error(f"Error writing CSV data for dataset {dataset_id}: {e}") + + def create_new_dataset_csv_file_for_variable_modification(self, dataset_id: str): + """Create a new CSV file for a dataset when variables are modified during active recording""" + if dataset_id not in self.active_datasets: + return + + try: + # Close current file if open + if dataset_id in self.dataset_csv_files: + self.dataset_csv_files[dataset_id].close() + del self.dataset_csv_files[dataset_id] + del self.dataset_csv_writers[dataset_id] + self.logger.info( + f"Closed previous CSV file for dataset '{self.datasets[dataset_id]['name']}' due to variable modification" + ) + + # Create new file with modification timestamp + self.ensure_csv_directory() + csv_path = self.get_dataset_csv_file_path( + dataset_id, use_modification_timestamp=True + ) + + self.dataset_csv_files[dataset_id] = open( + csv_path, "w", newline="", encoding="utf-8" + ) + self.dataset_csv_writers[dataset_id] = csv.writer( + self.dataset_csv_files[dataset_id] + ) + + # Mark that we're using a modification file and set current hour + self.dataset_using_modification_files[dataset_id] = True + self.dataset_csv_hours[dataset_id] = datetime.now().hour + + # Write headers with new variable configuration + dataset_variables = self.get_dataset_variables(dataset_id) + if dataset_variables: + headers = ["timestamp"] + list(dataset_variables.keys()) + self.dataset_csv_writers[dataset_id].writerow(headers) + self.dataset_csv_files[dataset_id].flush() + + dataset_name = self.datasets[dataset_id]["name"] + self.logger.info( + f"New CSV file created after variable modification for dataset '{dataset_name}': {csv_path}" + ) + self.log_event( + "info", + "dataset_csv_file_created", + f"New CSV file created after variable modification for dataset '{dataset_name}': {os.path.basename(csv_path)}", + { + "dataset_id": dataset_id, + "file_path": csv_path, + "variables_count": len(dataset_variables), + "reason": "variable_modification", + }, + ) + + except Exception as e: + dataset_name = self.datasets.get(dataset_id, {}).get("name", dataset_id) + self.logger.error( + f"Error creating new CSV file after variable modification for dataset '{dataset_name}': {e}" + ) + self.log_event( + "error", + "dataset_csv_error", + f"Failed to create new CSV file after variable modification for dataset '{dataset_name}': {str(e)}", + {"dataset_id": dataset_id, "error": str(e)}, + ) + + def load_system_state(self): + """Load system state from JSON file""" + try: + if os.path.exists(self.state_file): + with open(self.state_file, "r") as f: + state_data = json.load(f) + self.last_state = state_data.get("last_state", self.last_state) + self.auto_recovery_enabled = state_data.get( + "auto_recovery_enabled", True + ) + self.logger.info(f"System state loaded from {self.state_file}") + else: + self.logger.info("No system state file found, starting with defaults") + except Exception as e: + self.logger.error(f"Error loading system state: {e}") + + def save_system_state(self): + """Save current system state to JSON file""" + try: + state_data = { + "last_state": { + "should_connect": self.connected, + "should_stream": self.streaming, + "active_datasets": 
list(self.active_datasets), + }, + "auto_recovery_enabled": self.auto_recovery_enabled, + "last_update": datetime.now().isoformat(), + } + + with open(self.state_file, "w") as f: + json.dump(state_data, f, indent=4) + self.logger.debug("System state saved") + except Exception as e: + self.logger.error(f"Error saving system state: {e}") + + def attempt_auto_recovery(self): + """Attempt to restore previous system state""" + if not self.auto_recovery_enabled: + self.logger.info("Auto-recovery disabled, skipping state restoration") + return + + self.logger.info("Attempting auto-recovery of previous state...") + + # Try to restore connection + if self.last_state.get("should_connect", False): + self.logger.info("Attempting to restore PLC connection...") + if self.connect_plc(): + self.logger.info("PLC connection restored successfully") + + # Try to restore streaming if connection was successful + if self.last_state.get("should_stream", False): + self.logger.info("Attempting to restore streaming...") + + # Setup UDP socket first + if not self.setup_udp_socket(): + self.logger.warning( + "Failed to setup UDP socket during auto-recovery" + ) + return + + # Restore active datasets + restored_datasets = self.last_state.get("active_datasets", []) + activated_count = 0 + + for dataset_id in restored_datasets: + if dataset_id in self.datasets: + try: + self.activate_dataset(dataset_id) + activated_count += 1 + except Exception as e: + self.logger.warning( + f"Failed to restore dataset {dataset_id}: {e}" + ) + + if activated_count > 0: + self.streaming = True + self.save_system_state() + self.logger.info( + f"Streaming restored successfully: {activated_count} datasets activated" + ) + else: + self.logger.warning( + "Failed to restore streaming: no datasets activated" + ) + else: + self.logger.warning("Failed to restore PLC connection") + + def acquire_instance_lock(self) -> bool: + """Acquire lock to ensure single instance execution""" + try: + # Check if lock file exists + if os.path.exists(self.lock_file): + # Read PID from existing lock file + with open(self.lock_file, "r") as f: + try: + old_pid = int(f.read().strip()) + + # Check if process is still running + if psutil.pid_exists(old_pid): + # Get process info to verify it's our application + try: + proc = psutil.Process(old_pid) + cmdline = " ".join(proc.cmdline()) + if "main.py" in cmdline or "plc" in cmdline.lower(): + self.logger.error( + f"Another instance is already running (PID: {old_pid})" + ) + return False + except (psutil.NoSuchProcess, psutil.AccessDenied): + # Process doesn't exist or can't access, continue + pass + + # Old process is dead, remove stale lock file + os.remove(self.lock_file) + self.logger.info("Removed stale lock file") + + except (ValueError, IOError): + # Invalid lock file, remove it + os.remove(self.lock_file) + self.logger.info("Removed invalid lock file") + + # Create new lock file with current PID + with open(self.lock_file, "w") as f: + f.write(str(os.getpid())) + + # Register cleanup function + atexit.register(self.release_instance_lock) + + self.logger.info( + f"Instance lock acquired: {self.lock_file} (PID: {os.getpid()})" + ) + return True + + except Exception as e: + self.logger.error(f"Error acquiring instance lock: {e}") + return False + + def release_instance_lock(self): + """Release instance lock""" + try: + # Remove lock file + if os.path.exists(self.lock_file): + os.remove(self.lock_file) + self.logger.info("Instance lock released") + + except Exception as e: + self.logger.error(f"Error releasing 
instance lock: {e}") + + def save_variables(self): + """Save variables configuration to JSON file""" + try: + # Update streaming state in variables before saving + for var_name in self.variables: + self.variables[var_name]["streaming"] = ( + var_name in self.streaming_variables + ) + + with open(self.variables_file, "w") as f: + json.dump(self.variables, f, indent=4) + self.logger.info(f"Variables saved to {self.variables_file}") + except Exception as e: + self.logger.error(f"Error saving variables: {e}") + + def update_plc_config(self, ip: str, rack: int, slot: int): + """Update PLC configuration""" + old_config = self.plc_config.copy() + self.plc_config = {"ip": ip, "rack": rack, "slot": slot} + self.save_configuration() + + config_details = {"old_config": old_config, "new_config": self.plc_config} + self.log_event( + "info", + "config_change", + f"PLC configuration updated: {ip}:{rack}/{slot}", + config_details, + ) + + def update_udp_config(self, host: str, port: int): + """Update UDP configuration""" + old_config = self.udp_config.copy() + self.udp_config = {"host": host, "port": port} + self.save_configuration() + + config_details = {"old_config": old_config, "new_config": self.udp_config} + self.log_event( + "info", + "config_change", + f"UDP configuration updated: {host}:{port}", + config_details, + ) + + def update_sampling_interval(self, interval: float): + """Update sampling interval""" + old_interval = self.sampling_interval + self.sampling_interval = interval + self.save_configuration() + + config_details = {"old_interval": old_interval, "new_interval": interval} + self.log_event( + "info", + "config_change", + f"Sampling interval updated: {interval}s", + config_details, + ) + + def add_variable( + self, name: str, area: str, db: int, offset: int, var_type: str, bit: int = None + ): + """Add a variable for polling""" + area = area.lower() + + # Validate area type - ahora incluye áreas de bits individuales + if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]: + raise ValueError( + f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb" + ) + + # Validate data type + valid_types = [ + "real", + "int", + "bool", + "dint", + "word", + "byte", + "uint", + "udint", + "sint", + "usint", + ] + if var_type not in valid_types: + raise ValueError( + f"Invalid data type: {var_type}. 
Supported: {', '.join(valid_types)}" + ) + + # For individual bit areas, the data type must be bool and the bit position must be specified + if area in ["e", "a", "mb"] and var_type != "bool": + raise ValueError(f"For bit areas ({area}), data type must be 'bool'") + + if area in ["e", "a", "mb"] and bit is None: + raise ValueError( + f"For bit areas ({area}), bit position must be specified (0-7)" + ) + + # Validate bit range for all areas that support it + if bit is not None and (bit < 0 or bit > 7): + raise ValueError("Bit position must be between 0 and 7") + + # Create variable configuration + var_config = { + "area": area, + "offset": offset, + "type": var_type, + "streaming": False, + } + + # Add DB number only for DB area + if area == "db": + var_config["db"] = db + + # Add bit position for bit areas and DB with specific bit + if area in ["e", "a", "mb"] or (area == "db" and bit is not None): + var_config["bit"] = bit + + self.variables[name] = var_config + self.save_variables() + + variable_details = { + "name": name, + "area": area, + "db": db if area == "db" else None, + "offset": offset, + "bit": bit, + "type": var_type, + "total_variables": len(self.variables), + } + + # Updated area description to include bit addresses + area_description = { + "db": ( + f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}" + ), + "mw": f"MW{offset}", + "m": f"M{offset}", + "pew": f"PEW{offset}", + "pe": f"PE{offset}", + "paw": f"PAW{offset}", + "pa": f"PA{offset}", + "e": f"E{offset}.{bit}", + "a": f"A{offset}.{bit}", + "mb": f"M{offset}.{bit}", + } + + self.log_event( + "info", + "variable_added", + f"Variable added: {name} -> {area_description[area]} ({var_type})", + variable_details, + ) + self.create_new_csv_file_for_variable_modification() + + def remove_variable(self, name: str): + """Remove a variable from polling""" + if name in self.variables: + var_config = self.variables[name].copy() + del self.variables[name] + # Also remove from streaming variables if present + self.streaming_variables.discard(name) + self.save_variables() + + variable_details = { + "name": name, + "removed_config": var_config, + "total_variables": len(self.variables), + } + self.log_event( + "info", + "variable_removed", + f"Variable removed: {name}", + variable_details, + ) + self.create_new_csv_file_for_variable_modification() + + def toggle_streaming_variable(self, name: str, enabled: bool): + """Enable or disable a variable for streaming""" + if name in self.variables: + if enabled: + self.streaming_variables.add(name) + else: + self.streaming_variables.discard(name) + + # Save changes to persist streaming configuration + self.save_variables() + + self.logger.info( + f"Variable {name} streaming: {'enabled' if enabled else 'disabled'}" + ) + + def get_csv_directory_path(self) -> str: + """Get the directory path for current day's CSV files""" + now = datetime.now() + day_folder = now.strftime("%d-%m-%Y") + return os.path.join("records", day_folder) + + def get_csv_file_path(self, use_modification_timestamp: bool = False) -> str: + """Get the complete file path for current hour's CSV file""" + now = datetime.now() + + if use_modification_timestamp: + # Create filename with hour_min_sec format for variable modifications + time_suffix = now.strftime("%H_%M_%S") + filename = f"_{time_suffix}.csv" + else: + # Standard hourly format + hour = now.strftime("%H") + filename = f"{hour}.csv" + + directory = self.get_csv_directory_path() + return os.path.join(directory, filename) + + def 
ensure_csv_directory(self): + """Create CSV directory structure if it doesn't exist""" + directory = self.get_csv_directory_path() + Path(directory).mkdir(parents=True, exist_ok=True) + + def create_new_csv_file_for_variable_modification(self): + """Create a new CSV file when variables are modified during active recording""" + if not self.csv_recording: + return + + try: + # Close current file if open + if self.current_csv_file: + self.current_csv_file.close() + self.logger.info( + f"Closed previous CSV file due to variable modification" + ) + + # Create new file with modification timestamp + self.ensure_csv_directory() + csv_path = self.get_csv_file_path(use_modification_timestamp=True) + + self.current_csv_file = open(csv_path, "w", newline="", encoding="utf-8") + self.current_csv_writer = csv.writer(self.current_csv_file) + + # Mark that we're using a modification file and set current hour + self.using_modification_file = True + self.current_hour = datetime.now().hour + + # Write headers with new variable configuration + if self.variables: + headers = ["timestamp"] + list(self.variables.keys()) + self.current_csv_writer.writerow(headers) + self.current_csv_file.flush() + self.csv_headers_written = True + + self.logger.info( + f"New CSV file created after variable modification: {csv_path}" + ) + self.log_event( + "info", + "csv_file_created", + f"New CSV file created after variable modification: {os.path.basename(csv_path)}", + { + "file_path": csv_path, + "variables_count": len(self.variables), + "reason": "variable_modification", + }, + ) + + except Exception as e: + self.logger.error( + f"Error creating new CSV file after variable modification: {e}" + ) + self.log_event( + "error", + "csv_error", + f"Failed to create new CSV file after variable modification: {str(e)}", + {"error": str(e)}, + ) + + def setup_csv_file(self): + """Setup CSV file for the current hour""" + current_hour = datetime.now().hour + + # If we're using a modification file and the hour hasn't changed, keep using it + if ( + self.using_modification_file + and self.current_hour == current_hour + and self.current_csv_file is not None + ): + return + + # Check if we need to create a new file + if self.current_hour != current_hour or self.current_csv_file is None: + # Close previous file if open + if self.current_csv_file: + self.current_csv_file.close() + + # Create directory and file for current hour + self.ensure_csv_directory() + csv_path = self.get_csv_file_path() + + # Check if file exists to determine if we need headers + file_exists = os.path.exists(csv_path) + + self.current_csv_file = open(csv_path, "a", newline="", encoding="utf-8") + self.current_csv_writer = csv.writer(self.current_csv_file) + self.current_hour = current_hour + + # Reset modification file flag when creating regular hourly file + self.using_modification_file = False + + # Write headers if it's a new file + if not file_exists and self.variables: + headers = ["timestamp"] + list(self.variables.keys()) + self.current_csv_writer.writerow(headers) + self.current_csv_file.flush() + self.csv_headers_written = True + self.logger.info(f"CSV file created: {csv_path}") + + def write_csv_data(self, data: Dict[str, Any]): + """Write data to CSV file""" + if not self.csv_recording or not self.variables: + return + + try: + self.setup_csv_file() + + if self.current_csv_writer: + # Create timestamp + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + # Create row with all variables (use None for missing values) + row = [timestamp] + for 
var_name in self.variables.keys(): + row.append(data.get(var_name, None)) + + self.current_csv_writer.writerow(row) + self.current_csv_file.flush() + + except Exception as e: + self.logger.error(f"Error writing CSV data: {e}") + + def get_streaming_data(self, all_data: Dict[str, Any]) -> Dict[str, Any]: + """Filter data for streaming based on selected variables""" + if not self.streaming_variables: + return all_data + + return { + name: value + for name, value in all_data.items() + if name in self.streaming_variables + } + + def connect_plc(self) -> bool: + """Connect to S7-315 PLC""" + try: + if self.plc: + self.plc.disconnect() + + self.plc = snap7.client.Client() + self.plc.connect( + self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"] + ) + + self.connected = True + self.save_system_state() + + connection_details = { + "ip": self.plc_config["ip"], + "rack": self.plc_config["rack"], + "slot": self.plc_config["slot"], + } + self.log_event( + "info", + "plc_connection", + f"Successfully connected to PLC {self.plc_config['ip']}", + connection_details, + ) + return True + + except Exception as e: + self.connected = False + error_details = { + "ip": self.plc_config["ip"], + "rack": self.plc_config["rack"], + "slot": self.plc_config["slot"], + "error": str(e), + } + self.log_event( + "error", + "plc_connection_failed", + f"Failed to connect to PLC {self.plc_config['ip']}: {str(e)}", + error_details, + ) + return False + + def disconnect_plc(self): + """Disconnect from PLC""" + try: + if self.plc: + self.plc.disconnect() + self.connected = False + self.save_system_state() + + self.log_event( + "info", + "plc_disconnection", + f"Disconnected from PLC {self.plc_config['ip']}", + ) + except Exception as e: + self.log_event( + "error", + "plc_disconnection_error", + f"Error disconnecting from PLC: {str(e)}", + {"error": str(e)}, + ) + + def read_variable(self, var_config: Dict[str, Any]) -> Any: + """Read a specific variable from the PLC""" + try: + area_type = var_config.get("area", "db").lower() + offset = var_config["offset"] + var_type = var_config["type"] + bit = var_config.get("bit") # Extract bit position for bit areas + + if area_type == "db": + # Data Block access (existing functionality) + db = var_config["db"] + if var_type == "real": + raw_data = self.plc.db_read(db, offset, 4) + value = struct.unpack(">f", raw_data)[0] + elif var_type == "int": + raw_data = self.plc.db_read(db, offset, 2) + value = struct.unpack(">h", raw_data)[0] + elif var_type == "bool": + raw_data = self.plc.db_read(db, offset, 1) + if bit is not None: + # Use snap7.util.get_bool for specific bit extraction + value = snap7.util.get_bool(raw_data, 0, bit) + else: + # Default to bit 0 for backward compatibility + value = bool(raw_data[0] & 0x01) + elif var_type == "dint": + raw_data = self.plc.db_read(db, offset, 4) + value = struct.unpack(">l", raw_data)[0] + elif var_type == "word": + raw_data = self.plc.db_read(db, offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "byte": + raw_data = self.plc.db_read(db, offset, 1) + value = struct.unpack(">B", raw_data)[0] + elif var_type == "uint": + raw_data = self.plc.db_read(db, offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "udint": + raw_data = self.plc.db_read(db, offset, 4) + value = struct.unpack(">L", raw_data)[0] + elif var_type == "sint": + raw_data = self.plc.db_read(db, offset, 1) + value = struct.unpack(">b", raw_data)[0] + elif var_type == "usint": + raw_data = self.plc.db_read(db, offset, 1) + 
value = struct.unpack(">B", raw_data)[0] + else: + return None + + elif area_type == "mw" or area_type == "m": + # Memory Words / Markers access + if var_type == "real": + raw_data = self.plc.mb_read(offset, 4) + value = struct.unpack(">f", raw_data)[0] + elif var_type == "int": + raw_data = self.plc.mb_read(offset, 2) + value = struct.unpack(">h", raw_data)[0] + elif var_type == "bool": + raw_data = self.plc.mb_read(offset, 1) + value = bool(raw_data[0] & 0x01) + elif var_type == "dint": + raw_data = self.plc.mb_read(offset, 4) + value = struct.unpack(">l", raw_data)[0] + elif var_type == "word": + raw_data = self.plc.mb_read(offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "byte": + raw_data = self.plc.mb_read(offset, 1) + value = struct.unpack(">B", raw_data)[0] + elif var_type == "uint": + raw_data = self.plc.mb_read(offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "udint": + raw_data = self.plc.mb_read(offset, 4) + value = struct.unpack(">L", raw_data)[0] + elif var_type == "sint": + raw_data = self.plc.mb_read(offset, 1) + value = struct.unpack(">b", raw_data)[0] + elif var_type == "usint": + raw_data = self.plc.mb_read(offset, 1) + value = struct.unpack(">B", raw_data)[0] + else: + return None + + elif area_type == "pew" or area_type == "pe": + # Process Input Words access + if var_type == "real": + raw_data = self.plc.eb_read(offset, 4) + value = struct.unpack(">f", raw_data)[0] + elif var_type == "int": + raw_data = self.plc.eb_read(offset, 2) + value = struct.unpack(">h", raw_data)[0] + elif var_type == "bool": + raw_data = self.plc.eb_read(offset, 1) + value = bool(raw_data[0] & 0x01) + elif var_type == "dint": + raw_data = self.plc.eb_read(offset, 4) + value = struct.unpack(">l", raw_data)[0] + elif var_type == "word": + raw_data = self.plc.eb_read(offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "byte": + raw_data = self.plc.eb_read(offset, 1) + value = struct.unpack(">B", raw_data)[0] + elif var_type == "uint": + raw_data = self.plc.eb_read(offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "udint": + raw_data = self.plc.eb_read(offset, 4) + value = struct.unpack(">L", raw_data)[0] + elif var_type == "sint": + raw_data = self.plc.eb_read(offset, 1) + value = struct.unpack(">b", raw_data)[0] + elif var_type == "usint": + raw_data = self.plc.eb_read(offset, 1) + value = struct.unpack(">B", raw_data)[0] + else: + return None + + elif area_type == "paw" or area_type == "pa": + # Process Output Words access + if var_type == "real": + raw_data = self.plc.ab_read(offset, 4) + value = struct.unpack(">f", raw_data)[0] + elif var_type == "int": + raw_data = self.plc.ab_read(offset, 2) + value = struct.unpack(">h", raw_data)[0] + elif var_type == "bool": + raw_data = self.plc.ab_read(offset, 1) + value = bool(raw_data[0] & 0x01) + elif var_type == "dint": + raw_data = self.plc.ab_read(offset, 4) + value = struct.unpack(">l", raw_data)[0] + elif var_type == "word": + raw_data = self.plc.ab_read(offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "byte": + raw_data = self.plc.ab_read(offset, 1) + value = struct.unpack(">B", raw_data)[0] + elif var_type == "uint": + raw_data = self.plc.ab_read(offset, 2) + value = struct.unpack(">H", raw_data)[0] + elif var_type == "udint": + raw_data = self.plc.ab_read(offset, 4) + value = struct.unpack(">L", raw_data)[0] + elif var_type == "sint": + raw_data = self.plc.ab_read(offset, 1) + value = struct.unpack(">b", raw_data)[0] + elif 
var_type == "usint": + raw_data = self.plc.ab_read(offset, 1) + value = struct.unpack(">B", raw_data)[0] + else: + return None + + elif area_type == "e": + # Process Input Bits access (E5.1 format) + if var_type == "bool": + raw_data = self.plc.eb_read(offset, 1) + # Use snap7.util.get_bool for proper bit extraction + value = snap7.util.get_bool(raw_data, 0, bit) + else: + return None + + elif area_type == "a": + # Process Output Bits access (A3.7 format) + if var_type == "bool": + raw_data = self.plc.ab_read(offset, 1) + # Use snap7.util.get_bool for proper bit extraction + value = snap7.util.get_bool(raw_data, 0, bit) + else: + return None + + elif area_type == "mb": + # Memory Bits access (M10.0 format) + if var_type == "bool": + raw_data = self.plc.mb_read(offset, 1) + # Use snap7.util.get_bool for proper bit extraction + value = snap7.util.get_bool(raw_data, 0, bit) + else: + return None + + else: + self.logger.error(f"Unsupported area type: {area_type}") + return None + + return value + + except Exception as e: + self.logger.error(f"Error reading variable: {e}") + return None + + def read_all_variables(self) -> Dict[str, Any]: + """Read all configured variables""" + if not self.connected or not self.plc: + return {} + + data = {} + for var_name, var_config in self.variables.items(): + value = self.read_variable(var_config) + if value is not None: + data[var_name] = value + + return data + + def setup_udp_socket(self) -> bool: + """Setup UDP socket""" + try: + if self.udp_socket: + self.udp_socket.close() + + self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.logger.info( + f"UDP socket configured for {self.udp_config['host']}:{self.udp_config['port']}" + ) + return True + + except Exception as e: + self.logger.error(f"Error configuring UDP socket: {e}") + return False + + def send_to_plotjuggler(self, data: Dict[str, Any]): + """Send data to PlotJuggler via UDP JSON""" + if not self.udp_socket: + return + + try: + message = {"timestamp": time.time(), "data": data} + + json_message = json.dumps(message) + self.udp_socket.sendto( + json_message.encode("utf-8"), + (self.udp_config["host"], self.udp_config["port"]), + ) + + except Exception as e: + self.logger.error(f"Error sending data to PlotJuggler: {e}") + + def streaming_loop(self): + """Main streaming loop""" + self.logger.info( + f"Starting streaming with interval of {self.sampling_interval}s" + ) + + consecutive_errors = 0 + max_consecutive_errors = 5 + + while self.streaming: + try: + start_time = time.time() + + # Read all variables + all_data = self.read_all_variables() + + if all_data: + # Reset error counter on successful read + consecutive_errors = 0 + + # Write to CSV (all variables) + self.write_csv_data(all_data) + + # Get filtered data for streaming + streaming_data = self.get_streaming_data(all_data) + + # Send filtered data to PlotJuggler + if streaming_data: + self.send_to_plotjuggler(streaming_data) + + # Log data + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + self.logger.info( + f"[{timestamp}] CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars" + ) + else: + consecutive_errors += 1 + if consecutive_errors >= max_consecutive_errors: + self.log_event( + "error", + "streaming_error", + f"Multiple consecutive read failures ({consecutive_errors}). 
Stopping streaming.", + {"consecutive_errors": consecutive_errors}, + ) + break + + # Maintain sampling interval + elapsed = time.time() - start_time + sleep_time = max(0, self.sampling_interval - elapsed) + time.sleep(sleep_time) + + except Exception as e: + consecutive_errors += 1 + self.log_event( + "error", + "streaming_error", + f"Error in streaming loop: {str(e)}", + {"error": str(e), "consecutive_errors": consecutive_errors}, + ) + + if consecutive_errors >= max_consecutive_errors: + self.log_event( + "error", + "streaming_error", + "Too many consecutive errors. Stopping streaming.", + {"consecutive_errors": consecutive_errors}, + ) + break + + time.sleep(1) # Wait before retry + + def start_streaming(self) -> bool: + """Start data streaming - activates all datasets with variables""" + if not self.connected: + self.log_event( + "error", "streaming_error", "Cannot start streaming: PLC not connected" + ) + return False + + if not self.datasets: + self.log_event( + "error", + "streaming_error", + "Cannot start streaming: No datasets configured", + ) + return False + + if not self.setup_udp_socket(): + self.log_event( + "error", + "streaming_error", + "Cannot start streaming: UDP socket setup failed", + ) + return False + + # Activate all datasets that have variables + activated_count = 0 + for dataset_id, dataset_info in self.datasets.items(): + if dataset_info.get("variables"): + try: + self.activate_dataset(dataset_id) + activated_count += 1 + except Exception as e: + self.logger.warning(f"Failed to activate dataset {dataset_id}: {e}") + + if activated_count == 0: + self.log_event( + "error", + "streaming_error", + "Cannot start streaming: No datasets with variables configured", + ) + return False + + self.streaming = True + self.save_system_state() + + self.log_event( + "info", + "streaming_started", + f"Multi-dataset streaming started: {activated_count} datasets activated", + { + "activated_datasets": activated_count, + "total_datasets": len(self.datasets), + "udp_host": self.udp_config["host"], + "udp_port": self.udp_config["port"], + }, + ) + return True + + def stop_streaming(self): + """Stop streaming - deactivates all active datasets""" + self.streaming = False + + # Stop all dataset streaming threads + active_datasets_copy = self.active_datasets.copy() + for dataset_id in active_datasets_copy: + try: + self.deactivate_dataset(dataset_id) + except Exception as e: + self.logger.warning(f"Error deactivating dataset {dataset_id}: {e}") + + # Close UDP socket + if self.udp_socket: + self.udp_socket.close() + self.udp_socket = None + + self.save_system_state() + + datasets_stopped = len(active_datasets_copy) + self.log_event( + "info", + "streaming_stopped", + f"Multi-dataset streaming stopped: {datasets_stopped} datasets deactivated", + ) + + def get_status(self) -> Dict[str, Any]: + """Get current system status""" + total_variables = sum( + len(dataset["variables"]) for dataset in self.datasets.values() + ) + + # Count only variables that are in streaming_variables list AND have streaming=true + total_streaming_vars = 0 + for dataset in self.datasets.values(): + streaming_vars = dataset.get("streaming_variables", []) + variables_config = dataset.get("variables", {}) + active_streaming_vars = [ + var + for var in streaming_vars + if variables_config.get(var, {}).get("streaming", False) + ] + total_streaming_vars += len(active_streaming_vars) + + return { + "plc_connected": self.connected, + "streaming": self.streaming, + "plc_config": self.plc_config, + "udp_config": 
self.udp_config, + "datasets_count": len(self.datasets), + "active_datasets_count": len(self.active_datasets), + "total_variables": total_variables, + "total_streaming_variables": total_streaming_vars, + "streaming_variables_count": total_streaming_vars, # Add this for frontend compatibility + "sampling_interval": self.sampling_interval, + "current_dataset_id": self.current_dataset_id, + "datasets": { + dataset_id: { + "name": info["name"], + "prefix": info["prefix"], + "variables_count": len(info["variables"]), + "streaming_count": len( + [ + var + for var in info.get("streaming_variables", []) + if info.get("variables", {}) + .get(var, {}) + .get("streaming", False) + ] + ), + "sampling_interval": info.get("sampling_interval"), + "enabled": info.get("enabled", False), + "active": dataset_id in self.active_datasets, + } + for dataset_id, info in self.datasets.items() + }, + } + + def log_event( + self, level: str, event_type: str, message: str, details: Dict[str, Any] = None + ): + """Add an event to the persistent log""" + try: + event = { + "timestamp": datetime.now().isoformat(), + "level": level, # info, warning, error + "event_type": event_type, # connection, disconnection, error, config_change, etc. + "message": message, + "details": details or {}, + } + + self.events_log.append(event) + + # Limit log size + if len(self.events_log) > self.max_log_entries: + self.events_log = self.events_log[-self.max_log_entries :] + + # Save to file + self.save_events_log() + + # Also log to regular logger + if level == "error": + self.logger.error(f"[{event_type}] {message}") + elif level == "warning": + self.logger.warning(f"[{event_type}] {message}") + else: + self.logger.info(f"[{event_type}] {message}") + + except Exception as e: + self.logger.error(f"Error adding event to log: {e}") + + def load_events_log(self): + """Load events log from JSON file""" + try: + if os.path.exists(self.events_log_file): + with open(self.events_log_file, "r", encoding="utf-8") as f: + data = json.load(f) + self.events_log = data.get("events", []) + # Limit log size on load + if len(self.events_log) > self.max_log_entries: + self.events_log = self.events_log[-self.max_log_entries :] + self.logger.info(f"Events log loaded: {len(self.events_log)} entries") + else: + self.events_log = [] + self.logger.info("No events log file found, starting with empty log") + except Exception as e: + self.logger.error(f"Error loading events log: {e}") + self.events_log = [] + + def save_events_log(self): + """Save events log to JSON file""" + try: + log_data = { + "events": self.events_log, + "last_updated": datetime.now().isoformat(), + "total_entries": len(self.events_log), + } + with open(self.events_log_file, "w", encoding="utf-8") as f: + json.dump(log_data, f, indent=2, ensure_ascii=False) + except Exception as e: + self.logger.error(f"Error saving events log: {e}") + + def get_recent_events(self, limit: int = 50) -> List[Dict[str, Any]]: + """Get recent events from the log""" + return self.events_log[-limit:] if self.events_log else [] + diff --git a/core/streamer.py b/core/streamer.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py index f43f560..8d354de 100644 --- a/main.py +++ b/main.py @@ -23,6 +23,7 @@ from pathlib import Path import atexit import psutil import sys +from core.plc_client import PLCDataStreamer app = Flask(__name__) app.secret_key = "plc_streamer_secret_key" @@ -39,1882 +40,6 @@ def resource_path(relative_path): return os.path.join(base_path, relative_path) -class 
PLCDataStreamer: - def __init__(self): - """Initialize the PLC data streamer""" - # Configuration file paths - # Use resource_path to handle bundled and script paths correctly - self.config_file = resource_path("plc_config.json") - self.variables_file = resource_path("plc_variables.json") - self.datasets_file = resource_path("plc_datasets.json") - self.state_file = resource_path("system_state.json") - self.events_log_file = resource_path("application_events.json") - - # Default configuration - self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2} - self.udp_config = {"host": "127.0.0.1", "port": 9870} - - # Multiple datasets structure - self.datasets = {} # Dictionary of dataset_id -> dataset_config - self.active_datasets = set() # Set of active dataset IDs - self.current_dataset_id = None # Currently selected dataset for editing - - # Dataset streaming threads and files - self.dataset_threads = {} # dataset_id -> thread object - self.dataset_csv_files = {} # dataset_id -> file handle - self.dataset_csv_writers = {} # dataset_id -> csv writer - self.dataset_csv_hours = {} # dataset_id -> current hour - self.dataset_using_modification_files = ( - {} - ) # dataset_id -> bool (track modification files) - - # System states - self.plc = None - self.udp_socket = None - self.connected = False - self.streaming = False - self.stream_thread = None - self.sampling_interval = 0.1 - - # Auto-recovery settings - self.auto_recovery_enabled = True - self.last_state = { - "should_connect": False, - "should_stream": False, - "should_record_csv": False, - } - - # Single instance control - self.lock_file = "plc_streamer.lock" - self.lock_fd = None - - # Events log for persistent logging - self.events_log = [] - self.max_log_entries = 1000 # Maximum number of log entries to keep - - # Setup logging first - self.setup_logging() - - # Load configuration from files - self.load_configuration() - self.load_datasets() # Load multiple datasets configuration - self.sync_streaming_variables() # Synchronize streaming variables configuration - self.load_system_state() - self.load_events_log() - - # Acquire instance lock and attempt auto-recovery - if self.acquire_instance_lock(): - # Small delay to ensure previous instance has fully cleaned up - time.sleep(1) - self.log_event( - "info", - "Application started", - "Application initialization completed successfully", - ) - self.attempt_auto_recovery() - else: - raise RuntimeError("Another instance of the application is already running") - - def setup_logging(self): - """Configure the logging system""" - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", - handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()], - ) - self.logger = logging.getLogger(__name__) - - def load_configuration(self): - """Load PLC and UDP configuration from JSON file""" - try: - if os.path.exists(self.config_file): - with open(self.config_file, "r") as f: - config = json.load(f) - self.plc_config = config.get("plc_config", self.plc_config) - self.udp_config = config.get("udp_config", self.udp_config) - self.sampling_interval = config.get( - "sampling_interval", self.sampling_interval - ) - self.logger.info(f"Configuration loaded from {self.config_file}") - else: - self.logger.info("No configuration file found, using defaults") - except Exception as e: - self.logger.error(f"Error loading configuration: {e}") - - def save_configuration(self): - """Save PLC and UDP configuration to JSON file""" - try: - config = { - "plc_config": 
self.plc_config, - "udp_config": self.udp_config, - "sampling_interval": self.sampling_interval, - } - with open(self.config_file, "w") as f: - json.dump(config, f, indent=4) - self.logger.info(f"Configuration saved to {self.config_file}") - except Exception as e: - self.logger.error(f"Error saving configuration: {e}") - - def load_datasets(self): - """Load datasets configuration from JSON file""" - try: - if os.path.exists(self.datasets_file): - with open(self.datasets_file, "r") as f: - datasets_data = json.load(f) - self.datasets = datasets_data.get("datasets", {}) - self.active_datasets = set(datasets_data.get("active_datasets", [])) - self.current_dataset_id = datasets_data.get("current_dataset_id") - - # Validate current_dataset_id exists - if ( - self.current_dataset_id - and self.current_dataset_id not in self.datasets - ): - self.current_dataset_id = None - - # Set default current dataset if none selected - if not self.current_dataset_id and self.datasets: - self.current_dataset_id = next(iter(self.datasets.keys())) - - self.logger.info( - f"Datasets loaded from {self.datasets_file}: {len(self.datasets)} datasets, {len(self.active_datasets)} active" - ) - else: - self.logger.info("No datasets file found, starting with empty datasets") - except Exception as e: - self.logger.error(f"Error loading datasets: {e}") - - def save_datasets(self): - """Save datasets configuration to JSON file""" - try: - datasets_data = { - "datasets": self.datasets, - "active_datasets": list(self.active_datasets), - "current_dataset_id": self.current_dataset_id, - "version": "1.0", - "last_update": datetime.now().isoformat(), - } - with open(self.datasets_file, "w") as f: - json.dump(datasets_data, f, indent=4) - self.logger.info(f"Datasets configuration saved to {self.datasets_file}") - except Exception as e: - self.logger.error(f"Error saving datasets: {e}") - - def sync_streaming_variables(self): - """Synchronize streaming variables configuration - ensure variables in streaming_variables list have streaming=true""" - try: - sync_needed = False - for dataset_id, dataset_info in self.datasets.items(): - streaming_vars = dataset_info.get("streaming_variables", []) - variables_config = dataset_info.get("variables", {}) - - for var_name in streaming_vars: - if var_name in variables_config: - # If variable is in streaming list but doesn't have streaming=true, fix it - if not variables_config[var_name].get("streaming", False): - variables_config[var_name]["streaming"] = True - sync_needed = True - self.logger.info( - f"Synchronized streaming flag for variable '{var_name}' in dataset '{dataset_id}'" - ) - - # Also ensure variables not in streaming list have streaming=false - for var_name, var_config in variables_config.items(): - if var_name not in streaming_vars and var_config.get( - "streaming", False - ): - var_config["streaming"] = False - sync_needed = True - self.logger.info( - f"Disabled streaming flag for variable '{var_name}' in dataset '{dataset_id}'" - ) - - if sync_needed: - self.save_datasets() - self.logger.info("Streaming variables configuration synchronized") - - except Exception as e: - self.logger.error(f"Error synchronizing streaming variables: {e}") - - def create_dataset( - self, dataset_id: str, name: str, prefix: str, sampling_interval: float = None - ): - """Create a new dataset""" - if dataset_id in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' already exists") - - new_dataset = { - "name": name, - "prefix": prefix, - "variables": {}, - "streaming_variables": [], - 
"sampling_interval": sampling_interval, - "enabled": False, - "created": datetime.now().isoformat(), - } - - self.datasets[dataset_id] = new_dataset - - # Set as current dataset if it's the first one - if not self.current_dataset_id: - self.current_dataset_id = dataset_id - - self.save_datasets() - - self.log_event( - "info", - "dataset_created", - f"Dataset created: {name} (prefix: {prefix})", - { - "dataset_id": dataset_id, - "name": name, - "prefix": prefix, - "sampling_interval": sampling_interval, - }, - ) - - def delete_dataset(self, dataset_id: str): - """Delete a dataset""" - if dataset_id not in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' does not exist") - - # Stop dataset if it's active - if dataset_id in self.active_datasets: - self.stop_dataset(dataset_id) - - dataset_info = self.datasets[dataset_id].copy() - del self.datasets[dataset_id] - - # Update current dataset if this was selected - if self.current_dataset_id == dataset_id: - self.current_dataset_id = ( - next(iter(self.datasets.keys())) if self.datasets else None - ) - - self.save_datasets() - - self.log_event( - "info", - "dataset_deleted", - f"Dataset deleted: {dataset_info['name']}", - {"dataset_id": dataset_id, "dataset_info": dataset_info}, - ) - - def get_current_dataset(self): - """Get the currently selected dataset""" - if self.current_dataset_id and self.current_dataset_id in self.datasets: - return self.datasets[self.current_dataset_id] - return None - - def get_dataset_variables(self, dataset_id: str): - """Get variables for a specific dataset""" - if dataset_id in self.datasets: - return self.datasets[dataset_id].get("variables", {}) - return {} - - def get_dataset_sampling_interval(self, dataset_id: str): - """Get sampling interval for a dataset (falls back to global if not set)""" - if dataset_id in self.datasets: - dataset_interval = self.datasets[dataset_id].get("sampling_interval") - return ( - dataset_interval - if dataset_interval is not None - else self.sampling_interval - ) - return self.sampling_interval - - def add_variable_to_dataset( - self, - dataset_id: str, - name: str, - area: str, - db: int, - offset: int, - var_type: str, - bit: int = None, - streaming: bool = False, - ): - """Add a variable to a specific dataset""" - if dataset_id not in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' does not exist") - - # Validate area and type (reuse existing validation logic) - area = area.lower() - if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]: - raise ValueError( - f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb" - ) - - valid_types = [ - "real", - "int", - "bool", - "dint", - "word", - "byte", - "uint", - "udint", - "sint", - "usint", - ] - if var_type not in valid_types: - raise ValueError( - f"Invalid data type: {var_type}. 
Supported: {', '.join(valid_types)}" - ) - - # Create variable configuration - var_config = { - "area": area, - "offset": offset, - "type": var_type, - "streaming": streaming, - } - - if area == "db": - var_config["db"] = db - if area in ["e", "a", "mb"] or (area == "db" and bit is not None): - var_config["bit"] = bit - - # Add to dataset - self.datasets[dataset_id]["variables"][name] = var_config - - # Update streaming variables list if streaming is enabled - if streaming: - if name not in self.datasets[dataset_id]["streaming_variables"]: - self.datasets[dataset_id]["streaming_variables"].append(name) - - self.save_datasets() - - # Create new CSV file if dataset is active and variables were modified - self.create_new_dataset_csv_file_for_variable_modification(dataset_id) - - # Log the addition - area_description = { - "db": ( - f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}" - ), - "mw": f"MW{offset}", - "m": f"M{offset}", - "pew": f"PEW{offset}", - "pe": f"PE{offset}", - "paw": f"PAW{offset}", - "pa": f"PA{offset}", - "e": f"E{offset}.{bit}", - "a": f"A{offset}.{bit}", - "mb": f"M{offset}.{bit}", - } - - self.log_event( - "info", - "variable_added", - f"Variable added to dataset '{self.datasets[dataset_id]['name']}': {name} -> {area_description[area]} ({var_type})", - { - "dataset_id": dataset_id, - "name": name, - "area": area, - "db": db if area == "db" else None, - "offset": offset, - "bit": bit, - "type": var_type, - "streaming": streaming, - }, - ) - - def remove_variable_from_dataset(self, dataset_id: str, name: str): - """Remove a variable from a specific dataset""" - if dataset_id not in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' does not exist") - - if name not in self.datasets[dataset_id]["variables"]: - raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'") - - var_config = self.datasets[dataset_id]["variables"][name].copy() - del self.datasets[dataset_id]["variables"][name] - - # Remove from streaming variables if present - if name in self.datasets[dataset_id]["streaming_variables"]: - self.datasets[dataset_id]["streaming_variables"].remove(name) - - self.save_datasets() - - # Create new CSV file if dataset is active and variables were modified - self.create_new_dataset_csv_file_for_variable_modification(dataset_id) - - self.log_event( - "info", - "variable_removed", - f"Variable removed from dataset '{self.datasets[dataset_id]['name']}': {name}", - {"dataset_id": dataset_id, "name": name, "removed_config": var_config}, - ) - - def toggle_variable_streaming(self, dataset_id: str, name: str, enabled: bool): - """Toggle streaming for a variable in a dataset""" - if dataset_id not in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' does not exist") - - if name not in self.datasets[dataset_id]["variables"]: - raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'") - - # Update the individual variable streaming flag - self.datasets[dataset_id]["variables"][name]["streaming"] = enabled - - # Update the streaming variables list - if enabled: - if name not in self.datasets[dataset_id]["streaming_variables"]: - self.datasets[dataset_id]["streaming_variables"].append(name) - else: - if name in self.datasets[dataset_id]["streaming_variables"]: - self.datasets[dataset_id]["streaming_variables"].remove(name) - - self.save_datasets() - - self.logger.info( - f"Dataset '{dataset_id}' variable {name} streaming: {'enabled' if enabled else 'disabled'}" - ) - - def activate_dataset(self, dataset_id: str): - 
"""Activate a dataset for streaming and CSV recording""" - if dataset_id not in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' does not exist") - - if not self.connected: - raise RuntimeError("Cannot activate dataset: PLC not connected") - - self.active_datasets.add(dataset_id) - self.datasets[dataset_id]["enabled"] = True - self.save_datasets() - - # Start streaming thread for this dataset - self.start_dataset_streaming(dataset_id) - - dataset_info = self.datasets[dataset_id] - self.log_event( - "info", - "dataset_activated", - f"Dataset activated: {dataset_info['name']}", - { - "dataset_id": dataset_id, - "variables_count": len(dataset_info["variables"]), - "streaming_count": len(dataset_info["streaming_variables"]), - "prefix": dataset_info["prefix"], - }, - ) - - def deactivate_dataset(self, dataset_id: str): - """Deactivate a dataset""" - if dataset_id not in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' does not exist") - - self.active_datasets.discard(dataset_id) - self.datasets[dataset_id]["enabled"] = False - self.save_datasets() - - # Stop streaming thread for this dataset - self.stop_dataset_streaming(dataset_id) - - dataset_info = self.datasets[dataset_id] - self.log_event( - "info", - "dataset_deactivated", - f"Dataset deactivated: {dataset_info['name']}", - {"dataset_id": dataset_id}, - ) - - def start_dataset_streaming(self, dataset_id: str): - """Start streaming thread for a specific dataset""" - if dataset_id not in self.datasets: - return False - - if dataset_id in self.dataset_threads: - return True # Already running - - # Create and start thread for this dataset - thread = threading.Thread( - target=self.dataset_streaming_loop, args=(dataset_id,) - ) - thread.daemon = True - self.dataset_threads[dataset_id] = thread - thread.start() - - dataset_info = self.datasets[dataset_id] - interval = self.get_dataset_sampling_interval(dataset_id) - - self.logger.info( - f"Started streaming for dataset '{dataset_info['name']}' (interval: {interval}s)" - ) - return True - - def stop_dataset_streaming(self, dataset_id: str): - """Stop streaming thread for a specific dataset""" - if dataset_id in self.dataset_threads: - # The thread will detect this and stop - thread = self.dataset_threads[dataset_id] - if thread.is_alive(): - thread.join(timeout=2) - del self.dataset_threads[dataset_id] - - # Close CSV file if open - if dataset_id in self.dataset_csv_files: - self.dataset_csv_files[dataset_id].close() - del self.dataset_csv_files[dataset_id] - del self.dataset_csv_writers[dataset_id] - del self.dataset_csv_hours[dataset_id] - # Reset modification file flag - self.dataset_using_modification_files.pop(dataset_id, None) - - dataset_info = self.datasets.get(dataset_id, {}) - self.logger.info( - f"Stopped streaming for dataset '{dataset_info.get('name', dataset_id)}'" - ) - - def dataset_streaming_loop(self, dataset_id: str): - """Streaming loop for a specific dataset""" - dataset_info = self.datasets[dataset_id] - interval = self.get_dataset_sampling_interval(dataset_id) - - self.logger.info( - f"Dataset '{dataset_info['name']}' streaming loop started (interval: {interval}s)" - ) - - consecutive_errors = 0 - max_consecutive_errors = 5 - - while dataset_id in self.active_datasets and self.connected: - try: - start_time = time.time() - - # Read variables for this dataset - dataset_variables = self.get_dataset_variables(dataset_id) - all_data = self.read_dataset_variables(dataset_id, dataset_variables) - - if all_data: - consecutive_errors = 0 - - # Write to CSV 
(all variables) - self.write_dataset_csv_data(dataset_id, all_data) - - # Get filtered data for streaming - only variables that are in streaming_variables list AND have streaming=true - streaming_variables = dataset_info.get("streaming_variables", []) - dataset_vars_config = dataset_info.get("variables", {}) - streaming_data = { - name: value - for name, value in all_data.items() - if name in streaming_variables - and dataset_vars_config.get(name, {}).get("streaming", False) - } - - # Send filtered data to PlotJuggler - if streaming_data: - self.send_to_plotjuggler(streaming_data) - - # Log data - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - self.logger.info( - f"[{timestamp}] Dataset '{dataset_info['name']}': CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars" - ) - else: - consecutive_errors += 1 - if consecutive_errors >= max_consecutive_errors: - self.log_event( - "error", - "dataset_streaming_error", - f"Multiple consecutive read failures for dataset '{dataset_info['name']}' ({consecutive_errors}). Stopping streaming.", - { - "dataset_id": dataset_id, - "consecutive_errors": consecutive_errors, - }, - ) - break - - # Maintain sampling interval - elapsed = time.time() - start_time - sleep_time = max(0, interval - elapsed) - time.sleep(sleep_time) - - except Exception as e: - consecutive_errors += 1 - self.log_event( - "error", - "dataset_streaming_error", - f"Error in dataset '{dataset_info['name']}' streaming loop: {str(e)}", - { - "dataset_id": dataset_id, - "error": str(e), - "consecutive_errors": consecutive_errors, - }, - ) - - if consecutive_errors >= max_consecutive_errors: - self.log_event( - "error", - "dataset_streaming_error", - f"Too many consecutive errors for dataset '{dataset_info['name']}'. Stopping streaming.", - { - "dataset_id": dataset_id, - "consecutive_errors": consecutive_errors, - }, - ) - break - - time.sleep(1) # Wait before retry - - # Clean up when exiting - self.stop_dataset_streaming(dataset_id) - self.logger.info(f"Dataset '{dataset_info['name']}' streaming loop ended") - - def read_dataset_variables( - self, dataset_id: str, variables: Dict[str, Any] - ) -> Dict[str, Any]: - """Read all variables for a specific dataset""" - data = {} - - for var_name, var_config in variables.items(): - try: - value = self.read_variable(var_config) - data[var_name] = value - except Exception as e: - self.logger.warning( - f"Error reading variable {var_name} in dataset {dataset_id}: {e}" - ) - data[var_name] = None - - return data - - def get_dataset_csv_file_path( - self, dataset_id: str, use_modification_timestamp: bool = False - ) -> str: - """Get the CSV file path for a specific dataset""" - if dataset_id not in self.datasets: - raise ValueError(f"Dataset '{dataset_id}' does not exist") - - now = datetime.now() - prefix = self.datasets[dataset_id]["prefix"] - - if use_modification_timestamp: - time_suffix = now.strftime("%H_%M_%S") - filename = f"{prefix}_{time_suffix}.csv" - else: - hour = now.strftime("%H") - filename = f"{prefix}_{hour}.csv" - - directory = self.get_csv_directory_path() - return os.path.join(directory, filename) - - def setup_dataset_csv_file(self, dataset_id: str): - """Setup CSV file for a specific dataset""" - current_hour = datetime.now().hour - - # If we're using a modification file and the hour hasn't changed, keep using it - if ( - self.dataset_using_modification_files.get(dataset_id, False) - and dataset_id in self.dataset_csv_hours - and self.dataset_csv_hours[dataset_id] == current_hour - and dataset_id 
in self.dataset_csv_files - ): - return - - # Check if we need to create a new file - if ( - dataset_id not in self.dataset_csv_hours - or self.dataset_csv_hours[dataset_id] != current_hour - or dataset_id not in self.dataset_csv_files - ): - - # Close previous file if open - if dataset_id in self.dataset_csv_files: - self.dataset_csv_files[dataset_id].close() - - # Create directory and file for current hour - self.ensure_csv_directory() - csv_path = self.get_dataset_csv_file_path(dataset_id) - - # Check if file exists to determine if we need headers - file_exists = os.path.exists(csv_path) - - self.dataset_csv_files[dataset_id] = open( - csv_path, "a", newline="", encoding="utf-8" - ) - self.dataset_csv_writers[dataset_id] = csv.writer( - self.dataset_csv_files[dataset_id] - ) - self.dataset_csv_hours[dataset_id] = current_hour - - # Reset modification file flag when creating regular hourly file - self.dataset_using_modification_files[dataset_id] = False - - # Write headers if it's a new file - dataset_variables = self.get_dataset_variables(dataset_id) - if not file_exists and dataset_variables: - headers = ["timestamp"] + list(dataset_variables.keys()) - self.dataset_csv_writers[dataset_id].writerow(headers) - self.dataset_csv_files[dataset_id].flush() - self.logger.info( - f"CSV file created for dataset '{self.datasets[dataset_id]['name']}': {csv_path}" - ) - - def write_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]): - """Write data to CSV file for a specific dataset""" - if dataset_id not in self.active_datasets: - return - - try: - self.setup_dataset_csv_file(dataset_id) - - if dataset_id in self.dataset_csv_writers: - # Create timestamp - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - - # Create row with all variables for this dataset - dataset_variables = self.get_dataset_variables(dataset_id) - row = [timestamp] - for var_name in dataset_variables.keys(): - row.append(data.get(var_name, None)) - - self.dataset_csv_writers[dataset_id].writerow(row) - self.dataset_csv_files[dataset_id].flush() - - except Exception as e: - self.logger.error(f"Error writing CSV data for dataset {dataset_id}: {e}") - - def create_new_dataset_csv_file_for_variable_modification(self, dataset_id: str): - """Create a new CSV file for a dataset when variables are modified during active recording""" - if dataset_id not in self.active_datasets: - return - - try: - # Close current file if open - if dataset_id in self.dataset_csv_files: - self.dataset_csv_files[dataset_id].close() - del self.dataset_csv_files[dataset_id] - del self.dataset_csv_writers[dataset_id] - self.logger.info( - f"Closed previous CSV file for dataset '{self.datasets[dataset_id]['name']}' due to variable modification" - ) - - # Create new file with modification timestamp - self.ensure_csv_directory() - csv_path = self.get_dataset_csv_file_path( - dataset_id, use_modification_timestamp=True - ) - - self.dataset_csv_files[dataset_id] = open( - csv_path, "w", newline="", encoding="utf-8" - ) - self.dataset_csv_writers[dataset_id] = csv.writer( - self.dataset_csv_files[dataset_id] - ) - - # Mark that we're using a modification file and set current hour - self.dataset_using_modification_files[dataset_id] = True - self.dataset_csv_hours[dataset_id] = datetime.now().hour - - # Write headers with new variable configuration - dataset_variables = self.get_dataset_variables(dataset_id) - if dataset_variables: - headers = ["timestamp"] + list(dataset_variables.keys()) - 
self.dataset_csv_writers[dataset_id].writerow(headers) - self.dataset_csv_files[dataset_id].flush() - - dataset_name = self.datasets[dataset_id]["name"] - self.logger.info( - f"New CSV file created after variable modification for dataset '{dataset_name}': {csv_path}" - ) - self.log_event( - "info", - "dataset_csv_file_created", - f"New CSV file created after variable modification for dataset '{dataset_name}': {os.path.basename(csv_path)}", - { - "dataset_id": dataset_id, - "file_path": csv_path, - "variables_count": len(dataset_variables), - "reason": "variable_modification", - }, - ) - - except Exception as e: - dataset_name = self.datasets.get(dataset_id, {}).get("name", dataset_id) - self.logger.error( - f"Error creating new CSV file after variable modification for dataset '{dataset_name}': {e}" - ) - self.log_event( - "error", - "dataset_csv_error", - f"Failed to create new CSV file after variable modification for dataset '{dataset_name}': {str(e)}", - {"dataset_id": dataset_id, "error": str(e)}, - ) - - def load_system_state(self): - """Load system state from JSON file""" - try: - if os.path.exists(self.state_file): - with open(self.state_file, "r") as f: - state_data = json.load(f) - self.last_state = state_data.get("last_state", self.last_state) - self.auto_recovery_enabled = state_data.get( - "auto_recovery_enabled", True - ) - self.logger.info(f"System state loaded from {self.state_file}") - else: - self.logger.info("No system state file found, starting with defaults") - except Exception as e: - self.logger.error(f"Error loading system state: {e}") - - def save_system_state(self): - """Save current system state to JSON file""" - try: - state_data = { - "last_state": { - "should_connect": self.connected, - "should_stream": self.streaming, - "active_datasets": list(self.active_datasets), - }, - "auto_recovery_enabled": self.auto_recovery_enabled, - "last_update": datetime.now().isoformat(), - } - - with open(self.state_file, "w") as f: - json.dump(state_data, f, indent=4) - self.logger.debug("System state saved") - except Exception as e: - self.logger.error(f"Error saving system state: {e}") - - def attempt_auto_recovery(self): - """Attempt to restore previous system state""" - if not self.auto_recovery_enabled: - self.logger.info("Auto-recovery disabled, skipping state restoration") - return - - self.logger.info("Attempting auto-recovery of previous state...") - - # Try to restore connection - if self.last_state.get("should_connect", False): - self.logger.info("Attempting to restore PLC connection...") - if self.connect_plc(): - self.logger.info("PLC connection restored successfully") - - # Try to restore streaming if connection was successful - if self.last_state.get("should_stream", False): - self.logger.info("Attempting to restore streaming...") - - # Setup UDP socket first - if not self.setup_udp_socket(): - self.logger.warning( - "Failed to setup UDP socket during auto-recovery" - ) - return - - # Restore active datasets - restored_datasets = self.last_state.get("active_datasets", []) - activated_count = 0 - - for dataset_id in restored_datasets: - if dataset_id in self.datasets: - try: - self.activate_dataset(dataset_id) - activated_count += 1 - except Exception as e: - self.logger.warning( - f"Failed to restore dataset {dataset_id}: {e}" - ) - - if activated_count > 0: - self.streaming = True - self.save_system_state() - self.logger.info( - f"Streaming restored successfully: {activated_count} datasets activated" - ) - else: - self.logger.warning( - "Failed to restore 
streaming: no datasets activated" - ) - else: - self.logger.warning("Failed to restore PLC connection") - - def acquire_instance_lock(self) -> bool: - """Acquire lock to ensure single instance execution""" - try: - # Check if lock file exists - if os.path.exists(self.lock_file): - # Read PID from existing lock file - with open(self.lock_file, "r") as f: - try: - old_pid = int(f.read().strip()) - - # Check if process is still running - if psutil.pid_exists(old_pid): - # Get process info to verify it's our application - try: - proc = psutil.Process(old_pid) - cmdline = " ".join(proc.cmdline()) - if "main.py" in cmdline or "plc" in cmdline.lower(): - self.logger.error( - f"Another instance is already running (PID: {old_pid})" - ) - return False - except (psutil.NoSuchProcess, psutil.AccessDenied): - # Process doesn't exist or can't access, continue - pass - - # Old process is dead, remove stale lock file - os.remove(self.lock_file) - self.logger.info("Removed stale lock file") - - except (ValueError, IOError): - # Invalid lock file, remove it - os.remove(self.lock_file) - self.logger.info("Removed invalid lock file") - - # Create new lock file with current PID - with open(self.lock_file, "w") as f: - f.write(str(os.getpid())) - - # Register cleanup function - atexit.register(self.release_instance_lock) - - self.logger.info( - f"Instance lock acquired: {self.lock_file} (PID: {os.getpid()})" - ) - return True - - except Exception as e: - self.logger.error(f"Error acquiring instance lock: {e}") - return False - - def release_instance_lock(self): - """Release instance lock""" - try: - # Remove lock file - if os.path.exists(self.lock_file): - os.remove(self.lock_file) - self.logger.info("Instance lock released") - - except Exception as e: - self.logger.error(f"Error releasing instance lock: {e}") - - def save_variables(self): - """Save variables configuration to JSON file""" - try: - # Update streaming state in variables before saving - for var_name in self.variables: - self.variables[var_name]["streaming"] = ( - var_name in self.streaming_variables - ) - - with open(self.variables_file, "w") as f: - json.dump(self.variables, f, indent=4) - self.logger.info(f"Variables saved to {self.variables_file}") - except Exception as e: - self.logger.error(f"Error saving variables: {e}") - - def update_plc_config(self, ip: str, rack: int, slot: int): - """Update PLC configuration""" - old_config = self.plc_config.copy() - self.plc_config = {"ip": ip, "rack": rack, "slot": slot} - self.save_configuration() - - config_details = {"old_config": old_config, "new_config": self.plc_config} - self.log_event( - "info", - "config_change", - f"PLC configuration updated: {ip}:{rack}/{slot}", - config_details, - ) - - def update_udp_config(self, host: str, port: int): - """Update UDP configuration""" - old_config = self.udp_config.copy() - self.udp_config = {"host": host, "port": port} - self.save_configuration() - - config_details = {"old_config": old_config, "new_config": self.udp_config} - self.log_event( - "info", - "config_change", - f"UDP configuration updated: {host}:{port}", - config_details, - ) - - def update_sampling_interval(self, interval: float): - """Update sampling interval""" - old_interval = self.sampling_interval - self.sampling_interval = interval - self.save_configuration() - - config_details = {"old_interval": old_interval, "new_interval": interval} - self.log_event( - "info", - "config_change", - f"Sampling interval updated: {interval}s", - config_details, - ) - - def add_variable( - self, name: 
str, area: str, db: int, offset: int, var_type: str, bit: int = None - ): - """Add a variable for polling""" - area = area.lower() - - # Validate area type - now includes individual bit areas - if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]: - raise ValueError( - f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb" - ) - - # Validate data type - valid_types = [ - "real", - "int", - "bool", - "dint", - "word", - "byte", - "uint", - "udint", - "sint", - "usint", - ] - if var_type not in valid_types: - raise ValueError( - f"Invalid data type: {var_type}. Supported: {', '.join(valid_types)}" - ) - - # For individual bit areas, the type must be bool and the bit position must be specified - if area in ["e", "a", "mb"] and var_type != "bool": - raise ValueError(f"For bit areas ({area}), data type must be 'bool'") - - if area in ["e", "a", "mb"] and bit is None: - raise ValueError( - f"For bit areas ({area}), bit position must be specified (0-7)" - ) - - # Validate bit range for all areas that support it - if bit is not None and (bit < 0 or bit > 7): - raise ValueError("Bit position must be between 0 and 7") - - # Create variable configuration - var_config = { - "area": area, - "offset": offset, - "type": var_type, - "streaming": False, - } - - # Add DB number only for DB area - if area == "db": - var_config["db"] = db - - # Add bit position for bit areas and DB with specific bit - if area in ["e", "a", "mb"] or (area == "db" and bit is not None): - var_config["bit"] = bit - - self.variables[name] = var_config - self.save_variables() - - variable_details = { - "name": name, - "area": area, - "db": db if area == "db" else None, - "offset": offset, - "bit": bit, - "type": var_type, - "total_variables": len(self.variables), - } - - # Updated area description to include bit addresses - area_description = { - "db": ( - f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}" - ), - "mw": f"MW{offset}", - "m": f"M{offset}", - "pew": f"PEW{offset}", - "pe": f"PE{offset}", - "paw": f"PAW{offset}", - "pa": f"PA{offset}", - "e": f"E{offset}.{bit}", - "a": f"A{offset}.{bit}", - "mb": f"M{offset}.{bit}", - } - - self.log_event( - "info", - "variable_added", - f"Variable added: {name} -> {area_description[area]} ({var_type})", - variable_details, - ) - self.create_new_csv_file_for_variable_modification() - - def remove_variable(self, name: str): - """Remove a variable from polling""" - if name in self.variables: - var_config = self.variables[name].copy() - del self.variables[name] - # Also remove from streaming variables if present - self.streaming_variables.discard(name) - self.save_variables() - - variable_details = { - "name": name, - "removed_config": var_config, - "total_variables": len(self.variables), - } - self.log_event( - "info", - "variable_removed", - f"Variable removed: {name}", - variable_details, - ) - self.create_new_csv_file_for_variable_modification() - - def toggle_streaming_variable(self, name: str, enabled: bool): - """Enable or disable a variable for streaming""" - if name in self.variables: - if enabled: - self.streaming_variables.add(name) - else: - self.streaming_variables.discard(name) - - # Save changes to persist streaming configuration - self.save_variables() - - self.logger.info( - f"Variable {name} streaming: {'enabled' if enabled else 'disabled'}" - ) - - def get_csv_directory_path(self) -> str: - """Get the directory path for current day's CSV files""" - now = datetime.now() - day_folder =
now.strftime("%d-%m-%Y") - return os.path.join("records", day_folder) - - def get_csv_file_path(self, use_modification_timestamp: bool = False) -> str: - """Get the complete file path for current hour's CSV file""" - now = datetime.now() - - if use_modification_timestamp: - # Create filename with hour_min_sec format for variable modifications - time_suffix = now.strftime("%H_%M_%S") - filename = f"_{time_suffix}.csv" - else: - # Standard hourly format - hour = now.strftime("%H") - filename = f"{hour}.csv" - - directory = self.get_csv_directory_path() - return os.path.join(directory, filename) - - def ensure_csv_directory(self): - """Create CSV directory structure if it doesn't exist""" - directory = self.get_csv_directory_path() - Path(directory).mkdir(parents=True, exist_ok=True) - - def create_new_csv_file_for_variable_modification(self): - """Create a new CSV file when variables are modified during active recording""" - if not self.csv_recording: - return - - try: - # Close current file if open - if self.current_csv_file: - self.current_csv_file.close() - self.logger.info( - f"Closed previous CSV file due to variable modification" - ) - - # Create new file with modification timestamp - self.ensure_csv_directory() - csv_path = self.get_csv_file_path(use_modification_timestamp=True) - - self.current_csv_file = open(csv_path, "w", newline="", encoding="utf-8") - self.current_csv_writer = csv.writer(self.current_csv_file) - - # Mark that we're using a modification file and set current hour - self.using_modification_file = True - self.current_hour = datetime.now().hour - - # Write headers with new variable configuration - if self.variables: - headers = ["timestamp"] + list(self.variables.keys()) - self.current_csv_writer.writerow(headers) - self.current_csv_file.flush() - self.csv_headers_written = True - - self.logger.info( - f"New CSV file created after variable modification: {csv_path}" - ) - self.log_event( - "info", - "csv_file_created", - f"New CSV file created after variable modification: {os.path.basename(csv_path)}", - { - "file_path": csv_path, - "variables_count": len(self.variables), - "reason": "variable_modification", - }, - ) - - except Exception as e: - self.logger.error( - f"Error creating new CSV file after variable modification: {e}" - ) - self.log_event( - "error", - "csv_error", - f"Failed to create new CSV file after variable modification: {str(e)}", - {"error": str(e)}, - ) - - def setup_csv_file(self): - """Setup CSV file for the current hour""" - current_hour = datetime.now().hour - - # If we're using a modification file and the hour hasn't changed, keep using it - if ( - self.using_modification_file - and self.current_hour == current_hour - and self.current_csv_file is not None - ): - return - - # Check if we need to create a new file - if self.current_hour != current_hour or self.current_csv_file is None: - # Close previous file if open - if self.current_csv_file: - self.current_csv_file.close() - - # Create directory and file for current hour - self.ensure_csv_directory() - csv_path = self.get_csv_file_path() - - # Check if file exists to determine if we need headers - file_exists = os.path.exists(csv_path) - - self.current_csv_file = open(csv_path, "a", newline="", encoding="utf-8") - self.current_csv_writer = csv.writer(self.current_csv_file) - self.current_hour = current_hour - - # Reset modification file flag when creating regular hourly file - self.using_modification_file = False - - # Write headers if it's a new file - if not file_exists and self.variables: 
- headers = ["timestamp"] + list(self.variables.keys()) - self.current_csv_writer.writerow(headers) - self.current_csv_file.flush() - self.csv_headers_written = True - self.logger.info(f"CSV file created: {csv_path}") - - def write_csv_data(self, data: Dict[str, Any]): - """Write data to CSV file""" - if not self.csv_recording or not self.variables: - return - - try: - self.setup_csv_file() - - if self.current_csv_writer: - # Create timestamp - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - - # Create row with all variables (use None for missing values) - row = [timestamp] - for var_name in self.variables.keys(): - row.append(data.get(var_name, None)) - - self.current_csv_writer.writerow(row) - self.current_csv_file.flush() - - except Exception as e: - self.logger.error(f"Error writing CSV data: {e}") - - def get_streaming_data(self, all_data: Dict[str, Any]) -> Dict[str, Any]: - """Filter data for streaming based on selected variables""" - if not self.streaming_variables: - return all_data - - return { - name: value - for name, value in all_data.items() - if name in self.streaming_variables - } - - def connect_plc(self) -> bool: - """Connect to S7-315 PLC""" - try: - if self.plc: - self.plc.disconnect() - - self.plc = snap7.client.Client() - self.plc.connect( - self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"] - ) - - self.connected = True - self.save_system_state() - - connection_details = { - "ip": self.plc_config["ip"], - "rack": self.plc_config["rack"], - "slot": self.plc_config["slot"], - } - self.log_event( - "info", - "plc_connection", - f"Successfully connected to PLC {self.plc_config['ip']}", - connection_details, - ) - return True - - except Exception as e: - self.connected = False - error_details = { - "ip": self.plc_config["ip"], - "rack": self.plc_config["rack"], - "slot": self.plc_config["slot"], - "error": str(e), - } - self.log_event( - "error", - "plc_connection_failed", - f"Failed to connect to PLC {self.plc_config['ip']}: {str(e)}", - error_details, - ) - return False - - def disconnect_plc(self): - """Disconnect from PLC""" - try: - if self.plc: - self.plc.disconnect() - self.connected = False - self.save_system_state() - - self.log_event( - "info", - "plc_disconnection", - f"Disconnected from PLC {self.plc_config['ip']}", - ) - except Exception as e: - self.log_event( - "error", - "plc_disconnection_error", - f"Error disconnecting from PLC: {str(e)}", - {"error": str(e)}, - ) - - def read_variable(self, var_config: Dict[str, Any]) -> Any: - """Read a specific variable from the PLC""" - try: - area_type = var_config.get("area", "db").lower() - offset = var_config["offset"] - var_type = var_config["type"] - bit = var_config.get("bit") # Extract bit position for bit areas - - if area_type == "db": - # Data Block access (existing functionality) - db = var_config["db"] - if var_type == "real": - raw_data = self.plc.db_read(db, offset, 4) - value = struct.unpack(">f", raw_data)[0] - elif var_type == "int": - raw_data = self.plc.db_read(db, offset, 2) - value = struct.unpack(">h", raw_data)[0] - elif var_type == "bool": - raw_data = self.plc.db_read(db, offset, 1) - if bit is not None: - # Use snap7.util.get_bool for specific bit extraction - value = snap7.util.get_bool(raw_data, 0, bit) - else: - # Default to bit 0 for backward compatibility - value = bool(raw_data[0] & 0x01) - elif var_type == "dint": - raw_data = self.plc.db_read(db, offset, 4) - value = struct.unpack(">l", raw_data)[0] - elif var_type == "word": - raw_data = 
self.plc.db_read(db, offset, 2) - value = struct.unpack(">H", raw_data)[0] - elif var_type == "byte": - raw_data = self.plc.db_read(db, offset, 1) - value = struct.unpack(">B", raw_data)[0] - elif var_type == "uint": - raw_data = self.plc.db_read(db, offset, 2) - value = struct.unpack(">H", raw_data)[0] - elif var_type == "udint": - raw_data = self.plc.db_read(db, offset, 4) - value = struct.unpack(">L", raw_data)[0] - elif var_type == "sint": - raw_data = self.plc.db_read(db, offset, 1) - value = struct.unpack(">b", raw_data)[0] - elif var_type == "usint": - raw_data = self.plc.db_read(db, offset, 1) - value = struct.unpack(">B", raw_data)[0] - else: - return None - - elif area_type == "mw" or area_type == "m": - # Memory Words / Markers access - if var_type == "real": - raw_data = self.plc.mb_read(offset, 4) - value = struct.unpack(">f", raw_data)[0] - elif var_type == "int": - raw_data = self.plc.mb_read(offset, 2) - value = struct.unpack(">h", raw_data)[0] - elif var_type == "bool": - raw_data = self.plc.mb_read(offset, 1) - value = bool(raw_data[0] & 0x01) - elif var_type == "dint": - raw_data = self.plc.mb_read(offset, 4) - value = struct.unpack(">l", raw_data)[0] - elif var_type == "word": - raw_data = self.plc.mb_read(offset, 2) - value = struct.unpack(">H", raw_data)[0] - elif var_type == "byte": - raw_data = self.plc.mb_read(offset, 1) - value = struct.unpack(">B", raw_data)[0] - elif var_type == "uint": - raw_data = self.plc.mb_read(offset, 2) - value = struct.unpack(">H", raw_data)[0] - elif var_type == "udint": - raw_data = self.plc.mb_read(offset, 4) - value = struct.unpack(">L", raw_data)[0] - elif var_type == "sint": - raw_data = self.plc.mb_read(offset, 1) - value = struct.unpack(">b", raw_data)[0] - elif var_type == "usint": - raw_data = self.plc.mb_read(offset, 1) - value = struct.unpack(">B", raw_data)[0] - else: - return None - - elif area_type == "pew" or area_type == "pe": - # Process Input Words access - if var_type == "real": - raw_data = self.plc.eb_read(offset, 4) - value = struct.unpack(">f", raw_data)[0] - elif var_type == "int": - raw_data = self.plc.eb_read(offset, 2) - value = struct.unpack(">h", raw_data)[0] - elif var_type == "bool": - raw_data = self.plc.eb_read(offset, 1) - value = bool(raw_data[0] & 0x01) - elif var_type == "dint": - raw_data = self.plc.eb_read(offset, 4) - value = struct.unpack(">l", raw_data)[0] - elif var_type == "word": - raw_data = self.plc.eb_read(offset, 2) - value = struct.unpack(">H", raw_data)[0] - elif var_type == "byte": - raw_data = self.plc.eb_read(offset, 1) - value = struct.unpack(">B", raw_data)[0] - elif var_type == "uint": - raw_data = self.plc.eb_read(offset, 2) - value = struct.unpack(">H", raw_data)[0] - elif var_type == "udint": - raw_data = self.plc.eb_read(offset, 4) - value = struct.unpack(">L", raw_data)[0] - elif var_type == "sint": - raw_data = self.plc.eb_read(offset, 1) - value = struct.unpack(">b", raw_data)[0] - elif var_type == "usint": - raw_data = self.plc.eb_read(offset, 1) - value = struct.unpack(">B", raw_data)[0] - else: - return None - - elif area_type == "paw" or area_type == "pa": - # Process Output Words access - if var_type == "real": - raw_data = self.plc.ab_read(offset, 4) - value = struct.unpack(">f", raw_data)[0] - elif var_type == "int": - raw_data = self.plc.ab_read(offset, 2) - value = struct.unpack(">h", raw_data)[0] - elif var_type == "bool": - raw_data = self.plc.ab_read(offset, 1) - value = bool(raw_data[0] & 0x01) - elif var_type == "dint": - raw_data = self.plc.ab_read(offset, 4) 
-                    value = struct.unpack(">l", raw_data)[0]
-                elif var_type == "word":
-                    raw_data = self.plc.ab_read(offset, 2)
-                    value = struct.unpack(">H", raw_data)[0]
-                elif var_type == "byte":
-                    raw_data = self.plc.ab_read(offset, 1)
-                    value = struct.unpack(">B", raw_data)[0]
-                elif var_type == "uint":
-                    raw_data = self.plc.ab_read(offset, 2)
-                    value = struct.unpack(">H", raw_data)[0]
-                elif var_type == "udint":
-                    raw_data = self.plc.ab_read(offset, 4)
-                    value = struct.unpack(">L", raw_data)[0]
-                elif var_type == "sint":
-                    raw_data = self.plc.ab_read(offset, 1)
-                    value = struct.unpack(">b", raw_data)[0]
-                elif var_type == "usint":
-                    raw_data = self.plc.ab_read(offset, 1)
-                    value = struct.unpack(">B", raw_data)[0]
-                else:
-                    return None
-
-            elif area_type == "e":
-                # Process Input Bits access (E5.1 format)
-                if var_type == "bool":
-                    raw_data = self.plc.eb_read(offset, 1)
-                    # Use snap7.util.get_bool for proper bit extraction
-                    value = snap7.util.get_bool(raw_data, 0, bit)
-                else:
-                    return None
-
-            elif area_type == "a":
-                # Process Output Bits access (A3.7 format)
-                if var_type == "bool":
-                    raw_data = self.plc.ab_read(offset, 1)
-                    # Use snap7.util.get_bool for proper bit extraction
-                    value = snap7.util.get_bool(raw_data, 0, bit)
-                else:
-                    return None
-
-            elif area_type == "mb":
-                # Memory Bits access (M10.0 format)
-                if var_type == "bool":
-                    raw_data = self.plc.mb_read(offset, 1)
-                    # Use snap7.util.get_bool for proper bit extraction
-                    value = snap7.util.get_bool(raw_data, 0, bit)
-                else:
-                    return None
-
-            else:
-                self.logger.error(f"Unsupported area type: {area_type}")
-                return None
-
-            return value
-
-        except Exception as e:
-            self.logger.error(f"Error reading variable: {e}")
-            return None
-
-    def read_all_variables(self) -> Dict[str, Any]:
-        """Read all configured variables"""
-        if not self.connected or not self.plc:
-            return {}
-
-        data = {}
-        for var_name, var_config in self.variables.items():
-            value = self.read_variable(var_config)
-            if value is not None:
-                data[var_name] = value
-
-        return data
-
-    def setup_udp_socket(self) -> bool:
-        """Setup UDP socket"""
-        try:
-            if self.udp_socket:
-                self.udp_socket.close()
-
-            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-            self.logger.info(
-                f"UDP socket configured for {self.udp_config['host']}:{self.udp_config['port']}"
-            )
-            return True
-
-        except Exception as e:
-            self.logger.error(f"Error configuring UDP socket: {e}")
-            return False
-
-    def send_to_plotjuggler(self, data: Dict[str, Any]):
-        """Send data to PlotJuggler via UDP JSON"""
-        if not self.udp_socket:
-            return
-
-        try:
-            message = {"timestamp": time.time(), "data": data}
-
-            json_message = json.dumps(message)
-            self.udp_socket.sendto(
-                json_message.encode("utf-8"),
-                (self.udp_config["host"], self.udp_config["port"]),
-            )
-
-        except Exception as e:
-            self.logger.error(f"Error sending data to PlotJuggler: {e}")
-
-    def streaming_loop(self):
-        """Main streaming loop"""
-        self.logger.info(
-            f"Starting streaming with interval of {self.sampling_interval}s"
-        )
-
-        consecutive_errors = 0
-        max_consecutive_errors = 5
-
-        while self.streaming:
-            try:
-                start_time = time.time()
-
-                # Read all variables
-                all_data = self.read_all_variables()
-
-                if all_data:
-                    # Reset error counter on successful read
-                    consecutive_errors = 0
-
-                    # Write to CSV (all variables)
-                    self.write_csv_data(all_data)
-
-                    # Get filtered data for streaming
-                    streaming_data = self.get_streaming_data(all_data)
-
-                    # Send filtered data to PlotJuggler
-                    if streaming_data:
-                        self.send_to_plotjuggler(streaming_data)
-
-                    # Log data
-                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
-                    self.logger.info(
-                        f"[{timestamp}] CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
-                    )
-                else:
-                    consecutive_errors += 1
-                    if consecutive_errors >= max_consecutive_errors:
-                        self.log_event(
-                            "error",
-                            "streaming_error",
-                            f"Multiple consecutive read failures ({consecutive_errors}). Stopping streaming.",
-                            {"consecutive_errors": consecutive_errors},
-                        )
-                        break
-
-                # Maintain sampling interval
-                elapsed = time.time() - start_time
-                sleep_time = max(0, self.sampling_interval - elapsed)
-                time.sleep(sleep_time)
-
-            except Exception as e:
-                consecutive_errors += 1
-                self.log_event(
-                    "error",
-                    "streaming_error",
-                    f"Error in streaming loop: {str(e)}",
-                    {"error": str(e), "consecutive_errors": consecutive_errors},
-                )
-
-                if consecutive_errors >= max_consecutive_errors:
-                    self.log_event(
-                        "error",
-                        "streaming_error",
-                        "Too many consecutive errors. Stopping streaming.",
-                        {"consecutive_errors": consecutive_errors},
-                    )
-                    break
-
-                time.sleep(1)  # Wait before retry
-
-    def start_streaming(self) -> bool:
-        """Start data streaming - activates all datasets with variables"""
-        if not self.connected:
-            self.log_event(
-                "error", "streaming_error", "Cannot start streaming: PLC not connected"
-            )
-            return False
-
-        if not self.datasets:
-            self.log_event(
-                "error",
-                "streaming_error",
-                "Cannot start streaming: No datasets configured",
-            )
-            return False
-
-        if not self.setup_udp_socket():
-            self.log_event(
-                "error",
-                "streaming_error",
-                "Cannot start streaming: UDP socket setup failed",
-            )
-            return False
-
-        # Activate all datasets that have variables
-        activated_count = 0
-        for dataset_id, dataset_info in self.datasets.items():
-            if dataset_info.get("variables"):
-                try:
-                    self.activate_dataset(dataset_id)
-                    activated_count += 1
-                except Exception as e:
-                    self.logger.warning(f"Failed to activate dataset {dataset_id}: {e}")
-
-        if activated_count == 0:
-            self.log_event(
-                "error",
-                "streaming_error",
-                "Cannot start streaming: No datasets with variables configured",
-            )
-            return False
-
-        self.streaming = True
-        self.save_system_state()
-
-        self.log_event(
-            "info",
-            "streaming_started",
-            f"Multi-dataset streaming started: {activated_count} datasets activated",
-            {
-                "activated_datasets": activated_count,
-                "total_datasets": len(self.datasets),
-                "udp_host": self.udp_config["host"],
-                "udp_port": self.udp_config["port"],
-            },
-        )
-        return True
-
-    def stop_streaming(self):
-        """Stop streaming - deactivates all active datasets"""
-        self.streaming = False
-
-        # Stop all dataset streaming threads
-        active_datasets_copy = self.active_datasets.copy()
-        for dataset_id in active_datasets_copy:
-            try:
-                self.deactivate_dataset(dataset_id)
-            except Exception as e:
-                self.logger.warning(f"Error deactivating dataset {dataset_id}: {e}")
-
-        # Close UDP socket
-        if self.udp_socket:
-            self.udp_socket.close()
-            self.udp_socket = None
-
-        self.save_system_state()
-
-        datasets_stopped = len(active_datasets_copy)
-        self.log_event(
-            "info",
-            "streaming_stopped",
-            f"Multi-dataset streaming stopped: {datasets_stopped} datasets deactivated",
-        )
-
-    def get_status(self) -> Dict[str, Any]:
-        """Get current system status"""
-        total_variables = sum(
-            len(dataset["variables"]) for dataset in self.datasets.values()
-        )
-
-        # Count only variables that are in streaming_variables list AND have streaming=true
-        total_streaming_vars = 0
-        for dataset in self.datasets.values():
-            streaming_vars = dataset.get("streaming_variables", [])
-            variables_config = dataset.get("variables", {})
-            active_streaming_vars = [
-                var
-                for var in streaming_vars
-                if variables_config.get(var, {}).get("streaming", False)
-            ]
-            total_streaming_vars += len(active_streaming_vars)
-
-        return {
-            "plc_connected": self.connected,
-            "streaming": self.streaming,
-            "plc_config": self.plc_config,
-            "udp_config": self.udp_config,
-            "datasets_count": len(self.datasets),
-            "active_datasets_count": len(self.active_datasets),
-            "total_variables": total_variables,
-            "total_streaming_variables": total_streaming_vars,
-            "streaming_variables_count": total_streaming_vars,  # Add this for frontend compatibility
-            "sampling_interval": self.sampling_interval,
-            "current_dataset_id": self.current_dataset_id,
-            "datasets": {
-                dataset_id: {
-                    "name": info["name"],
-                    "prefix": info["prefix"],
-                    "variables_count": len(info["variables"]),
-                    "streaming_count": len(
-                        [
-                            var
-                            for var in info.get("streaming_variables", [])
-                            if info.get("variables", {})
-                            .get(var, {})
-                            .get("streaming", False)
-                        ]
-                    ),
-                    "sampling_interval": info.get("sampling_interval"),
-                    "enabled": info.get("enabled", False),
-                    "active": dataset_id in self.active_datasets,
-                }
-                for dataset_id, info in self.datasets.items()
-            },
-        }
-
-    def log_event(
-        self, level: str, event_type: str, message: str, details: Dict[str, Any] = None
-    ):
-        """Add an event to the persistent log"""
-        try:
-            event = {
-                "timestamp": datetime.now().isoformat(),
-                "level": level,  # info, warning, error
-                "event_type": event_type,  # connection, disconnection, error, config_change, etc.
-                "message": message,
-                "details": details or {},
-            }
-
-            self.events_log.append(event)
-
-            # Limit log size
-            if len(self.events_log) > self.max_log_entries:
-                self.events_log = self.events_log[-self.max_log_entries :]
-
-            # Save to file
-            self.save_events_log()
-
-            # Also log to regular logger
-            if level == "error":
-                self.logger.error(f"[{event_type}] {message}")
-            elif level == "warning":
-                self.logger.warning(f"[{event_type}] {message}")
-            else:
-                self.logger.info(f"[{event_type}] {message}")
-
-        except Exception as e:
-            self.logger.error(f"Error adding event to log: {e}")
-
-    def load_events_log(self):
-        """Load events log from JSON file"""
-        try:
-            if os.path.exists(self.events_log_file):
-                with open(self.events_log_file, "r", encoding="utf-8") as f:
-                    data = json.load(f)
-                    self.events_log = data.get("events", [])
-                    # Limit log size on load
-                    if len(self.events_log) > self.max_log_entries:
-                        self.events_log = self.events_log[-self.max_log_entries :]
-                self.logger.info(f"Events log loaded: {len(self.events_log)} entries")
-            else:
-                self.events_log = []
-                self.logger.info("No events log file found, starting with empty log")
-        except Exception as e:
-            self.logger.error(f"Error loading events log: {e}")
-            self.events_log = []
-
-    def save_events_log(self):
-        """Save events log to JSON file"""
-        try:
-            log_data = {
-                "events": self.events_log,
-                "last_updated": datetime.now().isoformat(),
-                "total_entries": len(self.events_log),
-            }
-            with open(self.events_log_file, "w", encoding="utf-8") as f:
-                json.dump(log_data, f, indent=2, ensure_ascii=False)
-        except Exception as e:
-            self.logger.error(f"Error saving events log: {e}")
-
-    def get_recent_events(self, limit: int = 50) -> List[Dict[str, Any]]:
-        """Get recent events from the log"""
-        return self.events_log[-limit:] if self.events_log else []
-
-
-# Global streamer instance (will be initialized in main)
-streamer = None
diff --git a/plc_datasets.json b/plc_datasets.json
index 9320e13..434dd4b 100644
--- a/plc_datasets.json
+++ b/plc_datasets.json
@@ -4,19 +4,6 @@
       "name": "DAR",
       "prefix": "dar",
       "variables": {
-        "PEW300": {
-          "area": "pew",
-          "offset": 300,
-          "type": "word",
-          "streaming": false
-        },
-        "UR62_Brix": {
-          "area": "db",
-          "offset": 18,
-          "type": "real",
-          "streaming": true,
-          "db": 2122
-        },
         "UR29_Brix_Digital": {
           "area": "db",
           "offset": 40,
@@ -24,18 +11,45 @@
           "streaming": true,
           "db": 2120
         },
-        "CTS306_Conditi": {
+        "UR62_PEW": {
+          "area": "pew",
+          "offset": 300,
+          "type": "word",
+          "streaming": false
+        },
+        "UR29_PEW": {
+          "area": "pew",
+          "offset": 304,
+          "type": "word",
+          "streaming": false
+        },
+        "UR62_Brix": {
           "area": "db",
-          "offset": 18,
+          "offset": 1296,
           "type": "real",
           "streaming": true,
-          "db": 2124
-        }
+          "db": 1011
+        },
+        "UR29_Brix": {
+          "area": "db",
+          "offset": 1322,
+          "type": "real",
+          "streaming": true,
+          "db": 1011
+        },
+        "CTS306_PV": {
+          "area": "db",
+          "offset": 1328,
+          "type": "real",
+          "streaming": true,
+          "db": 1011
+        }
       },
       "streaming_variables": [
         "UR29_Brix_Digital",
         "UR62_Brix",
-        "CTS306_Conditi"
+        "UR29_Brix",
+        "CTS306_PV"
       ],
       "sampling_interval": 0.2,
       "enabled": true,
@@ -56,5 +70,5 @@
   ],
   "current_dataset_id": "dar",
   "version": "1.0",
-  "last_update": "2025-07-18T10:02:33.090888"
+  "last_update": "2025-07-18T16:14:57.607742"
 }
\ No newline at end of file
diff --git a/system_state.json b/system_state.json
index c3ae23c..0ff7ed7 100644
--- a/system_state.json
+++ b/system_state.json
@@ -7,5 +7,5 @@
     ]
   },
   "auto_recovery_enabled": true,
-  "last_update": "2025-07-18T10:02:33.096712"
+  "last_update": "2025-07-18T16:14:48.202036"
 }
\ No newline at end of file