Add support for TCP and UDP connections in MaselliSimulatorApp

This commit is contained in:
Miguel 2025-05-22 21:34:30 +02:00
parent a9315d774c
commit cd11e6d911
1 changed files with 171 additions and 96 deletions

View File

@ -1,6 +1,7 @@
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import serial
import socket
import threading
import time
import math
@ -8,9 +9,10 @@ import math
class MaselliSimulatorApp:
def __init__(self, root_window):
self.root = root_window
self.root.title("Simulador Protocolo Maselli")
self.root.title("Simulador Protocolo Maselli - Serial/Ethernet")
self.serial_port = None # Para simulación continua
self.connection = None # Puede ser serial.Serial o socket
self.connection_type = None # 'serial', 'tcp', o 'udp'
self.simulating = False
self.simulation_thread = None
self.simulation_step = 0
@ -19,56 +21,83 @@ class MaselliSimulatorApp:
config_frame = ttk.LabelFrame(self.root, text="Configuración")
config_frame.grid(row=0, column=0, padx=10, pady=10, sticky="ew", columnspan=2)
ttk.Label(config_frame, text="Puerto COM:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
# Tipo de conexión
ttk.Label(config_frame, text="Tipo de Conexión:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.connection_type_var = tk.StringVar(value="Serial")
self.connection_type_combo = ttk.Combobox(config_frame, textvariable=self.connection_type_var,
values=["Serial", "TCP", "UDP"], state="readonly", width=10)
self.connection_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.connection_type_combo.bind("<<ComboboxSelected>>", self.on_connection_type_change)
# Frame para configuración Serial
self.serial_frame = ttk.Frame(config_frame)
self.serial_frame.grid(row=1, column=0, columnspan=4, padx=5, pady=5, sticky="ew")
ttk.Label(self.serial_frame, text="Puerto COM:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.com_port_var = tk.StringVar(value="COM3")
self.com_port_entry = ttk.Entry(config_frame, textvariable=self.com_port_var, width=10)
self.com_port_entry = ttk.Entry(self.serial_frame, textvariable=self.com_port_var, width=10)
self.com_port_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
ttk.Label(config_frame, text="Baud Rate:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
ttk.Label(self.serial_frame, text="Baud Rate:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
self.baud_rate_var = tk.StringVar(value="115200")
self.baud_rate_entry = ttk.Entry(config_frame, textvariable=self.baud_rate_var, width=10)
self.baud_rate_entry = ttk.Entry(self.serial_frame, textvariable=self.baud_rate_var, width=10)
self.baud_rate_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
ttk.Label(config_frame, text="ADAM Address (2c):").grid(row=1, column=0, padx=5, pady=5, sticky="w")
# Frame para configuración Ethernet
self.ethernet_frame = ttk.Frame(config_frame)
self.ethernet_frame.grid(row=1, column=0, columnspan=4, padx=5, pady=5, sticky="ew")
self.ethernet_frame.grid_remove() # Oculto por defecto
ttk.Label(self.ethernet_frame, text="Dirección IP:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.ip_address_var = tk.StringVar(value="192.168.1.100")
self.ip_address_entry = ttk.Entry(self.ethernet_frame, textvariable=self.ip_address_var, width=15)
self.ip_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
ttk.Label(self.ethernet_frame, text="Puerto:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
self.port_var = tk.StringVar(value="502")
self.port_entry = ttk.Entry(self.ethernet_frame, textvariable=self.port_var, width=10)
self.port_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
# Configuración común
ttk.Label(config_frame, text="ADAM Address (2c):").grid(row=2, column=0, padx=5, pady=5, sticky="w")
self.adam_address_var = tk.StringVar(value="01")
self.adam_address_entry = ttk.Entry(config_frame, textvariable=self.adam_address_var, width=5)
self.adam_address_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
self.adam_address_entry.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
ttk.Label(config_frame, text="Función:").grid(row=1, column=2, padx=5, pady=5, sticky="w")
ttk.Label(config_frame, text="Función:").grid(row=2, column=2, padx=5, pady=5, sticky="w")
self.function_type_var = tk.StringVar(value="Lineal")
self.function_type_combo = ttk.Combobox(config_frame, textvariable=self.function_type_var,
values=["Lineal", "Sinusoidal", "Manual"], state="readonly", width=10)
self.function_type_combo.grid(row=1, column=3, padx=5, pady=5, sticky="ew")
self.function_type_combo.grid(row=2, column=3, padx=5, pady=5, sticky="ew")
self.function_type_combo.bind("<<ComboboxSelected>>", self.on_function_type_change)
# Parámetros para mapeo 4-20mA (y generación en Lineal/Sinusoidal)
ttk.Label(config_frame, text="Valor Mínimo (Brix) [p/ 4mA]:").grid(row=2, column=0, padx=5, pady=5, sticky="w")
# Parámetros para mapeo 4-20mA
ttk.Label(config_frame, text="Valor Mínimo (Brix) [p/ 4mA]:").grid(row=3, column=0, padx=5, pady=5, sticky="w")
self.min_brix_map_var = tk.StringVar(value="0")
self.min_brix_map_entry = ttk.Entry(config_frame, textvariable=self.min_brix_map_var, width=10)
self.min_brix_map_entry.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
self.min_brix_map_entry.grid(row=3, column=1, padx=5, pady=5, sticky="ew")
ttk.Label(config_frame, text="Valor Máximo (Brix) [p/ 20mA]:").grid(row=2, column=2, padx=5, pady=5, sticky="w")
ttk.Label(config_frame, text="Valor Máximo (Brix) [p/ 20mA]:").grid(row=3, column=2, padx=5, pady=5, sticky="w")
self.max_brix_map_var = tk.StringVar(value="80")
self.max_brix_map_entry = ttk.Entry(config_frame, textvariable=self.max_brix_map_var, width=10)
self.max_brix_map_entry.grid(row=2, column=3, padx=5, pady=5, sticky="ew")
self.max_brix_map_entry.grid(row=3, column=3, padx=5, pady=5, sticky="ew")
# Parámetros específicos para simulación continua
ttk.Label(config_frame, text="Periodo Sim. (s):").grid(row=3, column=0, padx=5, pady=5, sticky="w")
ttk.Label(config_frame, text="Periodo Sim. (s):").grid(row=4, column=0, padx=5, pady=5, sticky="w")
self.period_var = tk.StringVar(value="1.0")
self.period_entry = ttk.Entry(config_frame, textvariable=self.period_var, width=5)
self.period_entry.grid(row=3, column=1, padx=5, pady=5, sticky="ew")
self.period_entry.grid(row=4, column=1, padx=5, pady=5, sticky="ew")
# Parámetros específicos para modo Manual
ttk.Label(config_frame, text="Valor Brix Manual:").grid(row=4, column=0, padx=5, pady=5, sticky="w")
ttk.Label(config_frame, text="Valor Brix Manual:").grid(row=5, column=0, padx=5, pady=5, sticky="w")
self.manual_brix_var = tk.StringVar(value="10.0")
self.manual_brix_entry = ttk.Entry(config_frame, textvariable=self.manual_brix_var, width=10, state=tk.DISABLED)
self.manual_brix_entry.grid(row=4, column=1, padx=5, pady=5, sticky="ew")
self.manual_brix_entry.grid(row=5, column=1, padx=5, pady=5, sticky="ew")
self.manual_send_button = ttk.Button(config_frame, text="Enviar Manual", command=self.send_manual_value, state=tk.DISABLED)
self.manual_send_button.grid(row=4, column=2, columnspan=2, padx=5, pady=5, sticky="ew")
self.manual_send_button.grid(row=5, column=2, columnspan=2, padx=5, pady=5, sticky="ew")
# --- Controls Frame (para simulación continua) ---
# --- Controls Frame ---
controls_frame = ttk.LabelFrame(self.root, text="Control Simulación Continua")
controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
@ -92,7 +121,6 @@ class MaselliSimulatorApp:
self.current_ma_label = ttk.Label(display_frame, textvariable=self.current_ma_display_var, font=("Courier", 10))
self.current_ma_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")
# --- Log Frame ---
log_frame = ttk.LabelFrame(self.root, text="Log de Comunicación")
log_frame.grid(row=2, column=0, padx=10, pady=10, sticky="nsew", columnspan=2)
@ -100,20 +128,27 @@ class MaselliSimulatorApp:
self.com_log_text = scrolledtext.ScrolledText(log_frame, height=12, width=70, wrap=tk.WORD, state=tk.DISABLED)
self.com_log_text.pack(padx=5,pady=5,fill=tk.BOTH, expand=True)
self.root.columnconfigure(1, weight=1) # Allow display_frame to expand
self.root.columnconfigure(1, weight=1)
log_frame.columnconfigure(0, weight=1)
log_frame.rowconfigure(0, weight=1)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
self.on_function_type_change() # Set initial state of widgets
self.on_function_type_change()
def on_connection_type_change(self, event=None):
conn_type = self.connection_type_var.get()
if conn_type == "Serial":
self.ethernet_frame.grid_remove()
self.serial_frame.grid()
else: # TCP o UDP
self.serial_frame.grid_remove()
self.ethernet_frame.grid()
def on_function_type_change(self, event=None):
func_type = self.function_type_var.get()
if func_type == "Manual":
if self.simulating:
self.stop_simulation() # Detiene simulación continua y cierra puerto si estaba abierto por ella
self.stop_simulation()
self.manual_brix_entry.config(state=tk.NORMAL)
self.manual_send_button.config(state=tk.NORMAL)
@ -121,7 +156,7 @@ class MaselliSimulatorApp:
self.period_entry.config(state=tk.DISABLED)
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.DISABLED)
else: # Lineal o Sinusoidal
else:
self.manual_brix_entry.config(state=tk.DISABLED)
self.manual_send_button.config(state=tk.DISABLED)
@ -159,8 +194,6 @@ class MaselliSimulatorApp:
def _get_common_params(self):
try:
com_port = self.com_port_var.get()
baud_rate = int(self.baud_rate_var.get())
adam_address = self.adam_address_var.get()
if len(adam_address) != 2:
messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.")
@ -168,19 +201,73 @@ class MaselliSimulatorApp:
min_brix_map = float(self.min_brix_map_var.get())
max_brix_map = float(self.max_brix_map_var.get())
if min_brix_map > max_brix_map: # Allow min_brix_map == max_brix_map
messagebox.showerror("Error", "Valor Mínimo (Brix) para mapeo no puede ser mayor que Valor Máximo (Brix).")
return None
return com_port, baud_rate, adam_address, min_brix_map, max_brix_map
if min_brix_map > max_brix_map:
messagebox.showerror("Error", "Valor Mínimo (Brix) para mapeo no puede ser mayor que Valor Máximo (Brix).")
return None
conn_type = self.connection_type_var.get()
if conn_type == "Serial":
com_port = self.com_port_var.get()
baud_rate = int(self.baud_rate_var.get())
return conn_type, {'port': com_port, 'baud': baud_rate}, adam_address, min_brix_map, max_brix_map
else: # TCP o UDP
ip_address = self.ip_address_var.get()
port = int(self.port_var.get())
return conn_type, {'ip': ip_address, 'port': port}, adam_address, min_brix_map, max_brix_map
except ValueError as e:
messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración común: {e}")
messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración: {e}")
return None
def _open_connection(self, conn_type, conn_params):
"""Abre la conexión según el tipo especificado"""
try:
if conn_type == "Serial":
return serial.Serial(conn_params['port'], conn_params['baud'], timeout=1)
elif conn_type == "TCP":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5.0)
sock.connect((conn_params['ip'], conn_params['port']))
return sock
elif conn_type == "UDP":
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1.0)
# Para UDP guardamos la dirección destino
sock.dest_address = (conn_params['ip'], conn_params['port'])
return sock
except Exception as e:
raise Exception(f"Error al abrir conexión {conn_type}: {e}")
def _close_connection(self, connection, conn_type):
"""Cierra la conexión según el tipo"""
try:
if conn_type == "Serial":
if connection and connection.is_open:
connection.close()
elif conn_type in ["TCP", "UDP"]:
if connection:
connection.close()
except Exception as e:
self._log_message(f"Error al cerrar conexión: {e}")
def _send_data(self, connection, conn_type, data):
"""Envía datos según el tipo de conexión"""
try:
if conn_type == "Serial":
connection.write(data.encode('ascii'))
elif conn_type == "TCP":
connection.send(data.encode('ascii'))
elif conn_type == "UDP":
connection.sendto(data.encode('ascii'), connection.dest_address)
except Exception as e:
raise Exception(f"Error al enviar datos: {e}")
def send_manual_value(self):
common_params = self._get_common_params()
if not common_params:
return
com_port, baud_rate, adam_address, min_brix_map, max_brix_map = common_params
conn_type, conn_params, adam_address, min_brix_map, max_brix_map = common_params
try:
manual_brix = float(self.manual_brix_var.get())
@ -199,20 +286,19 @@ class MaselliSimulatorApp:
full_string_to_send = f"{message_part}{checksum}\r"
log_display_string = full_string_to_send.replace('\r', '<CR>')
temp_serial_port = None
temp_connection = None
try:
temp_serial_port = serial.Serial(com_port, baud_rate, timeout=1)
self._log_message(f"Puerto {com_port} abierto temporalmente para envío manual.")
temp_connection = self._open_connection(conn_type, conn_params)
self._log_message(f"Conexión {conn_type} abierta temporalmente para envío manual.")
self._log_message(f"Enviando Manual: {log_display_string}")
temp_serial_port.write(full_string_to_send.encode('ascii'))
except serial.SerialException as e:
self._log_message(f"Error al enviar manualmente por puerto COM: {e}")
messagebox.showerror("Error de Puerto COM", f"No se pudo abrir/escribir en {com_port}: {e}")
self._send_data(temp_connection, conn_type, full_string_to_send)
except Exception as e:
self._log_message(f"Error al enviar manualmente: {e}")
messagebox.showerror("Error de Conexión", str(e))
finally:
if temp_serial_port and temp_serial_port.is_open:
temp_serial_port.close()
self._log_message(f"Puerto {com_port} cerrado tras envío manual.")
if temp_connection:
self._close_connection(temp_connection, conn_type)
self._log_message(f"Conexión {conn_type} cerrada tras envío manual.")
def start_simulation(self):
if self.simulating:
@ -222,7 +308,7 @@ class MaselliSimulatorApp:
common_params = self._get_common_params()
if not common_params:
return
com_port, baud_rate, self.adam_address, self.min_brix_map, self.max_brix_map = common_params
self.connection_type, self.conn_params, self.adam_address, self.min_brix_map, self.max_brix_map = common_params
try:
self.simulation_period = float(self.period_var.get())
@ -230,7 +316,7 @@ class MaselliSimulatorApp:
messagebox.showerror("Error", "El periodo debe ser un número positivo.")
return
self.function_type = self.function_type_var.get()
if self.function_type == "Manual": # Should not happen if GUI logic is correct
if self.function_type == "Manual":
messagebox.showinfo("Info", "Seleccione modo Lineal o Sinusoidal para simulación continua.")
return
@ -239,11 +325,11 @@ class MaselliSimulatorApp:
return
try:
self.serial_port = serial.Serial(com_port, baud_rate, timeout=1)
self._log_message(f"Puerto {com_port} abierto a {baud_rate} baud para simulación continua.")
except serial.SerialException as e:
messagebox.showerror("Error de Puerto COM", f"No se pudo abrir el puerto {com_port}: {e}")
self.serial_port = None
self.connection = self._open_connection(self.connection_type, self.conn_params)
self._log_message(f"Conexión {self.connection_type} abierta para simulación continua.")
except Exception as e:
messagebox.showerror("Error de Conexión", str(e))
self.connection = None
return
self.simulating = True
@ -257,37 +343,34 @@ class MaselliSimulatorApp:
self._log_message("Simulación continua iniciada.")
def _set_config_entries_state(self, state):
self.connection_type_combo.config(state=state)
self.com_port_entry.config(state=state)
self.baud_rate_entry.config(state=state)
self.ip_address_entry.config(state=state)
self.port_entry.config(state=state)
self.adam_address_entry.config(state=state)
self.function_type_combo.config(state=state)
self.min_brix_map_entry.config(state=state)
self.max_brix_map_entry.config(state=state)
# Specific to sim type
current_func_type = self.function_type_var.get()
if current_func_type != "Manual":
self.period_entry.config(state=state)
self.manual_brix_entry.config(state=tk.DISABLED)
self.manual_send_button.config(state=tk.DISABLED)
else: # Manual
else:
self.period_entry.config(state=tk.DISABLED)
# For manual, these are handled by on_function_type_change based on main state
# If state is tk.DISABLED, ensure manual ones are also disabled
if state == tk.DISABLED:
self.manual_brix_entry.config(state=tk.DISABLED)
self.manual_send_button.config(state=tk.DISABLED)
else: # tk.NORMAL
self.manual_brix_entry.config(state=tk.NORMAL)
self.manual_send_button.config(state=tk.NORMAL)
else:
self.manual_brix_entry.config(state=tk.NORMAL)
self.manual_send_button.config(state=tk.NORMAL)
def stop_simulation(self):
if not self.simulating:
# Could be called when switching to Manual mode, even if not simulating
if self.function_type_var.get() != "Manual":
self._log_message("Simulación continua ya estaba detenida.")
# Ensure GUI elements are in a consistent state for non-manual modes
self._log_message("Simulación continua ya estaba detenida.")
if self.function_type_var.get() != "Manual":
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
@ -301,25 +384,20 @@ class MaselliSimulatorApp:
except Exception as e:
self._log_message(f"Error al esperar el hilo de simulación: {e}")
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self._log_message(f"Puerto {self.serial_port.name} cerrado (simulación continua).")
self.serial_port = None
if self.connection:
self._close_connection(self.connection, self.connection_type)
self._log_message(f"Conexión {self.connection_type} cerrada (simulación continua).")
self.connection = None
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self._set_config_entries_state(tk.NORMAL)
# Call on_function_type_change to ensure manual fields are correctly set
# if the current mode is manual after stopping.
self.on_function_type_change()
self._log_message("Simulación continua detenida.")
self.current_brix_display_var.set("---")
self.current_ma_display_var.set("--.-- mA")
def run_simulation(self):
steps_for_full_cycle = 100
@ -328,7 +406,8 @@ class MaselliSimulatorApp:
if self.function_type == "Lineal":
progress = (self.simulation_step % (2 * steps_for_full_cycle)) / steps_for_full_cycle
if progress > 1.0: progress = 2.0 - progress
if progress > 1.0:
progress = 2.0 - progress
current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * progress
elif self.function_type == "Sinusoidal":
@ -348,47 +427,43 @@ class MaselliSimulatorApp:
log_display_string = full_string_to_send.replace('\r', '<CR>')
self._log_message(f"Enviando Sim: {log_display_string}")
if self.serial_port and self.serial_port.is_open:
if self.connection:
try:
self.serial_port.write(full_string_to_send.encode('ascii'))
except serial.SerialException as e:
self._log_message(f"Error al escribir en puerto COM (sim): {e}")
self.root.after(0, self.stop_simulation_from_thread_error) # Schedule stop from main thread
self._send_data(self.connection, self.connection_type, full_string_to_send)
except Exception as e:
self._log_message(f"Error al escribir en conexión (sim): {e}")
self.root.after(0, self.stop_simulation_from_thread_error)
break
self.simulation_step += 1
time.sleep(self.simulation_period)
# Ensure GUI updates if loop exits due to self.simulating = False
if not self.simulating and self.root.winfo_exists():
self.root.after(0, self.ensure_gui_stopped_state_sim)
def stop_simulation_from_thread_error(self):
"""Called from main thread if serial error occurs in sim thread."""
if self.simulating: # Check if it wasn't already stopped
messagebox.showerror("Error de Simulación", "Error de puerto COM durante la simulación. Simulación detenida.")
"""Called from main thread if connection error occurs in sim thread."""
if self.simulating:
messagebox.showerror("Error de Simulación", "Error de conexión durante la simulación. Simulación detenida.")
self.stop_simulation()
def ensure_gui_stopped_state_sim(self):
"""Asegura que la GUI refleje el estado detenido si el hilo de simulación continua termina."""
if not self.simulating: # If stop_simulation wasn't called or completed fully
if not self.simulating:
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self._set_config_entries_state(tk.NORMAL)
# self.on_function_type_change() # Reset based on current function type
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self._log_message(f"Puerto {self.serial_port.name} cerrado (auto, sim_end).")
self.serial_port = None
if self.connection:
self._close_connection(self.connection, self.connection_type)
self._log_message(f"Conexión {self.connection_type} cerrada (auto, sim_end).")
self.connection = None
self._log_message("Simulación continua terminada (hilo finalizado).")
def on_closing(self):
if self.simulating:
self.stop_simulation() # Esto ya maneja el thread y el puerto
elif self.serial_port and self.serial_port.is_open: # Si el puerto quedó abierto por otra razón (improbable con lógica actual)
self.serial_port.close()
self.stop_simulation()
elif self.connection:
self._close_connection(self.connection, self.connection_type)
self.root.destroy()
if __name__ == "__main__":