S7_snap7_Stremer_n_Recorder/main.py

2695 lines
97 KiB
Python

from flask import (
Flask,
render_template,
request,
jsonify,
redirect,
url_for,
send_from_directory,
)
import snap7
import snap7.util
import json
import socket
import time
import logging
import threading
from datetime import datetime
from typing import Dict, Any, Optional, List
import struct
import os
import csv
from pathlib import Path
import atexit
import psutil
import sys
# Flask application object for this module.
app = Flask(__name__)
# Session-signing key. A value supplied through the PLC_STREAMER_SECRET_KEY
# environment variable takes precedence so deployments do not all share the key
# committed to source control; the historical literal remains the fallback so
# existing setups keep working unchanged.
app.secret_key = os.environ.get("PLC_STREAMER_SECRET_KEY", "plc_streamer_secret_key")
def resource_path(relative_path):
    """Return ``relative_path`` joined onto the application's base directory.

    The base directory is ``sys._MEIPASS`` when that attribute exists (the
    original comment notes PyInstaller sets it for bundled runs); otherwise it
    is the absolute form of the current working directory.
    """
    if hasattr(sys, "_MEIPASS"):
        # Running from a bundle: resources live in the unpack folder
        root_dir = sys._MEIPASS
    else:
        # Plain script run: resolve against the working directory
        root_dir = os.path.abspath(".")
    return os.path.join(root_dir, relative_path)
class PLCDataStreamer:
def __init__(self):
    """Set default state, load persisted settings and take the instance lock.

    After the defaults are in place the constructor configures logging, loads
    the configuration, datasets, system state and events log, then tries to
    acquire the single-instance lock before attempting auto-recovery.

    Raises:
        RuntimeError: if ``acquire_instance_lock`` returns a falsy value.
    """
    # --- Persisted files, resolved through resource_path so bundled and
    # --- script runs use the same lookup
    self.config_file = resource_path("plc_config.json")
    self.variables_file = resource_path("plc_variables.json")
    self.datasets_file = resource_path("plc_datasets.json")
    self.state_file = resource_path("system_state.json")
    self.events_log_file = resource_path("application_events.json")

    # --- Connection defaults (load_configuration may replace them)
    self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
    self.udp_config = {"host": "127.0.0.1", "port": 9870}

    # --- Dataset registry
    self.datasets = {}  # dataset_id -> dataset configuration
    self.active_datasets = set()  # ids of datasets currently active
    self.current_dataset_id = None  # dataset currently selected for editing

    # --- Per-dataset runtime resources, all keyed by dataset_id
    self.dataset_threads = {}  # streaming thread objects
    self.dataset_csv_files = {}  # open CSV file handles
    self.dataset_csv_writers = {}  # csv writers bound to those handles
    self.dataset_csv_hours = {}  # hour the current CSV file was opened for
    self.dataset_using_modification_files = {}  # True while a modification file is in use

    # --- Connection / streaming state
    self.plc = None
    self.udp_socket = None
    self.connected = False
    self.streaming = False
    self.stream_thread = None
    self.sampling_interval = 0.1

    # --- Auto-recovery
    self.auto_recovery_enabled = True
    self.last_state = {
        "should_connect": False,
        "should_stream": False,
        "should_record_csv": False,
    }

    # --- Single-instance control
    self.lock_file = "plc_streamer.lock"
    self.lock_fd = None

    # --- Persistent events log
    self.events_log = []
    self.max_log_entries = 1000  # upper bound on retained log entries

    # Logging must exist before any loader runs, since the loaders log
    self.setup_logging()
    self.load_configuration()
    self.load_datasets()
    self.sync_streaming_variables()
    self.load_system_state()
    self.load_events_log()

    # Refuse to continue when the instance lock cannot be taken
    if not self.acquire_instance_lock():
        raise RuntimeError("Another instance of the application is already running")

    # Small delay to ensure a previous instance has fully cleaned up
    time.sleep(1)
    self.log_event(
        "info",
        "Application started",
        "Application initialization completed successfully",
    )
    self.attempt_auto_recovery()
def setup_logging(self):
    """Configure logging to ``plc_data.log`` and the console; set ``self.logger``."""
    # One handler for the log file, one for the console stream
    log_handlers = [logging.FileHandler("plc_data.log"), logging.StreamHandler()]
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=log_handlers,
    )
    self.logger = logging.getLogger(__name__)
def load_configuration(self):
    """Load PLC, UDP and sampling settings from ``self.config_file``.

    Keys missing from the file keep their current values. A missing file is
    logged and leaves the defaults in place; any exception is logged and
    swallowed.
    """
    try:
        if not os.path.exists(self.config_file):
            self.logger.info("No configuration file found, using defaults")
            return

        with open(self.config_file, "r") as config_fh:
            stored = json.load(config_fh)

        self.plc_config = stored.get("plc_config", self.plc_config)
        self.udp_config = stored.get("udp_config", self.udp_config)
        self.sampling_interval = stored.get(
            "sampling_interval", self.sampling_interval
        )
        self.logger.info(f"Configuration loaded from {self.config_file}")
    except Exception as e:
        self.logger.error(f"Error loading configuration: {e}")
def save_configuration(self):
    """Write PLC, UDP and sampling settings to ``self.config_file`` as JSON.

    Any exception is logged and swallowed.
    """
    try:
        payload = {
            "plc_config": self.plc_config,
            "udp_config": self.udp_config,
            "sampling_interval": self.sampling_interval,
        }
        with open(self.config_file, "w") as config_fh:
            json.dump(payload, config_fh, indent=4)
        self.logger.info(f"Configuration saved to {self.config_file}")
    except Exception as e:
        self.logger.error(f"Error saving configuration: {e}")
def load_datasets(self):
    """Load the dataset registry from ``self.datasets_file``.

    Restores ``datasets``, ``active_datasets`` and ``current_dataset_id``. A
    selection that points at an unknown dataset is cleared, and when nothing
    is selected the first stored dataset becomes the current one. A missing
    file is logged; any exception is logged and swallowed.
    """
    try:
        if not os.path.exists(self.datasets_file):
            self.logger.info("No datasets file found, starting with empty datasets")
            return

        with open(self.datasets_file, "r") as datasets_fh:
            stored = json.load(datasets_fh)

        self.datasets = stored.get("datasets", {})
        self.active_datasets = set(stored.get("active_datasets", []))
        self.current_dataset_id = stored.get("current_dataset_id")

        # Drop a selection that no longer refers to a known dataset
        if self.current_dataset_id and self.current_dataset_id not in self.datasets:
            self.current_dataset_id = None

        # Fall back to the first dataset when nothing is selected
        if not self.current_dataset_id and self.datasets:
            self.current_dataset_id = next(iter(self.datasets))

        self.logger.info(
            f"Datasets loaded from {self.datasets_file}: {len(self.datasets)} datasets, {len(self.active_datasets)} active"
        )
    except Exception as e:
        self.logger.error(f"Error loading datasets: {e}")
def save_datasets(self):
    """Write the dataset registry to ``self.datasets_file`` as JSON.

    The payload carries the datasets, the active ids (as a list), the current
    selection, a fixed ``"version"`` of ``"1.0"`` and an ISO timestamp. Any
    exception is logged and swallowed.
    """
    try:
        payload = {
            "datasets": self.datasets,
            # sets are not JSON-serialisable, so store the ids as a list
            "active_datasets": list(self.active_datasets),
            "current_dataset_id": self.current_dataset_id,
            "version": "1.0",
            "last_update": datetime.now().isoformat(),
        }
        with open(self.datasets_file, "w") as datasets_fh:
            json.dump(payload, datasets_fh, indent=4)
        self.logger.info(f"Datasets configuration saved to {self.datasets_file}")
    except Exception as e:
        self.logger.error(f"Error saving datasets: {e}")
def sync_streaming_variables(self):
    """Make each variable's ``streaming`` flag agree with ``streaming_variables``.

    For every dataset, variables named in its ``streaming_variables`` list get
    ``streaming=True`` and all other variables get ``streaming=False``. The
    registry is saved once, and only if at least one flag changed. Any
    exception is logged and swallowed.
    """
    try:
        changed = False
        for dataset_id, dataset_info in self.datasets.items():
            listed_names = dataset_info.get("streaming_variables", [])
            var_table = dataset_info.get("variables", {})

            # Pass 1: listed variables must carry streaming=True
            for var_name in listed_names:
                if var_name not in var_table:
                    continue
                if var_table[var_name].get("streaming", False):
                    continue
                var_table[var_name]["streaming"] = True
                changed = True
                self.logger.info(
                    f"Synchronized streaming flag for variable '{var_name}' in dataset '{dataset_id}'"
                )

            # Pass 2: unlisted variables must carry streaming=False
            for var_name, var_config in var_table.items():
                if var_name in listed_names or not var_config.get("streaming", False):
                    continue
                var_config["streaming"] = False
                changed = True
                self.logger.info(
                    f"Disabled streaming flag for variable '{var_name}' in dataset '{dataset_id}'"
                )

        if changed:
            self.save_datasets()
            self.logger.info("Streaming variables configuration synchronized")
    except Exception as e:
        self.logger.error(f"Error synchronizing streaming variables: {e}")
def create_dataset(
    self, dataset_id: str, name: str, prefix: str, sampling_interval: float = None
):
    """Register a new, initially disabled dataset and persist the registry.

    Args:
        dataset_id: Key under which the dataset is stored in ``self.datasets``.
        name: Display name, used in the logged event message.
        prefix: Stored under the ``"prefix"`` key.
        sampling_interval: Per-dataset interval, stored as given (``None`` allowed).

    Raises:
        ValueError: if ``dataset_id`` is already registered.
    """
    if dataset_id in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' already exists")

    self.datasets[dataset_id] = {
        "name": name,
        "prefix": prefix,
        "variables": {},
        "streaming_variables": [],
        "sampling_interval": sampling_interval,
        "enabled": False,
        "created": datetime.now().isoformat(),
    }

    # With nothing selected yet, the new dataset becomes the current one
    if not self.current_dataset_id:
        self.current_dataset_id = dataset_id

    self.save_datasets()

    event_details = {
        "dataset_id": dataset_id,
        "name": name,
        "prefix": prefix,
        "sampling_interval": sampling_interval,
    }
    self.log_event(
        "info",
        "dataset_created",
        f"Dataset created: {name} (prefix: {prefix})",
        event_details,
    )
def delete_dataset(self, dataset_id: str):
    """Remove a dataset from the registry, deactivating it first if needed.

    An active dataset is shut down through ``deactivate_dataset`` so that its
    id leaves ``self.active_datasets`` and its streaming resources are
    released before the configuration is dropped. (The previous code called
    ``self.stop_dataset``, a name not defined among this class's visible
    methods, and left the id in ``active_datasets``.)

    Args:
        dataset_id: Id of the dataset to delete.

    Raises:
        ValueError: if ``dataset_id`` is not registered.
    """
    if dataset_id not in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' does not exist")

    # Stop streaming/recording before the configuration disappears
    if dataset_id in self.active_datasets:
        self.deactivate_dataset(dataset_id)

    dataset_info = self.datasets.pop(dataset_id)

    # If the deleted dataset was selected, move the selection to any remaining one
    if self.current_dataset_id == dataset_id:
        self.current_dataset_id = next(iter(self.datasets), None)

    self.save_datasets()
    self.log_event(
        "info",
        "dataset_deleted",
        f"Dataset deleted: {dataset_info['name']}",
        {"dataset_id": dataset_id, "dataset_info": dataset_info},
    )
