200 lines
6.7 KiB
Python
200 lines
6.7 KiB
Python
import snap7
|
|
import json
|
|
import socket
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
import struct
|
|
|
|
class PLCDataStreamer:
|
|
def __init__(self, plc_ip: str, plc_rack: int = 0, plc_slot: int = 2,
|
|
udp_host: str = "127.0.0.1", udp_port: int = 9870):
|
|
"""
|
|
Inicializa el streamer de datos del PLC
|
|
|
|
Args:
|
|
plc_ip: IP del PLC S7-315
|
|
plc_rack: Rack del PLC (típicamente 0)
|
|
plc_slot: Slot del PLC (típicamente 2)
|
|
udp_host: IP para el servidor UDP
|
|
udp_port: Puerto UDP para PlotJuggler
|
|
"""
|
|
self.plc_ip = plc_ip
|
|
self.plc_rack = plc_rack
|
|
self.plc_slot = plc_slot
|
|
self.udp_host = udp_host
|
|
self.udp_port = udp_port
|
|
|
|
# Inicializar cliente PLC
|
|
self.plc = snap7.client.Client()
|
|
|
|
# Inicializar socket UDP
|
|
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
# Configurar logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('plc_data.log'),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
# Variables a leer del PLC (personalizar según tu aplicación)
|
|
self.data_blocks = {
|
|
'DB1': {
|
|
'address': 1,
|
|
'size': 100,
|
|
'variables': {
|
|
'temperatura': {'offset': 0, 'type': 'real'},
|
|
'presion': {'offset': 4, 'type': 'real'},
|
|
'nivel': {'offset': 8, 'type': 'real'},
|
|
'estado_bomba': {'offset': 12, 'type': 'bool'},
|
|
'contador': {'offset': 14, 'type': 'int'}
|
|
}
|
|
}
|
|
}
|
|
|
|
self.running = False
|
|
|
|
def connect_plc(self) -> bool:
|
|
"""Conecta al PLC S7-315"""
|
|
try:
|
|
self.plc.connect(self.plc_ip, self.plc_rack, self.plc_slot)
|
|
self.logger.info(f"Conectado al PLC {self.plc_ip}")
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f"Error conectando al PLC: {e}")
|
|
return False
|
|
|
|
def disconnect_plc(self):
|
|
"""Desconecta del PLC"""
|
|
try:
|
|
self.plc.disconnect()
|
|
self.logger.info("Desconectado del PLC")
|
|
except Exception as e:
|
|
self.logger.error(f"Error desconectando del PLC: {e}")
|
|
|
|
def read_data_block(self, db_name: str) -> Dict[str, Any]:
|
|
"""Lee un bloque de datos del PLC"""
|
|
db_config = self.data_blocks[db_name]
|
|
|
|
try:
|
|
# Leer el bloque completo
|
|
raw_data = self.plc.db_read(db_config['address'], 0, db_config['size'])
|
|
|
|
parsed_data = {}
|
|
|
|
# Parsear cada variable según su tipo
|
|
for var_name, var_config in db_config['variables'].items():
|
|
offset = var_config['offset']
|
|
var_type = var_config['type']
|
|
|
|
if var_type == 'real':
|
|
# REAL de Siemens (32-bit IEEE 754, big endian)
|
|
value = struct.unpack('>f', raw_data[offset:offset+4])[0]
|
|
elif var_type == 'int':
|
|
# INT de Siemens (16-bit signed, big endian)
|
|
value = struct.unpack('>h', raw_data[offset:offset+2])[0]
|
|
elif var_type == 'bool':
|
|
# BOOL - primer bit del byte
|
|
value = bool(raw_data[offset] & 0x01)
|
|
else:
|
|
continue
|
|
|
|
parsed_data[var_name] = value
|
|
|
|
return parsed_data
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error leyendo {db_name}: {e}")
|
|
return {}
|
|
|
|
def send_to_plotjuggler(self, data: Dict[str, Any]):
|
|
"""Envía datos a PlotJuggler vía UDP JSON"""
|
|
try:
|
|
# Agregar timestamp
|
|
message = {
|
|
'timestamp': time.time(),
|
|
'data': data
|
|
}
|
|
|
|
# Convertir a JSON y enviar
|
|
json_message = json.dumps(message)
|
|
self.udp_socket.sendto(
|
|
json_message.encode('utf-8'),
|
|
(self.udp_host, self.udp_port)
|
|
)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error enviando datos a PlotJuggler: {e}")
|
|
|
|
def log_data(self, data: Dict[str, Any]):
|
|
"""Registra datos en el log"""
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
|
self.logger.info(f"[{timestamp}] {data}")
|
|
|
|
def run(self, sampling_interval: float = 0.1):
|
|
"""Ejecuta el bucle principal de lectura y streaming"""
|
|
if not self.connect_plc():
|
|
return
|
|
|
|
self.running = True
|
|
self.logger.info(f"Iniciando streaming con intervalo de {sampling_interval}s")
|
|
|
|
try:
|
|
while self.running:
|
|
start_time = time.time()
|
|
|
|
# Leer todos los bloques de datos configurados
|
|
all_data = {}
|
|
for db_name in self.data_blocks:
|
|
db_data = self.read_data_block(db_name)
|
|
all_data.update(db_data)
|
|
|
|
if all_data:
|
|
# Enviar a PlotJuggler
|
|
self.send_to_plotjuggler(all_data)
|
|
|
|
# Registrar en log
|
|
self.log_data(all_data)
|
|
|
|
# Mantener intervalo de muestreo
|
|
elapsed = time.time() - start_time
|
|
sleep_time = max(0, sampling_interval - elapsed)
|
|
time.sleep(sleep_time)
|
|
|
|
except KeyboardInterrupt:
|
|
self.logger.info("Deteniendo por interrupción del usuario")
|
|
except Exception as e:
|
|
self.logger.error(f"Error en el bucle principal: {e}")
|
|
finally:
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
"""Detiene el streaming y limpia recursos"""
|
|
self.running = False
|
|
self.disconnect_plc()
|
|
self.udp_socket.close()
|
|
self.logger.info("Streaming detenido")
|
|
|
|
def main():
|
|
# Configuración
|
|
PLC_IP = "192.168.1.100" # Cambiar por la IP de tu PLC
|
|
SAMPLING_INTERVAL = 0.1 # 10 Hz
|
|
|
|
# Crear y ejecutar el streamer
|
|
streamer = PLCDataStreamer(plc_ip=PLC_IP)
|
|
|
|
try:
|
|
streamer.run(sampling_interval=SAMPLING_INTERVAL)
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
finally:
|
|
streamer.stop()
|
|
|
|
if __name__ == "__main__":
|
|
main() |