S7_snap7_Stremer_n_Recorder/main.py

390 lines
12 KiB
Python

from flask import Flask, render_template, request, jsonify, redirect, url_for
import snap7
import json
import socket
import time
import logging
import threading
from datetime import datetime
from typing import Dict, Any, Optional
import struct
import os
app = Flask(__name__)
app.secret_key = "plc_streamer_secret_key"
class PLCDataStreamer:
def __init__(self):
"""Inicializa el streamer de datos del PLC"""
# Configuración por defectoclear
self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
self.udp_config = {"host": "127.0.0.1", "port": 9870}
# Variables configurables
self.variables = {}
# Estados
self.plc = None
self.udp_socket = None
self.connected = False
self.streaming = False
self.stream_thread = None
self.sampling_interval = 0.1
# Configurar logging
self.setup_logging()
def setup_logging(self):
"""Configura el sistema de logging"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
)
self.logger = logging.getLogger(__name__)
def update_plc_config(self, ip: str, rack: int, slot: int):
"""Actualiza la configuración del PLC"""
self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
self.logger.info(f"Configuración PLC actualizada: {self.plc_config}")
def update_udp_config(self, host: str, port: int):
"""Actualiza la configuración UDP"""
self.udp_config = {"host": host, "port": port}
self.logger.info(f"Configuración UDP actualizada: {self.udp_config}")
def add_variable(self, name: str, db: int, offset: int, var_type: str):
"""Añade una variable para polling"""
self.variables[name] = {"db": db, "offset": offset, "type": var_type}
self.logger.info(f"Variable añadida: {name} -> DB{db}.{offset} ({var_type})")
def remove_variable(self, name: str):
"""Elimina una variable del polling"""
if name in self.variables:
del self.variables[name]
self.logger.info(f"Variable eliminada: {name}")
def connect_plc(self) -> bool:
"""Conecta al PLC S7-315"""
try:
if self.plc:
self.plc.disconnect()
self.plc = snap7.client.Client()
self.plc.connect(
self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"]
)
self.connected = True
self.logger.info(f"Conectado al PLC {self.plc_config['ip']}")
return True
except Exception as e:
self.connected = False
self.logger.error(f"Error conectando al PLC: {e}")
return False
def disconnect_plc(self):
"""Desconecta del PLC"""
try:
if self.plc:
self.plc.disconnect()
self.connected = False
self.logger.info("Desconectado del PLC")
except Exception as e:
self.logger.error(f"Error desconectando del PLC: {e}")
def read_variable(self, var_config: Dict[str, Any]) -> Any:
"""Lee una variable específica del PLC"""
try:
db = var_config["db"]
offset = var_config["offset"]
var_type = var_config["type"]
if var_type == "real":
raw_data = self.plc.db_read(db, offset, 4)
value = struct.unpack(">f", raw_data)[0]
elif var_type == "int":
raw_data = self.plc.db_read(db, offset, 2)
value = struct.unpack(">h", raw_data)[0]
elif var_type == "bool":
raw_data = self.plc.db_read(db, offset, 1)
value = bool(raw_data[0] & 0x01)
elif var_type == "dint":
raw_data = self.plc.db_read(db, offset, 4)
value = struct.unpack(">l", raw_data)[0]
else:
return None
return value
except Exception as e:
self.logger.error(f"Error leyendo variable: {e}")
return None
def read_all_variables(self) -> Dict[str, Any]:
"""Lee todas las variables configuradas"""
if not self.connected or not self.plc:
return {}
data = {}
for var_name, var_config in self.variables.items():
value = self.read_variable(var_config)
if value is not None:
data[var_name] = value
return data
def setup_udp_socket(self) -> bool:
"""Configura el socket UDP"""
try:
if self.udp_socket:
self.udp_socket.close()
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.logger.info(
f"Socket UDP configurado para {self.udp_config['host']}:{self.udp_config['port']}"
)
return True
except Exception as e:
self.logger.error(f"Error configurando socket UDP: {e}")
return False
def send_to_plotjuggler(self, data: Dict[str, Any]):
"""Envía datos a PlotJuggler vía UDP JSON"""
if not self.udp_socket:
return
try:
message = {"timestamp": time.time(), "data": data}
json_message = json.dumps(message)
self.udp_socket.sendto(
json_message.encode("utf-8"),
(self.udp_config["host"], self.udp_config["port"]),
)
except Exception as e:
self.logger.error(f"Error enviando datos a PlotJuggler: {e}")
def streaming_loop(self):
"""Bucle principal de streaming"""
self.logger.info(
f"Iniciando streaming con intervalo de {self.sampling_interval}s"
)
while self.streaming:
try:
start_time = time.time()
# Leer todas las variables
data = self.read_all_variables()
if data:
# Enviar a PlotJuggler
self.send_to_plotjuggler(data)
# Log de datos
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
self.logger.info(f"[{timestamp}] {data}")
# Mantener intervalo de muestreo
elapsed = time.time() - start_time
sleep_time = max(0, self.sampling_interval - elapsed)
time.sleep(sleep_time)
except Exception as e:
self.logger.error(f"Error en streaming loop: {e}")
break
def start_streaming(self) -> bool:
"""Inicia el streaming de datos"""
if not self.connected:
self.logger.error("PLC no conectado")
return False
if not self.variables:
self.logger.error("No hay variables configuradas")
return False
if not self.setup_udp_socket():
return False
self.streaming = True
self.stream_thread = threading.Thread(target=self.streaming_loop)
self.stream_thread.daemon = True
self.stream_thread.start()
self.logger.info("Streaming iniciado")
return True
def stop_streaming(self):
"""Detiene el streaming"""
self.streaming = False
if self.stream_thread:
self.stream_thread.join(timeout=2)
if self.udp_socket:
self.udp_socket.close()
self.udp_socket = None
self.logger.info("Streaming detenido")
def get_status(self) -> Dict[str, Any]:
"""Obtiene el estado actual del sistema"""
return {
"plc_connected": self.connected,
"streaming": self.streaming,
"plc_config": self.plc_config,
"udp_config": self.udp_config,
"variables_count": len(self.variables),
"sampling_interval": self.sampling_interval,
}
# Instancia global del streamer
streamer = PLCDataStreamer()
@app.route("/")
def index():
"""Página principal"""
return render_template(
"index.html", status=streamer.get_status(), variables=streamer.variables
)
@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
"""Actualiza la configuración del PLC"""
try:
data = request.get_json()
ip = data.get("ip", "192.168.1.100")
rack = int(data.get("rack", 0))
slot = int(data.get("slot", 2))
streamer.update_plc_config(ip, rack, slot)
return jsonify({"success": True, "message": "Configuración PLC actualizada"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
"""Actualiza la configuración UDP"""
try:
data = request.get_json()
host = data.get("host", "127.0.0.1")
port = int(data.get("port", 9870))
streamer.update_udp_config(host, port)
return jsonify({"success": True, "message": "Configuración UDP actualizada"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
"""Conecta al PLC"""
if streamer.connect_plc():
return jsonify({"success": True, "message": "Conectado al PLC"})
else:
return jsonify({"success": False, "message": "Error conectando al PLC"}), 500
@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
"""Desconecta del PLC"""
streamer.stop_streaming()
streamer.disconnect_plc()
return jsonify({"success": True, "message": "Desconectado del PLC"})
@app.route("/api/variables", methods=["POST"])
def add_variable():
"""Añade una nueva variable"""
try:
data = request.get_json()
name = data.get("name")
db = int(data.get("db"))
offset = int(data.get("offset"))
var_type = data.get("type")
if not name or var_type not in ["real", "int", "bool", "dint"]:
return jsonify({"success": False, "message": "Datos inválidos"}), 400
streamer.add_variable(name, db, offset, var_type)
return jsonify({"success": True, "message": f"Variable {name} añadida"})
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
"""Elimina una variable"""
streamer.remove_variable(name)
return jsonify({"success": True, "message": f"Variable {name} eliminada"})
@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
"""Inicia el streaming"""
if streamer.start_streaming():
return jsonify({"success": True, "message": "Streaming iniciado"})
else:
return jsonify({"success": False, "message": "Error iniciando streaming"}), 500
@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
"""Detiene el streaming"""
streamer.stop_streaming()
return jsonify({"success": True, "message": "Streaming detenido"})
@app.route("/api/sampling", methods=["POST"])
def update_sampling():
"""Actualiza el intervalo de muestreo"""
try:
data = request.get_json()
interval = float(data.get("interval", 0.1))
if interval < 0.01:
interval = 0.01
streamer.sampling_interval = interval
return jsonify(
{"success": True, "message": f"Intervalo actualizado a {interval}s"}
)
except Exception as e:
return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/status")
def get_status():
"""Obtiene el estado actual"""
return jsonify(streamer.get_status())
if __name__ == "__main__":
# Crear directorio de templates si no existe
os.makedirs("templates", exist_ok=True)
print("🚀 Iniciando servidor Flask para PLC S7-315 Streamer")
print("📊 Interfaz web disponible en: http://localhost:5050")
print("🔧 Configure su PLC y variables a través de la interfaz web")
try:
app.run(debug=True, host="0.0.0.0", port=5050)
except KeyboardInterrupt:
print("\n⏹️ Deteniendo servidor...")
streamer.stop_streaming()
streamer.disconnect_plc()