diff --git a/MaselliSimulatorApp.py b/MaselliSimulatorApp.py deleted file mode 100644 index cd8fad4..0000000 --- a/MaselliSimulatorApp.py +++ /dev/null @@ -1,1210 +0,0 @@ -import tkinter as tk -from tkinter import ttk, scrolledtext, messagebox -import serial -import socket -import threading -import time -import math -import json -import os -import csv -from datetime import datetime -from collections import deque -import matplotlib.pyplot as plt -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -from matplotlib.figure import Figure -import matplotlib.animation as animation - -# Simulador y Trace para Protocolo Maselli (ADAM) -# Soporta conexiones Serial, TCP y UDP -# -# Para cambiar el icono, coloca uno de estos archivos en el mismo directorio: -# - icon.png (recomendado) -# - icon.ico (para Windows) -# - icon.gif -# -# Características: -# - Modo Simulador: Genera valores de prueba en protocolo ADAM -# - Modo Trace: Recibe y registra valores del medidor real -# - Conversión automática mA <-> Brix -# - Registro en CSV con timestamp -# - Gráficos en tiempo real -# - Respuestas del dispositivo mostradas en el log - -class MaselliSimulatorApp: - def __init__(self, root_window): - self.root = root_window - self.root.title("Simulador/Trace Protocolo Maselli") - self.root.geometry("900x700") # Tamaño inicial de ventana - - # Intentar cargar el icono - icon_loaded = False - for icon_file in ['icon.png', 'icon.ico', 'icon.gif']: - if os.path.exists(icon_file): - try: - if icon_file.endswith('.ico'): - self.root.iconbitmap(icon_file) - else: - icon = tk.PhotoImage(file=icon_file) - self.root.iconphoto(True, icon) - icon_loaded = True - break - except Exception as e: - print(f"No se pudo cargar {icon_file}: {e}") - - if not icon_loaded: - print("No se encontró ningún archivo de icono (icon.png, icon.ico, icon.gif)") - - self.connection = None - self.connection_type = None - self.simulating = False - self.simulation_thread = None - self.simulation_step = 0 - - # Para modo Trace - self.tracing = False - self.trace_thread = None - self.csv_file = None - self.csv_writer = None - - # Para los gráficos - self.max_points = 100 - # Datos para simulador - self.sim_time_data = deque(maxlen=self.max_points) - self.sim_brix_data = deque(maxlen=self.max_points) - self.sim_ma_data = deque(maxlen=self.max_points) - self.sim_start_time = time.time() - - # Datos para trace - self.trace_time_data = deque(maxlen=self.max_points) - self.trace_brix_data = deque(maxlen=self.max_points) - self.trace_start_time = time.time() - - # Archivo de configuración - self.config_file = "maselli_simulator_config.json" - - # Crear notebook para tabs - self.notebook = ttk.Notebook(self.root) - self.notebook.grid(row=0, column=0, sticky="nsew", padx=5, pady=5) - - # Tab Simulador - self.simulator_tab = ttk.Frame(self.notebook) - self.notebook.add(self.simulator_tab, text="Simulador") - - # Tab Trace - self.trace_tab = ttk.Frame(self.notebook) - self.notebook.add(self.trace_tab, text="Trace") - - # --- Frame de Configuración Compartida --- - self.create_shared_config_frame() - - # --- Crear contenido de cada tab --- - self.create_simulator_tab() - self.create_trace_tab() - - # Configurar pesos - self.root.columnconfigure(0, weight=1) - self.root.rowconfigure(0, weight=1) - - self.root.protocol("WM_DELETE_WINDOW", self.on_closing) - - # Cargar configuración si existe - self.load_config(silent=True) - - # Inicializar estado de la interfaz - self.on_connection_type_change() - self.on_function_type_change() - - # Iniciar animaciones de gráficos - self.sim_ani = animation.FuncAnimation(self.sim_fig, self.update_sim_graph, interval=100, blit=False) - self.trace_ani = animation.FuncAnimation(self.trace_fig, self.update_trace_graph, interval=100, blit=False) - - def create_shared_config_frame(self): - """Crea el frame de configuración compartida que aparece arriba del notebook""" - shared_config_frame = ttk.LabelFrame(self.root, text="Configuración de Conexión") - shared_config_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew") - - # Tipo de conexión - ttk.Label(shared_config_frame, text="Tipo:").grid(row=0, column=0, padx=5, pady=5, sticky="w") - self.connection_type_var = tk.StringVar(value="Serial") - self.connection_type_combo = ttk.Combobox(shared_config_frame, textvariable=self.connection_type_var, - values=["Serial", "TCP", "UDP"], state="readonly", width=10) - self.connection_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew") - self.connection_type_combo.bind("<>", self.on_connection_type_change) - - # Frame para configuración Serial - self.serial_frame = ttk.Frame(shared_config_frame) - self.serial_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew") - - ttk.Label(self.serial_frame, text="Puerto:").grid(row=0, column=0, padx=5, pady=5, sticky="w") - self.com_port_var = tk.StringVar(value="COM3") - self.com_port_entry = ttk.Entry(self.serial_frame, textvariable=self.com_port_var, width=10) - self.com_port_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew") - - ttk.Label(self.serial_frame, text="Baud:").grid(row=0, column=2, padx=5, pady=5, sticky="w") - self.baud_rate_var = tk.StringVar(value="115200") - self.baud_rate_entry = ttk.Entry(self.serial_frame, textvariable=self.baud_rate_var, width=10) - self.baud_rate_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew") - - # Frame para configuración Ethernet - self.ethernet_frame = ttk.Frame(shared_config_frame) - self.ethernet_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew") - self.ethernet_frame.grid_remove() - - ttk.Label(self.ethernet_frame, text="IP:").grid(row=0, column=0, padx=5, pady=5, sticky="w") - self.ip_address_var = tk.StringVar(value="192.168.1.100") - self.ip_address_entry = ttk.Entry(self.ethernet_frame, textvariable=self.ip_address_var, width=15) - self.ip_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew") - - ttk.Label(self.ethernet_frame, text="Puerto:").grid(row=0, column=2, padx=5, pady=5, sticky="w") - self.port_var = tk.StringVar(value="502") - self.port_entry = ttk.Entry(self.ethernet_frame, textvariable=self.port_var, width=10) - self.port_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew") - - # Parámetros de mapeo (compartidos) - ttk.Label(shared_config_frame, text="Min Brix [4mA]:").grid(row=1, column=0, padx=5, pady=5, sticky="w") - self.min_brix_map_var = tk.StringVar(value="0") - self.min_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.min_brix_map_var, width=10) - self.min_brix_map_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew") - - ttk.Label(shared_config_frame, text="Max Brix [20mA]:").grid(row=1, column=2, padx=5, pady=5, sticky="w") - self.max_brix_map_var = tk.StringVar(value="80") - self.max_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.max_brix_map_var, width=10) - self.max_brix_map_entry.grid(row=1, column=3, padx=5, pady=5, sticky="ew") - - # Botones de persistencia - self.save_config_button = ttk.Button(shared_config_frame, text="Guardar", command=self.save_config) - self.save_config_button.grid(row=1, column=4, padx=5, pady=5, sticky="ew") - - self.load_config_button = ttk.Button(shared_config_frame, text="Cargar", command=self.load_config) - self.load_config_button.grid(row=1, column=5, padx=5, pady=5, sticky="ew") - - def create_simulator_tab(self): - """Crea el contenido del tab Simulador""" - # Frame de configuración del simulador - sim_config_frame = ttk.LabelFrame(self.simulator_tab, text="Configuración Simulador") - sim_config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2) - - # Dirección ADAM - ttk.Label(sim_config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w") - self.adam_address_var = tk.StringVar(value="01") - self.adam_address_entry = ttk.Entry(sim_config_frame, textvariable=self.adam_address_var, width=5) - self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew") - - # Función - ttk.Label(sim_config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w") - self.function_type_var = tk.StringVar(value="Lineal") - self.function_type_combo = ttk.Combobox(sim_config_frame, textvariable=self.function_type_var, - values=["Lineal", "Sinusoidal", "Manual"], state="readonly", width=10) - self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew") - self.function_type_combo.bind("<>", self.on_function_type_change) - - # Periodo - ttk.Label(sim_config_frame, text="Periodo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w") - self.period_var = tk.StringVar(value="1.0") - self.period_entry = ttk.Entry(sim_config_frame, textvariable=self.period_var, width=5) - self.period_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew") - - # Frame para modo Manual - manual_frame = ttk.LabelFrame(sim_config_frame, text="Modo Manual") - manual_frame.grid(row=1, column=0, columnspan=6, padx=5, pady=5, sticky="ew") - - ttk.Label(manual_frame, text="Valor Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w") - self.manual_brix_var = tk.StringVar(value="10.0") - self.manual_brix_entry = ttk.Entry(manual_frame, textvariable=self.manual_brix_var, width=10, state=tk.DISABLED) - self.manual_brix_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew") - self.manual_brix_entry.bind('', lambda e: self.update_slider_from_entry()) - self.manual_brix_entry.bind('', lambda e: self.update_slider_from_entry()) - - # Slider - self.manual_slider_var = tk.DoubleVar(value=10.0) - self.manual_slider = ttk.Scale(manual_frame, from_=0, to=100, orient=tk.HORIZONTAL, - variable=self.manual_slider_var, command=self.on_slider_change, - state=tk.DISABLED, length=200) - self.manual_slider.grid(row=0, column=2, padx=5, pady=5, sticky="ew") - - self.manual_send_button = ttk.Button(manual_frame, text="Enviar Manual", command=self.send_manual_value, state=tk.DISABLED) - self.manual_send_button.grid(row=0, column=3, padx=5, pady=5, sticky="ew") - - manual_frame.columnconfigure(2, weight=1) - - # Controls Frame - controls_frame = ttk.LabelFrame(self.simulator_tab, text="Control Simulación") - controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew") - - self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation) - self.start_button.pack(side=tk.LEFT, padx=5) - - self.stop_button = ttk.Button(controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED) - self.stop_button.pack(side=tk.LEFT, padx=5) - - self.clear_sim_graph_button = ttk.Button(controls_frame, text="Limpiar Gráfico", command=self.clear_sim_graph) - self.clear_sim_graph_button.pack(side=tk.LEFT, padx=5) - - # Display Frame - display_frame = ttk.LabelFrame(self.simulator_tab, text="Valores Actuales") - display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="ew") - - ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w") - self.current_brix_display_var = tk.StringVar(value="---") - self.current_brix_label = ttk.Label(display_frame, textvariable=self.current_brix_display_var, - font=("Courier", 14, "bold")) - self.current_brix_label.grid(row=0, column=1, padx=5, pady=5, sticky="w") - - ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w") - self.current_ma_display_var = tk.StringVar(value="--.-- mA") - self.current_ma_label = ttk.Label(display_frame, textvariable=self.current_ma_display_var, - font=("Courier", 14, "bold")) - self.current_ma_label.grid(row=1, column=1, padx=5, pady=5, sticky="w") - - # Graph Frame - sim_graph_frame = ttk.LabelFrame(self.simulator_tab, text="Gráfico Simulador") - sim_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") - - # Crear figura para simulador - self.sim_fig = Figure(figsize=(8, 3.5), dpi=100) - self.sim_ax1 = self.sim_fig.add_subplot(111) - self.sim_ax2 = self.sim_ax1.twinx() - - self.sim_ax1.set_xlabel('Tiempo (s)') - self.sim_ax1.set_ylabel('Brix', color='b') - self.sim_ax2.set_ylabel('mA', color='r') - self.sim_ax1.tick_params(axis='y', labelcolor='b') - self.sim_ax2.tick_params(axis='y', labelcolor='r') - self.sim_ax1.grid(True, alpha=0.3) - - self.sim_line_brix, = self.sim_ax1.plot([], [], 'b-', label='Brix', linewidth=2) - self.sim_line_ma, = self.sim_ax2.plot([], [], 'r-', label='mA', linewidth=2) - - self.sim_canvas = FigureCanvasTkAgg(self.sim_fig, master=sim_graph_frame) - self.sim_canvas.draw() - self.sim_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - - # Log Frame - sim_log_frame = ttk.LabelFrame(self.simulator_tab, text="Log de Comunicación") - sim_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") - - self.sim_log_text = scrolledtext.ScrolledText(sim_log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED) - self.sim_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True) - - # Configurar pesos - self.simulator_tab.columnconfigure(0, weight=1) - self.simulator_tab.columnconfigure(1, weight=1) - self.simulator_tab.rowconfigure(2, weight=1) - self.simulator_tab.rowconfigure(3, weight=1) - - def create_trace_tab(self): - """Crea el contenido del tab Trace""" - # Control Frame - trace_control_frame = ttk.LabelFrame(self.trace_tab, text="Control Trace") - trace_control_frame.grid(row=0, column=0, columnspan=2, padx=10, pady=5, sticky="ew") - - self.start_trace_button = ttk.Button(trace_control_frame, text="Iniciar Trace", command=self.start_trace) - self.start_trace_button.pack(side=tk.LEFT, padx=5) - - self.stop_trace_button = ttk.Button(trace_control_frame, text="Detener Trace", command=self.stop_trace, state=tk.DISABLED) - self.stop_trace_button.pack(side=tk.LEFT, padx=5) - - self.clear_trace_graph_button = ttk.Button(trace_control_frame, text="Limpiar Gráfico", command=self.clear_trace_graph) - self.clear_trace_graph_button.pack(side=tk.LEFT, padx=5) - - ttk.Label(trace_control_frame, text="Archivo CSV:").pack(side=tk.LEFT, padx=(20, 5)) - self.csv_filename_var = tk.StringVar(value="Sin archivo") - self.csv_filename_label = ttk.Label(trace_control_frame, textvariable=self.csv_filename_var) - self.csv_filename_label.pack(side=tk.LEFT, padx=5) - - # Display Frame - trace_display_frame = ttk.LabelFrame(self.trace_tab, text="Último Valor Recibido") - trace_display_frame.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky="ew") - - ttk.Label(trace_display_frame, text="Timestamp:").grid(row=0, column=0, padx=5, pady=5, sticky="w") - self.trace_timestamp_var = tk.StringVar(value="---") - ttk.Label(trace_display_frame, textvariable=self.trace_timestamp_var, font=("Courier", 12)).grid(row=0, column=1, padx=5, pady=5, sticky="w") - - ttk.Label(trace_display_frame, text="Valor mA:").grid(row=0, column=2, padx=5, pady=5, sticky="w") - self.trace_ma_var = tk.StringVar(value="---") - ttk.Label(trace_display_frame, textvariable=self.trace_ma_var, font=("Courier", 12, "bold")).grid(row=0, column=3, padx=5, pady=5, sticky="w") - - ttk.Label(trace_display_frame, text="Valor Brix:").grid(row=0, column=4, padx=5, pady=5, sticky="w") - self.trace_brix_var = tk.StringVar(value="---") - ttk.Label(trace_display_frame, textvariable=self.trace_brix_var, font=("Courier", 12, "bold")).grid(row=0, column=5, padx=5, pady=5, sticky="w") - - # Graph Frame - trace_graph_frame = ttk.LabelFrame(self.trace_tab, text="Gráfico Trace (Brix)") - trace_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") - - # Crear figura para trace - self.trace_fig = Figure(figsize=(8, 4), dpi=100) - self.trace_ax = self.trace_fig.add_subplot(111) - - self.trace_ax.set_xlabel('Tiempo (s)') - self.trace_ax.set_ylabel('Brix', color='b') - self.trace_ax.tick_params(axis='y', labelcolor='b') - self.trace_ax.grid(True, alpha=0.3) - - self.trace_line_brix, = self.trace_ax.plot([], [], 'b-', label='Brix', linewidth=2, marker='o', markersize=4) - - self.trace_canvas = FigureCanvasTkAgg(self.trace_fig, master=trace_graph_frame) - self.trace_canvas.draw() - self.trace_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - - # Log Frame - trace_log_frame = ttk.LabelFrame(self.trace_tab, text="Log de Recepción") - trace_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") - - self.trace_log_text = scrolledtext.ScrolledText(trace_log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED) - self.trace_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True) - - # Configurar pesos - self.trace_tab.columnconfigure(0, weight=1) - self.trace_tab.columnconfigure(1, weight=1) - self.trace_tab.rowconfigure(2, weight=1) - self.trace_tab.rowconfigure(3, weight=1) - - def _log_message(self, message, log_widget=None): - """Escribe mensaje en el log especificado o en el log activo""" - if log_widget is None: - # Determinar qué log usar basado en la pestaña activa - current_tab = self.notebook.index(self.notebook.select()) - log_widget = self.sim_log_text if current_tab == 0 else self.trace_log_text - - log_widget.configure(state=tk.NORMAL) - log_widget.insert(tk.END, f"[{datetime.now().strftime('%H:%M:%S')}] {message}\n") - log_widget.see(tk.END) - log_widget.configure(state=tk.DISABLED) - - def parse_adam_message(self, data, log_widget=None): - """ - Parsea un mensaje del protocolo ADAM y retorna el valor en mA - Formato esperado: #AA[valor_mA][checksum]\r - Donde: - - # : Carácter inicial (opcional en algunas respuestas) - - AA : Dirección del dispositivo (2 caracteres) - - valor_mA : Valor en mA (6 caracteres, formato XX.XXX) - - checksum : Suma de verificación (2 caracteres hex) - - \r : Carácter de fin (opcional) - """ - try: - # Formato esperado: #AA[valor_mA][checksum]\r - # Pero también manejar respuestas sin # inicial o sin \r final - data = data.strip() - - # Si empieza con #, es un mensaje estándar - if data.startswith('#'): - data = data[1:] # Remover # - - # Si termina con \r, removerlo - if data.endswith('\r'): - data = data[:-1] - - # Verificar longitud mínima - if len(data) < 8: # 2 addr + 6 valor mínimo - return None - - address = data[:2] - value_str = data[2:8] # 6 caracteres para el valor (XX.XXX) - - # Verificar si hay checksum - if len(data) >= 10: - checksum = data[8:10] # 2 caracteres para checksum - - # Verificar checksum - message_part = f"#{address}{value_str}" - calculated_checksum = self.calculate_checksum(message_part) - - if checksum != calculated_checksum and log_widget: - self._log_message(f"Checksum incorrecto: recibido={checksum}, calculado={calculated_checksum}", - log_widget) - # Continuar de todos modos si el valor parece válido - - # Convertir valor a float - try: - ma_value = float(value_str) - return {'address': address, 'ma': ma_value} - except ValueError: - return None - - except Exception as e: - if log_widget: - self._log_message(f"Error parseando mensaje: {e}", log_widget) - return None - - def ma_to_brix(self, ma_value): - """Convierte valor mA a Brix usando el mapeo configurado""" - try: - min_brix = float(self.min_brix_map_var.get()) - max_brix = float(self.max_brix_map_var.get()) - - if ma_value <= 4.0: - return min_brix - elif ma_value >= 20.0: - return max_brix - else: - # Interpolación lineal - percentage = (ma_value - 4.0) / 16.0 - return min_brix + percentage * (max_brix - min_brix) - except: - return 0.0 - - def start_trace(self): - """Inicia el modo trace para recibir datos""" - if self.tracing: - messagebox.showwarning("Advertencia", "El trace ya está en curso.") - return - - # Obtener parámetros de conexión - try: - conn_type = self.connection_type_var.get() - if conn_type == "Serial": - conn_params = { - 'port': self.com_port_var.get(), - 'baud': int(self.baud_rate_var.get()) - } - else: - conn_params = { - 'ip': self.ip_address_var.get(), - 'port': int(self.port_var.get()) - } - except ValueError as e: - messagebox.showerror("Error", f"Parámetros de conexión inválidos: {e}") - return - - # Crear archivo CSV - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - csv_filename = f"maselli_trace_{timestamp}.csv" - try: - self.csv_file = open(csv_filename, 'w', newline='') - self.csv_writer = csv.writer(self.csv_file) - self.csv_writer.writerow(['Timestamp', 'Address', 'mA', 'Brix', 'Raw_Message']) - self.csv_filename_var.set(csv_filename) - except Exception as e: - messagebox.showerror("Error", f"No se pudo crear archivo CSV: {e}") - return - - # Abrir conexión - try: - self.connection = self._open_connection(conn_type, conn_params) - self.connection_type = conn_type - self._log_message(f"Conexión {conn_type} abierta para trace.", self.trace_log_text) - except Exception as e: - messagebox.showerror("Error de Conexión", str(e)) - if self.csv_file: - self.csv_file.close() - return - - self.tracing = True - self.trace_start_time = time.time() - self.start_trace_button.config(state=tk.DISABLED) - self.stop_trace_button.config(state=tk.NORMAL) - self._set_trace_entries_state(tk.DISABLED) - - # Iniciar thread de recepción - self.trace_thread = threading.Thread(target=self.run_trace, daemon=True) - self.trace_thread.start() - self._log_message("Trace iniciado.", self.trace_log_text) - - def stop_trace(self): - """Detiene el modo trace""" - if not self.tracing: - return - - self.tracing = False - - # Esperar a que termine el thread - if self.trace_thread and self.trace_thread.is_alive(): - self.trace_thread.join(timeout=2.0) - - # Cerrar conexión - if self.connection: - self._close_connection(self.connection, self.connection_type) - self._log_message("Conexión cerrada.", self.trace_log_text) - self.connection = None - - # Cerrar archivo CSV - if self.csv_file: - self.csv_file.close() - self.csv_file = None - self.csv_writer = None - self._log_message(f"Archivo CSV guardado: {self.csv_filename_var.get()}", self.trace_log_text) - - self.start_trace_button.config(state=tk.NORMAL) - self.stop_trace_button.config(state=tk.DISABLED) - self._set_trace_entries_state(tk.NORMAL) - - self._log_message("Trace detenido.", self.trace_log_text) - - def run_trace(self): - """Thread principal para recepción de datos en modo trace""" - buffer = "" - - while self.tracing: - try: - # Leer datos según el tipo de conexión - data = None - if self.connection_type == "Serial": - if self.connection.in_waiting > 0: - data = self.connection.read(self.connection.in_waiting).decode('ascii', errors='ignore') - elif self.connection_type == "TCP": - self.connection.settimeout(0.1) - try: - data = self.connection.recv(1024).decode('ascii', errors='ignore') - if not data: # Conexión cerrada - self._log_message("Conexión TCP cerrada por el servidor.", self.trace_log_text) - break - except socket.timeout: - continue - elif self.connection_type == "UDP": - self.connection.settimeout(0.1) - try: - data, addr = self.connection.recvfrom(1024) - data = data.decode('ascii', errors='ignore') - except socket.timeout: - continue - - if data: - buffer += data - - # Buscar mensajes completos (terminan con \r o \n) - while '\r' in buffer or '\n' in buffer: - # Encontrar el primer terminador - end_idx = len(buffer) - for term in ['\r', '\n']: - if term in buffer: - idx = buffer.index(term) + 1 - if idx < end_idx: - end_idx = idx - - if end_idx > 0: - message = buffer[:end_idx] - buffer = buffer[end_idx:] - - # Procesar mensaje si tiene contenido - if message.strip(): - self._process_trace_message(message) - else: - break - - # Si el buffer tiene un mensaje completo sin terminador (>= 10 chars) - # y no han llegado más datos en un tiempo, procesarlo - if len(buffer) >= 10 and not ('\r' in buffer or '\n' in buffer): - # Verificar si parece un mensaje ADAM completo - if buffer.startswith('#') or len(buffer) == 10: - self._process_trace_message(buffer) - buffer = "" - - except Exception as e: - if self.tracing: # Solo loguear si todavía estamos en trace - self._log_message(f"Error en trace: {e}", self.trace_log_text) - break - - # Pequeña pausa para no consumir demasiado CPU - if not data: - time.sleep(0.01) - - def _process_trace_message(self, message): - """Procesa un mensaje recibido en modo trace""" - # Log del mensaje raw - display_msg = message.replace('\r', '').replace('\n', '') - self._log_message(f"Recibido: {display_msg}", self.trace_log_text) - - # Parsear mensaje - parsed = self.parse_adam_message(message, self.trace_log_text) - if parsed: - ma_value = parsed['ma'] - brix_value = self.ma_to_brix(ma_value) - timestamp = datetime.now() - - # Actualizar display - self.trace_timestamp_var.set(timestamp.strftime("%H:%M:%S.%f")[:-3]) - self.trace_ma_var.set(f"{ma_value:.3f} mA") - self.trace_brix_var.set(f"{brix_value:.3f} Brix") - - # Log con detalles parseados - self._log_message(f" -> Addr: {parsed['address']}, mA: {ma_value:.3f}, Brix: {brix_value:.3f}", - self.trace_log_text) - - # Guardar en CSV - if self.csv_writer: - self.csv_writer.writerow([ - timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], - parsed['address'], - f"{ma_value:.3f}", - f"{brix_value:.3f}", - display_msg - ]) - if self.csv_file: - self.csv_file.flush() - - # Agregar al gráfico - current_time = time.time() - self.trace_start_time - self.trace_time_data.append(current_time) - self.trace_brix_data.append(brix_value) - - # Actualizar gráfico - self.root.after(0, self.trace_canvas.draw_idle) - else: - # Si no es un mensaje ADAM válido, podría ser otro tipo de respuesta - self._log_message(f"Mensaje no ADAM: {display_msg}", self.trace_log_text) - - def _set_trace_entries_state(self, state): - """Habilita/deshabilita controles durante el trace""" - self.connection_type_combo.config(state=state) - self.com_port_entry.config(state=state) - self.baud_rate_entry.config(state=state) - self.ip_address_entry.config(state=state) - self.port_entry.config(state=state) - self.min_brix_map_entry.config(state=state) - self.max_brix_map_entry.config(state=state) - - def update_sim_graph(self, frame): - """Actualiza el gráfico del simulador""" - if len(self.sim_time_data) > 0: - self.sim_line_brix.set_data(list(self.sim_time_data), list(self.sim_brix_data)) - self.sim_line_ma.set_data(list(self.sim_time_data), list(self.sim_ma_data)) - - if len(self.sim_time_data) > 1: - self.sim_ax1.set_xlim(min(self.sim_time_data), max(self.sim_time_data)) - - if len(self.sim_brix_data) > 0: - brix_min = min(self.sim_brix_data) - 1 - brix_max = max(self.sim_brix_data) + 1 - self.sim_ax1.set_ylim(brix_min, brix_max) - - if len(self.sim_ma_data) > 0: - ma_min = min(self.sim_ma_data) - 0.5 - ma_max = max(self.sim_ma_data) + 0.5 - self.sim_ax2.set_ylim(ma_min, ma_max) - - return self.sim_line_brix, self.sim_line_ma - - def update_trace_graph(self, frame): - """Actualiza el gráfico del trace""" - if len(self.trace_time_data) > 0: - self.trace_line_brix.set_data(list(self.trace_time_data), list(self.trace_brix_data)) - - if len(self.trace_time_data) > 1: - self.trace_ax.set_xlim(min(self.trace_time_data), max(self.trace_time_data)) - - if len(self.trace_brix_data) > 0: - brix_min = min(self.trace_brix_data) - 1 - brix_max = max(self.trace_brix_data) + 1 - self.trace_ax.set_ylim(brix_min, brix_max) - - return self.trace_line_brix, - - def clear_sim_graph(self): - """Limpia el gráfico del simulador""" - self.sim_time_data.clear() - self.sim_brix_data.clear() - self.sim_ma_data.clear() - self.sim_start_time = time.time() - self.sim_canvas.draw_idle() - self._log_message("Gráfico del simulador limpiado.", self.sim_log_text) - - def clear_trace_graph(self): - """Limpia el gráfico del trace""" - self.trace_time_data.clear() - self.trace_brix_data.clear() - self.trace_start_time = time.time() - self.trace_canvas.draw_idle() - self._log_message("Gráfico del trace limpiado.", self.trace_log_text) - - def save_config(self): - """Guarda la configuración actual""" - config = { - 'connection_type': self.connection_type_var.get(), - 'com_port': self.com_port_var.get(), - 'baud_rate': self.baud_rate_var.get(), - 'ip_address': self.ip_address_var.get(), - 'port': self.port_var.get(), - 'adam_address': self.adam_address_var.get(), - 'function_type': self.function_type_var.get(), - 'min_brix_map': self.min_brix_map_var.get(), - 'max_brix_map': self.max_brix_map_var.get(), - 'period': self.period_var.get(), - 'manual_brix': self.manual_brix_var.get() - } - - try: - with open(self.config_file, 'w') as f: - json.dump(config, f, indent=4) - self._log_message("Configuración guardada exitosamente.") - messagebox.showinfo("Éxito", "Configuración guardada correctamente.") - except Exception as e: - self._log_message(f"Error al guardar configuración: {e}") - messagebox.showerror("Error", f"No se pudo guardar la configuración: {e}") - - def load_config(self, silent=False): - """Carga la configuración desde archivo""" - if not os.path.exists(self.config_file): - if not silent: - messagebox.showinfo("Información", "No se encontró archivo de configuración.") - return - - try: - with open(self.config_file, 'r') as f: - config = json.load(f) - - self.connection_type_var.set(config.get('connection_type', 'Serial')) - self.com_port_var.set(config.get('com_port', 'COM3')) - self.baud_rate_var.set(config.get('baud_rate', '115200')) - self.ip_address_var.set(config.get('ip_address', '192.168.1.100')) - self.port_var.set(config.get('port', '502')) - self.adam_address_var.set(config.get('adam_address', '01')) - self.function_type_var.set(config.get('function_type', 'Lineal')) - self.min_brix_map_var.set(config.get('min_brix_map', '0')) - self.max_brix_map_var.set(config.get('max_brix_map', '80')) - self.period_var.set(config.get('period', '1.0')) - self.manual_brix_var.set(config.get('manual_brix', '10.0')) - - try: - self.manual_slider_var.set(float(config.get('manual_brix', '10.0'))) - except: - pass - - self.on_connection_type_change() - - if not silent: - self._log_message("Configuración cargada exitosamente.") - messagebox.showinfo("Éxito", "Configuración cargada correctamente.") - except Exception as e: - if not silent: - self._log_message(f"Error al cargar configuración: {e}") - messagebox.showerror("Error", f"No se pudo cargar la configuración: {e}") - - def on_connection_type_change(self, event=None): - conn_type = self.connection_type_var.get() - if conn_type == "Serial": - self.ethernet_frame.grid_remove() - self.serial_frame.grid() - else: - self.serial_frame.grid_remove() - self.ethernet_frame.grid() - - def on_function_type_change(self, event=None): - func_type = self.function_type_var.get() - if func_type == "Manual": - if self.simulating: - self.stop_simulation() - - self.manual_brix_entry.config(state=tk.NORMAL) - self.manual_send_button.config(state=tk.NORMAL) - self.manual_slider.config(state=tk.NORMAL) - - self.period_entry.config(state=tk.DISABLED) - self.start_button.config(state=tk.DISABLED) - self.stop_button.config(state=tk.DISABLED) - else: - self.manual_brix_entry.config(state=tk.DISABLED) - self.manual_send_button.config(state=tk.DISABLED) - self.manual_slider.config(state=tk.DISABLED) - - self.period_entry.config(state=tk.NORMAL) - if not self.simulating: - self.start_button.config(state=tk.NORMAL) - self.stop_button.config(state=tk.DISABLED) - else: - self.start_button.config(state=tk.DISABLED) - self.stop_button.config(state=tk.NORMAL) - - def on_slider_change(self, value): - self.manual_brix_var.set(f"{float(value):.1f}") - - def update_slider_from_entry(self): - try: - value = float(self.manual_brix_var.get()) - value = max(0, min(100, value)) - self.manual_slider_var.set(value) - self.manual_brix_var.set(f"{value:.1f}") - except ValueError: - pass - - def calculate_checksum(self, message_part): - s = sum(ord(c) for c in message_part) - checksum_byte = s % 256 - return f"{checksum_byte:02X}" - - def scale_to_mA(self, brix_value, min_brix_map, max_brix_map): - if max_brix_map == min_brix_map: - return 4.0 - - percentage = (brix_value - min_brix_map) / (max_brix_map - min_brix_map) - percentage = max(0.0, min(1.0, percentage)) - - mA_value = 4.0 + percentage * 16.0 - return mA_value - - def format_mA_value(self, mA_val): - # Formato: "XX.XXX" (6 caracteres incluyendo el punto) - return f"{mA_val:06.3f}" - - def _get_common_params(self): - try: - adam_address = self.adam_address_var.get() - if len(adam_address) != 2: - messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.") - return None - - min_brix_map = float(self.min_brix_map_var.get()) - max_brix_map = float(self.max_brix_map_var.get()) - if min_brix_map > max_brix_map: - messagebox.showerror("Error", "Valor Mínimo (Brix) no puede ser mayor que Valor Máximo (Brix).") - return None - - conn_type = self.connection_type_var.get() - - if conn_type == "Serial": - com_port = self.com_port_var.get() - baud_rate = int(self.baud_rate_var.get()) - return conn_type, {'port': com_port, 'baud': baud_rate}, adam_address, min_brix_map, max_brix_map - else: - ip_address = self.ip_address_var.get() - port = int(self.port_var.get()) - return conn_type, {'ip': ip_address, 'port': port}, adam_address, min_brix_map, max_brix_map - - except ValueError as e: - messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración: {e}") - return None - - def _open_connection(self, conn_type, conn_params): - try: - if conn_type == "Serial": - return serial.Serial(conn_params['port'], conn_params['baud'], timeout=1) - elif conn_type == "TCP": - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(5.0) - sock.connect((conn_params['ip'], conn_params['port'])) - return sock - elif conn_type == "UDP": - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.settimeout(1.0) - sock.dest_address = (conn_params['ip'], conn_params['port']) - return sock - except Exception as e: - raise Exception(f"Error al abrir conexión {conn_type}: {e}") - - def _close_connection(self, connection, conn_type): - try: - if conn_type == "Serial": - if connection and connection.is_open: - connection.close() - elif conn_type in ["TCP", "UDP"]: - if connection: - connection.close() - except Exception as e: - self._log_message(f"Error al cerrar conexión: {e}") - - def _send_data(self, connection, conn_type, data): - try: - if conn_type == "Serial": - connection.write(data.encode('ascii')) - elif conn_type == "TCP": - connection.send(data.encode('ascii')) - elif conn_type == "UDP": - connection.sendto(data.encode('ascii'), connection.dest_address) - except Exception as e: - raise Exception(f"Error al enviar datos: {e}") - - def add_sim_data_point(self, brix_value, ma_value): - current_time = time.time() - self.sim_start_time - self.sim_time_data.append(current_time) - self.sim_brix_data.append(brix_value) - self.sim_ma_data.append(ma_value) - self.sim_canvas.draw_idle() - - def _read_response(self, connection, conn_type, timeout=0.5): - """Intenta leer una respuesta del dispositivo""" - try: - response = None - if conn_type == "Serial": - # Guardar timeout original - original_timeout = connection.timeout - connection.timeout = timeout - # Esperar un poco para que llegue la respuesta - time.sleep(0.05) - # Leer todos los bytes disponibles - response_bytes = b"" - start_time = time.time() - while (time.time() - start_time) < timeout: - if connection.in_waiting > 0: - response_bytes += connection.read(connection.in_waiting) - # Si encontramos un terminador, salir - if b'\r' in response_bytes or b'\n' in response_bytes: - break - else: - time.sleep(0.01) - - if response_bytes: - response = response_bytes.decode('ascii', errors='ignore') - connection.timeout = original_timeout - elif conn_type == "TCP": - connection.settimeout(timeout) - try: - response = connection.recv(1024).decode('ascii', errors='ignore') - except socket.timeout: - pass - elif conn_type == "UDP": - connection.settimeout(timeout) - try: - response, addr = connection.recvfrom(1024) - response = response.decode('ascii', errors='ignore') - except socket.timeout: - pass - - return response - except Exception as e: - self._log_message(f"Error al leer respuesta: {e}", self.sim_log_text) - return None - - def send_manual_value(self): - common_params = self._get_common_params() - if not common_params: - return - conn_type, conn_params, adam_address, min_brix_map, max_brix_map = common_params - - try: - manual_brix = float(self.manual_brix_var.get()) - except ValueError: - messagebox.showerror("Error de Entrada", "Valor Brix Manual inválido.") - return - - mA_val = self.scale_to_mA(manual_brix, min_brix_map, max_brix_map) - mA_str = self.format_mA_value(mA_val) - - self.current_brix_display_var.set(f"{manual_brix:.3f} Brix") - self.current_ma_display_var.set(f"{mA_str} mA") - - self.add_sim_data_point(manual_brix, mA_val) - - message_part = f"#{adam_address}{mA_str}" - checksum = self.calculate_checksum(message_part) - full_string_to_send = f"{message_part}{checksum}\r" - log_display_string = full_string_to_send.replace('\r', '') - - temp_connection = None - try: - temp_connection = self._open_connection(conn_type, conn_params) - self._log_message(f"Conexión {conn_type} abierta temporalmente para envío manual.", self.sim_log_text) - self._log_message(f"Enviando Manual: {log_display_string}", self.sim_log_text) - self._send_data(temp_connection, conn_type, full_string_to_send) - - # Intentar leer respuesta - response = self._read_response(temp_connection, conn_type) - if response and response.strip(): # Solo procesar si hay contenido - display_resp = response.replace('\r', '').replace('\n', '') - self._log_message(f"Respuesta: {display_resp}", self.sim_log_text) - - # Intentar parsear como mensaje ADAM - parsed = self.parse_adam_message(response, self.sim_log_text) - if parsed: - # Convertir mA a Brix para mostrar - brix_value = self.ma_to_brix(parsed['ma']) - self._log_message(f" -> Dirección: {parsed['address']}, Valor: {parsed['ma']:.3f} mA ({brix_value:.3f} Brix)", self.sim_log_text) - - except Exception as e: - self._log_message(f"Error al enviar manualmente: {e}", self.sim_log_text) - messagebox.showerror("Error de Conexión", str(e)) - finally: - if temp_connection: - self._close_connection(temp_connection, conn_type) - self._log_message(f"Conexión {conn_type} cerrada tras envío manual.", self.sim_log_text) - - def start_simulation(self): - if self.simulating: - messagebox.showwarning("Advertencia", "La simulación ya está en curso.") - return - - common_params = self._get_common_params() - if not common_params: - return - self.connection_type, self.conn_params, self.adam_address, self.min_brix_map, self.max_brix_map = common_params - - try: - self.simulation_period = float(self.period_var.get()) - if self.simulation_period <= 0: - messagebox.showerror("Error", "El periodo debe ser un número positivo.") - return - self.function_type = self.function_type_var.get() - if self.function_type == "Manual": - messagebox.showinfo("Info", "Seleccione modo Lineal o Sinusoidal para simulación continua.") - return - - except ValueError as e: - messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración de simulación: {e}") - return - - try: - self.connection = self._open_connection(self.connection_type, self.conn_params) - self._log_message(f"Conexión {self.connection_type} abierta para simulación continua.", self.sim_log_text) - except Exception as e: - messagebox.showerror("Error de Conexión", str(e)) - self.connection = None - return - - self.simulating = True - self.simulation_step = 0 - self.start_button.config(state=tk.DISABLED) - self.stop_button.config(state=tk.NORMAL) - self._set_sim_config_entries_state(tk.DISABLED) - - self.simulation_thread = threading.Thread(target=self.run_simulation, daemon=True) - self.simulation_thread.start() - self._log_message("Simulación continua iniciada.", self.sim_log_text) - - def _set_sim_config_entries_state(self, state): - self.connection_type_combo.config(state=state) - self.com_port_entry.config(state=state) - self.baud_rate_entry.config(state=state) - self.ip_address_entry.config(state=state) - self.port_entry.config(state=state) - self.adam_address_entry.config(state=state) - self.function_type_combo.config(state=state) - self.min_brix_map_entry.config(state=state) - self.max_brix_map_entry.config(state=state) - self.save_config_button.config(state=state) - self.load_config_button.config(state=state) - - current_func_type = self.function_type_var.get() - if current_func_type != "Manual": - self.period_entry.config(state=state) - self.manual_brix_entry.config(state=tk.DISABLED) - self.manual_send_button.config(state=tk.DISABLED) - self.manual_slider.config(state=tk.DISABLED) - else: - self.period_entry.config(state=tk.DISABLED) - if state == tk.DISABLED: - self.manual_brix_entry.config(state=tk.DISABLED) - self.manual_send_button.config(state=tk.DISABLED) - self.manual_slider.config(state=tk.DISABLED) - else: - self.manual_brix_entry.config(state=tk.NORMAL) - self.manual_send_button.config(state=tk.NORMAL) - self.manual_slider.config(state=tk.NORMAL) - - def stop_simulation(self): - if not self.simulating: - if self.function_type_var.get() != "Manual": - self._log_message("Simulación continua ya estaba detenida.", self.sim_log_text) - if self.function_type_var.get() != "Manual": - self.start_button.config(state=tk.NORMAL) - self.stop_button.config(state=tk.DISABLED) - self._set_sim_config_entries_state(tk.NORMAL) - return - - self.simulating = False - if self.simulation_thread and self.simulation_thread.is_alive(): - try: - self.simulation_thread.join(timeout=max(0.1, self.simulation_period * 1.5 if hasattr(self, 'simulation_period') else 2.0)) - except Exception as e: - self._log_message(f"Error al esperar el hilo de simulación: {e}", self.sim_log_text) - - if self.connection: - self._close_connection(self.connection, self.connection_type) - self._log_message(f"Conexión {self.connection_type} cerrada (simulación continua).", self.sim_log_text) - self.connection = None - - self.start_button.config(state=tk.NORMAL) - self.stop_button.config(state=tk.DISABLED) - self._set_sim_config_entries_state(tk.NORMAL) - self.on_function_type_change() - - self._log_message("Simulación continua detenida.", self.sim_log_text) - self.current_brix_display_var.set("---") - self.current_ma_display_var.set("--.-- mA") - - def run_simulation(self): - steps_for_full_cycle = 100 - - while self.simulating: - current_brix_val = 0.0 - - if self.function_type == "Lineal": - progress = (self.simulation_step % (2 * steps_for_full_cycle)) / steps_for_full_cycle - if progress > 1.0: - progress = 2.0 - progress - current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * progress - - elif self.function_type == "Sinusoidal": - phase = (self.simulation_step / steps_for_full_cycle) * 2 * math.pi - sin_val = (math.sin(phase) + 1) / 2 - current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * sin_val - - mA_val = self.scale_to_mA(current_brix_val, self.min_brix_map, self.max_brix_map) - mA_str = self.format_mA_value(mA_val) - - self.current_brix_display_var.set(f"{current_brix_val:.3f} Brix") - self.current_ma_display_var.set(f"{mA_str} mA") - - self.root.after(0, lambda b=current_brix_val, m=mA_val: self.add_sim_data_point(b, m)) - - message_part = f"#{self.adam_address}{mA_str}" - checksum = self.calculate_checksum(message_part) - full_string_to_send = f"{message_part}{checksum}\r" - log_display_string = full_string_to_send.replace('\r', '') - self._log_message(f"Enviando Sim: {log_display_string}", self.sim_log_text) - - if self.connection: - try: - self._send_data(self.connection, self.connection_type, full_string_to_send) - - # Intentar leer respuesta (timeout corto para no ralentizar simulación) - response = self._read_response(self.connection, self.connection_type, timeout=0.1) - if response and response.strip(): # Solo procesar si hay contenido - display_resp = response.replace('\r', '').replace('\n', '') - self._log_message(f"Respuesta: {display_resp}", self.sim_log_text) - - # Intentar parsear como mensaje ADAM - parsed = self.parse_adam_message(response, self.sim_log_text) - if parsed: - # Convertir mA a Brix para mostrar - brix_value = self.ma_to_brix(parsed['ma']) - self._log_message(f" -> Dirección: {parsed['address']}, Valor: {parsed['ma']:.3f} mA ({brix_value:.3f} Brix)", self.sim_log_text) - - except Exception as e: - self._log_message(f"Error al escribir en conexión (sim): {e}", self.sim_log_text) - self.root.after(0, self.stop_simulation_from_thread_error) - break - - self.simulation_step += 1 - time.sleep(self.simulation_period) - - if not self.simulating and self.root.winfo_exists(): - self.root.after(0, self.ensure_gui_stopped_state_sim) - - def stop_simulation_from_thread_error(self): - if self.simulating: - messagebox.showerror("Error de Simulación", "Error de conexión durante la simulación. Simulación detenida.") - self.stop_simulation() - - def ensure_gui_stopped_state_sim(self): - if not self.simulating: - self.start_button.config(state=tk.NORMAL) - self.stop_button.config(state=tk.DISABLED) - self._set_sim_config_entries_state(tk.NORMAL) - if self.connection: - self._close_connection(self.connection, self.connection_type) - self._log_message(f"Conexión {self.connection_type} cerrada (auto, sim_end).", self.sim_log_text) - self.connection = None - self._log_message("Simulación continua terminada (hilo finalizado).", self.sim_log_text) - - def on_closing(self): - """Maneja el cierre de la aplicación""" - # Detener simulación si está activa - if self.simulating: - self.stop_simulation() - - # Detener trace si está activo - if self.tracing: - self.stop_trace() - - # Cerrar cualquier conexión abierta - if self.connection: - self._close_connection(self.connection, self.connection_type) - - # Destruir ventana - self.root.destroy() - -if __name__ == "__main__": - main_root = tk.Tk() - app = MaselliSimulatorApp(main_root) - main_root.mainloop() \ No newline at end of file diff --git a/config_manager.py b/config_manager.py index 3e3aa87..10f43b2 100644 --- a/config_manager.py +++ b/config_manager.py @@ -21,6 +21,7 @@ class ConfigManager: 'cycle_time': '0.5', 'manual_input_type': 'Brix', # Nuevo: 'Brix', 'mA', 'Voltaje' 'manual_value': '10.0', # Nuevo: valor correspondiente al manual_input_type + 'random_error_interval': '10.0', # Intervalo para errores aleatorios en el simulador # Configuración para NetCom 'netcom_com_port': 'COM3', 'netcom_baud_rate': '115200', @@ -135,6 +136,14 @@ class ConfigManager: except ValueError: errors.append("El valor manual debe ser un número válido") + # Validar intervalo de errores aleatorios del simulador + try: + random_error_interval = float(config.get('random_error_interval', '10.0')) + if random_error_interval <= 0: + errors.append("El intervalo para errores aleatorios debe ser un número positivo.") + except ValueError: + errors.append("El intervalo para errores aleatorios debe ser un número válido.") + # Validar puerto serie if config.get('connection_type') == 'Serial': com_port = config.get('com_port', '') diff --git a/maselli_app.py b/maselli_app.py index 8626434..270b046 100644 --- a/maselli_app.py +++ b/maselli_app.py @@ -45,10 +45,10 @@ class MaselliApp: # Inicializar animaciones de gráficos self.sim_ani = animation.FuncAnimation( - self.sim_fig, self.update_sim_graph, interval=100, blit=False + self.sim_fig, self.update_sim_graph, interval=100, blit=False, cache_frame_data=False ) self.trace_ani = animation.FuncAnimation( - self.trace_fig, self.update_trace_graph, interval=100, blit=False + self.trace_fig, self.update_trace_graph, interval=100, blit=False, cache_frame_data=False ) def create_widgets(self): @@ -419,4 +419,3 @@ class MaselliApp: # Cerrar ventana self.root.destroy() - diff --git a/maselli_simulator_config.json b/maselli_simulator_config.json index 261ca2f..b9872b3 100644 --- a/maselli_simulator_config.json +++ b/maselli_simulator_config.json @@ -9,9 +9,10 @@ "adam_address": "01", "function_type": "Sinusoidal", "cycle_time": "15", - "samples_per_cycle": "100", + "samples_per_cycle": "70", "manual_input_type": "Brix", "manual_value": "0.00", + "random_error_interval": "2.0", "netcom_com_port": "COM11", "netcom_baud_rate": "9600", "netcom_rtscts": false, diff --git a/maselli_trace_20250522_214450.csv b/maselli_trace_20250522_214450.csv deleted file mode 100644 index 4e6d2f3..0000000 --- a/maselli_trace_20250522_214450.csv +++ /dev/null @@ -1 +0,0 @@ -Timestamp,mA,Brix,Raw_Message diff --git a/protocol_handler.py b/protocol_handler.py index 79f250f..8b539d6 100644 --- a/protocol_handler.py +++ b/protocol_handler.py @@ -66,6 +66,32 @@ class ProtocolHandler: checksum = ProtocolHandler.calculate_checksum(message_part) full_message_str = f"{message_part}{checksum}\r" return full_message_str.encode('ascii'), ma_value + + @staticmethod + def create_adam_message_with_bad_checksum(adam_address, ma_value): + """ + Crea un mensaje completo ADAM (como bytes) directamente desde un valor mA, + pero con un checksum deliberadamente incorrecto. + """ + ma_str = ProtocolHandler.format_ma_value(ma_value) # Formato XX.XXX + message_part = f"#{adam_address}{ma_str}" + correct_checksum = ProtocolHandler.calculate_checksum(message_part) + + # Generar un checksum incorrecto. + bad_checksum_str = "XX" # Valor por defecto si algo falla + try: + correct_sum_val = int(correct_checksum, 16) + # Sumar 1 (o cualquier otro valor) y tomar módulo 256 para que siga siendo un byte. + # Asegurarse de que sea diferente al original. + bad_sum_val = (correct_sum_val + 1) % 256 + if f"{bad_sum_val:02X}" == correct_checksum: # En caso de que correct_checksum fuera FF + bad_sum_val = (correct_sum_val + 2) % 256 + bad_checksum_str = f"{bad_sum_val:02X}" + except ValueError: # Si correct_checksum no es un hexadecimal válido (no debería pasar) + pass # bad_checksum_str se queda como "XX" + + full_message_str = f"{message_part}{bad_checksum_str}\r" + return full_message_str.encode('ascii'), ma_value @staticmethod def ma_to_voltage(ma_value): diff --git a/tabs/simulator_tab.py b/tabs/simulator_tab.py index d4d9474..fcd3871 100644 --- a/tabs/simulator_tab.py +++ b/tabs/simulator_tab.py @@ -5,6 +5,7 @@ Tab del Simulador - Genera valores de prueba en protocolo ADAM import tkinter as tk from tkinter import ttk, scrolledtext, messagebox import threading +import random import time import math from collections import deque @@ -41,6 +42,7 @@ class SimulatorTab: self.cycle_time_var = tk.StringVar(value=initial_config.get('cycle_time', '10.0')) self.samples_per_cycle_var = tk.StringVar(value=initial_config.get('samples_per_cycle', '100')) + # Configuración para modo manual y errores self.manual_input_type_var = tk.StringVar(value=initial_config.get('manual_input_type', 'Brix')) self.manual_value_var = tk.StringVar(value=initial_config.get('manual_value', '10.0')) @@ -53,6 +55,14 @@ class SimulatorTab: self.current_brix_var = tk.StringVar(value="---") self.current_ma_var = tk.StringVar(value="--.-- mA") self.current_voltage_var = tk.StringVar(value="-.-- V") # Nueva para voltaje + + # Para simulación de errores + self.random_error_timer = None + self.random_error_timer_stop_event = threading.Event() + self.replace_normal_with_error_var = tk.BooleanVar(value=False) + self.next_frame_is_error_event = threading.Event() + self.random_error_interval_var = tk.StringVar(value=initial_config.get('random_error_interval', '10.0')) + self.error_details_for_replacement = None # (message_bytes, log_suffix, error_type_str) self.create_widgets() @@ -115,10 +125,6 @@ class SimulatorTab: state=tk.DISABLED, length=200) self.manual_slider.grid(row=1, column=2, padx=5, pady=5, sticky="ew") - self.manual_send_button = ttk.Button(manual_frame, text="Enviar Manual", - command=self.send_manual_value, state=tk.DISABLED) - self.manual_send_button.grid(row=1, column=3, padx=5, pady=5, sticky="ew") - manual_frame.columnconfigure(2, weight=1) # Controls Frame @@ -160,11 +166,14 @@ class SimulatorTab: self.log_text = scrolledtext.ScrolledText(log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED) self.log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True) + # --- Frame para Simulación de Errores --- + self._setup_error_simulation_ui() # Se añade al final de create_widgets + # Configurar pesos self.frame.columnconfigure(0, weight=1) self.frame.columnconfigure(1, weight=1) self.frame.rowconfigure(2, weight=1) # Log frame - + # Inicializar estado self.on_function_type_change() @@ -172,38 +181,166 @@ class SimulatorTab: """Crea y retorna el frame para el gráfico""" graph_frame = ttk.LabelFrame(self.frame, text="Gráfico Simulador") graph_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") - self.frame.rowconfigure(3, weight=1) # Graph frame + # El rowconfigure para el gráfico se hace aquí, y el de errores abajo + self.frame.rowconfigure(3, weight=1) # Graph frame (se mueve una fila abajo) return graph_frame - + + def _setup_error_simulation_ui(self): + """Crea los controles para la simulación de errores.""" + error_frame = ttk.LabelFrame(self.frame, text="Simulación de Errores (Modo TCP Server)") + error_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=10, sticky="ew") + self.frame.rowconfigure(4, weight=0) # Error frame no se expande tanto como el log o gráfico + + ttk.Label(error_frame, text="Tipo de Error:").grid(row=0, column=0, padx=5, pady=5, sticky="w") + self.error_type_var = tk.StringVar(value="Ninguno") + self.error_type_combo = ttk.Combobox( + error_frame, + textvariable=self.error_type_var, + state="disabled", # Se habilita/deshabilita dinámicamente + values=[ + "Ninguno", # Para enviar una trama normal desde este control + "ID Erróneo", + "Valor Fuera de Escala (mA)", + "Checksum Erróneo", + "Longitud Errónea (Aleatoria)", + "Trama Faltante (Omitir Envío)" + ] + ) + self.error_type_combo.current(0) + self.error_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew") + + self.send_error_button = ttk.Button(error_frame, text="Enviar Trama Errónea", + command=self.send_selected_error_manually, state=tk.DISABLED) + self.send_error_button.grid(row=0, column=2, padx=5, pady=5) + + self.random_error_var = tk.BooleanVar(value=False) + self.random_error_check = ttk.Checkbutton( + error_frame, + text="Errores Aleatorios (cada ~10s)", + variable=self.random_error_var, + command=self.toggle_random_errors, + state="disabled" # Se habilita/deshabilita dinámicamente + ) + self.random_error_check.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky="w") + + # Checkbox para reemplazar trama normal con error (ahora en su propia fila para claridad) + self.replace_with_error_check = ttk.Checkbutton( + error_frame, + text="Reemplazar trama normal con error", + variable=self.replace_normal_with_error_var, + state="disabled" # Se habilita/deshabilita dinámicamente + ) + self.replace_with_error_check.grid(row=1, column=2, padx=(10,5), pady=5, sticky="w") + + ttk.Label(error_frame, text="Intervalo Errores Aleatorios (s):").grid(row=2, column=0, padx=5, pady=5, sticky="w") + self.random_error_interval_entry = ttk.Entry( + error_frame, + textvariable=self.random_error_interval_var, + width=8, # El Entry solo necesita el parent, textvariable, width y state. + state="disabled" # Se habilita/deshabilita dinámicamente + ) + self.random_error_interval_entry.grid(row=2, column=1, padx=5, pady=5, sticky="ew") # Añadir el grid para el Entry + # El grid para self.replace_with_error_check ya está definido donde se crea ese widget. + error_frame.columnconfigure(1, weight=1) + + self.update_error_controls_state() # Establecer estado inicial + + def update_error_controls_state(self): + """Habilita o deshabilita los controles de error según el modo de conexión.""" + # Asegurarse de que los widgets de error existan antes de intentar configurarlos + if not hasattr(self, 'error_type_combo'): + return + + is_tcp_server_mode = self.shared_config['connection_type_var'].get() == "TCP-Server" + # Considerar si la simulación (conexión) está activa para habilitar el envío + # is_connection_active = self.simulating # O una propiedad más directa de ConnectionManager + + # Los controles de error solo tienen sentido si estamos en modo TCP-Server + # y la conexión está activa (es decir, la simulación principal está corriendo o + # el servidor está escuchando de alguna forma). + # Por ahora, lo basaremos en is_tcp_server_mode y self.simulating + + enable_controls = is_tcp_server_mode and self.simulating + + new_state_tk = tk.NORMAL if enable_controls else tk.DISABLED + new_state_str = "normal" if enable_controls else "disabled" # Para Checkbutton + + self.error_type_combo.config(state=new_state_tk if is_tcp_server_mode else tk.DISABLED) # Combo siempre según modo + self.send_error_button.config(state=new_state_tk) + self.random_error_check.config(state=new_state_str) + + # El entry del intervalo de errores aleatorios depende de que el check de errores aleatorios esté activo + interval_entry_state_tk = tk.NORMAL if enable_controls and self.random_error_var.get() else tk.DISABLED + self.random_error_interval_entry.config(state=interval_entry_state_tk) + + # El check de "Reemplazar trama normal" se habilita si los controles de error están habilitados + self.replace_with_error_check.config(state=new_state_str) + + if not enable_controls and self.random_error_var.get(): + self.random_error_var.set(False) + self.toggle_random_errors() # Detiene el timer si estaba activo y se deshabilitan controles + + def get_current_error_sim_parameters(self): + """Obtiene parámetros para la simulación de errores (dirección ADAM, valor mA base).""" + adam_address = self.adam_address_var.get() + base_ma_value = 12.345 # Valor por defecto + + if self.function_type_var.get() == "Manual": + try: + manual_val = float(self.manual_value_var.get()) + input_type = self.manual_input_type_var.get() + if input_type == "Brix": + min_b = float(self.shared_config['min_brix_map_var'].get()) + max_b = float(self.shared_config['max_brix_map_var'].get()) + base_ma_value = ProtocolHandler.scale_to_ma(manual_val, min_b, max_b) + elif input_type == "mA": + base_ma_value = manual_val + elif input_type == "Voltaje": + base_ma_value = ProtocolHandler.voltage_to_ma(manual_val) + except (ValueError, KeyError, TypeError): + Utils.log_message(self.log_text, "Error Sim: Usando valor mA base por defecto para error.") + else: # Si no es manual, o para tener un valor si la simulación principal no corre + # Podríamos tomar el self.current_ma_var si la simulación está corriendo + # pero para simplicidad, un valor fijo si no es manual. + pass # Mantiene 12.345 + + return adam_address, base_ma_value + def on_function_type_change(self, event=None): """Maneja el cambio de tipo de función""" func_type = self.function_type_var.get() - if func_type == "Manual": - if self.simulating: - self.stop_simulation() - - self.manual_input_type_combo.config(state=tk.NORMAL) - self.manual_value_entry.config(state=tk.NORMAL) - self.manual_send_button.config(state=tk.NORMAL) - self.manual_slider.config(state=tk.NORMAL) - - self.cycle_time_entry.config(state=tk.DISABLED) - self.samples_per_cycle_entry.config(state=tk.DISABLED) - self.start_button.config(state=tk.DISABLED) + is_manual_mode = (func_type == "Manual") + + # Si la simulación está corriendo y el tipo de función cambia, detenerla. + if self.simulating: + self.stop_simulation() + + # Configurar controles de entrada manual + manual_specific_state = tk.NORMAL if is_manual_mode else tk.DISABLED + self.manual_input_type_combo.config(state=manual_specific_state) + self.manual_value_entry.config(state=manual_specific_state) + self.manual_slider.config(state=manual_specific_state) + + # Tiempo de ciclo y muestras por ciclo ahora están habilitados para todos los modos continuos + self.cycle_time_entry.config(state=tk.NORMAL) + self.samples_per_cycle_entry.config(state=tk.NORMAL) + + if is_manual_mode: + self.on_manual_input_type_change() # Actualizar rangos de slider/entry y valor actual + + # El estado de los botones Start/Stop depende de si la simulación está (o estaba) corriendo. + # Como stop_simulation() se llama arriba si estaba corriendo, self.simulating debería ser False aquí. + if not self.simulating: + self.start_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.DISABLED) - self.on_manual_input_type_change() # Configurar según el tipo actual else: - self.manual_input_type_combo.config(state=tk.DISABLED) - self.manual_value_entry.config(state=tk.DISABLED) - self.manual_send_button.config(state=tk.DISABLED) - self.manual_slider.config(state=tk.DISABLED) - - self.cycle_time_entry.config(state=tk.NORMAL) - self.samples_per_cycle_entry.config(state=tk.NORMAL) - if not self.simulating: - self.start_button.config(state=tk.NORMAL) - self.stop_button.config(state=tk.DISABLED) - + # Este estado idealmente no se alcanzaría si stop_simulation() + # establece correctamente self.simulating a False y actualiza los botones. + # Sin embargo, como salvaguarda: + self.start_button.config(state=tk.DISABLED) + self.stop_button.config(state=tk.NORMAL) + self.update_error_controls_state() # Actualizar estado de controles de error + def on_manual_input_type_change(self, event=None): """Maneja el cambio de tipo de entrada manual (Brix, mA, Voltaje)""" input_type = self.manual_input_type_var.get() @@ -280,127 +417,6 @@ class SimulatorTab: precision_fallback = 2 if self.manual_input_type_var.get() == "mA": precision_fallback = 3 self.manual_value_var.set(f"{current_slider_val:.{precision_fallback}f}") - - def send_manual_value(self): - """Envía un valor manual único""" - try: - # Obtener valores de mapeo - min_brix_map = float(self.shared_config['min_brix_map_var'].get()) - max_brix_map = float(self.shared_config['max_brix_map_var'].get()) - adam_address = self.adam_address_var.get() - - if len(adam_address) != 2: - messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.") - return - - if min_brix_map >= max_brix_map: - messagebox.showerror("Error de Configuración", "Min Brix debe ser menor que Max Brix.") - return - - input_type = self.manual_input_type_var.get() - manual_numeric_value = float(self.manual_value_var.get()) - - final_brix, final_ma, final_voltage = 0.0, 0.0, 0.0 - - if input_type == "Brix": - final_brix = manual_numeric_value - final_ma = ProtocolHandler.scale_to_ma(final_brix, min_brix_map, max_brix_map) - final_voltage = ProtocolHandler.ma_to_voltage(final_ma) - elif input_type == "mA": - final_ma = manual_numeric_value - final_brix = ProtocolHandler.ma_to_brix(final_ma, min_brix_map, max_brix_map) - final_voltage = ProtocolHandler.ma_to_voltage(final_ma) - elif input_type == "Voltaje": - final_voltage = manual_numeric_value - final_ma = ProtocolHandler.voltage_to_ma(final_voltage) - final_brix = ProtocolHandler.ma_to_brix(final_ma, min_brix_map, max_brix_map) - - # Usar el nuevo método que toma ma_value directamente para el modo manual - message, returned_ma = ProtocolHandler.create_adam_message_from_ma(adam_address, final_ma) - - # Actualizar display - brix_display_text = "" - # Si la entrada fue mA o Voltaje y el resultado es un mA < 4 (estado de error) - if (input_type == "mA" or input_type == "Voltaje") and final_ma < 4.0: - brix_display_text = "Error (Sub 4mA)" - else: - # Para entrada Brix, o mA/Voltaje que resultan en mA >= 4.0 - brix_display_text = Utils.format_brix_display(final_brix) - - self.current_brix_var.set(brix_display_text) - self.current_ma_var.set(Utils.format_ma_display(returned_ma)) # Usar returned_ma que es igual a final_ma - self.current_voltage_var.set(ProtocolHandler.format_voltage_display(final_voltage)) - - # Agregar al gráfico - # final_brix aquí es el valor numérico calculado (ej. min_brix_map si mA < 4), - # lo cual es consistente para el gráfico. - self.add_data_point(final_brix, returned_ma) - - # Enviar por conexión temporal - current_config_values = { - 'connection_type': self.shared_config['connection_type_var'].get(), - 'com_port': self.shared_config['com_port_var'].get(), - 'baud_rate': self.shared_config['baud_rate_var'].get(), - 'ip_address': self.shared_config['ip_address_var'].get(), - 'port': self.shared_config['port_var'].get(), - } - conn_type = current_config_values['connection_type'] - conn_params = self.shared_config['config_manager'].get_connection_params(current_config_values) - - if conn_type == "TCP-Server": - if not self.connection_manager.is_client_connected(): # Verificar el connection_manager de la pestaña - Utils.log_message(self.log_text, "Envío Manual (TCP Server): Ningún cliente conectado.") - messagebox.showinfo("TCP Server", "Ningún cliente conectado para enviar datos manualmente.") - return - - # No necesitamos 'listening_details' aquí porque la conexión ya está establecida - # y el log de inicio ya se hizo. Solo usamos la conexión existente. - # La llamada a open_connection no ocurre aquí para TCP-Server en modo manual. - - try: - # message ya es bytes - self.connection_manager.send_data(message) - Utils.log_message(self.log_text, f"Enviando Manual (TCP Server): {ProtocolHandler.format_for_display(message)}") - # No se espera respuesta en modo servidor para el simulador - except self.connection_manager.ClientDisconnectedError: - Utils.log_message(self.log_text, "Envío Manual (TCP Server): Cliente desconectado durante el envío.") - messagebox.showerror("TCP Server Error", "El cliente se desconectó durante el envío manual.") - except Exception as e_manual_server: - Utils.log_message(self.log_text, f"Error al enviar manualmente (TCP Server): {e_manual_server}") - messagebox.showerror("Error", str(e_manual_server)) - return # Terminar aquí para envío manual en TCP-Server - - # Lógica existente para otros tipos de conexión (Serial, TCP Client, UDP) - else: - temp_conn = ConnectionManager() - try: - # open_connection ahora devuelve (connection_object, listening_info) - _, _ = temp_conn.open_connection(conn_type, conn_params) # Ignoramos listening_info para conexión temporal - Utils.log_message(self.log_text, f"Conexión {conn_type} abierta temporalmente.") - Utils.log_message(self.log_text, f"Enviando Manual: {ProtocolHandler.format_for_display(message)}") - - temp_conn.send_data(message) # message ya es bytes - - response = temp_conn.read_response(timeout=0.5) - if response and response.strip(): - Utils.log_message(self.log_text, f"Respuesta: {ProtocolHandler.format_for_display(response)}") - - parsed = ProtocolHandler.parse_adam_message(response) - if parsed: - brix_resp = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix_map, max_brix_map) - Utils.log_message(self.log_text, - f" -> Addr: {parsed['address']}, " - f"mA: {parsed['ma']:.3f}, " - f"Brix: {brix_resp:.3f}") - except Exception as e: - Utils.log_message(self.log_text, f"Error al enviar: {e}") - messagebox.showerror("Error", str(e)) - finally: - temp_conn.close_connection() - Utils.log_message(self.log_text, "Conexión cerrada.") - - except (ValueError, KeyError, TypeError) as e: - messagebox.showerror("Error", f"Valores inválidos en la configuración o entrada: {e}") def start_simulation(self): """Inicia la simulación continua""" @@ -461,6 +477,7 @@ class SimulatorTab: self.start_button.config(state=tk.DISABLED) self.stop_button.config(state=tk.NORMAL) self._set_entries_state(tk.DISABLED) + self.update_error_controls_state() # Habilitar controles de error si es TCP Server if conn_type == "TCP-Server": self.shared_config['client_connected_var'].set("Esperando...") @@ -475,14 +492,20 @@ class SimulatorTab: self.simulating = False + # Detener el timer de errores aleatorios primero + if self.random_error_timer and self.random_error_timer.is_alive(): + self.random_error_timer_stop_event.set() + self.random_error_timer.join(timeout=1.0) # Esperar un poco + self.random_error_timer = None + self.next_frame_is_error_event.clear() + self.error_details_for_replacement = None + if self.simulation_thread and self.simulation_thread.is_alive(): self.simulation_thread.join(timeout=2.0) self.connection_manager.close_connection() Utils.log_message(self.log_text, "Conexión cerrada.") - self.start_button.config(state=tk.NORMAL) - self.stop_button.config(state=tk.DISABLED) self._set_entries_state(tk.NORMAL) self.on_function_type_change() # Re-evaluar estado de controles manuales if self.connection_manager.connection_type == "TCP-Server": # Limpiar info del cliente @@ -492,6 +515,10 @@ class SimulatorTab: self.current_brix_var.set("---") self.current_ma_var.set("--.-- mA") self.current_voltage_var.set("-.-- V") + + self.start_button.config(state=tk.NORMAL) # Mover después de _set_entries_state y on_function_type_change + self.stop_button.config(state=tk.DISABLED) + self.update_error_controls_state() # Deshabilitar controles de error def run_simulation(self): """Thread principal de simulación""" @@ -516,70 +543,146 @@ class SimulatorTab: sample_period = cycle_time / samples_per_cycle while self.simulating: - current_brix = 0.0 - progress = (self.simulation_step % samples_per_cycle) / samples_per_cycle + message_to_send = None + ma_value_for_message_generation = 0.0 # mA que se usaría para generar la trama (normal o base para error) - if function_type == "Lineal": + # --- Determinar valores base de la simulación para este ciclo (Brix, mA) --- + # Esta lógica calcula los valores que se mostrarían y graficarían, + # y que se usarían para generar una trama normal. + target_brix = 0.0 # Brix consistente con target_ma para display/graph + # target_ma es el valor de mA que se usaría para generar el mensaje ADAM si fuera normal + # o el valor base si un error lo reemplaza. + + current_manual_input_type = self.manual_input_type_var.get() # Cache para este ciclo + + if function_type == "Manual": # Lógica para modo Manual + manual_input_type = self.manual_input_type_var.get() + manual_numeric_value = 0.0 + try: + manual_numeric_value = float(self.manual_value_var.get()) + except ValueError: + Utils.log_message(self.log_text, f"Valor manual inválido: '{self.manual_value_var.get()}'. Usando valor por defecto.") + if manual_input_type == "Brix": manual_numeric_value = min_brix_map + elif manual_input_type == "mA": manual_numeric_value = 4.0 + elif manual_input_type == "Voltaje": manual_numeric_value = ProtocolHandler.ma_to_voltage(4.0) + + if manual_input_type == "Brix": + target_brix = manual_numeric_value + ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map) + elif manual_input_type == "mA": + ma_value_for_message_generation = manual_numeric_value + target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map) + elif manual_input_type == "Voltaje": + voltage_input = manual_numeric_value + ma_value_for_message_generation = ProtocolHandler.voltage_to_ma(voltage_input) + target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map) + + elif function_type == "Lineal": cycle_progress = (self.simulation_step % (2 * samples_per_cycle)) / samples_per_cycle if cycle_progress > 1.0: cycle_progress = 2.0 - cycle_progress - current_brix = min_brix_map + (max_brix_map - min_brix_map) * cycle_progress - + target_brix = min_brix_map + (max_brix_map - min_brix_map) * cycle_progress + ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map) + elif function_type == "Sinusoidal": + progress = (self.simulation_step % samples_per_cycle) / samples_per_cycle phase = progress * 2 * math.pi sin_val = (math.sin(phase) + 1) / 2 - current_brix = min_brix_map + (max_brix_map - min_brix_map) * sin_val - - message, ma_value = ProtocolHandler.create_adam_message(adam_address, current_brix, min_brix_map, max_brix_map) - voltage_value = ProtocolHandler.ma_to_voltage(ma_value) - - self.current_brix_var.set(Utils.format_brix_display(current_brix)) - self.current_ma_var.set(Utils.format_ma_display(ma_value)) - self.current_voltage_var.set(ProtocolHandler.format_voltage_display(voltage_value)) - - self.frame.after(0, lambda b=current_brix, m=ma_value: self.add_data_point(b, m)) - - try: - if conn_type == "TCP-Server": - if not self.connection_manager.is_client_connected(): - # Loguear solo si el estado cambia o periódicamente para evitar spam - if not hasattr(self, '_waiting_for_client_logged') or not self._waiting_for_client_logged: - port_to_log = self.shared_config['config_manager'].get_connection_params(current_config_values)['port'] - Utils.log_message(self.log_text, f"TCP Server: Esperando cliente en puerto {port_to_log}...") - self._waiting_for_client_logged = True - - if self.connection_manager.accept_client(timeout=0.05): # Intento corto no bloqueante - Utils.log_message(self.log_text, f"TCP Server: Cliente conectado desde {self.connection_manager.client_address}") - client_info = f"{self.connection_manager.client_address[0]}:{self.connection_manager.client_address[1]}" - self.shared_config['client_connected_var'].set(client_info) - self._waiting_for_client_logged = False # Resetear flag de log - elif not self.connection_manager.is_client_connected() and \ - self.shared_config['client_connected_var'].get() != "Esperando...": + target_brix = min_brix_map + (max_brix_map - min_brix_map) * sin_val + ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map) + + # ma_value_in_message es el valor de mA que realmente se usaría en la trama o que se mostraría + # Si la trama es reemplazada por un error, este valor sigue siendo el de la simulación normal + # para la UI, pero la trama enviada será diferente. + ma_value_for_ui_display = ma_value_for_message_generation + voltage_value_display = ProtocolHandler.ma_to_voltage(ma_value_for_ui_display) + + # --- Preparar la trama a enviar (normal o error de reemplazo) --- + log_prefix_for_send = "Enviando" + log_suffix_for_send = "" + actual_error_type_sent = "Normal" # Para el log + + if self.next_frame_is_error_event.is_set() and \ + self.error_details_for_replacement is not None and \ + self.replace_normal_with_error_var.get(): + + error_msg_bytes, error_log_suffix, error_type_str = self.error_details_for_replacement + message_to_send = error_msg_bytes + log_prefix_for_send = "Error Sim (Reemplazo Programado)" + log_suffix_for_send = error_log_suffix + actual_error_type_sent = error_type_str + + self.next_frame_is_error_event.clear() + self.error_details_for_replacement = None + else: + # Generar trama normal + message_to_send, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, ma_value_for_message_generation) + + # Preparar texto para display + brix_display_text = "" + if ma_value_for_ui_display < 4.0 and function_type == "Manual" and \ + (current_manual_input_type == "mA" or current_manual_input_type == "Voltaje"): + brix_display_text = "Error (Sub 4mA)" + else: + brix_display_text = Utils.format_brix_display(target_brix) + + # Actualizar GUI (StringVars son thread-safe para .set()) + self.current_brix_var.set(brix_display_text) + self.current_ma_var.set(Utils.format_ma_display(ma_value_for_ui_display)) + self.current_voltage_var.set(ProtocolHandler.format_voltage_display(voltage_value_display)) + + # Agregar punto de datos al gráfico (desde el thread GUI) + self.frame.after(0, lambda b=target_brix, m=ma_value_for_ui_display: self.add_data_point(b, m)) + + # --- Enviar la trama (normal o de error) --- + if message_to_send: # Si hay algo que enviar (no es "Trama Faltante" de reemplazo) + try: + if conn_type == "TCP-Server": + if not self.connection_manager.is_client_connected(): + if not hasattr(self, '_waiting_for_client_logged') or not self._waiting_for_client_logged: + port_to_log = self.shared_config['config_manager'].get_connection_params(current_config_values)['port'] + Utils.log_message(self.log_text, f"TCP Server: Esperando cliente en puerto {port_to_log}...") + self._waiting_for_client_logged = True + if self.connection_manager.accept_client(timeout=0.05): + Utils.log_message(self.log_text, f"TCP Server: Cliente conectado desde {self.connection_manager.client_address}") + client_info = f"{self.connection_manager.client_address[0]}:{self.connection_manager.client_address[1]}" + self.shared_config['client_connected_var'].set(client_info) + self._waiting_for_client_logged = False + elif not self.connection_manager.is_client_connected() and \ + self.shared_config['client_connected_var'].get() != "Esperando...": + self.shared_config['client_connected_var'].set("Esperando...") + + log_content = ProtocolHandler.format_for_display(message_to_send, hex_non_printable=True) + if actual_error_type_sent != "Normal" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"): + Utils.log_message(self.log_text, f"{log_prefix_for_send}: Trama '{actual_error_type_sent}'{log_suffix_for_send} -> {log_content}") + else: + Utils.log_message(self.log_text, f"{log_prefix_for_send}: {log_content}") + + self.connection_manager.send_data(message_to_send) + + if conn_type != "TCP-Server": # No leer respuesta en modo servidor + response = self.connection_manager.read_response(timeout=0.1) + if response and response.strip(): + Utils.log_message(self.log_text, f"Respuesta: {ProtocolHandler.format_for_display(response)}") + parsed = ProtocolHandler.parse_adam_message(response) + if parsed: + brix_resp = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix_map, max_brix_map) + Utils.log_message(self.log_text, + f" -> Addr: {parsed['address']}, " + f"mA: {parsed['ma']:.3f}, " + f"Brix: {brix_resp:.3f}") + except self.connection_manager.ClientDisconnectedError: + Utils.log_message(self.log_text, "TCP Server: Cliente desconectado. Esperando nueva conexión.") + if conn_type == "TCP-Server": self.shared_config['client_connected_var'].set("Esperando...") - - Utils.log_message(self.log_text, f"Enviando: {ProtocolHandler.format_for_display(message)}") - self.connection_manager.send_data(message) - - if conn_type != "TCP-Server": # No leer respuesta en modo servidor - response = self.connection_manager.read_response(timeout=0.1) - if response and response.strip(): - Utils.log_message(self.log_text, f"Respuesta: {ProtocolHandler.format_for_display(response)}") - parsed = ProtocolHandler.parse_adam_message(response) - if parsed: - brix_resp = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix_map, max_brix_map) - Utils.log_message(self.log_text, - f" -> Addr: {parsed['address']}, " - f"mA: {parsed['ma']:.3f}, " - f"Brix: {brix_resp:.3f}") - except self.connection_manager.ClientDisconnectedError: - Utils.log_message(self.log_text, "TCP Server: Cliente desconectado. Esperando nueva conexión.") - if conn_type == "TCP-Server": - self.shared_config['client_connected_var'].set("Esperando...") - self._waiting_for_client_logged = False # Permitir que se loguee "esperando" de nuevo - except Exception as e: - Utils.log_message(self.log_text, f"Error en comunicación ({conn_type}): {e}") - self.frame.after(0, self.stop_simulation_error) # Schedule GUI update from main thread - break + self._waiting_for_client_logged = False + except Exception as e: + Utils.log_message(self.log_text, f"Error en comunicación ({conn_type}): {e}") + self.frame.after(0, self.stop_simulation_error) + break + elif actual_error_type_sent == "Trama Faltante (Omitir Envío)" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"): + # Loguear que se omitió una trama debido al reemplazo por "Trama Faltante" + Utils.log_message(self.log_text, f"{log_prefix_for_send}: Simulación de '{actual_error_type_sent}'{log_suffix_for_send}. No se envió trama.") self.simulation_step += 1 time.sleep(sample_period) @@ -594,6 +697,172 @@ class SimulatorTab: if self.simulating: # Solo actuar si la simulación estaba activa messagebox.showerror("Error de Simulación", "Error durante la simulación. Simulación detenida.") self.stop_simulation() # Llama al método normal de parada + + def generate_erroneous_message_logic(self, error_type, adam_address, base_ma_value): + """Genera la trama (bytes) según el tipo de error.""" + message_bytes = None + log_message_suffix = "" + + if error_type == "ID Erróneo": + wrong_adam_address = "99" if adam_address != "99" else "98" + message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(wrong_adam_address, base_ma_value) + log_message_suffix = f" (ID cambiado a {wrong_adam_address})" + elif error_type == "Valor Fuera de Escala (mA)": + out_of_scale_ma = 2.500 if random.random() < 0.5 else 22.500 + message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, out_of_scale_ma) + log_message_suffix = f" (valor mA: {out_of_scale_ma:.3f})" + elif error_type == "Checksum Erróneo": + message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value) + log_message_suffix = " (checksum incorrecto)" + elif error_type == "Longitud Errónea (Aleatoria)": + base_msg_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value) + if len(base_msg_bytes) > 1: + if random.choice([True, False]): # Acortar + cut_len = random.randint(1, max(1, len(base_msg_bytes) // 2)) + message_bytes = base_msg_bytes[:-cut_len] + log_message_suffix = f" (longitud acortada en {cut_len} bytes)" + else: # Alargar + add_len = random.randint(1, 5) # Aumentado un poco el largo posible + garbage = bytes([random.randint(32, 126) for _ in range(add_len)]) + message_bytes = base_msg_bytes + garbage # Podría ser al final o en medio + log_message_suffix = f" (longitud aumentada en {add_len} bytes)" + else: + message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value) + log_message_suffix = " (longitud errónea -> fallback a checksum incorrecto)" + elif error_type == "Trama Faltante (Omitir Envío)": + log_message_suffix = " (trama omitida)" + return None, log_message_suffix + elif error_type == "Ninguno": # Enviar trama normal + message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value) + log_message_suffix = " (trama normal)" + else: + Utils.log_message(self.log_text, f"Error Sim: Tipo de error '{error_type}' desconocido.") + return None, f" (tipo de error '{error_type}' desconocido)" + + return message_bytes, log_message_suffix + + def send_selected_error_manually(self): + """Manejador del botón 'Enviar Trama Errónea'.""" + if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating): + messagebox.showwarning("No Activo", "La simulación de errores manuales requiere modo TCP-Server y simulación activa.") + return + + if not self.connection_manager.is_client_connected(): + Utils.log_message(self.log_text, "Error Sim: No hay cliente conectado para enviar trama errónea.") + # messagebox.showinfo("Sin Cliente", "No hay cliente conectado para enviar la trama errónea.") + # return # Permitir enviar aunque no haya cliente, el log lo indicará + + error_type = self.error_type_var.get() + adam_address, base_ma_value = self.get_current_error_sim_parameters() + + message_bytes, log_suffix_from_gen = self.generate_erroneous_message_logic(error_type, adam_address, base_ma_value) + + if self.replace_normal_with_error_var.get(): + # Programar para reemplazo en el siguiente ciclo de simulación + self.error_details_for_replacement = (message_bytes, log_suffix_from_gen, error_type) + self.next_frame_is_error_event.set() + if error_type == "Trama Faltante (Omitir Envío)": + Utils.log_message(self.log_text, f"Error Sim Manual: Programada OMISIÓN de trama '{error_type}'{log_suffix_from_gen} para reemplazo.") + elif message_bytes: + Utils.log_message(self.log_text, f"Error Sim Manual: Programada trama '{error_type}'{log_suffix_from_gen} para reemplazo.") + else: # Error en generación o tipo desconocido + Utils.log_message(self.log_text, f"Error Sim Manual: No se pudo programar trama '{error_type}'{log_suffix_from_gen} para reemplazo.") + else: + # Enviar inmediatamente como trama adicional + if message_bytes: + try: + self.connection_manager.send_data(message_bytes) + Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Trama '{error_type}'{log_suffix_from_gen} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}") + except Exception as e: + Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Fallo al enviar trama: {e}") + elif error_type == "Trama Faltante (Omitir Envío)": + Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Simulación de '{error_type}'{log_suffix_from_gen}. No se envió trama adicional.") + # else: Ya logueado por generate_erroneous_message_logic si message_bytes es None y no es "Trama Faltante" + + def toggle_random_errors(self): + """Activa o desactiva el envío de errores aleatorios.""" + # self.random_error_var.get() refleja el nuevo estado del checkbox debido al clic del usuario + + can_actually_start_random_errors = (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating) + + if self.random_error_var.get(): # Si el usuario intenta activar los errores aleatorios + if not can_actually_start_random_errors: + Utils.log_message(self.log_text, "Error Sim: Errores aleatorios solo en TCP-Server con simulación activa.") + self.random_error_var.set(False) # Forzar a False ya que las condiciones no se cumplen + # El timer no se iniciará. update_error_controls_state() al final se encargará. + else: # Las condiciones se cumplen, iniciar el timer si no está ya activo + try: + interval_val = float(self.random_error_interval_var.get()) + if interval_val <= 0: + messagebox.showerror("Error de Intervalo", "El intervalo para errores aleatorios debe ser un número positivo.") + self.random_error_var.set(False) + self.update_error_controls_state() + return + except ValueError: + messagebox.showerror("Error de Intervalo", "Valor inválido para el intervalo de errores aleatorios.") + self.random_error_var.set(False) + self.update_error_controls_state() + return + + # Las condiciones se cumplen, iniciar el timer si no está ya activo + if self.random_error_timer is None or not self.random_error_timer.is_alive(): + self.random_error_timer_stop_event.clear() + self.random_error_timer = threading.Thread(target=self._random_error_loop, args=(interval_val,), daemon=True) + self.random_error_timer.start() + else: # Si el usuario intenta desactivar los errores aleatorios (el checkbox ahora está desmarcado) + if self.random_error_timer and self.random_error_timer.is_alive(): + Utils.log_message(self.log_text, "Error Sim: Deteniendo envío de errores aleatorios.") + self.random_error_timer_stop_event.set() + # No es necesario join aquí, se hará en stop_simulation o al cerrar. + + # Actualizar siempre el estado de los controles al final, basado en el estado final de self.random_error_var + self.update_error_controls_state() + + def _random_error_loop(self, initial_interval_s): + """Bucle del hilo que envía errores aleatorios.""" + possible_error_types = [val for val in self.error_type_combo['values'] if val != "Ninguno"] + if not possible_error_types: return + + current_interval = initial_interval_s + Utils.log_message(self.log_text, f"Error Sim: Hilo de errores aleatorios iniciado con intervalo {current_interval:.2f}s.") + + while not self.random_error_timer_stop_event.is_set(): + if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating and self.connection_manager.is_client_connected()): + self.random_error_timer_stop_event.wait(1.0) # Esperar si no hay cliente o no está activo + continue + + selected_random_error = random.choice(possible_error_types) + adam_address, base_ma_value = self.get_current_error_sim_parameters() + message_bytes, log_suffix = self.generate_erroneous_message_logic(selected_random_error, adam_address, base_ma_value) + + if self.replace_normal_with_error_var.get(): + # Programar el error para que reemplace la siguiente trama normal + self.error_details_for_replacement = (message_bytes, log_suffix, selected_random_error) + self.next_frame_is_error_event.set() + # El log de este envío se hará en run_simulation cuando efectivamente se envíe/omita + Utils.log_message(self.log_text, f"Error Sim Aleatorio: Programada trama '{selected_random_error}'{log_suffix} para reemplazo.") + else: + # Enviar el error inmediatamente, además de las tramas normales + if message_bytes: + try: + self.connection_manager.send_data(message_bytes) + Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Trama '{selected_random_error}'{log_suffix} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}") + except Exception as e: + Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Fallo al enviar: {e}") + elif selected_random_error == "Trama Faltante (Omitir Envío)": + Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Simulación de '{selected_random_error}'{log_suffix}. No se envió trama adicional.") + + # Permitir que el intervalo se actualice dinámicamente + try: + new_interval = float(self.random_error_interval_var.get()) + if new_interval > 0 and new_interval != current_interval: + current_interval = new_interval + Utils.log_message(self.log_text, f"Error Sim: Intervalo de errores aleatorios actualizado a {current_interval:.2f}s.") + except ValueError: + pass # Mantener el intervalo actual si el nuevo valor es inválido + + self.random_error_timer_stop_event.wait(timeout=current_interval) + Utils.log_message(self.log_text, "Error Sim: Hilo de errores aleatorios detenido.") def add_data_point(self, brix_value, ma_value): """Agrega un punto de datos al gráfico""" @@ -627,6 +896,8 @@ class SimulatorTab: if 'shared_widgets' in self.shared_config: Utils.set_widgets_state(self.shared_config['shared_widgets'], state) + + # self.update_error_controls_state() # El estado de los controles de error depende también de self.simulating def get_config(self): """Obtiene la configuración actual del simulador""" @@ -636,7 +907,8 @@ class SimulatorTab: 'cycle_time': self.cycle_time_var.get(), 'samples_per_cycle': self.samples_per_cycle_var.get(), 'manual_input_type': self.manual_input_type_var.get(), - 'manual_value': self.manual_value_var.get() + 'manual_value': self.manual_value_var.get(), + 'random_error_interval': self.random_error_interval_var.get() } def set_config(self, config): @@ -648,6 +920,7 @@ class SimulatorTab: self.manual_input_type_var.set(config.get('manual_input_type', 'Brix')) self.manual_value_var.set(config.get('manual_value', '10.0')) + self.random_error_interval_var.set(config.get('random_error_interval', '10.0')) try: self.manual_slider_var.set(float(self.manual_value_var.get())) @@ -657,3 +930,9 @@ class SimulatorTab: pass self.on_function_type_change() # Esto llamará a on_manual_input_type_change si es necesario + self.update_error_controls_state() # Actualizar estado de controles de error al cargar config + + def on_app_close(self): + """Llamado cuando la aplicación se está cerrando para limpiar recursos.""" + if self.simulating: + self.stop_simulation() # Asegura que todo se detenga y limpie correctamente