def get_current_dataset(self):
    """Return the configuration of the selected dataset, or ``None``.

    ``None`` is returned when nothing is selected or the selected id is not
    present in ``self.datasets``.
    """
    selected_id = self.current_dataset_id
    if not selected_id or selected_id not in self.datasets:
        return None
    return self.datasets[selected_id]
def get_dataset_variables(self, dataset_id: str):
    """Return the ``"variables"`` mapping of a dataset.

    An empty dict is returned for an unknown dataset or one without a
    ``"variables"`` key.
    """
    if dataset_id not in self.datasets:
        return {}
    return self.datasets[dataset_id].get("variables", {})
def get_dataset_sampling_interval(self, dataset_id: str):
    """Return the sampling interval that applies to a dataset.

    The dataset's own ``"sampling_interval"`` is used when it is set (not
    ``None``); otherwise, and for unknown datasets, the global
    ``self.sampling_interval`` is returned.
    """
    if dataset_id not in self.datasets:
        return self.sampling_interval

    own_interval = self.datasets[dataset_id].get("sampling_interval")
    if own_interval is None:
        return self.sampling_interval
    return own_interval
def add_variable_to_dataset(
    self,
    dataset_id: str,
    name: str,
    area: str,
    db: int,
    offset: int,
    var_type: str,
    bit: int = None,
    streaming: bool = False,
):
    """Add (or overwrite) a variable definition in a dataset.

    The input is validated with the same rules ``add_variable`` applies:
    supported area and type, bit areas (``e``/``a``/``mb``) require type
    ``bool`` and an explicit bit, and any bit must lie in 0-7. Previously
    the bit rules were not enforced here, so e.g. an ``e`` variable without
    a bit could be stored.

    Args:
        dataset_id: Dataset receiving the variable.
        name: Variable name, used as the key in the dataset's variable table.
        area: Memory area; lower-cased before validation.
        db: Data-block number, stored only when ``area`` is ``"db"``.
        offset: Offset stored under ``"offset"``.
        var_type: One of the supported type names.
        bit: Bit position for bit areas, or for a DB bit address.
        streaming: Whether the variable is also added to the dataset's
            ``streaming_variables`` list.

    Raises:
        ValueError: for an unknown dataset, unsupported area or type, or an
            invalid bit specification.
    """
    if dataset_id not in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' does not exist")

    bit_areas = ("e", "a", "mb")
    area = area.lower()
    if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]:
        raise ValueError(
            f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb"
        )

    valid_types = [
        "real",
        "int",
        "bool",
        "dint",
        "word",
        "byte",
        "uint",
        "udint",
        "sint",
        "usint",
    ]
    if var_type not in valid_types:
        raise ValueError(
            f"Invalid data type: {var_type}. Supported: {', '.join(valid_types)}"
        )

    # Bit areas address a single bit: the type must be bool and the bit given
    if area in bit_areas:
        if var_type != "bool":
            raise ValueError(f"For bit areas ({area}), data type must be 'bool'")
        if bit is None:
            raise ValueError(
                f"For bit areas ({area}), bit position must be specified (0-7)"
            )
    # Whenever a bit is supplied it must be a valid position within a byte
    if bit is not None and (bit < 0 or bit > 7):
        raise ValueError("Bit position must be between 0 and 7")

    var_config = {
        "area": area,
        "offset": offset,
        "type": var_type,
        "streaming": streaming,
    }
    if area == "db":
        var_config["db"] = db
    if area in bit_areas or (area == "db" and bit is not None):
        var_config["bit"] = bit

    dataset = self.datasets[dataset_id]
    dataset["variables"][name] = var_config

    # Keep the streaming list in step with the per-variable flag
    if streaming and name not in dataset["streaming_variables"]:
        dataset["streaming_variables"].append(name)

    self.save_datasets()

    # The helper is a no-op for inactive datasets
    self.create_new_dataset_csv_file_for_variable_modification(dataset_id)

    # Human-readable address used in the event message
    area_description = {
        "db": (
            f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}"
        ),
        "mw": f"MW{offset}",
        "m": f"M{offset}",
        "pew": f"PEW{offset}",
        "pe": f"PE{offset}",
        "paw": f"PAW{offset}",
        "pa": f"PA{offset}",
        "e": f"E{offset}.{bit}",
        "a": f"A{offset}.{bit}",
        "mb": f"M{offset}.{bit}",
    }
    self.log_event(
        "info",
        "variable_added",
        f"Variable added to dataset '{dataset['name']}': {name} -> {area_description[area]} ({var_type})",
        {
            "dataset_id": dataset_id,
            "name": name,
            "area": area,
            "db": db if area == "db" else None,
            "offset": offset,
            "bit": bit,
            "type": var_type,
            "streaming": streaming,
        },
    )
def remove_variable_from_dataset(self, dataset_id: str, name: str):
    """Delete a variable from a dataset and from its streaming list.

    The registry is saved, the CSV-rotation helper is invoked for the
    dataset, and a ``variable_removed`` event is logged.

    Raises:
        ValueError: if the dataset or the variable does not exist.
    """
    if dataset_id not in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' does not exist")

    dataset = self.datasets[dataset_id]
    if name not in dataset["variables"]:
        raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'")

    # Keep a copy of the removed definition for the event details
    removed_config = dataset["variables"].pop(name).copy()

    streaming_names = dataset["streaming_variables"]
    if name in streaming_names:
        streaming_names.remove(name)

    self.save_datasets()

    # The column set changed; the helper is a no-op for inactive datasets
    self.create_new_dataset_csv_file_for_variable_modification(dataset_id)

    self.log_event(
        "info",
        "variable_removed",
        f"Variable removed from dataset '{dataset['name']}': {name}",
        {"dataset_id": dataset_id, "name": name, "removed_config": removed_config},
    )
def toggle_variable_streaming(self, dataset_id: str, name: str, enabled: bool):
    """Enable or disable streaming for one variable of a dataset.

    Updates both the variable's ``"streaming"`` flag and the dataset's
    ``streaming_variables`` list, then saves the registry.

    Raises:
        ValueError: if the dataset or the variable does not exist.
    """
    if dataset_id not in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' does not exist")

    dataset = self.datasets[dataset_id]
    if name not in dataset["variables"]:
        raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'")

    # Per-variable flag
    dataset["variables"][name]["streaming"] = enabled

    # Streaming list mirrors the flag
    streaming_names = dataset["streaming_variables"]
    currently_listed = name in streaming_names
    if enabled and not currently_listed:
        streaming_names.append(name)
    elif not enabled and currently_listed:
        streaming_names.remove(name)

    self.save_datasets()
    self.logger.info(
        f"Dataset '{dataset_id}' variable {name} streaming: {'enabled' if enabled else 'disabled'}"
    )
def activate_dataset(self, dataset_id: str):
    """Mark a dataset active, persist that, and start its streaming thread.

    Raises:
        ValueError: if the dataset does not exist.
        RuntimeError: if ``self.connected`` is false.
    """
    if dataset_id not in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' does not exist")
    if not self.connected:
        raise RuntimeError("Cannot activate dataset: PLC not connected")

    dataset_info = self.datasets[dataset_id]
    self.active_datasets.add(dataset_id)
    dataset_info["enabled"] = True
    self.save_datasets()

    # The streaming loop checks active_datasets, so the id is added first
    self.start_dataset_streaming(dataset_id)

    event_details = {
        "dataset_id": dataset_id,
        "variables_count": len(dataset_info["variables"]),
        "streaming_count": len(dataset_info["streaming_variables"]),
        "prefix": dataset_info["prefix"],
    }
    self.log_event(
        "info",
        "dataset_activated",
        f"Dataset activated: {dataset_info['name']}",
        event_details,
    )
def deactivate_dataset(self, dataset_id: str):
    """Mark a dataset inactive, persist that, and stop its streaming thread.

    Raises:
        ValueError: if the dataset does not exist.
    """
    if dataset_id not in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' does not exist")

    dataset_info = self.datasets[dataset_id]
    # Removing the id first lets the streaming loop's condition turn false
    self.active_datasets.discard(dataset_id)
    dataset_info["enabled"] = False
    self.save_datasets()

    self.stop_dataset_streaming(dataset_id)

    self.log_event(
        "info",
        "dataset_deactivated",
        f"Dataset deactivated: {dataset_info['name']}",
        {"dataset_id": dataset_id},
    )
def start_dataset_streaming(self, dataset_id: str):
    """Start the daemon streaming thread of a dataset.

    A dataset counts as already running only while its registered thread is
    alive. Previously any entry in ``self.dataset_threads`` was treated as
    running, so a thread that had ended without being unregistered blocked
    every later restart.

    Args:
        dataset_id: Dataset whose streaming loop should run.

    Returns:
        ``False`` when the dataset does not exist, ``True`` when a thread is
        already running or was started by this call.
    """
    if dataset_id not in self.datasets:
        return False

    existing = self.dataset_threads.get(dataset_id)
    if existing is not None and existing.is_alive():
        return True  # Already running

    # Either nothing is registered or the entry is a finished thread:
    # replace it with a fresh one
    thread = threading.Thread(
        target=self.dataset_streaming_loop, args=(dataset_id,)
    )
    thread.daemon = True
    self.dataset_threads[dataset_id] = thread
    thread.start()

    dataset_info = self.datasets[dataset_id]
    interval = self.get_dataset_sampling_interval(dataset_id)
    self.logger.info(
        f"Started streaming for dataset '{dataset_info['name']}' (interval: {interval}s)"
    )
    return True
def stop_dataset_streaming(self, dataset_id: str):
    """Wait for a dataset's streaming thread and release its CSV resources.

    No stop signal is sent here: the streaming loop exits on its own once the
    dataset is no longer in ``self.active_datasets`` or ``self.connected`` is
    false. This method waits up to two seconds for that, then unregisters the
    thread and closes the dataset's CSV file.

    The streaming loop calls this method from its own thread when it ends.
    Joining the current thread raises ``RuntimeError``, so the join is skipped
    in that case. Entries are removed with ``pop`` so that two near-simultaneous
    callers (the loop itself and a deactivating caller) do not trip over each
    other's deletions.

    Args:
        dataset_id: Dataset whose streaming resources are released.
    """
    thread = self.dataset_threads.get(dataset_id)
    if thread is not None:
        # Never join ourselves; only wait for a different, still-running thread
        if thread is not threading.current_thread() and thread.is_alive():
            thread.join(timeout=2)
        self.dataset_threads.pop(dataset_id, None)

    # Close CSV file if open and forget the writer/hour bookkeeping
    csv_file = self.dataset_csv_files.pop(dataset_id, None)
    if csv_file is not None:
        self.dataset_csv_writers.pop(dataset_id, None)
        self.dataset_csv_hours.pop(dataset_id, None)
        csv_file.close()

    # Reset modification file flag
    self.dataset_using_modification_files.pop(dataset_id, None)

    dataset_info = self.datasets.get(dataset_id, {})
    self.logger.info(
        f"Stopped streaming for dataset '{dataset_info.get('name', dataset_id)}'"
    )
