S7_snap7_Stremer_n_Recorder/main.py

1740 lines
59 KiB
Python

from flask import (
Flask,
render_template,
request,
jsonify,
redirect,
url_for,
send_from_directory,
Response,
stream_template,
)
import snap7
import snap7.util
import json
import socket
import time
import logging
import threading
from datetime import datetime
from typing import Dict, Any, Optional, List
import struct
import os
import csv
from pathlib import Path
import atexit
import psutil
import sys
from core import PLCDataStreamer
# Flask application instance. The secret key is only used for session/cookie
# signing; it is hard-coded, which is acceptable for a local operator tool.
app = Flask(__name__)
app.secret_key = "plc_streamer_secret_key"
def resource_path(relative_path):
    """Resolve *relative_path* against the application's base directory.

    When running from a PyInstaller bundle, resources live in the temporary
    extraction folder recorded in ``sys._MEIPASS``; during development they
    are resolved relative to the current working directory.
    """
    base_path = getattr(sys, "_MEIPASS", None)
    if base_path is None:
        # Not running in a bundle
        base_path = os.path.abspath(".")
    return os.path.join(base_path, relative_path)
# Global streamer instance (will be initialized in main). Stays None until
# then, which is why every route guards via check_streamer_initialized().
streamer: Optional[PLCDataStreamer] = None
def check_streamer_initialized():
    """Return a (response, status) pair if the global streamer is missing.

    Returns None once the application has been initialized, enabling the
    idiom ``err = check_streamer_initialized(); if err: return err``.
    """
    if streamer is not None:
        return None
    return jsonify({"error": "Application not initialized"}), 503
