# S7_snap7_Stremer_n_Recorder/main.py
from flask import (
Flask,
render_template,
request,
jsonify,
redirect,
url_for,
send_from_directory,
)
import snap7
import snap7.util
import json
import socket
import time
import logging
import threading
from datetime import datetime
from typing import Dict, Any, Optional, List
import struct
import os
import csv
from pathlib import Path
import atexit
import psutil
import sys
# Flask application; the secret key signs session cookies.
app = Flask(__name__)
# NOTE(review): hard-coded secret key — acceptable for a trusted LAN tool,
# but should come from the environment if this is ever exposed further.
app.secret_key = "plc_streamer_secret_key"
class PLCDataStreamer:
    """Streams Siemens S7 PLC data to PlotJuggler (UDP JSON) and hourly CSV files.

    Owns the snap7 client, the UDP socket, the background sampling thread,
    JSON persistence (config, variables, state, event log), and a PID-based
    single-instance lock with auto-recovery of the previous run state.
    """

    def __init__(self):
        """Initialize the PLC data streamer"""
        # Configuration file paths (all JSON, all in the working directory)
        self.config_file = "plc_config.json"
        self.variables_file = "plc_variables.json"
        self.state_file = "system_state.json"
        self.events_log_file = "application_events.json"
        # Default configuration (overridden by load_configuration below)
        self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
        self.udp_config = {"host": "127.0.0.1", "port": 9870}
        # Configurable variables
        self.variables = {}
        self.streaming_variables = set()  # Variables selected for streaming
        # CSV recording settings
        self.csv_recording = False
        self.csv_record_thread = None  # NOTE(review): assigned but never used in this chunk
        self.current_csv_file = None
        self.current_csv_writer = None
        self.current_hour = None  # hour of the open CSV file, for rollover detection
        self.csv_headers_written = False
        self.using_modification_file = (
            False  # Flag to track if using modification timestamp file
        )
        # System states
        self.plc = None
        self.udp_socket = None
        self.connected = False
        self.streaming = False
        self.stream_thread = None
        self.sampling_interval = 0.1  # seconds between PLC polls
        # Auto-recovery settings
        self.auto_recovery_enabled = True
        self.last_state = {
            "should_connect": False,
            "should_stream": False,
            "should_record_csv": False,
        }
        # Single instance control
        self.lock_file = "plc_streamer.lock"
        self.lock_fd = None
        # Events log for persistent logging
        self.events_log = []
        self.max_log_entries = 1000  # Maximum number of log entries to keep
        # Setup logging first so the loaders below can report errors
        self.setup_logging()
        # Load configuration from files (each loader tolerates a missing file)
        self.load_configuration()
        self.load_variables()
        self.load_system_state()
        self.load_events_log()
        # Acquire instance lock and attempt auto-recovery
        if self.acquire_instance_lock():
            # Small delay to ensure previous instance has fully cleaned up
            time.sleep(1)
            # NOTE(review): "Application started" is passed as the event_type
            # (2nd argument), unlike the snake_case types used elsewhere —
            # looks like the type and message arguments were swapped; confirm.
            self.log_event(
                "info",
                "Application started",
                "Application initialization completed successfully",
            )
            self.attempt_auto_recovery()
        else:
            raise RuntimeError("Another instance of the application is already running")
def setup_logging(self):
"""Configure the logging system"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
)
self.logger = logging.getLogger(__name__)
def load_configuration(self):
"""Load PLC and UDP configuration from JSON file"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, "r") as f:
config = json.load(f)
self.plc_config = config.get("plc_config", self.plc_config)
self.udp_config = config.get("udp_config", self.udp_config)
self.sampling_interval = config.get(
"sampling_interval", self.sampling_interval
)
self.logger.info(f"Configuration loaded from {self.config_file}")
else:
self.logger.info("No configuration file found, using defaults")
except Exception as e:
self.logger.error(f"Error loading configuration: {e}")
def save_configuration(self):
"""Save PLC and UDP configuration to JSON file"""
try:
config = {
"plc_config": self.plc_config,
"udp_config": self.udp_config,
"sampling_interval": self.sampling_interval,
}
with open(self.config_file, "w") as f:
json.dump(config, f, indent=4)
self.logger.info(f"Configuration saved to {self.config_file}")
except Exception as e:
self.logger.error(f"Error saving configuration: {e}")
def load_variables(self):
"""Load variables configuration from JSON file"""
try:
if os.path.exists(self.variables_file):
with open(self.variables_file, "r") as f:
self.variables = json.load(f)
# Load streaming configuration
self.streaming_variables.clear()
for var_name, var_config in self.variables.items():
# If streaming property doesn't exist, default to False for backward compatibility
if var_config.get("streaming", False):
self.streaming_variables.add(var_name)
self.logger.info(
f"Variables loaded from {self.variables_file}: {len(self.variables)} variables, {len(self.streaming_variables)} enabled for streaming"
)
else:
self.logger.info(
"No variables file found, starting with empty variables"
)
except Exception as e:
self.logger.error(f"Error loading variables: {e}")
def load_system_state(self):
"""Load system state from JSON file"""
try:
if os.path.exists(self.state_file):
with open(self.state_file, "r") as f:
state_data = json.load(f)
self.last_state = state_data.get("last_state", self.last_state)
self.auto_recovery_enabled = state_data.get(
"auto_recovery_enabled", True
)
self.logger.info(f"System state loaded from {self.state_file}")
else:
self.logger.info("No system state file found, starting with defaults")
except Exception as e:
self.logger.error(f"Error loading system state: {e}")
def save_system_state(self):
"""Save current system state to JSON file"""
try:
state_data = {
"last_state": {
"should_connect": self.connected,
"should_stream": self.streaming,
"should_record_csv": self.csv_recording,
},
"auto_recovery_enabled": self.auto_recovery_enabled,
"last_update": datetime.now().isoformat(),
}
with open(self.state_file, "w") as f:
json.dump(state_data, f, indent=4)
self.logger.debug("System state saved")
except Exception as e:
self.logger.error(f"Error saving system state: {e}")
def attempt_auto_recovery(self):
    """Attempt to restore previous system state.

    Order matters: the PLC connection is restored first, and only then
    streaming or (exclusively) CSV recording — streaming already starts
    CSV recording itself (see start_streaming), hence the elif.
    """
    if not self.auto_recovery_enabled:
        self.logger.info("Auto-recovery disabled, skipping state restoration")
        return
    self.logger.info("Attempting auto-recovery of previous state...")
    # Try to restore connection
    if self.last_state.get("should_connect", False):
        self.logger.info("Attempting to restore PLC connection...")
        if self.connect_plc():
            self.logger.info("PLC connection restored successfully")
            # Try to restore streaming if connection was successful
            if self.last_state.get("should_stream", False):
                self.logger.info("Attempting to restore streaming...")
                if self.start_streaming():
                    self.logger.info("Streaming restored successfully")
                else:
                    self.logger.warning("Failed to restore streaming")
            # Try to restore CSV recording if needed
            elif self.last_state.get("should_record_csv", False):
                self.logger.info("Attempting to restore CSV recording...")
                if self.start_csv_recording():
                    self.logger.info("CSV recording restored successfully")
                else:
                    self.logger.warning("Failed to restore CSV recording")
        else:
            self.logger.warning("Failed to restore PLC connection")