def dataset_streaming_loop(self, dataset_id: str):
    """Sample one dataset until it is deactivated, disconnected, or failing.

    Each cycle reads every variable of the dataset, writes the full sample to
    the dataset's CSV file and sends the streaming subset to PlotJuggler. The
    loop ends when the dataset leaves ``self.active_datasets``, when
    ``self.connected`` becomes false, or after five consecutive failed cycles.

    A cycle is a failure when it yields no usable value at all.
    ``read_dataset_variables`` records a failed read as ``None`` rather than
    omitting the variable, so the result is non-empty even when every read
    failed; checking only for a non-empty dict (as before) meant the
    consecutive-failure stop could never trigger for a dataset with variables.

    Args:
        dataset_id: Dataset to sample. The sampling interval is read once,
            when the loop starts.
    """
    dataset_info = self.datasets[dataset_id]
    interval = self.get_dataset_sampling_interval(dataset_id)
    self.logger.info(
        f"Dataset '{dataset_info['name']}' streaming loop started (interval: {interval}s)"
    )

    consecutive_errors = 0
    max_consecutive_errors = 5

    while dataset_id in self.active_datasets and self.connected:
        try:
            start_time = time.time()

            # Read variables for this dataset
            dataset_variables = self.get_dataset_variables(dataset_id)
            all_data = self.read_dataset_variables(dataset_id, dataset_variables)

            # At least one variable must have produced a real value
            has_values = any(value is not None for value in all_data.values())

            if has_values:
                consecutive_errors = 0

                # Write to CSV (all variables)
                self.write_dataset_csv_data(dataset_id, all_data)

                # Only variables that are in the streaming list AND carry
                # streaming=true are forwarded
                streaming_variables = dataset_info.get("streaming_variables", [])
                dataset_vars_config = dataset_info.get("variables", {})
                streaming_data = {
                    name: value
                    for name, value in all_data.items()
                    if name in streaming_variables
                    and dataset_vars_config.get(name, {}).get("streaming", False)
                }

                # Send filtered data to PlotJuggler
                if streaming_data:
                    self.send_to_plotjuggler(streaming_data)

                # Log data
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                self.logger.info(
                    f"[{timestamp}] Dataset '{dataset_info['name']}': CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
                )
            else:
                consecutive_errors += 1
                if consecutive_errors >= max_consecutive_errors:
                    self.log_event(
                        "error",
                        "dataset_streaming_error",
                        f"Multiple consecutive read failures for dataset '{dataset_info['name']}' ({consecutive_errors}). Stopping streaming.",
                        {
                            "dataset_id": dataset_id,
                            "consecutive_errors": consecutive_errors,
                        },
                    )
                    break

            # Maintain sampling interval
            elapsed = time.time() - start_time
            time.sleep(max(0, interval - elapsed))

        except Exception as e:
            consecutive_errors += 1
            self.log_event(
                "error",
                "dataset_streaming_error",
                f"Error in dataset '{dataset_info['name']}' streaming loop: {str(e)}",
                {
                    "dataset_id": dataset_id,
                    "error": str(e),
                    "consecutive_errors": consecutive_errors,
                },
            )
            if consecutive_errors >= max_consecutive_errors:
                self.log_event(
                    "error",
                    "dataset_streaming_error",
                    f"Too many consecutive errors for dataset '{dataset_info['name']}'. Stopping streaming.",
                    {
                        "dataset_id": dataset_id,
                        "consecutive_errors": consecutive_errors,
                    },
                )
                break
            time.sleep(1)  # Wait before retry

    # Clean up when exiting
    self.stop_dataset_streaming(dataset_id)
    self.logger.info(f"Dataset '{dataset_info['name']}' streaming loop ended")
def read_dataset_variables(
    self, dataset_id: str, variables: Dict[str, Any]
) -> Dict[str, Any]:
    """Read every variable of a dataset through ``self.read_variable``.

    Args:
        dataset_id: Used only in the warning logged for a failed read.
        variables: Mapping of variable name to its configuration.

    Returns:
        Mapping of variable name to the value read. A variable whose read
        raised is present with the value ``None``.
    """
    results = {}
    for var_name, var_config in variables.items():
        try:
            results[var_name] = self.read_variable(var_config)
        except Exception as e:
            self.logger.warning(
                f"Error reading variable {var_name} in dataset {dataset_id}: {e}"
            )
            # Keep the column so CSV rows stay aligned with the headers
            results[var_name] = None
    return results
def get_dataset_csv_file_path(
    self, dataset_id: str, use_modification_timestamp: bool = False
) -> str:
    """Build the CSV path for a dataset inside the current day's directory.

    Args:
        dataset_id: Dataset whose ``"prefix"`` starts the file name.
        use_modification_timestamp: When true the name is
            ``<prefix>_<HH_MM_SS>.csv``; otherwise ``<prefix>_<HH>.csv``.

    Raises:
        ValueError: if the dataset does not exist.
    """
    if dataset_id not in self.datasets:
        raise ValueError(f"Dataset '{dataset_id}' does not exist")

    now = datetime.now()
    prefix = self.datasets[dataset_id]["prefix"]
    # Only the time suffix differs between the two naming schemes
    suffix_format = "%H_%M_%S" if use_modification_timestamp else "%H"
    filename = f"{prefix}_{now.strftime(suffix_format)}.csv"
    return os.path.join(self.get_csv_directory_path(), filename)
def setup_dataset_csv_file(self, dataset_id: str):
    """Ensure the dataset has an open CSV file for the current hour.

    Nothing happens while a file opened for the current hour is still open
    (this also covers a modification file opened in the same hour).
    Otherwise any previous file is closed and the hourly file is opened in
    append mode; headers are written only when that file did not exist yet
    and the dataset has variables.
    """
    current_hour = datetime.now().hour
    opened_for_this_hour = self.dataset_csv_hours.get(dataset_id) == current_hour

    # A modification file opened during this hour stays in use
    if (
        self.dataset_using_modification_files.get(dataset_id, False)
        and opened_for_this_hour
        and dataset_id in self.dataset_csv_files
    ):
        return

    # A regular file for this hour is already open: nothing to do
    if opened_for_this_hour and dataset_id in self.dataset_csv_files:
        return

    # Close previous file if open (hour rollover)
    if dataset_id in self.dataset_csv_files:
        self.dataset_csv_files[dataset_id].close()

    self.ensure_csv_directory()
    csv_path = self.get_dataset_csv_file_path(dataset_id)

    # Headers are needed only when the file is being created now
    needs_headers = not os.path.exists(csv_path)

    csv_file = open(csv_path, "a", newline="", encoding="utf-8")
    self.dataset_csv_files[dataset_id] = csv_file
    self.dataset_csv_writers[dataset_id] = csv.writer(csv_file)
    self.dataset_csv_hours[dataset_id] = current_hour

    # A regular hourly file replaces any modification file
    self.dataset_using_modification_files[dataset_id] = False

    dataset_variables = self.get_dataset_variables(dataset_id)
    if needs_headers and dataset_variables:
        headers = ["timestamp"] + list(dataset_variables.keys())
        self.dataset_csv_writers[dataset_id].writerow(headers)
        self.dataset_csv_files[dataset_id].flush()
        self.logger.info(
            f"CSV file created for dataset '{self.datasets[dataset_id]['name']}': {csv_path}"
        )
def write_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]):
    """Append one timestamped row to the dataset's CSV file.

    Columns follow the order of the dataset's variable table; a variable
    missing from ``data`` is written as ``None``. Inactive datasets are
    ignored, and any exception is logged and swallowed.
    """
    if dataset_id not in self.active_datasets:
        return

    try:
        self.setup_dataset_csv_file(dataset_id)
        if dataset_id not in self.dataset_csv_writers:
            return

        # Millisecond precision: drop the last three microsecond digits
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

        column_names = self.get_dataset_variables(dataset_id).keys()
        row = [timestamp] + [data.get(var_name, None) for var_name in column_names]

        self.dataset_csv_writers[dataset_id].writerow(row)
        self.dataset_csv_files[dataset_id].flush()
    except Exception as e:
        self.logger.error(f"Error writing CSV data for dataset {dataset_id}: {e}")
def create_new_dataset_csv_file_for_variable_modification(self, dataset_id: str):
    """Rotate to a fresh CSV file after an active dataset's variables changed.

    For an active dataset the open file (if any) is closed and a new file
    named with the modification timestamp is opened in write mode, with
    headers matching the current variable table. Inactive datasets are
    ignored. Failures are logged and recorded as a ``dataset_csv_error``
    event instead of being raised.
    """
    if dataset_id not in self.active_datasets:
        return

    try:
        # Retire the file that still has the previous column layout
        if dataset_id in self.dataset_csv_files:
            self.dataset_csv_files.pop(dataset_id).close()
            del self.dataset_csv_writers[dataset_id]
            self.logger.info(
                f"Closed previous CSV file for dataset '{self.datasets[dataset_id]['name']}' due to variable modification"
            )

        # Open the replacement, named with the modification timestamp
        self.ensure_csv_directory()
        csv_path = self.get_dataset_csv_file_path(
            dataset_id, use_modification_timestamp=True
        )
        new_file = open(csv_path, "w", newline="", encoding="utf-8")
        self.dataset_csv_files[dataset_id] = new_file
        self.dataset_csv_writers[dataset_id] = csv.writer(new_file)

        # Remember that a modification file is in use, and for which hour
        self.dataset_using_modification_files[dataset_id] = True
        self.dataset_csv_hours[dataset_id] = datetime.now().hour

        # Headers reflect the new variable configuration
        dataset_variables = self.get_dataset_variables(dataset_id)
        if dataset_variables:
            self.dataset_csv_writers[dataset_id].writerow(
                ["timestamp"] + list(dataset_variables.keys())
            )
            self.dataset_csv_files[dataset_id].flush()

        dataset_name = self.datasets[dataset_id]["name"]
        self.logger.info(
            f"New CSV file created after variable modification for dataset '{dataset_name}': {csv_path}"
        )
        self.log_event(
            "info",
            "dataset_csv_file_created",
            f"New CSV file created after variable modification for dataset '{dataset_name}': {os.path.basename(csv_path)}",
            {
                "dataset_id": dataset_id,
                "file_path": csv_path,
                "variables_count": len(dataset_variables),
                "reason": "variable_modification",
            },
        )
    except Exception as e:
        dataset_name = self.datasets.get(dataset_id, {}).get("name", dataset_id)
        self.logger.error(
            f"Error creating new CSV file after variable modification for dataset '{dataset_name}': {e}"
        )
        self.log_event(
            "error",
            "dataset_csv_error",
            f"Failed to create new CSV file after variable modification for dataset '{dataset_name}': {str(e)}",
            {"dataset_id": dataset_id, "error": str(e)},
        )
def load_system_state(self):
    """Load ``last_state`` and the auto-recovery switch from ``self.state_file``.

    A missing ``"last_state"`` keeps the current value; a missing
    ``"auto_recovery_enabled"`` defaults to ``True``. A missing file is
    logged; any exception is logged and swallowed.
    """
    try:
        if not os.path.exists(self.state_file):
            self.logger.info("No system state file found, starting with defaults")
            return

        with open(self.state_file, "r") as state_fh:
            stored = json.load(state_fh)

        self.last_state = stored.get("last_state", self.last_state)
        self.auto_recovery_enabled = stored.get("auto_recovery_enabled", True)
        self.logger.info(f"System state loaded from {self.state_file}")
    except Exception as e:
        self.logger.error(f"Error loading system state: {e}")