@app.route("/images/<filename>")
def serve_image(filename):
"""Serve images from .images directory"""
return send_from_directory(".images", filename)
@app.route("/static/<path:filename>")
def serve_static(filename):
"""Serve static files (CSS, JS, etc.)"""
return send_from_directory("static", filename)
@app.route("/")
def index():
"""Main page"""
if streamer is None:
return "Application not initialized", 503
# Get variables for the current dataset or empty dict if no current dataset
current_variables = {}
if streamer.current_dataset_id and streamer.current_dataset_id in streamer.datasets:
current_variables = streamer.datasets[streamer.current_dataset_id].get(
"variables", {}
)
return render_template(
"index.html",
status=streamer.get_status(),
variables=current_variables,
datasets=streamer.datasets,
current_dataset_id=streamer.current_dataset_id,
)
@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
"""Update PLC configuration"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
data = request.get_json()
ip = data.get("ip", "10.1.33.11")
rack = int(data.get("rack", 0))
slot = int(data.get("slot", 2))
streamer.update_plc_config(ip, rack, slot)
return jsonify({"success": True, "message": "PLC configuration updated"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
"""Update UDP configuration"""
try:
data = request.get_json()
host = data.get("host", "127.0.0.1")
port = int(data.get("port", 9870))
streamer.update_udp_config(host, port)
return jsonify({"success": True, "message": "UDP configuration updated"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/csv/config", methods=["GET"])
def get_csv_config():
"""Get CSV recording configuration"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
csv_config = streamer.config_manager.csv_config.copy()
# Add current directory information
current_dir = streamer.config_manager.get_csv_directory_path()
csv_config["current_directory"] = os.path.abspath(current_dir)
csv_config["directory_exists"] = os.path.exists(current_dir)
# Add disk space info
disk_info = streamer.get_disk_space_info()
if disk_info:
csv_config["disk_space"] = disk_info
return jsonify({"success": True, "config": csv_config})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
@app.route("/api/csv/config", methods=["POST"])
def update_csv_config():
"""Update CSV recording configuration"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
data = request.get_json()
# Extract valid configuration parameters
config_updates = {}
valid_params = {
"records_directory",
"rotation_enabled",
"max_size_mb",
"max_days",
"max_hours",
"cleanup_interval_hours",
}
for param in valid_params:
if param in data:
config_updates[param] = data[param]
if not config_updates:
return (
jsonify(
{
"success": False,
"message": "No valid configuration parameters provided",
}
),
400,
)
# Update configuration
result = streamer.config_manager.update_csv_config(**config_updates)
return jsonify(
{
"success": True,
"message": "CSV configuration updated successfully",
"old_config": result["old_config"],
"new_config": result["new_config"],
}
)
except ValueError as e:
return jsonify({"success": False, "message": str(e)}), 400
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
@app.route("/api/csv/cleanup", methods=["POST"])
def trigger_csv_cleanup():
"""Manually trigger CSV cleanup"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
# Perform cleanup
streamer.streamer.perform_csv_cleanup()
return jsonify(
{"success": True, "message": "CSV cleanup completed successfully"}
)
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
@app.route("/api/csv/directory/info", methods=["GET"])
def get_csv_directory_info():
"""Get information about CSV directory and files"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
base_dir = streamer.config_manager.get_csv_directory_path()
info = {
"base_directory": os.path.abspath(base_dir),
"directory_exists": os.path.exists(base_dir),
"total_files": 0,
"total_size_mb": 0,
"oldest_file": None,
"newest_file": None,
"day_folders": [],
}
if os.path.exists(base_dir):
total_size = 0
oldest_time = None
newest_time = None
for day_folder in os.listdir(base_dir):
day_path = os.path.join(base_dir, day_folder)
if os.path.isdir(day_path):
day_info = {"name": day_folder, "files": 0, "size_mb": 0}
for file_name in os.listdir(day_path):
if file_name.endswith(".csv"):
file_path = os.path.join(day_path, file_name)
if os.path.isfile(file_path):
stat = os.stat(file_path)
file_size = stat.st_size
file_time = stat.st_mtime
info["total_files"] += 1
day_info["files"] += 1
total_size += file_size
day_info["size_mb"] += file_size / (1024 * 1024)
if oldest_time is None or file_time < oldest_time:
oldest_time = file_time
info["oldest_file"] = datetime.fromtimestamp(
file_time
).isoformat()
if newest_time is None or file_time > newest_time:
newest_time = file_time
info["newest_file"] = datetime.fromtimestamp(
file_time
).isoformat()
day_info["size_mb"] = round(day_info["size_mb"], 2)
info["day_folders"].append(day_info)
info["total_size_mb"] = round(total_size / (1024 * 1024), 2)
info["day_folders"].sort(key=lambda x: x["name"], reverse=True)
return jsonify({"success": True, "info": info})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 500
@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
"""Connect to PLC"""
error_response = check_streamer_initialized()
if error_response:
return error_response
if streamer.connect_plc():
return jsonify({"success": True, "message": "Connected to PLC"})
else:
return jsonify({"success": False, "message": "Error connecting to PLC"}), 500
@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
"""Disconnect from PLC"""
streamer.disconnect_plc()
return jsonify({"success": True, "message": "Disconnected from PLC"})
@app.route("/api/variables", methods=["POST"])
def add_variable():
"""Add a new variable"""
try:
data = request.get_json()
name = data.get("name")
area = data.get("area")
db = int(data.get("db"))
offset = int(data.get("offset"))
var_type = data.get("type")
bit = data.get("bit") # Added bit parameter
valid_types = [
"real",
"int",
"bool",
"dint",
"word",
"byte",
"uint",
"udint",
"sint",
"usint",
]
valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]
if (
not name
or not area
or var_type not in valid_types
or area.lower() not in valid_areas
):
return jsonify({"success": False, "message": "Invalid data"}), 400
if area.lower() in ["e", "a", "mb"] and bit is None:
return (
jsonify(
{
"success": False,
"message": "Bit position must be specified for bit areas",
}
),
400,
)
if area.lower() in ["e", "a", "mb"] and (bit < 0 or bit > 7):
return (
jsonify(
{
"success": False,
"message": "Bit position must be between 0 and 7",
}
),
400,
)
streamer.add_variable(name, area, db, offset, var_type, bit)
return jsonify({"success": True, "message": f"Variable {name} added"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
"""Remove a variable"""
streamer.remove_variable(name)
return jsonify({"success": True, "message": f"Variable {name} removed"})
@app.route("/api/variables/<name>", methods=["GET"])
def get_variable(name):
"""Get a specific variable configuration from current dataset"""
try:
if not streamer.current_dataset_id:
return (
jsonify({"success": False, "message": "No dataset selected"}),
400,
)
current_variables = streamer.get_dataset_variables(streamer.current_dataset_id)
if name not in current_variables:
return (
jsonify({"success": False, "message": f"Variable {name} not found"}),
404,
)
var_config = current_variables[name].copy()
var_config["name"] = name
# Check if variable is in streaming list for current dataset
streaming_vars = streamer.datasets[streamer.current_dataset_id].get(
"streaming_variables", []
)
var_config["streaming"] = name in streaming_vars
return jsonify({"success": True, "variable": var_config})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["PUT"])
def update_variable(name):
"""Update an existing variable in current dataset"""
try:
if not streamer.current_dataset_id:
return jsonify({"success": False, "message": "No dataset selected"}), 400
data = request.get_json()
new_name = data.get("name", name)
area = data.get("area")
db = int(data.get("db", 1))
offset = int(data.get("offset"))
var_type = data.get("type")
bit = data.get("bit")
valid_types = [
"real",
"int",
"bool",
"dint",
"word",
"byte",
"uint",
"udint",
"sint",
"usint",
]
valid_areas = ["db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb"]
if (
not new_name
or not area
or var_type not in valid_types
or area.lower() not in valid_areas
):
return jsonify({"success": False, "message": "Invalid data"}), 400
if area.lower() in ["e", "a", "mb"] and bit is None:
return (
jsonify(
{
"success": False,
"message": "Bit position must be specified for bit areas",
}
),
400,
)
if area.lower() in ["e", "a", "mb"] and (bit < 0 or bit > 7):
return (
jsonify(
{
"success": False,
"message": "Bit position must be between 0 and 7",
}
),
400,
)
current_dataset_id = streamer.current_dataset_id
current_variables = streamer.get_dataset_variables(current_dataset_id)
# Check if variable exists in current dataset
if name not in current_variables:
return (
jsonify({"success": False, "message": f"Variable {name} not found"}),
404,
)
# Check if new name already exists (if name is changing)
if name != new_name and new_name in current_variables:
return (
jsonify(
{
"success": False,
"message": f"Variable {new_name} already exists",
}
),
400,
)
# Preserve streaming state
streaming_vars = streamer.datasets[current_dataset_id].get(
"streaming_variables", []
)
was_streaming = name in streaming_vars
# Remove old variable
streamer.remove_variable_from_dataset(current_dataset_id, name)
# Add updated variable
streamer.add_variable_to_dataset(
current_dataset_id, new_name, area, db, offset, var_type, bit, was_streaming
)
return jsonify({"success": True, "message": f"Variable updated successfully"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>/streaming", methods=["POST"])
def toggle_variable_streaming(name):
"""Toggle streaming for a specific variable in current dataset"""
try:
if not streamer.current_dataset_id:
return jsonify({"success": False, "message": "No dataset selected"}), 400
data = request.get_json()
enabled = data.get("enabled", False)
# Use the new dataset-specific method
streamer.toggle_variable_streaming(streamer.current_dataset_id, name, enabled)
status = "enabled" if enabled else "disabled"
return jsonify(
{"success": True, "message": f"Variable {name} streaming {status}"}
)
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/streaming", methods=["GET"])
def get_streaming_variables():
"""Get list of variables enabled for streaming in current dataset"""
if streamer is None:
return jsonify({"success": False, "error": "Streamer not initialized"}), 503
# Get streaming variables from current dataset
streaming_vars = []
if streamer.current_dataset_id and streamer.current_dataset_id in streamer.datasets:
streaming_vars = streamer.datasets[streamer.current_dataset_id].get(
"streaming_variables", []
)
return jsonify({"success": True, "streaming_variables": streaming_vars})
@app.route("/api/datasets/<dataset_id>/variables/values", methods=["GET"])
def get_dataset_variable_values(dataset_id):
"""Get current values of all variables in a dataset - CACHE ONLY"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
# Check if dataset exists
if dataset_id not in streamer.datasets:
return (
jsonify(
{"success": False, "message": f"Dataset '{dataset_id}' not found"}
),
404,
)
# Get dataset variables
dataset_variables = streamer.get_dataset_variables(dataset_id)
if not dataset_variables:
return jsonify(
{
"success": True,
"message": "No variables defined in this dataset",
"values": {},
"source": "no_variables",
}
)
# Check if dataset is active (required for cache to be populated)
if dataset_id not in streamer.active_datasets:
return jsonify(
{
"success": False,
"message": f"Dataset '{dataset_id}' is not active. Activate the dataset to start reading variables and populate cache.",
"error_type": "dataset_inactive",
"values": {},
"detailed_errors": {},
"stats": {
"success": 0,
"failed": 0,
"total": len(dataset_variables),
},
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"source": "no_cache_dataset_inactive",
"is_cached": False,
}
)
# Check if PLC is connected (required for streaming to populate cache)
if not streamer.plc_client.is_connected():
return jsonify(
{
"success": False,
"message": "PLC not connected. Connect to PLC and activate dataset to populate cache.",
"error_type": "plc_disconnected",
"values": {},
"detailed_errors": {},
"stats": {
"success": 0,
"failed": 0,
"total": len(dataset_variables),
},
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"source": "no_cache_plc_disconnected",
"is_cached": False,
}
)
# Get cached values - this is the ONLY source of data according to application principles
if not streamer.has_cached_values(dataset_id):
return jsonify(
{
"success": False,
"message": f"No cached values available for dataset '{dataset_id}'. Cache is populated by the streaming process at the dataset's configured interval. Please wait for the next reading cycle.",
"error_type": "no_cache_available",
"values": {},
"detailed_errors": {},
"stats": {
"success": 0,
"failed": 0,
"total": len(dataset_variables),
},
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"source": "no_cache_available",
"is_cached": False,
"note": f"Dataset reads every {streamer.config_manager.get_dataset_sampling_interval(dataset_id)}s",
}
)
# Get cached values (the ONLY valid source according to application design)
read_result = streamer.get_cached_dataset_values(dataset_id)
# Convert timestamp from ISO format to readable format for consistency
if read_result.get("timestamp"):
try:
cached_timestamp = datetime.fromisoformat(read_result["timestamp"])
read_result["timestamp"] = cached_timestamp.strftime(
"%Y-%m-%d %H:%M:%S"
)
except:
pass # Keep original timestamp if conversion fails
# Extract values and handle diagnostics
if not read_result.get("success", False):
# Complete failure case
error_msg = read_result.get("error", "Unknown error reading variables")
error_type = read_result.get("error_type", "unknown")
# Log detailed error information
if streamer.logger:
streamer.logger.error(
f"Cached values indicate failure for dataset '{dataset_id}': {error_msg}"
)
if read_result.get("errors"):
for var_name, var_error in read_result["errors"].items():
streamer.logger.error(f" Variable '{var_name}': {var_error}")
return (
jsonify(
{
"success": False,
"message": error_msg,
"error_type": error_type,
"values": {},
"detailed_errors": read_result.get("errors", {}),
"stats": read_result.get("stats", {}),
"timestamp": read_result.get(
"timestamp", datetime.now().strftime("%Y-%m-%d %H:%M:%S")
),
"source": "cache",
"is_cached": True,
}
),
500,
)
# Success or partial success case
raw_values = read_result.get("values", {})
variable_errors = read_result.get("errors", {})
stats = read_result.get("stats", {})
# Format values for display
formatted_values = {}
error_details = {}
for var_name, value in raw_values.items():
if value is not None:
var_config = dataset_variables[var_name]
var_type = var_config.get("type", "unknown")
# Format value based on type
try:
if var_type == "real":
formatted_values[var_name] = (
f"{value:.3f}"
if isinstance(value, (int, float))
else str(value)
)
elif var_type == "bool":
formatted_values[var_name] = "TRUE" if value else "FALSE"
elif var_type in [
"int",
"uint",
"dint",
"udint",
"word",
"byte",
"sint",
"usint",
]:
formatted_values[var_name] = (
str(int(value))
if isinstance(value, (int, float))
else str(value)
)
else:
formatted_values[var_name] = str(value)
except Exception as format_error:
formatted_values[var_name] = "FORMAT_ERROR"
error_details[var_name] = f"Format error: {str(format_error)}"
else:
# Variable had an error - get the specific error message
specific_error = variable_errors.get(var_name, "Unknown error")
formatted_values[var_name] = "ERROR"
error_details[var_name] = specific_error
# Prepare response message
total_vars = stats.get("total", len(dataset_variables))
success_vars = stats.get("success", 0)
failed_vars = stats.get("failed", 0)
# Determine data source for message (always cache now)
data_source = "cache"
source_text = " (from streaming cache)"
if failed_vars == 0:
message = (
f"Successfully displaying all {success_vars} variables{source_text}"
)
response_success = True
else:
message = f"Partial data available: {success_vars}/{total_vars} variables have valid cached values, {failed_vars} failed{source_text}"
response_success = True # Still success if we got some values
# Log warnings for partial failures
if streamer.logger:
streamer.logger.warning(
f"Partial failure in cached values for dataset '{dataset_id}': {message}"
)
for var_name, var_error in error_details.items():
if formatted_values.get(var_name) == "ERROR":
streamer.logger.warning(f" Variable '{var_name}': {var_error}")
return jsonify(
{
"success": response_success,
"message": message,
"values": formatted_values,
"detailed_errors": error_details,
"stats": stats,
"timestamp": read_result.get(
"timestamp", datetime.now().strftime("%Y-%m-%d %H:%M:%S")
),
"warning": read_result.get("warning"),
"source": data_source,
"is_cached": True,
"cache_info": f"Dataset reads every {streamer.config_manager.get_dataset_sampling_interval(dataset_id)}s",
}
)
except Exception as e:
return (
jsonify(
{
"success": False,
"message": f"Error retrieving cached variable values: {str(e)}",
"values": {},
}
),
500,
)
# Dataset Management API Endpoints
@app.route("/api/datasets", methods=["GET"])
def get_datasets():
    """Return every dataset definition plus activation/selection state."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    payload = {
        "success": True,
        "datasets": streamer.datasets,
        "active_datasets": list(streamer.active_datasets),
        "current_dataset_id": streamer.current_dataset_id,
    }
    return jsonify(payload)
@app.route("/api/datasets", methods=["POST"])
def create_dataset():
"""Create a new dataset"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
data = request.get_json()
dataset_id = data.get("dataset_id", "").strip()
name = data.get("name", "").strip()
prefix = data.get("prefix", "").strip()
sampling_interval = data.get("sampling_interval")
if not dataset_id or not name or not prefix:
return (
jsonify(
{
"success": False,
"message": "Dataset ID, name, and prefix are required",
}
),
400,
)
# Validate sampling interval if provided
if sampling_interval is not None:
sampling_interval = float(sampling_interval)
if sampling_interval < 0.01:
return (
jsonify(
{
"success": False,
"message": "Sampling interval must be at least 0.01 seconds",
}
),
400,
)
streamer.create_dataset(dataset_id, name, prefix, sampling_interval)
return jsonify(
{
"success": True,
"message": f"Dataset '{name}' created successfully",
"dataset_id": dataset_id,
}
)
except ValueError as e:
return jsonify({"success": False, "message": str(e)}), 400
except Exception as e:
return (
jsonify({"success": False, "message": f"Error creating dataset: {str(e)}"}),
500,
)
@app.route("/api/datasets/<dataset_id>", methods=["DELETE"])
def delete_dataset(dataset_id):
"""Delete a dataset"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
streamer.delete_dataset(dataset_id)
return jsonify({"success": True, "message": "Dataset deleted successfully"})
except ValueError as e:
return jsonify({"success": False, "message": str(e)}), 404
except Exception as e:
return (
jsonify({"success": False, "message": f"Error deleting dataset: {str(e)}"}),
500,
)
@app.route("/api/datasets/<dataset_id>/activate", methods=["POST"])
def activate_dataset(dataset_id):
"""Activate a dataset for streaming"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
streamer.activate_dataset(dataset_id)
return jsonify({"success": True, "message": "Dataset activated successfully"})
except ValueError as e:
return jsonify({"success": False, "message": str(e)}), 404
except RuntimeError as e:
return jsonify({"success": False, "message": str(e)}), 400
except Exception as e:
return (
jsonify(
{"success": False, "message": f"Error activating dataset: {str(e)}"}
),
500,
)
@app.route("/api/datasets/<dataset_id>/deactivate", methods=["POST"])
def deactivate_dataset(dataset_id):
"""Deactivate a dataset"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
streamer.deactivate_dataset(dataset_id)
return jsonify({"success": True, "message": "Dataset deactivated successfully"})
except ValueError as e:
return jsonify({"success": False, "message": str(e)}), 404
except Exception as e:
return (
jsonify(
{"success": False, "message": f"Error deactivating dataset: {str(e)}"}
),
500,
)
@app.route("/api/datasets/<dataset_id>/variables", methods=["POST"])
def add_variable_to_dataset(dataset_id):
"""Add a variable to a dataset"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
data = request.get_json()
name = data.get("name", "").strip()
area = data.get("area", "").strip()
db = data.get("db", 1)
offset = data.get("offset", 0)
var_type = data.get("type", "").strip()
bit = data.get("bit")
streaming = data.get("streaming", False)
if not name or not area or not var_type:
return (
jsonify(
{
"success": False,
"message": "Variable name, area, and type are required",
}
),
400,
)
streamer.add_variable_to_dataset(
dataset_id, name, area, db, offset, var_type, bit, streaming
)
return jsonify(
{
"success": True,
"message": f"Variable '{name}' added to dataset successfully",
}
)
except ValueError as e:
return jsonify({"success": False, "message": str(e)}), 400
except Exception as e:
return (
jsonify({"success": False, "message": f"Error adding variable: {str(e)}"}),
500,
)
@app.route("/api/datasets/<dataset_id>/variables/<variable_name>", methods=["DELETE"])
def remove_variable_from_dataset(dataset_id, variable_name):
"""Remove a variable from a dataset"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
streamer.remove_variable_from_dataset(dataset_id, variable_name)
return jsonify(
{
"success": True,
"message": f"Variable '{variable_name}' removed from dataset successfully",
}
)
except ValueError as e:
return jsonify({"success": False, "message": str(e)}), 404
except Exception as e:
return (
jsonify(
{"success": False, "message": f"Error removing variable: {str(e)}"}
),
500,
)
@app.route(
    "/api/datasets/<dataset_id>/variables/<variable_name>/streaming", methods=["POST"]
)
def toggle_variable_streaming_in_dataset(dataset_id, variable_name):
    """Enable or disable streaming for one variable in a specific dataset."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    try:
        enabled = request.get_json().get("enabled", False)
        streamer.toggle_variable_streaming(dataset_id, variable_name, enabled)
        return jsonify(
            {
                "success": True,
                "message": f"Variable '{variable_name}' streaming {'enabled' if enabled else 'disabled'}",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error toggling streaming: {str(e)}"}
            ),
            500,
        )
