MaselliSimulatorApp/tabs/simulator_tab.py

967 lines
55 KiB
Python

"""
Tab del Simulador - Genera valores de prueba en protocolo ADAM
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import random
import time
import math
from collections import deque
from protocol_handler import ProtocolHandler
from connection_manager import ConnectionManager
from utils import Utils
class SimulatorTab:
def __init__(self, parent_frame, shared_config):
self.frame = parent_frame
self.shared_config = shared_config
# Estado del simulador
self.simulating = False
self.simulation_thread = None
self.simulation_step = 0
self.connection_manager = ConnectionManager()
# Datos para el gráfico
self.max_points = 100
self.time_data = deque(maxlen=self.max_points)
self.brix_data = deque(maxlen=self.max_points)
self.ma_data = deque(maxlen=self.max_points)
self.start_time = time.time()
# Cargar configuración inicial para obtener valores por defecto para StringVars
# Esto es para asegurar que las StringVars tengan un valor inicial antes de que set_config sea llamado
# por maselli_app.py.
initial_config = self.shared_config['config_manager'].load_config()
self.adam_address_var = tk.StringVar(value=initial_config.get('adam_address', '01'))
self.function_type_var = tk.StringVar(value=initial_config.get('function_type', 'Lineal'))
self.cycle_time_var = tk.StringVar(value=initial_config.get('cycle_time', '10.0'))
self.samples_per_cycle_var = tk.StringVar(value=initial_config.get('samples_per_cycle', '100'))
# Configuración para modo manual y errores
self.manual_input_type_var = tk.StringVar(value=initial_config.get('manual_input_type', 'Brix'))
self.manual_value_var = tk.StringVar(value=initial_config.get('manual_value', '10.0'))
try:
manual_value_float = float(initial_config.get('manual_value', '10.0'))
except ValueError:
manual_value_float = 10.0 # Fallback
self.manual_slider_var = tk.DoubleVar(value=manual_value_float)
self.current_brix_var = tk.StringVar(value="---")
self.current_ma_var = tk.StringVar(value="--.-- mA")
self.current_voltage_var = tk.StringVar(value="-.-- V") # Nueva para voltaje
# Para simulación de errores
self.random_error_timer = None
self.random_error_timer_stop_event = threading.Event()
self.replace_normal_with_error_var = tk.BooleanVar(value=False)
self.next_frame_is_error_event = threading.Event()
self.random_error_interval_var = tk.StringVar(value=initial_config.get('random_error_interval', '10.0'))
self.error_details_for_replacement = None # (message_bytes, log_suffix, error_type_str)
self.actual_graph_frame_container = None # Inicializar ANTES de create_widgets
self.create_widgets()
# La línea anterior que asignaba None aquí ha sido eliminada.
def create_widgets(self):
"""Crea los widgets del tab simulador"""
# Frame de configuración del simulador
config_frame = ttk.LabelFrame(self.frame, text="Configuración Simulador")
config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2)
# Dirección ADAM
ttk.Label(config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w")
# self.adam_address_var inicializada en __init__
self.adam_address_entry = ttk.Entry(config_frame, textvariable=self.adam_address_var, width=5)
self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
# Función
ttk.Label(config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
# self.function_type_var inicializada en __init__
self.function_type_combo = ttk.Combobox(config_frame, textvariable=self.function_type_var,
values=["Lineal", "Sinusoidal", "Manual"],
state="readonly", width=10)
self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
self.function_type_combo.bind("<<ComboboxSelected>>", self.on_function_type_change)
# Tiempo de ciclo completo (nueva característica)
ttk.Label(config_frame, text="Tiempo Ciclo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w")
# self.cycle_time_var inicializada en __init__
self.cycle_time_entry = ttk.Entry(config_frame, textvariable=self.cycle_time_var, width=8)
self.cycle_time_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew")
# Velocidad de muestreo (calculada automáticamente)
ttk.Label(config_frame, text="Muestras/ciclo:").grid(row=0, column=6, padx=5, pady=5, sticky="w")
# self.samples_per_cycle_var inicializada en __init__
self.samples_per_cycle_entry = ttk.Entry(config_frame, textvariable=self.samples_per_cycle_var, width=8)
self.samples_per_cycle_entry.grid(row=0, column=7, padx=5, pady=5, sticky="ew")
# --- Frame para modo Manual (Modificado) ---
manual_frame = ttk.LabelFrame(config_frame, text="Modo Manual")
manual_frame.grid(row=1, column=0, columnspan=8, padx=5, pady=5, sticky="ew")
ttk.Label(manual_frame, text="Entrada Por:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
# self.manual_input_type_var inicializada en __init__
self.manual_input_type_combo = ttk.Combobox(manual_frame, textvariable=self.manual_input_type_var,
values=["Brix", "mA", "Voltaje"], state="readonly", width=8)
self.manual_input_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.manual_input_type_combo.bind("<<ComboboxSelected>>", self.on_manual_input_type_change)
self.manual_value_label = ttk.Label(manual_frame, text="Valor Brix:") # Se actualiza dinámicamente
self.manual_value_label.grid(row=1, column=0, padx=5, pady=5, sticky="w")
# self.manual_value_var y self.manual_slider_var inicializadas en __init__
self.manual_value_entry = ttk.Entry(manual_frame, textvariable=self.manual_value_var, width=10, state=tk.DISABLED)
self.manual_value_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
self.manual_value_entry.bind('<Return>', lambda e: self.update_slider_from_entry())
self.manual_value_entry.bind('<FocusOut>', lambda e: self.update_slider_from_entry())
# Slider
self.manual_slider = ttk.Scale(manual_frame, orient=tk.HORIZONTAL, # from_ y to_ se configuran dinámicamente
variable=self.manual_slider_var, command=self.on_slider_change,
state=tk.DISABLED, length=200)
self.manual_slider.grid(row=1, column=2, padx=5, pady=5, sticky="ew")
manual_frame.columnconfigure(2, weight=1)
# Controls Frame
controls_frame = ttk.LabelFrame(self.frame, text="Control Simulación")
controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=5)
self.clear_comm_log_button = ttk.Button(controls_frame, text="Limpiar Log Com.", command=self.clear_comm_log)
self.clear_comm_log_button.pack(side=tk.LEFT, padx=5)
self.clear_graph_button = ttk.Button(controls_frame, text="Limpiar Gráfico", command=self.clear_graph)
self.clear_graph_button.pack(side=tk.LEFT, padx=5)
# Display Frame
display_frame = ttk.LabelFrame(self.frame, text="Valores Actuales")
display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="nsew")
ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
# self.current_brix_var inicializada en __init__
ttk.Label(display_frame, textvariable=self.current_brix_var,
font=("Courier", 14, "bold")).grid(row=0, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
# self.current_ma_var inicializada en __init__
ttk.Label(display_frame, textvariable=self.current_ma_var,
font=("Courier", 14, "bold")).grid(row=1, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="Voltaje:").grid(row=2, column=0, padx=5, pady=5, sticky="w")
# self.current_voltage_var inicializada en __init__
ttk.Label(display_frame, textvariable=self.current_voltage_var,
font=("Courier", 14, "bold")).grid(row=2, column=1, padx=5, pady=5, sticky="w")
# Event Log Frame (antes era el log principal)
event_log_frame = ttk.LabelFrame(self.frame, text="Log de Eventos")
event_log_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
self.event_log_text = scrolledtext.ScrolledText(event_log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED)
self.event_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
# --- Frame para Comunicación y Gráfico ---
comm_and_graph_parent_frame = ttk.Frame(self.frame)
comm_and_graph_parent_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
comm_and_graph_parent_frame.columnconfigure(0, weight=1) # Comm log
comm_and_graph_parent_frame.columnconfigure(1, weight=1) # Graph
comm_and_graph_parent_frame.rowconfigure(0, weight=1) # Ambos toman la altura completa de esta fila
# Comm Log Frame (Nuevo)
comm_log_frame = ttk.LabelFrame(comm_and_graph_parent_frame, text="Log de Comunicación (Simulador)")
comm_log_frame.grid(row=0, column=0, padx=(0, 5), pady=0, sticky="nsew")
comm_log_frame.rowconfigure(0, weight=1)
comm_log_frame.columnconfigure(0, weight=1)
self.comm_log_text = scrolledtext.ScrolledText(comm_log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED)
self.comm_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
# Graph Frame Container (el gráfico se insertará aquí por MaselliApp)
# self.get_graph_frame() ahora devuelve este contenedor.
self.actual_graph_frame_container = ttk.LabelFrame(comm_and_graph_parent_frame, text="Gráfico Simulador")
self.actual_graph_frame_container.grid(row=0, column=1, padx=(5, 0), pady=0, sticky="nsew")
# --- Frame para Simulación de Errores ---
self._setup_error_simulation_ui() # Se añade al final de create_widgets
# Configurar pesos
self.frame.columnconfigure(0, weight=1)
self.frame.columnconfigure(1, weight=1)
self.frame.rowconfigure(2, weight=1) # Event log frame
self.frame.rowconfigure(3, weight=3) # Comm and Graph parent frame (más altura)
self.frame.rowconfigure(4, weight=0) # Error frame no se expande tanto
# Inicializar estado
self.on_function_type_change()
def get_graph_frame(self):
"""Retorna el frame contenedor donde se debe dibujar el gráfico del simulador."""
return self.actual_graph_frame_container
def _setup_error_simulation_ui(self):
"""Crea los controles para la simulación de errores."""
error_frame = ttk.LabelFrame(self.frame, text="Simulación de Errores (Modo TCP Server)")
error_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=10, sticky="ew")
self.frame.rowconfigure(4, weight=0) # Error frame no se expande tanto como el log o gráfico
ttk.Label(error_frame, text="Tipo de Error:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.error_type_var = tk.StringVar(value="Ninguno")
self.error_type_combo = ttk.Combobox(
error_frame,
textvariable=self.error_type_var,
state="disabled", # Se habilita/deshabilita dinámicamente
values=[
"Ninguno", # Para enviar una trama normal desde este control
"ID Erróneo",
"Valor Fuera de Escala (mA)",
"Checksum Erróneo",
"Longitud Errónea (Aleatoria)",
"Trama Faltante (Omitir Envío)"
]
)
self.error_type_combo.current(0)
self.error_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.send_error_button = ttk.Button(error_frame, text="Enviar Trama Errónea",
command=self.send_selected_error_manually, state=tk.DISABLED)
self.send_error_button.grid(row=0, column=2, padx=5, pady=5)
self.random_error_var = tk.BooleanVar(value=False)
self.random_error_check = ttk.Checkbutton(
error_frame,
text="Errores Aleatorios (cada ~10s)",
variable=self.random_error_var,
command=self.toggle_random_errors,
state="disabled" # Se habilita/deshabilita dinámicamente
)
self.random_error_check.grid(row=1, column=0, columnspan=2, padx=5, pady=5, sticky="w")
# Checkbox para reemplazar trama normal con error (ahora en su propia fila para claridad)
self.replace_with_error_check = ttk.Checkbutton(
error_frame,
text="Reemplazar trama normal con error",
variable=self.replace_normal_with_error_var,
state="disabled" # Se habilita/deshabilita dinámicamente
)
self.replace_with_error_check.grid(row=1, column=2, padx=(10,5), pady=5, sticky="w")
ttk.Label(error_frame, text="Intervalo Errores Aleatorios (s):").grid(row=2, column=0, padx=5, pady=5, sticky="w")
self.random_error_interval_entry = ttk.Entry(
error_frame,
textvariable=self.random_error_interval_var,
width=8, # El Entry solo necesita el parent, textvariable, width y state.
state="disabled" # Se habilita/deshabilita dinámicamente
)
self.random_error_interval_entry.grid(row=2, column=1, padx=5, pady=5, sticky="ew") # Añadir el grid para el Entry
# El grid para self.replace_with_error_check ya está definido donde se crea ese widget.
error_frame.columnconfigure(1, weight=1)
self.update_error_controls_state() # Establecer estado inicial
def update_error_controls_state(self):
"""Habilita o deshabilita los controles de error según el modo de conexión."""
# Asegurarse de que los widgets de error existan antes de intentar configurarlos
if not hasattr(self, 'error_type_combo'):
return
is_tcp_server_mode = self.shared_config['connection_type_var'].get() == "TCP-Server"
# Considerar si la simulación (conexión) está activa para habilitar el envío
# is_connection_active = self.simulating # O una propiedad más directa de ConnectionManager
# Los controles de error solo tienen sentido si estamos en modo TCP-Server
# y la conexión está activa (es decir, la simulación principal está corriendo o
# el servidor está escuchando de alguna forma).
# Por ahora, lo basaremos en is_tcp_server_mode y self.simulating
enable_controls = is_tcp_server_mode and self.simulating
new_state_tk = tk.NORMAL if enable_controls else tk.DISABLED
new_state_str = "normal" if enable_controls else "disabled" # Para Checkbutton
self.error_type_combo.config(state=new_state_tk if is_tcp_server_mode else tk.DISABLED) # Combo siempre según modo
self.send_error_button.config(state=new_state_tk)
self.random_error_check.config(state=new_state_str)
# El entry del intervalo de errores aleatorios depende de que el check de errores aleatorios esté activo
interval_entry_state_tk = tk.NORMAL if enable_controls and self.random_error_var.get() else tk.DISABLED
self.random_error_interval_entry.config(state=interval_entry_state_tk)
# El check de "Reemplazar trama normal" se habilita si los controles de error están habilitados
self.replace_with_error_check.config(state=new_state_str)
if not enable_controls and self.random_error_var.get():
self.random_error_var.set(False)
self.toggle_random_errors() # Detiene el timer si estaba activo y se deshabilitan controles
def get_current_error_sim_parameters(self):
"""Obtiene parámetros para la simulación de errores (dirección ADAM, valor mA base)."""
adam_address = self.adam_address_var.get()
base_ma_value = 12.345 # Valor por defecto
if self.function_type_var.get() == "Manual":
try:
manual_val = float(self.manual_value_var.get())
input_type = self.manual_input_type_var.get()
if input_type == "Brix":
min_b = float(self.shared_config['min_brix_map_var'].get())
max_b = float(self.shared_config['max_brix_map_var'].get())
base_ma_value = ProtocolHandler.scale_to_ma(manual_val, min_b, max_b)
elif input_type == "mA": # noqa: E721
base_ma_value = manual_val
elif input_type == "Voltaje":
base_ma_value = ProtocolHandler.voltage_to_ma(manual_val)
except (ValueError, KeyError, TypeError):
Utils.log_message(self.log_text, "Error Sim: Usando valor mA base por defecto para error.")
else: # Si no es manual, o para tener un valor si la simulación principal no corre
# Podríamos tomar el self.current_ma_var si la simulación está corriendo
# pero para simplicidad, un valor fijo si no es manual.
pass # Mantiene 12.345
return adam_address, base_ma_value
def on_function_type_change(self, event=None):
"""Maneja el cambio de tipo de función"""
func_type = self.function_type_var.get()
is_manual_mode = (func_type == "Manual")
# Si la simulación está corriendo y el tipo de función cambia, detenerla.
if self.simulating:
self.stop_simulation()
# Configurar controles de entrada manual
manual_specific_state = tk.NORMAL if is_manual_mode else tk.DISABLED
self.manual_input_type_combo.config(state=manual_specific_state)
self.manual_value_entry.config(state=manual_specific_state)
self.manual_slider.config(state=manual_specific_state)
# Tiempo de ciclo y muestras por ciclo ahora están habilitados para todos los modos continuos
self.cycle_time_entry.config(state=tk.NORMAL)
self.samples_per_cycle_entry.config(state=tk.NORMAL)
if is_manual_mode:
self.on_manual_input_type_change() # Actualizar rangos de slider/entry y valor actual
# El estado de los botones Start/Stop depende de si la simulación está (o estaba) corriendo.
# Como stop_simulation() se llama arriba si estaba corriendo, self.simulating debería ser False aquí.
if not self.simulating:
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
else:
# Este estado idealmente no se alcanzaría si stop_simulation()
# establece correctamente self.simulating a False y actualiza los botones.
# Sin embargo, como salvaguarda:
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.update_error_controls_state() # Actualizar estado de controles de error
def on_manual_input_type_change(self, event=None):
"""Maneja el cambio de tipo de entrada manual (Brix, mA, Voltaje)"""
input_type = self.manual_input_type_var.get()
min_val, max_val, default_val, label_text, precision = 0, 100, 10.0, "Valor Brix:", 2
if input_type == "Brix":
try:
min_val = float(self.shared_config['min_brix_map_var'].get())
max_val = float(self.shared_config['max_brix_map_var'].get())
if min_val >= max_val: min_val, max_val = 0.0, 80.0 # Fallback
default_val = min_val + (max_val - min_val) / 4
except (ValueError, KeyError, TypeError):
min_val, max_val = 0.0, 80.0
default_val = 10.0 # noqa: F841
label_text = "Valor Brix:"
precision = 2
elif input_type == "mA": # noqa: E721
min_val, max_val = 0.0, 20.0
default_val = 12.0
label_text = "Valor mA:"
precision = 3
elif input_type == "Voltaje":
min_val, max_val = 0.0, 10.0
default_val = 5.0
label_text = "Valor Voltaje:"
precision = 2
self.manual_value_label.config(text=label_text)
self.manual_slider.config(from_=min_val, to=max_val)
try:
current_numeric_val = float(self.manual_value_var.get())
if not (min_val <= current_numeric_val <= max_val):
self.manual_value_var.set(f"{default_val:.{precision}f}")
self.manual_slider_var.set(default_val)
else:
self.manual_slider_var.set(current_numeric_val)
self.manual_value_var.set(f"{current_numeric_val:.{precision}f}")
except ValueError:
self.manual_value_var.set(f"{default_val:.{precision}f}")
self.manual_slider_var.set(default_val)
def on_slider_change(self, value_str):
"""Actualiza el valor del entry cuando cambia el slider"""
value = float(value_str)
input_type = self.manual_input_type_var.get()
precision = 2
if input_type == "Brix": precision = 2 # noqa: E701
elif input_type == "mA": precision = 3
elif input_type == "Voltaje": precision = 2
self.manual_value_var.set(f"{value:.{precision}f}")
def update_slider_from_entry(self):
"""Actualiza el slider cuando cambia el entry"""
try:
value = float(self.manual_value_var.get())
input_type = self.manual_input_type_var.get()
min_val, max_val, precision = 0,100,2
if input_type == "Brix":
min_val = float(self.shared_config['min_brix_map_var'].get())
max_val = float(self.shared_config['max_brix_map_var'].get())
if min_val >= max_val: min_val, max_val = 0.0, 80.0
precision = 2
elif input_type == "mA": min_val, max_val, precision = 0.0, 20.0, 3 # noqa: E701
elif input_type == "Voltaje": min_val, max_val, precision = 0.0, 10.0, 2
value = max(min_val, min(max_val, value)) # Clampear al rango
self.manual_slider_var.set(value)
self.manual_value_var.set(f"{value:.{precision}f}")
except (ValueError, KeyError, TypeError):
# Si el valor no es un número o shared_config no está listo, resetear al valor del slider
current_slider_val = self.manual_slider_var.get()
precision_fallback = 2
if self.manual_input_type_var.get() == "mA": precision_fallback = 3
self.manual_value_var.set(f"{current_slider_val:.{precision_fallback}f}")
def start_simulation(self):
"""Inicia la simulación continua"""
if self.simulating:
messagebox.showwarning("Advertencia", "La simulación ya está en curso.")
return
try:
adam_address = self.adam_address_var.get()
if len(adam_address) != 2:
messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.")
return
cycle_time = float(self.cycle_time_var.get())
if cycle_time <= 0:
messagebox.showerror("Error", "El tiempo de ciclo debe ser mayor que 0.")
return
samples_per_cycle = int(self.samples_per_cycle_var.get())
if samples_per_cycle <= 0:
messagebox.showerror("Error", "Las muestras por ciclo deben ser mayor que 0.")
return
# Validar mapeo Brix
float(self.shared_config['min_brix_map_var'].get())
float(self.shared_config['max_brix_map_var'].get())
except (ValueError, KeyError, TypeError):
messagebox.showerror("Error", "Valores inválidos en la configuración (ADAM, ciclo, muestras o mapeo Brix).")
return
try:
current_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
conn_type = current_config_values['connection_type']
conn_params = self.shared_config['config_manager'].get_connection_params(current_config_values)
# open_connection ahora devuelve (connection_object, listening_info)
# El connection_object se guarda internamente en self.connection_manager
_, listening_details = self.connection_manager.open_connection(conn_type, conn_params)
if conn_type == "TCP-Server":
Utils.log_message(self.comm_log_text, f"{listening_details} para simulación.")
elif conn_type != "TCP-Server": # Para otros tipos, el mensaje genérico
Utils.log_message(self.comm_log_text, f"Conexión {conn_type} abierta para simulación.")
except Exception as e:
messagebox.showerror("Error de Conexión", str(e))
return
self.simulating = True
self.simulation_step = 0
self.start_time = time.time() # Reset start time for graph
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self._set_entries_state(tk.DISABLED)
self.update_error_controls_state() # Habilitar controles de error si es TCP Server
if conn_type == "TCP-Server":
self.shared_config['client_connected_var'].set("Esperando...")
self.simulation_thread = threading.Thread(target=self.run_simulation, daemon=True)
self.simulation_thread.start() # noqa: E701
Utils.log_message(self.event_log_text, "Simulación iniciada.")
def stop_simulation(self):
"""Detiene la simulación"""
if not self.simulating:
return
self.simulating = False
# Detener el timer de errores aleatorios primero
if self.random_error_timer and self.random_error_timer.is_alive():
self.random_error_timer_stop_event.set()
self.random_error_timer.join(timeout=1.0) # Esperar un poco
self.random_error_timer = None
self.next_frame_is_error_event.clear()
self.error_details_for_replacement = None
if self.simulation_thread and self.simulation_thread.is_alive():
self.simulation_thread.join(timeout=2.0)
self.connection_manager.close_connection()
Utils.log_message(self.comm_log_text, "Conexión cerrada.")
self._set_entries_state(tk.NORMAL)
self.on_function_type_change() # Re-evaluar estado de controles manuales
if self.connection_manager.connection_type == "TCP-Server": # Limpiar info del cliente
self.shared_config['client_connected_var'].set("Ninguno")
Utils.log_message(self.event_log_text, "Simulación detenida.")
self.current_brix_var.set("---")
self.current_ma_var.set("--.-- mA")
self.current_voltage_var.set("-.-- V")
self.start_button.config(state=tk.NORMAL) # Mover después de _set_entries_state y on_function_type_change
self.stop_button.config(state=tk.DISABLED)
self.update_error_controls_state() # Deshabilitar controles de error
def run_simulation(self):
"""Thread principal de simulación"""
try:
adam_address = self.adam_address_var.get()
min_brix_map = float(self.shared_config['min_brix_map_var'].get())
max_brix_map = float(self.shared_config['max_brix_map_var'].get())
function_type = self.function_type_var.get()
cycle_time = float(self.cycle_time_var.get())
samples_per_cycle = int(self.samples_per_cycle_var.get())
conn_type = self.connection_manager.connection_type # Obtener el tipo de conexión actual
# Obtener la configuración actual para el log del puerto en TCP-Server
current_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
sample_period = cycle_time / samples_per_cycle
while self.simulating:
message_to_send = None
ma_value_for_message_generation = 0.0 # mA que se usaría para generar la trama (normal o base para error)
# --- Determinar valores base de la simulación para este ciclo (Brix, mA) ---
# Esta lógica calcula los valores que se mostrarían y graficarían,
# y que se usarían para generar una trama normal.
target_brix = 0.0 # Brix consistente con target_ma para display/graph
# target_ma es el valor de mA que se usaría para generar el mensaje ADAM si fuera normal
# o el valor base si un error lo reemplaza.
current_manual_input_type = self.manual_input_type_var.get() # Cache para este ciclo
if function_type == "Manual": # Lógica para modo Manual
manual_input_type = self.manual_input_type_var.get()
manual_numeric_value = 0.0
try:
manual_numeric_value = float(self.manual_value_var.get())
except ValueError:
Utils.log_message(self.event_log_text, f"Valor manual inválido: '{self.manual_value_var.get()}'. Usando valor por defecto.")
if manual_input_type == "Brix": manual_numeric_value = min_brix_map
elif manual_input_type == "mA": manual_numeric_value = 4.0
elif manual_input_type == "Voltaje": manual_numeric_value = ProtocolHandler.ma_to_voltage(4.0)
if manual_input_type == "Brix":
target_brix = manual_numeric_value
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
elif manual_input_type == "mA":
ma_value_for_message_generation = manual_numeric_value
target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map)
elif manual_input_type == "Voltaje":
voltage_input = manual_numeric_value
ma_value_for_message_generation = ProtocolHandler.voltage_to_ma(voltage_input)
target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map)
elif function_type == "Lineal":
cycle_progress = (self.simulation_step % (2 * samples_per_cycle)) / samples_per_cycle
if cycle_progress > 1.0:
cycle_progress = 2.0 - cycle_progress
target_brix = min_brix_map + (max_brix_map - min_brix_map) * cycle_progress
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
elif function_type == "Sinusoidal":
progress = (self.simulation_step % samples_per_cycle) / samples_per_cycle
phase = progress * 2 * math.pi
sin_val = (math.sin(phase) + 1) / 2
target_brix = min_brix_map + (max_brix_map - min_brix_map) * sin_val
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
# ma_value_in_message es el valor de mA que realmente se usaría en la trama o que se mostraría
# Si la trama es reemplazada por un error, este valor sigue siendo el de la simulación normal
# para la UI, pero la trama enviada será diferente.
ma_value_for_ui_display = ma_value_for_message_generation
voltage_value_display = ProtocolHandler.ma_to_voltage(ma_value_for_ui_display)
# --- Preparar la trama a enviar (normal o error de reemplazo) ---
log_prefix_for_send = "Enviando"
log_suffix_for_send = ""
actual_error_type_sent = "Normal" # Para el log
if self.next_frame_is_error_event.is_set() and \
self.error_details_for_replacement is not None and \
self.replace_normal_with_error_var.get():
error_msg_bytes, error_log_suffix, error_type_str = self.error_details_for_replacement
message_to_send = error_msg_bytes
log_prefix_for_send = "Error Sim (Reemplazo Programado)"
log_suffix_for_send = error_log_suffix
actual_error_type_sent = error_type_str
self.next_frame_is_error_event.clear()
self.error_details_for_replacement = None
else:
# Generar trama normal
message_to_send, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, ma_value_for_message_generation)
# Preparar texto para display
brix_display_text = ""
if ma_value_for_ui_display < 4.0 and function_type == "Manual" and \
(current_manual_input_type == "mA" or current_manual_input_type == "Voltaje"):
brix_display_text = "Error (Sub 4mA)"
else:
brix_display_text = Utils.format_brix_display(target_brix)
# Actualizar GUI (StringVars son thread-safe para .set())
self.current_brix_var.set(brix_display_text)
self.current_ma_var.set(Utils.format_ma_display(ma_value_for_ui_display))
self.current_voltage_var.set(ProtocolHandler.format_voltage_display(voltage_value_display))
# Agregar punto de datos al gráfico (desde el thread GUI)
self.frame.after(0, lambda b=target_brix, m=ma_value_for_ui_display: self.add_data_point(b, m))
# --- Enviar la trama (normal o de error) ---
if message_to_send: # Si hay algo que enviar (no es "Trama Faltante" de reemplazo)
try:
if conn_type == "TCP-Server":
if not self.connection_manager.is_client_connected():
if not hasattr(self, '_waiting_for_client_logged') or not self._waiting_for_client_logged:
port_to_log = self.shared_config['config_manager'].get_connection_params(current_config_values)['port']
Utils.log_message(self.comm_log_text, f"TCP Server: Esperando cliente en puerto {port_to_log}...")
self._waiting_for_client_logged = True
if self.connection_manager.accept_client(timeout=0.05):
Utils.log_message(self.comm_log_text, f"TCP Server: Cliente conectado desde {self.connection_manager.client_address}")
client_info = f"{self.connection_manager.client_address[0]}:{self.connection_manager.client_address[1]}"
self.shared_config['client_connected_var'].set(client_info)
self._waiting_for_client_logged = False
elif not self.connection_manager.is_client_connected() and \
self.shared_config['client_connected_var'].get() != "Esperando...":
self.shared_config['client_connected_var'].set("Esperando...")
log_content = ProtocolHandler.format_for_display(message_to_send, hex_non_printable=True)
if actual_error_type_sent != "Normal" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"):
Utils.log_message(self.comm_log_text, f"{log_prefix_for_send}: Trama '{actual_error_type_sent}'{log_suffix_for_send} -> {log_content}")
else:
Utils.log_message(self.comm_log_text, f"{log_prefix_for_send}: {log_content}")
self.connection_manager.send_data(message_to_send)
if conn_type != "TCP-Server": # No leer respuesta en modo servidor
response = self.connection_manager.read_response(timeout=0.1)
if response and response.strip():
Utils.log_message(self.comm_log_text, f"Respuesta: {ProtocolHandler.format_for_display(response)}")
parsed = ProtocolHandler.parse_adam_message(response)
if parsed:
brix_resp = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix_map, max_brix_map)
Utils.log_message(self.comm_log_text,
f" -> Addr: {parsed['address']}, "
f"mA: {parsed['ma']:.3f}, "
f"Brix: {brix_resp:.3f}")
except self.connection_manager.ClientDisconnectedError:
Utils.log_message(self.comm_log_text, "TCP Server: Cliente desconectado. Esperando nueva conexión.")
if conn_type == "TCP-Server":
self.shared_config['client_connected_var'].set("Esperando...")
self._waiting_for_client_logged = False
except Exception as e:
Utils.log_message(self.comm_log_text, f"Error en comunicación ({conn_type}): {e}")
self.frame.after(0, self.stop_simulation_error)
break
elif actual_error_type_sent == "Trama Faltante (Omitir Envío)" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"):
# Loguear que se omitió una trama debido al reemplazo por "Trama Faltante"
Utils.log_message(self.comm_log_text, f"{log_prefix_for_send}: Simulación de '{actual_error_type_sent}'{log_suffix_for_send}. No se envió trama.")
self.simulation_step += 1
time.sleep(sample_period)
except Exception as e: # Catches errors in parameter fetching or main loop logic
Utils.log_message(self.event_log_text, f"Error en simulación: {e}")
if self.simulating: # Ensure stop is called only if an error occurs while simulating
self.frame.after(0, self.stop_simulation_error)
def stop_simulation_error(self):
"""Detiene la simulación debido a un error y muestra mensaje"""
if self.simulating: # Solo actuar si la simulación estaba activa
messagebox.showerror("Error de Simulación", "Error durante la simulación. Simulación detenida.")
self.stop_simulation() # Llama al método normal de parada
def generate_erroneous_message_logic(self, error_type, adam_address, base_ma_value):
"""Genera la trama (bytes) según el tipo de error."""
message_bytes = None
log_message_suffix = ""
if error_type == "ID Erróneo":
wrong_adam_address = "99" if adam_address != "99" else "98"
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(wrong_adam_address, base_ma_value)
log_message_suffix = f" (ID cambiado a {wrong_adam_address})"
elif error_type == "Valor Fuera de Escala (mA)":
out_of_scale_ma = 2.500 if random.random() < 0.5 else 22.500
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, out_of_scale_ma)
log_message_suffix = f" (valor mA: {out_of_scale_ma:.3f})"
elif error_type == "Checksum Erróneo":
message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value)
log_message_suffix = " (checksum incorrecto)"
elif error_type == "Longitud Errónea (Aleatoria)":
base_msg_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value)
if len(base_msg_bytes) > 1:
if random.choice([True, False]): # Acortar
cut_len = random.randint(1, max(1, len(base_msg_bytes) // 2))
message_bytes = base_msg_bytes[:-cut_len]
log_message_suffix = f" (longitud acortada en {cut_len} bytes)"
else: # Alargar
add_len = random.randint(1, 5) # Aumentado un poco el largo posible
garbage = bytes([random.randint(32, 126) for _ in range(add_len)])
message_bytes = base_msg_bytes + garbage # Podría ser al final o en medio
log_message_suffix = f" (longitud aumentada en {add_len} bytes)"
else:
message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value)
log_message_suffix = " (longitud errónea -> fallback a checksum incorrecto)"
elif error_type == "Trama Faltante (Omitir Envío)":
log_message_suffix = " (trama omitida)"
return None, log_message_suffix
elif error_type == "Ninguno": # Enviar trama normal
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value)
log_message_suffix = " (trama normal)"
else:
Utils.log_message(self.comm_log_text, f"Error Sim: Tipo de error '{error_type}' desconocido.")
return None, f" (tipo de error '{error_type}' desconocido)"
return message_bytes, log_message_suffix
def send_selected_error_manually(self):
"""Manejador del botón 'Enviar Trama Errónea'."""
if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating):
messagebox.showwarning("No Activo", "La simulación de errores manuales requiere modo TCP-Server y simulación activa.")
return
if not self.connection_manager.is_client_connected():
Utils.log_message(self.comm_log_text, "Error Sim: No hay cliente conectado para enviar trama errónea.")
# messagebox.showinfo("Sin Cliente", "No hay cliente conectado para enviar la trama errónea.")
# return # Permitir enviar aunque no haya cliente, el log lo indicará
error_type = self.error_type_var.get()
adam_address, base_ma_value = self.get_current_error_sim_parameters()
message_bytes, log_suffix_from_gen = self.generate_erroneous_message_logic(error_type, adam_address, base_ma_value)
if self.replace_normal_with_error_var.get():
# Programar para reemplazo en el siguiente ciclo de simulación
self.error_details_for_replacement = (message_bytes, log_suffix_from_gen, error_type)
self.next_frame_is_error_event.set()
if error_type == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.comm_log_text, f"Error Sim Manual: Programada OMISIÓN de trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
elif message_bytes:
Utils.log_message(self.comm_log_text, f"Error Sim Manual: Programada trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
else: # Error en generación o tipo desconocido
Utils.log_message(self.comm_log_text, f"Error Sim Manual: No se pudo programar trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
else:
# Enviar inmediatamente como trama adicional
if message_bytes:
try:
self.connection_manager.send_data(message_bytes)
Utils.log_message(self.comm_log_text, f"Error Sim Manual (Adicional): Trama '{error_type}'{log_suffix_from_gen} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}")
except Exception as e:
Utils.log_message(self.comm_log_text, f"Error Sim Manual (Adicional): Fallo al enviar trama: {e}")
elif error_type == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.comm_log_text, f"Error Sim Manual (Adicional): Simulación de '{error_type}'{log_suffix_from_gen}. No se envió trama adicional.")
# else: Ya logueado por generate_erroneous_message_logic si message_bytes es None y no es "Trama Faltante"
def toggle_random_errors(self):
"""Activa o desactiva el envío de errores aleatorios."""
# self.random_error_var.get() refleja el nuevo estado del checkbox debido al clic del usuario
can_actually_start_random_errors = (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating)
if self.random_error_var.get(): # Si el usuario intenta activar los errores aleatorios
if not can_actually_start_random_errors:
Utils.log_message(self.event_log_text, "Error Sim: Errores aleatorios solo en TCP-Server con simulación activa.")
self.random_error_var.set(False) # Forzar a False ya que las condiciones no se cumplen
# El timer no se iniciará. update_error_controls_state() al final se encargará.
else: # Las condiciones se cumplen, iniciar el timer si no está ya activo
try:
interval_val = float(self.random_error_interval_var.get())
if interval_val <= 0:
messagebox.showerror("Error de Intervalo", "El intervalo para errores aleatorios debe ser un número positivo.")
self.random_error_var.set(False)
self.update_error_controls_state()
return
except ValueError:
messagebox.showerror("Error de Intervalo", "Valor inválido para el intervalo de errores aleatorios.")
self.random_error_var.set(False)
self.update_error_controls_state()
return
# Las condiciones se cumplen, iniciar el timer si no está ya activo
if self.random_error_timer is None or not self.random_error_timer.is_alive():
self.random_error_timer_stop_event.clear()
self.random_error_timer = threading.Thread(target=self._random_error_loop, args=(interval_val,), daemon=True)
self.random_error_timer.start()
else: # Si el usuario intenta desactivar los errores aleatorios (el checkbox ahora está desmarcado)
if self.random_error_timer and self.random_error_timer.is_alive():
Utils.log_message(self.event_log_text, "Error Sim: Deteniendo envío de errores aleatorios.")
self.random_error_timer_stop_event.set()
# No es necesario join aquí, se hará en stop_simulation o al cerrar.
# Actualizar siempre el estado de los controles al final, basado en el estado final de self.random_error_var
self.update_error_controls_state()
def _random_error_loop(self, initial_interval_s):
"""Bucle del hilo que envía errores aleatorios."""
possible_error_types = [val for val in self.error_type_combo['values'] if val != "Ninguno"]
if not possible_error_types: return
current_interval = initial_interval_s
Utils.log_message(self.event_log_text, f"Error Sim: Hilo de errores aleatorios iniciado con intervalo {current_interval:.2f}s.")
while not self.random_error_timer_stop_event.is_set():
if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating and self.connection_manager.is_client_connected()):
self.random_error_timer_stop_event.wait(1.0) # Esperar si no hay cliente o no está activo
continue
selected_random_error = random.choice(possible_error_types)
adam_address, base_ma_value = self.get_current_error_sim_parameters()
message_bytes, log_suffix = self.generate_erroneous_message_logic(selected_random_error, adam_address, base_ma_value)
if self.replace_normal_with_error_var.get():
# Programar el error para que reemplace la siguiente trama normal
self.error_details_for_replacement = (message_bytes, log_suffix, selected_random_error)
self.next_frame_is_error_event.set()
# El log de este envío se hará en run_simulation cuando efectivamente se envíe/omita
Utils.log_message(self.comm_log_text, f"Error Sim Aleatorio: Programada trama '{selected_random_error}'{log_suffix} para reemplazo.")
else:
# Enviar el error inmediatamente, además de las tramas normales
if message_bytes:
try:
self.connection_manager.send_data(message_bytes)
Utils.log_message(self.comm_log_text, f"Error Sim Aleatorio (Adicional): Trama '{selected_random_error}'{log_suffix} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}")
except Exception as e:
Utils.log_message(self.comm_log_text, f"Error Sim Aleatorio (Adicional): Fallo al enviar: {e}")
elif selected_random_error == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.comm_log_text, f"Error Sim Aleatorio (Adicional): Simulación de '{selected_random_error}'{log_suffix}. No se envió trama adicional.")
# Permitir que el intervalo se actualice dinámicamente
try:
new_interval = float(self.random_error_interval_var.get())
if new_interval > 0 and new_interval != current_interval:
current_interval = new_interval
Utils.log_message(self.event_log_text, f"Error Sim: Intervalo de errores aleatorios actualizado a {current_interval:.2f}s.")
except ValueError:
pass # Mantener el intervalo actual si el nuevo valor es inválido
self.random_error_timer_stop_event.wait(timeout=current_interval)
Utils.log_message(self.event_log_text, "Error Sim: Hilo de errores aleatorios detenido.")
def add_data_point(self, brix_value, ma_value):
"""Agrega un punto de datos al gráfico"""
current_time = time.time() - self.start_time
self.time_data.append(current_time)
self.brix_data.append(brix_value)
self.ma_data.append(ma_value)
if hasattr(self, 'graph_update_callback'):
self.graph_update_callback()
def clear_graph(self):
"""Limpia los datos del gráfico"""
Utils.clear_graph_data(self.time_data, self.brix_data, self.ma_data)
self.start_time = time.time() # noqa: F841
Utils.log_message(self.event_log_text, "Gráfico del simulador limpiado.")
if hasattr(self, 'graph_update_callback'):
self.graph_update_callback()
def clear_comm_log(self):
"""Limpia el log de comunicación del simulador."""
Utils.clear_log_widget(self.comm_log_text)
Utils.log_message(self.event_log_text, "Log de comunicación del simulador limpiado.")
def _set_entries_state(self, state):
"""Habilita/deshabilita los controles durante la simulación"""
sim_specific_widgets = [
self.adam_address_entry,
self.function_type_combo,
self.cycle_time_entry,
self.samples_per_cycle_entry
]
# No deshabilitar controles de modo manual aquí, se manejan en on_function_type_change
Utils.set_widgets_state(sim_specific_widgets, state)
if 'shared_widgets' in self.shared_config:
Utils.set_widgets_state(self.shared_config['shared_widgets'], state)
# self.update_error_controls_state() # El estado de los controles de error depende también de self.simulating
def get_config(self):
"""Obtiene la configuración actual del simulador"""
return {
'adam_address': self.adam_address_var.get(),
'function_type': self.function_type_var.get(),
'cycle_time': self.cycle_time_var.get(),
'samples_per_cycle': self.samples_per_cycle_var.get(),
'manual_input_type': self.manual_input_type_var.get(),
'manual_value': self.manual_value_var.get(),
'random_error_interval': self.random_error_interval_var.get()
}
def set_config(self, config):
"""Establece la configuración del simulador"""
self.adam_address_var.set(config.get('adam_address', '01'))
self.function_type_var.set(config.get('function_type', 'Lineal'))
self.cycle_time_var.set(config.get('cycle_time', '10.0'))
self.samples_per_cycle_var.set(config.get('samples_per_cycle', '100'))
self.manual_input_type_var.set(config.get('manual_input_type', 'Brix'))
self.manual_value_var.set(config.get('manual_value', '10.0'))
self.random_error_interval_var.set(config.get('random_error_interval', '10.0'))
try:
self.manual_slider_var.set(float(self.manual_value_var.get()))
except ValueError:
# Si el valor no es un float válido, intentar con un default o el valor del tipo
# Esto se manejará mejor en on_manual_input_type_change
pass
self.on_function_type_change() # Esto llamará a on_manual_input_type_change si es necesario
self.update_error_controls_state() # Actualizar estado de controles de error al cargar config
def on_app_close(self):
"""Llamado cuando la aplicación se está cerrando para limpiar recursos."""
if self.simulating:
self.stop_simulation() # Asegura que todo se detenga y limpie correctamente