""" Tab del Simulador - Genera valores de prueba en protocolo ADAM """ import tkinter as tk from tkinter import ttk, scrolledtext, messagebox import threading import random import time import math from collections import deque from protocol_handler import ProtocolHandler from connection_manager import ConnectionManager from utils import Utils class SimulatorTab: def __init__(self, parent_frame, shared_config): self.frame = parent_frame self.shared_config = shared_config # Estado del simulador self.simulating = False self.simulation_thread = None self.simulation_step = 0 self.connection_manager = ConnectionManager() # Datos para el gráfico self.max_points = 100 self.time_data = deque(maxlen=self.max_points) self.brix_data = deque(maxlen=self.max_points) self.ma_data = deque(maxlen=self.max_points) self.start_time = time.time() # Cargar configuración inicial para obtener valores por defecto para StringVars # Esto es para asegurar que las StringVars tengan un valor inicial antes de que set_config sea llamado # por maselli_app.py. initial_config = self.shared_config['config_manager'].load_config() self.adam_address_var = tk.StringVar(value=initial_config.get('adam_address', '01')) self.function_type_var = tk.StringVar(value=initial_config.get('function_type', 'Lineal')) self.cycle_time_var = tk.StringVar(value=initial_config.get('cycle_time', '10.0')) self.samples_per_cycle_var = tk.StringVar(value=initial_config.get('samples_per_cycle', '100')) # Configuración para modo manual y errores self.manual_input_type_var = tk.StringVar(value=initial_config.get('manual_input_type', 'Brix')) self.manual_value_var = tk.StringVar(value=initial_config.get('manual_value', '10.0')) try: manual_value_float = float(initial_config.get('manual_value', '10.0')) except ValueError: manual_value_float = 10.0 # Fallback self.manual_slider_var = tk.DoubleVar(value=manual_value_float) self.current_brix_var = tk.StringVar(value="---") self.current_ma_var = tk.StringVar(value="--.-- mA") self.current_voltage_var = tk.StringVar(value="-.-- V") # Nueva para voltaje # Para simulación de errores self.random_error_timer = None self.random_error_timer_stop_event = threading.Event() self.replace_normal_with_error_var = tk.BooleanVar(value=False) self.next_frame_is_error_event = threading.Event() self.random_error_interval_var = tk.StringVar(value=initial_config.get('random_error_interval', '10.0')) self.error_details_for_replacement = None # (message_bytes, log_suffix, error_type_str) self.create_widgets() def create_widgets(self): """Crea los widgets del tab simulador""" # Frame de configuración del simulador config_frame = ttk.LabelFrame(self.frame, text="Configuración Simulador") config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2) # Dirección ADAM ttk.Label(config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w") # self.adam_address_var inicializada en __init__ self.adam_address_entry = ttk.Entry(config_frame, textvariable=self.adam_address_var, width=5) self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew") # Función ttk.Label(config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w") # self.function_type_var inicializada en __init__ self.function_type_combo = ttk.Combobox(config_frame, textvariable=self.function_type_var, values=["Lineal", "Sinusoidal", "Manual"], state="readonly", width=10) self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew") self.function_type_combo.bind("<>", self.on_function_type_change) # Tiempo de ciclo completo (nueva característica) ttk.Label(config_frame, text="Tiempo Ciclo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w") # self.cycle_time_var inicializada en __init__ self.cycle_time_entry = ttk.Entry(config_frame, textvariable=self.cycle_time_var, width=8) self.cycle_time_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew") # Velocidad de muestreo (calculada automáticamente) ttk.Label(config_frame, text="Muestras/ciclo:").grid(row=0, column=6, padx=5, pady=5, sticky="w") # self.samples_per_cycle_var inicializada en __init__ self.samples_per_cycle_entry = ttk.Entry(config_frame, textvariable=self.samples_per_cycle_var, width=8) self.samples_per_cycle_entry.grid(row=0, column=7, padx=5, pady=5, sticky="ew") # --- Frame para modo Manual (Modificado) --- manual_frame = ttk.LabelFrame(config_frame, text="Modo Manual") manual_frame.grid(row=1, column=0, columnspan=8, padx=5, pady=5, sticky="ew") ttk.Label(manual_frame, text="Entrada Por:").grid(row=0, column=0, padx=5, pady=5, sticky="w") # self.manual_input_type_var inicializada en __init__ self.manual_input_type_combo = ttk.Combobox(manual_frame, textvariable=self.manual_input_type_var, values=["Brix", "mA", "Voltaje"], state="readonly", width=8) self.manual_input_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew") self.manual_input_type_combo.bind("<>", self.on_manual_input_type_change) self.manual_value_label = ttk.Label(manual_frame, text="Valor Brix:") # Se actualiza dinámicamente self.manual_value_label.grid(row=1, column=0, padx=5, pady=5, sticky="w") # self.manual_value_var y self.manual_slider_var inicializadas en __init__ self.manual_value_entry = ttk.Entry(manual_frame, textvariable=self.manual_value_var, width=10, state=tk.DISABLED) self.manual_value_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew") self.manual_value_entry.bind('', lambda e: self.update_slider_from_entry()) self.manual_value_entry.bind('', lambda e: self.update_slider_from_entry()) # Slider self.manual_slider = ttk.Scale(manual_frame, orient=tk.HORIZONTAL, # from_ y to_ se configuran dinámicamente variable=self.manual_slider_var, command=self.on_slider_change, state=tk.DISABLED, length=200) self.manual_slider.grid(row=1, column=2, padx=5, pady=5, sticky="ew") manual_frame.columnconfigure(2, weight=1) # Controls Frame controls_frame = ttk.LabelFrame(self.frame, text="Control Simulación") controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew") self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation) self.start_button.pack(side=tk.LEFT, padx=5) self.stop_button = ttk.Button(controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED) self.stop_button.pack(side=tk.LEFT, padx=5) self.clear_graph_button = ttk.Button(controls_frame, text="Limpiar Gráfico", command=self.clear_graph) self.clear_graph_button.pack(side=tk.LEFT, padx=5) # Display Frame display_frame = ttk.LabelFrame(self.frame, text="Valores Actuales") display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="ew") ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w") # self.current_brix_var inicializada en __init__ ttk.Label(display_frame, textvariable=self.current_brix_var, font=("Courier", 14, "bold")).grid(row=0, column=1, padx=5, pady=5, sticky="w") ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w") # self.current_ma_var inicializada en __init__ ttk.Label(display_frame, textvariable=self.current_ma_var, font=("Courier", 14, "bold")).grid(row=1, column=1, padx=5, pady=5, sticky="w") ttk.Label(display_frame, text="Voltaje:").grid(row=2, column=0, padx=5, pady=5, sticky="w") # self.current_voltage_var inicializada en __init__ ttk.Label(display_frame, textvariable=self.current_voltage_var, font=("Courier", 14, "bold")).grid(row=2, column=1, padx=5, pady=5, sticky="w") # Log Frame log_frame = ttk.LabelFrame(self.frame, text="Log de Comunicación") log_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") self.log_text = scrolledtext.ScrolledText(log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED) self.log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True) # --- Frame para Simulación de Errores --- self._setup_error_simulation_ui() # Se añade al final de create_widgets # Configurar pesos self.frame.columnconfigure(0, weight=1) self.frame.columnconfigure(1, weight=1) self.frame.rowconfigure(2, weight=1) # Log frame # Inicializar estado self.on_function_type_change() def get_graph_frame(self): """Crea y retorna el frame para el gráfico""" graph_frame = ttk.LabelFrame(self.frame, text="Gráfico Simulador") graph_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") # El rowconfigure para el gráfico se hace aquí, y el de errores abajo self.frame.rowconfigure(3, weight=1) # Graph frame (se mueve una fila abajo) return graph_frame def _setup_error_simulation_ui(self): """Crea los controles para la simulación de errores.""" error_frame = ttk.LabelFrame(self.frame, text="Simulación de Errores (Modo TCP Server)") error_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=10, sticky="ew") self.frame.rowconfigure(4, weight=0) # Error frame no se expande tanto como el log o gráfico ttk.Label(error_frame, text="Tipo de Error:").grid(row=0, column=0, padx=5, pady=5, sticky="w") self.error_type_var = tk.StringVar(value="Ninguno") self.error_type_combo = ttk.Combobox( error_frame, textvariable=self.error_type_var, state="disabled", # Se habilita/deshabilita dinámicamente values=[ "Ninguno", # Para enviar una trama normal desde este control "ID Erróneo", "Valor Fuera de Escala (mA)", "Checksum Erróneo", "Longitud Errónea (Aleatoria)", "Trama Faltante (Omitir Envío)" ] ) self.error_type_combo.current(0) self.error_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew") self.send_error_button = ttk.Button(error_frame, text="Enviar Trama Errónea", command=self.send_selected_error_manually, state=tk.DISABLED) self.send_error_button.grid(row=0, column=2, padx=5, pady=5) self.random_error_var = tk.BooleanVar(value=False) self.random_error_check = ttk.Checkbutton( error_frame, text="Errores Aleatorios (cada ~10s)", variable=self.random_error_var, command=self.toggle_random_errors, state="disabled" # Se habilita/deshabilita dinámicamente ) self.random_error_check.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky="w") # Checkbox para reemplazar trama normal con error (ahora en su propia fila para claridad) self.replace_with_error_check = ttk.Checkbutton( error_frame, text="Reemplazar trama normal con error", variable=self.replace_normal_with_error_var, state="disabled" # Se habilita/deshabilita dinámicamente ) self.replace_with_error_check.grid(row=1, column=2, padx=(10,5), pady=5, sticky="w") ttk.Label(error_frame, text="Intervalo Errores Aleatorios (s):").grid(row=2, column=0, padx=5, pady=5, sticky="w") self.random_error_interval_entry = ttk.Entry( error_frame, textvariable=self.random_error_interval_var, width=8, # El Entry solo necesita el parent, textvariable, width y state. state="disabled" # Se habilita/deshabilita dinámicamente ) self.random_error_interval_entry.grid(row=2, column=1, padx=5, pady=5, sticky="ew") # Añadir el grid para el Entry # El grid para self.replace_with_error_check ya está definido donde se crea ese widget. error_frame.columnconfigure(1, weight=1) self.update_error_controls_state() # Establecer estado inicial def update_error_controls_state(self): """Habilita o deshabilita los controles de error según el modo de conexión.""" # Asegurarse de que los widgets de error existan antes de intentar configurarlos if not hasattr(self, 'error_type_combo'): return is_tcp_server_mode = self.shared_config['connection_type_var'].get() == "TCP-Server" # Considerar si la simulación (conexión) está activa para habilitar el envío # is_connection_active = self.simulating # O una propiedad más directa de ConnectionManager # Los controles de error solo tienen sentido si estamos en modo TCP-Server # y la conexión está activa (es decir, la simulación principal está corriendo o # el servidor está escuchando de alguna forma). # Por ahora, lo basaremos en is_tcp_server_mode y self.simulating enable_controls = is_tcp_server_mode and self.simulating new_state_tk = tk.NORMAL if enable_controls else tk.DISABLED new_state_str = "normal" if enable_controls else "disabled" # Para Checkbutton self.error_type_combo.config(state=new_state_tk if is_tcp_server_mode else tk.DISABLED) # Combo siempre según modo self.send_error_button.config(state=new_state_tk) self.random_error_check.config(state=new_state_str) # El entry del intervalo de errores aleatorios depende de que el check de errores aleatorios esté activo interval_entry_state_tk = tk.NORMAL if enable_controls and self.random_error_var.get() else tk.DISABLED self.random_error_interval_entry.config(state=interval_entry_state_tk) # El check de "Reemplazar trama normal" se habilita si los controles de error están habilitados self.replace_with_error_check.config(state=new_state_str) if not enable_controls and self.random_error_var.get(): self.random_error_var.set(False) self.toggle_random_errors() # Detiene el timer si estaba activo y se deshabilitan controles def get_current_error_sim_parameters(self): """Obtiene parámetros para la simulación de errores (dirección ADAM, valor mA base).""" adam_address = self.adam_address_var.get() base_ma_value = 12.345 # Valor por defecto if self.function_type_var.get() == "Manual": try: manual_val = float(self.manual_value_var.get()) input_type = self.manual_input_type_var.get() if input_type == "Brix": min_b = float(self.shared_config['min_brix_map_var'].get()) max_b = float(self.shared_config['max_brix_map_var'].get()) base_ma_value = ProtocolHandler.scale_to_ma(manual_val, min_b, max_b) elif input_type == "mA": base_ma_value = manual_val elif input_type == "Voltaje": base_ma_value = ProtocolHandler.voltage_to_ma(manual_val) except (ValueError, KeyError, TypeError): Utils.log_message(self.log_text, "Error Sim: Usando valor mA base por defecto para error.") else: # Si no es manual, o para tener un valor si la simulación principal no corre # Podríamos tomar el self.current_ma_var si la simulación está corriendo # pero para simplicidad, un valor fijo si no es manual. pass # Mantiene 12.345 return adam_address, base_ma_value def on_function_type_change(self, event=None): """Maneja el cambio de tipo de función""" func_type = self.function_type_var.get() is_manual_mode = (func_type == "Manual") # Si la simulación está corriendo y el tipo de función cambia, detenerla. if self.simulating: self.stop_simulation() # Configurar controles de entrada manual manual_specific_state = tk.NORMAL if is_manual_mode else tk.DISABLED self.manual_input_type_combo.config(state=manual_specific_state) self.manual_value_entry.config(state=manual_specific_state) self.manual_slider.config(state=manual_specific_state) # Tiempo de ciclo y muestras por ciclo ahora están habilitados para todos los modos continuos self.cycle_time_entry.config(state=tk.NORMAL) self.samples_per_cycle_entry.config(state=tk.NORMAL) if is_manual_mode: self.on_manual_input_type_change() # Actualizar rangos de slider/entry y valor actual # El estado de los botones Start/Stop depende de si la simulación está (o estaba) corriendo. # Como stop_simulation() se llama arriba si estaba corriendo, self.simulating debería ser False aquí. if not self.simulating: self.start_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.DISABLED) else: # Este estado idealmente no se alcanzaría si stop_simulation() # establece correctamente self.simulating a False y actualiza los botones. # Sin embargo, como salvaguarda: self.start_button.config(state=tk.DISABLED) self.stop_button.config(state=tk.NORMAL) self.update_error_controls_state() # Actualizar estado de controles de error def on_manual_input_type_change(self, event=None): """Maneja el cambio de tipo de entrada manual (Brix, mA, Voltaje)""" input_type = self.manual_input_type_var.get() min_val, max_val, default_val, label_text, precision = 0, 100, 10.0, "Valor Brix:", 2 if input_type == "Brix": try: min_val = float(self.shared_config['min_brix_map_var'].get()) max_val = float(self.shared_config['max_brix_map_var'].get()) if min_val >= max_val: min_val, max_val = 0.0, 80.0 # Fallback default_val = min_val + (max_val - min_val) / 4 except (ValueError, KeyError, TypeError): min_val, max_val = 0.0, 80.0 default_val = 10.0 label_text = "Valor Brix:" precision = 2 elif input_type == "mA": min_val, max_val = 0.0, 20.0 default_val = 12.0 label_text = "Valor mA:" precision = 3 elif input_type == "Voltaje": min_val, max_val = 0.0, 10.0 default_val = 5.0 label_text = "Valor Voltaje:" precision = 2 self.manual_value_label.config(text=label_text) self.manual_slider.config(from_=min_val, to=max_val) try: current_numeric_val = float(self.manual_value_var.get()) if not (min_val <= current_numeric_val <= max_val): self.manual_value_var.set(f"{default_val:.{precision}f}") self.manual_slider_var.set(default_val) else: self.manual_slider_var.set(current_numeric_val) self.manual_value_var.set(f"{current_numeric_val:.{precision}f}") except ValueError: self.manual_value_var.set(f"{default_val:.{precision}f}") self.manual_slider_var.set(default_val) def on_slider_change(self, value_str): """Actualiza el valor del entry cuando cambia el slider""" value = float(value_str) input_type = self.manual_input_type_var.get() precision = 2 if input_type == "Brix": precision = 2 elif input_type == "mA": precision = 3 elif input_type == "Voltaje": precision = 2 self.manual_value_var.set(f"{value:.{precision}f}") def update_slider_from_entry(self): """Actualiza el slider cuando cambia el entry""" try: value = float(self.manual_value_var.get()) input_type = self.manual_input_type_var.get() min_val, max_val, precision = 0,100,2 if input_type == "Brix": min_val = float(self.shared_config['min_brix_map_var'].get()) max_val = float(self.shared_config['max_brix_map_var'].get()) if min_val >= max_val: min_val, max_val = 0.0, 80.0 precision = 2 elif input_type == "mA": min_val, max_val, precision = 0.0, 20.0, 3 elif input_type == "Voltaje": min_val, max_val, precision = 0.0, 10.0, 2 value = max(min_val, min(max_val, value)) # Clampear al rango self.manual_slider_var.set(value) self.manual_value_var.set(f"{value:.{precision}f}") except (ValueError, KeyError, TypeError): # Si el valor no es un número o shared_config no está listo, resetear al valor del slider current_slider_val = self.manual_slider_var.get() precision_fallback = 2 if self.manual_input_type_var.get() == "mA": precision_fallback = 3 self.manual_value_var.set(f"{current_slider_val:.{precision_fallback}f}") def start_simulation(self): """Inicia la simulación continua""" if self.simulating: messagebox.showwarning("Advertencia", "La simulación ya está en curso.") return try: adam_address = self.adam_address_var.get() if len(adam_address) != 2: messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.") return cycle_time = float(self.cycle_time_var.get()) if cycle_time <= 0: messagebox.showerror("Error", "El tiempo de ciclo debe ser mayor que 0.") return samples_per_cycle = int(self.samples_per_cycle_var.get()) if samples_per_cycle <= 0: messagebox.showerror("Error", "Las muestras por ciclo deben ser mayor que 0.") return # Validar mapeo Brix float(self.shared_config['min_brix_map_var'].get()) float(self.shared_config['max_brix_map_var'].get()) except (ValueError, KeyError, TypeError): messagebox.showerror("Error", "Valores inválidos en la configuración (ADAM, ciclo, muestras o mapeo Brix).") return try: current_config_values = { 'connection_type': self.shared_config['connection_type_var'].get(), 'com_port': self.shared_config['com_port_var'].get(), 'baud_rate': self.shared_config['baud_rate_var'].get(), 'ip_address': self.shared_config['ip_address_var'].get(), 'port': self.shared_config['port_var'].get(), } conn_type = current_config_values['connection_type'] conn_params = self.shared_config['config_manager'].get_connection_params(current_config_values) # open_connection ahora devuelve (connection_object, listening_info) # El connection_object se guarda internamente en self.connection_manager _, listening_details = self.connection_manager.open_connection(conn_type, conn_params) if conn_type == "TCP-Server": Utils.log_message(self.log_text, f"{listening_details} para simulación.") elif conn_type != "TCP-Server": # Para otros tipos, el mensaje genérico Utils.log_message(self.log_text, f"Conexión {conn_type} abierta para simulación.") except Exception as e: messagebox.showerror("Error de Conexión", str(e)) return self.simulating = True self.simulation_step = 0 self.start_time = time.time() # Reset start time for graph self.start_button.config(state=tk.DISABLED) self.stop_button.config(state=tk.NORMAL) self._set_entries_state(tk.DISABLED) self.update_error_controls_state() # Habilitar controles de error si es TCP Server if conn_type == "TCP-Server": self.shared_config['client_connected_var'].set("Esperando...") self.simulation_thread = threading.Thread(target=self.run_simulation, daemon=True) self.simulation_thread.start() Utils.log_message(self.log_text, "Simulación iniciada.") def stop_simulation(self): """Detiene la simulación""" if not self.simulating: return self.simulating = False # Detener el timer de errores aleatorios primero if self.random_error_timer and self.random_error_timer.is_alive(): self.random_error_timer_stop_event.set() self.random_error_timer.join(timeout=1.0) # Esperar un poco self.random_error_timer = None self.next_frame_is_error_event.clear() self.error_details_for_replacement = None if self.simulation_thread and self.simulation_thread.is_alive(): self.simulation_thread.join(timeout=2.0) self.connection_manager.close_connection() Utils.log_message(self.log_text, "Conexión cerrada.") self._set_entries_state(tk.NORMAL) self.on_function_type_change() # Re-evaluar estado de controles manuales if self.connection_manager.connection_type == "TCP-Server": # Limpiar info del cliente self.shared_config['client_connected_var'].set("Ninguno") Utils.log_message(self.log_text, "Simulación detenida.") self.current_brix_var.set("---") self.current_ma_var.set("--.-- mA") self.current_voltage_var.set("-.-- V") self.start_button.config(state=tk.NORMAL) # Mover después de _set_entries_state y on_function_type_change self.stop_button.config(state=tk.DISABLED) self.update_error_controls_state() # Deshabilitar controles de error def run_simulation(self): """Thread principal de simulación""" try: adam_address = self.adam_address_var.get() min_brix_map = float(self.shared_config['min_brix_map_var'].get()) max_brix_map = float(self.shared_config['max_brix_map_var'].get()) function_type = self.function_type_var.get() cycle_time = float(self.cycle_time_var.get()) samples_per_cycle = int(self.samples_per_cycle_var.get()) conn_type = self.connection_manager.connection_type # Obtener el tipo de conexión actual # Obtener la configuración actual para el log del puerto en TCP-Server current_config_values = { 'connection_type': self.shared_config['connection_type_var'].get(), 'com_port': self.shared_config['com_port_var'].get(), 'baud_rate': self.shared_config['baud_rate_var'].get(), 'ip_address': self.shared_config['ip_address_var'].get(), 'port': self.shared_config['port_var'].get(), } sample_period = cycle_time / samples_per_cycle while self.simulating: message_to_send = None ma_value_for_message_generation = 0.0 # mA que se usaría para generar la trama (normal o base para error) # --- Determinar valores base de la simulación para este ciclo (Brix, mA) --- # Esta lógica calcula los valores que se mostrarían y graficarían, # y que se usarían para generar una trama normal. target_brix = 0.0 # Brix consistente con target_ma para display/graph # target_ma es el valor de mA que se usaría para generar el mensaje ADAM si fuera normal # o el valor base si un error lo reemplaza. current_manual_input_type = self.manual_input_type_var.get() # Cache para este ciclo if function_type == "Manual": # Lógica para modo Manual manual_input_type = self.manual_input_type_var.get() manual_numeric_value = 0.0 try: manual_numeric_value = float(self.manual_value_var.get()) except ValueError: Utils.log_message(self.log_text, f"Valor manual inválido: '{self.manual_value_var.get()}'. Usando valor por defecto.") if manual_input_type == "Brix": manual_numeric_value = min_brix_map elif manual_input_type == "mA": manual_numeric_value = 4.0 elif manual_input_type == "Voltaje": manual_numeric_value = ProtocolHandler.ma_to_voltage(4.0) if manual_input_type == "Brix": target_brix = manual_numeric_value ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map) elif manual_input_type == "mA": ma_value_for_message_generation = manual_numeric_value target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map) elif manual_input_type == "Voltaje": voltage_input = manual_numeric_value ma_value_for_message_generation = ProtocolHandler.voltage_to_ma(voltage_input) target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map) elif function_type == "Lineal": cycle_progress = (self.simulation_step % (2 * samples_per_cycle)) / samples_per_cycle if cycle_progress > 1.0: cycle_progress = 2.0 - cycle_progress target_brix = min_brix_map + (max_brix_map - min_brix_map) * cycle_progress ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map) elif function_type == "Sinusoidal": progress = (self.simulation_step % samples_per_cycle) / samples_per_cycle phase = progress * 2 * math.pi sin_val = (math.sin(phase) + 1) / 2 target_brix = min_brix_map + (max_brix_map - min_brix_map) * sin_val ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map) # ma_value_in_message es el valor de mA que realmente se usaría en la trama o que se mostraría # Si la trama es reemplazada por un error, este valor sigue siendo el de la simulación normal # para la UI, pero la trama enviada será diferente. ma_value_for_ui_display = ma_value_for_message_generation voltage_value_display = ProtocolHandler.ma_to_voltage(ma_value_for_ui_display) # --- Preparar la trama a enviar (normal o error de reemplazo) --- log_prefix_for_send = "Enviando" log_suffix_for_send = "" actual_error_type_sent = "Normal" # Para el log if self.next_frame_is_error_event.is_set() and \ self.error_details_for_replacement is not None and \ self.replace_normal_with_error_var.get(): error_msg_bytes, error_log_suffix, error_type_str = self.error_details_for_replacement message_to_send = error_msg_bytes log_prefix_for_send = "Error Sim (Reemplazo Programado)" log_suffix_for_send = error_log_suffix actual_error_type_sent = error_type_str self.next_frame_is_error_event.clear() self.error_details_for_replacement = None else: # Generar trama normal message_to_send, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, ma_value_for_message_generation) # Preparar texto para display brix_display_text = "" if ma_value_for_ui_display < 4.0 and function_type == "Manual" and \ (current_manual_input_type == "mA" or current_manual_input_type == "Voltaje"): brix_display_text = "Error (Sub 4mA)" else: brix_display_text = Utils.format_brix_display(target_brix) # Actualizar GUI (StringVars son thread-safe para .set()) self.current_brix_var.set(brix_display_text) self.current_ma_var.set(Utils.format_ma_display(ma_value_for_ui_display)) self.current_voltage_var.set(ProtocolHandler.format_voltage_display(voltage_value_display)) # Agregar punto de datos al gráfico (desde el thread GUI) self.frame.after(0, lambda b=target_brix, m=ma_value_for_ui_display: self.add_data_point(b, m)) # --- Enviar la trama (normal o de error) --- if message_to_send: # Si hay algo que enviar (no es "Trama Faltante" de reemplazo) try: if conn_type == "TCP-Server": if not self.connection_manager.is_client_connected(): if not hasattr(self, '_waiting_for_client_logged') or not self._waiting_for_client_logged: port_to_log = self.shared_config['config_manager'].get_connection_params(current_config_values)['port'] Utils.log_message(self.log_text, f"TCP Server: Esperando cliente en puerto {port_to_log}...") self._waiting_for_client_logged = True if self.connection_manager.accept_client(timeout=0.05): Utils.log_message(self.log_text, f"TCP Server: Cliente conectado desde {self.connection_manager.client_address}") client_info = f"{self.connection_manager.client_address[0]}:{self.connection_manager.client_address[1]}" self.shared_config['client_connected_var'].set(client_info) self._waiting_for_client_logged = False elif not self.connection_manager.is_client_connected() and \ self.shared_config['client_connected_var'].get() != "Esperando...": self.shared_config['client_connected_var'].set("Esperando...") log_content = ProtocolHandler.format_for_display(message_to_send, hex_non_printable=True) if actual_error_type_sent != "Normal" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"): Utils.log_message(self.log_text, f"{log_prefix_for_send}: Trama '{actual_error_type_sent}'{log_suffix_for_send} -> {log_content}") else: Utils.log_message(self.log_text, f"{log_prefix_for_send}: {log_content}") self.connection_manager.send_data(message_to_send) if conn_type != "TCP-Server": # No leer respuesta en modo servidor response = self.connection_manager.read_response(timeout=0.1) if response and response.strip(): Utils.log_message(self.log_text, f"Respuesta: {ProtocolHandler.format_for_display(response)}") parsed = ProtocolHandler.parse_adam_message(response) if parsed: brix_resp = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix_map, max_brix_map) Utils.log_message(self.log_text, f" -> Addr: {parsed['address']}, " f"mA: {parsed['ma']:.3f}, " f"Brix: {brix_resp:.3f}") except self.connection_manager.ClientDisconnectedError: Utils.log_message(self.log_text, "TCP Server: Cliente desconectado. Esperando nueva conexión.") if conn_type == "TCP-Server": self.shared_config['client_connected_var'].set("Esperando...") self._waiting_for_client_logged = False except Exception as e: Utils.log_message(self.log_text, f"Error en comunicación ({conn_type}): {e}") self.frame.after(0, self.stop_simulation_error) break elif actual_error_type_sent == "Trama Faltante (Omitir Envío)" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"): # Loguear que se omitió una trama debido al reemplazo por "Trama Faltante" Utils.log_message(self.log_text, f"{log_prefix_for_send}: Simulación de '{actual_error_type_sent}'{log_suffix_for_send}. No se envió trama.") self.simulation_step += 1 time.sleep(sample_period) except Exception as e: # Catches errors in parameter fetching or main loop logic Utils.log_message(self.log_text, f"Error en simulación: {e}") if self.simulating: # Ensure stop is called only if an error occurs while simulating self.frame.after(0, self.stop_simulation_error) def stop_simulation_error(self): """Detiene la simulación debido a un error y muestra mensaje""" if self.simulating: # Solo actuar si la simulación estaba activa messagebox.showerror("Error de Simulación", "Error durante la simulación. Simulación detenida.") self.stop_simulation() # Llama al método normal de parada def generate_erroneous_message_logic(self, error_type, adam_address, base_ma_value): """Genera la trama (bytes) según el tipo de error.""" message_bytes = None log_message_suffix = "" if error_type == "ID Erróneo": wrong_adam_address = "99" if adam_address != "99" else "98" message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(wrong_adam_address, base_ma_value) log_message_suffix = f" (ID cambiado a {wrong_adam_address})" elif error_type == "Valor Fuera de Escala (mA)": out_of_scale_ma = 2.500 if random.random() < 0.5 else 22.500 message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, out_of_scale_ma) log_message_suffix = f" (valor mA: {out_of_scale_ma:.3f})" elif error_type == "Checksum Erróneo": message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value) log_message_suffix = " (checksum incorrecto)" elif error_type == "Longitud Errónea (Aleatoria)": base_msg_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value) if len(base_msg_bytes) > 1: if random.choice([True, False]): # Acortar cut_len = random.randint(1, max(1, len(base_msg_bytes) // 2)) message_bytes = base_msg_bytes[:-cut_len] log_message_suffix = f" (longitud acortada en {cut_len} bytes)" else: # Alargar add_len = random.randint(1, 5) # Aumentado un poco el largo posible garbage = bytes([random.randint(32, 126) for _ in range(add_len)]) message_bytes = base_msg_bytes + garbage # Podría ser al final o en medio log_message_suffix = f" (longitud aumentada en {add_len} bytes)" else: message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value) log_message_suffix = " (longitud errónea -> fallback a checksum incorrecto)" elif error_type == "Trama Faltante (Omitir Envío)": log_message_suffix = " (trama omitida)" return None, log_message_suffix elif error_type == "Ninguno": # Enviar trama normal message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value) log_message_suffix = " (trama normal)" else: Utils.log_message(self.log_text, f"Error Sim: Tipo de error '{error_type}' desconocido.") return None, f" (tipo de error '{error_type}' desconocido)" return message_bytes, log_message_suffix def send_selected_error_manually(self): """Manejador del botón 'Enviar Trama Errónea'.""" if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating): messagebox.showwarning("No Activo", "La simulación de errores manuales requiere modo TCP-Server y simulación activa.") return if not self.connection_manager.is_client_connected(): Utils.log_message(self.log_text, "Error Sim: No hay cliente conectado para enviar trama errónea.") # messagebox.showinfo("Sin Cliente", "No hay cliente conectado para enviar la trama errónea.") # return # Permitir enviar aunque no haya cliente, el log lo indicará error_type = self.error_type_var.get() adam_address, base_ma_value = self.get_current_error_sim_parameters() message_bytes, log_suffix_from_gen = self.generate_erroneous_message_logic(error_type, adam_address, base_ma_value) if self.replace_normal_with_error_var.get(): # Programar para reemplazo en el siguiente ciclo de simulación self.error_details_for_replacement = (message_bytes, log_suffix_from_gen, error_type) self.next_frame_is_error_event.set() if error_type == "Trama Faltante (Omitir Envío)": Utils.log_message(self.log_text, f"Error Sim Manual: Programada OMISIÓN de trama '{error_type}'{log_suffix_from_gen} para reemplazo.") elif message_bytes: Utils.log_message(self.log_text, f"Error Sim Manual: Programada trama '{error_type}'{log_suffix_from_gen} para reemplazo.") else: # Error en generación o tipo desconocido Utils.log_message(self.log_text, f"Error Sim Manual: No se pudo programar trama '{error_type}'{log_suffix_from_gen} para reemplazo.") else: # Enviar inmediatamente como trama adicional if message_bytes: try: self.connection_manager.send_data(message_bytes) Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Trama '{error_type}'{log_suffix_from_gen} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}") except Exception as e: Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Fallo al enviar trama: {e}") elif error_type == "Trama Faltante (Omitir Envío)": Utils.log_message(self.log_text, f"Error Sim Manual (Adicional): Simulación de '{error_type}'{log_suffix_from_gen}. No se envió trama adicional.") # else: Ya logueado por generate_erroneous_message_logic si message_bytes es None y no es "Trama Faltante" def toggle_random_errors(self): """Activa o desactiva el envío de errores aleatorios.""" # self.random_error_var.get() refleja el nuevo estado del checkbox debido al clic del usuario can_actually_start_random_errors = (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating) if self.random_error_var.get(): # Si el usuario intenta activar los errores aleatorios if not can_actually_start_random_errors: Utils.log_message(self.log_text, "Error Sim: Errores aleatorios solo en TCP-Server con simulación activa.") self.random_error_var.set(False) # Forzar a False ya que las condiciones no se cumplen # El timer no se iniciará. update_error_controls_state() al final se encargará. else: # Las condiciones se cumplen, iniciar el timer si no está ya activo try: interval_val = float(self.random_error_interval_var.get()) if interval_val <= 0: messagebox.showerror("Error de Intervalo", "El intervalo para errores aleatorios debe ser un número positivo.") self.random_error_var.set(False) self.update_error_controls_state() return except ValueError: messagebox.showerror("Error de Intervalo", "Valor inválido para el intervalo de errores aleatorios.") self.random_error_var.set(False) self.update_error_controls_state() return # Las condiciones se cumplen, iniciar el timer si no está ya activo if self.random_error_timer is None or not self.random_error_timer.is_alive(): self.random_error_timer_stop_event.clear() self.random_error_timer = threading.Thread(target=self._random_error_loop, args=(interval_val,), daemon=True) self.random_error_timer.start() else: # Si el usuario intenta desactivar los errores aleatorios (el checkbox ahora está desmarcado) if self.random_error_timer and self.random_error_timer.is_alive(): Utils.log_message(self.log_text, "Error Sim: Deteniendo envío de errores aleatorios.") self.random_error_timer_stop_event.set() # No es necesario join aquí, se hará en stop_simulation o al cerrar. # Actualizar siempre el estado de los controles al final, basado en el estado final de self.random_error_var self.update_error_controls_state() def _random_error_loop(self, initial_interval_s): """Bucle del hilo que envía errores aleatorios.""" possible_error_types = [val for val in self.error_type_combo['values'] if val != "Ninguno"] if not possible_error_types: return current_interval = initial_interval_s Utils.log_message(self.log_text, f"Error Sim: Hilo de errores aleatorios iniciado con intervalo {current_interval:.2f}s.") while not self.random_error_timer_stop_event.is_set(): if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating and self.connection_manager.is_client_connected()): self.random_error_timer_stop_event.wait(1.0) # Esperar si no hay cliente o no está activo continue selected_random_error = random.choice(possible_error_types) adam_address, base_ma_value = self.get_current_error_sim_parameters() message_bytes, log_suffix = self.generate_erroneous_message_logic(selected_random_error, adam_address, base_ma_value) if self.replace_normal_with_error_var.get(): # Programar el error para que reemplace la siguiente trama normal self.error_details_for_replacement = (message_bytes, log_suffix, selected_random_error) self.next_frame_is_error_event.set() # El log de este envío se hará en run_simulation cuando efectivamente se envíe/omita Utils.log_message(self.log_text, f"Error Sim Aleatorio: Programada trama '{selected_random_error}'{log_suffix} para reemplazo.") else: # Enviar el error inmediatamente, además de las tramas normales if message_bytes: try: self.connection_manager.send_data(message_bytes) Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Trama '{selected_random_error}'{log_suffix} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}") except Exception as e: Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Fallo al enviar: {e}") elif selected_random_error == "Trama Faltante (Omitir Envío)": Utils.log_message(self.log_text, f"Error Sim Aleatorio (Adicional): Simulación de '{selected_random_error}'{log_suffix}. No se envió trama adicional.") # Permitir que el intervalo se actualice dinámicamente try: new_interval = float(self.random_error_interval_var.get()) if new_interval > 0 and new_interval != current_interval: current_interval = new_interval Utils.log_message(self.log_text, f"Error Sim: Intervalo de errores aleatorios actualizado a {current_interval:.2f}s.") except ValueError: pass # Mantener el intervalo actual si el nuevo valor es inválido self.random_error_timer_stop_event.wait(timeout=current_interval) Utils.log_message(self.log_text, "Error Sim: Hilo de errores aleatorios detenido.") def add_data_point(self, brix_value, ma_value): """Agrega un punto de datos al gráfico""" current_time = time.time() - self.start_time self.time_data.append(current_time) self.brix_data.append(brix_value) self.ma_data.append(ma_value) if hasattr(self, 'graph_update_callback'): self.graph_update_callback() def clear_graph(self): """Limpia los datos del gráfico""" Utils.clear_graph_data(self.time_data, self.brix_data, self.ma_data) self.start_time = time.time() Utils.log_message(self.log_text, "Gráfico limpiado.") if hasattr(self, 'graph_update_callback'): self.graph_update_callback() def _set_entries_state(self, state): """Habilita/deshabilita los controles durante la simulación""" sim_specific_widgets = [ self.adam_address_entry, self.function_type_combo, self.cycle_time_entry, self.samples_per_cycle_entry ] # No deshabilitar controles de modo manual aquí, se manejan en on_function_type_change Utils.set_widgets_state(sim_specific_widgets, state) if 'shared_widgets' in self.shared_config: Utils.set_widgets_state(self.shared_config['shared_widgets'], state) # self.update_error_controls_state() # El estado de los controles de error depende también de self.simulating def get_config(self): """Obtiene la configuración actual del simulador""" return { 'adam_address': self.adam_address_var.get(), 'function_type': self.function_type_var.get(), 'cycle_time': self.cycle_time_var.get(), 'samples_per_cycle': self.samples_per_cycle_var.get(), 'manual_input_type': self.manual_input_type_var.get(), 'manual_value': self.manual_value_var.get(), 'random_error_interval': self.random_error_interval_var.get() } def set_config(self, config): """Establece la configuración del simulador""" self.adam_address_var.set(config.get('adam_address', '01')) self.function_type_var.set(config.get('function_type', 'Lineal')) self.cycle_time_var.set(config.get('cycle_time', '10.0')) self.samples_per_cycle_var.set(config.get('samples_per_cycle', '100')) self.manual_input_type_var.set(config.get('manual_input_type', 'Brix')) self.manual_value_var.set(config.get('manual_value', '10.0')) self.random_error_interval_var.set(config.get('random_error_interval', '10.0')) try: self.manual_slider_var.set(float(self.manual_value_var.get())) except ValueError: # Si el valor no es un float válido, intentar con un default o el valor del tipo # Esto se manejará mejor en on_manual_input_type_change pass self.on_function_type_change() # Esto llamará a on_manual_input_type_change si es necesario self.update_error_controls_state() # Actualizar estado de controles de error al cargar config def on_app_close(self): """Llamado cuando la aplicación se está cerrando para limpiar recursos.""" if self.simulating: self.stop_simulation() # Asegura que todo se detenga y limpie correctamente