@app.route("/api/datasets/current", methods=["POST"])
def set_current_dataset():
"""Set the current dataset for editing"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
data = request.get_json()
dataset_id = data.get("dataset_id")
if dataset_id and dataset_id in streamer.datasets:
streamer.current_dataset_id = dataset_id
streamer.save_datasets()
return jsonify(
{
"success": True,
"message": "Current dataset updated successfully",
"current_dataset_id": dataset_id,
}
)
else:
return jsonify({"success": False, "message": "Invalid dataset ID"}), 400
except Exception as e:
return (
jsonify(
{
"success": False,
"message": f"Error setting current dataset: {str(e)}",
}
),
500,
)
@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
"""Start UDP streaming (legacy endpoint - now only starts UDP streaming)"""
error_response = check_streamer_initialized()
if error_response:
return error_response
if streamer.start_streaming():
return jsonify({"success": True, "message": "UDP streaming started"})
else:
return (
jsonify({"success": False, "message": "Error starting UDP streaming"}),
500,
)
@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
"""Stop UDP streaming (legacy endpoint - now only stops UDP streaming)"""
error_response = check_streamer_initialized()
if error_response:
return error_response
streamer.stop_streaming()
return jsonify({"success": True, "message": "UDP streaming stopped"})
@app.route("/api/sampling", methods=["POST"])
def update_sampling():
"""Update sampling interval"""
try:
data = request.get_json()
interval = float(data.get("interval", 0.1))
if interval < 0.01:
interval = 0.01
streamer.update_sampling_interval(interval)
return jsonify({"success": True, "message": f"Interval updated to {interval}s"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/csv/start", methods=["POST"])
def start_csv_recording_legacy():
"""Start CSV recording independently (legacy endpoint)"""
error_response = check_streamer_initialized()
if error_response:
return error_response
if streamer.data_streamer.start_csv_recording():
return jsonify({"success": True, "message": "CSV recording started"})
else:
return (
jsonify({"success": False, "message": "Error starting CSV recording"}),
500,
)
@app.route("/api/csv/stop", methods=["POST"])
def stop_csv_recording_legacy():
"""Stop CSV recording independently (legacy endpoint)"""
error_response = check_streamer_initialized()
if error_response:
return error_response
streamer.data_streamer.stop_csv_recording()
return jsonify({"success": True, "message": "CSV recording stopped"})
@app.route("/api/csv/recording/start", methods=["POST"])
def start_csv_recording():
"""Start CSV recording independently of UDP streaming"""
error_response = check_streamer_initialized()
if error_response:
return error_response
if streamer.data_streamer.start_csv_recording():
return jsonify({"success": True, "message": "CSV recording started"})
else:
return (
jsonify({"success": False, "message": "Error starting CSV recording"}),
500,
)
@app.route("/api/csv/recording/stop", methods=["POST"])
def stop_csv_recording():
"""Stop CSV recording independently of UDP streaming"""
error_response = check_streamer_initialized()
if error_response:
return error_response
streamer.data_streamer.stop_csv_recording()
return jsonify({"success": True, "message": "CSV recording stopped"})
# 🔑 NEW: UDP Streaming Control (Independent)
@app.route("/api/udp/streaming/start", methods=["POST"])
def start_udp_streaming():
    """Start UDP streaming to PlotJuggler, independent of CSV recording."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    if not streamer.data_streamer.start_udp_streaming():
        return (
            jsonify({"success": False, "message": "Error starting UDP streaming"}),
            500,
        )
    return jsonify(
        {"success": True, "message": "UDP streaming to PlotJuggler started"}
    )