def acquire_instance_lock(self) -> bool:
    """Acquire a PID-file lock to ensure single instance execution.

    Returns True when the lock was acquired (stale or unreadable lock
    files are removed first); False when a live instance of this
    application already holds it, or on unexpected errors.
    """
    try:
        # Check if lock file exists
        if os.path.exists(self.lock_file):
            # Read PID from existing lock file
            with open(self.lock_file, "r") as f:
                try:
                    old_pid = int(f.read().strip())
                    # Check if process is still running
                    if psutil.pid_exists(old_pid):
                        # Get process info to verify it's our application
                        try:
                            proc = psutil.Process(old_pid)
                            cmdline = " ".join(proc.cmdline())
                            if "main.py" in cmdline or "plc" in cmdline.lower():
                                self.logger.error(
                                    f"Another instance is already running (PID: {old_pid})"
                                )
                                return False
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            # Process doesn't exist or can't access, continue
                            pass
                    # Old process is dead, remove stale lock file
                    # NOTE(review): this branch is also reached when the PID
                    # belongs to a live but unrelated process (cmdline did not
                    # match) — the lock file is then removed anyway.
                    os.remove(self.lock_file)
                    self.logger.info("Removed stale lock file")
                except (ValueError, IOError):
                    # Invalid lock file, remove it
                    os.remove(self.lock_file)
                    self.logger.info("Removed invalid lock file")
        # Create new lock file with current PID
        with open(self.lock_file, "w") as f:
            f.write(str(os.getpid()))
        # Register cleanup function so the lock is released at interpreter exit
        atexit.register(self.release_instance_lock)
        self.logger.info(
            f"Instance lock acquired: {self.lock_file} (PID: {os.getpid()})"
        )
        return True
    except Exception as e:
        self.logger.error(f"Error acquiring instance lock: {e}")
        return False
