import snap7 import json import socket import time import logging from datetime import datetime from typing import Dict, Any import struct class PLCDataStreamer: def __init__(self, plc_ip: str, plc_rack: int = 0, plc_slot: int = 2, udp_host: str = "127.0.0.1", udp_port: int = 9870): """ Inicializa el streamer de datos del PLC Args: plc_ip: IP del PLC S7-315 plc_rack: Rack del PLC (típicamente 0) plc_slot: Slot del PLC (típicamente 2) udp_host: IP para el servidor UDP udp_port: Puerto UDP para PlotJuggler """ self.plc_ip = plc_ip self.plc_rack = plc_rack self.plc_slot = plc_slot self.udp_host = udp_host self.udp_port = udp_port # Inicializar cliente PLC self.plc = snap7.client.Client() # Inicializar socket UDP self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Configurar logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('plc_data.log'), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) # Variables a leer del PLC (personalizar según tu aplicación) self.data_blocks = { 'DB1': { 'address': 1, 'size': 100, 'variables': { 'temperatura': {'offset': 0, 'type': 'real'}, 'presion': {'offset': 4, 'type': 'real'}, 'nivel': {'offset': 8, 'type': 'real'}, 'estado_bomba': {'offset': 12, 'type': 'bool'}, 'contador': {'offset': 14, 'type': 'int'} } } } self.running = False def connect_plc(self) -> bool: """Conecta al PLC S7-315""" try: self.plc.connect(self.plc_ip, self.plc_rack, self.plc_slot) self.logger.info(f"Conectado al PLC {self.plc_ip}") return True except Exception as e: self.logger.error(f"Error conectando al PLC: {e}") return False def disconnect_plc(self): """Desconecta del PLC""" try: self.plc.disconnect() self.logger.info("Desconectado del PLC") except Exception as e: self.logger.error(f"Error desconectando del PLC: {e}") def read_data_block(self, db_name: str) -> Dict[str, Any]: """Lee un bloque de datos del PLC""" db_config = self.data_blocks[db_name] try: # Leer el bloque completo raw_data = self.plc.db_read(db_config['address'], 0, db_config['size']) parsed_data = {} # Parsear cada variable según su tipo for var_name, var_config in db_config['variables'].items(): offset = var_config['offset'] var_type = var_config['type'] if var_type == 'real': # REAL de Siemens (32-bit IEEE 754, big endian) value = struct.unpack('>f', raw_data[offset:offset+4])[0] elif var_type == 'int': # INT de Siemens (16-bit signed, big endian) value = struct.unpack('>h', raw_data[offset:offset+2])[0] elif var_type == 'bool': # BOOL - primer bit del byte value = bool(raw_data[offset] & 0x01) else: continue parsed_data[var_name] = value return parsed_data except Exception as e: self.logger.error(f"Error leyendo {db_name}: {e}") return {} def send_to_plotjuggler(self, data: Dict[str, Any]): """Envía datos a PlotJuggler vía UDP JSON""" try: # Agregar timestamp message = { 'timestamp': time.time(), 'data': data } # Convertir a JSON y enviar json_message = json.dumps(message) self.udp_socket.sendto( json_message.encode('utf-8'), (self.udp_host, self.udp_port) ) except Exception as e: self.logger.error(f"Error enviando datos a PlotJuggler: {e}") def log_data(self, data: Dict[str, Any]): """Registra datos en el log""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] self.logger.info(f"[{timestamp}] {data}") def run(self, sampling_interval: float = 0.1): """Ejecuta el bucle principal de lectura y streaming""" if not self.connect_plc(): return self.running = True self.logger.info(f"Iniciando streaming con intervalo de {sampling_interval}s") try: while self.running: start_time = time.time() # Leer todos los bloques de datos configurados all_data = {} for db_name in self.data_blocks: db_data = self.read_data_block(db_name) all_data.update(db_data) if all_data: # Enviar a PlotJuggler self.send_to_plotjuggler(all_data) # Registrar en log self.log_data(all_data) # Mantener intervalo de muestreo elapsed = time.time() - start_time sleep_time = max(0, sampling_interval - elapsed) time.sleep(sleep_time) except KeyboardInterrupt: self.logger.info("Deteniendo por interrupción del usuario") except Exception as e: self.logger.error(f"Error en el bucle principal: {e}") finally: self.stop() def stop(self): """Detiene el streaming y limpia recursos""" self.running = False self.disconnect_plc() self.udp_socket.close() self.logger.info("Streaming detenido") def main(): # Configuración PLC_IP = "192.168.1.100" # Cambiar por la IP de tu PLC SAMPLING_INTERVAL = 0.1 # 10 Hz # Crear y ejecutar el streamer streamer = PLCDataStreamer(plc_ip=PLC_IP) try: streamer.run(sampling_interval=SAMPLING_INTERVAL) except Exception as e: print(f"Error: {e}") finally: streamer.stop() if __name__ == "__main__": main()