@app.route("/api/udp/streaming/stop", methods=["POST"])
def stop_udp_streaming():
"""Stop UDP streaming to PlotJuggler independently of CSV recording"""
error_response = check_streamer_initialized()
if error_response:
return error_response
streamer.data_streamer.stop_udp_streaming()
return jsonify({"success": True, "message": "UDP streaming to PlotJuggler stopped"})
# 📈 PLOT MANAGER API ENDPOINTS
@app.route("/api/plots", methods=["GET"])
def get_plots():
    """Return the status of every plot session."""
    not_ready = check_streamer_initialized()
    if not_ready:
        return not_ready
    try:
        all_sessions = streamer.data_streamer.plot_manager.get_all_sessions_status()
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"sessions": all_sessions})
@app.route("/api/plots", methods=["POST"])
def create_plot():
"""Create a new plot session"""
error_response = check_streamer_initialized()
if error_response:
return error_response
try:
data = request.get_json()
# Validar datos requeridos
if not data.get("variables"):
return jsonify({"error": "At least one variable is required"}), 400
if not data.get("time_window"):
return jsonify({"error": "Time window is required"}), 400
# Validar que las variables existen en datasets activos
available_vars = streamer.data_streamer.plot_manager.get_available_variables(
streamer.data_streamer.get_active_datasets(),
streamer.config_manager.datasets,
)
invalid_vars = [var for var in data["variables"] if var not in available_vars]
if invalid_vars:
return (
jsonify(
{
"error": f"Variables not available: {', '.join(invalid_vars)}",
"available_variables": available_vars,
}
),
400,
)
# Validar trigger si está habilitado
if data.get("trigger_enabled") and data.get("trigger_variable"):
boolean_vars = streamer.data_streamer.plot_manager.get_boolean_variables(
streamer.data_streamer.get_active_datasets(),
streamer.config_manager.datasets,
)
if data["trigger_variable"] not in boolean_vars:
return (
jsonify(
{
"error": f"Trigger variable '{data['trigger_variable']}' is not a boolean variable",
"boolean_variables": boolean_vars,
}
),
400,
)
# Crear configuración de la sesión
config = {
"name": data.get(
"name", f"Plot {len(streamer.data_streamer.plot_manager.sessions) + 1}"
),
"variables": data["variables"],
"time_window": int(data["time_window"]),
"y_min": data.get("y_min"),
"y_max": data.get("y_max"),
"trigger_variable": data.get("trigger_variable"),
"trigger_enabled": data.get("trigger_enabled", False),
"trigger_on_true": data.get("trigger_on_true", True),
}
# Convertir valores numéricos si están presentes
if config["y_min"] is not None:
config["y_min"] = float(config["y_min"])
if config["y_max"] is not None:
config["y_max"] = float(config["y_max"])
session_id = streamer.data_streamer.plot_manager.create_session(config)
return jsonify(
{
"success": True,
"session_id": session_id,
"message": f"Plot session '{config['name']}' created successfully",
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/plots/<session_id>", methods=["DELETE"])
def remove_plot(session_id):
    """Delete the plot session identified by ``session_id``.

    Returns 200 on success, 404 when no such session exists,
    500 on an internal error.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        removed = streamer.data_streamer.plot_manager.remove_session(session_id)
        if not removed:
            return jsonify({"error": "Plot session not found"}), 404
        return jsonify({"success": True, "message": "Plot session removed"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/plots/<session_id>/control", methods=["POST"])
def control_plot(session_id):
    """Control a plot session (start/stop/pause/resume/clear).

    Expects a JSON body with an "action" key. Returns 400 for a missing
    body or unknown action, 404 when the session does not exist, 500 on
    an internal error.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        # silent=True makes get_json() return None (instead of raising) on a
        # missing or malformed JSON body, so the client gets a clean 400
        # "Invalid action" response rather than an opaque 500.
        data = request.get_json(silent=True) or {}
        action = data.get("action")
        if action not in ["start", "stop", "pause", "resume", "clear"]:
            return (
                jsonify(
                    {"error": "Invalid action. Use: start, stop, pause, resume, clear"}
                ),
                400,
            )
        success = streamer.data_streamer.plot_manager.control_session(
            session_id, action
        )
        if success:
            return jsonify({"success": True, "message": f"Plot session {action}ed"})
        else:
            return jsonify({"error": "Plot session not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/plots/<session_id>/data", methods=["GET"])
def get_plot_data(session_id):
    """Return the buffered data for one plot session as JSON.

    404 when the session is unknown, 500 on an internal error.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        session_data = streamer.data_streamer.plot_manager.get_session_data(session_id)
        if not session_data:
            return jsonify({"error": "Plot session not found"}), 404
        return jsonify(session_data)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/plots/variables", methods=["GET"])
def get_plot_variables():
    """List the variables currently eligible for plotting.

    Returns the plottable variables, the subset usable as boolean
    triggers, and the number of active datasets.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        plot_manager = streamer.data_streamer.plot_manager
        dataset_configs = streamer.config_manager.datasets
        plottable = plot_manager.get_available_variables(
            streamer.data_streamer.get_active_datasets(),
            dataset_configs,
        )
        triggers = plot_manager.get_boolean_variables(
            streamer.data_streamer.get_active_datasets(),
            dataset_configs,
        )
        return jsonify(
            {
                "available_variables": plottable,
                "boolean_variables": triggers,
                "active_datasets_count": len(
                    streamer.data_streamer.get_active_datasets()
                ),
            }
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/status")
def get_status():
    """Return the streamer's current status as JSON."""
    # Use the shared guard so the 503 behavior stays consistent with
    # every other endpoint (previously this route duplicated the check).
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    return jsonify(streamer.get_status())
@app.route("/api/events")
def get_events():
    """Get recent events from the application log.

    Query parameters:
        limit: maximum number of events to return (default 50, clamped
               to the range 1..200).
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    try:
        limit = request.args.get("limit", 50, type=int)
        # Clamp to a sane range: previously only the upper bound was
        # enforced, so a zero/negative ?limit reached get_recent_events.
        limit = max(1, min(limit, 200))  # Maximum 200 events per request
        events = streamer.get_recent_events(limit)
        return jsonify(
            {
                "success": True,
                "events": events,
                "total_events": len(streamer.event_logger.events_log),
                "showing": len(events),
            }
        )
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/stream/variables", methods=["GET"])
def stream_variables():
    """Stream variable values in real-time using Server-Sent Events.

    Query parameters:
        dataset_id: id of the dataset whose cached values are streamed (required).
        interval: poll period in seconds between SSE messages (default 1.0).

    The stream is CACHE ONLY: values come from the cache populated by the
    dataset sampling loop, never from a direct PLC read. A "values" message
    is emitted only when the formatted values actually change.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    # Get request parameters outside the generator: the generator body runs
    # after the request context is gone, so request.* is unavailable there.
    dataset_id = request.args.get("dataset_id")
    try:
        interval = float(request.args.get("interval", 1.0))  # Default 1 second
    except ValueError:
        # A non-numeric ?interval previously escaped as an unhandled 500.
        return jsonify({"error": "Invalid interval parameter"}), 400
    if not dataset_id:
        return jsonify({"error": "Dataset ID required"}), 400
    if dataset_id not in streamer.datasets:
        return jsonify({"error": f"Dataset {dataset_id} not found"}), 404

    def generate():
        """Generate SSE data stream - CACHE ONLY"""
        # Send initial connection message
        yield f"data: {json.dumps({'type': 'connected', 'message': 'SSE connection established - monitoring cache'})}\n\n"
        last_values = {}
        while True:
            try:
                # Check basic preconditions for cache availability
                if not streamer.plc_client.is_connected():
                    # PLC not connected - cache won't be populated
                    data = {
                        "type": "plc_disconnected",
                        "message": "PLC not connected - cache not being populated",
                        "timestamp": datetime.now().isoformat(),
                    }
                    yield f"data: {json.dumps(data)}\n\n"
                    time.sleep(interval)
                    continue
                # Check if dataset is active
                if dataset_id not in streamer.active_datasets:
                    data = {
                        "type": "dataset_inactive",
                        "message": f"Dataset '{dataset_id}' is not active - activate to populate cache",
                        "timestamp": datetime.now().isoformat(),
                    }
                    yield f"data: {json.dumps(data)}\n\n"
                    time.sleep(interval)
                    continue
                # Get dataset variables
                dataset_variables = streamer.get_dataset_variables(dataset_id)
                if not dataset_variables:
                    # No variables in dataset
                    data = {
                        "type": "no_variables",
                        "message": "No variables defined in this dataset",
                        "timestamp": datetime.now().isoformat(),
                    }
                    yield f"data: {json.dumps(data)}\n\n"
                    time.sleep(interval)
                    continue
                # Get cached values - the ONLY source according to application principles
                if not streamer.has_cached_values(dataset_id):
                    # No cache available yet - dataset might be starting up
                    sampling_interval = (
                        streamer.config_manager.get_dataset_sampling_interval(
                            dataset_id
                        )
                    )
                    data = {
                        "type": "no_cache",
                        "message": f"Waiting for cache to be populated (dataset reads every {sampling_interval}s)",
                        "timestamp": datetime.now().isoformat(),
                        "sampling_interval": sampling_interval,
                    }
                    yield f"data: {json.dumps(data)}\n\n"
                    time.sleep(interval)
                    continue
                # Get cached values (the ONLY valid source)
                read_result = streamer.get_cached_dataset_values(dataset_id)
                if read_result.get("success", False):
                    values = read_result.get("values", {})
                    timestamp = read_result.get("timestamp", datetime.now().isoformat())
                    # Format values for display according to their declared type
                    formatted_values = {}
                    for var_name, value in values.items():
                        if value is not None:
                            # .get() guards against a cached variable that was
                            # removed from the dataset config; previously the
                            # KeyError aborted the whole message via the outer
                            # except. Unknown vars fall through to str(value).
                            var_config = dataset_variables.get(var_name, {})
                            var_type = var_config.get("type", "unknown")
                            try:
                                if var_type == "real":
                                    formatted_values[var_name] = (
                                        f"{value:.3f}"
                                        if isinstance(value, (int, float))
                                        else str(value)
                                    )
                                elif var_type == "bool":
                                    formatted_values[var_name] = (
                                        "TRUE" if value else "FALSE"
                                    )
                                elif var_type in [
                                    "int",
                                    "uint",
                                    "dint",
                                    "udint",
                                    "word",
                                    "byte",
                                    "sint",
                                    "usint",
                                ]:
                                    formatted_values[var_name] = (
                                        str(int(value))
                                        if isinstance(value, (int, float))
                                        else str(value)
                                    )
                                else:
                                    formatted_values[var_name] = str(value)
                            except Exception:
                                # Narrowed from a bare except:, which also
                                # swallowed SystemExit/KeyboardInterrupt.
                                formatted_values[var_name] = "FORMAT_ERROR"
                        else:
                            formatted_values[var_name] = "ERROR"
                    # Only send if values changed
                    if formatted_values != last_values:
                        data = {
                            "type": "values",
                            "values": formatted_values,
                            "timestamp": timestamp,
                            "source": "cache",
                            "stats": read_result.get("stats", {}),
                            "cache_age_info": f"Dataset reads every {streamer.config_manager.get_dataset_sampling_interval(dataset_id)}s",
                        }
                        yield f"data: {json.dumps(data)}\n\n"
                        last_values = formatted_values.copy()
                else:
                    # Send error data from cache
                    error_data = {
                        "type": "cache_error",
                        "message": read_result.get(
                            "error", "Cached data indicates error"
                        ),
                        "timestamp": datetime.now().isoformat(),
                        "error_type": read_result.get("error_type", "unknown"),
                        "source": "cache",
                    }
                    yield f"data: {json.dumps(error_data)}\n\n"
                time.sleep(interval)
            except Exception as e:
                # Keep the stream alive: report the failure as an SSE event
                # and retry on the next cycle rather than closing the socket.
                error_data = {
                    "type": "stream_error",
                    "message": f"SSE stream error: {str(e)}",
                    "timestamp": datetime.now().isoformat(),
                    "source": "cache_monitoring",
                }
                yield f"data: {json.dumps(error_data)}\n\n"
                time.sleep(interval)

    return Response(
        generate(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Cache-Control",
        },
    )
@app.route("/api/stream/status", methods=["GET"])
def stream_status():
    """Stream application status in real-time using Server-Sent Events.

    Query parameters:
        interval: poll period in seconds between status checks (default 2.0).

    A "status" message is emitted only when the status dict changes.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    # Get request parameters outside the generator: the generator body runs
    # after the request context is gone, so request.* is unavailable there.
    try:
        interval = float(request.args.get("interval", 2.0))  # Default 2 seconds
    except ValueError:
        # A non-numeric ?interval previously escaped as an unhandled 500.
        return jsonify({"error": "Invalid interval parameter"}), 400

    def generate():
        """Generate SSE status stream"""
        last_status = None
        # Send initial connection message
        yield f"data: {json.dumps({'type': 'connected', 'message': 'Status stream connected'})}\n\n"
        while True:
            try:
                # Get current status
                current_status = streamer.get_status()
                # Only send if status changed
                if current_status != last_status:
                    data = {
                        "type": "status",
                        "status": current_status,
                        "timestamp": datetime.now().isoformat(),
                    }
                    yield f"data: {json.dumps(data)}\n\n"
                    last_status = current_status
                time.sleep(interval)
            except Exception as e:
                # Keep the stream alive: report the failure as an SSE event
                # and retry on the next cycle rather than closing the socket.
                error_data = {
                    "type": "error",
                    "message": f"Status stream error: {str(e)}",
                    "timestamp": datetime.now().isoformat(),
                }
                yield f"data: {json.dumps(error_data)}\n\n"
                time.sleep(interval)

    return Response(
        generate(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Cache-Control",
        },
    )
def graceful_shutdown():
    """Stop streaming, disconnect the PLC, and release the instance lock.

    Safe to call even when initialization never completed: the module
    global ``streamer`` stays None until ``__main__`` creates it, and the
    old code relied on the broad except to mask the resulting
    AttributeError.
    """
    print("\n⏹️ Performing graceful shutdown...")
    if streamer is None:
        # Nothing was initialized, so there is nothing to release.
        print("✅ Shutdown completed successfully (nothing to clean up)")
        return
    try:
        streamer.stop_streaming()
        streamer.disconnect_plc()
        streamer.release_instance_lock()
        print("✅ Shutdown completed successfully")
    except Exception as e:
        # Best-effort shutdown: report the failure but do not re-raise.
        print(f"⚠️ Error during shutdown: {e}")
def main():
    """Main application entry point with error handling and recovery.

    Runs the Flask server and retries up to ``max_retries`` times after
    unexpected failures. Exits immediately when another instance already
    holds the lock, and performs a graceful shutdown on Ctrl+C or when
    the retry budget is exhausted.
    """
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Create templates directory if it doesn't exist
            os.makedirs("templates", exist_ok=True)
            print("🚀 Starting Flask server for PLC S7-315 Streamer")
            print("📊 Web interface available at: http://localhost:5050")
            print("🔧 Configure your PLC and variables through the web interface")
            # Initialize streamer (this will handle instance locking and auto-recovery)
            # NOTE(review): this is only a declaration — nothing assigns the
            # global here; the streamer is actually created in __main__.
            global streamer
            # Start Flask application (blocks until the server stops)
            app.run(debug=False, host="0.0.0.0", port=5050, use_reloader=False)
            # If we reach here, the server stopped normally
            break
        except RuntimeError as e:
            if "Another instance" in str(e):
                # Single-instance lock held elsewhere: no point retrying.
                print(f"{e}")
                print("💡 Tip: Stop the other instance or wait for it to finish")
                sys.exit(1)
            else:
                print(f"⚠️ Runtime error: {e}")
                retry_count += 1
        except KeyboardInterrupt:
            print("\n⏸️ Received interrupt signal...")
            graceful_shutdown()
            break
        except Exception as e:
            print(f"💥 Unexpected error: {e}")
            retry_count += 1
        # Reached only after a failed attempt (the success paths break out).
        if retry_count < max_retries:
            print(f"🔄 Attempting restart ({retry_count}/{max_retries})...")
            time.sleep(2)  # Wait before retry
        else:
            print("❌ Maximum retries reached. Exiting...")
            graceful_shutdown()
            sys.exit(1)
if __name__ == "__main__":
    try:
        # Initialize streamer instance (the module-level global that every
        # Flask route above dereferences) before starting the server loop.
        streamer = PLCDataStreamer()
        main()
    except Exception as e:
        # Initialization failures (e.g. config load, lock acquisition) land here.
        print(f"💥 Critical error during initialization: {e}")
        sys.exit(1)