"""Tkinter simulator for the Maselli protocol over Serial, TCP or UDP.

The application generates Brix values (linear ramp, sinusoid or a manual
value), maps them onto a 4-20 mA range and sends frames of the form
``#<address><mA value><checksum>\\r`` over the selected connection.
"""

import json
import math
import os
import queue
import socket
import threading
import time
import tkinter as tk
from collections import deque
from tkinter import ttk, scrolledtext, messagebox

import serial
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.animation as animation


class _UdpSocket(socket.socket):
    """UDP socket that remembers the destination it should send to.

    ``socket.socket`` declares ``__slots__``, so arbitrary attributes cannot
    be attached to a plain socket instance. This subclass has no
    ``__slots__`` and can therefore carry ``dest_address``.
    """

    def __init__(self, dest_address):
        super().__init__(socket.AF_INET, socket.SOCK_DGRAM)
        self.dest_address = dest_address


class MaselliSimulatorApp:
    """Main window and logic of the Maselli protocol simulator."""

    # Interval, in milliseconds, at which work queued by the simulation
    # thread is executed on the Tk thread.
    UI_POLL_MS = 50
    # Number of simulation steps for one ramp (Lineal) or one full sine
    # period (Sinusoidal).
    STEPS_PER_CYCLE = 100

    def __init__(self, root_window):
        """Build the whole interface on *root_window* and load saved settings."""
        self.root = root_window
        self.root.title("Simulador Protocolo Maselli - Serial/Ethernet")

        self.connection = None  # serial.Serial or socket while simulating
        self.connection_type = None  # "Serial", "TCP" or "UDP"
        self.simulating = False
        self.simulation_thread = None
        self.simulation_step = 0
        # Set to wake the simulation thread as soon as a stop is requested.
        self._stop_event = threading.Event()
        # (callback, args) pairs posted by the simulation thread; Tk widgets
        # are only ever touched from the Tk thread.
        self._ui_queue = queue.Queue()
        self._ui_poll_id = None

        # Graph data (rolling window of the most recent points).
        self.max_points = 100
        self.time_data = deque(maxlen=self.max_points)
        self.brix_data = deque(maxlen=self.max_points)
        self.ma_data = deque(maxlen=self.max_points)
        self.start_time = time.time()

        # Configuration file.
        self.config_file = "maselli_simulator_config.json"

        self._build_config_frame()
        self._build_controls_frame()
        self._build_display_frame()
        self._build_graph_frame()
        self._build_log_frame()

        # Row/column weights.
        self.root.columnconfigure(0, weight=1)
        self.root.columnconfigure(1, weight=1)
        self.root.rowconfigure(2, weight=1)
        self.root.rowconfigure(3, weight=1)

        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        # Load the configuration if one exists.
        self.load_config(silent=True)

        # Initialise the interface state.
        self.on_connection_type_change()
        self.on_function_type_change()

        # Start executing work posted by the simulation thread.
        self._ui_poll_id = self.root.after(self.UI_POLL_MS, self._process_ui_queue)

        # Start the graph animation.
        self.ani = animation.FuncAnimation(
            self.fig, self.update_graph, interval=100, blit=False
        )

    # ------------------------------------------------------------------
    # Interface construction
    # ------------------------------------------------------------------
    def _build_config_frame(self):
        """Create the configuration frame and all of its input widgets."""
        pad = {"padx": 5, "pady": 5}

        config_frame = ttk.LabelFrame(self.root, text="Configuración")
        config_frame.grid(row=0, column=0, padx=10, pady=10, sticky="ew", columnspan=2)

        # Connection type.
        ttk.Label(config_frame, text="Tipo de Conexión:").grid(
            row=0, column=0, sticky="w", **pad
        )
        self.connection_type_var = tk.StringVar(value="Serial")
        self.connection_type_combo = ttk.Combobox(
            config_frame,
            textvariable=self.connection_type_var,
            values=["Serial", "TCP", "UDP"],
            state="readonly",
            width=10,
        )
        self.connection_type_combo.grid(row=0, column=1, sticky="ew", **pad)
        self.connection_type_combo.bind(
            "<<ComboboxSelected>>", self.on_connection_type_change
        )

        # Serial settings.
        self.serial_frame = ttk.Frame(config_frame)
        self.serial_frame.grid(row=1, column=0, columnspan=4, sticky="ew", **pad)

        ttk.Label(self.serial_frame, text="Puerto COM:").grid(
            row=0, column=0, sticky="w", **pad
        )
        self.com_port_var = tk.StringVar(value="COM3")
        self.com_port_entry = ttk.Entry(
            self.serial_frame, textvariable=self.com_port_var, width=10
        )
        self.com_port_entry.grid(row=0, column=1, sticky="ew", **pad)

        ttk.Label(self.serial_frame, text="Baud Rate:").grid(
            row=0, column=2, sticky="w", **pad
        )
        self.baud_rate_var = tk.StringVar(value="115200")
        self.baud_rate_entry = ttk.Entry(
            self.serial_frame, textvariable=self.baud_rate_var, width=10
        )
        self.baud_rate_entry.grid(row=0, column=3, sticky="ew", **pad)

        # Ethernet settings (shares the grid cell of the serial frame).
        self.ethernet_frame = ttk.Frame(config_frame)
        self.ethernet_frame.grid(row=1, column=0, columnspan=4, sticky="ew", **pad)
        self.ethernet_frame.grid_remove()  # hidden by default

        ttk.Label(self.ethernet_frame, text="Dirección IP:").grid(
            row=0, column=0, sticky="w", **pad
        )
        self.ip_address_var = tk.StringVar(value="192.168.1.100")
        self.ip_address_entry = ttk.Entry(
            self.ethernet_frame, textvariable=self.ip_address_var, width=15
        )
        self.ip_address_entry.grid(row=0, column=1, sticky="ew", **pad)

        ttk.Label(self.ethernet_frame, text="Puerto:").grid(
            row=0, column=2, sticky="w", **pad
        )
        self.port_var = tk.StringVar(value="502")
        self.port_entry = ttk.Entry(
            self.ethernet_frame, textvariable=self.port_var, width=10
        )
        self.port_entry.grid(row=0, column=3, sticky="ew", **pad)

        # Common settings.
        ttk.Label(config_frame, text="ADAM Address (2c):").grid(
            row=2, column=0, sticky="w", **pad
        )
        self.adam_address_var = tk.StringVar(value="01")
        self.adam_address_entry = ttk.Entry(
            config_frame, textvariable=self.adam_address_var, width=5
        )
        self.adam_address_entry.grid(row=2, column=1, sticky="ew", **pad)

        ttk.Label(config_frame, text="Función:").grid(row=2, column=2, sticky="w", **pad)
        self.function_type_var = tk.StringVar(value="Lineal")
        self.function_type_combo = ttk.Combobox(
            config_frame,
            textvariable=self.function_type_var,
            values=["Lineal", "Sinusoidal", "Manual"],
            state="readonly",
            width=10,
        )
        self.function_type_combo.grid(row=2, column=3, sticky="ew", **pad)
        self.function_type_combo.bind(
            "<<ComboboxSelected>>", self.on_function_type_change
        )

        # 4-20 mA mapping parameters.
        ttk.Label(config_frame, text="Valor Mínimo (Brix) [p/ 4mA]:").grid(
            row=3, column=0, sticky="w", **pad
        )
        self.min_brix_map_var = tk.StringVar(value="0")
        self.min_brix_map_entry = ttk.Entry(
            config_frame, textvariable=self.min_brix_map_var, width=10
        )
        self.min_brix_map_entry.grid(row=3, column=1, sticky="ew", **pad)

        ttk.Label(config_frame, text="Valor Máximo (Brix) [p/ 20mA]:").grid(
            row=3, column=2, sticky="w", **pad
        )
        self.max_brix_map_var = tk.StringVar(value="80")
        self.max_brix_map_entry = ttk.Entry(
            config_frame, textvariable=self.max_brix_map_var, width=10
        )
        self.max_brix_map_entry.grid(row=3, column=3, sticky="ew", **pad)

        # Parameters specific to the continuous simulation.
        ttk.Label(config_frame, text="Periodo Sim. (s):").grid(
            row=4, column=0, sticky="w", **pad
        )
        self.period_var = tk.StringVar(value="1.0")
        self.period_entry = ttk.Entry(
            config_frame, textvariable=self.period_var, width=5
        )
        self.period_entry.grid(row=4, column=1, sticky="ew", **pad)

        # Persistence buttons.
        self.save_config_button = ttk.Button(
            config_frame, text="Guardar Config", command=self.save_config
        )
        self.save_config_button.grid(row=4, column=2, sticky="ew", **pad)

        self.load_config_button = ttk.Button(
            config_frame, text="Cargar Config", command=self.load_config
        )
        self.load_config_button.grid(row=4, column=3, sticky="ew", **pad)

        # Manual mode (entry + slider + send button).
        manual_frame = ttk.LabelFrame(config_frame, text="Modo Manual")
        manual_frame.grid(row=5, column=0, columnspan=4, sticky="ew", **pad)

        ttk.Label(manual_frame, text="Valor Brix:").grid(
            row=0, column=0, sticky="w", **pad
        )
        self.manual_brix_var = tk.StringVar(value="10.0")
        self.manual_brix_entry = ttk.Entry(
            manual_frame, textvariable=self.manual_brix_var, width=10, state=tk.DISABLED
        )
        self.manual_brix_entry.grid(row=0, column=1, sticky="ew", **pad)
        # Sync the slider when the user commits the typed value.
        self.manual_brix_entry.bind(
            "<Return>", lambda e: self.update_slider_from_entry()
        )
        self.manual_brix_entry.bind(
            "<FocusOut>", lambda e: self.update_slider_from_entry()
        )

        self.manual_slider_var = tk.DoubleVar(value=10.0)
        self.manual_slider = ttk.Scale(
            manual_frame,
            from_=0,
            to=100,
            orient=tk.HORIZONTAL,
            variable=self.manual_slider_var,
            command=self.on_slider_change,
            state=tk.DISABLED,
            length=200,
        )
        self.manual_slider.grid(row=0, column=2, sticky="ew", **pad)

        self.manual_send_button = ttk.Button(
            manual_frame,
            text="Enviar Manual",
            command=self.send_manual_value,
            state=tk.DISABLED,
        )
        self.manual_send_button.grid(row=0, column=3, sticky="ew", **pad)

        manual_frame.columnconfigure(2, weight=1)

    def _build_controls_frame(self):
        """Create the start/stop/clear buttons of the continuous simulation."""
        controls_frame = ttk.LabelFrame(self.root, text="Control Simulación Continua")
        controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")

        self.start_button = ttk.Button(
            controls_frame, text="Iniciar Simulación", command=self.start_simulation
        )
        self.start_button.pack(side=tk.LEFT, padx=5)

        self.stop_button = ttk.Button(
            controls_frame,
            text="Detener Simulación",
            command=self.stop_simulation,
            state=tk.DISABLED,
        )
        self.stop_button.pack(side=tk.LEFT, padx=5)

        self.clear_graph_button = ttk.Button(
            controls_frame, text="Limpiar Gráfico", command=self.clear_graph
        )
        self.clear_graph_button.pack(side=tk.LEFT, padx=5)

    def _build_display_frame(self):
        """Create the labels showing the current Brix and mA values."""
        display_frame = ttk.LabelFrame(self.root, text="Visualización")
        display_frame.grid(row=1, column=1, rowspan=2, padx=10, pady=10, sticky="nsew")

        ttk.Label(display_frame, text="Valor Brix Actual:").grid(
            row=0, column=0, padx=5, pady=5, sticky="w"
        )
        self.current_brix_display_var = tk.StringVar(value="---")
        self.current_brix_label = ttk.Label(
            display_frame,
            textvariable=self.current_brix_display_var,
            font=("Courier", 14, "bold"),
        )
        self.current_brix_label.grid(row=0, column=1, padx=5, pady=5, sticky="w")

        ttk.Label(display_frame, text="Valor mA Correspondiente:").grid(
            row=1, column=0, padx=5, pady=5, sticky="w"
        )
        self.current_ma_display_var = tk.StringVar(value="--.-- mA")
        self.current_ma_label = ttk.Label(
            display_frame,
            textvariable=self.current_ma_display_var,
            font=("Courier", 14, "bold"),
        )
        self.current_ma_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")

    def _build_graph_frame(self):
        """Create the matplotlib figure (Brix and mA on twin axes) and embed it."""
        graph_frame = ttk.LabelFrame(self.root, text="Gráfico de Valores")
        graph_frame.grid(row=2, column=0, padx=10, pady=5, sticky="nsew")

        self.fig = Figure(figsize=(6, 3.5), dpi=100)
        self.ax1 = self.fig.add_subplot(111)
        self.ax2 = self.ax1.twinx()

        self.ax1.set_xlabel('Tiempo (s)')
        self.ax1.set_ylabel('Brix', color='b')
        self.ax2.set_ylabel('mA', color='r')
        self.ax1.tick_params(axis='y', labelcolor='b')
        self.ax2.tick_params(axis='y', labelcolor='r')
        self.ax1.grid(True, alpha=0.3)

        self.line_brix, = self.ax1.plot([], [], 'b-', label='Brix', linewidth=2)
        self.line_ma, = self.ax2.plot([], [], 'r-', label='mA', linewidth=2)

        self.canvas = FigureCanvasTkAgg(self.fig, master=graph_frame)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    def _build_log_frame(self):
        """Create the read-only scrolling communication log."""
        log_frame = ttk.LabelFrame(self.root, text="Log de Comunicación")
        log_frame.grid(row=3, column=0, padx=10, pady=10, sticky="nsew", columnspan=2)
        self.com_log_text = scrolledtext.ScrolledText(
            log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED
        )
        self.com_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)

    # ------------------------------------------------------------------
    # Cross-thread UI dispatch
    # ------------------------------------------------------------------
    def _drain_ui_queue(self):
        """Run every callback currently queued by the simulation thread."""
        while True:
            try:
                callback, args = self._ui_queue.get_nowait()
            except queue.Empty:
                return
            callback(*args)

    def _process_ui_queue(self):
        """Periodic Tk callback: drain the queue, then reschedule itself."""
        try:
            self._drain_ui_queue()
        finally:
            try:
                self._ui_poll_id = self.root.after(
                    self.UI_POLL_MS, self._process_ui_queue
                )
            except tk.TclError:
                # The window was destroyed while a callback was running.
                self._ui_poll_id = None

    def _publish_sample(self, brix_value, ma_value, ma_text, log_line):
        """Show one simulated sample (Tk thread only)."""
        # A sample queued just before a stop must not overwrite the
        # placeholders that stop_simulation() puts in the display.
        if self.simulating:
            self.current_brix_display_var.set(f"{brix_value:.3f} Brix")
            self.current_ma_display_var.set(f"{ma_text} mA")
        self.add_data_point(brix_value, ma_value)
        self._log_message(log_line)

    # ------------------------------------------------------------------
    # Configuration persistence
    # ------------------------------------------------------------------
    def save_config(self):
        """Save the current settings to the JSON configuration file."""
        config = {
            'connection_type': self.connection_type_var.get(),
            'com_port': self.com_port_var.get(),
            'baud_rate': self.baud_rate_var.get(),
            'ip_address': self.ip_address_var.get(),
            'port': self.port_var.get(),
            'adam_address': self.adam_address_var.get(),
            'function_type': self.function_type_var.get(),
            'min_brix_map': self.min_brix_map_var.get(),
            'max_brix_map': self.max_brix_map_var.get(),
            'period': self.period_var.get(),
            'manual_brix': self.manual_brix_var.get()
        }

        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4)
            self._log_message("Configuración guardada exitosamente.")
            messagebox.showinfo("Éxito", "Configuración guardada correctamente.")
        except Exception as e:
            self._log_message(f"Error al guardar configuración: {e}")
            messagebox.showerror("Error", f"No se pudo guardar la configuración: {e}")

    def load_config(self, silent=False):
        """Load settings from the JSON configuration file.

        Args:
            silent: When true, no message boxes are shown (used at start-up).
                Failures are still written to the communication log.
        """
        if not os.path.exists(self.config_file):
            if not silent:
                messagebox.showinfo(
                    "Información", "No se encontró archivo de configuración."
                )
            return

        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)

            self.connection_type_var.set(config.get('connection_type', 'Serial'))
            self.com_port_var.set(config.get('com_port', 'COM3'))
            self.baud_rate_var.set(config.get('baud_rate', '115200'))
            self.ip_address_var.set(config.get('ip_address', '192.168.1.100'))
            self.port_var.set(config.get('port', '502'))
            self.adam_address_var.set(config.get('adam_address', '01'))
            self.function_type_var.set(config.get('function_type', 'Lineal'))
            self.min_brix_map_var.set(config.get('min_brix_map', '0'))
            self.max_brix_map_var.set(config.get('max_brix_map', '80'))
            self.period_var.set(config.get('period', '1.0'))
            self.manual_brix_var.set(config.get('manual_brix', '10.0'))

            # Keep the slider in step; a non-numeric stored value is ignored.
            try:
                self.manual_slider_var.set(float(config.get('manual_brix', '10.0')))
            except (TypeError, ValueError):
                pass

            # Refresh the widgets that depend on the loaded selections.
            self.on_connection_type_change()
            self.on_function_type_change()

            if not silent:
                self._log_message("Configuración cargada exitosamente.")
                messagebox.showinfo("Éxito", "Configuración cargada correctamente.")
        except Exception as e:
            self._log_message(f"Error al cargar configuración: {e}")
            if not silent:
                messagebox.showerror(
                    "Error", f"No se pudo cargar la configuración: {e}"
                )

    # ------------------------------------------------------------------
    # Manual slider / entry synchronisation
    # ------------------------------------------------------------------
    def on_slider_change(self, value):
        """Mirror the slider position into the manual Brix text field."""
        self.manual_brix_var.set(f"{float(value):.1f}")

    def update_slider_from_entry(self):
        """Mirror the manual Brix text field into the slider.

        The value is clamped to the slider range (0-100); text that is not a
        number is ignored.
        """
        try:
            value = float(self.manual_brix_var.get())
        except ValueError:
            return
        value = max(0, min(100, value))
        self.manual_slider_var.set(value)
        self.manual_brix_var.set(f"{value:.1f}")

    # ------------------------------------------------------------------
    # Graph
    # ------------------------------------------------------------------
    def update_graph(self, frame):
        """Animation callback: redraw both lines and rescale the axes."""
        if self.time_data:
            times = list(self.time_data)
            self.line_brix.set_data(times, list(self.brix_data))
            self.line_ma.set_data(times, list(self.ma_data))

            # Adjust the limits to the data currently held.
            if len(self.time_data) > 1:
                self.ax1.set_xlim(min(self.time_data), max(self.time_data))
            if self.brix_data:
                self.ax1.set_ylim(min(self.brix_data) - 1, max(self.brix_data) + 1)
            if self.ma_data:
                self.ax2.set_ylim(min(self.ma_data) - 0.5, max(self.ma_data) + 0.5)

        return self.line_brix, self.line_ma

    def add_data_point(self, brix_value, ma_value):
        """Append one (time, Brix, mA) point and request a redraw."""
        current_time = time.time() - self.start_time
        self.time_data.append(current_time)
        self.brix_data.append(brix_value)
        self.ma_data.append(ma_value)
        self.canvas.draw_idle()

    def clear_graph(self):
        """Discard all graph data and restart the time axis at zero."""
        self.time_data.clear()
        self.brix_data.clear()
        self.ma_data.clear()
        self.start_time = time.time()
        self.canvas.draw_idle()
        self._log_message("Gráfico limpiado.")

    # ------------------------------------------------------------------
    # Widget state handling
    # ------------------------------------------------------------------
    def on_connection_type_change(self, event=None):
        """Show the serial or the ethernet settings for the selected type."""
        if self.connection_type_var.get() == "Serial":
            self.ethernet_frame.grid_remove()
            self.serial_frame.grid()
        else:  # TCP or UDP
            self.serial_frame.grid_remove()
            self.ethernet_frame.grid()

    def on_function_type_change(self, event=None):
        """Enable the widgets that apply to the selected function type.

        Selecting "Manual" stops a running continuous simulation.
        """
        if self.function_type_var.get() == "Manual":
            if self.simulating:
                self.stop_simulation()
            self.manual_brix_entry.config(state=tk.NORMAL)
            self.manual_send_button.config(state=tk.NORMAL)
            self.manual_slider.config(state=tk.NORMAL)
            self.period_entry.config(state=tk.DISABLED)
            self.start_button.config(state=tk.DISABLED)
            self.stop_button.config(state=tk.DISABLED)
            return

        self.manual_brix_entry.config(state=tk.DISABLED)
        self.manual_send_button.config(state=tk.DISABLED)
        self.manual_slider.config(state=tk.DISABLED)
        self.period_entry.config(state=tk.NORMAL)
        if self.simulating:
            self.start_button.config(state=tk.DISABLED)
            self.stop_button.config(state=tk.NORMAL)
        else:
            self.start_button.config(state=tk.NORMAL)
            self.stop_button.config(state=tk.DISABLED)

    def _set_config_entries_state(self, state):
        """Enable (``tk.NORMAL``) or disable (``tk.DISABLED``) the settings."""
        # Comboboxes go back to "readonly", not "normal", so the user can
        # never type a value outside the offered list.
        combo_state = "readonly" if state == tk.NORMAL else state
        self.connection_type_combo.config(state=combo_state)
        self.function_type_combo.config(state=combo_state)

        for widget in (
            self.com_port_entry,
            self.baud_rate_entry,
            self.ip_address_entry,
            self.port_entry,
            self.adam_address_entry,
            self.min_brix_map_entry,
            self.max_brix_map_entry,
            self.save_config_button,
            self.load_config_button,
        ):
            widget.config(state=state)

        is_manual = self.function_type_var.get() == "Manual"
        self.period_entry.config(state=tk.DISABLED if is_manual else state)
        manual_state = (
            tk.NORMAL if is_manual and state != tk.DISABLED else tk.DISABLED
        )
        self.manual_brix_entry.config(state=manual_state)
        self.manual_send_button.config(state=manual_state)
        self.manual_slider.config(state=manual_state)

    def _log_message(self, message):
        """Append *message* to the communication log (Tk thread only)."""
        self.com_log_text.configure(state=tk.NORMAL)
        self.com_log_text.insert(tk.END, message + "\n")
        self.com_log_text.see(tk.END)
        self.com_log_text.configure(state=tk.DISABLED)

    # ------------------------------------------------------------------
    # Protocol helpers
    # ------------------------------------------------------------------
    def calculate_checksum(self, message_part):
        """Return the sum of the character codes modulo 256 as two hex digits."""
        checksum_byte = sum(ord(c) for c in message_part) % 256
        return f"{checksum_byte:02X}"

    def scale_to_mA(self, brix_value, min_brix_map, max_brix_map):
        """Map *brix_value* linearly onto 4-20 mA.

        ``min_brix_map`` corresponds to 4 mA and ``max_brix_map`` to 20 mA;
        values outside that span are clamped. A zero-width span yields 4.0.
        """
        if max_brix_map == min_brix_map:
            return 4.0
        percentage = (brix_value - min_brix_map) / (max_brix_map - min_brix_map)
        percentage = max(0.0, min(1.0, percentage))
        return 4.0 + percentage * 16.0

    def format_mA_value(self, mA_val):
        """Format a mA value as zero-padded text with three decimals."""
        return f"{mA_val:06.3f}"

    def _get_common_params(self):
        """Validate and collect the settings shared by manual and continuous sends.

        Returns:
            ``(conn_type, conn_params, adam_address, min_brix, max_brix)``,
            or ``None`` after showing an error dialog when a value is invalid.
        """
        try:
            adam_address = self.adam_address_var.get()
            if len(adam_address) != 2:
                messagebox.showerror(
                    "Error", "La dirección ADAM debe tener 2 caracteres."
                )
                return None

            min_brix_map = float(self.min_brix_map_var.get())
            max_brix_map = float(self.max_brix_map_var.get())
            if min_brix_map > max_brix_map:
                messagebox.showerror(
                    "Error",
                    "Valor Mínimo (Brix) para mapeo no puede ser mayor que Valor Máximo (Brix).",
                )
                return None

            conn_type = self.connection_type_var.get()
            if conn_type == "Serial":
                conn_params = {
                    'port': self.com_port_var.get(),
                    'baud': int(self.baud_rate_var.get()),
                }
            else:  # TCP or UDP
                conn_params = {
                    'ip': self.ip_address_var.get(),
                    'port': int(self.port_var.get()),
                }
            return conn_type, conn_params, adam_address, min_brix_map, max_brix_map
        except ValueError as e:
            messagebox.showerror(
                "Error de Entrada", f"Valor inválido en la configuración: {e}"
            )
            return None

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def _open_connection(self, conn_type, conn_params):
        """Open and return a connection of the given type.

        Raises:
            Exception: If the connection cannot be opened or *conn_type* is
                not one of "Serial", "TCP" or "UDP".
        """
        try:
            if conn_type == "Serial":
                return serial.Serial(
                    conn_params['port'], conn_params['baud'], timeout=1
                )
            if conn_type == "TCP":
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    sock.settimeout(5.0)
                    sock.connect((conn_params['ip'], conn_params['port']))
                except Exception:
                    sock.close()  # do not leak the socket on a failed connect
                    raise
                return sock
            if conn_type == "UDP":
                # UDP is connectionless: keep the destination on the socket.
                sock = _UdpSocket((conn_params['ip'], conn_params['port']))
                sock.settimeout(1.0)
                return sock
            raise ValueError(f"unsupported connection type {conn_type!r}")
        except Exception as e:
            raise Exception(f"Error al abrir conexión {conn_type}: {e}") from e

    def _close_connection(self, connection, conn_type):
        """Close *connection*; failures are logged rather than raised."""
        try:
            if conn_type == "Serial":
                if connection and connection.is_open:
                    connection.close()
            elif conn_type in ["TCP", "UDP"]:
                if connection:
                    connection.close()
        except Exception as e:
            self._log_message(f"Error al cerrar conexión: {e}")

    def _send_data(self, connection, conn_type, data):
        """Send *data* (ASCII text) over *connection*.

        Raises:
            Exception: If encoding or transmission fails.
        """
        try:
            payload = data.encode('ascii')
            if conn_type == "Serial":
                connection.write(payload)
            elif conn_type == "TCP":
                # sendall() retries until the whole frame has been written.
                connection.sendall(payload)
            elif conn_type == "UDP":
                connection.sendto(payload, connection.dest_address)
        except Exception as e:
            raise Exception(f"Error al enviar datos: {e}") from e

    # ------------------------------------------------------------------
    # Manual send
    # ------------------------------------------------------------------
    def send_manual_value(self):
        """Send the manual Brix value once over a temporary connection."""
        common_params = self._get_common_params()
        if not common_params:
            return
        conn_type, conn_params, adam_address, min_brix_map, max_brix_map = common_params

        try:
            manual_brix = float(self.manual_brix_var.get())
        except ValueError:
            messagebox.showerror("Error de Entrada", "Valor Brix Manual inválido.")
            return

        mA_val = self.scale_to_mA(manual_brix, min_brix_map, max_brix_map)
        mA_str = self.format_mA_value(mA_val)

        self.current_brix_display_var.set(f"{manual_brix:.3f} Brix")
        self.current_ma_display_var.set(f"{mA_str} mA")
        self.add_data_point(manual_brix, mA_val)

        message_part = f"#{adam_address}{mA_str}"
        checksum = self.calculate_checksum(message_part)
        full_string_to_send = f"{message_part}{checksum}\r"
        log_display_string = full_string_to_send.replace('\r', '')

        temp_connection = None
        try:
            temp_connection = self._open_connection(conn_type, conn_params)
            self._log_message(
                f"Conexión {conn_type} abierta temporalmente para envío manual."
            )
            self._log_message(f"Enviando Manual: {log_display_string}")
            self._send_data(temp_connection, conn_type, full_string_to_send)
        except Exception as e:
            self._log_message(f"Error al enviar manualmente: {e}")
            messagebox.showerror("Error de Conexión", str(e))
        finally:
            if temp_connection:
                self._close_connection(temp_connection, conn_type)
                self._log_message(f"Conexión {conn_type} cerrada tras envío manual.")

    # ------------------------------------------------------------------
    # Continuous simulation
    # ------------------------------------------------------------------
    def start_simulation(self):
        """Validate the settings, open the connection and start the worker."""
        if self.simulating:
            messagebox.showwarning("Advertencia", "La simulación ya está en curso.")
            return

        common_params = self._get_common_params()
        if not common_params:
            return
        (
            self.connection_type,
            self.conn_params,
            self.adam_address,
            self.min_brix_map,
            self.max_brix_map,
        ) = common_params

        try:
            self.simulation_period = float(self.period_var.get())
            if self.simulation_period <= 0:
                messagebox.showerror(
                    "Error", "El periodo debe ser un número positivo."
                )
                return
            self.function_type = self.function_type_var.get()
            if self.function_type == "Manual":
                messagebox.showinfo(
                    "Info",
                    "Seleccione modo Lineal o Sinusoidal para simulación continua.",
                )
                return
        except ValueError as e:
            messagebox.showerror(
                "Error de Entrada",
                f"Valor inválido en la configuración de simulación: {e}",
            )
            return

        try:
            self.connection = self._open_connection(
                self.connection_type, self.conn_params
            )
            self._log_message(
                f"Conexión {self.connection_type} abierta para simulación continua."
            )
        except Exception as e:
            messagebox.showerror("Error de Conexión", str(e))
            self.connection = None
            return

        self.simulating = True
        self.simulation_step = 0
        # A fresh event per run: a worker from an earlier run keeps its own
        # (already set) event and cannot be revived by this start.
        self._stop_event = threading.Event()
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self._set_config_entries_state(tk.DISABLED)

        self.simulation_thread = threading.Thread(
            target=self.run_simulation, daemon=True
        )
        self.simulation_thread.start()
        self._log_message("Simulación continua iniciada.")

    def stop_simulation(self):
        """Stop the continuous simulation and restore the idle interface."""
        if not self.simulating:
            if self.function_type_var.get() != "Manual":
                self._log_message("Simulación continua ya estaba detenida.")
                self.start_button.config(state=tk.NORMAL)
            self.stop_button.config(state=tk.DISABLED)
            self._set_config_entries_state(tk.NORMAL)
            return

        self.simulating = False
        self._stop_event.set()  # wakes the worker if it is waiting out a period

        worker = self.simulation_thread
        if worker and worker.is_alive():
            # The wait is only needed if the worker is inside a blocking send.
            period = getattr(self, 'simulation_period', None)
            timeout = 2.0 if period is None else period * 1.5
            try:
                worker.join(timeout=max(0.1, timeout))
            except Exception as e:
                self._log_message(f"Error al esperar el hilo de simulación: {e}")

        if self.connection:
            self._close_connection(self.connection, self.connection_type)
            self._log_message(
                f"Conexión {self.connection_type} cerrada (simulación continua)."
            )
            self.connection = None

        # Show whatever the worker reported before it stopped, in order.
        self._drain_ui_queue()

        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self._set_config_entries_state(tk.NORMAL)
        self.on_function_type_change()
        self._log_message("Simulación continua detenida.")
        self.current_brix_display_var.set("---")
        self.current_ma_display_var.set("--.-- mA")

    def _simulated_brix(self, step):
        """Return the Brix value of the selected waveform at *step*."""
        span = self.max_brix_map - self.min_brix_map
        if self.function_type == "Lineal":
            # Triangle wave: ramp up for one cycle, back down for the next.
            progress = (step % (2 * self.STEPS_PER_CYCLE)) / self.STEPS_PER_CYCLE
            if progress > 1.0:
                progress = 2.0 - progress
            return self.min_brix_map + span * progress
        if self.function_type == "Sinusoidal":
            phase = (step / self.STEPS_PER_CYCLE) * 2 * math.pi
            return self.min_brix_map + span * ((math.sin(phase) + 1) / 2)
        return 0.0

    def run_simulation(self):
        """Worker-thread loop: generate, publish and send one sample per period.

        This method never touches Tk directly; every interface update is
        posted to the queue that the Tk thread drains.
        """
        stop_event = self._stop_event
        post = self._ui_queue.put

        while self.simulating and not stop_event.is_set():
            current_brix_val = self._simulated_brix(self.simulation_step)
            mA_val = self.scale_to_mA(
                current_brix_val, self.min_brix_map, self.max_brix_map
            )
            mA_str = self.format_mA_value(mA_val)

            message_part = f"#{self.adam_address}{mA_str}"
            checksum = self.calculate_checksum(message_part)
            full_string_to_send = f"{message_part}{checksum}\r"
            log_display_string = full_string_to_send.replace('\r', '')

            post((
                self._publish_sample,
                (current_brix_val, mA_val, mA_str, f"Enviando Sim: {log_display_string}"),
            ))

            connection = self.connection
            if connection:
                try:
                    self._send_data(connection, self.connection_type, full_string_to_send)
                except Exception as e:
                    post((
                        self._log_message,
                        (f"Error al escribir en conexión (sim): {e}",),
                    ))
                    post((self.stop_simulation_from_thread_error, ()))
                    break

            self.simulation_step += 1
            # Returns early when a stop is requested.
            stop_event.wait(self.simulation_period)

        if not self.simulating:
            post((self.ensure_gui_stopped_state_sim, ()))

    def stop_simulation_from_thread_error(self):
        """Report a send failure from the worker and stop (runs on the Tk thread)."""
        if self.simulating:
            messagebox.showerror(
                "Error de Simulación",
                "Error de conexión durante la simulación. Simulación detenida.",
            )
            self.stop_simulation()

    def ensure_gui_stopped_state_sim(self):
        """Make the interface reflect the stopped state once the worker has ended."""
        if self.simulating:
            return
        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self._set_config_entries_state(tk.NORMAL)
        if self.connection:
            self._close_connection(self.connection, self.connection_type)
            self._log_message(
                f"Conexión {self.connection_type} cerrada (auto, sim_end)."
            )
            self.connection = None
        self._log_message("Simulación continua terminada (hilo finalizado).")

    def on_closing(self):
        """Stop the simulation, release the connection and close the window."""
        if self.simulating:
            self.stop_simulation()
        elif self.connection:
            self._close_connection(self.connection, self.connection_type)
        if self._ui_poll_id is not None:
            try:
                self.root.after_cancel(self._ui_poll_id)
            except tk.TclError:
                pass
            self._ui_poll_id = None
        self.root.destroy()


if __name__ == "__main__":
    main_root = tk.Tk()
    app = MaselliSimulatorApp(main_root)
    main_root.mainloop()