# Flask web/API front end for the PLC data streamer.
#
# Every view talks to the module-level ``streamer`` object, which is created by
# the application's start-up code; until then it is ``None`` and API views
# answer with HTTP 503.

import atexit
import csv
import json
import logging
import os
import socket
import struct
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import psutil
import snap7
import snap7.util
from flask import (
    Flask,
    Response,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    stream_template,
    url_for,
)

from core import PLCDataStreamer

app = Flask(__name__)
# NOTE(review): hard-coded session secret; consider loading it from the
# environment or a config file before exposing this app beyond localhost.
app.secret_key = "plc_streamer_secret_key"

# Accepted values for the legacy /api/variables endpoints.
_VALID_VARIABLE_TYPES = (
    "real",
    "int",
    "bool",
    "dint",
    "word",
    "byte",
    "uint",
    "udint",
    "sint",
    "usint",
)
_VALID_AREAS = ("db", "mw", "m", "pew", "pe", "paw", "pa", "e", "a", "mb")
# Areas for which the request must also carry a bit position (0-7).
_BIT_AREAS = ("e", "a", "mb")
# Variable types rendered as whole numbers by the cached-values endpoint.
_INTEGER_TYPES = (
    "int",
    "uint",
    "dint",
    "udint",
    "word",
    "byte",
    "sint",
    "usint",
)
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def resource_path(relative_path):
    """Return the absolute path of a bundled resource.

    Uses ``sys._MEIPASS`` (set by PyInstaller when running from a bundle) as
    the base directory and falls back to the current working directory when
    that attribute is absent.
    """
    base_path = getattr(sys, "_MEIPASS", os.path.abspath("."))
    return os.path.join(base_path, relative_path)


# Global streamer instance (will be initialized in main)
streamer = None


def check_streamer_initialized():
    """Return a ``(response, 503)`` tuple if ``streamer`` is unset, else None."""
    if streamer is None:
        return jsonify({"error": "Application not initialized"}), 503
    return None


def _variable_payload_error(name, area, var_type, bit):
    """Validate a variable definition; return an error message or None."""
    if (
        not name
        or not area
        or var_type not in _VALID_VARIABLE_TYPES
        or area.lower() not in _VALID_AREAS
    ):
        return "Invalid data"
    if area.lower() in _BIT_AREAS:
        if bit is None:
            return "Bit position must be specified for bit areas"
        if bit < 0 or bit > 7:
            return "Bit position must be between 0 and 7"
    return None


def _cache_unavailable_payload(message, error_type, source, total_variables, **extra):
    """Build the JSON body used when no cached values can be served."""
    payload = {
        "success": False,
        "message": message,
        "error_type": error_type,
        "values": {},
        "detailed_errors": {},
        "stats": {"success": 0, "failed": 0, "total": total_variables},
        "timestamp": datetime.now().strftime(_TIMESTAMP_FORMAT),
        "source": source,
        "is_cached": False,
    }
    payload.update(extra)
    return payload


def _format_cached_value(value, var_type):
    """Render one cached value as the display string for its variable type."""
    if var_type == "real":
        return f"{value:.3f}" if isinstance(value, (int, float)) else str(value)
    if var_type == "bool":
        return "TRUE" if value else "FALSE"
    if var_type in _INTEGER_TYPES:
        return str(int(value)) if isinstance(value, (int, float)) else str(value)
    return str(value)


@app.route("/images/<path:filename>")
def serve_image(filename):
    """Serve images from .images directory"""
    return send_from_directory(".images", filename)


@app.route("/static/<path:filename>")
def serve_static(filename):
    """Serve static files (CSS, JS, etc.)"""
    return send_from_directory("static", filename)


@app.route("/")
def index():
    """Render the main page for the currently selected dataset."""
    if streamer is None:
        return "Application not initialized", 503

    # Variables of the current dataset, or an empty dict if none is selected.
    current_variables = {}
    if streamer.current_dataset_id and streamer.current_dataset_id in streamer.datasets:
        current_variables = streamer.datasets[streamer.current_dataset_id].get(
            "variables", {}
        )

    return render_template(
        "index.html",
        status=streamer.get_status(),
        variables=current_variables,
        datasets=streamer.datasets,
        current_dataset_id=streamer.current_dataset_id,
    )