def save_system_state(self):
    """Write the current connection/streaming state to ``self.state_file``.

    The stored ``"last_state"`` holds the connection flag, the streaming flag
    and the list of active dataset ids. Any exception is logged and swallowed.
    """
    try:
        snapshot = {
            "should_connect": self.connected,
            "should_stream": self.streaming,
            "active_datasets": list(self.active_datasets),
        }
        payload = {
            "last_state": snapshot,
            "auto_recovery_enabled": self.auto_recovery_enabled,
            "last_update": datetime.now().isoformat(),
        }
        with open(self.state_file, "w") as state_fh:
            json.dump(payload, state_fh, indent=4)
        self.logger.debug("System state saved")
    except Exception as e:
        self.logger.error(f"Error saving system state: {e}")
def attempt_auto_recovery(self):
    """Try to restore the connection and streaming recorded in ``last_state``.

    Steps, each of which stops the recovery when it does not apply or fails:
    reconnect to the PLC, set up the UDP socket, then re-activate every
    recorded dataset that still exists. Streaming is flagged as restored, and
    the state saved, only if at least one dataset was activated.
    """
    if not self.auto_recovery_enabled:
        self.logger.info("Auto-recovery disabled, skipping state restoration")
        return

    self.logger.info("Attempting auto-recovery of previous state...")

    # --- Connection ---
    if not self.last_state.get("should_connect", False):
        return
    self.logger.info("Attempting to restore PLC connection...")
    if not self.connect_plc():
        self.logger.warning("Failed to restore PLC connection")
        return
    self.logger.info("PLC connection restored successfully")

    # --- Streaming (only meaningful once connected) ---
    if not self.last_state.get("should_stream", False):
        return
    self.logger.info("Attempting to restore streaming...")

    # The UDP socket has to exist before any dataset starts sending
    if not self.setup_udp_socket():
        self.logger.warning("Failed to setup UDP socket during auto-recovery")
        return

    # --- Datasets ---
    activated_count = 0
    for dataset_id in self.last_state.get("active_datasets", []):
        if dataset_id not in self.datasets:
            continue
        try:
            self.activate_dataset(dataset_id)
            activated_count += 1
        except Exception as e:
            self.logger.warning(f"Failed to restore dataset {dataset_id}: {e}")

    if activated_count == 0:
        self.logger.warning("Failed to restore streaming: no datasets activated")
        return

    self.streaming = True
    self.save_system_state()
    self.logger.info(
        f"Streaming restored successfully: {activated_count} datasets activated"
    )
def acquire_instance_lock(self) -> bool:
    """Take the PID lock file that enforces a single running instance.

    An existing lock file is honoured only when the PID it records belongs to
    a different, still-running process whose command line contains
    ``main.py`` or ``plc``. Otherwise the file is treated as stale or invalid
    and replaced with one holding the current PID.

    Compared with the previous implementation:

    * the lock file is closed before it is removed (it used to be deleted
      inside the ``with open(...)`` block, which fails on platforms that
      refuse to delete an open file);
    * a leftover lock that records the current process's own PID is treated
      as stale rather than as another running instance.

    Returns:
        ``True`` when the lock was acquired, ``False`` when another instance
        holds it or any error occurred (the error is logged).
    """
    try:
        if os.path.exists(self.lock_file):
            try:
                # Read the recorded PID and close the file before deciding
                with open(self.lock_file, "r") as f:
                    old_pid = int(f.read().strip())
            except (ValueError, IOError):
                # Invalid lock file, remove it
                os.remove(self.lock_file)
                self.logger.info("Removed invalid lock file")
            else:
                # Our own PID can only come from a previous run (PID reuse)
                if old_pid != os.getpid() and psutil.pid_exists(old_pid):
                    # Verify the live process really is this application
                    try:
                        proc = psutil.Process(old_pid)
                        cmdline = " ".join(proc.cmdline())
                        if "main.py" in cmdline or "plc" in cmdline.lower():
                            self.logger.error(
                                f"Another instance is already running (PID: {old_pid})"
                            )
                            return False
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        # Process vanished or cannot be inspected; treat as stale
                        pass

                # Old process is dead or unrelated: remove stale lock file
                os.remove(self.lock_file)
                self.logger.info("Removed stale lock file")

        # Create new lock file with current PID
        with open(self.lock_file, "w") as f:
            f.write(str(os.getpid()))

        # Make sure the lock is released on interpreter exit
        atexit.register(self.release_instance_lock)

        self.logger.info(
            f"Instance lock acquired: {self.lock_file} (PID: {os.getpid()})"
        )
        return True
    except Exception as e:
        self.logger.error(f"Error acquiring instance lock: {e}")
        return False
def release_instance_lock(self):
    """Delete the lock file if it exists; errors are logged, not raised."""
    try:
        lock_path = self.lock_file
        if os.path.exists(lock_path):
            os.remove(lock_path)
        # Logged whether or not a file had to be removed
        self.logger.info("Instance lock released")
    except Exception as e:
        self.logger.error(f"Error releasing instance lock: {e}")
def save_variables(self):
    """Write ``self.variables`` to ``self.variables_file`` as JSON.

    Before saving, each variable's ``"streaming"`` flag is set according to
    its membership in ``self.streaming_variables``. Any exception is logged
    and swallowed.
    """
    # NOTE(review): self.variables and self.streaming_variables are not
    # assigned in the __init__ shown for this class - confirm where they
    # are initialised.
    try:
        for var_name, var_config in self.variables.items():
            var_config["streaming"] = var_name in self.streaming_variables

        with open(self.variables_file, "w") as variables_fh:
            json.dump(self.variables, variables_fh, indent=4)
        self.logger.info(f"Variables saved to {self.variables_file}")
    except Exception as e:
        self.logger.error(f"Error saving variables: {e}")
def update_plc_config(self, ip: str, rack: int, slot: int):
    """Replace the PLC connection settings, save them and log the change."""
    previous = self.plc_config.copy()
    self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
    self.save_configuration()

    self.log_event(
        "info",
        "config_change",
        f"PLC configuration updated: {ip}:{rack}/{slot}",
        {"old_config": previous, "new_config": self.plc_config},
    )
def update_udp_config(self, host: str, port: int):
    """Replace the UDP target settings, save them and log the change."""
    previous = self.udp_config.copy()
    self.udp_config = {"host": host, "port": port}
    self.save_configuration()

    self.log_event(
        "info",
        "config_change",
        f"UDP configuration updated: {host}:{port}",
        {"old_config": previous, "new_config": self.udp_config},
    )
def update_sampling_interval(self, interval: float):
    """Set the global sampling interval, save it and log the change."""
    previous = self.sampling_interval
    self.sampling_interval = interval
    self.save_configuration()

    self.log_event(
        "info",
        "config_change",
        f"Sampling interval updated: {interval}s",
        {"old_interval": previous, "new_interval": interval},
    )
def add_variable(
    self, name: str, area: str, db: int, offset: int, var_type: str, bit: int = None
):
    """Add (or overwrite) a variable in ``self.variables`` and persist it.

    Args:
        name: Variable name, used as the key in ``self.variables``.
        area: Memory area; lower-cased before validation.
        db: Data-block number, stored only when ``area`` is ``"db"``.
        offset: Offset stored under ``"offset"``.
        var_type: One of the supported type names.
        bit: Bit position; required for the bit areas ``e``/``a``/``mb``.

    Raises:
        ValueError: for an unsupported area or type, a bit area that is not
            ``bool`` or lacks a bit, or a bit outside 0-7.
    """
    supported_areas = ("db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb")
    bit_areas = ("e", "a", "mb")

    area = area.lower()
    # Area check - the list includes the individual-bit areas
    if area not in supported_areas:
        raise ValueError(
            f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb"
        )

    valid_types = [
        "real",
        "int",
        "bool",
        "dint",
        "word",
        "byte",
        "uint",
        "udint",
        "sint",
        "usint",
    ]
    if var_type not in valid_types:
        raise ValueError(
            f"Invalid data type: {var_type}. Supported: {', '.join(valid_types)}"
        )

    # For individual-bit areas the type must be bool and the bit must be given
    if area in bit_areas:
        if var_type != "bool":
            raise ValueError(f"For bit areas ({area}), data type must be 'bool'")
        if bit is None:
            raise ValueError(
                f"For bit areas ({area}), bit position must be specified (0-7)"
            )

    # Range check applies to every area that was given a bit
    if bit is not None and (bit < 0 or bit > 7):
        raise ValueError("Bit position must be between 0 and 7")

    var_config = {
        "area": area,
        "offset": offset,
        "type": var_type,
        "streaming": False,
    }
    # DB number is stored only for the DB area
    if area == "db":
        var_config["db"] = db
    # Bit position is stored for bit areas and for DB bit addresses
    if area in bit_areas or (area == "db" and bit is not None):
        var_config["bit"] = bit

    self.variables[name] = var_config
    self.save_variables()

    variable_details = {
        "name": name,
        "area": area,
        "db": db if area == "db" else None,
        "offset": offset,
        "bit": bit,
        "type": var_type,
        "total_variables": len(self.variables),
    }

    # Human-readable address per area, including bit addresses
    area_description = {
        "db": (
            f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}"
        ),
        "mw": f"MW{offset}",
        "m": f"M{offset}",
        "pew": f"PEW{offset}",
        "pe": f"PE{offset}",
        "paw": f"PAW{offset}",
        "pa": f"PA{offset}",
        "e": f"E{offset}.{bit}",
        "a": f"A{offset}.{bit}",
        "mb": f"M{offset}.{bit}",
    }
    self.log_event(
        "info",
        "variable_added",
        f"Variable added: {name} -> {area_description[area]} ({var_type})",
        variable_details,
    )
    self.create_new_csv_file_for_variable_modification()
def remove_variable(self, name: str):
    """Drop *name* from the polled variables; unknown names are ignored.

    On removal the variable is also taken out of the streaming set, the
    variable configuration is saved, a ``variable_removed`` event is logged
    and the CSV rotation hook for variable modifications is invoked.
    """
    if name not in self.variables:
        return

    # Keep a copy of the removed configuration for the event log.
    removed_config = self.variables.pop(name).copy()
    self.streaming_variables.discard(name)
    self.save_variables()

    self.log_event(
        "info",
        "variable_removed",
        f"Variable removed: {name}",
        {
            "name": name,
            "removed_config": removed_config,
            "total_variables": len(self.variables),
        },
    )
    self.create_new_csv_file_for_variable_modification()
def toggle_streaming_variable(self, name: str, enabled: bool):
    """Add *name* to, or remove it from, the set of streamed variables.

    Nothing happens when *name* is not a configured variable. Otherwise the
    change is saved via ``save_variables`` and reported on the logger.
    """
    if name not in self.variables:
        return

    if enabled:
        self.streaming_variables.add(name)
        state = "enabled"
    else:
        self.streaming_variables.discard(name)
        state = "disabled"

    # Persist the streaming selection.
    self.save_variables()
    self.logger.info(f"Variable {name} streaming: {state}")
def get_csv_directory_path(self) -> str:
    """Return ``records/<DD-MM-YYYY>`` for today's local date."""
    return os.path.join("records", datetime.now().strftime("%d-%m-%Y"))
def get_csv_file_path(self, use_modification_timestamp: bool = False) -> str:
    """Return the CSV path inside today's record directory.

    The file is named ``<HH>.csv`` by default, or ``_<HH>_<MM>_<SS>.csv``
    when *use_modification_timestamp* is true (used for files started after
    a variable modification).
    """
    # Literal characters in the format string pass through strftime unchanged.
    name_format = "_%H_%M_%S.csv" if use_modification_timestamp else "%H.csv"
    filename = datetime.now().strftime(name_format)
    return os.path.join(self.get_csv_directory_path(), filename)
