"""Flask web front-end that polls PLC data blocks via snap7 and streams them to PlotJuggler over UDP."""

import json
import logging
import math
import os
import socket
import struct
import threading
import time
from datetime import datetime
from typing import Any, Dict, Optional

import snap7
from flask import Flask, jsonify, redirect, render_template, request, url_for

app = Flask(__name__)
# The session key can be overridden from the environment; the historical
# hard-coded value is kept as a fallback so existing deployments keep working.
app.secret_key = os.environ.get("PLC_STREAMER_SECRET_KEY", "plc_streamer_secret_key")


class PLCDataStreamer:
    """Poll configured PLC variables and forward them as JSON datagrams over UDP."""

    # Byte length and big-endian struct format for each numeric variable type.
    _NUMERIC_LAYOUTS = {"real": (4, ">f"), "int": (2, ">h"), "dint": (4, ">l")}

    def __init__(self):
        """Initialise the streamer with default PLC/UDP settings and set up logging."""
        # Default configuration
        self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
        self.udp_config = {"host": "127.0.0.1", "port": 9870}

        # User-configurable variables: name -> {"db", "offset", "type"}
        self.variables = {}

        # Runtime state
        self.plc = None
        self.udp_socket = None
        self.connected = False
        self.streaming = False
        self.stream_thread = None
        self.sampling_interval = 0.1  # seconds between polling cycles

        # Configure logging
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging to both ``plc_data.log`` and the console."""
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
        )
        self.logger = logging.getLogger(__name__)

    def update_plc_config(self, ip: str, rack: int, slot: int):
        """Replace the PLC connection settings (used by the next ``connect_plc``)."""
        self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
        self.logger.info(f"Configuración PLC actualizada: {self.plc_config}")

    def update_udp_config(self, host: str, port: int):
        """Replace the UDP destination that datagrams are sent to."""
        self.udp_config = {"host": host, "port": port}
        self.logger.info(f"Configuración UDP actualizada: {self.udp_config}")

    def add_variable(self, name: str, db: int, offset: int, var_type: str):
        """Register (or overwrite) a variable to be polled.

        Args:
            name: Key under which the value is published.
            db: Data block number.
            offset: Byte offset inside the data block.
            var_type: One of ``"real"``, ``"int"``, ``"bool"``, ``"dint"``.
        """
        self.variables[name] = {"db": db, "offset": offset, "type": var_type}
        self.logger.info(f"Variable añadida: {name} -> DB{db}.{offset} ({var_type})")

    def remove_variable(self, name: str):
        """Stop polling ``name``; unknown names are ignored silently."""
        if name in self.variables:
            del self.variables[name]
            self.logger.info(f"Variable eliminada: {name}")

    def connect_plc(self) -> bool:
        """Connect to the PLC using ``plc_config``.

        Any existing client is disconnected first.

        Returns:
            True on success, False if the connection attempt raised.
        """
        try:
            if self.plc:
                self.plc.disconnect()

            self.plc = snap7.client.Client()
            self.plc.connect(
                self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"]
            )

            self.connected = True
            self.logger.info(f"Conectado al PLC {self.plc_config['ip']}")
            return True

        except Exception as e:
            self.connected = False
            self.logger.error(f"Error conectando al PLC: {e}")
            return False

    def disconnect_plc(self):
        """Disconnect from the PLC; errors are logged, never raised."""
        try:
            if self.plc:
                self.plc.disconnect()
            self.logger.info("Desconectado del PLC")
        except Exception as e:
            self.logger.error(f"Error desconectando del PLC: {e}")
        finally:
            # Mark the link as down even when disconnect() itself failed, so
            # the status never reports a connection we just tried to tear down.
            self.connected = False

    def read_variable(self, var_config: Dict[str, Any]) -> Any:
        """Read one variable from the PLC.

        Args:
            var_config: Mapping with ``"db"``, ``"offset"`` and ``"type"`` keys.

        Returns:
            The decoded value, or None when the type is unknown or the read
            failed (failures are logged).
        """
        try:
            db = var_config["db"]
            offset = var_config["offset"]
            var_type = var_config["type"]

            if var_type == "bool":
                raw_data = self.plc.db_read(db, offset, 1)
                # Only bit 0 of the addressed byte is evaluated.
                return bool(raw_data[0] & 0x01)

            layout = self._NUMERIC_LAYOUTS.get(var_type)
            if layout is None:
                return None

            size, fmt = layout
            raw_data = self.plc.db_read(db, offset, size)
            return struct.unpack(fmt, raw_data)[0]

        except Exception as e:
            self.logger.error(f"Error leyendo variable: {e}")
            return None

    def read_all_variables(self) -> Dict[str, Any]:
        """Read every configured variable.

        Returns:
            Mapping of variable name to value; variables whose read returned
            None are omitted. Empty when the PLC is not connected.
        """
        if not self.connected or not self.plc:
            return {}

        data = {}
        # Iterate over a snapshot: the web handlers add/remove variables from
        # other threads, and mutating a dict during iteration raises
        # RuntimeError, which would terminate the streaming loop.
        for var_name, var_config in list(self.variables.items()):
            value = self.read_variable(var_config)
            if value is not None:
                data[var_name] = value

        return data

    def setup_udp_socket(self) -> bool:
        """(Re)create the UDP socket, closing any previous one.

        Returns:
            True on success, False if socket creation raised.
        """
        try:
            if self.udp_socket:
                self.udp_socket.close()

            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.logger.info(
                f"Socket UDP configurado para {self.udp_config['host']}:{self.udp_config['port']}"
            )
            return True

        except Exception as e:
            self.logger.error(f"Error configurando socket UDP: {e}")
            return False

    def send_to_plotjuggler(self, data: Dict[str, Any]):
        """Send ``data`` as a UTF-8 JSON datagram ``{"timestamp", "data"}``.

        Does nothing when no socket is open; send errors are logged.
        """
        if not self.udp_socket:
            return

        try:
            message = {"timestamp": time.time(), "data": data}
            json_message = json.dumps(message)
            self.udp_socket.sendto(
                json_message.encode("utf-8"),
                (self.udp_config["host"], self.udp_config["port"]),
            )
        except Exception as e:
            self.logger.error(f"Error enviando datos a PlotJuggler: {e}")

    def streaming_loop(self):
        """Poll and forward data until ``self.streaming`` becomes False."""
        self.logger.info(
            f"Iniciando streaming con intervalo de {self.sampling_interval}s"
        )

        while self.streaming:
            try:
                start_time = time.time()

                # Read all variables
                data = self.read_all_variables()

                if data:
                    # Forward to PlotJuggler
                    self.send_to_plotjuggler(data)

                    # Log the sample
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    self.logger.info(f"[{timestamp}] {data}")

                # Keep the sampling period: sleep only for the time left over
                # after the read/send work of this cycle.
                elapsed = time.time() - start_time
                sleep_time = max(0, self.sampling_interval - elapsed)
                time.sleep(sleep_time)

            except Exception as e:
                self.logger.error(f"Error en streaming loop: {e}")
                # Clear the flag so get_status() does not keep reporting an
                # active stream after the loop has died.
                self.streaming = False
                break

    def start_streaming(self) -> bool:
        """Start the background streaming thread.

        Returns:
            True if streaming is running after the call (including when it was
            already running); False if the PLC is not connected, no variables
            are configured, or the UDP socket could not be created.
        """
        if not self.connected:
            self.logger.error("PLC no conectado")
            return False

        if not self.variables:
            self.logger.error("No hay variables configuradas")
            return False

        # Starting twice would spawn a second polling thread and close the
        # socket the first one is using, so treat a repeated start as a no-op.
        if self.streaming and self.stream_thread and self.stream_thread.is_alive():
            self.logger.warning("Streaming ya está activo")
            return True

        if not self.setup_udp_socket():
            return False

        self.streaming = True
        self.stream_thread = threading.Thread(target=self.streaming_loop)
        self.stream_thread.daemon = True
        self.stream_thread.start()

        self.logger.info("Streaming iniciado")
        return True

    def stop_streaming(self):
        """Signal the streaming thread to stop, wait up to 2 s, close the socket."""
        self.streaming = False
        if self.stream_thread:
            self.stream_thread.join(timeout=2)

        if self.udp_socket:
            self.udp_socket.close()
            self.udp_socket = None

        self.logger.info("Streaming detenido")

    def get_status(self) -> Dict[str, Any]:
        """Return a JSON-serialisable snapshot of the current state."""
        return {
            "plc_connected": self.connected,
            "streaming": self.streaming,
            "plc_config": self.plc_config,
            "udp_config": self.udp_config,
            "variables_count": len(self.variables),
            "sampling_interval": self.sampling_interval,
        }


# Global streamer instance shared by all request handlers
streamer = PLCDataStreamer()


@app.route("/")
def index():
    """Render the main page with the current status and variable table."""
    return render_template(
        "index.html", status=streamer.get_status(), variables=streamer.variables
    )


@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
    """Update the PLC connection settings from a JSON body (``ip``, ``rack``, ``slot``)."""
    try:
        data = request.get_json()
        ip = data.get("ip", "192.168.1.100")
        rack = int(data.get("rack", 0))
        slot = int(data.get("slot", 2))

        streamer.update_plc_config(ip, rack, slot)
        return jsonify({"success": True, "message": "Configuración PLC actualizada"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
    """Update the UDP destination from a JSON body (``host``, ``port``)."""
    try:
        data = request.get_json()
        host = data.get("host", "127.0.0.1")
        port = int(data.get("port", 9870))

        streamer.update_udp_config(host, port)
        return jsonify({"success": True, "message": "Configuración UDP actualizada"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
    """Connect to the PLC; responds 500 when the connection fails."""
    if streamer.connect_plc():
        return jsonify({"success": True, "message": "Conectado al PLC"})
    else:
        return jsonify({"success": False, "message": "Error conectando al PLC"}), 500


@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
    """Stop streaming and disconnect from the PLC."""
    streamer.stop_streaming()
    streamer.disconnect_plc()
    return jsonify({"success": True, "message": "Desconectado del PLC"})


@app.route("/api/variables", methods=["POST"])
def add_variable():
    """Add a variable from a JSON body (``name``, ``db``, ``offset``, ``type``)."""
    try:
        data = request.get_json()
        name = data.get("name")
        db = int(data.get("db"))
        offset = int(data.get("offset"))
        var_type = data.get("type")

        if not name or var_type not in ["real", "int", "bool", "dint"]:
            return jsonify({"success": False, "message": "Datos inválidos"}), 400

        streamer.add_variable(name, db, offset, var_type)
        return jsonify({"success": True, "message": f"Variable {name} añadida"})

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


# The URL must carry the variable name: without the ``<name>`` converter Flask
# calls the view with no arguments and the request fails with a TypeError.
@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
    """Remove the variable called ``name``."""
    streamer.remove_variable(name)
    return jsonify({"success": True, "message": f"Variable {name} eliminada"})


@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
    """Start streaming; responds 500 when it cannot be started."""
    if streamer.start_streaming():
        return jsonify({"success": True, "message": "Streaming iniciado"})
    else:
        return jsonify({"success": False, "message": "Error iniciando streaming"}), 500


@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
    """Stop streaming."""
    streamer.stop_streaming()
    return jsonify({"success": True, "message": "Streaming detenido"})


@app.route("/api/sampling", methods=["POST"])
def update_sampling():
    """Update the sampling interval (seconds) from a JSON body (``interval``).

    Values below 0.01 are raised to 0.01; NaN and infinite values are rejected.
    """
    try:
        data = request.get_json()
        interval = float(data.get("interval", 0.1))

        # NaN slips past the lower-bound check and makes the loop spin without
        # sleeping; inf makes time.sleep() raise and kills the loop.
        if not math.isfinite(interval):
            return jsonify({"success": False, "message": "Intervalo inválido"}), 400

        if interval < 0.01:
            interval = 0.01

        streamer.sampling_interval = interval
        return jsonify(
            {"success": True, "message": f"Intervalo actualizado a {interval}s"}
        )

    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400


@app.route("/api/status")
def get_status():
    """Return the current streamer status as JSON."""
    return jsonify(streamer.get_status())


if __name__ == "__main__":
    # Create the templates directory if it does not exist
    os.makedirs("templates", exist_ok=True)

    print("🚀 Iniciando servidor Flask para PLC S7-315 Streamer")
    print("📊 Interfaz web disponible en: http://localhost:5050")
    print("🔧 Configure su PLC y variables a través de la interfaz web")

    # SECURITY: the interactive debugger allows arbitrary code execution, and
    # the server listens on all interfaces. Debug mode is therefore opt-in via
    # the PLC_STREAMER_DEBUG environment variable.
    debug_enabled = os.environ.get("PLC_STREAMER_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }

    try:
        app.run(debug=debug_enabled, host="0.0.0.0", port=5050)
    except KeyboardInterrupt:
        print("\n⏹️ Deteniendo servidor...")
    finally:
        # Run cleanup on every exit path, not only when KeyboardInterrupt
        # propagates out of app.run().
        streamer.stop_streaming()
        streamer.disconnect_plc()