import csv
import json
import math
import os
import socket
import threading
import time
import tkinter as tk
from collections import deque
from datetime import datetime
from tkinter import ttk, scrolledtext, messagebox

import serial
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.animation as animation

# Simulator and Trace tool for the Maselli protocol (ADAM)
# Supports Serial, TCP and UDP connections
#
# To change the icon, place one of these files in the same directory:
# - icon.png (recommended)
# - icon.ico (for Windows)
# - icon.gif
#
# Features:
# - Simulator mode: generates test values in the ADAM protocol
# - Trace mode: receives and records values from the real meter
# - Automatic mA <-> Brix conversion
# - CSV logging with timestamp
# - Real-time graphs
# - Device responses shown in the log


class MaselliSimulatorApp:
    """Tkinter GUI with a "Simulador" tab (sends ADAM frames) and a "Trace"
    tab (receives ADAM frames, logs them to CSV and plots Brix).

    Both tabs share one connection configuration (Serial, TCP or UDP) and
    one linear 4-20 mA <-> Brix mapping.
    """

    def __init__(self, root_window):
        """Build the whole UI on ``root_window`` and load the saved config.

        Args:
            root_window: the Tk root (or toplevel) that hosts the application.
        """
        self.root = root_window
        self.root.title("Simulador/Trace Protocolo Maselli")
        self.root.geometry("900x700")  # Initial window size

        # Try to load the window icon from the current working directory
        icon_loaded = False
        for icon_file in ['icon.png', 'icon.ico', 'icon.gif']:
            if os.path.exists(icon_file):
                try:
                    if icon_file.endswith('.ico'):
                        self.root.iconbitmap(icon_file)
                    else:
                        icon = tk.PhotoImage(file=icon_file)
                        self.root.iconphoto(True, icon)
                    icon_loaded = True
                    break
                except Exception as e:
                    print(f"No se pudo cargar {icon_file}: {e}")

        if not icon_loaded:
            print("No se encontró ningún archivo de icono (icon.png, icon.ico, icon.gif)")

        self.connection = None
        self.connection_type = None
        self.simulating = False
        self.simulation_thread = None
        self.simulation_step = 0

        # Trace mode state
        self.tracing = False
        self.trace_thread = None
        self.csv_file = None
        self.csv_writer = None

        # Graph buffers (bounded so the plots show a sliding window)
        self.max_points = 100

        # Simulator data
        self.sim_time_data = deque(maxlen=self.max_points)
        self.sim_brix_data = deque(maxlen=self.max_points)
        self.sim_ma_data = deque(maxlen=self.max_points)
        self.sim_start_time = time.time()

        # Trace data
        self.trace_time_data = deque(maxlen=self.max_points)
        self.trace_brix_data = deque(maxlen=self.max_points)
        self.trace_start_time = time.time()

        # Configuration file (relative to the current working directory)
        self.config_file = "maselli_simulator_config.json"

        # Notebook holding the two tabs
        self.notebook = ttk.Notebook(self.root)
        self.notebook.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)

        # Simulator tab
        self.simulator_tab = ttk.Frame(self.notebook)
        self.notebook.add(self.simulator_tab, text="Simulador")

        # Trace tab
        self.trace_tab = ttk.Frame(self.notebook)
        self.notebook.add(self.trace_tab, text="Trace")

        # --- Shared configuration frame ---
        self.create_shared_config_frame()

        # --- Tab contents ---
        self.create_simulator_tab()
        self.create_trace_tab()

        # Let the notebook absorb all extra space
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)

        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        # Load the saved configuration if there is one
        self.load_config(silent=True)

        # Bring widget visibility/state in line with the current selections
        self.on_connection_type_change()
        self.on_function_type_change()

        # Start the graph animations; references are kept on self because
        # FuncAnimation stops when its object is garbage collected.
        self.sim_ani = animation.FuncAnimation(
            self.sim_fig, self.update_sim_graph, interval=100, blit=False)
        self.trace_ani = animation.FuncAnimation(
            self.trace_fig, self.update_trace_graph, interval=100, blit=False)

    @staticmethod
    def _combo_state(state):
        """Map an entry state to the matching combobox state.

        The comboboxes are created with ``state="readonly"``; re-enabling them
        with ``tk.NORMAL`` would make them free-text editable, so "normal" is
        translated back to "readonly". Any other state passes through.
        """
        return "readonly" if state == tk.NORMAL else state

    def create_shared_config_frame(self):
        """Create the shared connection/mapping frame.

        The frame is placed in grid row 1 of the root window, i.e. below the
        notebook (row 0). The Serial and Ethernet sub-frames occupy the same
        grid cell; ``on_connection_type_change`` shows one of them.
        """
        shared_config_frame = ttk.LabelFrame(self.root, text="Configuración de Conexión")
        shared_config_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")

        # Connection type
        ttk.Label(shared_config_frame, text="Tipo:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.connection_type_var = tk.StringVar(value="Serial")
        self.connection_type_combo = ttk.Combobox(
            shared_config_frame, textvariable=self.connection_type_var,
            values=["Serial", "TCP", "UDP"], state="readonly", width=10)
        self.connection_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
        # ttk.Combobox fires this virtual event when the user picks a value
        self.connection_type_combo.bind("<<ComboboxSelected>>", self.on_connection_type_change)

        # Serial settings
        self.serial_frame = ttk.Frame(shared_config_frame)
        self.serial_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew")

        ttk.Label(self.serial_frame, text="Puerto:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.com_port_var = tk.StringVar(value="COM3")
        self.com_port_entry = ttk.Entry(self.serial_frame, textvariable=self.com_port_var, width=10)
        self.com_port_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")

        ttk.Label(self.serial_frame, text="Baud:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
        self.baud_rate_var = tk.StringVar(value="115200")
        self.baud_rate_entry = ttk.Entry(self.serial_frame, textvariable=self.baud_rate_var, width=10)
        self.baud_rate_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")

        # Ethernet settings (hidden until TCP/UDP is selected)
        self.ethernet_frame = ttk.Frame(shared_config_frame)
        self.ethernet_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew")
        self.ethernet_frame.grid_remove()

        ttk.Label(self.ethernet_frame, text="IP:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.ip_address_var = tk.StringVar(value="192.168.1.100")
        self.ip_address_entry = ttk.Entry(self.ethernet_frame, textvariable=self.ip_address_var, width=15)
        self.ip_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")

        ttk.Label(self.ethernet_frame, text="Puerto:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
        self.port_var = tk.StringVar(value="502")
        self.port_entry = ttk.Entry(self.ethernet_frame, textvariable=self.port_var, width=10)
        self.port_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")

        # Mapping parameters (shared by both tabs)
        ttk.Label(shared_config_frame, text="Min Brix [4mA]:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
        self.min_brix_map_var = tk.StringVar(value="0")
        self.min_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.min_brix_map_var, width=10)
        self.min_brix_map_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")

        ttk.Label(shared_config_frame, text="Max Brix [20mA]:").grid(row=1, column=2, padx=5, pady=5, sticky="w")
        self.max_brix_map_var = tk.StringVar(value="80")
        self.max_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.max_brix_map_var, width=10)
        self.max_brix_map_entry.grid(row=1, column=3, padx=5, pady=5, sticky="ew")

        # Persistence buttons
        self.save_config_button = ttk.Button(shared_config_frame, text="Guardar", command=self.save_config)
        self.save_config_button.grid(row=1, column=4, padx=5, pady=5, sticky="ew")

        self.load_config_button = ttk.Button(shared_config_frame, text="Cargar", command=self.load_config)
        self.load_config_button.grid(row=1, column=5, padx=5, pady=5, sticky="ew")

    def create_simulator_tab(self):
        """Create the widgets, graph and log of the Simulator tab."""
        # Simulator configuration frame
        sim_config_frame = ttk.LabelFrame(self.simulator_tab, text="Configuración Simulador")
        sim_config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2)

        # ADAM address
        ttk.Label(sim_config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.adam_address_var = tk.StringVar(value="01")
        self.adam_address_entry = ttk.Entry(sim_config_frame, textvariable=self.adam_address_var, width=5)
        self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")

        # Function
        ttk.Label(sim_config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
        self.function_type_var = tk.StringVar(value="Lineal")
        self.function_type_combo = ttk.Combobox(
            sim_config_frame, textvariable=self.function_type_var,
            values=["Lineal", "Sinusoidal", "Manual"], state="readonly", width=10)
        self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
        self.function_type_combo.bind("<<ComboboxSelected>>", self.on_function_type_change)

        # Period
        ttk.Label(sim_config_frame, text="Periodo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w")
        self.period_var = tk.StringVar(value="1.0")
        self.period_entry = ttk.Entry(sim_config_frame, textvariable=self.period_var, width=5)
        self.period_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew")

        # Manual mode frame
        manual_frame = ttk.LabelFrame(sim_config_frame, text="Modo Manual")
        manual_frame.grid(row=1, column=0, columnspan=6, padx=5, pady=5, sticky="ew")

        ttk.Label(manual_frame, text="Valor Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.manual_brix_var = tk.StringVar(value="10.0")
        self.manual_brix_entry = ttk.Entry(manual_frame, textvariable=self.manual_brix_var, width=10, state=tk.DISABLED)
        self.manual_brix_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
        # Sync the slider when the user confirms the value or leaves the field
        self.manual_brix_entry.bind('<Return>', lambda e: self.update_slider_from_entry())
        self.manual_brix_entry.bind('<FocusOut>', lambda e: self.update_slider_from_entry())

        # Slider
        self.manual_slider_var = tk.DoubleVar(value=10.0)
        self.manual_slider = ttk.Scale(
            manual_frame, from_=0, to=100, orient=tk.HORIZONTAL,
            variable=self.manual_slider_var, command=self.on_slider_change,
            state=tk.DISABLED, length=200)
        self.manual_slider.grid(row=0, column=2, padx=5, pady=5, sticky="ew")

        self.manual_send_button = ttk.Button(
            manual_frame, text="Enviar Manual", command=self.send_manual_value, state=tk.DISABLED)
        self.manual_send_button.grid(row=0, column=3, padx=5, pady=5, sticky="ew")

        manual_frame.columnconfigure(2, weight=1)

        # Controls frame
        controls_frame = ttk.LabelFrame(self.simulator_tab, text="Control Simulación")
        controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")

        self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation)
        self.start_button.pack(side=tk.LEFT, padx=5)

        self.stop_button = ttk.Button(
            controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED)
        self.stop_button.pack(side=tk.LEFT, padx=5)

        self.clear_sim_graph_button = ttk.Button(
            controls_frame, text="Limpiar Gráfico", command=self.clear_sim_graph)
        self.clear_sim_graph_button.pack(side=tk.LEFT, padx=5)

        # Display frame
        display_frame = ttk.LabelFrame(self.simulator_tab, text="Valores Actuales")
        display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="ew")

        ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.current_brix_display_var = tk.StringVar(value="---")
        self.current_brix_label = ttk.Label(
            display_frame, textvariable=self.current_brix_display_var, font=("Courier", 14, "bold"))
        self.current_brix_label.grid(row=0, column=1, padx=5, pady=5, sticky="w")

        ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
        self.current_ma_display_var = tk.StringVar(value="--.-- mA")
        self.current_ma_label = ttk.Label(
            display_frame, textvariable=self.current_ma_display_var, font=("Courier", 14, "bold"))
        self.current_ma_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")

        # Graph frame
        sim_graph_frame = ttk.LabelFrame(self.simulator_tab, text="Gráfico Simulador")
        sim_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")

        # Simulator figure: Brix on the left axis, mA on a twin right axis
        self.sim_fig = Figure(figsize=(8, 3.5), dpi=100)
        self.sim_ax1 = self.sim_fig.add_subplot(111)
        self.sim_ax2 = self.sim_ax1.twinx()

        self.sim_ax1.set_xlabel('Tiempo (s)')
        self.sim_ax1.set_ylabel('Brix', color='b')
        self.sim_ax2.set_ylabel('mA', color='r')
        self.sim_ax1.tick_params(axis='y', labelcolor='b')
        self.sim_ax2.tick_params(axis='y', labelcolor='r')
        self.sim_ax1.grid(True, alpha=0.3)

        self.sim_line_brix, = self.sim_ax1.plot([], [], 'b-', label='Brix', linewidth=2)
        self.sim_line_ma, = self.sim_ax2.plot([], [], 'r-', label='mA', linewidth=2)

        self.sim_canvas = FigureCanvasTkAgg(self.sim_fig, master=sim_graph_frame)
        self.sim_canvas.draw()
        self.sim_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # Log frame
        sim_log_frame = ttk.LabelFrame(self.simulator_tab, text="Log de Comunicación")
        sim_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")

        self.sim_log_text = scrolledtext.ScrolledText(
            sim_log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED)
        self.sim_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)

        # Grid weights: graph and log rows grow with the window
        self.simulator_tab.columnconfigure(0, weight=1)
        self.simulator_tab.columnconfigure(1, weight=1)
        self.simulator_tab.rowconfigure(2, weight=1)
        self.simulator_tab.rowconfigure(3, weight=1)

    def create_trace_tab(self):
        """Create the widgets, graph and log of the Trace tab."""
        # Control frame
        trace_control_frame = ttk.LabelFrame(self.trace_tab, text="Control Trace")
        trace_control_frame.grid(row=0, column=0, columnspan=2, padx=10, pady=5, sticky="ew")

        self.start_trace_button = ttk.Button(
            trace_control_frame, text="Iniciar Trace", command=self.start_trace)
        self.start_trace_button.pack(side=tk.LEFT, padx=5)

        self.stop_trace_button = ttk.Button(
            trace_control_frame, text="Detener Trace", command=self.stop_trace, state=tk.DISABLED)
        self.stop_trace_button.pack(side=tk.LEFT, padx=5)

        self.clear_trace_graph_button = ttk.Button(
            trace_control_frame, text="Limpiar Gráfico", command=self.clear_trace_graph)
        self.clear_trace_graph_button.pack(side=tk.LEFT, padx=5)

        ttk.Label(trace_control_frame, text="Archivo CSV:").pack(side=tk.LEFT, padx=(20, 5))
        self.csv_filename_var = tk.StringVar(value="Sin archivo")
        self.csv_filename_label = ttk.Label(trace_control_frame, textvariable=self.csv_filename_var)
        self.csv_filename_label.pack(side=tk.LEFT, padx=5)

        # Display frame
        trace_display_frame = ttk.LabelFrame(self.trace_tab, text="Último Valor Recibido")
        trace_display_frame.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky="ew")

        ttk.Label(trace_display_frame, text="Timestamp:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.trace_timestamp_var = tk.StringVar(value="---")
        ttk.Label(trace_display_frame, textvariable=self.trace_timestamp_var,
                  font=("Courier", 12)).grid(row=0, column=1, padx=5, pady=5, sticky="w")

        ttk.Label(trace_display_frame, text="Valor mA:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
        self.trace_ma_var = tk.StringVar(value="---")
        ttk.Label(trace_display_frame, textvariable=self.trace_ma_var,
                  font=("Courier", 12, "bold")).grid(row=0, column=3, padx=5, pady=5, sticky="w")

        ttk.Label(trace_display_frame, text="Valor Brix:").grid(row=0, column=4, padx=5, pady=5, sticky="w")
        self.trace_brix_var = tk.StringVar(value="---")
        ttk.Label(trace_display_frame, textvariable=self.trace_brix_var,
                  font=("Courier", 12, "bold")).grid(row=0, column=5, padx=5, pady=5, sticky="w")

        # Graph frame
        trace_graph_frame = ttk.LabelFrame(self.trace_tab, text="Gráfico Trace (Brix)")
        trace_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")

        # Trace figure
        self.trace_fig = Figure(figsize=(8, 4), dpi=100)
        self.trace_ax = self.trace_fig.add_subplot(111)

        self.trace_ax.set_xlabel('Tiempo (s)')
        self.trace_ax.set_ylabel('Brix', color='b')
        self.trace_ax.tick_params(axis='y', labelcolor='b')
        self.trace_ax.grid(True, alpha=0.3)

        self.trace_line_brix, = self.trace_ax.plot(
            [], [], 'b-', label='Brix', linewidth=2, marker='o', markersize=4)

        self.trace_canvas = FigureCanvasTkAgg(self.trace_fig, master=trace_graph_frame)
        self.trace_canvas.draw()
        self.trace_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # Log frame
        trace_log_frame = ttk.LabelFrame(self.trace_tab, text="Log de Recepción")
        trace_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")

        self.trace_log_text = scrolledtext.ScrolledText(
            trace_log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED)
        self.trace_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)

        # Grid weights: graph and log rows grow with the window
        self.trace_tab.columnconfigure(0, weight=1)
        self.trace_tab.columnconfigure(1, weight=1)
        self.trace_tab.rowconfigure(2, weight=1)
        self.trace_tab.rowconfigure(3, weight=1)

    def _log_message(self, message, log_widget=None):
        """Append a timestamped line to a log widget.

        Args:
            message: text to append.
            log_widget: target ScrolledText; when None, the log of the
                currently selected tab is used (tab 0 is the simulator).
        """
        if log_widget is None:
            # Pick the log based on the active tab
            current_tab = self.notebook.index(self.notebook.select())
            log_widget = self.sim_log_text if current_tab == 0 else self.trace_log_text

        # The widget is kept read-only; unlock it only while inserting
        log_widget.configure(state=tk.NORMAL)
        log_widget.insert(tk.END, f"[{datetime.now().strftime('%H:%M:%S')}] {message}\n")
        log_widget.see(tk.END)
        log_widget.configure(state=tk.DISABLED)

    def parse_adam_message(self, data, log_widget=None):
        """Parse an ADAM protocol frame and extract its address and mA value.

        Expected layout: ``#AA<value><checksum><CR>`` where
          - ``#``        start character (optional in some responses)
          - ``AA``       device address (2 characters)
          - ``value``    mA value (6 characters, format XX.XXX)
          - ``checksum`` 2 hex characters (optional)
          - ``CR``       carriage return terminator (optional)

        A checksum mismatch is only logged; the value is still returned.

        Args:
            data: received text.
            log_widget: log to report checksum/parse problems to, or None to
                stay silent.

        Returns:
            ``{'address': str, 'ma': float}``, or None when the text is too
            short or the value field is not a finite number.
        """
        try:
            # strip() also removes any trailing CR/LF terminator
            data = data.strip()

            if data.startswith('#'):
                data = data[1:]

            # Minimum payload: 2 address characters + 6 value characters
            if len(data) < 8:
                return None

            address = data[:2]
            value_str = data[2:8]

            if len(data) >= 10:
                checksum = data[8:10]
                calculated_checksum = self.calculate_checksum(f"#{address}{value_str}")
                # calculate_checksum emits upper-case hex; accept either case
                if checksum.upper() != calculated_checksum and log_widget:
                    self._log_message(
                        f"Checksum incorrecto: recibido={checksum}, calculado={calculated_checksum}",
                        log_widget)
                    # Carry on anyway if the value looks valid

            try:
                ma_value = float(value_str)
            except ValueError:
                return None

            # float() accepts "nan"/"inf"; those would break the plot limits
            if not math.isfinite(ma_value):
                return None

            return {'address': address, 'ma': ma_value}

        except Exception as e:
            if log_widget:
                self._log_message(f"Error parseando mensaje: {e}", log_widget)
            return None

    def ma_to_brix(self, ma_value):
        """Convert a mA value to Brix using the configured linear mapping.

        Values at or below 4 mA map to the minimum Brix, values at or above
        20 mA to the maximum. Returns 0.0 when the mapping fields cannot be
        read as numbers.
        """
        try:
            min_brix = float(self.min_brix_map_var.get())
            max_brix = float(self.max_brix_map_var.get())
        except (ValueError, tk.TclError):
            return 0.0

        if ma_value <= 4.0:
            return min_brix
        if ma_value >= 20.0:
            return max_brix

        # Linear interpolation over the 16 mA span
        percentage = (ma_value - 4.0) / 16.0
        return min_brix + percentage * (max_brix - min_brix)

    def start_trace(self):
        """Start trace mode: create the CSV file, open the connection and
        launch the receiving thread."""
        if self.tracing:
            messagebox.showwarning("Advertencia", "El trace ya está en curso.")
            return

        # Connection parameters
        try:
            conn_type = self.connection_type_var.get()
            if conn_type == "Serial":
                conn_params = {
                    'port': self.com_port_var.get(),
                    'baud': int(self.baud_rate_var.get())
                }
            else:
                conn_params = {
                    'ip': self.ip_address_var.get(),
                    'port': int(self.port_var.get())
                }
        except ValueError as e:
            messagebox.showerror("Error", f"Parámetros de conexión inválidos: {e}")
            return

        # CSV file named after the start time
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        csv_filename = f"maselli_trace_{timestamp}.csv"

        try:
            self.csv_file = open(csv_filename, 'w', newline='')
            self.csv_writer = csv.writer(self.csv_file)
            self.csv_writer.writerow(['Timestamp', 'Address', 'mA', 'Brix', 'Raw_Message'])
            self.csv_filename_var.set(csv_filename)
        except Exception as e:
            messagebox.showerror("Error", f"No se pudo crear archivo CSV: {e}")
            if self.csv_file:
                self.csv_file.close()
            self.csv_file = None
            self.csv_writer = None
            return

        # Open the connection
        try:
            self.connection = self._open_connection(conn_type, conn_params)
            self.connection_type = conn_type
            self._log_message(f"Conexión {conn_type} abierta para trace.", self.trace_log_text)
        except Exception as e:
            messagebox.showerror("Error de Conexión", str(e))
            # Do not leave references to a closed file behind
            if self.csv_file:
                self.csv_file.close()
            self.csv_file = None
            self.csv_writer = None
            self.csv_filename_var.set("Sin archivo")
            return

        self.tracing = True
        self.trace_start_time = time.time()
        self.start_trace_button.config(state=tk.DISABLED)
        self.stop_trace_button.config(state=tk.NORMAL)
        self._set_trace_entries_state(tk.DISABLED)

        # Receiving thread
        self.trace_thread = threading.Thread(target=self.run_trace, daemon=True)
        self.trace_thread.start()
        self._log_message("Trace iniciado.", self.trace_log_text)

    def stop_trace(self):
        """Stop trace mode, close the connection and the CSV file, and
        re-enable the controls. Does nothing when no trace is running."""
        if not self.tracing:
            return

        self.tracing = False

        # Give the receiving thread a bounded time to notice the flag
        if self.trace_thread and self.trace_thread.is_alive():
            self.trace_thread.join(timeout=2.0)

        # Close the connection
        if self.connection:
            self._close_connection(self.connection, self.connection_type)
            self._log_message("Conexión cerrada.", self.trace_log_text)
            self.connection = None

        # Close the CSV file
        if self.csv_file:
            self.csv_file.close()
            self.csv_file = None
            self.csv_writer = None
            self._log_message(f"Archivo CSV guardado: {self.csv_filename_var.get()}", self.trace_log_text)

        self.start_trace_button.config(state=tk.NORMAL)
        self.stop_trace_button.config(state=tk.DISABLED)
        self._set_trace_entries_state(tk.NORMAL)
        self._log_message("Trace detenido.", self.trace_log_text)

    def run_trace(self):
        """Receiving loop of trace mode (runs in ``trace_thread``).

        Reads from the open connection, splits the stream on CR/LF and hands
        each non-empty message to ``_process_trace_message``.

        NOTE(review): this thread updates Tk widgets directly (logging,
        StringVars); Tkinter is not documented as thread-safe, so consider
        marshalling those updates to the main loop.
        """
        buffer = ""

        while self.tracing:
            try:
                # Read according to the connection type
                data = None

                if self.connection_type == "Serial":
                    if self.connection.in_waiting > 0:
                        data = self.connection.read(self.connection.in_waiting).decode('ascii', errors='ignore')

                elif self.connection_type == "TCP":
                    self.connection.settimeout(0.1)
                    try:
                        data = self.connection.recv(1024).decode('ascii', errors='ignore')
                        if not data:  # Peer closed the connection
                            self._log_message("Conexión TCP cerrada por el servidor.", self.trace_log_text)
                            break
                    except socket.timeout:
                        continue

                elif self.connection_type == "UDP":
                    self.connection.settimeout(0.1)
                    try:
                        data, addr = self.connection.recvfrom(1024)
                        data = data.decode('ascii', errors='ignore')
                    except socket.timeout:
                        continue

                if data:
                    buffer += data

                    # Extract complete messages (terminated by CR or LF)
                    while '\r' in buffer or '\n' in buffer:
                        # Cut just after the earliest terminator
                        end_idx = min(buffer.index(term) + 1
                                      for term in ('\r', '\n') if term in buffer)
                        message = buffer[:end_idx]
                        buffer = buffer[end_idx:]

                        # A bare terminator (e.g. the LF of CRLF) is skipped
                        if message.strip():
                            self._process_trace_message(message)

                    # Unterminated remainder that already looks like a full
                    # ADAM frame (>= 10 chars) is processed right away
                    if len(buffer) >= 10 and not ('\r' in buffer or '\n' in buffer):
                        if buffer.startswith('#') or len(buffer) == 10:
                            self._process_trace_message(buffer)
                            buffer = ""

            except Exception as e:
                if self.tracing:  # Only log if the trace was not stopped meanwhile
                    self._log_message(f"Error en trace: {e}", self.trace_log_text)
                break

            # Short pause so the idle loop does not spin the CPU
            if not data:
                time.sleep(0.01)

        # The loop ended on its own (peer closed or error) while the UI still
        # shows a running trace: let the main loop run the regular shutdown
        # so buttons, the connection and the CSV file are cleaned up.
        if self.tracing:
            self.root.after(0, self.stop_trace)

    def _process_trace_message(self, message):
        """Log one received message and, if it parses as ADAM, update the
        display, the CSV file and the trace graph data."""
        # Log the raw message without terminators
        display_msg = message.replace('\r', '').replace('\n', '')
        self._log_message(f"Recibido: {display_msg}", self.trace_log_text)

        parsed = self.parse_adam_message(message, self.trace_log_text)
        if parsed:
            ma_value = parsed['ma']
            brix_value = self.ma_to_brix(ma_value)
            timestamp = datetime.now()

            # Update the display ([:-3] trims microseconds to milliseconds)
            self.trace_timestamp_var.set(timestamp.strftime("%H:%M:%S.%f")[:-3])
            self.trace_ma_var.set(f"{ma_value:.3f} mA")
            self.trace_brix_var.set(f"{brix_value:.3f} Brix")

            # Log the parsed details
            self._log_message(
                f" -> Addr: {parsed['address']}, mA: {ma_value:.3f}, Brix: {brix_value:.3f}",
                self.trace_log_text)

            # Save to CSV, flushing so data survives an abrupt exit
            if self.csv_writer:
                self.csv_writer.writerow([
                    timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
                    parsed['address'],
                    f"{ma_value:.3f}",
                    f"{brix_value:.3f}",
                    display_msg
                ])
                if self.csv_file:
                    self.csv_file.flush()

            # Add the point to the graph
            current_time = time.time() - self.trace_start_time
            self.trace_time_data.append(current_time)
            self.trace_brix_data.append(brix_value)

            # Ask the main loop to redraw
            self.root.after(0, self.trace_canvas.draw_idle)
        else:
            # Not a valid ADAM message; it may be some other kind of response
            self._log_message(f"Mensaje no ADAM: {display_msg}", self.trace_log_text)

    def _set_trace_entries_state(self, state):
        """Enable/disable the shared configuration controls during a trace."""
        self.connection_type_combo.config(state=self._combo_state(state))
        self.com_port_entry.config(state=state)
        self.baud_rate_entry.config(state=state)
        self.ip_address_entry.config(state=state)
        self.port_entry.config(state=state)
        self.min_brix_map_entry.config(state=state)
        self.max_brix_map_entry.config(state=state)

    def update_sim_graph(self, frame):
        """Animation callback: refresh the simulator lines and axis limits.

        Args:
            frame: frame counter supplied by FuncAnimation (unused).

        Returns:
            The two line artists.
        """
        # Always push the data, so an emptied buffer also empties the plot
        self.sim_line_brix.set_data(list(self.sim_time_data), list(self.sim_brix_data))
        self.sim_line_ma.set_data(list(self.sim_time_data), list(self.sim_ma_data))

        if len(self.sim_time_data) > 0:
            if len(self.sim_time_data) > 1:
                self.sim_ax1.set_xlim(min(self.sim_time_data), max(self.sim_time_data))

            if len(self.sim_brix_data) > 0:
                brix_min = min(self.sim_brix_data) - 1
                brix_max = max(self.sim_brix_data) + 1
                self.sim_ax1.set_ylim(brix_min, brix_max)

            if len(self.sim_ma_data) > 0:
                ma_min = min(self.sim_ma_data) - 0.5
                ma_max = max(self.sim_ma_data) + 0.5
                self.sim_ax2.set_ylim(ma_min, ma_max)

        return self.sim_line_brix, self.sim_line_ma

    def update_trace_graph(self, frame):
        """Animation callback: refresh the trace line and axis limits.

        Args:
            frame: frame counter supplied by FuncAnimation (unused).

        Returns:
            A one-element tuple with the line artist.
        """
        # Always push the data, so an emptied buffer also empties the plot
        self.trace_line_brix.set_data(list(self.trace_time_data), list(self.trace_brix_data))

        if len(self.trace_time_data) > 0:
            if len(self.trace_time_data) > 1:
                self.trace_ax.set_xlim(min(self.trace_time_data), max(self.trace_time_data))

            if len(self.trace_brix_data) > 0:
                brix_min = min(self.trace_brix_data) - 1
                brix_max = max(self.trace_brix_data) + 1
                self.trace_ax.set_ylim(brix_min, brix_max)

        return self.trace_line_brix,

    def clear_sim_graph(self):
        """Clear the simulator graph data and restart its time origin."""
        self.sim_time_data.clear()
        self.sim_brix_data.clear()
        self.sim_ma_data.clear()
        self.sim_start_time = time.time()
        self.sim_canvas.draw_idle()
        self._log_message("Gráfico del simulador limpiado.", self.sim_log_text)

    def clear_trace_graph(self):
        """Clear the trace graph data and restart its time origin."""
        self.trace_time_data.clear()
        self.trace_brix_data.clear()
        self.trace_start_time = time.time()
        self.trace_canvas.draw_idle()
        self._log_message("Gráfico del trace limpiado.", self.trace_log_text)

    def save_config(self):
        """Write the current settings to ``self.config_file`` as JSON."""
        config = {
            'connection_type': self.connection_type_var.get(),
            'com_port': self.com_port_var.get(),
            'baud_rate': self.baud_rate_var.get(),
            'ip_address': self.ip_address_var.get(),
            'port': self.port_var.get(),
            'adam_address': self.adam_address_var.get(),
            'function_type': self.function_type_var.get(),
            'min_brix_map': self.min_brix_map_var.get(),
            'max_brix_map': self.max_brix_map_var.get(),
            'period': self.period_var.get(),
            'manual_brix': self.manual_brix_var.get()
        }

        try:
            with open(self.config_file, 'w') as f:
                json.dump(config, f, indent=4)
            self._log_message("Configuración guardada exitosamente.")
            messagebox.showinfo("Éxito", "Configuración guardada correctamente.")
        except Exception as e:
            self._log_message(f"Error al guardar configuración: {e}")
            messagebox.showerror("Error", f"No se pudo guardar la configuración: {e}")

    def load_config(self, silent=False):
        """Load settings from ``self.config_file`` and refresh the UI.

        Args:
            silent: when True, show no dialogs and write no log lines (used
                at start-up, where a missing file is not an error).
        """
        if not os.path.exists(self.config_file):
            if not silent:
                messagebox.showinfo("Información", "No se encontró archivo de configuración.")
            return

        try:
            with open(self.config_file, 'r') as f:
                config = json.load(f)

            self.connection_type_var.set(config.get('connection_type', 'Serial'))
            self.com_port_var.set(config.get('com_port', 'COM3'))
            self.baud_rate_var.set(config.get('baud_rate', '115200'))
            self.ip_address_var.set(config.get('ip_address', '192.168.1.100'))
            self.port_var.set(config.get('port', '502'))
            self.adam_address_var.set(config.get('adam_address', '01'))
            self.function_type_var.set(config.get('function_type', 'Lineal'))
            self.min_brix_map_var.set(config.get('min_brix_map', '0'))
            self.max_brix_map_var.set(config.get('max_brix_map', '80'))
            self.period_var.set(config.get('period', '1.0'))
            self.manual_brix_var.set(config.get('manual_brix', '10.0'))

            # Best effort: a non-numeric saved value leaves the slider as is
            try:
                self.manual_slider_var.set(float(config.get('manual_brix', '10.0')))
            except (TypeError, ValueError):
                pass

            # Variables changed programmatically fire no combobox event, so
            # refresh the dependent widgets explicitly
            self.on_connection_type_change()
            self.on_function_type_change()

            if not silent:
                self._log_message("Configuración cargada exitosamente.")
                messagebox.showinfo("Éxito", "Configuración cargada correctamente.")
        except Exception as e:
            if not silent:
                self._log_message(f"Error al cargar configuración: {e}")
                messagebox.showerror("Error", f"No se pudo cargar la configuración: {e}")

    def on_connection_type_change(self, event=None):
        """Show the Serial or the Ethernet settings frame for the selected
        connection type."""
        conn_type = self.connection_type_var.get()
        if conn_type == "Serial":
            self.ethernet_frame.grid_remove()
            self.serial_frame.grid()
        else:
            self.serial_frame.grid_remove()
            self.ethernet_frame.grid()

    def on_function_type_change(self, event=None):
        """Enable the manual controls in "Manual" mode and the periodic
        simulation controls otherwise. Selecting "Manual" stops a running
        simulation."""
        func_type = self.function_type_var.get()
        if func_type == "Manual":
            if self.simulating:
                self.stop_simulation()
            self.manual_brix_entry.config(state=tk.NORMAL)
            self.manual_send_button.config(state=tk.NORMAL)
            self.manual_slider.config(state=tk.NORMAL)
            self.period_entry.config(state=tk.DISABLED)
            self.start_button.config(state=tk.DISABLED)
            self.stop_button.config(state=tk.DISABLED)
        else:
            self.manual_brix_entry.config(state=tk.DISABLED)
            self.manual_send_button.config(state=tk.DISABLED)
            self.manual_slider.config(state=tk.DISABLED)
            self.period_entry.config(state=tk.NORMAL)
            if not self.simulating:
                self.start_button.config(state=tk.NORMAL)
                self.stop_button.config(state=tk.DISABLED)
            else:
                self.start_button.config(state=tk.DISABLED)
                self.stop_button.config(state=tk.NORMAL)

    def on_slider_change(self, value):
        """Mirror the slider position into the manual Brix entry."""
        self.manual_brix_var.set(f"{float(value):.1f}")

    def update_slider_from_entry(self):
        """Mirror the manual Brix entry into the slider, clamped to 0-100.
        Non-numeric text is ignored."""
        try:
            value = float(self.manual_brix_var.get())
            value = max(0, min(100, value))
            self.manual_slider_var.set(value)
            self.manual_brix_var.set(f"{value:.1f}")
        except ValueError:
            pass

    def calculate_checksum(self, message_part):
        """Return the checksum of ``message_part``: the sum of its character
        codes modulo 256, as two upper-case hex digits."""
        s = sum(ord(c) for c in message_part)
        checksum_byte = s % 256
        return f"{checksum_byte:02X}"

    def scale_to_mA(self, brix_value, min_brix_map, max_brix_map):
        """Map a Brix value linearly onto 4-20 mA, clamped to that range.

        Returns 4.0 when the mapping range is empty (min equals max).
        """
        if max_brix_map == min_brix_map:
            return 4.0
        percentage = (brix_value - min_brix_map) / (max_brix_map - min_brix_map)
        percentage = max(0.0, min(1.0, percentage))
        mA_value = 4.0 + percentage * 16.0
        return mA_value

    def format_mA_value(self, mA_val):
        """Format a mA value as "XX.XXX" (6 characters including the dot)."""
        return f"{mA_val:06.3f}"

    def _get_common_params(self):
        """Read and validate the settings shared by manual and continuous
        sending.

        Returns:
            ``(conn_type, conn_params, adam_address, min_brix_map,
            max_brix_map)``, or None after showing an error dialog when a
            value is invalid.
        """
        try:
            adam_address = self.adam_address_var.get()
            if len(adam_address) != 2:
                messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.")
                return None

            min_brix_map = float(self.min_brix_map_var.get())
            max_brix_map = float(self.max_brix_map_var.get())

            if min_brix_map > max_brix_map:
                messagebox.showerror("Error", "Valor Mínimo (Brix) no puede ser mayor que Valor Máximo (Brix).")
                return None

            conn_type = self.connection_type_var.get()

            if conn_type == "Serial":
                com_port = self.com_port_var.get()
                baud_rate = int(self.baud_rate_var.get())
                return conn_type, {'port': com_port, 'baud': baud_rate}, adam_address, min_brix_map, max_brix_map
            else:
                ip_address = self.ip_address_var.get()
                port = int(self.port_var.get())
                return conn_type, {'ip': ip_address, 'port': port}, adam_address, min_brix_map, max_brix_map

        except ValueError as e:
            messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración: {e}")
            return None

    def _open_connection(self, conn_type, conn_params):
        """Open a connection of the requested type.

        Args:
            conn_type: "Serial", "TCP" or "UDP".
            conn_params: ``{'port', 'baud'}`` for Serial, ``{'ip', 'port'}``
                for TCP/UDP.

        Returns:
            A ``serial.Serial`` or a connected ``socket.socket``.

        Raises:
            Exception: wrapping any failure, including an unknown
                ``conn_type``.
        """
        sock = None
        try:
            if conn_type == "Serial":
                return serial.Serial(conn_params['port'], conn_params['baud'], timeout=1)

            if conn_type == "TCP":
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(5.0)
                sock.connect((conn_params['ip'], conn_params['port']))
                return sock

            if conn_type == "UDP":
                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                sock.settimeout(1.0)
                # socket objects use __slots__, so the destination cannot be
                # stored as an ad-hoc attribute; connect() on a datagram
                # socket records the default peer for send()/recv().
                sock.connect((conn_params['ip'], conn_params['port']))
                return sock

            raise ValueError(f"tipo de conexión no soportado: {conn_type!r}")

        except Exception as e:
            # Do not leak a half-opened socket
            if sock is not None:
                sock.close()
            raise Exception(f"Error al abrir conexión {conn_type}: {e}") from e

    def _close_connection(self, connection, conn_type):
        """Close ``connection``; failures are logged, never raised."""
        try:
            if conn_type == "Serial":
                if connection and connection.is_open:
                    connection.close()
            elif conn_type in ["TCP", "UDP"]:
                if connection:
                    connection.close()
        except Exception as e:
            self._log_message(f"Error al cerrar conexión: {e}")

    def _send_data(self, connection, conn_type, data):
        """Send ``data`` (ASCII text) over ``connection``.

        Raises:
            Exception: wrapping any encoding or transport failure.
        """
        try:
            payload = data.encode('ascii')
            if conn_type == "Serial":
                connection.write(payload)
            elif conn_type == "TCP":
                # sendall() retries until the whole payload is written
                connection.sendall(payload)
            elif conn_type == "UDP":
                # The socket was connect()ed to its destination on open
                connection.send(payload)
        except Exception as e:
            raise Exception(f"Error al enviar datos: {e}") from e

    def add_sim_data_point(self, brix_value, ma_value):
        """Append one (Brix, mA) sample to the simulator graph buffers."""
        current_time = time.time() - self.sim_start_time
        self.sim_time_data.append(current_time)
        self.sim_brix_data.append(brix_value)
        self.sim_ma_data.append(ma_value)
        self.sim_canvas.draw_idle()

    def _read_response(self, connection, conn_type, timeout=0.5):
        """Try to read a response from the device.

        Args:
            connection: open connection object.
            conn_type: "Serial", "TCP" or "UDP".
            timeout: maximum time to wait, in seconds.

        Returns:
            The decoded response text, or None when nothing arrived in time
            or reading failed (failures are written to the simulator log).
        """
        try:
            response = None

            if conn_type == "Serial":
                # Temporarily override the port timeout; always restore it
                original_timeout = connection.timeout
                connection.timeout = timeout
                try:
                    # Give the response a moment to arrive
                    time.sleep(0.05)

                    # Collect bytes until a terminator shows up or time is up
                    response_bytes = b""
                    start_time = time.time()

                    while (time.time() - start_time) < timeout:
                        if connection.in_waiting > 0:
                            response_bytes += connection.read(connection.in_waiting)
                            if b'\r' in response_bytes or b'\n' in response_bytes:
                                break
                        else:
                            time.sleep(0.01)

                    if response_bytes:
                        response = response_bytes.decode('ascii', errors='ignore')
                finally:
                    connection.timeout = original_timeout

            elif conn_type == "TCP":
                connection.settimeout(timeout)
                try:
                    response = connection.recv(1024).decode('ascii', errors='ignore')
                except socket.timeout:
                    pass

            elif conn_type == "UDP":
                connection.settimeout(timeout)
                try:
                    response, addr = connection.recvfrom(1024)
                    response = response.decode('ascii', errors='ignore')
                except socket.timeout:
                    pass

            return response

        except Exception as e:
            self._log_message(f"Error al leer respuesta: {e}", self.sim_log_text)
            return None

    def send_manual_value(self):
        """Send the manual Brix value once over a temporary connection and
        log any response from the device."""
        common_params = self._get_common_params()
        if not common_params:
            return

        conn_type, conn_params, adam_address, min_brix_map, max_brix_map = common_params

        try:
            manual_brix = float(self.manual_brix_var.get())
        except ValueError:
            messagebox.showerror("Error de Entrada", "Valor Brix Manual inválido.")
            return

        mA_val = self.scale_to_mA(manual_brix, min_brix_map, max_brix_map)
        mA_str = self.format_mA_value(mA_val)

        self.current_brix_display_var.set(f"{manual_brix:.3f} Brix")
        self.current_ma_display_var.set(f"{mA_str} mA")
        self.add_sim_data_point(manual_brix, mA_val)

        # Frame: '#' + address + value + checksum + CR
        message_part = f"#{adam_address}{mA_str}"
        checksum = self.calculate_checksum(message_part)
        full_string_to_send = f"{message_part}{checksum}\r"
        log_display_string = full_string_to_send.replace('\r', '')

        temp_connection = None
        try:
            temp_connection = self._open_connection(conn_type, conn_params)
            self._log_message(f"Conexión {conn_type} abierta temporalmente para envío manual.", self.sim_log_text)
            self._log_message(f"Enviando Manual: {log_display_string}", self.sim_log_text)
            self._send_data(temp_connection, conn_type, full_string_to_send)

            # Try to read a response
            response = self._read_response(temp_connection, conn_type)
            if response and response.strip():  # Only process real content
                display_resp = response.replace('\r', '').replace('\n', '')
                self._log_message(f"Respuesta: {display_resp}", self.sim_log_text)

                # Try to parse it as an ADAM message
                parsed = self.parse_adam_message(response, self.sim_log_text)
                if parsed:
                    # Convert mA to Brix for display
                    brix_value = self.ma_to_brix(parsed['ma'])
                    self._log_message(
                        f" -> Dirección: {parsed['address']}, Valor: {parsed['ma']:.3f} mA ({brix_value:.3f} Brix)",
                        self.sim_log_text)

        except Exception as e:
            self._log_message(f"Error al enviar manualmente: {e}", self.sim_log_text)
            messagebox.showerror("Error de Conexión", str(e))
        finally:
            if temp_connection:
                self._close_connection(temp_connection, conn_type)
                self._log_message(f"Conexión {conn_type} cerrada tras envío manual.", self.sim_log_text)

    def start_simulation(self):
        """Validate the settings, open the connection and start the
        continuous simulation thread."""
        if self.simulating:
            messagebox.showwarning("Advertencia", "La simulación ya está en curso.")
            return

        common_params = self._get_common_params()
        if not common_params:
            return

        (self.connection_type, self.conn_params, self.adam_address,
         self.min_brix_map, self.max_brix_map) = common_params

        try:
            self.simulation_period = float(self.period_var.get())
            if self.simulation_period <= 0:
                messagebox.showerror("Error", "El periodo debe ser un número positivo.")
                return

            self.function_type = self.function_type_var.get()
            if self.function_type == "Manual":
                messagebox.showinfo("Info", "Seleccione modo Lineal o Sinusoidal para simulación continua.")
                return

        except ValueError as e:
            messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración de simulación: {e}")
            return

        try:
            self.connection = self._open_connection(self.connection_type, self.conn_params)
            self._log_message(f"Conexión {self.connection_type} abierta para simulación continua.", self.sim_log_text)
        except Exception as e:
            messagebox.showerror("Error de Conexión", str(e))
            self.connection = None
            return

        self.simulating = True
        self.simulation_step = 0
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self._set_sim_config_entries_state(tk.DISABLED)

        self.simulation_thread = threading.Thread(target=self.run_simulation, daemon=True)
        self.simulation_thread.start()
        self._log_message("Simulación continua iniciada.", self.sim_log_text)

    def _set_sim_config_entries_state(self, state):
        """Enable/disable the configuration controls during a simulation."""
        self.connection_type_combo.config(state=self._combo_state(state))
        self.com_port_entry.config(state=state)
        self.baud_rate_entry.config(state=state)
        self.ip_address_entry.config(state=state)
        self.port_entry.config(state=state)
        self.adam_address_entry.config(state=state)
        self.function_type_combo.config(state=self._combo_state(state))
        self.min_brix_map_entry.config(state=state)
        self.max_brix_map_entry.config(state=state)
self.save_config_button.config(state=state) self.load_config_button.config(state=state) current_func_type = self.function_type_var.get() if current_func_type != "Manual": self.period_entry.config(state=state) self.manual_brix_entry.config(state=tk.DISABLED) self.manual_send_button.config(state=tk.DISABLED) self.manual_slider.config(state=tk.DISABLED) else: self.period_entry.config(state=tk.DISABLED) if state == tk.DISABLED: self.manual_brix_entry.config(state=tk.DISABLED) self.manual_send_button.config(state=tk.DISABLED) self.manual_slider.config(state=tk.DISABLED) else: self.manual_brix_entry.config(state=tk.NORMAL) self.manual_send_button.config(state=tk.NORMAL) self.manual_slider.config(state=tk.NORMAL) def stop_simulation(self): if not self.simulating: if self.function_type_var.get() != "Manual": self._log_message("Simulación continua ya estaba detenida.", self.sim_log_text) if self.function_type_var.get() != "Manual": self.start_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.DISABLED) self._set_sim_config_entries_state(tk.NORMAL) return self.simulating = False if self.simulation_thread and self.simulation_thread.is_alive(): try: self.simulation_thread.join(timeout=max(0.1, self.simulation_period * 1.5 if hasattr(self, 'simulation_period') else 2.0)) except Exception as e: self._log_message(f"Error al esperar el hilo de simulación: {e}", self.sim_log_text) if self.connection: self._close_connection(self.connection, self.connection_type) self._log_message(f"Conexión {self.connection_type} cerrada (simulación continua).", self.sim_log_text) self.connection = None self.start_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.DISABLED) self._set_sim_config_entries_state(tk.NORMAL) self.on_function_type_change() self._log_message("Simulación continua detenida.", self.sim_log_text) self.current_brix_display_var.set("---") self.current_ma_display_var.set("--.-- mA") def run_simulation(self): steps_for_full_cycle = 100 while 
self.simulating: current_brix_val = 0.0 if self.function_type == "Lineal": progress = (self.simulation_step % (2 * steps_for_full_cycle)) / steps_for_full_cycle if progress > 1.0: progress = 2.0 - progress current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * progress elif self.function_type == "Sinusoidal": phase = (self.simulation_step / steps_for_full_cycle) * 2 * math.pi sin_val = (math.sin(phase) + 1) / 2 current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * sin_val mA_val = self.scale_to_mA(current_brix_val, self.min_brix_map, self.max_brix_map) mA_str = self.format_mA_value(mA_val) self.current_brix_display_var.set(f"{current_brix_val:.3f} Brix") self.current_ma_display_var.set(f"{mA_str} mA") self.root.after(0, lambda b=current_brix_val, m=mA_val: self.add_sim_data_point(b, m)) message_part = f"#{self.adam_address}{mA_str}" checksum = self.calculate_checksum(message_part) full_string_to_send = f"{message_part}{checksum}\r" log_display_string = full_string_to_send.replace('\r', '') self._log_message(f"Enviando Sim: {log_display_string}", self.sim_log_text) if self.connection: try: self._send_data(self.connection, self.connection_type, full_string_to_send) # Intentar leer respuesta (timeout corto para no ralentizar simulación) response = self._read_response(self.connection, self.connection_type, timeout=0.1) if response and response.strip(): # Solo procesar si hay contenido display_resp = response.replace('\r', '').replace('\n', '') self._log_message(f"Respuesta: {display_resp}", self.sim_log_text) # Intentar parsear como mensaje ADAM parsed = self.parse_adam_message(response, self.sim_log_text) if parsed: # Convertir mA a Brix para mostrar brix_value = self.ma_to_brix(parsed['ma']) self._log_message(f" -> Dirección: {parsed['address']}, Valor: {parsed['ma']:.3f} mA ({brix_value:.3f} Brix)", self.sim_log_text) except Exception as e: self._log_message(f"Error al escribir en conexión (sim): {e}", 
self.sim_log_text) self.root.after(0, self.stop_simulation_from_thread_error) break self.simulation_step += 1 time.sleep(self.simulation_period) if not self.simulating and self.root.winfo_exists(): self.root.after(0, self.ensure_gui_stopped_state_sim) def stop_simulation_from_thread_error(self): if self.simulating: messagebox.showerror("Error de Simulación", "Error de conexión durante la simulación. Simulación detenida.") self.stop_simulation() def ensure_gui_stopped_state_sim(self): if not self.simulating: self.start_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.DISABLED) self._set_sim_config_entries_state(tk.NORMAL) if self.connection: self._close_connection(self.connection, self.connection_type) self._log_message(f"Conexión {self.connection_type} cerrada (auto, sim_end).", self.sim_log_text) self.connection = None self._log_message("Simulación continua terminada (hilo finalizado).", self.sim_log_text) def on_closing(self): """Maneja el cierre de la aplicación""" # Detener simulación si está activa if self.simulating: self.stop_simulation() # Detener trace si está activo if self.tracing: self.stop_trace() # Cerrar cualquier conexión abierta if self.connection: self._close_connection(self.connection, self.connection_type) # Destruir ventana self.root.destroy() if __name__ == "__main__": main_root = tk.Tk() app = MaselliSimulatorApp(main_root) main_root.mainloop()