def release_instance_lock(self):
"""Release instance lock"""
try:
# Remove lock file
if os.path.exists(self.lock_file):
os.remove(self.lock_file)
self.logger.info("Instance lock released")
except Exception as e:
self.logger.error(f"Error releasing instance lock: {e}")
def save_variables(self):
"""Save variables configuration to JSON file"""
try:
# Update streaming state in variables before saving
for var_name in self.variables:
self.variables[var_name]["streaming"] = (
var_name in self.streaming_variables
)
with open(self.variables_file, "w") as f:
json.dump(self.variables, f, indent=4)
self.logger.info(f"Variables saved to {self.variables_file}")
except Exception as e:
self.logger.error(f"Error saving variables: {e}")
def update_plc_config(self, ip: str, rack: int, slot: int):
"""Update PLC configuration"""
old_config = self.plc_config.copy()
self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
self.save_configuration()
config_details = {"old_config": old_config, "new_config": self.plc_config}
self.log_event(
"info",
"config_change",
f"PLC configuration updated: {ip}:{rack}/{slot}",
config_details,
)
def update_udp_config(self, host: str, port: int):
"""Update UDP configuration"""
old_config = self.udp_config.copy()
self.udp_config = {"host": host, "port": port}
self.save_configuration()
config_details = {"old_config": old_config, "new_config": self.udp_config}
self.log_event(
"info",
"config_change",
f"UDP configuration updated: {host}:{port}",
config_details,
)
def update_sampling_interval(self, interval: float):
"""Update sampling interval"""
old_interval = self.sampling_interval
self.sampling_interval = interval
self.save_configuration()
config_details = {"old_interval": old_interval, "new_interval": interval}
self.log_event(
"info",
"config_change",
f"Sampling interval updated: {interval}s",
config_details,
)
def add_variable(
self, name: str, area: str, db: int, offset: int, var_type: str, bit: int = None
):
"""Add a variable for polling"""
area = area.lower()
# Validate area type - ahora incluye áreas de bits individuales
if area not in ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]:
raise ValueError(
f"Unsupported area type: {area}. Supported: db, mw, m, pew, pe, paw, pa, e, a, mb"
)
# Validate data type
valid_types = [
"real",
"int",
"bool",
"dint",
"word",
"byte",
"uint",
"udint",
"sint",
"usint",
]
if var_type not in valid_types:
raise ValueError(
f"Invalid data type: {var_type}. Supported: {', '.join(valid_types)}"
)
# Para áreas de bits individuales, el tipo debe ser bool y bit debe estar especificado
if area in ["e", "a", "mb"] and var_type != "bool":
raise ValueError(f"For bit areas ({area}), data type must be 'bool'")
if area in ["e", "a", "mb"] and bit is None:
raise ValueError(
f"For bit areas ({area}), bit position must be specified (0-7)"
)
if bit is not None and (bit < 0 or bit > 7):
raise ValueError("Bit position must be between 0 and 7")
# Create variable configuration
var_config = {
"area": area,
"offset": offset,
"type": var_type,
"streaming": False,
}
# Add DB number only for DB area
if area == "db":
var_config["db"] = db
# Add bit position for bit areas
if area in ["e", "a", "mb"]:
var_config["bit"] = bit
self.variables[name] = var_config
self.save_variables()
variable_details = {
"name": name,
"area": area,
"db": db if area == "db" else None,
"offset": offset,
"bit": bit,
"type": var_type,
"total_variables": len(self.variables),
}
# Updated area description to include bit addresses
area_description = {
"db": f"DB{db}.{offset}",
"mw": f"MW{offset}",
"m": f"M{offset}",
"pew": f"PEW{offset}",
"pe": f"PE{offset}",
"paw": f"PAW{offset}",
"pa": f"PA{offset}",
"e": f"E{offset}.{bit}",
"a": f"A{offset}.{bit}",
"mb": f"M{offset}.{bit}",
}
self.log_event(
"info",
"variable_added",
f"Variable added: {name} -> {area_description[area]} ({var_type})",
variable_details,
)
self.create_new_csv_file_for_variable_modification()
def remove_variable(self, name: str):
"""Remove a variable from polling"""
if name in self.variables:
var_config = self.variables[name].copy()
del self.variables[name]
# Also remove from streaming variables if present
self.streaming_variables.discard(name)
self.save_variables()
variable_details = {
"name": name,
"removed_config": var_config,
"total_variables": len(self.variables),
}
self.log_event(
"info",
"variable_removed",
f"Variable removed: {name}",
variable_details,
)
self.create_new_csv_file_for_variable_modification()
def toggle_streaming_variable(self, name: str, enabled: bool):
"""Enable or disable a variable for streaming"""
if name in self.variables:
if enabled:
self.streaming_variables.add(name)
else:
self.streaming_variables.discard(name)
# Save changes to persist streaming configuration
self.save_variables()
self.logger.info(
f"Variable {name} streaming: {'enabled' if enabled else 'disabled'}"
)
def get_csv_directory_path(self) -> str:
"""Get the directory path for current day's CSV files"""
now = datetime.now()
day_folder = now.strftime("%d-%m-%Y")
return os.path.join("records", day_folder)
def get_csv_file_path(self, use_modification_timestamp: bool = False) -> str:
"""Get the complete file path for current hour's CSV file"""
now = datetime.now()
if use_modification_timestamp:
# Create filename with hour_min_sec format for variable modifications
time_suffix = now.strftime("%H_%M_%S")
filename = f"_{time_suffix}.csv"
else:
# Standard hourly format
hour = now.strftime("%H")
filename = f"{hour}.csv"
directory = self.get_csv_directory_path()
return os.path.join(directory, filename)
def ensure_csv_directory(self):
"""Create CSV directory structure if it doesn't exist"""
directory = self.get_csv_directory_path()
Path(directory).mkdir(parents=True, exist_ok=True)
def create_new_csv_file_for_variable_modification(self):
    """Create a new CSV file when variables are modified during active recording.

    The replacement file is named "_HH_MM_SS.csv" so rows with the new
    column set never mix with rows already written to the hourly file.
    No-op when CSV recording is inactive.
    """
    if not self.csv_recording:
        return
    try:
        # Close current file if open
        if self.current_csv_file:
            self.current_csv_file.close()
            self.logger.info(
                f"Closed previous CSV file due to variable modification"
            )
        # Create new file with modification timestamp
        self.ensure_csv_directory()
        csv_path = self.get_csv_file_path(use_modification_timestamp=True)
        self.current_csv_file = open(csv_path, "w", newline="", encoding="utf-8")
        self.current_csv_writer = csv.writer(self.current_csv_file)
        # Mark that we're using a modification file and set current hour,
        # so setup_csv_file() keeps this file open until the hour changes.
        self.using_modification_file = True
        self.current_hour = datetime.now().hour
        # Write headers with new variable configuration
        if self.variables:
            headers = ["timestamp"] + list(self.variables.keys())
            self.current_csv_writer.writerow(headers)
            self.current_csv_file.flush()
            self.csv_headers_written = True
        self.logger.info(
            f"New CSV file created after variable modification: {csv_path}"
        )
        self.log_event(
            "info",
            "csv_file_created",
            f"New CSV file created after variable modification: {os.path.basename(csv_path)}",
            {
                "file_path": csv_path,
                "variables_count": len(self.variables),
                "reason": "variable_modification",
            },
        )
    except Exception as e:
        self.logger.error(
            f"Error creating new CSV file after variable modification: {e}"
        )
        self.log_event(
            "error",
            "csv_error",
            f"Failed to create new CSV file after variable modification: {str(e)}",
            {"error": str(e)},
        )
def setup_csv_file(self):
    """Open (or roll over to) the CSV file for the current hour.

    A modification-timestamp file stays in use until the hour changes;
    otherwise records/<day>/<HH>.csv is opened in append mode and the
    header row is written only when the file did not exist yet.
    """
    current_hour = datetime.now().hour
    # If we're using a modification file and the hour hasn't changed, keep using it
    if (
        self.using_modification_file
        and self.current_hour == current_hour
        and self.current_csv_file is not None
    ):
        return
    # Check if we need to create a new file
    if self.current_hour != current_hour or self.current_csv_file is None:
        # Close previous file if open
        if self.current_csv_file:
            self.current_csv_file.close()
        # Create directory and file for current hour
        self.ensure_csv_directory()
        csv_path = self.get_csv_file_path()
        # Check if file exists to determine if we need headers
        file_exists = os.path.exists(csv_path)
        self.current_csv_file = open(csv_path, "a", newline="", encoding="utf-8")
        self.current_csv_writer = csv.writer(self.current_csv_file)
        self.current_hour = current_hour
        # Reset modification file flag when creating regular hourly file
        self.using_modification_file = False
        # Write headers if it's a new file
        if not file_exists and self.variables:
            headers = ["timestamp"] + list(self.variables.keys())
            self.current_csv_writer.writerow(headers)
            self.current_csv_file.flush()
            self.csv_headers_written = True
        # NOTE(review): logged on every open, even when appending to an
        # already-existing hourly file — the wording "created" is imprecise.
        self.logger.info(f"CSV file created: {csv_path}")