def ensure_csv_directory(self):
    """Create today's CSV directory, including parents, if it is missing."""
    target = Path(self.get_csv_directory_path())
    target.mkdir(parents=True, exist_ok=True)
def create_new_csv_file_for_variable_modification(self):
    """Start a fresh CSV file after the variable set changed.

    Does nothing unless CSV recording is on. Otherwise the current file (if
    any) is closed and a new ``_<HH>_<MM>_<SS>.csv`` file is opened and kept
    open on ``self.current_csv_file``. When variables are configured, a
    header row of ``timestamp`` plus the variable names is written.

    Any exception is logged and reported as a ``csv_error`` event rather
    than raised.
    """
    if not self.csv_recording:
        return
    try:
        # Close current file if open
        if self.current_csv_file:
            self.current_csv_file.close()
            # Forget the closed handle right away. If opening the new file
            # fails below, no closed file object is left behind on self, and
            # setup_csv_file() sees "no file" and can open one again.
            self.current_csv_file = None
            self.current_csv_writer = None
            self.logger.info("Closed previous CSV file due to variable modification")

        # Create new file with modification timestamp
        self.ensure_csv_directory()
        csv_path = self.get_csv_file_path(use_modification_timestamp=True)
        # The handle is deliberately kept open; later writes reuse it.
        self.current_csv_file = open(csv_path, "w", newline="", encoding="utf-8")
        self.current_csv_writer = csv.writer(self.current_csv_file)

        # Mark that a modification file is in use and remember its hour
        self.using_modification_file = True
        self.current_hour = datetime.now().hour

        # Write headers with the new variable configuration
        if self.variables:
            headers = ["timestamp"] + list(self.variables.keys())
            self.current_csv_writer.writerow(headers)
            self.current_csv_file.flush()
            self.csv_headers_written = True

        self.logger.info(
            f"New CSV file created after variable modification: {csv_path}"
        )
        self.log_event(
            "info",
            "csv_file_created",
            f"New CSV file created after variable modification: {os.path.basename(csv_path)}",
            {
                "file_path": csv_path,
                "variables_count": len(self.variables),
                "reason": "variable_modification",
            },
        )
    except Exception as e:
        self.logger.error(
            f"Error creating new CSV file after variable modification: {e}"
        )
        self.log_event(
            "error",
            "csv_error",
            f"Failed to create new CSV file after variable modification: {str(e)}",
            {"error": str(e)},
        )
def setup_csv_file(self):
    """Make sure an open CSV file exists for the current hour.

    An already-open file whose recorded hour equals the current hour is
    reused, whether it is a regular hourly file or a modification file.
    Otherwise the previous file is closed and the hourly file is opened in
    append mode; a header row is written only when that file did not exist
    yet and variables are configured.
    """
    hour_now = datetime.now().hour

    # Same hour and a file handle present: nothing to rotate.
    if self.current_csv_file is not None and self.current_hour == hour_now:
        return

    # Close previous file if open
    if self.current_csv_file:
        self.current_csv_file.close()

    self.ensure_csv_directory()
    csv_path = self.get_csv_file_path()

    # Must be checked before open(), which creates the file in append mode.
    is_new_file = not os.path.exists(csv_path)

    self.current_csv_file = open(csv_path, "a", newline="", encoding="utf-8")
    self.current_csv_writer = csv.writer(self.current_csv_file)
    self.current_hour = hour_now
    # A regular hourly file is now in use
    self.using_modification_file = False

    if is_new_file and self.variables:
        header_row = ["timestamp", *self.variables.keys()]
        self.current_csv_writer.writerow(header_row)
        self.current_csv_file.flush()
        self.csv_headers_written = True
        self.logger.info(f"CSV file created: {csv_path}")
def write_csv_data(self, data: Dict[str, Any]):
    """Append one timestamped row of *data* to the current CSV file.

    Skipped when CSV recording is off or no variables are configured.
    Columns follow the order of ``self.variables``; a variable missing
    from *data* is written as ``None``. Errors are logged, not raised.
    """
    if not (self.csv_recording and self.variables):
        return
    try:
        self.setup_csv_file()
        if not self.current_csv_writer:
            return
        # Millisecond precision: drop the last three microsecond digits.
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        values = [data.get(var_name, None) for var_name in self.variables.keys()]
        self.current_csv_writer.writerow([stamp] + values)
        self.current_csv_file.flush()
    except Exception as e:
        self.logger.error(f"Error writing CSV data: {e}")
def get_streaming_data(self, all_data: Dict[str, Any]) -> Dict[str, Any]:
    """Return the part of *all_data* selected for streaming.

    With an empty streaming selection, *all_data* itself is returned
    unfiltered.
    """
    if not self.streaming_variables:
        return all_data

    selected = {}
    for name, value in all_data.items():
        if name in self.streaming_variables:
            selected[name] = value
    return selected
def connect_plc(self) -> bool:
    """Open a new snap7 client connection using ``self.plc_config``.

    An existing client is disconnected first. Returns ``True`` on success.
    On any exception ``self.connected`` is set to ``False``, a
    ``plc_connection_failed`` event is logged and ``False`` is returned.
    """
    try:
        if self.plc:
            self.plc.disconnect()

        self.plc = snap7.client.Client()
        self.plc.connect(
            self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"]
        )
        self.connected = True
        self.save_system_state()

        self.log_event(
            "info",
            "plc_connection",
            f"Successfully connected to PLC {self.plc_config['ip']}",
            {key: self.plc_config[key] for key in ("ip", "rack", "slot")},
        )
        return True
    except Exception as e:
        self.connected = False
        failure = {key: self.plc_config[key] for key in ("ip", "rack", "slot")}
        failure["error"] = str(e)
        self.log_event(
            "error",
            "plc_connection_failed",
            f"Failed to connect to PLC {self.plc_config['ip']}: {str(e)}",
            failure,
        )
        return False
def disconnect_plc(self):
    """Disconnect the PLC client (if any) and record the state change.

    Failures are not raised; they are reported as a
    ``plc_disconnection_error`` event.
    """
    try:
        client = self.plc
        if client:
            client.disconnect()
        self.connected = False
        self.save_system_state()
        self.log_event(
            "info",
            "plc_disconnection",
            f"Disconnected from PLC {self.plc_config['ip']}",
        )
    except Exception as e:
        reason = str(e)
        self.log_event(
            "error",
            "plc_disconnection_error",
            f"Error disconnecting from PLC: {reason}",
            {"error": reason},
        )
def read_variable(self, var_config: Dict[str, Any]) -> Any:
    """Read one variable from the PLC and decode it.

    ``var_config`` supplies ``area`` (defaults to ``"db"``), ``offset``,
    ``type``, ``db`` (DB area only) and optionally ``bit``.

    Returns the decoded value, or ``None`` when the type is not supported
    for the area, the area is unknown, or any exception occurs (the last
    two cases are also logged as errors).
    """
    # type name -> (bytes to read, big-endian struct format)
    numeric_layout = {
        "real": (4, ">f"),
        "int": (2, ">h"),
        "dint": (4, ">l"),
        "word": (2, ">H"),
        "byte": (1, ">B"),
        "uint": (2, ">H"),
        "udint": (4, ">L"),
        "sint": (1, ">b"),
        "usint": (1, ">B"),
    }
    # byte/word addressed areas -> snap7 client read method
    byte_area_method = {
        "mw": "mb_read",
        "m": "mb_read",
        "pew": "eb_read",
        "pe": "eb_read",
        "paw": "ab_read",
        "pa": "ab_read",
    }
    # bit addressed areas (bool only) -> snap7 client read method
    bit_area_method = {"e": "eb_read", "a": "ab_read", "mb": "mb_read"}

    try:
        area_type = var_config.get("area", "db").lower()
        offset = var_config["offset"]
        var_type = var_config["type"]
        bit = var_config.get("bit")  # only meaningful for bool reads

        if area_type == "db":
            db = var_config["db"]
            if var_type != "bool" and var_type not in numeric_layout:
                return None

            def fetch(size):
                return self.plc.db_read(db, offset, size)

            # DB bools without a bit fall back to bit 0.
            use_bit = bit is not None
        elif area_type in byte_area_method:
            if var_type != "bool" and var_type not in numeric_layout:
                return None
            method_name = byte_area_method[area_type]

            def fetch(size):
                return getattr(self.plc, method_name)(offset, size)

            # These areas always decode bit 0 for bool.
            use_bit = False
        elif area_type in bit_area_method:
            if var_type != "bool":
                return None
            method_name = bit_area_method[area_type]

            def fetch(size):
                return getattr(self.plc, method_name)(offset, size)

            use_bit = True
        else:
            self.logger.error(f"Unsupported area type: {area_type}")
            return None

        if var_type == "bool":
            raw_data = fetch(1)
            if use_bit:
                return snap7.util.get_bool(raw_data, 0, bit)
            return bool(raw_data[0] & 0x01)

        size, fmt = numeric_layout[var_type]
        return struct.unpack(fmt, fetch(size))[0]
    except Exception as e:
        self.logger.error(f"Error reading variable: {e}")
        return None
def read_all_variables(self) -> Dict[str, Any]:
    """Read every configured variable and return name -> value.

    Returns an empty dict when not connected. Variables whose read yields
    ``None`` are left out of the result.
    """
    if not self.connected or not self.plc:
        return {}

    readings = (
        (var_name, self.read_variable(var_config))
        for var_name, var_config in self.variables.items()
    )
    return {var_name: value for var_name, value in readings if value is not None}
def setup_udp_socket(self) -> bool:
    """Create a new UDP socket, closing any previous one first.

    Returns ``True`` on success and ``False`` (after logging the error) on
    any exception.
    """
    try:
        previous = self.udp_socket
        if previous:
            previous.close()
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.logger.info(
            f"UDP socket configured for {self.udp_config['host']}:{self.udp_config['port']}"
        )
    except Exception as e:
        self.logger.error(f"Error configuring UDP socket: {e}")
        return False
    return True
def send_to_plotjuggler(self, data: Dict[str, Any]):
    """Send *data* as a UTF-8 JSON datagram to the configured UDP target.

    The payload is ``{"timestamp": <time.time()>, "data": data}``. Nothing
    is sent when no socket is set up; send errors are logged, not raised.
    """
    if not self.udp_socket:
        return
    try:
        encoded = json.dumps({"timestamp": time.time(), "data": data}).encode("utf-8")
        destination = (self.udp_config["host"], self.udp_config["port"])
        self.udp_socket.sendto(encoded, destination)
    except Exception as e:
        self.logger.error(f"Error sending data to PlotJuggler: {e}")
