S7_snap7_Stremer_n_Recorder/.doc/example.py

200 lines
6.7 KiB
Python

import snap7
import json
import socket
import time
import logging
from datetime import datetime
from typing import Dict, Any
import struct
class PLCDataStreamer:
def __init__(self, plc_ip: str, plc_rack: int = 0, plc_slot: int = 2,
udp_host: str = "127.0.0.1", udp_port: int = 9870):
"""
Inicializa el streamer de datos del PLC
Args:
plc_ip: IP del PLC S7-315
plc_rack: Rack del PLC (típicamente 0)
plc_slot: Slot del PLC (típicamente 2)
udp_host: IP para el servidor UDP
udp_port: Puerto UDP para PlotJuggler
"""
self.plc_ip = plc_ip
self.plc_rack = plc_rack
self.plc_slot = plc_slot
self.udp_host = udp_host
self.udp_port = udp_port
# Inicializar cliente PLC
self.plc = snap7.client.Client()
# Inicializar socket UDP
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('plc_data.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# Variables a leer del PLC (personalizar según tu aplicación)
self.data_blocks = {
'DB1': {
'address': 1,
'size': 100,
'variables': {
'temperatura': {'offset': 0, 'type': 'real'},
'presion': {'offset': 4, 'type': 'real'},
'nivel': {'offset': 8, 'type': 'real'},
'estado_bomba': {'offset': 12, 'type': 'bool'},
'contador': {'offset': 14, 'type': 'int'}
}
}
}
self.running = False
def connect_plc(self) -> bool:
"""Conecta al PLC S7-315"""
try:
self.plc.connect(self.plc_ip, self.plc_rack, self.plc_slot)
self.logger.info(f"Conectado al PLC {self.plc_ip}")
return True
except Exception as e:
self.logger.error(f"Error conectando al PLC: {e}")
return False
def disconnect_plc(self):
"""Desconecta del PLC"""
try:
self.plc.disconnect()
self.logger.info("Desconectado del PLC")
except Exception as e:
self.logger.error(f"Error desconectando del PLC: {e}")
def read_data_block(self, db_name: str) -> Dict[str, Any]:
"""Lee un bloque de datos del PLC"""
db_config = self.data_blocks[db_name]
try:
# Leer el bloque completo
raw_data = self.plc.db_read(db_config['address'], 0, db_config['size'])
parsed_data = {}
# Parsear cada variable según su tipo
for var_name, var_config in db_config['variables'].items():
offset = var_config['offset']
var_type = var_config['type']
if var_type == 'real':
# REAL de Siemens (32-bit IEEE 754, big endian)
value = struct.unpack('>f', raw_data[offset:offset+4])[0]
elif var_type == 'int':
# INT de Siemens (16-bit signed, big endian)
value = struct.unpack('>h', raw_data[offset:offset+2])[0]
elif var_type == 'bool':
# BOOL - primer bit del byte
value = bool(raw_data[offset] & 0x01)
else:
continue
parsed_data[var_name] = value
return parsed_data
except Exception as e:
self.logger.error(f"Error leyendo {db_name}: {e}")
return {}
def send_to_plotjuggler(self, data: Dict[str, Any]):
"""Envía datos a PlotJuggler vía UDP JSON"""
try:
# Agregar timestamp
message = {
'timestamp': time.time(),
'data': data
}
# Convertir a JSON y enviar
json_message = json.dumps(message)
self.udp_socket.sendto(
json_message.encode('utf-8'),
(self.udp_host, self.udp_port)
)
except Exception as e:
self.logger.error(f"Error enviando datos a PlotJuggler: {e}")
def log_data(self, data: Dict[str, Any]):
"""Registra datos en el log"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
self.logger.info(f"[{timestamp}] {data}")
def run(self, sampling_interval: float = 0.1):
"""Ejecuta el bucle principal de lectura y streaming"""
if not self.connect_plc():
return
self.running = True
self.logger.info(f"Iniciando streaming con intervalo de {sampling_interval}s")
try:
while self.running:
start_time = time.time()
# Leer todos los bloques de datos configurados
all_data = {}
for db_name in self.data_blocks:
db_data = self.read_data_block(db_name)
all_data.update(db_data)
if all_data:
# Enviar a PlotJuggler
self.send_to_plotjuggler(all_data)
# Registrar en log
self.log_data(all_data)
# Mantener intervalo de muestreo
elapsed = time.time() - start_time
sleep_time = max(0, sampling_interval - elapsed)
time.sleep(sleep_time)
except KeyboardInterrupt:
self.logger.info("Deteniendo por interrupción del usuario")
except Exception as e:
self.logger.error(f"Error en el bucle principal: {e}")
finally:
self.stop()
def stop(self):
"""Detiene el streaming y limpia recursos"""
self.running = False
self.disconnect_plc()
self.udp_socket.close()
self.logger.info("Streaming detenido")
def main():
# Configuración
PLC_IP = "192.168.1.100" # Cambiar por la IP de tu PLC
SAMPLING_INTERVAL = 0.1 # 10 Hz
# Crear y ejecutar el streamer
streamer = PLCDataStreamer(plc_ip=PLC_IP)
try:
streamer.run(sampling_interval=SAMPLING_INTERVAL)
except Exception as e:
print(f"Error: {e}")
finally:
streamer.stop()
if __name__ == "__main__":
main()