@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
    """Update PLC connection settings (``ip``, ``rack``, ``slot``) from JSON."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        ip = data.get("ip", "10.1.33.11")
        rack = int(data.get("rack", 0))
        slot = int(data.get("slot", 2))
        streamer.update_plc_config(ip, rack, slot)
        return jsonify({"success": True, "message": "PLC configuration updated"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
    """Update UDP target settings (``host``, ``port``) from JSON."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        host = data.get("host", "127.0.0.1")
        port = int(data.get("port", 9870))
        streamer.update_udp_config(host, port)
        return jsonify({"success": True, "message": "UDP configuration updated"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/csv/config", methods=["GET"])
def get_csv_config():
    """Return the CSV recording configuration plus directory/disk details."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        csv_config = streamer.config_manager.csv_config.copy()

        # Add current directory information
        current_dir = streamer.config_manager.get_csv_directory_path()
        csv_config["current_directory"] = os.path.abspath(current_dir)
        csv_config["directory_exists"] = os.path.exists(current_dir)

        # Add disk space info (only when the streamer reports any)
        disk_info = streamer.get_disk_space_info()
        if disk_info:
            csv_config["disk_space"] = disk_info

        return jsonify({"success": True, "config": csv_config})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 500


@app.route("/api/csv/config", methods=["POST"])
def update_csv_config():
    """Update CSV recording configuration from the recognised JSON keys.

    Responds 400 when none of the recognised keys is present or when the
    config manager raises ``ValueError``; any other failure is a 500.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()

        # Keep only the parameters this endpoint is allowed to change.
        valid_params = (
            "records_directory",
            "rotation_enabled",
            "max_size_mb",
            "max_days",
            "max_hours",
            "cleanup_interval_hours",
        )
        config_updates = {
            param: data[param] for param in valid_params if param in data
        }

        if not config_updates:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "No valid configuration parameters provided",
                    }
                ),
                400,
            )

        result = streamer.config_manager.update_csv_config(**config_updates)
        return jsonify(
            {
                "success": True,
                "message": "CSV configuration updated successfully",
                "old_config": result["old_config"],
                "new_config": result["new_config"],
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 500


@app.route("/api/csv/cleanup", methods=["POST"])
def trigger_csv_cleanup():
    """Manually trigger CSV cleanup"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        # NOTE(review): other endpoints reach the CSV/UDP component through
        # ``streamer.data_streamer``; confirm that ``streamer.streamer`` is
        # the intended attribute here.
        streamer.streamer.perform_csv_cleanup()
        return jsonify(
            {"success": True, "message": "CSV cleanup completed successfully"}
        )
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 500


@app.route("/api/csv/directory/info", methods=["GET"])
def get_csv_directory_info():
    """Summarise the CSV directory: per-folder and total file counts/sizes.

    Only ``*.csv`` files located one level below the base directory are
    counted; oldest/newest are determined from file modification times.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        base_dir = streamer.config_manager.get_csv_directory_path()
        info = {
            "base_directory": os.path.abspath(base_dir),
            "directory_exists": os.path.exists(base_dir),
            "total_files": 0,
            "total_size_mb": 0,
            "oldest_file": None,
            "newest_file": None,
            "day_folders": [],
        }

        if os.path.exists(base_dir):
            bytes_per_mb = 1024 * 1024
            total_size = 0
            oldest_time = None
            newest_time = None

            for day_folder in os.listdir(base_dir):
                day_path = os.path.join(base_dir, day_folder)
                if not os.path.isdir(day_path):
                    continue

                day_info = {"name": day_folder, "files": 0, "size_mb": 0}
                for file_name in os.listdir(day_path):
                    if not file_name.endswith(".csv"):
                        continue
                    file_path = os.path.join(day_path, file_name)
                    if not os.path.isfile(file_path):
                        continue

                    stat = os.stat(file_path)
                    file_size = stat.st_size
                    file_time = stat.st_mtime

                    info["total_files"] += 1
                    day_info["files"] += 1
                    total_size += file_size
                    day_info["size_mb"] += file_size / bytes_per_mb

                    if oldest_time is None or file_time < oldest_time:
                        oldest_time = file_time
                        info["oldest_file"] = datetime.fromtimestamp(
                            file_time
                        ).isoformat()
                    if newest_time is None or file_time > newest_time:
                        newest_time = file_time
                        info["newest_file"] = datetime.fromtimestamp(
                            file_time
                        ).isoformat()

                day_info["size_mb"] = round(day_info["size_mb"], 2)
                info["day_folders"].append(day_info)

            info["total_size_mb"] = round(total_size / bytes_per_mb, 2)
            # Newest folder name first.
            info["day_folders"].sort(key=lambda x: x["name"], reverse=True)

        return jsonify({"success": True, "info": info})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 500


@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
    """Connect to PLC"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.connect_plc():
        return jsonify({"success": True, "message": "Connected to PLC"})
    return jsonify({"success": False, "message": "Error connecting to PLC"}), 500


@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
    """Disconnect from PLC"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.disconnect_plc()
    return jsonify({"success": True, "message": "Disconnected from PLC"})


@app.route("/api/variables", methods=["POST"])
def add_variable():
    """Add a new variable from a JSON definition.

    Reads ``name``, ``area``, ``db``, ``offset``, ``type`` and ``bit``; any
    validation or conversion failure is answered with HTTP 400.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        name = data.get("name")
        area = data.get("area")
        db = int(data.get("db"))
        offset = int(data.get("offset"))
        var_type = data.get("type")
        bit = data.get("bit")

        problem = _variable_payload_error(name, area, var_type, bit)
        if problem:
            return jsonify({"success": False, "message": problem}), 400

        streamer.add_variable(name, area, db, offset, var_type, bit)
        return jsonify({"success": True, "message": f"Variable {name} added"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
    """Remove a variable"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.remove_variable(name)
    return jsonify({"success": True, "message": f"Variable {name} removed"})


@app.route("/api/variables/<name>", methods=["GET"])
def get_variable(name):
    """Get a specific variable configuration from current dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        if not streamer.current_dataset_id:
            return (
                jsonify({"success": False, "message": "No dataset selected"}),
                400,
            )

        current_variables = streamer.get_dataset_variables(streamer.current_dataset_id)
        if name not in current_variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )

        var_config = current_variables[name].copy()
        var_config["name"] = name

        # Report whether the variable is in the dataset's streaming list.
        streaming_vars = streamer.datasets[streamer.current_dataset_id].get(
            "streaming_variables", []
        )
        var_config["streaming"] = name in streaming_vars

        return jsonify({"success": True, "variable": var_config})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/variables/<name>", methods=["PUT"])
def update_variable(name):
    """Update (and optionally rename) a variable in the current dataset.

    The variable is replaced by removing the old entry and adding the new
    definition; its streaming flag is carried over.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        if not streamer.current_dataset_id:
            return jsonify({"success": False, "message": "No dataset selected"}), 400

        data = request.get_json()
        new_name = data.get("name", name)
        area = data.get("area")
        db = int(data.get("db", 1))
        offset = int(data.get("offset"))
        var_type = data.get("type")
        bit = data.get("bit")

        problem = _variable_payload_error(new_name, area, var_type, bit)
        if problem:
            return jsonify({"success": False, "message": problem}), 400

        current_dataset_id = streamer.current_dataset_id
        current_variables = streamer.get_dataset_variables(current_dataset_id)

        # The variable being edited must exist in the current dataset.
        if name not in current_variables:
            return (
                jsonify({"success": False, "message": f"Variable {name} not found"}),
                404,
            )

        # A rename must not collide with another existing variable.
        if name != new_name and new_name in current_variables:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": f"Variable {new_name} already exists",
                    }
                ),
                400,
            )

        # Preserve streaming state
        streaming_vars = streamer.datasets[current_dataset_id].get(
            "streaming_variables", []
        )
        was_streaming = name in streaming_vars

        # NOTE(review): remove-then-add is not atomic; if the add raises, the
        # original variable has already been removed.
        streamer.remove_variable_from_dataset(current_dataset_id, name)
        streamer.add_variable_to_dataset(
            current_dataset_id, new_name, area, db, offset, var_type, bit, was_streaming
        )

        return jsonify({"success": True, "message": "Variable updated successfully"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/variables/<name>/streaming", methods=["POST"])
def toggle_variable_streaming(name):
    """Toggle streaming for a specific variable in current dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        if not streamer.current_dataset_id:
            return jsonify({"success": False, "message": "No dataset selected"}), 400

        data = request.get_json()
        enabled = data.get("enabled", False)

        # Use the dataset-specific method
        streamer.toggle_variable_streaming(streamer.current_dataset_id, name, enabled)

        status = "enabled" if enabled else "disabled"
        return jsonify(
            {"success": True, "message": f"Variable {name} streaming {status}"}
        )
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/variables/streaming", methods=["GET"])
def get_streaming_variables():
    """Get list of variables enabled for streaming in current dataset"""
    if streamer is None:
        return jsonify({"success": False, "error": "Streamer not initialized"}), 503

    # Empty list when no (valid) dataset is currently selected.
    streaming_vars = []
    if streamer.current_dataset_id and streamer.current_dataset_id in streamer.datasets:
        streaming_vars = streamer.datasets[streamer.current_dataset_id].get(
            "streaming_variables", []
        )

    return jsonify({"success": True, "streaming_variables": streaming_vars})


@app.route("/api/datasets/<dataset_id>/variables/values", methods=["GET"])
def get_dataset_variable_values(dataset_id):
    """Get current values of all variables in a dataset - CACHE ONLY

    Values are taken exclusively from the streamer's cache; this view never
    triggers a PLC read itself. Responses:

    * 404 - unknown dataset.
    * 200 with ``success: False`` - dataset inactive, PLC disconnected or no
      cached values yet (``error_type`` tells which).
    * 500 - the cached read result itself reports failure, or an unexpected
      exception occurred.
    * 200 with ``success: True`` - full or partial data; variables without a
      value are shown as ``"ERROR"`` and explained in ``detailed_errors``.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        if dataset_id not in streamer.datasets:
            return (
                jsonify(
                    {"success": False, "message": f"Dataset '{dataset_id}' not found"}
                ),
                404,
            )

        dataset_variables = streamer.get_dataset_variables(dataset_id)
        if not dataset_variables:
            return jsonify(
                {
                    "success": True,
                    "message": "No variables defined in this dataset",
                    "values": {},
                    "source": "no_variables",
                }
            )

        total_defined = len(dataset_variables)

        # Dataset must be active for the cache to be populated.
        if dataset_id not in streamer.active_datasets:
            return jsonify(
                _cache_unavailable_payload(
                    f"Dataset '{dataset_id}' is not active. Activate the dataset to start reading variables and populate cache.",
                    "dataset_inactive",
                    "no_cache_dataset_inactive",
                    total_defined,
                )
            )

        # PLC must be connected for streaming to populate the cache.
        if not streamer.plc_client.is_connected():
            return jsonify(
                _cache_unavailable_payload(
                    "PLC not connected. Connect to PLC and activate dataset to populate cache.",
                    "plc_disconnected",
                    "no_cache_plc_disconnected",
                    total_defined,
                )
            )

        if not streamer.has_cached_values(dataset_id):
            return jsonify(
                _cache_unavailable_payload(
                    f"No cached values available for dataset '{dataset_id}'. Cache is populated by the streaming process at the dataset's configured interval. Please wait for the next reading cycle.",
                    "no_cache_available",
                    "no_cache_available",
                    total_defined,
                    note=f"Dataset reads every {streamer.config_manager.get_dataset_sampling_interval(dataset_id)}s",
                )
            )

        read_result = streamer.get_cached_dataset_values(dataset_id)

        # Convert an ISO timestamp to the display format used elsewhere;
        # keep the original value if it is not a parseable ISO string.
        if read_result.get("timestamp"):
            try:
                cached_timestamp = datetime.fromisoformat(read_result["timestamp"])
                read_result["timestamp"] = cached_timestamp.strftime(_TIMESTAMP_FORMAT)
            except (TypeError, ValueError):
                pass

        if not read_result.get("success", False):
            # Complete failure case
            error_msg = read_result.get("error", "Unknown error reading variables")
            error_type = read_result.get("error_type", "unknown")

            if streamer.logger:
                streamer.logger.error(
                    f"Cached values indicate failure for dataset '{dataset_id}': {error_msg}"
                )
                if read_result.get("errors"):
                    for var_name, var_error in read_result["errors"].items():
                        streamer.logger.error(f" Variable '{var_name}': {var_error}")

            return (
                jsonify(
                    {
                        "success": False,
                        "message": error_msg,
                        "error_type": error_type,
                        "values": {},
                        "detailed_errors": read_result.get("errors", {}),
                        "stats": read_result.get("stats", {}),
                        "timestamp": read_result.get(
                            "timestamp", datetime.now().strftime(_TIMESTAMP_FORMAT)
                        ),
                        "source": "cache",
                        "is_cached": True,
                    }
                ),
                500,
            )

        # Success or partial success case
        raw_values = read_result.get("values", {})
        variable_errors = read_result.get("errors", {})
        stats = read_result.get("stats", {})

        formatted_values = {}
        error_details = {}
        for var_name, value in raw_values.items():
            if value is None:
                # Variable had an error - report its specific message.
                formatted_values[var_name] = "ERROR"
                error_details[var_name] = variable_errors.get(
                    var_name, "Unknown error"
                )
                continue

            # A cached entry may outlive its definition; treat a missing
            # config as an unknown type instead of failing the request.
            var_type = dataset_variables.get(var_name, {}).get("type", "unknown")
            try:
                formatted_values[var_name] = _format_cached_value(value, var_type)
            except Exception as format_error:
                formatted_values[var_name] = "FORMAT_ERROR"
                error_details[var_name] = f"Format error: {str(format_error)}"

        total_vars = stats.get("total", total_defined)
        success_vars = stats.get("success", 0)
        failed_vars = stats.get("failed", 0)

        # The cache is the only data source.
        data_source = "cache"
        source_text = " (from streaming cache)"

        if failed_vars == 0:
            message = (
                f"Successfully displaying all {success_vars} variables{source_text}"
            )
        else:
            message = f"Partial data available: {success_vars}/{total_vars} variables have valid cached values, {failed_vars} failed{source_text}"
            if streamer.logger:
                streamer.logger.warning(
                    f"Partial failure in cached values for dataset '{dataset_id}': {message}"
                )
                for var_name, var_error in error_details.items():
                    if formatted_values.get(var_name) == "ERROR":
                        streamer.logger.warning(f" Variable '{var_name}': {var_error}")

        return jsonify(
            {
                # Partial data still counts as success.
                "success": True,
                "message": message,
                "values": formatted_values,
                "detailed_errors": error_details,
                "stats": stats,
                "timestamp": read_result.get(
                    "timestamp", datetime.now().strftime(_TIMESTAMP_FORMAT)
                ),
                "warning": read_result.get("warning"),
                "source": data_source,
                "is_cached": True,
                "cache_info": f"Dataset reads every {streamer.config_manager.get_dataset_sampling_interval(dataset_id)}s",
            }
        )
    except Exception as e:
        return (
            jsonify(
                {
                    "success": False,
                    "message": f"Error retrieving cached variable values: {str(e)}",
                    "values": {},
                }
            ),
            500,
        )


# Dataset Management API Endpoints


@app.route("/api/datasets", methods=["GET"])
def get_datasets():
    """Get all datasets"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    return jsonify(
        {
            "success": True,
            "datasets": streamer.datasets,
            "active_datasets": list(streamer.active_datasets),
            "current_dataset_id": streamer.current_dataset_id,
        }
    )


@app.route("/api/datasets", methods=["POST"])
def create_dataset():
    """Create a new dataset from ``dataset_id``, ``name`` and ``prefix``.

    ``sampling_interval`` is optional; when given it must be at least
    0.01 seconds.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        dataset_id = data.get("dataset_id", "").strip()
        name = data.get("name", "").strip()
        prefix = data.get("prefix", "").strip()
        sampling_interval = data.get("sampling_interval")

        if not dataset_id or not name or not prefix:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Dataset ID, name, and prefix are required",
                    }
                ),
                400,
            )

        # Validate sampling interval if provided
        if sampling_interval is not None:
            sampling_interval = float(sampling_interval)
            if sampling_interval < 0.01:
                return (
                    jsonify(
                        {
                            "success": False,
                            "message": "Sampling interval must be at least 0.01 seconds",
                        }
                    ),
                    400,
                )

        streamer.create_dataset(dataset_id, name, prefix, sampling_interval)
        return jsonify(
            {
                "success": True,
                "message": f"Dataset '{name}' created successfully",
                "dataset_id": dataset_id,
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error creating dataset: {str(e)}"}),
            500,
        )


@app.route("/api/datasets/<dataset_id>", methods=["DELETE"])
def delete_dataset(dataset_id):
    """Delete a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.delete_dataset(dataset_id)
        return jsonify({"success": True, "message": "Dataset deleted successfully"})
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error deleting dataset: {str(e)}"}),
            500,
        )


@app.route("/api/datasets/<dataset_id>/activate", methods=["POST"])
def activate_dataset(dataset_id):
    """Activate a dataset for streaming"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.activate_dataset(dataset_id)
        return jsonify({"success": True, "message": "Dataset activated successfully"})
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except RuntimeError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error activating dataset: {str(e)}"}
            ),
            500,
        )


