from flask import (
    Flask,
    render_template,
    request,
    jsonify,
    redirect,
    url_for,
    send_from_directory,
)
import snap7
import json
import socket
import time
import logging
import threading
from datetime import datetime
from typing import Dict, Any, Optional
import struct
import os
import csv
from pathlib import Path

app = Flask(__name__)
# NOTE(review): the secret key is hard-coded in source. If sessions or signed
# cookies are ever relied upon, load it from the environment instead.
app.secret_key = "plc_streamer_secret_key"


class PLCDataStreamer:
    """Poll PLC variables via snap7, record them to CSV and stream them by UDP.

    One background thread (see ``streaming_loop``) reads every configured
    variable at ``sampling_interval`` seconds, appends the values to an hourly
    CSV file under ``records/<dd-mm-YYYY>/<HH>.csv`` and sends the selected
    subset as a JSON datagram to the configured UDP endpoint.

    Connection settings and variable definitions are persisted to
    ``plc_config.json`` and ``plc_variables.json`` in the working directory.
    """

    def __init__(self):
        """Initialize state, configure logging and load persisted settings."""
        # Configuration file paths
        self.config_file = "plc_config.json"
        self.variables_file = "plc_variables.json"

        # Default configuration (overridden by the config file when present)
        self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
        self.udp_config = {"host": "127.0.0.1", "port": 9870}

        # Configurable variables: name -> {"db": int, "offset": int, "type": str}
        self.variables = {}
        self.streaming_variables = set()  # Variables selected for streaming

        # CSV recording settings
        self.csv_recording = False
        self.csv_record_thread = None
        self.current_csv_file = None
        self.current_csv_writer = None
        self.current_hour = None
        self.csv_headers_written = False

        # System states
        self.plc = None
        self.udp_socket = None
        self.connected = False
        self.streaming = False
        self.stream_thread = None
        self.sampling_interval = 0.1

        # Logging must be ready first: the loaders below log their outcome.
        self.setup_logging()

        # Load configuration from files
        self.load_configuration()
        self.load_variables()

    def setup_logging(self):
        """Configure logging to ``plc_data.log`` and to the console."""
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
        )
        self.logger = logging.getLogger(__name__)

    def load_configuration(self):
        """Load PLC/UDP settings and the sampling interval from the config file.

        Keys missing from the file keep their current values. Any error is
        logged and otherwise ignored, leaving the current values in place.
        """
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, "r") as f:
                    config = json.load(f)
                self.plc_config = config.get("plc_config", self.plc_config)
                self.udp_config = config.get("udp_config", self.udp_config)
                self.sampling_interval = config.get(
                    "sampling_interval", self.sampling_interval
                )
                self.logger.info(f"Configuration loaded from {self.config_file}")
            else:
                self.logger.info("No configuration file found, using defaults")
        except Exception as e:
            self.logger.error(f"Error loading configuration: {e}")

    def save_configuration(self):
        """Write PLC/UDP settings and the sampling interval to the config file.

        Errors are logged, not raised.
        """
        try:
            config = {
                "plc_config": self.plc_config,
                "udp_config": self.udp_config,
                "sampling_interval": self.sampling_interval,
            }
            with open(self.config_file, "w") as f:
                json.dump(config, f, indent=4)
            self.logger.info(f"Configuration saved to {self.config_file}")
        except Exception as e:
            self.logger.error(f"Error saving configuration: {e}")

    def load_variables(self):
        """Load the variable definitions from the variables JSON file.

        Errors are logged, not raised; on failure ``self.variables`` is left
        unchanged.
        """
        try:
            if os.path.exists(self.variables_file):
                with open(self.variables_file, "r") as f:
                    self.variables = json.load(f)
                self.logger.info(
                    f"Variables loaded from {self.variables_file}: {len(self.variables)} variables"
                )
            else:
                self.logger.info(
                    "No variables file found, starting with empty variables"
                )
        except Exception as e:
            self.logger.error(f"Error loading variables: {e}")

    def save_variables(self):
        """Write the variable definitions to the variables JSON file.

        Errors are logged, not raised.
        """
        try:
            with open(self.variables_file, "w") as f:
                json.dump(self.variables, f, indent=4)
            self.logger.info(f"Variables saved to {self.variables_file}")
        except Exception as e:
            self.logger.error(f"Error saving variables: {e}")

    def update_plc_config(self, ip: str, rack: int, slot: int):
        """Replace the PLC connection settings and persist them.

        An existing connection is not re-established by this call.
        """
        self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
        self.save_configuration()
        self.logger.info(f"PLC configuration updated: {self.plc_config}")

    def update_udp_config(self, host: str, port: int):
        """Replace the UDP destination and persist it."""
        self.udp_config = {"host": host, "port": port}
        self.save_configuration()
        self.logger.info(f"UDP configuration updated: {self.udp_config}")

    def update_sampling_interval(self, interval: float):
        """Set the polling period in seconds and persist it."""
        self.sampling_interval = interval
        self.save_configuration()
        self.logger.info(f"Sampling interval updated: {interval}s")

    def add_variable(self, name: str, db: int, offset: int, var_type: str):
        """Add (or overwrite) a polled variable and persist the variable set.

        Args:
            name: Key used in CSV headers and in the streamed JSON.
            db: Data block number passed to the PLC read.
            offset: Byte offset inside the data block.
            var_type: One of the types handled by ``read_variable``.
        """
        self.variables[name] = {"db": db, "offset": offset, "type": var_type}
        self.save_variables()
        self.logger.info(f"Variable added: {name} -> DB{db}.{offset} ({var_type})")

    def remove_variable(self, name: str):
        """Remove a polled variable, if it exists, and persist the change."""
        if name in self.variables:
            del self.variables[name]
            # Also remove from streaming variables if present
            self.streaming_variables.discard(name)
            self.save_variables()
            self.logger.info(f"Variable removed: {name}")

    def toggle_streaming_variable(self, name: str, enabled: bool):
        """Select or deselect a configured variable for UDP streaming.

        Unknown names are ignored. The selection is kept in memory only.
        """
        if name in self.variables:
            if enabled:
                self.streaming_variables.add(name)
            else:
                self.streaming_variables.discard(name)
            self.logger.info(
                f"Variable {name} streaming: {'enabled' if enabled else 'disabled'}"
            )

    def get_csv_directory_path(self, now: Optional[datetime] = None) -> str:
        """Return the per-day CSV directory, ``records/<dd-mm-YYYY>``.

        Args:
            now: Moment to compute the path for; defaults to the current time.
        """
        if now is None:
            now = datetime.now()
        day_folder = now.strftime("%d-%m-%Y")
        return os.path.join("records", day_folder)

    def get_csv_file_path(self, now: Optional[datetime] = None) -> str:
        """Return the per-hour CSV path, ``records/<dd-mm-YYYY>/<HH>.csv``.

        Args:
            now: Moment to compute the path for; defaults to the current time.
        """
        if now is None:
            now = datetime.now()
        hour = now.strftime("%H")
        # Pass the same instant down so day and hour cannot disagree.
        directory = self.get_csv_directory_path(now)
        return os.path.join(directory, f"{hour}.csv")

    def ensure_csv_directory(self, now: Optional[datetime] = None):
        """Create the per-day CSV directory if it does not exist yet.

        Args:
            now: Moment to compute the path for; defaults to the current time.
        """
        directory = self.get_csv_directory_path(now)
        Path(directory).mkdir(parents=True, exist_ok=True)

    def setup_csv_file(self):
        """Open (or roll over to) the CSV file for the current hour.

        Does nothing when a file is already open for the current hour.
        A header row is written only when the file did not exist before.
        """
        # Take one timestamp for the whole decision so the hour check, the
        # directory and the file name are consistent across hour/day rollover.
        now = datetime.now()
        current_hour = now.hour

        # Check if we need to create a new file
        if self.current_hour != current_hour or self.current_csv_file is None:
            # Close previous file if open
            if self.current_csv_file:
                self.current_csv_file.close()

            # Create directory and file for current hour
            self.ensure_csv_directory(now)
            csv_path = self.get_csv_file_path(now)

            # Check if file exists to determine if we need headers
            file_exists = os.path.exists(csv_path)

            self.current_csv_file = open(csv_path, "a", newline="", encoding="utf-8")
            self.current_csv_writer = csv.writer(self.current_csv_file)
            self.current_hour = current_hour

            # Write headers if it's a new file
            if not file_exists and self.variables:
                headers = ["timestamp"] + list(self.variables)
                self.current_csv_writer.writerow(headers)
                self.current_csv_file.flush()
                self.csv_headers_written = True

            self.logger.info(f"CSV file created: {csv_path}")

    def write_csv_data(self, data: Dict[str, Any]):
        """Append one timestamped row with the value of every variable.

        Does nothing unless CSV recording is enabled and variables exist.
        Variables missing from ``data`` are written as empty cells. Errors are
        logged, not raised.

        Args:
            data: Mapping of variable name to the value read in this cycle.
        """
        if not self.csv_recording or not self.variables:
            return

        try:
            self.setup_csv_file()

            if self.current_csv_writer:
                # Millisecond-resolution timestamp
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

                # Snapshot the names: request threads may add/remove variables
                # while this runs on the streaming thread.
                var_names = list(self.variables)
                row = [timestamp] + [data.get(var_name) for var_name in var_names]

                self.current_csv_writer.writerow(row)
                self.current_csv_file.flush()

        except Exception as e:
            self.logger.error(f"Error writing CSV data: {e}")

    def get_streaming_data(self, all_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return the subset of ``all_data`` selected for streaming.

        When no variable has been selected, everything is streamed.
        """
        if not self.streaming_variables:
            return all_data

        return {
            name: value
            for name, value in all_data.items()
            if name in self.streaming_variables
        }

    def start_csv_recording(self) -> bool:
        """Enable CSV recording.

        Returns:
            False when the PLC is not connected or no variables are
            configured; True otherwise.
        """
        if not self.connected:
            self.logger.error("PLC not connected")
            return False

        if not self.variables:
            self.logger.error("No variables configured")
            return False

        self.csv_recording = True
        self.logger.info("CSV recording started")
        return True

    def stop_csv_recording(self):
        """Disable CSV recording and close the current CSV file, if any."""
        self.csv_recording = False
        if self.current_csv_file:
            self.current_csv_file.close()
            self.current_csv_file = None
            self.current_csv_writer = None
            self.current_hour = None
        self.logger.info("CSV recording stopped")

    def connect_plc(self) -> bool:
        """Connect to the PLC described by ``plc_config``.

        Any existing client is disconnected first.

        Returns:
            True on success; False (with the error logged) on failure.
        """
        try:
            if self.plc:
                self.plc.disconnect()

            self.plc = snap7.client.Client()
            self.plc.connect(
                self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"]
            )

            self.connected = True
            self.logger.info(f"Connected to PLC {self.plc_config['ip']}")
            return True

        except Exception as e:
            self.connected = False
            self.logger.error(f"Error connecting to PLC: {e}")
            return False

    def disconnect_plc(self):
        """Disconnect from the PLC; errors are logged, not raised."""
        try:
            if self.plc:
                self.plc.disconnect()
            self.connected = False
            self.logger.info("Disconnected from PLC")
        except Exception as e:
            self.logger.error(f"Error disconnecting from PLC: {e}")

    def read_variable(self, var_config: Dict[str, Any]) -> Any:
        """Read and decode one variable from the PLC.

        Args:
            var_config: Mapping with the keys ``"db"``, ``"offset"`` and
                ``"type"`` (``"real"``, ``"int"``, ``"bool"`` or ``"dint"``).

        Returns:
            The decoded value, or None for an unknown type or on any read or
            decode error (which is logged).
        """
        try:
            db = var_config["db"]
            offset = var_config["offset"]
            var_type = var_config["type"]

            if var_type == "real":
                # 4 bytes, big-endian float
                raw_data = self.plc.db_read(db, offset, 4)
                value = struct.unpack(">f", raw_data)[0]
            elif var_type == "int":
                # 2 bytes, big-endian signed
                raw_data = self.plc.db_read(db, offset, 2)
                value = struct.unpack(">h", raw_data)[0]
            elif var_type == "bool":
                # Only bit 0 of the byte at ``offset`` is inspected.
                raw_data = self.plc.db_read(db, offset, 1)
                value = bool(raw_data[0] & 0x01)
            elif var_type == "dint":
                # 4 bytes, big-endian signed
                raw_data = self.plc.db_read(db, offset, 4)
                value = struct.unpack(">l", raw_data)[0]
            else:
                return None

            return value

        except Exception as e:
            self.logger.error(f"Error reading variable: {e}")
            return None

    def read_all_variables(self) -> Dict[str, Any]:
        """Read every configured variable.

        Returns:
            Mapping of name to value for the variables that were read
            successfully; empty when the PLC is not connected.
        """
        if not self.connected or not self.plc:
            return {}

        data = {}
        # Iterate over a snapshot: a request thread adding or removing a
        # variable mid-scan would otherwise raise "dictionary changed size
        # during iteration" and terminate the streaming loop.
        for var_name, var_config in list(self.variables.items()):
            value = self.read_variable(var_config)
            if value is not None:
                data[var_name] = value

        return data

    def setup_udp_socket(self) -> bool:
        """Create a fresh UDP socket, closing any previous one.

        Returns:
            True on success; False (with the error logged) on failure.
        """
        try:
            if self.udp_socket:
                self.udp_socket.close()

            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.logger.info(
                f"UDP socket configured for {self.udp_config['host']}:{self.udp_config['port']}"
            )
            return True

        except Exception as e:
            self.logger.error(f"Error configuring UDP socket: {e}")
            return False

    def send_to_plotjuggler(self, data: Dict[str, Any]):
        """Send ``data`` as a JSON datagram to the configured UDP endpoint.

        The payload is ``{"timestamp": <epoch seconds>, "data": data}``.
        Does nothing when no socket is open; errors are logged, not raised.
        """
        if not self.udp_socket:
            return

        try:
            message = {"timestamp": time.time(), "data": data}

            json_message = json.dumps(message)
            self.udp_socket.sendto(
                json_message.encode("utf-8"),
                (self.udp_config["host"], self.udp_config["port"]),
            )

        except Exception as e:
            self.logger.error(f"Error sending data to PlotJuggler: {e}")

    def streaming_loop(self):
        """Poll, record and stream until ``self.streaming`` becomes False.

        Runs on the background thread started by ``start_streaming``. An
        unexpected exception ends the loop and clears ``self.streaming``.
        """
        self.logger.info(
            f"Starting streaming with interval of {self.sampling_interval}s"
        )

        while self.streaming:
            try:
                start_time = time.time()

                # Read all variables
                all_data = self.read_all_variables()

                if all_data:
                    # Write to CSV (all variables)
                    self.write_csv_data(all_data)

                    # Get filtered data for streaming
                    streaming_data = self.get_streaming_data(all_data)

                    # Send filtered data to PlotJuggler
                    if streaming_data:
                        self.send_to_plotjuggler(streaming_data)

                    # Log data
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    self.logger.info(
                        f"[{timestamp}] CSV: {len(all_data)} vars, Streaming: {len(streaming_data)} vars"
                    )

                # Maintain sampling interval
                elapsed = time.time() - start_time
                sleep_time = max(0, self.sampling_interval - elapsed)
                time.sleep(sleep_time)

            except Exception as e:
                self.logger.error(f"Error in streaming loop: {e}")
                # Keep the reported status truthful once the loop has died.
                self.streaming = False
                break

    def start_streaming(self) -> bool:
        """Start the background polling thread and CSV recording.

        Returns:
            True when streaming is running after the call (including when it
            was already running); False when the PLC is not connected, no
            variables are configured or the UDP socket cannot be created.
        """
        # Avoid spawning a second polling thread on a repeated start request.
        if self.streaming and self.stream_thread and self.stream_thread.is_alive():
            self.logger.info("Streaming already running")
            return True

        if not self.connected:
            self.logger.error("PLC not connected")
            return False

        if not self.variables:
            self.logger.error("No variables configured")
            return False

        if not self.setup_udp_socket():
            return False

        # Start CSV recording automatically
        self.start_csv_recording()

        self.streaming = True
        self.stream_thread = threading.Thread(target=self.streaming_loop)
        self.stream_thread.daemon = True
        self.stream_thread.start()

        self.logger.info("Streaming and CSV recording started")
        return True

    def stop_streaming(self):
        """Stop the polling thread, CSV recording and the UDP socket."""
        self.streaming = False
        if self.stream_thread:
            # Bounded wait so a stuck PLC read cannot block the caller forever.
            self.stream_thread.join(timeout=2)

        # Stop CSV recording
        self.stop_csv_recording()

        if self.udp_socket:
            self.udp_socket.close()
            self.udp_socket = None

        self.logger.info("Streaming and CSV recording stopped")

    def get_status(self) -> Dict[str, Any]:
        """Return a JSON-serializable snapshot of the current state."""
        return {
            "plc_connected": self.connected,
            "streaming": self.streaming,
            "csv_recording": self.csv_recording,
            "plc_config": self.plc_config,
            "udp_config": self.udp_config,
            "variables_count": len(self.variables),
            "streaming_variables_count": len(self.streaming_variables),
            "sampling_interval": self.sampling_interval,
            "current_csv_file": (
                self.get_csv_file_path() if self.csv_recording else None
            ),
        }


# Global streamer instance
streamer = PLCDataStreamer()


# The URL must carry the file name: the view requires ``filename``.
@app.route("/images/<path:filename>")
def serve_image(filename):
    """Serve a file from the ``.images`` directory."""
    return send_from_directory(".images", filename)


@app.route("/")
def index():
    """Render the main page with the current status and variables."""
    return render_template(
        "index.html", status=streamer.get_status(), variables=streamer.variables
    )


@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
    """Update the PLC configuration from a JSON body (ip, rack, slot)."""
    try:
        data = request.get_json()
        ip = data.get("ip", "10.1.33.11")
        rack = int(data.get("rack", 0))
        slot = int(data.get("slot", 2))

        streamer.update_plc_config(ip, rack, slot)
        return jsonify({"success": True, "message": "PLC configuration updated"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
    """Update the UDP configuration from a JSON body (host, port)."""
    try:
        data = request.get_json()
        host = data.get("host", "127.0.0.1")
        port = int(data.get("port", 9870))

        streamer.update_udp_config(host, port)
        return jsonify({"success": True, "message": "UDP configuration updated"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
    """Connect to the PLC; respond with HTTP 500 on failure."""
    if streamer.connect_plc():
        return jsonify({"success": True, "message": "Connected to PLC"})
    else:
        return jsonify({"success": False, "message": "Error connecting to PLC"}), 500


@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
    """Stop streaming and disconnect from the PLC."""
    streamer.stop_streaming()
    streamer.disconnect_plc()
    return jsonify({"success": True, "message": "Disconnected from PLC"})


@app.route("/api/variables", methods=["POST"])
def add_variable():
    """Add a variable from a JSON body (name, db, offset, type)."""
    try:
        data = request.get_json()
        name = data.get("name")
        db = int(data.get("db"))
        offset = int(data.get("offset"))
        var_type = data.get("type")

        if not name or var_type not in ["real", "int", "bool", "dint"]:
            return jsonify({"success": False, "message": "Invalid data"}), 400

        streamer.add_variable(name, db, offset, var_type)
        return jsonify({"success": True, "message": f"Variable {name} added"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


# The URL must carry the variable name: the view requires ``name``.
@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
    """Remove the variable called ``name``."""
    streamer.remove_variable(name)
    return jsonify({"success": True, "message": f"Variable {name} removed"})


# The URL must carry the variable name: the view requires ``name``.
@app.route("/api/variables/<name>/streaming", methods=["POST"])
def toggle_variable_streaming(name):
    """Enable or disable streaming for ``name`` from a JSON body (enabled)."""
    try:
        data = request.get_json()
        enabled = data.get("enabled", False)

        streamer.toggle_streaming_variable(name, enabled)
        status = "enabled" if enabled else "disabled"
        return jsonify(
            {"success": True, "message": f"Variable {name} streaming {status}"}
        )

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/variables/streaming", methods=["GET"])
def get_streaming_variables():
    """Return the list of variables enabled for streaming."""
    return jsonify(
        {"success": True, "streaming_variables": list(streamer.streaming_variables)}
    )


@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
    """Start streaming; respond with HTTP 500 on failure."""
    if streamer.start_streaming():
        return jsonify({"success": True, "message": "Streaming started"})
    else:
        return jsonify({"success": False, "message": "Error starting streaming"}), 500


@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
    """Stop streaming."""
    streamer.stop_streaming()
    return jsonify({"success": True, "message": "Streaming stopped"})


@app.route("/api/sampling", methods=["POST"])
def update_sampling():
    """Update the sampling interval from a JSON body (interval, seconds).

    Values below 0.01 are raised to 0.01.
    """
    try:
        data = request.get_json()
        interval = float(data.get("interval", 0.1))

        if interval < 0.01:
            interval = 0.01

        streamer.update_sampling_interval(interval)
        return jsonify({"success": True, "message": f"Interval updated to {interval}s"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/csv/start", methods=["POST"])
def start_csv_recording():
    """Enable CSV recording; respond with HTTP 500 on failure."""
    if streamer.start_csv_recording():
        return jsonify({"success": True, "message": "CSV recording started"})
    else:
        return (
            jsonify({"success": False, "message": "Error starting CSV recording"}),
            500,
        )


@app.route("/api/csv/stop", methods=["POST"])
def stop_csv_recording():
    """Disable CSV recording."""
    streamer.stop_csv_recording()
    return jsonify({"success": True, "message": "CSV recording stopped"})


@app.route("/api/status")
def get_status():
    """Return the current status as JSON."""
    return jsonify(streamer.get_status())


if __name__ == "__main__":
    # Create templates directory if it doesn't exist
    os.makedirs("templates", exist_ok=True)

    print("🚀 Starting Flask server for PLC S7-315 Streamer")
    print("📊 Web interface available at: http://localhost:5050")
    print("🔧 Configure your PLC and variables through the web interface")

    try:
        # NOTE(review): debug=True together with host="0.0.0.0" exposes the
        # interactive debugger to the network; confirm this is intended
        # before running outside a trusted development machine.
        app.run(debug=True, host="0.0.0.0", port=5050)
    except KeyboardInterrupt:
        print("\n⏹️ Stopping server...")
    finally:
        # Clean up in ``finally``: the development server may handle Ctrl+C
        # itself and return normally, in which case the ``except`` branch
        # above never runs.
        streamer.stop_streaming()
        streamer.disconnect_plc()