def streaming_loop(self):
    """Poll, record and stream in a loop while ``self.streaming`` is true.

    Each pass reads all variables, writes the full reading to CSV, sends
    the streaming-filtered subset to PlotJuggler, then sleeps for whatever
    remains of ``self.sampling_interval``.

    An empty read or an exception counts as a failure; a successful read
    resets the counter. After five consecutive failures the loop logs a
    ``streaming_error`` event and exits.
    """
    self.logger.info(
        f"Starting streaming with interval of {self.sampling_interval}s"
    )
    consecutive_errors = 0
    max_consecutive_errors = 5
    while self.streaming:
        try:
            start_time = time.time()
            # Read all variables
            all_data = self.read_all_variables()
            if all_data:
                # Reset error counter on successful read
                consecutive_errors = 0
                # Write to CSV (all variables)
                self.write_csv_data(all_data)
                # Get filtered data for streaming
                streaming_data = self.get_streaming_data(all_data)
                # Send filtered data to PlotJuggler
                if streaming_data:
                    self.send_to_plotjuggler(streaming_data)
                # Log data
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                self.logger.info(
                    f"[{timestamp}] CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
                )
            else:
                # An empty result (not connected, or every read failed)
                # counts as one failed pass.
                consecutive_errors += 1
                if consecutive_errors >= max_consecutive_errors:
                    self.log_event(
                        "error",
                        "streaming_error",
                        f"Multiple consecutive read failures ({consecutive_errors}). Stopping streaming.",
                        {"consecutive_errors": consecutive_errors},
                    )
                    # NOTE(review): leaving the loop does not reset
                    # self.streaming - confirm the caller handles that.
                    break
            # Maintain sampling interval: sleep only for the time the pass
            # did not already use (never a negative duration).
            elapsed = time.time() - start_time
            sleep_time = max(0, self.sampling_interval - elapsed)
            time.sleep(sleep_time)
        except Exception as e:
            consecutive_errors += 1
            self.log_event(
                "error",
                "streaming_error",
                f"Error in streaming loop: {str(e)}",
                {"error": str(e), "consecutive_errors": consecutive_errors},
            )
            if consecutive_errors >= max_consecutive_errors:
                self.log_event(
                    "error",
                    "streaming_error",
                    "Too many consecutive errors. Stopping streaming.",
                    {"consecutive_errors": consecutive_errors},
                )
                break
            time.sleep(1)  # Fixed one-second pause before retrying after an exception
def start_streaming(self) -> bool:
    """Start streaming by activating every dataset that has variables.

    Returns ``False`` (after logging a ``streaming_error`` event) when the
    PLC is not connected, no datasets exist, the UDP socket cannot be set
    up, or no dataset could be activated. Otherwise sets
    ``self.streaming``, saves the system state and returns ``True``.
    """

    def refuse(reason: str) -> bool:
        # Every refusal is reported the same way.
        self.log_event("error", "streaming_error", reason)
        return False

    if not self.connected:
        return refuse("Cannot start streaming: PLC not connected")
    if not self.datasets:
        return refuse("Cannot start streaming: No datasets configured")
    if not self.setup_udp_socket():
        return refuse("Cannot start streaming: UDP socket setup failed")

    # A dataset that fails to activate is skipped with a warning.
    activated = 0
    for dataset_id, dataset_info in self.datasets.items():
        if not dataset_info.get("variables"):
            continue
        try:
            self.activate_dataset(dataset_id)
        except Exception as e:
            self.logger.warning(f"Failed to activate dataset {dataset_id}: {e}")
        else:
            activated += 1

    if activated == 0:
        return refuse("Cannot start streaming: No datasets with variables configured")

    self.streaming = True
    self.save_system_state()
    self.log_event(
        "info",
        "streaming_started",
        f"Multi-dataset streaming started: {activated} datasets activated",
        {
            "activated_datasets": activated,
            "total_datasets": len(self.datasets),
            "udp_host": self.udp_config["host"],
            "udp_port": self.udp_config["port"],
        },
    )
    return True
def stop_streaming(self):
    """Stop streaming: deactivate active datasets and close the UDP socket.

    A dataset that fails to deactivate is reported with a warning and does
    not stop the others. The system state is saved and a
    ``streaming_stopped`` event is logged at the end.
    """
    self.streaming = False

    # Iterate over a snapshot of the active set, not the live set.
    to_stop = self.active_datasets.copy()
    for dataset_id in to_stop:
        try:
            self.deactivate_dataset(dataset_id)
        except Exception as e:
            self.logger.warning(f"Error deactivating dataset {dataset_id}: {e}")

    if self.udp_socket:
        self.udp_socket.close()
        self.udp_socket = None

    self.save_system_state()
    datasets_stopped = len(to_stop)
    self.log_event(
        "info",
        "streaming_stopped",
        f"Multi-dataset streaming stopped: {datasets_stopped} datasets deactivated",
    )
def get_status(self) -> Dict[str, Any]:
    """Return a snapshot of connection, streaming and dataset status.

    A variable counts as streaming only when it is listed in the dataset's
    ``streaming_variables`` AND its own configuration has a true
    ``streaming`` flag. A dataset without a ``variables`` entry is treated
    as having no variables.
    """

    def count_streaming(info: Dict[str, Any]) -> int:
        # Listed for streaming and flagged streaming in its own config.
        variables_config = info.get("variables", {})
        return sum(
            1
            for var in info.get("streaming_variables", [])
            if variables_config.get(var, {}).get("streaming", False)
        )

    total_variables = 0
    total_streaming_vars = 0
    datasets_status = {}
    for dataset_id, info in self.datasets.items():
        variables_count = len(info.get("variables", {}))
        streaming_count = count_streaming(info)
        total_variables += variables_count
        total_streaming_vars += streaming_count
        datasets_status[dataset_id] = {
            "name": info["name"],
            "prefix": info["prefix"],
            "variables_count": variables_count,
            "streaming_count": streaming_count,
            "sampling_interval": info.get("sampling_interval"),
            "enabled": info.get("enabled", False),
            "active": dataset_id in self.active_datasets,
        }

    return {
        "plc_connected": self.connected,
        "streaming": self.streaming,
        "plc_config": self.plc_config,
        "udp_config": self.udp_config,
        "datasets_count": len(self.datasets),
        "active_datasets_count": len(self.active_datasets),
        "total_variables": total_variables,
        "total_streaming_variables": total_streaming_vars,
        # Same value under a second key, kept for frontend compatibility
        "streaming_variables_count": total_streaming_vars,
        "sampling_interval": self.sampling_interval,
        "current_dataset_id": self.current_dataset_id,
        "datasets": datasets_status,
    }
def log_event(
    self, level: str, event_type: str, message: str, details: Dict[str, Any] = None
):
    """Record an event in the persistent log and echo it to the logger.

    The event is appended to ``self.events_log``, the list is trimmed to
    the newest ``self.max_log_entries`` entries, and the log is saved.
    *level* ``"error"`` or ``"warning"`` picks the matching logger method;
    any other level is logged as info. Failures are logged, not raised.
    """
    try:
        self.events_log.append(
            {
                "timestamp": datetime.now().isoformat(),
                "level": level,  # info, warning, error
                "event_type": event_type,  # connection, disconnection, error, config_change, etc.
                "message": message,
                "details": details or {},
            }
        )

        # Keep only the newest entries
        if len(self.events_log) > self.max_log_entries:
            self.events_log = self.events_log[-self.max_log_entries :]

        self.save_events_log()

        # Mirror the event on the regular logger
        method_name = level if level in ("error", "warning") else "info"
        getattr(self.logger, method_name)(f"[{event_type}] {message}")
    except Exception as e:
        self.logger.error(f"Error adding event to log: {e}")
def load_events_log(self):
    """Load ``self.events_log`` from the JSON events file.

    A missing file gives an empty log. A loaded log is trimmed to the
    newest ``self.max_log_entries`` entries. Any error while loading is
    logged and also gives an empty log.
    """
    try:
        if not os.path.exists(self.events_log_file):
            self.events_log = []
            self.logger.info("No events log file found, starting with empty log")
            return

        with open(self.events_log_file, "r", encoding="utf-8") as f:
            stored = json.load(f)
        self.events_log = stored.get("events", [])

        # Limit log size on load
        if len(self.events_log) > self.max_log_entries:
            self.events_log = self.events_log[-self.max_log_entries :]
        self.logger.info(f"Events log loaded: {len(self.events_log)} entries")
    except Exception as e:
        self.logger.error(f"Error loading events log: {e}")
        self.events_log = []
def save_events_log(self):
    """Write the events log to its JSON file, overwriting the old content.

    The file holds the events plus a ``last_updated`` timestamp and the
    entry count. Errors are logged, not raised.
    """
    try:
        snapshot = dict(
            events=self.events_log,
            last_updated=datetime.now().isoformat(),
            total_entries=len(self.events_log),
        )
        with open(self.events_log_file, "w", encoding="utf-8") as out_file:
            json.dump(snapshot, out_file, indent=2, ensure_ascii=False)
    except Exception as e:
        self.logger.error(f"Error saving events log: {e}")
def get_recent_events(self, limit: int = 50) -> List[Dict[str, Any]]:
    """Return up to *limit* of the newest events, oldest first.

    A *limit* of zero or less returns an empty list. A plain
    ``events[-limit:]`` would return the whole log for ``limit == 0`` and
    slice from the front for negative values.
    """
    if limit <= 0 or not self.events_log:
        return []
    return self.events_log[-limit:]
# Global streamer instance shared by the route handlers below.
# It stays None until it is initialized in main; handlers that check for
# None answer with HTTP 503 in that case.
streamer = None
def check_streamer_initialized():
    """Return a 503 JSON error response if the streamer is unset, else None."""
    if streamer is not None:
        return None
    return jsonify({"error": "Application not initialized"}), 503
@app.route("/images/<filename>")
def serve_image(filename):
    """Serve *filename* from the ``.images`` directory."""
    image_directory = ".images"
    return send_from_directory(image_directory, filename)
@app.route("/")
def index():
    """Render the main page, or answer 503 before the streamer exists."""
    if streamer is None:
        return "Application not initialized", 503

    # Variables of the selected dataset; empty when none is selected or the
    # selected id is not a known dataset.
    selected_id = streamer.current_dataset_id
    if selected_id and selected_id in streamer.datasets:
        dataset_variables = streamer.datasets[selected_id].get("variables", {})
    else:
        dataset_variables = {}

    return render_template(
        "index.html",
        status=streamer.get_status(),
        variables=dataset_variables,
        datasets=streamer.datasets,
        current_dataset_id=streamer.current_dataset_id,
    )
@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
    """Update the PLC connection settings from the JSON request body.

    Missing fields fall back to ip ``10.1.33.11``, rack ``0``, slot ``2``.
    Any error is returned as a 400 response carrying the exception text.
    """
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    try:
        payload = request.get_json()
        streamer.update_plc_config(
            payload.get("ip", "10.1.33.11"),
            int(payload.get("rack", 0)),
            int(payload.get("slot", 2)),
        )
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
    return jsonify({"success": True, "message": "PLC configuration updated"})