@app.route("/api/datasets/<dataset_id>/deactivate", methods=["POST"])
def deactivate_dataset(dataset_id):
    """Deactivate a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.deactivate_dataset(dataset_id)
        return jsonify({"success": True, "message": "Dataset deactivated successfully"})
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error deactivating dataset: {str(e)}"}
            ),
            500,
        )


@app.route("/api/datasets/<dataset_id>/variables", methods=["POST"])
def add_variable_to_dataset(dataset_id):
    """Add a variable to a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        name = data.get("name", "").strip()
        area = data.get("area", "").strip()
        db = data.get("db", 1)
        offset = data.get("offset", 0)
        var_type = data.get("type", "").strip()
        bit = data.get("bit")
        streaming = data.get("streaming", False)

        if not name or not area or not var_type:
            return (
                jsonify(
                    {
                        "success": False,
                        "message": "Variable name, area, and type are required",
                    }
                ),
                400,
            )

        streamer.add_variable_to_dataset(
            dataset_id, name, area, db, offset, var_type, bit, streaming
        )
        return jsonify(
            {
                "success": True,
                "message": f"Variable '{name}' added to dataset successfully",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 400
    except Exception as e:
        return (
            jsonify({"success": False, "message": f"Error adding variable: {str(e)}"}),
            500,
        )


@app.route("/api/datasets/<dataset_id>/variables/<variable_name>", methods=["DELETE"])
def remove_variable_from_dataset(dataset_id, variable_name):
    """Remove a variable from a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        streamer.remove_variable_from_dataset(dataset_id, variable_name)
        return jsonify(
            {
                "success": True,
                "message": f"Variable '{variable_name}' removed from dataset successfully",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error removing variable: {str(e)}"}
            ),
            500,
        )


@app.route(
    "/api/datasets/<dataset_id>/variables/<variable_name>/streaming", methods=["POST"]
)
def toggle_variable_streaming_in_dataset(dataset_id, variable_name):
    """Toggle streaming for a variable in a dataset"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        enabled = data.get("enabled", False)

        streamer.toggle_variable_streaming(dataset_id, variable_name, enabled)
        return jsonify(
            {
                "success": True,
                "message": f"Variable '{variable_name}' streaming {'enabled' if enabled else 'disabled'}",
            }
        )
    except ValueError as e:
        return jsonify({"success": False, "message": str(e)}), 404
    except Exception as e:
        return (
            jsonify(
                {"success": False, "message": f"Error toggling streaming: {str(e)}"}
            ),
            500,
        )


@app.route("/api/datasets/current", methods=["POST"])
def set_current_dataset():
    """Set the current dataset for editing"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        dataset_id = data.get("dataset_id")

        if not dataset_id or dataset_id not in streamer.datasets:
            return jsonify({"success": False, "message": "Invalid dataset ID"}), 400

        streamer.current_dataset_id = dataset_id
        streamer.save_datasets()
        return jsonify(
            {
                "success": True,
                "message": "Current dataset updated successfully",
                "current_dataset_id": dataset_id,
            }
        )
    except Exception as e:
        return (
            jsonify(
                {
                    "success": False,
                    "message": f"Error setting current dataset: {str(e)}",
                }
            ),
            500,
        )


@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
    """Start UDP streaming (legacy endpoint - now only starts UDP streaming)"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.start_streaming():
        return jsonify({"success": True, "message": "UDP streaming started"})
    return (
        jsonify({"success": False, "message": "Error starting UDP streaming"}),
        500,
    )


@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
    """Stop UDP streaming (legacy endpoint - now only stops UDP streaming)"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.stop_streaming()
    return jsonify({"success": True, "message": "UDP streaming stopped"})


@app.route("/api/sampling", methods=["POST"])
def update_sampling():
    """Update the sampling interval; values below 0.01 s are raised to 0.01."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = request.get_json()
        interval = float(data.get("interval", 0.1))
        if interval < 0.01:
            interval = 0.01

        streamer.update_sampling_interval(interval)
        return jsonify({"success": True, "message": f"Interval updated to {interval}s"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400

@app.route("/api/csv/start", methods=["POST"])
def start_csv_recording_legacy():
    """Start CSV recording (legacy URL).

    Delegates to ``start_csv_recording`` so both URLs always behave the same.
    """
    return start_csv_recording()


@app.route("/api/csv/stop", methods=["POST"])
def stop_csv_recording_legacy():
    """Stop CSV recording (legacy URL).

    Delegates to ``stop_csv_recording`` so both URLs always behave the same.
    """
    return stop_csv_recording()


@app.route("/api/csv/recording/start", methods=["POST"])
def start_csv_recording():
    """Start CSV recording independently of UDP streaming.

    Returns 503 while the streamer is not initialized, 500 when the data
    streamer reports that recording could not be started.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.data_streamer.start_csv_recording():
        return jsonify({"success": True, "message": "CSV recording started"})
    return (
        jsonify({"success": False, "message": "Error starting CSV recording"}),
        500,
    )


@app.route("/api/csv/recording/stop", methods=["POST"])
def stop_csv_recording():
    """Stop CSV recording independently of UDP streaming."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.data_streamer.stop_csv_recording()
    return jsonify({"success": True, "message": "CSV recording stopped"})


# 🔑 NEW: UDP Streaming Control (Independent)
@app.route("/api/udp/streaming/start", methods=["POST"])
def start_udp_streaming():
    """Start UDP streaming to PlotJuggler independently of CSV recording.

    Returns 503 while the streamer is not initialized, 500 when the data
    streamer reports that UDP streaming could not be started.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    if streamer.data_streamer.start_udp_streaming():
        return jsonify(
            {"success": True, "message": "UDP streaming to PlotJuggler started"}
        )
    return (
        jsonify({"success": False, "message": "Error starting UDP streaming"}),
        500,
    )


@app.route("/api/udp/streaming/stop", methods=["POST"])
def stop_udp_streaming():
    """Stop UDP streaming to PlotJuggler independently of CSV recording."""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    streamer.data_streamer.stop_udp_streaming()
    return jsonify({"success": True, "message": "UDP streaming to PlotJuggler stopped"})


# 📈 PLOT MANAGER API ENDPOINTS

# Accepted control actions mapped to the past-tense word used in the response
# message (naively appending "ed" produced "stoped", "pauseed", "resumeed").
_PLOT_ACTION_PAST_TENSE = {
    "start": "started",
    "stop": "stopped",
    "pause": "paused",
    "resume": "resumed",
    "clear": "cleared",
}


def _json_object_body():
    """Return the request body as a dict, or None when it is not a JSON object.

    ``silent=True`` makes a missing or malformed body come back as None
    instead of raising, so callers can answer with a 400 of their own.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else None


def _invalid_body_response():
    """Build the 400 response used when the request body is not a JSON object."""
    return jsonify({"error": "Request body must be a JSON object"}), 400


def _validate_plot_request(data):
    """Validate a plot create/update payload.

    Checks that ``variables`` and ``time_window`` are present, that every
    requested variable is offered by the plot manager for the active datasets,
    and (only when a trigger is enabled and named) that the trigger variable
    is one of the plot manager's boolean variables.

    Returns:
        A ``(response, 400)`` tuple describing the first problem found, or
        None when the payload is acceptable.
    """
    if not data.get("variables"):
        return jsonify({"error": "At least one variable is required"}), 400

    if not data.get("time_window"):
        return jsonify({"error": "Time window is required"}), 400

    plot_manager = streamer.data_streamer.plot_manager

    # The requested variables must exist in the active datasets
    available_vars = plot_manager.get_available_variables(
        streamer.data_streamer.get_active_datasets(),
        streamer.config_manager.datasets,
    )
    invalid_vars = [var for var in data["variables"] if var not in available_vars]
    if invalid_vars:
        return (
            jsonify(
                {
                    # map(str, ...) keeps the join from failing on non-string entries
                    "error": f"Variables not available: {', '.join(map(str, invalid_vars))}",
                    "available_variables": available_vars,
                }
            ),
            400,
        )

    # The trigger is only validated when it is enabled and a variable is named
    if data.get("trigger_enabled") and data.get("trigger_variable"):
        boolean_vars = plot_manager.get_boolean_variables(
            streamer.data_streamer.get_active_datasets(),
            streamer.config_manager.datasets,
        )
        if data["trigger_variable"] not in boolean_vars:
            return (
                jsonify(
                    {
                        "error": f"Trigger variable '{data['trigger_variable']}' is not a boolean variable",
                        "boolean_variables": boolean_vars,
                    }
                ),
                400,
            )

    return None


def _build_plot_config(data, default_name):
    """Build the session configuration dict from a validated payload.

    Args:
        data: Request payload that already passed ``_validate_plot_request``.
        default_name: Session name used when the payload has no ``name`` key.

    Raises:
        TypeError, ValueError: If ``time_window``, ``y_min`` or ``y_max``
            cannot be converted to a number.
    """
    config = {
        "name": data.get("name", default_name),
        "variables": data["variables"],
        "time_window": int(data["time_window"]),
        "y_min": data.get("y_min"),
        "y_max": data.get("y_max"),
        "trigger_variable": data.get("trigger_variable"),
        "trigger_enabled": data.get("trigger_enabled", False),
        "trigger_on_true": data.get("trigger_on_true", True),
    }

    # Axis limits are optional; convert them only when they were supplied
    for bound in ("y_min", "y_max"):
        if config[bound] is not None:
            config[bound] = float(config[bound])

    return config


@app.route("/api/plots", methods=["GET"])
def get_plots():
    """Get all plot sessions status"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        sessions = streamer.data_streamer.plot_manager.get_all_sessions_status()
        return jsonify({"sessions": sessions})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/plots", methods=["POST"])
def create_plot():
    """Create a new plot session.

    Responds 400 for an invalid payload (non-object body, missing fields,
    unknown variables, non-boolean trigger, non-numeric values), 500 for any
    other failure, and otherwise returns the new ``session_id``.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = _json_object_body()
        if data is None:
            return _invalid_body_response()

        validation_error = _validate_plot_request(data)
        if validation_error is not None:
            return validation_error

        plot_manager = streamer.data_streamer.plot_manager
        default_name = f"Plot {len(plot_manager.sessions) + 1}"
        try:
            config = _build_plot_config(data, default_name)
        except (TypeError, ValueError) as e:
            # Bad client input, not a server fault
            return jsonify({"error": f"Invalid numeric value: {e}"}), 400

        session_id = plot_manager.create_session(config)
        return jsonify(
            {
                "success": True,
                "session_id": session_id,
                "message": f"Plot session '{config['name']}' created successfully",
            }
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/plots/<session_id>", methods=["DELETE"])
def remove_plot(session_id):
    """Remove a plot session"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        success = streamer.data_streamer.plot_manager.remove_session(session_id)
        if success:
            return jsonify({"success": True, "message": "Plot session removed"})
        return jsonify({"error": "Plot session not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/plots/<session_id>/control", methods=["POST"])
def control_plot(session_id):
    """Control a plot session (start/stop/pause/resume/clear).

    Responds 400 for a non-object body or an unknown action, 404 when the
    plot manager reports the session was not found.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = _json_object_body()
        if data is None:
            return _invalid_body_response()

        action = data.get("action")
        if action not in ("start", "stop", "pause", "resume", "clear"):
            return (
                jsonify(
                    {"error": "Invalid action. Use: start, stop, pause, resume, clear"}
                ),
                400,
            )

        success = streamer.data_streamer.plot_manager.control_session(
            session_id, action
        )
        if success:
            return jsonify(
                {
                    "success": True,
                    "message": f"Plot session {_PLOT_ACTION_PAST_TENSE[action]}",
                }
            )
        return jsonify({"error": "Plot session not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/plots/<session_id>/data", methods=["GET"])
def get_plot_data(session_id):
    """Get plot data for a specific session"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        plot_data = streamer.data_streamer.plot_manager.get_session_data(session_id)
        if plot_data:
            return jsonify(plot_data)
        return jsonify({"error": "Plot session not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/plots/<session_id>/config", methods=["GET"])
def get_plot_config(session_id):
    """Get plot configuration for a specific session"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        config = streamer.data_streamer.plot_manager.get_session_config(session_id)
        if config:
            return jsonify({"success": True, "config": config})
        return jsonify({"error": "Plot session not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/plots/<session_id>/config", methods=["PUT"])
def update_plot_config(session_id):
    """Update plot configuration for a specific session.

    Applies the same payload validation as ``create_plot``; responds 404 when
    the plot manager reports the session was not found.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        data = _json_object_body()
        if data is None:
            return _invalid_body_response()

        validation_error = _validate_plot_request(data)
        if validation_error is not None:
            return validation_error

        try:
            config = _build_plot_config(data, f"Plot {session_id}")
        except (TypeError, ValueError) as e:
            # Bad client input, not a server fault
            return jsonify({"error": f"Invalid numeric value: {e}"}), 400

        success = streamer.data_streamer.plot_manager.update_session_config(
            session_id, config
        )
        if success:
            return jsonify(
                {
                    "success": True,
                    "message": f"Plot session '{config['name']}' updated successfully",
                }
            )
        return jsonify({"error": "Plot session not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/plots/variables", methods=["GET"])
def get_plot_variables():
    """Get available variables for plotting"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        plot_manager = streamer.data_streamer.plot_manager
        available_vars = plot_manager.get_available_variables(
            streamer.data_streamer.get_active_datasets(),
            streamer.config_manager.datasets,
        )
        boolean_vars = plot_manager.get_boolean_variables(
            streamer.data_streamer.get_active_datasets(),
            streamer.config_manager.datasets,
        )

        return jsonify(
            {
                "available_variables": available_vars,
                "boolean_variables": boolean_vars,
                "active_datasets_count": len(
                    streamer.data_streamer.get_active_datasets()
                ),
            }
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/status")
def get_status():
    """Get current status"""
    error_response = check_streamer_initialized()
    if error_response:
        return error_response
    return jsonify(streamer.get_status())


@app.route("/api/events")
def get_events():
    """Get recent events from the application log.

    The optional ``limit`` query parameter defaults to 50 and is capped at 200.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    try:
        limit = request.args.get("limit", 50, type=int)
        limit = min(limit, 200)  # Maximum 200 events per request

        events = streamer.get_recent_events(limit)
        return jsonify(
            {
                "success": True,
                "events": events,
                "total_events": len(streamer.event_logger.events_log),
                "showing": len(events),
            }
        )
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500


# Shortest polling period accepted by the SSE endpoints, in seconds. It is the
# same floor update_sampling applies to the sampling interval, and it keeps an
# interval of 0 from turning the generator loop into a busy loop.
_MIN_STREAM_INTERVAL = 0.01

# PLC variable types rendered as plain integers in the variables stream
_INTEGER_VAR_TYPES = (
    "int",
    "uint",
    "dint",
    "udint",
    "word",
    "byte",
    "sint",
    "usint",
)


def _parse_stream_interval(default):
    """Read the ``interval`` query parameter of an SSE request.

    Returns:
        The interval in seconds, raised to ``_MIN_STREAM_INTERVAL`` when
        smaller, or None when the value is not a finite number (NaN and
        infinity would make ``time.sleep`` raise on every loop iteration).
    """
    import math  # local import: only this helper needs it

    try:
        interval = float(request.args.get("interval", default))
    except (TypeError, ValueError):
        return None
    if not math.isfinite(interval):
        return None
    return max(interval, _MIN_STREAM_INTERVAL)


def _sse_event(payload):
    """Serialize *payload* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _sse_response(event_stream):
    """Wrap a generator of SSE frames in a ``text/event-stream`` response."""
    return Response(
        event_stream,
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "Cache-Control",
        },
    )


def _format_cached_value(value, var_type):
    """Format one cached variable value for display.

    ``None`` becomes ``"ERROR"``; any failure while formatting becomes
    ``"FORMAT_ERROR"``.
    """
    if value is None:
        return "ERROR"

    is_number = isinstance(value, (int, float))
    try:
        if var_type == "real":
            return f"{value:.3f}" if is_number else str(value)
        if var_type == "bool":
            return "TRUE" if value else "FALSE"
        if var_type in _INTEGER_VAR_TYPES:
            return str(int(value)) if is_number else str(value)
        return str(value)
    except Exception:
        return "FORMAT_ERROR"


def _variables_stream_event(dataset_id, last_values):
    """Build the next payload for the variables SSE stream.

    Args:
        dataset_id: Dataset whose cached values are being monitored.
        last_values: Formatted values sent with the previous ``values`` event.

    Returns:
        ``(event, formatted_values)``. ``event`` is the dict to send, or None
        when the cached values are unchanged. ``formatted_values`` is the new
        formatted snapshot when a ``values`` event was produced, else None.
    """
    # Preconditions for the cache being populated at all
    if not streamer.plc_client.is_connected():
        return (
            {
                "type": "plc_disconnected",
                "message": "PLC not connected - cache not being populated",
                "timestamp": datetime.now().isoformat(),
            },
            None,
        )

    if dataset_id not in streamer.active_datasets:
        return (
            {
                "type": "dataset_inactive",
                "message": f"Dataset '{dataset_id}' is not active - activate to populate cache",
                "timestamp": datetime.now().isoformat(),
            },
            None,
        )

    dataset_variables = streamer.get_dataset_variables(dataset_id)
    if not dataset_variables:
        return (
            {
                "type": "no_variables",
                "message": "No variables defined in this dataset",
                "timestamp": datetime.now().isoformat(),
            },
            None,
        )

    # Values are only ever taken from the cache, never read from the PLC here
    if not streamer.has_cached_values(dataset_id):
        sampling_interval = streamer.config_manager.get_dataset_sampling_interval(
            dataset_id
        )
        return (
            {
                "type": "no_cache",
                "message": f"Waiting for cache to be populated (dataset reads every {sampling_interval}s)",
                "timestamp": datetime.now().isoformat(),
                "sampling_interval": sampling_interval,
            },
            None,
        )

    read_result = streamer.get_cached_dataset_values(dataset_id)
    if not read_result.get("success", False):
        return (
            {
                "type": "cache_error",
                "message": read_result.get("error", "Cached data indicates error"),
                "timestamp": datetime.now().isoformat(),
                "error_type": read_result.get("error_type", "unknown"),
                "source": "cache",
            },
            None,
        )

    formatted_values = {}
    for var_name, value in read_result.get("values", {}).items():
        # The cache may still hold a variable that was removed from the
        # dataset; treat it as an unknown type instead of raising KeyError.
        var_config = dataset_variables.get(var_name) or {}
        formatted_values[var_name] = _format_cached_value(
            value, var_config.get("type", "unknown")
        )

    # Only send when something changed since the last values event
    if formatted_values == last_values:
        return None, None

    event = {
        "type": "values",
        "values": formatted_values,
        "timestamp": read_result.get("timestamp", datetime.now().isoformat()),
        "source": "cache",
        "stats": read_result.get("stats", {}),
        "cache_age_info": f"Dataset reads every {streamer.config_manager.get_dataset_sampling_interval(dataset_id)}s",
    }
    return event, formatted_values


@app.route("/api/stream/variables", methods=["GET"])
def stream_variables():
    """Stream variable values in real-time using Server-Sent Events.

    Query parameters:
        dataset_id: Required; must be a key of ``streamer.datasets``.
        interval: Polling period in seconds (default 1.0).

    Responds 400 for a missing dataset id or an invalid interval and 404 for
    an unknown dataset; otherwise returns an endless event stream.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    # Request data must be read here: the generator runs outside this scope
    dataset_id = request.args.get("dataset_id")
    interval = _parse_stream_interval(1.0)  # Default 1 second

    if interval is None:
        return jsonify({"error": "Invalid interval"}), 400

    if not dataset_id:
        return jsonify({"error": "Dataset ID required"}), 400

    if dataset_id not in streamer.datasets:
        return jsonify({"error": f"Dataset {dataset_id} not found"}), 404

    def generate():
        """Generate SSE data stream - CACHE ONLY"""
        yield _sse_event(
            {
                "type": "connected",
                "message": "SSE connection established - monitoring cache",
            }
        )

        last_values = {}

        while True:
            frame = None
            try:
                event, formatted_values = _variables_stream_event(
                    dataset_id, last_values
                )
                if event is not None:
                    frame = _sse_event(event)
                    # Remember the snapshot only once it has been serialized
                    if formatted_values is not None:
                        last_values = formatted_values
            except Exception as e:
                frame = _sse_event(
                    {
                        "type": "stream_error",
                        "message": f"SSE stream error: {str(e)}",
                        "timestamp": datetime.now().isoformat(),
                        "source": "cache_monitoring",
                    }
                )

            if frame is not None:
                yield frame
            time.sleep(interval)

    return _sse_response(generate())


@app.route("/api/stream/status", methods=["GET"])
def stream_status():
    """Stream application status in real-time using Server-Sent Events.

    Query parameters:
        interval: Polling period in seconds (default 2.0).

    Responds 400 for an invalid interval; otherwise returns an endless event
    stream that emits a ``status`` event whenever the status changes.
    """
    error_response = check_streamer_initialized()
    if error_response:
        return error_response

    # Request data must be read here: the generator runs outside this scope
    interval = _parse_stream_interval(2.0)  # Default 2 seconds
    if interval is None:
        return jsonify({"error": "Invalid interval"}), 400

    def generate():
        """Generate SSE status stream"""
        last_status = None

        yield _sse_event({"type": "connected", "message": "Status stream connected"})

        while True:
            frame = None
            try:
                current_status = streamer.get_status()

                # Only send if status changed
                if current_status != last_status:
                    frame = _sse_event(
                        {
                            "type": "status",
                            "status": current_status,
                            "timestamp": datetime.now().isoformat(),
                        }
                    )
                    last_status = current_status
            except Exception as e:
                frame = _sse_event(
                    {
                        "type": "error",
                        "message": f"Status stream error: {str(e)}",
                        "timestamp": datetime.now().isoformat(),
                    }
                )

            if frame is not None:
                yield frame
            time.sleep(interval)

    return _sse_response(generate())


def graceful_shutdown():
    """Perform graceful shutdown.

    Stops streaming, disconnects the PLC and releases the instance lock. Each
    step runs even if an earlier one fails, so a streaming error cannot leave
    the PLC connection or the instance lock held.
    """
    print("\n⏹️ Performing graceful shutdown...")

    if streamer is None:
        # Nothing was initialized, so there is nothing to release
        print("✅ Shutdown completed successfully")
        return

    had_errors = False
    for step_name in ("stop_streaming", "disconnect_plc", "release_instance_lock"):
        try:
            getattr(streamer, step_name)()
        except Exception as e:
            had_errors = True
            print(f"⚠️ Error during shutdown: {e}")

    if not had_errors:
        print("✅ Shutdown completed successfully")


def main():
    """Main application entry point with error handling and recovery.

    Runs the Flask server and restarts it after a failure, up to three
    attempts in total. Exits with status 1 when another instance is reported
    or when the attempts are used up.
    """
    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            # Create templates directory if it doesn't exist
            os.makedirs("templates", exist_ok=True)

            print("🚀 Starting Flask server for PLC S7-315 Streamer")
            print("📊 Web interface available at: http://localhost:5050")
            print("🔧 Configure your PLC and variables through the web interface")

            # The module-level streamer is created by the __main__ guard
            # before main() is called; here we only start the web server.
            app.run(debug=False, host="0.0.0.0", port=5050, use_reloader=False)

            # If we reach here, the server stopped normally
            break

        except RuntimeError as e:
            if "Another instance" in str(e):
                print(f"❌ {e}")
                print("💡 Tip: Stop the other instance or wait for it to finish")
                sys.exit(1)
            print(f"⚠️ Runtime error: {e}")

        except KeyboardInterrupt:
            print("\n⏸️ Received interrupt signal...")
            graceful_shutdown()
            break

        except Exception as e:
            print(f"💥 Unexpected error: {e}")

        # Only the two recoverable error paths fall through to here
        retry_count += 1
        if retry_count < max_retries:
            print(f"🔄 Attempting restart ({retry_count}/{max_retries})...")
            time.sleep(2)  # Wait before retry
        else:
            print("❌ Maximum retries reached. Exiting...")
            graceful_shutdown()
            sys.exit(1)


if __name__ == "__main__":
    try:
        # Initialize streamer instance
        streamer = PLCDataStreamer()
        main()
    except Exception as e:
        print(f"💥 Critical error during initialization: {e}")
        sys.exit(1)