from flask import (
    Flask,
    render_template,
    request,
    jsonify,
    redirect,
    url_for,
    send_from_directory,
)
import snap7
import snap7.util
import json
import socket
import time
import logging
import threading
from datetime import datetime
from typing import Dict, Any, Optional, List
import struct
import os
import csv
from pathlib import Path
import atexit
import psutil
import sys


app = Flask(__name__)
app.secret_key = "plc_streamer_secret_key"


class PLCDataStreamer:
    def __init__(self):
        """Initialize the PLC data streamer"""
        # Configuration file paths
        self.config_file = "plc_config.json"
        self.variables_file = "plc_variables.json"
        self.datasets_file = "plc_datasets.json"  # New file for multiple datasets
        self.state_file = "system_state.json"
        self.events_log_file = "application_events.json"

        # Default configuration
        self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
        self.udp_config = {"host": "127.0.0.1", "port": 9870}

        # Multiple datasets structure
        self.datasets = {}  # Dictionary of dataset_id -> dataset_config
        self.active_datasets = set()  # Set of active dataset IDs
        self.current_dataset_id = None  # Currently selected dataset for editing

        # Dataset streaming threads and files
        self.dataset_threads = {}  # dataset_id -> thread object
        self.dataset_csv_files = {}  # dataset_id -> file handle
        self.dataset_csv_writers = {}  # dataset_id -> csv writer
        self.dataset_csv_hours = {}  # dataset_id -> current hour
        self.dataset_using_modification_files = {}  # dataset_id -> bool (track modification files)

        # System states
        self.plc = None
        self.udp_socket = None
        self.connected = False
        self.streaming = False
        self.stream_thread = None
        self.sampling_interval = 0.1

        # Auto-recovery settings
        self.auto_recovery_enabled = True
        self.last_state = {
            "should_connect": False,
            "should_stream": False,
            "should_record_csv": False,
        }

        # Single instance control
        self.lock_file = "plc_streamer.lock"
        self.lock_fd = None

        # Events log for persistent logging
        self.events_log = []
        self.max_log_entries = 1000  # Maximum number of log entries to keep

        # Setup logging first
        self.setup_logging()

        # Load configuration from files
        self.load_configuration()
        self.load_datasets()  # Load multiple datasets configuration
        self.sync_streaming_variables()  # Synchronize streaming variables configuration
        self.load_system_state()
        self.load_events_log()

        # Acquire instance lock and attempt auto-recovery
        if self.acquire_instance_lock():
            # Small delay to ensure previous instance has fully cleaned up
            time.sleep(1)
            self.log_event(
                "info",
                "Application started",
                "Application initialization completed successfully",
            )
            self.attempt_auto_recovery()
        else:
            raise RuntimeError("Another instance of the application is already running")

    def setup_logging(self):
        """Configure the logging system"""
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
        )
        self.logger = logging.getLogger(__name__)

    def load_configuration(self):
        """Load PLC and UDP configuration from JSON file"""
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, "r") as f:
                    config = json.load(f)
                    self.plc_config = config.get("plc_config", self.plc_config)
                    self.udp_config = config.get("udp_config", self.udp_config)
                    self.sampling_interval = config.get(
                        "sampling_interval", self.sampling_interval
                    )
                self.logger.info(f"Configuration loaded from {self.config_file}")
            else:
                self.logger.info("No configuration file found, using defaults")
        except Exception as e:
            self.logger.error(f"Error loading configuration: {e}")

    def save_configuration(self):
        """Save PLC and UDP configuration to JSON file"""
        try:
            config = {
                "plc_config": self.plc_config,
                "udp_config": self.udp_config,
                "sampling_interval": self.sampling_interval,
            }
            with open(self.config_file, "w") as f:
                json.dump(config, f, indent=4)
            self.logger.info(f"Configuration saved to {self.config_file}")
        except Exception as e:
            self.logger.error(f"Error saving configuration: {e}")

    def load_datasets(self):
        """Load datasets configuration from JSON file"""
        try:
            if os.path.exists(self.datasets_file):
                with open(self.datasets_file, "r") as f:
                    datasets_data = json.load(f)
                    self.datasets = datasets_data.get("datasets", {})
                    self.active_datasets = set(datasets_data.get("active_datasets", []))
                    self.current_dataset_id = datasets_data.get("current_dataset_id")

                    # Validate current_dataset_id exists
                    if (
                        self.current_dataset_id
                        and self.current_dataset_id not in self.datasets
                    ):
                        self.current_dataset_id = None

                    # Set default current dataset if none selected
                    if not self.current_dataset_id and self.datasets:
                        self.current_dataset_id = next(iter(self.datasets.keys()))

                self.logger.info(
                    f"Datasets loaded from {self.datasets_file}: {len(self.datasets)} datasets, {len(self.active_datasets)} active"
                )
            else:
                self.logger.info("No datasets file found, starting with empty datasets")
        except Exception as e:
            self.logger.error(f"Error loading datasets: {e}")

    def save_datasets(self):
        """Save datasets configuration to JSON file"""
        try:
            datasets_data = {
                "datasets": self.datasets,
                "active_datasets": list(self.active_datasets),
                "current_dataset_id": self.current_dataset_id,
                "version": "1.0",
                "last_update": datetime.now().isoformat(),
            }
            with open(self.datasets_file, "w") as f:
                json.dump(datasets_data, f, indent=4)
            self.logger.info(f"Datasets configuration saved to {self.datasets_file}")
        except Exception as e:
            self.logger.error(f"Error saving datasets: {e}")

    def sync_streaming_variables(self):
        """Synchronize streaming variables configuration - ensure variables in streaming_variables list have streaming=true"""
        try:
            sync_needed = False
            for dataset_id, dataset_info in self.datasets.items():
                streaming_vars = dataset_info.get("streaming_variables", [])
                variables_config = dataset_info.get("variables", {})

                for var_name in streaming_vars:
                    if var_name in variables_config:
                        # If variable is in streaming list but doesn't have streaming=true, fix it
                        if not variables_config[var_name].get("streaming", False):
                            variables_config[var_name]["streaming"] = True
                            sync_needed = True
                            self.logger.info(
                                f"Synchronized streaming flag for variable '{var_name}' in dataset '{dataset_id}'"
                            )

                # Also ensure variables not in streaming list have streaming=false
                for var_name, var_config in variables_config.items():
                    if var_name not in streaming_vars and var_config.get(
                        "streaming", False
                    ):
                        var_config["streaming"] = False
                        sync_needed = True
                        self.logger.info(
                            f"Disabled streaming flag for variable '{var_name}' in dataset '{dataset_id}'"
                        )

            if sync_needed:
                self.save_datasets()
                self.logger.info("Streaming variables configuration synchronized")

        except Exception as e:
            self.logger.error(f"Error synchronizing streaming variables: {e}")

    def create_dataset(
        self, dataset_id: str, name: str, prefix: str, sampling_interval: float = None
    ):
        """Create a new dataset"""
        if dataset_id in self.datasets:
            raise ValueError(f"Dataset '{dataset_id}' already exists")

        new_dataset = {
            "name": name,
            "prefix": prefix,
            "variables": {},
            "streaming_variables": [],
            "sampling_interval": sampling_interval,
            "enabled": False,
            "created": datetime.now().isoformat(),
        }

        self.datasets[dataset_id] = new_dataset

        # Set as current dataset if it's the first one
        if not self.current_dataset_id:
            self.current_dataset_id = dataset_id

        self.save_datasets()

        self.log_event(
            "info",
            "dataset_created",
            f"Dataset created: {name} (prefix: {prefix})",
            {
                "dataset_id": dataset_id,
                "name": name,
                "prefix": prefix,
                "sampling_interval": sampling_interval,
            },
        )

    def delete_dataset(self, dataset_id: str):
        """Delete a dataset"""
        if dataset_id not in self.datasets:
            raise ValueError(f"Dataset '{dataset_id}' does not exist")

        # Stop dataset if it's active (deactivation also stops its streaming thread)
        if dataset_id in self.active_datasets:
            self.deactivate_dataset(dataset_id)

        dataset_info = self.datasets[dataset_id].copy()
        del self.datasets[dataset_id]

        # Update current dataset if this was selected
        if self.current_dataset_id == dataset_id:
            self.current_dataset_id = (
                next(iter(self.datasets.keys())) if self.datasets else None
            )

        self.save_datasets()

        self.log_event(
            "info",
            "dataset_deleted",
            f"Dataset deleted: {dataset_info['name']}",
            {"dataset_id": dataset_id, "dataset_info": dataset_info},
        )

    def get_current_dataset(self):
        """Get the currently selected dataset"""
        if self.current_dataset_id and self.current_dataset_id in self.datasets:
            return self.datasets[self.current_dataset_id]
        return None

    def get_dataset_variables(self, dataset_id: str):
        """Get variables for a specific dataset"""
        if dataset_id in self.datasets:
            return self.datasets[dataset_id].get("variables", {})
        return {}

    def get_dataset_sampling_interval(self, dataset_id: str):
        """Get sampling interval for a dataset (falls back to global if not set)"""
        if dataset_id in self.datasets:
            dataset_interval = self.datasets[dataset_id].get("sampling_interval")
            return (
                dataset_interval
                if dataset_interval is not None
                else self.sampling_interval
            )
        return self.sampling_interval
|
|
|
|
def add_variable_to_dataset(
|
|
self,
|
|
dataset_id: str,
|
|
name: str,
|
|
area: str,
|
|
db: int,
|
|
offset: int,
|
|
var_type: str,
|
|
bit: int = None,
|
|
streaming: bool = False,
|
|
):
|
|
"""Add a variable to a specific dataset"""
|
|
if dataset_id not in self.datasets:
|
|
raise ValueError(f"Dataset '{dataset_id}' does not exist")
|
|
|
|
# Validate area and type (reuse existing validation logic)
|
|
area = area.lower()
|
|
if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]:
|
|
raise ValueError(
|
|
f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb"
|
|
)
|
|
|
|
valid_types = [
|
|
"real",
|
|
"int",
|
|
"bool",
|
|
"dint",
|
|
"word",
|
|
"byte",
|
|
"uint",
|
|
"udint",
|
|
"sint",
|
|
"usint",
|
|
]
|
|
if var_type not in valid_types:
|
|
raise ValueError(
|
|
f"Invalid data type: {var_type}. Supported: {', '.join(valid_types)}"
|
|
)
|
|
|
|
# Create variable configuration
|
|
var_config = {
|
|
"area": area,
|
|
"offset": offset,
|
|
"type": var_type,
|
|
"streaming": streaming,
|
|
}
|
|
|
|
if area == "db":
|
|
var_config["db"] = db
|
|
if area in ["e", "a", "mb"] or (area == "db" and bit is not None):
|
|
var_config["bit"] = bit
|
|
|
|
# Add to dataset
|
|
self.datasets[dataset_id]["variables"][name] = var_config
|
|
|
|
# Update streaming variables list if streaming is enabled
|
|
if streaming:
|
|
if name not in self.datasets[dataset_id]["streaming_variables"]:
|
|
self.datasets[dataset_id]["streaming_variables"].append(name)
|
|
|
|
self.save_datasets()
|
|
|
|
# Create new CSV file if dataset is active and variables were modified
|
|
self.create_new_dataset_csv_file_for_variable_modification(dataset_id)
|
|
|
|
# Log the addition
|
|
area_description = {
|
|
"db": (
|
|
f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}"
|
|
),
|
|
"mw": f"MW{offset}",
|
|
"m": f"M{offset}",
|
|
"pew": f"PEW{offset}",
|
|
"pe": f"PE{offset}",
|
|
"paw": f"PAW{offset}",
|
|
"pa": f"PA{offset}",
|
|
"e": f"E{offset}.{bit}",
|
|
"a": f"A{offset}.{bit}",
|
|
"mb": f"M{offset}.{bit}",
|
|
}
|
|
|
|
self.log_event(
|
|
"info",
|
|
"variable_added",
|
|
f"Variable added to dataset '{self.datasets[dataset_id]['name']}': {name} -> {area_description[area]} ({var_type})",
|
|
{
|
|
"dataset_id": dataset_id,
|
|
"name": name,
|
|
"area": area,
|
|
"db": db if area == "db" else None,
|
|
"offset": offset,
|
|
"bit": bit,
|
|
"type": var_type,
|
|
"streaming": streaming,
|
|
},
|
|
)
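
    # Illustrative usage sketch (assumes an initialized PLCDataStreamer instance named
    # `streamer`; dataset ids, variable names and addresses are examples only):
    #
    #     streamer.create_dataset("fast", name="Fast loop", prefix="fast", sampling_interval=0.1)
    #     streamer.add_variable_to_dataset("fast", "motor_speed", area="db", db=1,
    #                                      offset=0, var_type="real", streaming=True)
    #     streamer.add_variable_to_dataset("fast", "start_button", area="e", db=0,
    #                                      offset=5, var_type="bool", bit=1)
    #     streamer.activate_dataset("fast")  # requires an established PLC connection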
|
|
|
|
def remove_variable_from_dataset(self, dataset_id: str, name: str):
|
|
"""Remove a variable from a specific dataset"""
|
|
if dataset_id not in self.datasets:
|
|
raise ValueError(f"Dataset '{dataset_id}' does not exist")
|
|
|
|
if name not in self.datasets[dataset_id]["variables"]:
|
|
raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'")
|
|
|
|
var_config = self.datasets[dataset_id]["variables"][name].copy()
|
|
del self.datasets[dataset_id]["variables"][name]
|
|
|
|
# Remove from streaming variables if present
|
|
if name in self.datasets[dataset_id]["streaming_variables"]:
|
|
self.datasets[dataset_id]["streaming_variables"].remove(name)
|
|
|
|
self.save_datasets()
|
|
|
|
# Create new CSV file if dataset is active and variables were modified
|
|
self.create_new_dataset_csv_file_for_variable_modification(dataset_id)
|
|
|
|
self.log_event(
|
|
"info",
|
|
"variable_removed",
|
|
f"Variable removed from dataset '{self.datasets[dataset_id]['name']}': {name}",
|
|
{"dataset_id": dataset_id, "name": name, "removed_config": var_config},
|
|
)
|
|
|
|
def toggle_variable_streaming(self, dataset_id: str, name: str, enabled: bool):
|
|
"""Toggle streaming for a variable in a dataset"""
|
|
if dataset_id not in self.datasets:
|
|
raise ValueError(f"Dataset '{dataset_id}' does not exist")
|
|
|
|
if name not in self.datasets[dataset_id]["variables"]:
|
|
raise ValueError(f"Variable '{name}' not found in dataset '{dataset_id}'")
|
|
|
|
# Update the individual variable streaming flag
|
|
self.datasets[dataset_id]["variables"][name]["streaming"] = enabled
|
|
|
|
# Update the streaming variables list
|
|
if enabled:
|
|
if name not in self.datasets[dataset_id]["streaming_variables"]:
|
|
self.datasets[dataset_id]["streaming_variables"].append(name)
|
|
else:
|
|
if name in self.datasets[dataset_id]["streaming_variables"]:
|
|
self.datasets[dataset_id]["streaming_variables"].remove(name)
|
|
|
|
self.save_datasets()
|
|
|
|
self.logger.info(
|
|
f"Dataset '{dataset_id}' variable {name} streaming: {'enabled' if enabled else 'disabled'}"
|
|
)
|
|
|
|
def activate_dataset(self, dataset_id: str):
|
|
"""Activate a dataset for streaming and CSV recording"""
|
|
if dataset_id not in self.datasets:
|
|
raise ValueError(f"Dataset '{dataset_id}' does not exist")
|
|
|
|
if not self.connected:
|
|
raise RuntimeError("Cannot activate dataset: PLC not connected")
|
|
|
|
self.active_datasets.add(dataset_id)
|
|
self.datasets[dataset_id]["enabled"] = True
|
|
self.save_datasets()
|
|
|
|
# Start streaming thread for this dataset
|
|
self.start_dataset_streaming(dataset_id)
|
|
|
|
dataset_info = self.datasets[dataset_id]
|
|
self.log_event(
|
|
"info",
|
|
"dataset_activated",
|
|
f"Dataset activated: {dataset_info['name']}",
|
|
{
|
|
"dataset_id": dataset_id,
|
|
"variables_count": len(dataset_info["variables"]),
|
|
"streaming_count": len(dataset_info["streaming_variables"]),
|
|
"prefix": dataset_info["prefix"],
|
|
},
|
|
)
|
|
|
|
def deactivate_dataset(self, dataset_id: str):
|
|
"""Deactivate a dataset"""
|
|
if dataset_id not in self.datasets:
|
|
raise ValueError(f"Dataset '{dataset_id}' does not exist")
|
|
|
|
self.active_datasets.discard(dataset_id)
|
|
self.datasets[dataset_id]["enabled"] = False
|
|
self.save_datasets()
|
|
|
|
# Stop streaming thread for this dataset
|
|
self.stop_dataset_streaming(dataset_id)
|
|
|
|
dataset_info = self.datasets[dataset_id]
|
|
self.log_event(
|
|
"info",
|
|
"dataset_deactivated",
|
|
f"Dataset deactivated: {dataset_info['name']}",
|
|
{"dataset_id": dataset_id},
|
|
)
|
|
|
|
def start_dataset_streaming(self, dataset_id: str):
|
|
"""Start streaming thread for a specific dataset"""
|
|
if dataset_id not in self.datasets:
|
|
return False
|
|
|
|
if dataset_id in self.dataset_threads:
|
|
return True # Already running
|
|
|
|
# Create and start thread for this dataset
|
|
thread = threading.Thread(
|
|
target=self.dataset_streaming_loop, args=(dataset_id,)
|
|
)
|
|
thread.daemon = True
|
|
self.dataset_threads[dataset_id] = thread
|
|
thread.start()
|
|
|
|
dataset_info = self.datasets[dataset_id]
|
|
interval = self.get_dataset_sampling_interval(dataset_id)
|
|
|
|
self.logger.info(
|
|
f"Started streaming for dataset '{dataset_info['name']}' (interval: {interval}s)"
|
|
)
|
|
return True
|
|
|
|
def stop_dataset_streaming(self, dataset_id: str):
|
|
"""Stop streaming thread for a specific dataset"""
|
|
if dataset_id in self.dataset_threads:
|
|
# The thread will detect this and stop
|
|
thread = self.dataset_threads[dataset_id]
|
|
            # Guard against joining the current thread (the streaming loop calls this on itself when it exits)
            if thread.is_alive() and thread is not threading.current_thread():
                thread.join(timeout=2)
|
|
del self.dataset_threads[dataset_id]
|
|
|
|
# Close CSV file if open
|
|
if dataset_id in self.dataset_csv_files:
|
|
self.dataset_csv_files[dataset_id].close()
|
|
del self.dataset_csv_files[dataset_id]
|
|
del self.dataset_csv_writers[dataset_id]
|
|
del self.dataset_csv_hours[dataset_id]
|
|
# Reset modification file flag
|
|
self.dataset_using_modification_files.pop(dataset_id, None)
|
|
|
|
dataset_info = self.datasets.get(dataset_id, {})
|
|
self.logger.info(
|
|
f"Stopped streaming for dataset '{dataset_info.get('name', dataset_id)}'"
|
|
)
|
|
|
|
def dataset_streaming_loop(self, dataset_id: str):
|
|
"""Streaming loop for a specific dataset"""
|
|
dataset_info = self.datasets[dataset_id]
|
|
interval = self.get_dataset_sampling_interval(dataset_id)
|
|
|
|
self.logger.info(
|
|
f"Dataset '{dataset_info['name']}' streaming loop started (interval: {interval}s)"
|
|
)
|
|
|
|
consecutive_errors = 0
|
|
max_consecutive_errors = 5
|
|
|
|
while dataset_id in self.active_datasets and self.connected:
|
|
try:
|
|
start_time = time.time()
|
|
|
|
# Read variables for this dataset
|
|
dataset_variables = self.get_dataset_variables(dataset_id)
|
|
all_data = self.read_dataset_variables(dataset_id, dataset_variables)
|
|
|
|
if all_data:
|
|
consecutive_errors = 0
|
|
|
|
# Write to CSV (all variables)
|
|
self.write_dataset_csv_data(dataset_id, all_data)
|
|
|
|
# Get filtered data for streaming - only variables that are in streaming_variables list AND have streaming=true
|
|
streaming_variables = dataset_info.get("streaming_variables", [])
|
|
dataset_vars_config = dataset_info.get("variables", {})
|
|
streaming_data = {
|
|
name: value
|
|
for name, value in all_data.items()
|
|
if name in streaming_variables
|
|
and dataset_vars_config.get(name, {}).get("streaming", False)
|
|
}
|
|
|
|
# Send filtered data to PlotJuggler
|
|
if streaming_data:
|
|
self.send_to_plotjuggler(streaming_data)
|
|
|
|
# Log data
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
|
self.logger.info(
|
|
f"[{timestamp}] Dataset '{dataset_info['name']}': CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
|
|
)
|
|
else:
|
|
consecutive_errors += 1
|
|
if consecutive_errors >= max_consecutive_errors:
|
|
self.log_event(
|
|
"error",
|
|
"dataset_streaming_error",
|
|
f"Multiple consecutive read failures for dataset '{dataset_info['name']}' ({consecutive_errors}). Stopping streaming.",
|
|
{
|
|
"dataset_id": dataset_id,
|
|
"consecutive_errors": consecutive_errors,
|
|
},
|
|
)
|
|
break
|
|
|
|
# Maintain sampling interval
|
|
elapsed = time.time() - start_time
|
|
sleep_time = max(0, interval - elapsed)
|
|
time.sleep(sleep_time)
|
|
|
|
except Exception as e:
|
|
consecutive_errors += 1
|
|
self.log_event(
|
|
"error",
|
|
"dataset_streaming_error",
|
|
f"Error in dataset '{dataset_info['name']}' streaming loop: {str(e)}",
|
|
{
|
|
"dataset_id": dataset_id,
|
|
"error": str(e),
|
|
"consecutive_errors": consecutive_errors,
|
|
},
|
|
)
|
|
|
|
if consecutive_errors >= max_consecutive_errors:
|
|
self.log_event(
|
|
"error",
|
|
"dataset_streaming_error",
|
|
f"Too many consecutive errors for dataset '{dataset_info['name']}'. Stopping streaming.",
|
|
{
|
|
"dataset_id": dataset_id,
|
|
"consecutive_errors": consecutive_errors,
|
|
},
|
|
)
|
|
break
|
|
|
|
time.sleep(1) # Wait before retry
|
|
|
|
# Clean up when exiting
|
|
self.stop_dataset_streaming(dataset_id)
|
|
self.logger.info(f"Dataset '{dataset_info['name']}' streaming loop ended")
|
|
|
|
def read_dataset_variables(
|
|
self, dataset_id: str, variables: Dict[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""Read all variables for a specific dataset"""
|
|
data = {}
|
|
|
|
for var_name, var_config in variables.items():
|
|
try:
|
|
value = self.read_variable(var_config)
|
|
data[var_name] = value
|
|
except Exception as e:
|
|
self.logger.warning(
|
|
f"Error reading variable {var_name} in dataset {dataset_id}: {e}"
|
|
)
|
|
data[var_name] = None
|
|
|
|
return data
|
|
|
|
def get_dataset_csv_file_path(
|
|
self, dataset_id: str, use_modification_timestamp: bool = False
|
|
) -> str:
|
|
"""Get the CSV file path for a specific dataset"""
|
|
if dataset_id not in self.datasets:
|
|
raise ValueError(f"Dataset '{dataset_id}' does not exist")
|
|
|
|
now = datetime.now()
|
|
prefix = self.datasets[dataset_id]["prefix"]
|
|
|
|
if use_modification_timestamp:
|
|
time_suffix = now.strftime("%H_%M_%S")
|
|
filename = f"{prefix}_{time_suffix}.csv"
|
|
else:
|
|
hour = now.strftime("%H")
|
|
filename = f"{prefix}_{hour}.csv"
|
|
|
|
directory = self.get_csv_directory_path()
|
|
return os.path.join(directory, filename)
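
    # For illustration: with prefix "fast" on 5 June 2025 at 14:32:07, the two naming
    # modes above resolve to paths like
    #     records/05-06-2025/fast_14.csv           (regular hourly file)
    #     records/05-06-2025/fast_14_32_07.csv     (file opened after a variable modification)
    # The day folder comes from get_csv_directory_path(), defined further below.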
|
|
|
|
def setup_dataset_csv_file(self, dataset_id: str):
|
|
"""Setup CSV file for a specific dataset"""
|
|
current_hour = datetime.now().hour
|
|
|
|
# If we're using a modification file and the hour hasn't changed, keep using it
|
|
if (
|
|
self.dataset_using_modification_files.get(dataset_id, False)
|
|
and dataset_id in self.dataset_csv_hours
|
|
and self.dataset_csv_hours[dataset_id] == current_hour
|
|
and dataset_id in self.dataset_csv_files
|
|
):
|
|
return
|
|
|
|
# Check if we need to create a new file
|
|
if (
|
|
dataset_id not in self.dataset_csv_hours
|
|
or self.dataset_csv_hours[dataset_id] != current_hour
|
|
or dataset_id not in self.dataset_csv_files
|
|
):
|
|
|
|
# Close previous file if open
|
|
if dataset_id in self.dataset_csv_files:
|
|
self.dataset_csv_files[dataset_id].close()
|
|
|
|
# Create directory and file for current hour
|
|
self.ensure_csv_directory()
|
|
csv_path = self.get_dataset_csv_file_path(dataset_id)
|
|
|
|
# Check if file exists to determine if we need headers
|
|
file_exists = os.path.exists(csv_path)
|
|
|
|
self.dataset_csv_files[dataset_id] = open(
|
|
csv_path, "a", newline="", encoding="utf-8"
|
|
)
|
|
self.dataset_csv_writers[dataset_id] = csv.writer(
|
|
self.dataset_csv_files[dataset_id]
|
|
)
|
|
self.dataset_csv_hours[dataset_id] = current_hour
|
|
|
|
# Reset modification file flag when creating regular hourly file
|
|
self.dataset_using_modification_files[dataset_id] = False
|
|
|
|
# Write headers if it's a new file
|
|
dataset_variables = self.get_dataset_variables(dataset_id)
|
|
if not file_exists and dataset_variables:
|
|
headers = ["timestamp"] + list(dataset_variables.keys())
|
|
self.dataset_csv_writers[dataset_id].writerow(headers)
|
|
self.dataset_csv_files[dataset_id].flush()
|
|
self.logger.info(
|
|
f"CSV file created for dataset '{self.datasets[dataset_id]['name']}': {csv_path}"
|
|
)
|
|
|
|
def write_dataset_csv_data(self, dataset_id: str, data: Dict[str, Any]):
|
|
"""Write data to CSV file for a specific dataset"""
|
|
if dataset_id not in self.active_datasets:
|
|
return
|
|
|
|
try:
|
|
self.setup_dataset_csv_file(dataset_id)
|
|
|
|
if dataset_id in self.dataset_csv_writers:
|
|
# Create timestamp
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
|
|
|
# Create row with all variables for this dataset
|
|
dataset_variables = self.get_dataset_variables(dataset_id)
|
|
row = [timestamp]
|
|
for var_name in dataset_variables.keys():
|
|
row.append(data.get(var_name, None))
|
|
|
|
self.dataset_csv_writers[dataset_id].writerow(row)
|
|
self.dataset_csv_files[dataset_id].flush()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error writing CSV data for dataset {dataset_id}: {e}")
|
|
|
|
def create_new_dataset_csv_file_for_variable_modification(self, dataset_id: str):
|
|
"""Create a new CSV file for a dataset when variables are modified during active recording"""
|
|
if dataset_id not in self.active_datasets:
|
|
return
|
|
|
|
try:
|
|
# Close current file if open
|
|
if dataset_id in self.dataset_csv_files:
|
|
self.dataset_csv_files[dataset_id].close()
|
|
del self.dataset_csv_files[dataset_id]
|
|
del self.dataset_csv_writers[dataset_id]
|
|
self.logger.info(
|
|
f"Closed previous CSV file for dataset '{self.datasets[dataset_id]['name']}' due to variable modification"
|
|
)
|
|
|
|
# Create new file with modification timestamp
|
|
self.ensure_csv_directory()
|
|
csv_path = self.get_dataset_csv_file_path(
|
|
dataset_id, use_modification_timestamp=True
|
|
)
|
|
|
|
self.dataset_csv_files[dataset_id] = open(
|
|
csv_path, "w", newline="", encoding="utf-8"
|
|
)
|
|
self.dataset_csv_writers[dataset_id] = csv.writer(
|
|
self.dataset_csv_files[dataset_id]
|
|
)
|
|
|
|
# Mark that we're using a modification file and set current hour
|
|
self.dataset_using_modification_files[dataset_id] = True
|
|
self.dataset_csv_hours[dataset_id] = datetime.now().hour
|
|
|
|
# Write headers with new variable configuration
|
|
dataset_variables = self.get_dataset_variables(dataset_id)
|
|
if dataset_variables:
|
|
headers = ["timestamp"] + list(dataset_variables.keys())
|
|
self.dataset_csv_writers[dataset_id].writerow(headers)
|
|
self.dataset_csv_files[dataset_id].flush()
|
|
|
|
dataset_name = self.datasets[dataset_id]["name"]
|
|
self.logger.info(
|
|
f"New CSV file created after variable modification for dataset '{dataset_name}': {csv_path}"
|
|
)
|
|
self.log_event(
|
|
"info",
|
|
"dataset_csv_file_created",
|
|
f"New CSV file created after variable modification for dataset '{dataset_name}': {os.path.basename(csv_path)}",
|
|
{
|
|
"dataset_id": dataset_id,
|
|
"file_path": csv_path,
|
|
"variables_count": len(dataset_variables),
|
|
"reason": "variable_modification",
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
dataset_name = self.datasets.get(dataset_id, {}).get("name", dataset_id)
|
|
self.logger.error(
|
|
f"Error creating new CSV file after variable modification for dataset '{dataset_name}': {e}"
|
|
)
|
|
self.log_event(
|
|
"error",
|
|
"dataset_csv_error",
|
|
f"Failed to create new CSV file after variable modification for dataset '{dataset_name}': {str(e)}",
|
|
{"dataset_id": dataset_id, "error": str(e)},
|
|
)
|
|
|
|
def load_system_state(self):
|
|
"""Load system state from JSON file"""
|
|
try:
|
|
if os.path.exists(self.state_file):
|
|
with open(self.state_file, "r") as f:
|
|
state_data = json.load(f)
|
|
self.last_state = state_data.get("last_state", self.last_state)
|
|
self.auto_recovery_enabled = state_data.get(
|
|
"auto_recovery_enabled", True
|
|
)
|
|
self.logger.info(f"System state loaded from {self.state_file}")
|
|
else:
|
|
self.logger.info("No system state file found, starting with defaults")
|
|
except Exception as e:
|
|
self.logger.error(f"Error loading system state: {e}")
|
|
|
|
def save_system_state(self):
|
|
"""Save current system state to JSON file"""
|
|
try:
|
|
state_data = {
|
|
"last_state": {
|
|
"should_connect": self.connected,
|
|
"should_stream": self.streaming,
|
|
"active_datasets": list(self.active_datasets),
|
|
},
|
|
"auto_recovery_enabled": self.auto_recovery_enabled,
|
|
"last_update": datetime.now().isoformat(),
|
|
}
|
|
|
|
with open(self.state_file, "w") as f:
|
|
json.dump(state_data, f, indent=4)
|
|
self.logger.debug("System state saved")
|
|
except Exception as e:
|
|
self.logger.error(f"Error saving system state: {e}")
|
|
|
|
def attempt_auto_recovery(self):
|
|
"""Attempt to restore previous system state"""
|
|
if not self.auto_recovery_enabled:
|
|
self.logger.info("Auto-recovery disabled, skipping state restoration")
|
|
return
|
|
|
|
self.logger.info("Attempting auto-recovery of previous state...")
|
|
|
|
# Try to restore connection
|
|
if self.last_state.get("should_connect", False):
|
|
self.logger.info("Attempting to restore PLC connection...")
|
|
if self.connect_plc():
|
|
self.logger.info("PLC connection restored successfully")
|
|
|
|
# Try to restore streaming if connection was successful
|
|
if self.last_state.get("should_stream", False):
|
|
self.logger.info("Attempting to restore streaming...")
|
|
|
|
# Setup UDP socket first
|
|
if not self.setup_udp_socket():
|
|
self.logger.warning(
|
|
"Failed to setup UDP socket during auto-recovery"
|
|
)
|
|
return
|
|
|
|
# Restore active datasets
|
|
restored_datasets = self.last_state.get("active_datasets", [])
|
|
activated_count = 0
|
|
|
|
for dataset_id in restored_datasets:
|
|
if dataset_id in self.datasets:
|
|
try:
|
|
self.activate_dataset(dataset_id)
|
|
activated_count += 1
|
|
except Exception as e:
|
|
self.logger.warning(
|
|
f"Failed to restore dataset {dataset_id}: {e}"
|
|
)
|
|
|
|
if activated_count > 0:
|
|
self.streaming = True
|
|
self.save_system_state()
|
|
self.logger.info(
|
|
f"Streaming restored successfully: {activated_count} datasets activated"
|
|
)
|
|
else:
|
|
self.logger.warning(
|
|
"Failed to restore streaming: no datasets activated"
|
|
)
|
|
else:
|
|
self.logger.warning("Failed to restore PLC connection")
|
|
|
|
def acquire_instance_lock(self) -> bool:
|
|
"""Acquire lock to ensure single instance execution"""
|
|
try:
|
|
# Check if lock file exists
|
|
if os.path.exists(self.lock_file):
|
|
# Read PID from existing lock file
|
|
with open(self.lock_file, "r") as f:
|
|
try:
|
|
old_pid = int(f.read().strip())
|
|
|
|
# Check if process is still running
|
|
if psutil.pid_exists(old_pid):
|
|
# Get process info to verify it's our application
|
|
try:
|
|
proc = psutil.Process(old_pid)
|
|
cmdline = " ".join(proc.cmdline())
|
|
if "main.py" in cmdline or "plc" in cmdline.lower():
|
|
self.logger.error(
|
|
f"Another instance is already running (PID: {old_pid})"
|
|
)
|
|
return False
|
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
# Process doesn't exist or can't access, continue
|
|
pass
|
|
|
|
# Old process is dead, remove stale lock file
|
|
os.remove(self.lock_file)
|
|
self.logger.info("Removed stale lock file")
|
|
|
|
except (ValueError, IOError):
|
|
# Invalid lock file, remove it
|
|
os.remove(self.lock_file)
|
|
self.logger.info("Removed invalid lock file")
|
|
|
|
# Create new lock file with current PID
|
|
with open(self.lock_file, "w") as f:
|
|
f.write(str(os.getpid()))
|
|
|
|
# Register cleanup function
|
|
atexit.register(self.release_instance_lock)
|
|
|
|
self.logger.info(
|
|
f"Instance lock acquired: {self.lock_file} (PID: {os.getpid()})"
|
|
)
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error acquiring instance lock: {e}")
|
|
return False
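
    # Note: the lock file contains nothing but the PID of the running process (e.g. "12345"),
    # which is what lets the stale-lock check above verify the process via
    # psutil.pid_exists() / psutil.Process(...).cmdline() before refusing to start.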
|
|
|
|
def release_instance_lock(self):
|
|
"""Release instance lock"""
|
|
try:
|
|
# Remove lock file
|
|
if os.path.exists(self.lock_file):
|
|
os.remove(self.lock_file)
|
|
self.logger.info("Instance lock released")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error releasing instance lock: {e}")
|
|
|
|
def save_variables(self):
|
|
"""Save variables configuration to JSON file"""
|
|
try:
|
|
# Update streaming state in variables before saving
|
|
for var_name in self.variables:
|
|
self.variables[var_name]["streaming"] = (
|
|
var_name in self.streaming_variables
|
|
)
|
|
|
|
with open(self.variables_file, "w") as f:
|
|
json.dump(self.variables, f, indent=4)
|
|
self.logger.info(f"Variables saved to {self.variables_file}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error saving variables: {e}")
|
|
|
|
def update_plc_config(self, ip: str, rack: int, slot: int):
|
|
"""Update PLC configuration"""
|
|
old_config = self.plc_config.copy()
|
|
self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
|
|
self.save_configuration()
|
|
|
|
config_details = {"old_config": old_config, "new_config": self.plc_config}
|
|
self.log_event(
|
|
"info",
|
|
"config_change",
|
|
f"PLC configuration updated: {ip}:{rack}/{slot}",
|
|
config_details,
|
|
)
|
|
|
|
def update_udp_config(self, host: str, port: int):
|
|
"""Update UDP configuration"""
|
|
old_config = self.udp_config.copy()
|
|
self.udp_config = {"host": host, "port": port}
|
|
self.save_configuration()
|
|
|
|
config_details = {"old_config": old_config, "new_config": self.udp_config}
|
|
self.log_event(
|
|
"info",
|
|
"config_change",
|
|
f"UDP configuration updated: {host}:{port}",
|
|
config_details,
|
|
)
|
|
|
|
def update_sampling_interval(self, interval: float):
|
|
"""Update sampling interval"""
|
|
old_interval = self.sampling_interval
|
|
self.sampling_interval = interval
|
|
self.save_configuration()
|
|
|
|
config_details = {"old_interval": old_interval, "new_interval": interval}
|
|
self.log_event(
|
|
"info",
|
|
"config_change",
|
|
f"Sampling interval updated: {interval}s",
|
|
config_details,
|
|
)
|
|
|
|
def add_variable(
|
|
self, name: str, area: str, db: int, offset: int, var_type: str, bit: int = None
|
|
):
|
|
"""Add a variable for polling"""
|
|
area = area.lower()
|
|
|
|
        # Validate area type - now also includes individual bit areas
|
|
if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]:
|
|
raise ValueError(
|
|
f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb"
|
|
)
|
|
|
|
# Validate data type
|
|
valid_types = [
|
|
"real",
|
|
"int",
|
|
"bool",
|
|
"dint",
|
|
"word",
|
|
"byte",
|
|
"uint",
|
|
"udint",
|
|
"sint",
|
|
"usint",
|
|
]
|
|
if var_type not in valid_types:
|
|
raise ValueError(
|
|
f"Invalid data type: {var_type}. Supported: {', '.join(valid_types)}"
|
|
)
|
|
|
|
        # For individual bit areas, the data type must be 'bool' and the bit position must be specified
|
|
if area in ["e", "a", "mb"] and var_type != "bool":
|
|
raise ValueError(f"For bit areas ({area}), data type must be 'bool'")
|
|
|
|
if area in ["e", "a", "mb"] and bit is None:
|
|
raise ValueError(
|
|
f"For bit areas ({area}), bit position must be specified (0-7)"
|
|
)
|
|
|
|
        # Validate the bit range for all areas that support it
|
|
if bit is not None and (bit < 0 or bit > 7):
|
|
raise ValueError("Bit position must be between 0 and 7")
|
|
|
|
# Create variable configuration
|
|
var_config = {
|
|
"area": area,
|
|
"offset": offset,
|
|
"type": var_type,
|
|
"streaming": False,
|
|
}
|
|
|
|
# Add DB number only for DB area
|
|
if area == "db":
|
|
var_config["db"] = db
|
|
|
|
# Add bit position for bit areas and DB with specific bit
|
|
if area in ["e", "a", "mb"] or (area == "db" and bit is not None):
|
|
var_config["bit"] = bit
|
|
|
|
self.variables[name] = var_config
|
|
self.save_variables()
|
|
|
|
variable_details = {
|
|
"name": name,
|
|
"area": area,
|
|
"db": db if area == "db" else None,
|
|
"offset": offset,
|
|
"bit": bit,
|
|
"type": var_type,
|
|
"total_variables": len(self.variables),
|
|
}
|
|
|
|
# Updated area description to include bit addresses
|
|
area_description = {
|
|
"db": (
|
|
f"DB{db}.DBX{offset}.{bit}" if bit is not None else f"DB{db}.{offset}"
|
|
),
|
|
"mw": f"MW{offset}",
|
|
"m": f"M{offset}",
|
|
"pew": f"PEW{offset}",
|
|
"pe": f"PE{offset}",
|
|
"paw": f"PAW{offset}",
|
|
"pa": f"PA{offset}",
|
|
"e": f"E{offset}.{bit}",
|
|
"a": f"A{offset}.{bit}",
|
|
"mb": f"M{offset}.{bit}",
|
|
}
|
|
|
|
self.log_event(
|
|
"info",
|
|
"variable_added",
|
|
f"Variable added: {name} -> {area_description[area]} ({var_type})",
|
|
variable_details,
|
|
)
|
|
self.create_new_csv_file_for_variable_modification()
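
    # Addressing examples for the supported areas (Siemens S7 notation, as used by the
    # descriptions above; offsets and bit positions are illustrative):
    #     area="db", db=1, offset=0            -> DB1.0   (DB1.DBX0.2 when bit=2)
    #     area="mw", offset=10                 -> MW10
    #     area="e",  offset=5, bit=1           -> E5.1    (digital input bit)
    #     area="a",  offset=3, bit=7           -> A3.7    (digital output bit)
    #     area="mb", offset=10, bit=0          -> M10.0   (memory/marker bit)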
|
|
|
|
def remove_variable(self, name: str):
|
|
"""Remove a variable from polling"""
|
|
if name in self.variables:
|
|
var_config = self.variables[name].copy()
|
|
del self.variables[name]
|
|
# Also remove from streaming variables if present
|
|
self.streaming_variables.discard(name)
|
|
self.save_variables()
|
|
|
|
variable_details = {
|
|
"name": name,
|
|
"removed_config": var_config,
|
|
"total_variables": len(self.variables),
|
|
}
|
|
self.log_event(
|
|
"info",
|
|
"variable_removed",
|
|
f"Variable removed: {name}",
|
|
variable_details,
|
|
)
|
|
self.create_new_csv_file_for_variable_modification()
|
|
|
|
def toggle_streaming_variable(self, name: str, enabled: bool):
|
|
"""Enable or disable a variable for streaming"""
|
|
if name in self.variables:
|
|
if enabled:
|
|
self.streaming_variables.add(name)
|
|
else:
|
|
self.streaming_variables.discard(name)
|
|
|
|
# Save changes to persist streaming configuration
|
|
self.save_variables()
|
|
|
|
self.logger.info(
|
|
f"Variable {name} streaming: {'enabled' if enabled else 'disabled'}"
|
|
)
|
|
|
|
def get_csv_directory_path(self) -> str:
|
|
"""Get the directory path for current day's CSV files"""
|
|
now = datetime.now()
|
|
day_folder = now.strftime("%d-%m-%Y")
|
|
return os.path.join("records", day_folder)
|
|
|
|
def get_csv_file_path(self, use_modification_timestamp: bool = False) -> str:
|
|
"""Get the complete file path for current hour's CSV file"""
|
|
now = datetime.now()
|
|
|
|
if use_modification_timestamp:
|
|
# Create filename with hour_min_sec format for variable modifications
|
|
time_suffix = now.strftime("%H_%M_%S")
|
|
filename = f"_{time_suffix}.csv"
|
|
else:
|
|
# Standard hourly format
|
|
hour = now.strftime("%H")
|
|
filename = f"{hour}.csv"
|
|
|
|
directory = self.get_csv_directory_path()
|
|
return os.path.join(directory, filename)
|
|
|
|
def ensure_csv_directory(self):
|
|
"""Create CSV directory structure if it doesn't exist"""
|
|
directory = self.get_csv_directory_path()
|
|
Path(directory).mkdir(parents=True, exist_ok=True)
|
|
|
|
def create_new_csv_file_for_variable_modification(self):
|
|
"""Create a new CSV file when variables are modified during active recording"""
|
|
if not self.csv_recording:
|
|
return
|
|
|
|
try:
|
|
# Close current file if open
|
|
if self.current_csv_file:
|
|
self.current_csv_file.close()
|
|
self.logger.info(
|
|
f"Closed previous CSV file due to variable modification"
|
|
)
|
|
|
|
# Create new file with modification timestamp
|
|
self.ensure_csv_directory()
|
|
csv_path = self.get_csv_file_path(use_modification_timestamp=True)
|
|
|
|
self.current_csv_file = open(csv_path, "w", newline="", encoding="utf-8")
|
|
self.current_csv_writer = csv.writer(self.current_csv_file)
|
|
|
|
# Mark that we're using a modification file and set current hour
|
|
self.using_modification_file = True
|
|
self.current_hour = datetime.now().hour
|
|
|
|
# Write headers with new variable configuration
|
|
if self.variables:
|
|
headers = ["timestamp"] + list(self.variables.keys())
|
|
self.current_csv_writer.writerow(headers)
|
|
self.current_csv_file.flush()
|
|
self.csv_headers_written = True
|
|
|
|
self.logger.info(
|
|
f"New CSV file created after variable modification: {csv_path}"
|
|
)
|
|
self.log_event(
|
|
"info",
|
|
"csv_file_created",
|
|
f"New CSV file created after variable modification: {os.path.basename(csv_path)}",
|
|
{
|
|
"file_path": csv_path,
|
|
"variables_count": len(self.variables),
|
|
"reason": "variable_modification",
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
self.logger.error(
|
|
f"Error creating new CSV file after variable modification: {e}"
|
|
)
|
|
self.log_event(
|
|
"error",
|
|
"csv_error",
|
|
f"Failed to create new CSV file after variable modification: {str(e)}",
|
|
{"error": str(e)},
|
|
)
|
|
|
|
def setup_csv_file(self):
|
|
"""Setup CSV file for the current hour"""
|
|
current_hour = datetime.now().hour
|
|
|
|
# If we're using a modification file and the hour hasn't changed, keep using it
|
|
if (
|
|
self.using_modification_file
|
|
and self.current_hour == current_hour
|
|
and self.current_csv_file is not None
|
|
):
|
|
return
|
|
|
|
# Check if we need to create a new file
|
|
if self.current_hour != current_hour or self.current_csv_file is None:
|
|
# Close previous file if open
|
|
if self.current_csv_file:
|
|
self.current_csv_file.close()
|
|
|
|
# Create directory and file for current hour
|
|
self.ensure_csv_directory()
|
|
csv_path = self.get_csv_file_path()
|
|
|
|
# Check if file exists to determine if we need headers
|
|
file_exists = os.path.exists(csv_path)
|
|
|
|
self.current_csv_file = open(csv_path, "a", newline="", encoding="utf-8")
|
|
self.current_csv_writer = csv.writer(self.current_csv_file)
|
|
self.current_hour = current_hour
|
|
|
|
# Reset modification file flag when creating regular hourly file
|
|
self.using_modification_file = False
|
|
|
|
# Write headers if it's a new file
|
|
if not file_exists and self.variables:
|
|
headers = ["timestamp"] + list(self.variables.keys())
|
|
self.current_csv_writer.writerow(headers)
|
|
self.current_csv_file.flush()
|
|
self.csv_headers_written = True
|
|
self.logger.info(f"CSV file created: {csv_path}")
|
|
|
|
def write_csv_data(self, data: Dict[str, Any]):
|
|
"""Write data to CSV file"""
|
|
if not self.csv_recording or not self.variables:
|
|
return
|
|
|
|
try:
|
|
self.setup_csv_file()
|
|
|
|
if self.current_csv_writer:
|
|
# Create timestamp
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
|
|
|
# Create row with all variables (use None for missing values)
|
|
row = [timestamp]
|
|
for var_name in self.variables.keys():
|
|
row.append(data.get(var_name, None))
|
|
|
|
self.current_csv_writer.writerow(row)
|
|
self.current_csv_file.flush()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error writing CSV data: {e}")
|
|
|
|
def get_streaming_data(self, all_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Filter data for streaming based on selected variables"""
|
|
if not self.streaming_variables:
|
|
return all_data
|
|
|
|
return {
|
|
name: value
|
|
for name, value in all_data.items()
|
|
if name in self.streaming_variables
|
|
}
|
|
|
|
def connect_plc(self) -> bool:
|
|
"""Connect to S7-315 PLC"""
|
|
try:
|
|
if self.plc:
|
|
self.plc.disconnect()
|
|
|
|
self.plc = snap7.client.Client()
|
|
self.plc.connect(
|
|
self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"]
|
|
)
|
|
|
|
self.connected = True
|
|
self.save_system_state()
|
|
|
|
connection_details = {
|
|
"ip": self.plc_config["ip"],
|
|
"rack": self.plc_config["rack"],
|
|
"slot": self.plc_config["slot"],
|
|
}
|
|
self.log_event(
|
|
"info",
|
|
"plc_connection",
|
|
f"Successfully connected to PLC {self.plc_config['ip']}",
|
|
connection_details,
|
|
)
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.connected = False
|
|
error_details = {
|
|
"ip": self.plc_config["ip"],
|
|
"rack": self.plc_config["rack"],
|
|
"slot": self.plc_config["slot"],
|
|
"error": str(e),
|
|
}
|
|
self.log_event(
|
|
"error",
|
|
"plc_connection_failed",
|
|
f"Failed to connect to PLC {self.plc_config['ip']}: {str(e)}",
|
|
error_details,
|
|
)
|
|
return False
|
|
|
|
def disconnect_plc(self):
|
|
"""Disconnect from PLC"""
|
|
try:
|
|
if self.plc:
|
|
self.plc.disconnect()
|
|
self.connected = False
|
|
self.save_system_state()
|
|
|
|
self.log_event(
|
|
"info",
|
|
"plc_disconnection",
|
|
f"Disconnected from PLC {self.plc_config['ip']}",
|
|
)
|
|
except Exception as e:
|
|
self.log_event(
|
|
"error",
|
|
"plc_disconnection_error",
|
|
f"Error disconnecting from PLC: {str(e)}",
|
|
{"error": str(e)},
|
|
)
|
|
|
|
def read_variable(self, var_config: Dict[str, Any]) -> Any:
|
|
"""Read a specific variable from the PLC"""
|
|
try:
|
|
area_type = var_config.get("area", "db").lower()
|
|
offset = var_config["offset"]
|
|
var_type = var_config["type"]
|
|
bit = var_config.get("bit") # Extract bit position for bit areas
|
|
|
|
if area_type == "db":
|
|
# Data Block access (existing functionality)
|
|
db = var_config["db"]
|
|
if var_type == "real":
|
|
raw_data = self.plc.db_read(db, offset, 4)
|
|
value = struct.unpack(">f", raw_data)[0]
|
|
elif var_type == "int":
|
|
raw_data = self.plc.db_read(db, offset, 2)
|
|
value = struct.unpack(">h", raw_data)[0]
|
|
elif var_type == "bool":
|
|
raw_data = self.plc.db_read(db, offset, 1)
|
|
if bit is not None:
|
|
# Use snap7.util.get_bool for specific bit extraction
|
|
value = snap7.util.get_bool(raw_data, 0, bit)
|
|
else:
|
|
# Default to bit 0 for backward compatibility
|
|
value = bool(raw_data[0] & 0x01)
|
|
elif var_type == "dint":
|
|
raw_data = self.plc.db_read(db, offset, 4)
|
|
value = struct.unpack(">l", raw_data)[0]
|
|
elif var_type == "word":
|
|
raw_data = self.plc.db_read(db, offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "byte":
|
|
raw_data = self.plc.db_read(db, offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
elif var_type == "uint":
|
|
raw_data = self.plc.db_read(db, offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "udint":
|
|
raw_data = self.plc.db_read(db, offset, 4)
|
|
value = struct.unpack(">L", raw_data)[0]
|
|
elif var_type == "sint":
|
|
raw_data = self.plc.db_read(db, offset, 1)
|
|
value = struct.unpack(">b", raw_data)[0]
|
|
elif var_type == "usint":
|
|
raw_data = self.plc.db_read(db, offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
else:
|
|
return None
|
|
|
|
elif area_type == "mw" or area_type == "m":
|
|
# Memory Words / Markers access
|
|
if var_type == "real":
|
|
raw_data = self.plc.mb_read(offset, 4)
|
|
value = struct.unpack(">f", raw_data)[0]
|
|
elif var_type == "int":
|
|
raw_data = self.plc.mb_read(offset, 2)
|
|
value = struct.unpack(">h", raw_data)[0]
|
|
elif var_type == "bool":
|
|
raw_data = self.plc.mb_read(offset, 1)
|
|
value = bool(raw_data[0] & 0x01)
|
|
elif var_type == "dint":
|
|
raw_data = self.plc.mb_read(offset, 4)
|
|
value = struct.unpack(">l", raw_data)[0]
|
|
elif var_type == "word":
|
|
raw_data = self.plc.mb_read(offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "byte":
|
|
raw_data = self.plc.mb_read(offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
elif var_type == "uint":
|
|
raw_data = self.plc.mb_read(offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "udint":
|
|
raw_data = self.plc.mb_read(offset, 4)
|
|
value = struct.unpack(">L", raw_data)[0]
|
|
elif var_type == "sint":
|
|
raw_data = self.plc.mb_read(offset, 1)
|
|
value = struct.unpack(">b", raw_data)[0]
|
|
elif var_type == "usint":
|
|
raw_data = self.plc.mb_read(offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
else:
|
|
return None
|
|
|
|
elif area_type == "pew" or area_type == "pe":
|
|
# Process Input Words access
|
|
if var_type == "real":
|
|
raw_data = self.plc.eb_read(offset, 4)
|
|
value = struct.unpack(">f", raw_data)[0]
|
|
elif var_type == "int":
|
|
raw_data = self.plc.eb_read(offset, 2)
|
|
value = struct.unpack(">h", raw_data)[0]
|
|
elif var_type == "bool":
|
|
raw_data = self.plc.eb_read(offset, 1)
|
|
value = bool(raw_data[0] & 0x01)
|
|
elif var_type == "dint":
|
|
raw_data = self.plc.eb_read(offset, 4)
|
|
value = struct.unpack(">l", raw_data)[0]
|
|
elif var_type == "word":
|
|
raw_data = self.plc.eb_read(offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "byte":
|
|
raw_data = self.plc.eb_read(offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
elif var_type == "uint":
|
|
raw_data = self.plc.eb_read(offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "udint":
|
|
raw_data = self.plc.eb_read(offset, 4)
|
|
value = struct.unpack(">L", raw_data)[0]
|
|
elif var_type == "sint":
|
|
raw_data = self.plc.eb_read(offset, 1)
|
|
value = struct.unpack(">b", raw_data)[0]
|
|
elif var_type == "usint":
|
|
raw_data = self.plc.eb_read(offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
else:
|
|
return None
|
|
|
|
elif area_type == "paw" or area_type == "pa":
|
|
# Process Output Words access
|
|
if var_type == "real":
|
|
raw_data = self.plc.ab_read(offset, 4)
|
|
value = struct.unpack(">f", raw_data)[0]
|
|
elif var_type == "int":
|
|
raw_data = self.plc.ab_read(offset, 2)
|
|
value = struct.unpack(">h", raw_data)[0]
|
|
elif var_type == "bool":
|
|
raw_data = self.plc.ab_read(offset, 1)
|
|
value = bool(raw_data[0] & 0x01)
|
|
elif var_type == "dint":
|
|
raw_data = self.plc.ab_read(offset, 4)
|
|
value = struct.unpack(">l", raw_data)[0]
|
|
elif var_type == "word":
|
|
raw_data = self.plc.ab_read(offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "byte":
|
|
raw_data = self.plc.ab_read(offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
elif var_type == "uint":
|
|
raw_data = self.plc.ab_read(offset, 2)
|
|
value = struct.unpack(">H", raw_data)[0]
|
|
elif var_type == "udint":
|
|
raw_data = self.plc.ab_read(offset, 4)
|
|
value = struct.unpack(">L", raw_data)[0]
|
|
elif var_type == "sint":
|
|
raw_data = self.plc.ab_read(offset, 1)
|
|
value = struct.unpack(">b", raw_data)[0]
|
|
elif var_type == "usint":
|
|
raw_data = self.plc.ab_read(offset, 1)
|
|
value = struct.unpack(">B", raw_data)[0]
|
|
else:
|
|
return None
|
|
|
|
elif area_type == "e":
|
|
# Process Input Bits access (E5.1 format)
|
|
if var_type == "bool":
|
|
raw_data = self.plc.eb_read(offset, 1)
|
|
# Use snap7.util.get_bool for proper bit extraction
|
|
value = snap7.util.get_bool(raw_data, 0, bit)
|
|
else:
|
|
return None
|
|
|
|
elif area_type == "a":
|
|
# Process Output Bits access (A3.7 format)
|
|
if var_type == "bool":
|
|
raw_data = self.plc.ab_read(offset, 1)
|
|
# Use snap7.util.get_bool for proper bit extraction
|
|
value = snap7.util.get_bool(raw_data, 0, bit)
|
|
else:
|
|
return None
|
|
|
|
elif area_type == "mb":
|
|
# Memory Bits access (M10.0 format)
|
|
if var_type == "bool":
|
|
raw_data = self.plc.mb_read(offset, 1)
|
|
# Use snap7.util.get_bool for proper bit extraction
|
|
value = snap7.util.get_bool(raw_data, 0, bit)
|
|
else:
|
|
return None
|
|
|
|
else:
|
|
self.logger.error(f"Unsupported area type: {area_type}")
|
|
return None
|
|
|
|
return value
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error reading variable: {e}")
|
|
return None
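
    # Summary of the branches above: every numeric type maps to a fixed byte count and a
    # big-endian struct format (S7 data is big-endian):
    #     real  -> 4 bytes, ">f"     dint -> 4 bytes, ">l"     udint -> 4 bytes, ">L"
    #     int   -> 2 bytes, ">h"     word -> 2 bytes, ">H"     uint  -> 2 bytes, ">H"
    #     byte  -> 1 byte,  ">B"     sint -> 1 byte,  ">b"     usint -> 1 byte,  ">B"
    #     bool  -> 1 byte read; a single bit is extracted (snap7.util.get_bool, or bit 0
    #              of the byte when no bit position is configured)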
|
|
|
|
def read_all_variables(self) -> Dict[str, Any]:
|
|
"""Read all configured variables"""
|
|
if not self.connected or not self.plc:
|
|
return {}
|
|
|
|
data = {}
|
|
for var_name, var_config in self.variables.items():
|
|
value = self.read_variable(var_config)
|
|
if value is not None:
|
|
data[var_name] = value
|
|
|
|
return data
|
|
|
|
def setup_udp_socket(self) -> bool:
|
|
"""Setup UDP socket"""
|
|
try:
|
|
if self.udp_socket:
|
|
self.udp_socket.close()
|
|
|
|
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.logger.info(
|
|
f"UDP socket configured for {self.udp_config['host']}:{self.udp_config['port']}"
|
|
)
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error configuring UDP socket: {e}")
|
|
return False

    def send_to_plotjuggler(self, data: Dict[str, Any]):
        """Send data to PlotJuggler via UDP JSON"""
        if not self.udp_socket:
            return

        try:
            message = {"timestamp": time.time(), "data": data}

            json_message = json.dumps(message)
            self.udp_socket.sendto(
                json_message.encode("utf-8"),
                (self.udp_config["host"], self.udp_config["port"]),
            )

        except Exception as e:
            self.logger.error(f"Error sending data to PlotJuggler: {e}")
|
|
|
|
def streaming_loop(self):
|
|
"""Main streaming loop"""
|
|
self.logger.info(
|
|
f"Starting streaming with interval of {self.sampling_interval}s"
|
|
)
|
|
|
|
consecutive_errors = 0
|
|
max_consecutive_errors = 5
|
|
|
|
while self.streaming:
|
|
try:
|
|
start_time = time.time()
|
|
|
|
# Read all variables
|
|
all_data = self.read_all_variables()
|
|
|
|
if all_data:
|
|
# Reset error counter on successful read
|
|
consecutive_errors = 0
|
|
|
|
# Write to CSV (all variables)
|
|
self.write_csv_data(all_data)
|
|
|
|
# Get filtered data for streaming
|
|
streaming_data = self.get_streaming_data(all_data)
|
|
|
|
# Send filtered data to PlotJuggler
|
|
if streaming_data:
|
|
self.send_to_plotjuggler(streaming_data)
|
|
|
|
# Log data
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
|
self.logger.info(
|
|
f"[{timestamp}] CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
|
|
)
|
|
else:
|
|
consecutive_errors += 1
|
|
if consecutive_errors >= max_consecutive_errors:
|
|
self.log_event(
|
|
"error",
|
|
"streaming_error",
|
|
f"Multiple consecutive read failures ({consecutive_errors}). Stopping streaming.",
|
|
{"consecutive_errors": consecutive_errors},
|
|
)
|
|
break
|
|
|
|
# Maintain sampling interval
|
|
elapsed = time.time() - start_time
|
|
sleep_time = max(0, self.sampling_interval - elapsed)
|
|
time.sleep(sleep_time)
|
|
|
|
except Exception as e:
|
|
consecutive_errors += 1
|
|
self.log_event(
|
|
"error",
|
|
"streaming_error",
|
|
f"Error in streaming loop: {str(e)}",
|
|
{"error": str(e), "consecutive_errors": consecutive_errors},
|
|
)
|
|
|
|
if consecutive_errors >= max_consecutive_errors:
|
|
self.log_event(
|
|
"error",
|
|
"streaming_error",
|
|
"Too many consecutive errors. Stopping streaming.",
|
|
{"consecutive_errors": consecutive_errors},
|
|
)
|
|
break
|
|
|
|
time.sleep(1) # Wait before retry
|
|
|
|
def start_streaming(self) -> bool:
|
|
"""Start data streaming - activates all datasets with variables"""
|
|
if not self.connected:
|
|
self.log_event(
|
|
"error", "streaming_error", "Cannot start streaming: PLC not connected"
|
|
)
|
|
return False
|
|
|
|
if not self.datasets:
|
|
self.log_event(
|
|
"error",
|
|
"streaming_error",
|
|
"Cannot start streaming: No datasets configured",
|
|
)
|
|
return False
|
|
|
|
if not self.setup_udp_socket():
|
|
self.log_event(
|
|
"error",
|
|
"streaming_error",
|
|
"Cannot start streaming: UDP socket setup failed",
|
|
)
|
|
return False
|
|
|
|
# Activate all datasets that have variables
|
|
activated_count = 0
|
|
for dataset_id, dataset_info in self.datasets.items():
|
|
if dataset_info.get("variables"):
|
|
try:
|
|
self.activate_dataset(dataset_id)
|
|
activated_count += 1
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to activate dataset {dataset_id}: {e}")
|
|
|
|
if activated_count == 0:
|
|
self.log_event(
|
|
"error",
|
|
"streaming_error",
|
|
"Cannot start streaming: No datasets with variables configured",
|
|
)
|
|
return False
|
|
|
|
self.streaming = True
|
|
self.save_system_state()
|
|
|
|
self.log_event(
|
|
"info",
|
|
"streaming_started",
|
|
f"Multi-dataset streaming started: {activated_count} datasets activated",
|
|
{
|
|
"activated_datasets": activated_count,
|
|
"total_datasets": len(self.datasets),
|
|
"udp_host": self.udp_config["host"],
|
|
"udp_port": self.udp_config["port"],
|
|
},
|
|
)
|
|
return True
|
|
|
|
def stop_streaming(self):
|
|
"""Stop streaming - deactivates all active datasets"""
|
|
self.streaming = False
|
|
|
|
# Stop all dataset streaming threads
|
|
active_datasets_copy = self.active_datasets.copy()
|
|
for dataset_id in active_datasets_copy:
|
|
try:
|
|
self.deactivate_dataset(dataset_id)
|
|
except Exception as e:
|
|
self.logger.warning(f"Error deactivating dataset {dataset_id}: {e}")
|
|
|
|
# Close UDP socket
|
|
if self.udp_socket:
|
|
self.udp_socket.close()
|
|
self.udp_socket = None
|
|
|
|
self.save_system_state()
|
|
|
|
datasets_stopped = len(active_datasets_copy)
|
|
self.log_event(
|
|
"info",
|
|
"streaming_stopped",
|
|
f"Multi-dataset streaming stopped: {datasets_stopped} datasets deactivated",
|
|
)

    def get_status(self) -> Dict[str, Any]:
        """Get current system status"""
        total_variables = sum(
            len(dataset["variables"]) for dataset in self.datasets.values()
        )

        # Count only variables that are in the streaming_variables list AND have streaming=True
        total_streaming_vars = 0
        for dataset in self.datasets.values():
            streaming_vars = dataset.get("streaming_variables", [])
            variables_config = dataset.get("variables", {})
            active_streaming_vars = [
                var
                for var in streaming_vars
                if variables_config.get(var, {}).get("streaming", False)
            ]
            total_streaming_vars += len(active_streaming_vars)

        return {
            "plc_connected": self.connected,
            "streaming": self.streaming,
            "plc_config": self.plc_config,
            "udp_config": self.udp_config,
            "datasets_count": len(self.datasets),
            "active_datasets_count": len(self.active_datasets),
            "total_variables": total_variables,
            "total_streaming_variables": total_streaming_vars,
            "streaming_variables_count": total_streaming_vars,  # Kept for frontend compatibility
            "sampling_interval": self.sampling_interval,
            "current_dataset_id": self.current_dataset_id,
            "datasets": {
                dataset_id: {
                    "name": info["name"],
                    "prefix": info["prefix"],
                    "variables_count": len(info["variables"]),
                    "streaming_count": len(
                        [
                            var
                            for var in info.get("streaming_variables", [])
                            if info.get("variables", {})
                            .get(var, {})
                            .get("streaming", False)
                        ]
                    ),
                    "sampling_interval": info.get("sampling_interval"),
                    "enabled": info.get("enabled", False),
                    "active": dataset_id in self.active_datasets,
                }
                for dataset_id, info in self.datasets.items()
            },
        }

    def log_event(
        self,
        level: str,
        event_type: str,
        message: str,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Add an event to the persistent log"""
        try:
            event = {
                "timestamp": datetime.now().isoformat(),
                "level": level,  # info, warning, error
                "event_type": event_type,  # connection, disconnection, error, config_change, etc.
                "message": message,
                "details": details or {},
            }

            self.events_log.append(event)

            # Limit log size
            if len(self.events_log) > self.max_log_entries:
                self.events_log = self.events_log[-self.max_log_entries :]

            # Save to file
            self.save_events_log()

            # Also log to the regular logger
            if level == "error":
                self.logger.error(f"[{event_type}] {message}")
            elif level == "warning":
                self.logger.warning(f"[{event_type}] {message}")
            else:
                self.logger.info(f"[{event_type}] {message}")

        except Exception as e:
            self.logger.error(f"Error adding event to log: {e}")
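
    # Illustrative call (example values only): logging a configuration change with
    # structured details makes it visible in /api/events and application_events.json:
    #   self.log_event("info", "config_change", "Sampling interval updated", {"interval": 0.5})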

    def load_events_log(self):
        """Load events log from JSON file"""
        try:
            if os.path.exists(self.events_log_file):
                with open(self.events_log_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                    self.events_log = data.get("events", [])
                    # Limit log size on load
                    if len(self.events_log) > self.max_log_entries:
                        self.events_log = self.events_log[-self.max_log_entries :]
                self.logger.info(f"Events log loaded: {len(self.events_log)} entries")
            else:
                self.events_log = []
                self.logger.info("No events log file found, starting with empty log")
        except Exception as e:
            self.logger.error(f"Error loading events log: {e}")
            self.events_log = []

    def save_events_log(self):
        """Save events log to JSON file"""
        try:
            log_data = {
                "events": self.events_log,
                "last_updated": datetime.now().isoformat(),
                "total_entries": len(self.events_log),
            }
            with open(self.events_log_file, "w", encoding="utf-8") as f:
                json.dump(log_data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            self.logger.error(f"Error saving events log: {e}")

    def get_recent_events(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent events from the log"""
        return self.events_log[-limit:] if self.events_log else []


# Global streamer instance (created in the __main__ block below)
streamer = None


def check_streamer_initialized():
    """Check if streamer is initialized, return error response if not"""
    if streamer is None:
        return jsonify({"error": "Application not initialized"}), 503
    return None
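
# Typical usage inside a route handler (this exact pattern appears in the endpoints below):
#   error_response = check_streamer_initialized()
#   if error_response:
#       return error_response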


@app.route("/images/<filename>")
def serve_image(filename):
    """Serve images from .images directory"""
    return send_from_directory(".images", filename)


@app.route("/")
def index():
    """Main page"""
    if streamer is None:
        return "Application not initialized", 503

    # Get variables for the current dataset or empty dict if no current dataset
    current_variables = {}
    if streamer.current_dataset_id and streamer.current_dataset_id in streamer.datasets:
        current_variables = streamer.datasets[streamer.current_dataset_id].get(
            "variables", {}
        )

    return render_template(
        "index.html",
        status=streamer.get_status(),
        variables=current_variables,
        datasets=streamer.datasets,
        current_dataset_id=streamer.current_dataset_id,
    )


@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
    """Update PLC configuration"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        ip = data.get("ip", "10.1.33.11")
        rack = int(data.get("rack", 0))
        slot = int(data.get("slot", 2))

        streamer.update_plc_config(ip, rack, slot)
        return jsonify({"success": True, "message": "PLC configuration updated"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
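
# Illustrative request body for POST /api/plc/config (example values; the handler
# reads "ip", "rack" and "slot"):
#   {"ip": "192.168.1.100", "rack": 0, "slot": 2}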


@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
    """Update UDP configuration"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        host = data.get("host", "127.0.0.1")
        port = int(data.get("port", 9870))

        streamer.update_udp_config(host, port)
        return jsonify({"success": True, "message": "UDP configuration updated"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
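
# Illustrative request body for POST /api/udp/config (example values; the handler
# reads "host" and "port"):
#   {"host": "127.0.0.1", "port": 9870}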


@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
    """Connect to PLC"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.connect_plc():
        return jsonify({"success": True, "message": "Connected to PLC"})
    else:
        return jsonify({"success": False, "message": "Error connecting to PLC"}), 500
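
# POST /api/plc/connect and POST /api/plc/disconnect take no JSON body; they operate on
# the PLC connection settings previously configured via /api/plc/config.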


@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
    """Disconnect from PLC"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.stop_streaming()
    streamer.disconnect_plc()
    return jsonify({"success": True, "message": "Disconnected from PLC"})


@app.route("/api/variables", methods=["POST"])
def add_variable():
    """Add a new variable"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        name = data.get("name")
        area = data.get("area")
        db = int(data.get("db", 1))  # Default to DB1, consistent with the PUT handler
        offset = int(data.get("offset"))
        var_type = data.get("type")
        bit = data.get("bit")  # Optional bit position for bit-addressed areas

        valid_types = [
            "real",
            "int",
            "bool",
            "dint",
            "word",
            "byte",
            "uint",
            "udint",
            "sint",
            "usint",
        ]
        valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]

        if (
            not name
            or not area
            or var_type not in valid_types
            or area.lower() not in valid_areas
        ):
            return jsonify({"success": False, "message": "Invalid data"}), 400

        if area.lower() in ["e", "a", "mb"] and bit is None:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be specified for bit areas",
                    }
                ),
                400,
            )

        if area.lower() in ["e", "a", "mb"] and (bit < 0 or bit > 7):
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be between 0 and 7",
                    }
                ),
                400,
            )

        streamer.add_variable(name, area, db, offset, var_type, bit)
        return jsonify({"success": True, "message": f"Variable {name} added"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
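
# Illustrative request bodies for POST /api/variables (example names and addresses):
#   DB variable:  {"name": "motor_speed", "area": "db", "db": 1, "offset": 0, "type": "real"}
#   Bit variable: {"name": "pump_running", "area": "e", "offset": 4, "type": "bool", "bit": 2}
# "bit" is required (0-7) for the bit-addressed areas "e", "a" and "mb".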


@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
    """Remove a variable"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.remove_variable(name)
    return jsonify({"success": True, "message": f"Variable {name} removed"})


@app.route("/api/variables/<name>", methods=["GET"])
def get_variable(name):
    """Get a specific variable configuration from the current dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        if not streamer.current_dataset_id:
            return (
                jsonify({"success": False, "message": "No dataset selected"}),
                400,
            )

        current_variables = streamer.get_dataset_variables(streamer.current_dataset_id)
        if name not in current_variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )

        var_config = current_variables[name].copy()
        var_config["name"] = name

        # Check if variable is in the streaming list for the current dataset
        streaming_vars = streamer.datasets[streamer.current_dataset_id].get(
            "streaming_variables", []
        )
        var_config["streaming"] = name in streaming_vars

        return jsonify({"success": True, "variable": var_config})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/variables/<name>", methods=["PUT"])
def update_variable(name):
    """Update an existing variable in the current dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        if not streamer.current_dataset_id:
            return jsonify({"success": False, "message": "No dataset selected"}), 400

        data = request.get_json()
        new_name = data.get("name", name)
        area = data.get("area")
        db = int(data.get("db", 1))
        offset = int(data.get("offset"))
        var_type = data.get("type")
        bit = data.get("bit")

        valid_types = [
            "real",
            "int",
            "bool",
            "dint",
            "word",
            "byte",
            "uint",
            "udint",
            "sint",
            "usint",
        ]
        valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]

        if (
            not new_name
            or not area
            or var_type not in valid_types
            or area.lower() not in valid_areas
        ):
            return jsonify({"success": False, "message": "Invalid data"}), 400

        if area.lower() in ["e", "a", "mb"] and bit is None:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be specified for bit areas",
                    }
                ),
                400,
            )

        if area.lower() in ["e", "a", "mb"] and (bit < 0 or bit > 7):
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Bit position must be between 0 and 7",
                    }
                ),
                400,
            )

        current_dataset_id = streamer.current_dataset_id
        current_variables = streamer.get_dataset_variables(current_dataset_id)

        # Check if variable exists in the current dataset
        if name not in current_variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )

        # Check if the new name already exists (if the name is changing)
        if name != new_name and new_name in current_variables:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": f"Variable {new_name} already exists",
                    }
                ),
                400,
            )

        # Preserve streaming state
        streaming_vars = streamer.datasets[current_dataset_id].get(
            "streaming_variables", []
        )
        was_streaming = name in streaming_vars

        # Remove old variable
        streamer.remove_variable_from_dataset(current_dataset_id, name)

        # Add updated variable
        streamer.add_variable_to_dataset(
            current_dataset_id, new_name, area, db, offset, var_type, bit, was_streaming
        )

        return jsonify(
            {"success": True, "message": f"Variable {new_name} updated successfully"}
        )

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/variables/<name>/streaming", methods=["POST"])
def toggle_variable_streaming(name):
    """Toggle streaming for a specific variable in the current dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        if not streamer.current_dataset_id:
            return jsonify({"success": False, "message": "No dataset selected"}), 400

        data = request.get_json()
        enabled = data.get("enabled", False)

        # Use the dataset-specific method on the current dataset
        streamer.toggle_variable_streaming(streamer.current_dataset_id, name, enabled)

        status = "enabled" if enabled else "disabled"
        return jsonify(
            {"success": True, "message": f"Variable {name} streaming {status}"}
        )

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
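
# Illustrative request body for POST /api/variables/<name>/streaming:
#   {"enabled": true}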


@app.route("/api/variables/streaming", methods=["GET"])
def get_streaming_variables():
    """Get list of variables enabled for streaming in current dataset"""
    if streamer is None:
        return jsonify({"success": False, "error": "Streamer not initialized"}), 503

    # Get streaming variables from current dataset
    streaming_vars = []
    if streamer.current_dataset_id and streamer.current_dataset_id in streamer.datasets:
        streaming_vars = streamer.datasets[streamer.current_dataset_id].get(
            "streaming_variables", []
        )

    return jsonify({"success": True, "streaming_variables": streaming_vars})


# Dataset Management API Endpoints


@app.route("/api/datasets", methods=["GET"])
def get_datasets():
    """Get all datasets"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    return jsonify(
        {
            "success": True,
            "datasets": streamer.datasets,
            "active_datasets": list(streamer.active_datasets),
            "current_dataset_id": streamer.current_dataset_id,
        }
    )


@app.route("/api/datasets", methods=["POST"])
def create_dataset():
    """Create a new dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        dataset_id = data.get("dataset_id", "").strip()
        name = data.get("name", "").strip()
        prefix = data.get("prefix", "").strip()
        sampling_interval = data.get("sampling_interval")

        if not dataset_id or not name or not prefix:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Dataset ID, name, and prefix are required",
                    }
                ),
                400,
            )

        # Validate sampling interval if provided
        if sampling_interval is not None:
            sampling_interval = float(sampling_interval)
            if sampling_interval < 0.01:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "Sampling interval must be at least 0.01 seconds",
                        }
                    ),
                    400,
                )

        streamer.create_dataset(dataset_id, name, prefix, sampling_interval)

        return jsonify(
            {
                "success": True,
                "message": f"Dataset '{name}' created successfully",
                "dataset_id": dataset_id,
            }
        )

    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error creating dataset: {str(e)}"}),
            500,
        )
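
# Illustrative request body for POST /api/datasets (example identifiers;
# "sampling_interval" is optional and must be at least 0.01 seconds when given):
#   {"dataset_id": "line1", "name": "Line 1", "prefix": "line1", "sampling_interval": 0.1}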


@app.route("/api/datasets/<dataset_id>", methods=["DELETE"])
def delete_dataset(dataset_id):
    """Delete a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.delete_dataset(dataset_id)
        return jsonify({"success": True, "message": "Dataset deleted successfully"})
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error deleting dataset: {str(e)}"}),
            500,
        )


@app.route("/api/datasets/<dataset_id>/activate", methods=["POST"])
def activate_dataset(dataset_id):
    """Activate a dataset for streaming"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.activate_dataset(dataset_id)
        return jsonify({"success": True, "message": "Dataset activated successfully"})
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except RuntimeError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error activating dataset: {str(e)}"}
            ),
            500,
        )


@app.route("/api/datasets/<dataset_id>/deactivate", methods=["POST"])
def deactivate_dataset(dataset_id):
    """Deactivate a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.deactivate_dataset(dataset_id)
        return jsonify({"success": True, "message": "Dataset deactivated successfully"})
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error deactivating dataset: {str(e)}"}
            ),
            500,
        )


@app.route("/api/datasets/<dataset_id>/variables", methods=["POST"])
def add_variable_to_dataset(dataset_id):
    """Add a variable to a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        name = data.get("name", "").strip()
        area = data.get("area", "").strip()
        db = data.get("db", 1)
        offset = data.get("offset", 0)
        var_type = data.get("type", "").strip()
        bit = data.get("bit")
        streaming = data.get("streaming", False)

        if not name or not area or not var_type:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Variable name, area, and type are required",
                    }
                ),
                400,
            )

        streamer.add_variable_to_dataset(
            dataset_id, name, area, db, offset, var_type, bit, streaming
        )

        return jsonify(
            {
                "success": True,
                "message": f"Variable '{name}' added to dataset successfully",
            }
        )

    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error adding variable: {str(e)}"}),
            500,
        )
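
# Illustrative request body for POST /api/datasets/<dataset_id>/variables
# (example name and address; "bit" and "streaming" are optional):
#   {"name": "motor_speed", "area": "db", "db": 1, "offset": 0, "type": "real", "streaming": true}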


@app.route("/api/datasets/<dataset_id>/variables/<variable_name>", methods=["DELETE"])
def remove_variable_from_dataset(dataset_id, variable_name):
    """Remove a variable from a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.remove_variable_from_dataset(dataset_id, variable_name)
        return jsonify(
            {
                "success": True,
                "message": f"Variable '{variable_name}' removed from dataset successfully",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error removing variable: {str(e)}"}
            ),
            500,
        )


@app.route(
    "/api/datasets/<dataset_id>/variables/<variable_name>/streaming", methods=["POST"]
)
def toggle_variable_streaming_in_dataset(dataset_id, variable_name):
    """Toggle streaming for a variable in a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        enabled = data.get("enabled", False)

        streamer.toggle_variable_streaming(dataset_id, variable_name, enabled)

        return jsonify(
            {
                "success": True,
                "message": f"Variable '{variable_name}' streaming {'enabled' if enabled else 'disabled'}",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error toggling streaming: {str(e)}"}
            ),
            500,
        )


@app.route("/api/datasets/current", methods=["POST"])
def set_current_dataset():
    """Set the current dataset for editing"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        dataset_id = data.get("dataset_id")

        if dataset_id and dataset_id in streamer.datasets:
            streamer.current_dataset_id = dataset_id
            streamer.save_datasets()
            return jsonify(
                {
                    "success": True,
                    "message": "Current dataset updated successfully",
                    "current_dataset_id": dataset_id,
                }
            )
        else:
            return jsonify({"success": False, "message": "Invalid dataset ID"}), 400

    except Exception as e:
        return (
            jsonify(
                {
                    "success": False,
                    "message": f"Error setting current dataset: {str(e)}",
                }
            ),
            500,
        )
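
# Illustrative request body for POST /api/datasets/current ("dataset_id" must refer to
# an existing dataset; "line1" is an example id):
#   {"dataset_id": "line1"}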


@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
    """Start streaming"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.start_streaming():
        return jsonify({"success": True, "message": "Streaming started"})
    else:
        return jsonify({"success": False, "message": "Error starting streaming"}), 500


@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
    """Stop streaming"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.stop_streaming()
    return jsonify({"success": True, "message": "Streaming stopped"})


@app.route("/api/sampling", methods=["POST"])
def update_sampling():
    """Update sampling interval"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        interval = float(data.get("interval", 0.1))

        # Clamp to the minimum supported interval
        if interval < 0.01:
            interval = 0.01

        streamer.update_sampling_interval(interval)
        return jsonify({"success": True, "message": f"Interval updated to {interval}s"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
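
# Illustrative request body for POST /api/sampling (values below 0.01 are clamped to 0.01):
#   {"interval": 0.1}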


@app.route("/api/csv/start", methods=["POST"])
def start_csv_recording():
    """Start CSV recording independently"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.start_csv_recording():
        return jsonify({"success": True, "message": "CSV recording started"})
    else:
        return (
            jsonify({"success": False, "message": "Error starting CSV recording"}),
            500,
        )


@app.route("/api/csv/stop", methods=["POST"])
def stop_csv_recording():
    """Stop CSV recording independently"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.stop_csv_recording()
    return jsonify({"success": True, "message": "CSV recording stopped"})


@app.route("/api/status")
def get_status():
    """Get current status"""
    if streamer is None:
        return jsonify({"error": "Application not initialized"}), 503
    return jsonify(streamer.get_status())


@app.route("/api/events")
def get_events():
    """Get recent events from the application log"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        limit = request.args.get("limit", 50, type=int)
        limit = min(limit, 200)  # Maximum 200 events per request

        events = streamer.get_recent_events(limit)
        return jsonify(
            {
                "success": True,
                "events": events,
                "total_events": len(streamer.events_log),
                "showing": len(events),
            }
        )
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
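
# Illustrative request: GET /api/events?limit=50 returns at most "limit" recent events
# (capped at 200 per request).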


def graceful_shutdown():
    """Perform graceful shutdown"""
    print("\n⏹️ Performing graceful shutdown...")
    try:
        if streamer is not None:
            streamer.stop_streaming()
            streamer.disconnect_plc()
            streamer.release_instance_lock()
        print("✅ Shutdown completed successfully")
    except Exception as e:
        print(f"⚠️ Error during shutdown: {e}")


def main():
    """Main application entry point with error handling and recovery"""
    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            # Create templates directory if it doesn't exist
            os.makedirs("templates", exist_ok=True)

            print("🚀 Starting Flask server for PLC S7-315 Streamer")
            print("📊 Web interface available at: http://localhost:5050")
            print("🔧 Configure your PLC and variables through the web interface")

            # The global streamer is created in the __main__ block below
            # (instance locking and auto-recovery happen in its constructor)
            global streamer

            # Start Flask application
            app.run(debug=False, host="0.0.0.0", port=5050, use_reloader=False)

            # If we reach here, the server stopped normally
            break

        except RuntimeError as e:
            if "Another instance" in str(e):
                print(f"❌ {e}")
                print("💡 Tip: Stop the other instance or wait for it to finish")
                sys.exit(1)
            else:
                print(f"⚠️ Runtime error: {e}")
                retry_count += 1

        except KeyboardInterrupt:
            print("\n⏸️ Received interrupt signal...")
            graceful_shutdown()
            break

        except Exception as e:
            print(f"💥 Unexpected error: {e}")
            retry_count += 1

        if retry_count < max_retries:
            print(f"🔄 Attempting restart ({retry_count}/{max_retries})...")
            time.sleep(2)  # Wait before retry
        else:
            print("❌ Maximum retries reached. Exiting...")
            graceful_shutdown()
            sys.exit(1)


if __name__ == "__main__":
    try:
        # Initialize streamer instance
        streamer = PLCDataStreamer()
        main()
    except Exception as e:
        print(f"💥 Critical error during initialization: {e}")
        sys.exit(1)