@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
    """Update the UDP target from the JSON request body.

    Missing fields fall back to host ``127.0.0.1`` and port ``9870``.
    Answers 503 while the streamer is not initialized, and 400 with the
    exception text for any other error.
    """
    # Report "not initialized" as 503 instead of letting the attribute
    # access on None surface as a 400.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        host = data.get("host", "127.0.0.1")
        port = int(data.get("port", 9870))
        streamer.update_udp_config(host, port)
        return jsonify({"success": True, "message": "UDP configuration updated"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
    """Connect to the PLC; 200 on success, 500 when the connection fails."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready

    if not streamer.connect_plc():
        return jsonify({"success": False, "message": "Error connecting to PLC"}), 500
    return jsonify({"success": True, "message": "Connected to PLC"})
@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
    """Stop streaming and disconnect from the PLC.

    Answers 503 while the streamer is not initialized.
    """
    # Without this guard an uninitialized streamer raises an unhandled
    # AttributeError here.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    streamer.stop_streaming()
    streamer.disconnect_plc()
    return jsonify({"success": True, "message": "Disconnected from PLC"})
@app.route("/api/variables", methods=["POST"])
def add_variable():
    """Add a new variable described by the JSON request body.

    Expected fields: ``name``, ``area``, ``offset``, ``type``; ``db`` is
    required only for the ``db`` area; ``bit`` is required for the bit
    areas ``e``, ``a`` and ``mb``.

    Answers 503 while the streamer is not initialized and 400 for invalid
    input or any error raised while adding the variable.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        name = data.get("name")
        area = data.get("area")
        offset = int(data.get("offset"))
        var_type = data.get("type")
        bit = data.get("bit")
        if bit is not None:
            # Accept numeric strings too; the range checks need an int.
            bit = int(bit)

        valid_types = [
            "real",
            "int",
            "bool",
            "dint",
            "word",
            "byte",
            "uint",
            "udint",
            "sint",
            "usint",
        ]
        valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]
        if (
            not name
            or not area
            or var_type not in valid_types
            or area.lower() not in valid_areas
        ):
            return jsonify({"success": False, "message": "Invalid data"}), 400

        # A DB number only matters for the DB area. Other areas may omit it;
        # they then get the same default (1) the update endpoint uses.
        raw_db = data.get("db")
        if raw_db is None:
            if area.lower() == "db":
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "DB number must be specified for DB area",
                        }
                    ),
                    400,
                )
            db = 1
        else:
            db = int(raw_db)

        if area.lower() in ["e", "a", "mb"] and bit is None:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be specified for bit areas",
                    }
                ),
                400,
            )
        if area.lower() in ["e", "a", "mb"] and (bit < 0 or bit > 7):
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be between 0 and 7",
                    }
                ),
                400,
            )
        streamer.add_variable(name, area, db, offset, var_type, bit)
        return jsonify({"success": True, "message": f"Variable {name} added"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
    """Remove the variable *name*.

    Answers 503 while the streamer is not initialized.
    """
    # Without this guard an uninitialized streamer raises an unhandled
    # AttributeError here.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    streamer.remove_variable(name)
    return jsonify({"success": True, "message": f"Variable {name} removed"})
@app.route("/api/variables/<name>", methods=["GET"])
def get_variable(name):
    """Return the configuration of variable *name* in the current dataset.

    The returned ``streaming`` field reflects membership in the dataset's
    ``streaming_variables`` list. Answers 503 while the streamer is not
    initialized, 400 when no dataset is selected or on any error, and 404
    when the variable does not exist.
    """
    # Report "not initialized" as 503 instead of a 400 with an
    # AttributeError message.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        if not streamer.current_dataset_id:
            return (
                jsonify({"success": False, "message": "No dataset selected"}),
                400,
            )
        current_variables = streamer.get_dataset_variables(streamer.current_dataset_id)
        if name not in current_variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )
        # Work on a copy so the stored configuration is not modified.
        var_config = current_variables[name].copy()
        var_config["name"] = name
        # Check if variable is in streaming list for current dataset
        streaming_vars = streamer.datasets[streamer.current_dataset_id].get(
            "streaming_variables", []
        )
        var_config["streaming"] = name in streaming_vars
        return jsonify({"success": True, "variable": var_config})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["PUT"])
def update_variable(name):
    """Update (and optionally rename) variable *name* in the current dataset.

    The update is done as remove-then-add. If the add fails, the previous
    definition is put back so a rejected update does not delete the
    variable. The streaming selection is carried over.

    Answers 503 while the streamer is not initialized, 404 when the
    variable does not exist, and 400 for invalid input or any other error.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        if not streamer.current_dataset_id:
            return jsonify({"success": False, "message": "No dataset selected"}), 400
        data = request.get_json()
        new_name = data.get("name", name)
        area = data.get("area")
        db = int(data.get("db", 1))
        offset = int(data.get("offset"))
        var_type = data.get("type")
        bit = data.get("bit")
        if bit is not None:
            # Accept numeric strings too; the range checks need an int.
            bit = int(bit)

        valid_types = [
            "real",
            "int",
            "bool",
            "dint",
            "word",
            "byte",
            "uint",
            "udint",
            "sint",
            "usint",
        ]
        valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]
        if (
            not new_name
            or not area
            or var_type not in valid_types
            or area.lower() not in valid_areas
        ):
            return jsonify({"success": False, "message": "Invalid data"}), 400
        if area.lower() in ["e", "a", "mb"] and bit is None:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be specified for bit areas",
                    }
                ),
                400,
            )
        if area.lower() in ["e", "a", "mb"] and (bit < 0 or bit > 7):
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be between 0 and 7",
                    }
                ),
                400,
            )

        current_dataset_id = streamer.current_dataset_id
        current_variables = streamer.get_dataset_variables(current_dataset_id)
        # Check if variable exists in current dataset
        if name not in current_variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )
        # Check if new name already exists (if name is changing)
        if name != new_name and new_name in current_variables:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": f"Variable {new_name} already exists",
                    }
                ),
                400,
            )

        # Preserve streaming state
        streaming_vars = streamer.datasets[current_dataset_id].get(
            "streaming_variables", []
        )
        was_streaming = name in streaming_vars

        # Snapshot the old definition before removing it, for rollback.
        previous = dict(current_variables[name])

        # Remove old variable
        streamer.remove_variable_from_dataset(current_dataset_id, name)
        try:
            # Add updated variable
            streamer.add_variable_to_dataset(
                current_dataset_id,
                new_name,
                area,
                db,
                offset,
                var_type,
                bit,
                was_streaming,
            )
        except Exception:
            # Restore the previous definition, then let the outer handler
            # report the original failure.
            # NOTE(review): assumes stored configs use the keys
            # area/db/offset/type/bit - confirm against add_variable_to_dataset.
            streamer.add_variable_to_dataset(
                current_dataset_id,
                name,
                previous.get("area"),
                previous.get("db", 1),
                previous.get("offset"),
                previous.get("type"),
                previous.get("bit"),
                was_streaming,
            )
            raise
        return jsonify({"success": True, "message": "Variable updated successfully"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>/streaming", methods=["POST"])
def toggle_variable_streaming(name):
    """Enable or disable streaming for variable *name* in the current dataset.

    Reads ``enabled`` (default ``False``) from the JSON body. Answers 503
    while the streamer is not initialized, and 400 when no dataset is
    selected or on any error.
    """
    # Report "not initialized" as 503 instead of a 400 with an
    # AttributeError message.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        if not streamer.current_dataset_id:
            return jsonify({"success": False, "message": "No dataset selected"}), 400
        data = request.get_json()
        enabled = data.get("enabled", False)
        # Use the dataset-specific method
        streamer.toggle_variable_streaming(streamer.current_dataset_id, name, enabled)
        status = "enabled" if enabled else "disabled"
        return jsonify(
            {"success": True, "message": f"Variable {name} streaming {status}"}
        )
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/streaming", methods=["GET"])
def get_streaming_variables():
    """List the variables enabled for streaming in the current dataset.

    The list is empty when no dataset is selected or the selected id is
    not a known dataset. Answers 503 while the streamer is not initialized.
    """
    if streamer is None:
        return jsonify({"success": False, "error": "Streamer not initialized"}), 503

    selected_id = streamer.current_dataset_id
    if selected_id and selected_id in streamer.datasets:
        enabled_names = streamer.datasets[selected_id].get("streaming_variables", [])
    else:
        enabled_names = []
    return jsonify({"success": True, "streaming_variables": enabled_names})
# Dataset Management API Endpoints
@app.route("/api/datasets", methods=["GET"])
def get_datasets():
    """Return all datasets, the active dataset ids and the selected id."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready

    body = {
        "success": True,
        "datasets": streamer.datasets,
        "active_datasets": list(streamer.active_datasets),
        "current_dataset_id": streamer.current_dataset_id,
    }
    return jsonify(body)
@app.route("/api/datasets", methods=["POST"])
def create_dataset():
    """Create a dataset from the JSON request body.

    ``dataset_id``, ``name`` and ``prefix`` are required (whitespace is
    stripped). ``sampling_interval`` is optional and must be at least 0.01
    seconds when given. ``ValueError`` maps to 400, anything else to 500.
    """
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    try:
        payload = request.get_json()
        dataset_id = payload.get("dataset_id", "").strip()
        name = payload.get("name", "").strip()
        prefix = payload.get("prefix", "").strip()
        sampling_interval = payload.get("sampling_interval")

        if not (dataset_id and name and prefix):
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Dataset ID, name, and prefix are required",
                    }
                ),
                400,
            )

        # Validate sampling interval if provided
        if sampling_interval is not None:
            sampling_interval = float(sampling_interval)
            if sampling_interval < 0.01:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "Sampling interval must be at least 0.01 seconds",
                        }
                    ),
                    400,
                )

        streamer.create_dataset(dataset_id, name, prefix, sampling_interval)
        return jsonify(
            {
                "success": True,
                "message": f"Dataset '{name}' created successfully",
                "dataset_id": dataset_id,
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error creating dataset: {str(e)}"}),
            500,
        )
@app.route("/api/datasets/<dataset_id>", methods=["DELETE"])
def delete_dataset(dataset_id):
    """Delete *dataset_id*; ``ValueError`` maps to 404, other errors to 500."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    try:
        streamer.delete_dataset(dataset_id)
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error deleting dataset: {str(e)}"}),
            500,
        )
    return jsonify({"success": True, "message": "Dataset deleted successfully"})
@app.route("/api/datasets/<dataset_id>/activate", methods=["POST"])
def activate_dataset(dataset_id):
    """Activate *dataset_id* for streaming.

    ``ValueError`` maps to 404, ``RuntimeError`` to 400, anything else
    to 500.
    """
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    try:
        streamer.activate_dataset(dataset_id)
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except RuntimeError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error activating dataset: {str(e)}"}
            ),
            500,
        )
    return jsonify({"success": True, "message": "Dataset activated successfully"})
@app.route("/api/datasets/<dataset_id>/deactivate", methods=["POST"])
def deactivate_dataset(dataset_id):
    """Deactivate *dataset_id*; ``ValueError`` maps to 404, other errors to 500."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    try:
        streamer.deactivate_dataset(dataset_id)
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error deactivating dataset: {str(e)}"}
            ),
            500,
        )
    return jsonify({"success": True, "message": "Dataset deactivated successfully"})