def write_csv_data(self, data: Dict[str, Any]):
"""Write data to CSV file"""
if not self.csv_recording or not self.variables:
return
try:
self.setup_csv_file()
if self.current_csv_writer:
# Create timestamp
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Create row with all variables (use None for missing values)
row = [timestamp]
for var_name in self.variables.keys():
row.append(data.get(var_name, None))
self.current_csv_writer.writerow(row)
self.current_csv_file.flush()
except Exception as e:
self.logger.error(f"Error writing CSV data: {e}")
def get_streaming_data(self, all_data: Dict[str, Any]) -> Dict[str, Any]:
"""Filter data for streaming based on selected variables"""
if not self.streaming_variables:
return all_data
return {
name: value
for name, value in all_data.items()
if name in self.streaming_variables
}
def start_csv_recording(self) -> bool:
"""Start CSV recording"""
if not self.connected:
self.log_event(
"error", "csv_error", "Cannot start CSV recording: PLC not connected"
)
return False
if not self.variables:
self.log_event(
"error",
"csv_error",
"Cannot start CSV recording: No variables configured",
)
return False
self.csv_recording = True
self.save_system_state()
csv_details = {
"variables_count": len(self.variables),
"output_directory": self.get_csv_directory_path(),
}
self.log_event(
"info",
"csv_started",
f"CSV recording started for {len(self.variables)} variables",
csv_details,
)
return True
def stop_csv_recording(self):
"""Stop CSV recording"""
self.csv_recording = False
if self.current_csv_file:
self.current_csv_file.close()
self.current_csv_file = None
self.current_csv_writer = None
self.current_hour = None
self.using_modification_file = False
self.save_system_state()
self.log_event("info", "csv_stopped", "CSV recording stopped")
def connect_plc(self) -> bool:
    """(Re)connect the snap7 client using the stored PLC settings.

    Any existing client is disconnected first. Returns True on success;
    failures set connected=False, log an event, and return False.
    """
    endpoint = {
        "ip": self.plc_config["ip"],
        "rack": self.plc_config["rack"],
        "slot": self.plc_config["slot"],
    }
    try:
        if self.plc:
            self.plc.disconnect()
        self.plc = snap7.client.Client()
        self.plc.connect(endpoint["ip"], endpoint["rack"], endpoint["slot"])
        self.connected = True
        self.save_system_state()
        self.log_event(
            "info",
            "plc_connection",
            f"Successfully connected to PLC {self.plc_config['ip']}",
            endpoint,
        )
        return True
    except Exception as e:
        self.connected = False
        failure_details = dict(endpoint, error=str(e))
        self.log_event(
            "error",
            "plc_connection_failed",
            f"Failed to connect to PLC {self.plc_config['ip']}: {str(e)}",
            failure_details,
        )
        return False
def disconnect_plc(self):
"""Disconnect from PLC"""
try:
if self.plc:
self.plc.disconnect()
self.connected = False
self.save_system_state()
self.log_event(
"info",
"plc_disconnection",
f"Disconnected from PLC {self.plc_config['ip']}",
)
except Exception as e:
self.log_event(
"error",
"plc_disconnection_error",
f"Error disconnecting from PLC: {str(e)}",
{"error": str(e)},
)
def read_variable(self, var_config: Dict[str, Any]) -> Any:
"""Read a specific variable from the PLC"""
try:
area_type = var_config.get("area", "db").lower()
offset = var_config["offset"]
var_type = var_config["type"]
bit = var_config.get("bit") # Extract bit position for bit areas
if area_type == "db":
# Data Block access (existing functionality)
db = var_config["db"]
if var_type == "real":
raw_data = self.plc.db_read(db, offset, 4)
value = struct.unpack(">f", raw_data)[0]
elif var_type == "int":
raw_data = self.plc.db_read(db, offset, 2)
value = struct.unpack(">h", raw_data)[0]
elif var_type == "bool":
raw_data = self.plc.db_read(db, offset, 1)
value = bool(raw_data[0] & 0x01)
elif var_type == "dint":
raw_data = self.plc.db_read(db, offset, 4)
value = struct.unpack(">l", raw_data)[0]
elif var_type == "word":
raw_data = self.plc.db_read(db, offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "byte":
raw_data = self.plc.db_read(db, offset, 1)
value = struct.unpack(">B", raw_data)[0]
elif var_type == "uint":
raw_data = self.plc.db_read(db, offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "udint":
raw_data = self.plc.db_read(db, offset, 4)
value = struct.unpack(">L", raw_data)[0]
elif var_type == "sint":
raw_data = self.plc.db_read(db, offset, 1)
value = struct.unpack(">b", raw_data)[0]
elif var_type == "usint":
raw_data = self.plc.db_read(db, offset, 1)
value = struct.unpack(">B", raw_data)[0]
else:
return None
elif area_type == "mw" or area_type == "m":
# Memory Words / Markers access
if var_type == "real":
raw_data = self.plc.mb_read(offset, 4)
value = struct.unpack(">f", raw_data)[0]
elif var_type == "int":
raw_data = self.plc.mb_read(offset, 2)
value = struct.unpack(">h", raw_data)[0]
elif var_type == "bool":
raw_data = self.plc.mb_read(offset, 1)
value = bool(raw_data[0] & 0x01)
elif var_type == "dint":
raw_data = self.plc.mb_read(offset, 4)
value = struct.unpack(">l", raw_data)[0]
elif var_type == "word":
raw_data = self.plc.mb_read(offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "byte":
raw_data = self.plc.mb_read(offset, 1)
value = struct.unpack(">B", raw_data)[0]
elif var_type == "uint":
raw_data = self.plc.mb_read(offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "udint":
raw_data = self.plc.mb_read(offset, 4)
value = struct.unpack(">L", raw_data)[0]
elif var_type == "sint":
raw_data = self.plc.mb_read(offset, 1)
value = struct.unpack(">b", raw_data)[0]
elif var_type == "usint":
raw_data = self.plc.mb_read(offset, 1)
value = struct.unpack(">B", raw_data)[0]
else:
return None
elif area_type == "pew" or area_type == "pe":
# Process Input Words access
if var_type == "real":
raw_data = self.plc.eb_read(offset, 4)
value = struct.unpack(">f", raw_data)[0]
elif var_type == "int":
raw_data = self.plc.eb_read(offset, 2)
value = struct.unpack(">h", raw_data)[0]
elif var_type == "bool":
raw_data = self.plc.eb_read(offset, 1)
value = bool(raw_data[0] & 0x01)
elif var_type == "dint":
raw_data = self.plc.eb_read(offset, 4)
value = struct.unpack(">l", raw_data)[0]
elif var_type == "word":
raw_data = self.plc.eb_read(offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "byte":
raw_data = self.plc.eb_read(offset, 1)
value = struct.unpack(">B", raw_data)[0]
elif var_type == "uint":
raw_data = self.plc.eb_read(offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "udint":
raw_data = self.plc.eb_read(offset, 4)
value = struct.unpack(">L", raw_data)[0]
elif var_type == "sint":
raw_data = self.plc.eb_read(offset, 1)
value = struct.unpack(">b", raw_data)[0]
elif var_type == "usint":
raw_data = self.plc.eb_read(offset, 1)
value = struct.unpack(">B", raw_data)[0]
else:
return None
elif area_type == "paw" or area_type == "pa":
# Process Output Words access
if var_type == "real":
raw_data = self.plc.ab_read(offset, 4)
value = struct.unpack(">f", raw_data)[0]
elif var_type == "int":
raw_data = self.plc.ab_read(offset, 2)
value = struct.unpack(">h", raw_data)[0]
elif var_type == "bool":
raw_data = self.plc.ab_read(offset, 1)
value = bool(raw_data[0] & 0x01)
elif var_type == "dint":
raw_data = self.plc.ab_read(offset, 4)
value = struct.unpack(">l", raw_data)[0]
elif var_type == "word":
raw_data = self.plc.ab_read(offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "byte":
raw_data = self.plc.ab_read(offset, 1)
value = struct.unpack(">B", raw_data)[0]
elif var_type == "uint":
raw_data = self.plc.ab_read(offset, 2)
value = struct.unpack(">H", raw_data)[0]
elif var_type == "udint":
raw_data = self.plc.ab_read(offset, 4)
value = struct.unpack(">L", raw_data)[0]
elif var_type == "sint":
raw_data = self.plc.ab_read(offset, 1)
value = struct.unpack(">b", raw_data)[0]
elif var_type == "usint":
raw_data = self.plc.ab_read(offset, 1)
value = struct.unpack(">B", raw_data)[0]
else:
return None
elif area_type == "e":
# Process Input Bits access (E5.1 format)
if var_type == "bool":
raw_data = self.plc.eb_read(offset, 1)
# Use snap7.util.get_bool for proper bit extraction
value = snap7.util.get_bool(raw_data, 0, bit)
else:
return None
elif area_type == "a":
# Process Output Bits access (A3.7 format)
if var_type == "bool":
raw_data = self.plc.ab_read(offset, 1)
# Use snap7.util.get_bool for proper bit extraction
value = snap7.util.get_bool(raw_data, 0, bit)
else:
return None
elif area_type == "mb":
# Memory Bits access (M10.0 format)
if var_type == "bool":
raw_data = self.plc.mb_read(offset, 1)
# Use snap7.util.get_bool for proper bit extraction
value = snap7.util.get_bool(raw_data, 0, bit)
else:
return None
else:
self.logger.error(f"Unsupported area type: {area_type}")
return None
return value
except Exception as e:
self.logger.error(f"Error reading variable: {e}")
return None
def read_all_variables(self) -> Dict[str, Any]:
"""Read all configured variables"""
if not self.connected or not self.plc:
return {}
data = {}
for var_name, var_config in self.variables.items():
value = self.read_variable(var_config)
if value is not None:
data[var_name] = value
return data
def setup_udp_socket(self) -> bool:
"""Setup UDP socket"""
try:
if self.udp_socket:
self.udp_socket.close()
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.logger.info(
f"UDP socket configured for {self.udp_config['host']}:{self.udp_config['port']}"
)
return True
except Exception as e:
self.logger.error(f"Error configuring UDP socket: {e}")
return False
def send_to_plotjuggler(self, data: Dict[str, Any]):
"""Send data to PlotJuggler via UDP JSON"""
if not self.udp_socket:
return
try:
message = {"timestamp": time.time(), "data": data}
json_message = json.dumps(message)
self.udp_socket.sendto(
json_message.encode("utf-8"),
(self.udp_config["host"], self.udp_config["port"]),
)
except Exception as e:
self.logger.error(f"Error sending data to PlotJuggler: {e}")
def streaming_loop(self):
"""Main streaming loop"""
self.logger.info(
f"Starting streaming with interval of {self.sampling_interval}s"
)
consecutive_errors = 0
max_consecutive_errors = 5
while self.streaming:
try:
start_time = time.time()
# Read all variables
all_data = self.read_all_variables()
if all_data:
# Reset error counter on successful read
consecutive_errors = 0
# Write to CSV (all variables)
self.write_csv_data(all_data)
# Get filtered data for streaming
streaming_data = self.get_streaming_data(all_data)
# Send filtered data to PlotJuggler
if streaming_data:
self.send_to_plotjuggler(streaming_data)
# Log data
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
self.logger.info(
f"[{timestamp}] CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
)
else:
consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors:
self.log_event(
"error",
"streaming_error",
f"Multiple consecutive read failures ({consecutive_errors}). Stopping streaming.",
{"consecutive_errors": consecutive_errors},
)
break
# Maintain sampling interval
elapsed = time.time() - start_time
sleep_time = max(0, self.sampling_interval - elapsed)
time.sleep(sleep_time)
except Exception as e:
consecutive_errors += 1
self.log_event(
"error",
"streaming_error",
f"Error in streaming loop: {str(e)}",
{"error": str(e), "consecutive_errors": consecutive_errors},
)
if consecutive_errors >= max_consecutive_errors:
self.log_event(
"error",
"streaming_error",
"Too many consecutive errors. Stopping streaming.",
{"consecutive_errors": consecutive_errors},
)
break
time.sleep(1) # Wait before retry
def start_streaming(self) -> bool:
    """Start data streaming (and CSV recording) in a background thread.

    Preconditions checked in order: PLC connected, at least one variable
    configured, UDP socket creatable. Returns False (with a logged
    event) when any precondition fails, True once the thread is running.
    """
    if not self.connected:
        self.log_event(
            "error", "streaming_error", "Cannot start streaming: PLC not connected"
        )
        return False
    if not self.variables:
        self.log_event(
            "error",
            "streaming_error",
            "Cannot start streaming: No variables configured",
        )
        return False
    if not self.setup_udp_socket():
        self.log_event(
            "error",
            "streaming_error",
            "Cannot start streaming: UDP socket setup failed",
        )
        return False
    # Start CSV recording automatically (its return value is ignored —
    # a CSV failure does not prevent streaming).
    self.start_csv_recording()
    self.streaming = True
    # Daemon thread so it cannot keep the process alive on shutdown.
    self.stream_thread = threading.Thread(target=self.streaming_loop)
    self.stream_thread.daemon = True
    self.stream_thread.start()
    self.save_system_state()
    streaming_details = {
        "variables_count": len(self.variables),
        "streaming_variables_count": len(self.streaming_variables),
        "sampling_interval": self.sampling_interval,
        "udp_host": self.udp_config["host"],
        "udp_port": self.udp_config["port"],
    }
    self.log_event(
        "info",
        "streaming_started",
        f"Streaming started with {len(self.streaming_variables)} variables",
        streaming_details,
    )
    return True
def stop_streaming(self):
"""Stop streaming"""
self.streaming = False
if self.stream_thread:
self.stream_thread.join(timeout=2)
# Stop CSV recording
self.stop_csv_recording()
if self.udp_socket:
self.udp_socket.close()
self.udp_socket = None
self.save_system_state()
self.log_event(
"info", "streaming_stopped", "Data streaming and CSV recording stopped"
)
def get_status(self) -> Dict[str, Any]:
"""Get current system status"""
return {
"plc_connected": self.connected,
"streaming": self.streaming,
"csv_recording": self.csv_recording,
"plc_config": self.plc_config,
"udp_config": self.udp_config,
"variables_count": len(self.variables),
"streaming_variables_count": len(self.streaming_variables),
"sampling_interval": self.sampling_interval,
"current_csv_file": (
self.get_csv_file_path() if self.csv_recording else None
),
}
def log_event(
self, level: str, event_type: str, message: str, details: Dict[str, Any] = None
):
"""Add an event to the persistent log"""
try:
event = {
"timestamp": datetime.now().isoformat(),
"level": level, # info, warning, error
"event_type": event_type, # connection, disconnection, error, config_change, etc.
"message": message,
"details": details or {},
}
self.events_log.append(event)
# Limit log size
if len(self.events_log) > self.max_log_entries:
self.events_log = self.events_log[-self.max_log_entries :]
# Save to file
self.save_events_log()
# Also log to regular logger
if level == "error":
self.logger.error(f"[{event_type}] {message}")
elif level == "warning":
self.logger.warning(f"[{event_type}] {message}")
else:
self.logger.info(f"[{event_type}] {message}")
except Exception as e:
self.logger.error(f"Error adding event to log: {e}")
def load_events_log(self):
"""Load events log from JSON file"""
try:
if os.path.exists(self.events_log_file):
with open(self.events_log_file, "r", encoding="utf-8") as f:
data = json.load(f)
self.events_log = data.get("events", [])
# Limit log size on load
if len(self.events_log) > self.max_log_entries:
self.events_log = self.events_log[-self.max_log_entries :]
self.logger.info(f"Events log loaded: {len(self.events_log)} entries")
else:
self.events_log = []
self.logger.info("No events log file found, starting with empty log")
except Exception as e:
self.logger.error(f"Error loading events log: {e}")
self.events_log = []
def save_events_log(self):
"""Save events log to JSON file"""
try:
log_data = {
"events": self.events_log,
"last_updated": datetime.now().isoformat(),
"total_entries": len(self.events_log),
}
with open(self.events_log_file, "w", encoding="utf-8") as f:
json.dump(log_data, f, indent=2, ensure_ascii=False)
except Exception as e:
self.logger.error(f"Error saving events log: {e}")
def get_recent_events(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Get recent events from the log"""
return self.events_log[-limit:] if self.events_log else []
# Global streamer instance (will be initialized in main)
# Route handlers must treat None as "application not initialized yet"
# (see check_streamer_initialized below).
streamer = None
def check_streamer_initialized():
    """Return a (json, 503) error response when the global streamer is
    not yet constructed; return None when it is ready to use."""
    if streamer is not None:
        return None
    return jsonify({"error": "Application not initialized"}), 503
@app.route("/images/<filename>")
def serve_image(filename):
    """Serve static image files out of the hidden ``.images`` directory."""
    image_dir = ".images"
    return send_from_directory(image_dir, filename)
@app.route("/")
def index():
    """Render the main dashboard page with current status and variables."""
    if streamer is None:
        return "Application not initialized", 503
    context = {
        "status": streamer.get_status(),
        "variables": streamer.variables,
    }
    return render_template("index.html", **context)
@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
    """Update PLC connection settings.

    Expects JSON with ``ip`` (required), ``rack`` and ``slot``
    (optional, default 0 / 2). Returns 400 on missing IP or
    non-numeric rack/slot.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        # Fix: previously a missing "ip" silently fell back to the magic
        # address "10.1.33.11", which disagreed with the configured
        # default ("192.168.1.100") and could point the PLC client at
        # the wrong device. Require an explicit IP instead.
        ip = data.get("ip")
        if not ip:
            return jsonify({"success": False, "message": "Missing 'ip' field"}), 400
        rack = int(data.get("rack", 0))
        slot = int(data.get("slot", 2))
        streamer.update_plc_config(ip, rack, slot)
        return jsonify({"success": True, "message": "PLC configuration updated"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
    """Update UDP output settings (host, port).

    Fix: added the check_streamer_initialized guard that every other
    route uses — previously a request before initialization raised
    AttributeError (HTTP 500) instead of a clean 503.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        host = data.get("host", "127.0.0.1")
        port = int(data.get("port", 9870))
        streamer.update_udp_config(host, port)
        return jsonify({"success": True, "message": "UDP configuration updated"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
    """Open the connection to the PLC via the global streamer."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    connected = streamer.connect_plc()
    if not connected:
        return jsonify({"success": False, "message": "Error connecting to PLC"}), 500
    return jsonify({"success": True, "message": "Connected to PLC"})
@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
    """Stop streaming and disconnect from the PLC.

    Fix: added the check_streamer_initialized guard — previously a
    request before initialization raised AttributeError (HTTP 500)
    instead of the clean 503 used elsewhere.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    streamer.stop_streaming()
    streamer.disconnect_plc()
    return jsonify({"success": True, "message": "Disconnected from PLC"})
@app.route("/api/variables", methods=["POST"])
def add_variable():
    """Add a new variable definition.

    Expects JSON with: name, area, type, offset; ``db`` (required only
    for the "db" area) and ``bit`` (required for the bit-addressed
    areas "e", "a", "mb").

    Fixes: added the check_streamer_initialized guard; ``db`` is no
    longer demanded as a dummy value for non-DB areas; ``bit`` is
    coerced to int so a string-encoded bit position no longer crashes
    the range check with a TypeError.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        name = data.get("name")
        area = data.get("area")
        var_type = data.get("type")
        offset = int(data.get("offset"))
        bit = data.get("bit")
        if bit is not None:
            bit = int(bit)  # tolerate string-encoded bit positions

        valid_types = [
            "real",
            "int",
            "bool",
            "dint",
            "word",
            "byte",
            "uint",
            "udint",
            "sint",
            "usint",
        ]
        valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]
        if (
            not name
            or not area
            or var_type not in valid_types
            or area.lower() not in valid_areas
        ):
            return jsonify({"success": False, "message": "Invalid data"}), 400

        # The DB number only matters for data-block reads; other areas
        # fall back to 0 instead of requiring a dummy value.
        if area.lower() == "db":
            if data.get("db") is None:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "DB number must be specified for DB area",
                        }
                    ),
                    400,
                )
            db = int(data.get("db"))
        else:
            db = int(data.get("db") or 0)

        if area.lower() in ["e", "a", "mb"]:
            if bit is None:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "Bit position must be specified for bit areas",
                        }
                    ),
                    400,
                )
            if bit < 0 or bit > 7:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "Bit position must be between 0 and 7",
                        }
                    ),
                    400,
                )

        streamer.add_variable(name, area, db, offset, var_type, bit)
        return jsonify({"success": True, "message": f"Variable {name} added"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
    """Remove a variable definition.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    streamer.remove_variable(name)
    return jsonify({"success": True, "message": f"Variable {name} removed"})
@app.route("/api/variables/<name>", methods=["GET"])
def get_variable(name):
    """Return a single variable's configuration plus its streaming flag.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        if name not in streamer.variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )
        # Copy so the response augmentation can't mutate stored config.
        var_config = streamer.variables[name].copy()
        var_config["name"] = name
        var_config["streaming"] = name in streamer.streaming_variables
        return jsonify({"success": True, "variable": var_config})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["PUT"])
def update_variable(name):
    """Update (and optionally rename) an existing variable.

    Validates the payload, then replaces the stored definition via
    remove + re-add while preserving the streaming flag.

    Fixes: added the check_streamer_initialized guard; ``bit`` is
    coerced to int; if the re-add fails after the remove, the original
    definition is restored so a bad update can no longer silently
    delete the variable.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        new_name = data.get("name", name)
        area = data.get("area")
        db = int(data.get("db", 1))
        offset = int(data.get("offset"))
        var_type = data.get("type")
        bit = data.get("bit")
        if bit is not None:
            bit = int(bit)  # tolerate string-encoded bit positions

        valid_types = [
            "real",
            "int",
            "bool",
            "dint",
            "word",
            "byte",
            "uint",
            "udint",
            "sint",
            "usint",
        ]
        valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]
        if (
            not new_name
            or not area
            or var_type not in valid_types
            or area.lower() not in valid_areas
        ):
            return jsonify({"success": False, "message": "Invalid data"}), 400
        if area.lower() in ["e", "a", "mb"]:
            if bit is None:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "Bit position must be specified for bit areas",
                        }
                    ),
                    400,
                )
            if bit < 0 or bit > 7:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "Bit position must be between 0 and 7",
                        }
                    ),
                    400,
                )

        if name not in streamer.variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )
        if new_name != name and new_name in streamer.variables:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": f"Variable {new_name} already exists",
                    }
                ),
                400,
            )

        # Snapshot for rollback. NOTE(review): assumes the stored config
        # dict mirrors add_variable's parameters (area/db/offset/type/bit),
        # as get_variable's response suggests — confirm against the class.
        old_config = dict(streamer.variables[name])
        was_streaming = name in streamer.streaming_variables

        streamer.remove_variable(name)
        try:
            streamer.add_variable(new_name, area, db, offset, var_type, bit)
        except Exception:
            # Restore the original definition and streaming state so a
            # failed update does not delete the variable.
            streamer.add_variable(
                name,
                old_config.get("area", "db"),
                old_config.get("db", 1),
                old_config.get("offset", 0),
                old_config.get("type", "real"),
                old_config.get("bit"),
            )
            if was_streaming:
                streamer.toggle_streaming_variable(name, True)
            raise

        if was_streaming:
            streamer.toggle_streaming_variable(new_name, True)
        return jsonify({"success": True, "message": "Variable updated successfully"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>/streaming", methods=["POST"])
def toggle_variable_streaming(name):
    """Enable or disable streaming for one variable.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        enabled = data.get("enabled", False)
        streamer.toggle_streaming_variable(name, enabled)
        status = "enabled" if enabled else "disabled"
        return jsonify(
            {"success": True, "message": f"Variable {name} streaming {status}"}
        )
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/streaming", methods=["GET"])
def get_streaming_variables():
    """List the variables currently enabled for streaming.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    return jsonify(
        {"success": True, "streaming_variables": list(streamer.streaming_variables)}
    )
@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
    """Start the UDP streaming loop on the global streamer."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    started = streamer.start_streaming()
    if not started:
        return jsonify({"success": False, "message": "Error starting streaming"}), 500
    return jsonify({"success": True, "message": "Streaming started"})
@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
    """Stop the UDP streaming loop.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    streamer.stop_streaming()
    return jsonify({"success": True, "message": "Streaming stopped"})
@app.route("/api/sampling", methods=["POST"])
def update_sampling():
    """Update the sampling interval in seconds.

    Values below 0.01 s are clamped so the polling loop can't spin hot.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        data = request.get_json()
        interval = float(data.get("interval", 0.1))
        if interval < 0.01:
            interval = 0.01
        streamer.update_sampling_interval(interval)
        return jsonify({"success": True, "message": f"Interval updated to {interval}s"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/csv/start", methods=["POST"])
def start_csv_recording():
    """Start CSV recording independently of UDP streaming.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    if streamer.start_csv_recording():
        return jsonify({"success": True, "message": "CSV recording started"})
    return (
        jsonify({"success": False, "message": "Error starting CSV recording"}),
        500,
    )
@app.route("/api/csv/stop", methods=["POST"])
def stop_csv_recording():
    """Stop CSV recording independently of UDP streaming.

    Fix: added the check_streamer_initialized guard for a clean 503
    instead of an AttributeError before initialization.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    streamer.stop_csv_recording()
    return jsonify({"success": True, "message": "CSV recording stopped"})
@app.route("/api/status")
def get_status():
    """Return the streamer's current status as JSON (503 before init)."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    return jsonify(streamer.get_status())
@app.route("/api/events")
def get_events():
    """Return the most recent application events as JSON."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        requested = request.args.get("limit", 50, type=int)
        # Never hand out more than 200 events in a single response.
        capped = min(requested, 200)
        events = streamer.get_recent_events(capped)
        payload = {
            "success": True,
            "events": events,
            "total_events": len(streamer.events_log),
            "showing": len(events),
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
def graceful_shutdown():
    """Stop streaming, disconnect the PLC and release the instance lock.

    Fix: guard against ``streamer`` still being None (initialization
    failed or never ran) — previously this raised AttributeError during
    shutdown instead of exiting cleanly.
    """
    print("\n⏹️ Performing graceful shutdown...")
    if streamer is None:
        # Nothing was initialized, so there is nothing to tear down.
        print("✅ Shutdown completed successfully")
        return
    try:
        streamer.stop_streaming()
        streamer.disconnect_plc()
        streamer.release_instance_lock()
        print("✅ Shutdown completed successfully")
    except Exception as e:
        print(f"⚠️ Error during shutdown: {e}")
def main():
    """Run the Flask server with simple crash-retry handling.

    The global ``streamer`` must already be constructed (the
    ``__main__`` guard does this) before calling main(); this function
    only starts the web server. Fix: removed the vestigial
    ``global streamer`` statement and the comment claiming streamer
    initialization happened here — it never did, which misled readers.
    """
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Create templates directory if it doesn't exist
            os.makedirs("templates", exist_ok=True)
            print("🚀 Starting Flask server for PLC S7-315 Streamer")
            print("📊 Web interface available at: http://localhost:5050")
            print("🔧 Configure your PLC and variables through the web interface")
            # Start Flask application (reloader disabled so only one
            # process holds the single-instance lock)
            app.run(debug=False, host="0.0.0.0", port=5050, use_reloader=False)
            # If we reach here, the server stopped normally
            break
        except RuntimeError as e:
            if "Another instance" in str(e):
                print(f"{e}")
                print("💡 Tip: Stop the other instance or wait for it to finish")
                sys.exit(1)
            else:
                print(f"⚠️ Runtime error: {e}")
                retry_count += 1
        except KeyboardInterrupt:
            print("\n⏸️ Received interrupt signal...")
            graceful_shutdown()
            break
        except Exception as e:
            print(f"💥 Unexpected error: {e}")
            retry_count += 1

        if retry_count < max_retries:
            print(f"🔄 Attempting restart ({retry_count}/{max_retries})...")
            time.sleep(2)  # Wait before retry
        else:
            print("❌ Maximum retries reached. Exiting...")
            graceful_shutdown()
            sys.exit(1)
if __name__ == "__main__":
    try:
        # Build the shared global streamer before serving any requests,
        # then hand control to the retrying server loop.
        streamer = PLCDataStreamer()
        main()
    except Exception as exc:
        print(f"💥 Critical error during initialization: {exc}")
        sys.exit(1)