@app.route("/api/datasets/<dataset_id>/variables", methods=["POST"])
def add_variable_to_dataset(dataset_id):
    """Handle ``POST /api/datasets/<dataset_id>/variables``.

    Reads ``name``, ``area``, ``db`` (default 1), ``offset`` (default 0),
    ``type``, ``bit`` and ``streaming`` (default False) from the JSON body and
    forwards them to ``streamer.add_variable_to_dataset``.

    Responses:
        200 -- variable added.
        400 -- body is not a JSON object, a required text field is missing or
               empty, or the streamer raised ``ValueError``.
        500 -- any other exception.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    # silent=True returns None for a missing or malformed body instead of
    # raising, so a bad request is answered with 400 rather than falling into
    # the generic 500 handler.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return (
            jsonify(
                {"success": False, "message": "Request body must be a JSON object"}
            ),
            400,
        )

    def _text_field(key):
        # Non-string values (null, numbers, ...) are treated as missing so
        # they hit the "required" check instead of crashing on .strip().
        value = data.get(key, "")
        return value.strip() if isinstance(value, str) else ""

    try:
        name = _text_field("name")
        area = _text_field("area")
        var_type = _text_field("type")
        db = data.get("db", 1)
        offset = data.get("offset", 0)
        bit = data.get("bit")
        streaming = data.get("streaming", False)

        if not name or not area or not var_type:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Variable name, area, and type are required",
                    }
                ),
                400,
            )

        streamer.add_variable_to_dataset(
            dataset_id, name, area, db, offset, var_type, bit, streaming
        )
        return jsonify(
            {
                "success": True,
                "message": f"Variable '{name}' added to dataset successfully",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error adding variable: {str(e)}"}),
            500,
        )
@app.route("/api/datasets/<dataset_id>/variables/<variable_name>", methods=["DELETE"])
def remove_variable_from_dataset(dataset_id, variable_name):
    """Handle ``DELETE /api/datasets/<dataset_id>/variables/<variable_name>``.

    Delegates to ``streamer.remove_variable_from_dataset``. A ``ValueError``
    raised by the streamer is reported as HTTP 404; any other exception as
    HTTP 500.
    """
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready

    try:
        streamer.remove_variable_from_dataset(dataset_id, variable_name)
        ok_body = {
            "success": True,
            "message": f"Variable '{variable_name}' removed from dataset successfully",
        }
        return jsonify(ok_body)
    except ValueError as exc:
        return jsonify({"success": False, "message": str(exc)}), 404
    except Exception as exc:
        failure_body = {
            "success": False,
            "message": f"Error removing variable: {str(exc)}",
        }
        return jsonify(failure_body), 500
@app.route(
    "/api/datasets/<dataset_id>/variables/<variable_name>/streaming", methods=["POST"]
)
def toggle_variable_streaming_in_dataset(dataset_id, variable_name):
    """Handle ``POST .../variables/<variable_name>/streaming``.

    Reads ``enabled`` (default False) from the JSON body and forwards it to
    ``streamer.toggle_variable_streaming``.

    Responses:
        200 -- streaming flag updated.
        400 -- body is not a JSON object.
        404 -- the streamer raised ``ValueError``.
        500 -- any other exception.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    # silent=True returns None for a missing or malformed body instead of
    # raising; answering 400 here avoids reporting a client mistake as a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return (
            jsonify(
                {"success": False, "message": "Request body must be a JSON object"}
            ),
            400,
        )

    try:
        enabled = data.get("enabled", False)
        streamer.toggle_variable_streaming(dataset_id, variable_name, enabled)
        return jsonify(
            {
                "success": True,
                "message": f"Variable '{variable_name}' streaming {'enabled' if enabled else 'disabled'}",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error toggling streaming: {str(e)}"}
            ),
            500,
        )
@app.route("/api/datasets/current", methods=["POST"])
def set_current_dataset():
    """Handle ``POST /api/datasets/current``.

    Reads ``dataset_id`` from the JSON body. When it is truthy and present in
    ``streamer.datasets`` it is stored in ``streamer.current_dataset_id`` and
    ``streamer.save_datasets()`` is called.

    Responses:
        200 -- current dataset updated.
        400 -- body is not a JSON object, or ``dataset_id`` is missing/unknown.
        500 -- any exception raised while updating.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    # silent=True returns None for a missing or malformed body instead of
    # raising; without this a bodiless request crashed on None.get -> 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return (
            jsonify(
                {"success": False, "message": "Request body must be a JSON object"}
            ),
            400,
        )

    try:
        dataset_id = data.get("dataset_id")
        if not dataset_id or dataset_id not in streamer.datasets:
            return jsonify({"success": False, "message": "Invalid dataset ID"}), 400

        streamer.current_dataset_id = dataset_id
        streamer.save_datasets()
        return jsonify(
            {
                "success": True,
                "message": "Current dataset updated successfully",
                "current_dataset_id": dataset_id,
            }
        )
    except Exception as e:
        return (
            jsonify(
                {
                    "success": False,
                    "message": f"Error setting current dataset: {str(e)}",
                }
            ),
            500,
        )
@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
    """Handle ``POST /api/streaming/start``.

    Calls ``streamer.start_streaming()``; a truthy result yields a success
    payload, a falsy result yields an error payload with HTTP 500.
    """
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready

    started = streamer.start_streaming()
    if not started:
        return jsonify({"success": False, "message": "Error starting streaming"}), 500
    return jsonify({"success": True, "message": "Streaming started"})
@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
    """Handle ``POST /api/streaming/stop``.

    Calls ``streamer.stop_streaming()`` and returns a success payload.
    """
    # Same initialization guard as the other streamer-backed routes, so an
    # uninitialized streamer yields the guard's response rather than an
    # unhandled AttributeError.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.stop_streaming()
    return jsonify({"success": True, "message": "Streaming stopped"})
@app.route("/api/sampling", methods=["POST"])
def update_sampling():
    """Handle ``POST /api/sampling``.

    Reads ``interval`` (default 0.1) from the JSON body, converts it to float,
    raises values below 0.01 up to 0.01, and passes the result to
    ``streamer.update_sampling_interval``.

    Responses:
        200 -- interval updated.
        400 -- body is not a JSON object, the interval is not a finite number,
               or any exception was raised while applying it.
    """
    import math  # local import: only this handler needs it

    # Same initialization guard as the other streamer-backed routes.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    # silent=True returns None for a missing or malformed body instead of
    # raising, which lets us answer with a clear message.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return (
            jsonify(
                {"success": False, "message": "Request body must be a JSON object"}
            ),
            400,
        )

    try:
        interval = float(data.get("interval", 0.1))
        # NaN compares False against every bound and +inf is never "too
        # small", so both would slip past the minimum clamp below.
        if not math.isfinite(interval):
            raise ValueError("Sampling interval must be a finite number")
        interval = max(interval, 0.01)
        streamer.update_sampling_interval(interval)
        return jsonify({"success": True, "message": f"Interval updated to {interval}s"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/csv/start", methods=["POST"])
def start_csv_recording():
    """Handle ``POST /api/csv/start``.

    Calls ``streamer.start_csv_recording()``; a truthy result yields a success
    payload, a falsy result yields an error payload with HTTP 500.
    """
    # Same initialization guard as the other streamer-backed routes, so an
    # uninitialized streamer yields the guard's response rather than an
    # unhandled AttributeError.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.start_csv_recording():
        return jsonify({"success": True, "message": "CSV recording started"})
    return (
        jsonify({"success": False, "message": "Error starting CSV recording"}),
        500,
    )
@app.route("/api/csv/stop", methods=["POST"])
def stop_csv_recording():
    """Handle ``POST /api/csv/stop``.

    Calls ``streamer.stop_csv_recording()`` and returns a success payload.
    """
    # Same initialization guard as the other streamer-backed routes, so an
    # uninitialized streamer yields the guard's response rather than an
    # unhandled AttributeError.
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.stop_csv_recording()
    return jsonify({"success": True, "message": "CSV recording stopped"})
@app.route("/api/status")
def get_status():
    """Handle ``GET /api/status``.

    Returns ``streamer.get_status()`` as JSON, or an ``error`` payload with
    HTTP 503 while the module-level ``streamer`` is still ``None``.
    """
    if streamer is not None:
        return jsonify(streamer.get_status())
    return jsonify({"error": "Application not initialized"}), 503
@app.route("/api/events")
def get_events():
    """Handle ``GET /api/events``.

    Query parameters:
        limit -- number of events requested (default 50). The value is
                 clamped to the range 1..200 before being passed to
                 ``streamer.get_recent_events``.

    Returns a JSON payload with the events, ``len(streamer.events_log)`` as
    ``total_events`` and the number of events returned as ``showing``. Any
    exception is reported as HTTP 500.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        limit = request.args.get("limit", 50, type=int)
        # Maximum 200 events per request. The lower bound keeps zero and
        # negative values away from get_recent_events, where they could
        # sidestep the cap (NOTE(review): depends on how that method slices
        # the log -- confirm).
        limit = max(1, min(limit, 200))

        events = streamer.get_recent_events(limit)
        return jsonify(
            {
                "success": True,
                "events": events,
                "total_events": len(streamer.events_log),
                "showing": len(events),
            }
        )
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
def graceful_shutdown():
    """Perform graceful shutdown.

    Calls ``stop_streaming``, ``disconnect_plc`` and ``release_instance_lock``
    on the module-level ``streamer``, in that order. Each step is attempted
    independently: a failure is printed and the remaining steps still run, so
    an error while stopping streaming no longer skips the PLC disconnect or
    the instance-lock release. The success message is printed only when every
    step completed without raising. This function itself never raises
    ``Exception`` subclasses.
    """
    print("\n⏹️ Performing graceful shutdown...")

    all_steps_ok = True
    for step_name in ("stop_streaming", "disconnect_plc", "release_instance_lock"):
        try:
            # Attribute lookup happens inside the try so a missing or None
            # streamer is reported like any other shutdown error.
            getattr(streamer, step_name)()
        except Exception as e:
            all_steps_ok = False
            print(f"⚠️ Error during shutdown: {e}")

    if all_steps_ok:
        print("✅ Shutdown completed successfully")
def main():
    """Main application entry point with error handling and recovery.

    Runs the Flask server on 0.0.0.0:5050 and retries up to ``max_retries``
    times when it fails:

    * Normal return from ``app.run`` or ``KeyboardInterrupt`` ends the loop
      (the latter after calling ``graceful_shutdown``).
    * A ``RuntimeError`` whose message contains "Another instance" exits
      immediately with status 1.
    * Any other ``RuntimeError`` or ``Exception`` counts as a failed attempt.
      Every failed attempt goes through the same bookkeeping: wait 2 seconds
      and retry, or, once the retries are exhausted, call
      ``graceful_shutdown`` and exit with status 1.

    The module-level ``streamer`` is expected to be created by the caller
    before ``main`` runs; this function does not create it.
    """
    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            # Create templates directory if it doesn't exist
            os.makedirs("templates", exist_ok=True)

            print("🚀 Starting Flask server for PLC S7-315 Streamer")
            print("📊 Web interface available at: http://localhost:5050")
            print("🔧 Configure your PLC and variables through the web interface")

            # Start Flask application
            app.run(debug=False, host="0.0.0.0", port=5050, use_reloader=False)

            # If we reach here, the server stopped normally
            break
        except KeyboardInterrupt:
            print("\n⏸️ Received interrupt signal...")
            graceful_shutdown()
            break
        except RuntimeError as e:
            if "Another instance" in str(e):
                print(f"{e}")
                print("💡 Tip: Stop the other instance or wait for it to finish")
                sys.exit(1)
            print(f"⚠️ Runtime error: {e}")
        except Exception as e:
            print(f"💥 Unexpected error: {e}")

        # Only failed attempts reach this point: the success path and the
        # interrupt path both break out of the loop above.
        retry_count += 1
        if retry_count < max_retries:
            print(f"🔄 Attempting restart ({retry_count}/{max_retries})...")
            time.sleep(2)  # Wait before retry
        else:
            print("❌ Maximum retries reached. Exiting...")
            graceful_shutdown()
            sys.exit(1)
if __name__ == "__main__":
    # Script entry point: build the module-level streamer, then hand control
    # to main(). Any Exception escaping either step is reported and the
    # process exits with status 1.
    try:
        streamer = PLCDataStreamer()
        main()
    except Exception as startup_error:
        print(f"💥 Critical error during initialization: {startup_error}")
        raise SystemExit(1)