MaselliSimulatorApp/tabs/simulator_tab.py

832 lines
44 KiB
Python

"""
Tab del Simulador - Genera valores de prueba en protocolo ADAM
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import random
import time
import math
from collections import deque
from protocol_handler import ProtocolHandler
from connection_manager import ConnectionManager
from utils import Utils
class SimulatorTab:
def __init__(self, parent_frame, shared_config):
self.frame = parent_frame
self.shared_config = shared_config
# Estado del simulador
self.simulating = False
self.simulation_thread = None
self.simulation_step = 0
self.connection_manager = ConnectionManager()
# Datos para el gráfico
self.max_points = 100
self.time_data = deque(maxlen=self.max_points)
self.brix_data = deque(maxlen=self.max_points)
self.ma_data = deque(maxlen=self.max_points)
self.start_time = time.time()
# Cargar configuración inicial para obtener valores por defecto para StringVars
initial_config = self.shared_config['config_manager'].load_config()
self.adam_address_var = tk.StringVar(value=initial_config.get('adam_address', '01'))
self.function_type_var = tk.StringVar(value=initial_config.get('function_type', 'Lineal'))
self.cycle_time_var = tk.StringVar(value=initial_config.get('cycle_time', '10.0'))
self.samples_per_cycle_var = tk.StringVar(value=initial_config.get('samples_per_cycle', '100'))
# Configuración para modo manual y errores
self.manual_input_type_var = tk.StringVar(value=initial_config.get('manual_input_type', 'Brix'))
self.manual_value_var = tk.StringVar(value=initial_config.get('manual_value', '10.0'))
try:
manual_value_float = float(initial_config.get('manual_value', '10.0'))
except ValueError:
manual_value_float = 10.0 # Fallback
self.manual_slider_var = tk.DoubleVar(value=manual_value_float)
self.current_brix_var = tk.StringVar(value="---")
self.current_ma_var = tk.StringVar(value="--.-- mA")
self.current_voltage_var = tk.StringVar(value="-.-- V")
# Para simulación de errores
self.random_error_timer = None
self.random_error_timer_stop_event = threading.Event()
self.replace_normal_with_error_var = tk.BooleanVar(value=False)
self.next_frame_is_error_event = threading.Event()
self.random_error_interval_var = tk.StringVar(value=initial_config.get('random_error_interval', '10.0'))
self.error_details_for_replacement = None # (message_bytes, log_suffix, error_type_str)
self.actual_graph_frame_container = None # Inicializar ANTES de create_widgets
self.create_widgets()
def create_widgets(self):
"""Crea los widgets del tab simulador"""
config_frame = ttk.LabelFrame(self.frame, text="Configuración Simulador")
config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2)
ttk.Label(config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.adam_address_entry = ttk.Entry(config_frame, textvariable=self.adam_address_var, width=5)
self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
ttk.Label(config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
self.function_type_combo = ttk.Combobox(config_frame, textvariable=self.function_type_var,
values=["Lineal", "Sinusoidal", "Manual"],
state="readonly", width=10)
self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
self.function_type_combo.bind("<<ComboboxSelected>>", self.on_function_type_change)
ttk.Label(config_frame, text="Tiempo Ciclo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w")
self.cycle_time_entry = ttk.Entry(config_frame, textvariable=self.cycle_time_var, width=8)
self.cycle_time_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew")
ttk.Label(config_frame, text="Muestras/ciclo:").grid(row=0, column=6, padx=5, pady=5, sticky="w")
self.samples_per_cycle_entry = ttk.Entry(config_frame, textvariable=self.samples_per_cycle_var, width=8)
self.samples_per_cycle_entry.grid(row=0, column=7, padx=5, pady=5, sticky="ew")
manual_frame = ttk.LabelFrame(config_frame, text="Modo Manual")
manual_frame.grid(row=1, column=0, columnspan=8, padx=5, pady=5, sticky="ew")
ttk.Label(manual_frame, text="Entrada Por:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.manual_input_type_combo = ttk.Combobox(manual_frame, textvariable=self.manual_input_type_var,
values=["Brix", "mA", "Voltaje"], state="readonly", width=8)
self.manual_input_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.manual_input_type_combo.bind("<<ComboboxSelected>>", self.on_manual_input_type_change)
self.manual_value_label = ttk.Label(manual_frame, text="Valor Brix:")
self.manual_value_label.grid(row=1, column=0, padx=5, pady=5, sticky="w")
self.manual_value_entry = ttk.Entry(manual_frame, textvariable=self.manual_value_var, width=10, state=tk.DISABLED)
self.manual_value_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
self.manual_value_entry.bind('<Return>', lambda e: self.update_slider_from_entry())
self.manual_value_entry.bind('<FocusOut>', lambda e: self.update_slider_from_entry())
self.manual_slider = ttk.Scale(manual_frame, orient=tk.HORIZONTAL,
variable=self.manual_slider_var, command=self.on_slider_change,
state=tk.DISABLED, length=200)
self.manual_slider.grid(row=1, column=2, padx=5, pady=5, sticky="ew")
manual_frame.columnconfigure(2, weight=1)
controls_frame = ttk.LabelFrame(self.frame, text="Control Simulación")
controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=5)
self.clear_cyclic_log_button = ttk.Button(controls_frame, text="Limpiar Log Cíclico", command=self.clear_cyclic_log)
self.clear_cyclic_log_button.pack(side=tk.LEFT, padx=5)
self.clear_graph_button = ttk.Button(controls_frame, text="Limpiar Gráfico", command=self.clear_graph)
self.clear_graph_button.pack(side=tk.LEFT, padx=5)
display_frame = ttk.LabelFrame(self.frame, text="Valores Actuales")
display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="nsew")
ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, textvariable=self.current_brix_var,
font=("Courier", 14, "bold")).grid(row=0, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, textvariable=self.current_ma_var,
font=("Courier", 14, "bold")).grid(row=1, column=1, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, text="Voltaje:").grid(row=2, column=0, padx=5, pady=5, sticky="w")
ttk.Label(display_frame, textvariable=self.current_voltage_var,
font=("Courier", 14, "bold")).grid(row=2, column=1, padx=5, pady=5, sticky="w")
event_log_frame = ttk.LabelFrame(self.frame, text="Log de Eventos")
event_log_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
self.event_log_text = scrolledtext.ScrolledText(event_log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED)
self.event_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
comm_and_graph_parent_frame = ttk.Frame(self.frame)
comm_and_graph_parent_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
comm_and_graph_parent_frame.columnconfigure(0, weight=1)
comm_and_graph_parent_frame.columnconfigure(1, weight=1)
comm_and_graph_parent_frame.rowconfigure(0, weight=1)
cyclic_log_frame = ttk.LabelFrame(comm_and_graph_parent_frame, text="Log Cíclico (Simulador)")
cyclic_log_frame.grid(row=0, column=0, padx=(0, 5), pady=0, sticky="nsew")
cyclic_log_frame.rowconfigure(0, weight=1)
cyclic_log_frame.columnconfigure(0, weight=1)
self.cyclic_log_text = scrolledtext.ScrolledText(cyclic_log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED)
self.cyclic_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
self.actual_graph_frame_container = ttk.LabelFrame(comm_and_graph_parent_frame, text="Gráfico Simulador")
self.actual_graph_frame_container.grid(row=0, column=1, padx=(5, 0), pady=0, sticky="nsew")
self._setup_error_simulation_ui()
self.frame.columnconfigure(0, weight=1)
self.frame.columnconfigure(1, weight=1)
self.frame.rowconfigure(2, weight=1)
self.frame.rowconfigure(3, weight=3)
self.frame.rowconfigure(4, weight=0)
self.on_function_type_change()
def get_graph_frame(self):
return self.actual_graph_frame_container
def _setup_error_simulation_ui(self):
error_frame = ttk.LabelFrame(self.frame, text="Simulación de Errores (Modo TCP Server)")
error_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=10, sticky="ew")
ttk.Label(error_frame, text="Tipo de Error:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
self.error_type_var = tk.StringVar(value="Ninguno")
self.error_type_combo = ttk.Combobox(
error_frame,
textvariable=self.error_type_var,
state="disabled",
values=[
"Ninguno",
"ID Erróneo",
"Valor Fuera de Escala (mA)",
"Checksum Erróneo",
"Longitud Errónea (Aleatoria)",
"Trama Faltante (Omitir Envío)"
]
)
self.error_type_combo.current(0)
self.error_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.send_error_button = ttk.Button(error_frame, text="Enviar Trama Errónea",
command=self.send_selected_error_manually, state=tk.DISABLED)
self.send_error_button.grid(row=0, column=2, padx=5, pady=5)
self.random_error_var = tk.BooleanVar(value=False)
self.random_error_check = ttk.Checkbutton(
error_frame,
text="Errores Aleatorios",
variable=self.random_error_var,
command=self.toggle_random_errors,
state="disabled"
)
self.random_error_check.grid(row=1, column=0, columnspan=1, padx=5, pady=5, sticky="w")
self.replace_with_error_check = ttk.Checkbutton(
error_frame,
text="Reemplazar trama normal con error",
variable=self.replace_normal_with_error_var,
state="disabled"
)
self.replace_with_error_check.grid(row=1, column=2, padx=(10,5), pady=5, sticky="w")
ttk.Label(error_frame, text="Intervalo Err. Aleat. (s):").grid(row=2, column=0, padx=5, pady=5, sticky="w")
self.random_error_interval_entry = ttk.Entry(
error_frame,
textvariable=self.random_error_interval_var,
width=8,
state="disabled"
)
self.random_error_interval_entry.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
error_frame.columnconfigure(1, weight=1)
self.update_error_controls_state()
def update_error_controls_state(self):
if not hasattr(self, 'error_type_combo'):
return
is_tcp_server_mode = self.shared_config['connection_type_var'].get() == "TCP-Server"
enable_controls = is_tcp_server_mode and self.simulating
new_state_tk = tk.NORMAL if enable_controls else tk.DISABLED
new_state_str = "normal" if enable_controls else "disabled"
self.error_type_combo.config(state=new_state_tk if is_tcp_server_mode else tk.DISABLED)
self.send_error_button.config(state=new_state_tk)
self.random_error_check.config(state=new_state_str)
interval_entry_state_tk = tk.NORMAL if enable_controls and self.random_error_var.get() else tk.DISABLED
self.random_error_interval_entry.config(state=interval_entry_state_tk)
self.replace_with_error_check.config(state=new_state_str)
if not enable_controls and self.random_error_var.get():
self.random_error_var.set(False)
self.toggle_random_errors()
def get_current_error_sim_parameters(self):
adam_address = self.adam_address_var.get()
base_ma_value = 12.345
if self.function_type_var.get() == "Manual":
try:
manual_val = float(self.manual_value_var.get())
input_type = self.manual_input_type_var.get()
if input_type == "Brix":
min_b = float(self.shared_config['min_brix_map_var'].get())
max_b = float(self.shared_config['max_brix_map_var'].get())
base_ma_value = ProtocolHandler.scale_to_ma(manual_val, min_b, max_b) # noqa: E701
elif input_type == "mA":
base_ma_value = manual_val
elif input_type == "Voltaje":
base_ma_value = ProtocolHandler.voltage_to_ma(manual_val)
except (ValueError, KeyError, TypeError):
Utils.log_message(self.event_log_text, "Error Sim: Usando valor mA base por defecto para error.")
return adam_address, base_ma_value
def on_function_type_change(self, event=None):
func_type = self.function_type_var.get()
is_manual_mode = (func_type == "Manual")
if self.simulating:
self.stop_simulation()
manual_specific_state = tk.NORMAL if is_manual_mode else tk.DISABLED
self.manual_input_type_combo.config(state=manual_specific_state)
self.manual_value_entry.config(state=manual_specific_state)
self.manual_slider.config(state=manual_specific_state)
self.cycle_time_entry.config(state=tk.NORMAL)
self.samples_per_cycle_entry.config(state=tk.NORMAL)
if is_manual_mode:
self.on_manual_input_type_change()
if not self.simulating:
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
else:
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.update_error_controls_state()
def on_manual_input_type_change(self, event=None):
input_type = self.manual_input_type_var.get()
min_val, max_val, default_val, label_text, precision = 0, 100, 10.0, "Valor Brix:", 2
if input_type == "Brix":
try:
min_val = float(self.shared_config['min_brix_map_var'].get())
max_val = float(self.shared_config['max_brix_map_var'].get())
if min_val >= max_val: min_val, max_val = 0.0, 80.0
default_val = min_val + (max_val - min_val) / 4
except (ValueError, KeyError, TypeError):
min_val, max_val = 0.0, 80.0
default_val = 10.0
label_text = "Valor Brix:"
precision = 2
elif input_type == "mA":
min_val, max_val = 0.0, 20.0
default_val = 12.0
label_text = "Valor mA:"
precision = 3
elif input_type == "Voltaje":
min_val, max_val = 0.0, 10.0
default_val = 5.0
label_text = "Valor Voltaje:"
precision = 2
self.manual_value_label.config(text=label_text)
self.manual_slider.config(from_=min_val, to=max_val)
try:
current_numeric_val = float(self.manual_value_var.get())
if not (min_val <= current_numeric_val <= max_val):
self.manual_value_var.set(f"{default_val:.{precision}f}")
self.manual_slider_var.set(default_val)
else:
self.manual_slider_var.set(current_numeric_val)
self.manual_value_var.set(f"{current_numeric_val:.{precision}f}")
except ValueError:
self.manual_value_var.set(f"{default_val:.{precision}f}")
self.manual_slider_var.set(default_val)
def on_slider_change(self, value_str):
value = float(value_str)
input_type = self.manual_input_type_var.get()
precision = 2
if input_type == "Brix": precision = 2
elif input_type == "mA": precision = 3
elif input_type == "Voltaje": precision = 2
self.manual_value_var.set(f"{value:.{precision}f}")
def update_slider_from_entry(self):
try:
value = float(self.manual_value_var.get())
input_type = self.manual_input_type_var.get()
min_val, max_val, precision = 0,100,2
if input_type == "Brix":
min_val = float(self.shared_config['min_brix_map_var'].get())
max_val = float(self.shared_config['max_brix_map_var'].get())
if min_val >= max_val: min_val, max_val = 0.0, 80.0
precision = 2
elif input_type == "mA": min_val, max_val, precision = 0.0, 20.0, 3
elif input_type == "Voltaje": min_val, max_val, precision = 0.0, 10.0, 2
value = max(min_val, min(max_val, value))
self.manual_slider_var.set(value)
self.manual_value_var.set(f"{value:.{precision}f}")
except (ValueError, KeyError, TypeError):
current_slider_val = self.manual_slider_var.get()
precision_fallback = 2
if self.manual_input_type_var.get() == "mA": precision_fallback = 3
self.manual_value_var.set(f"{current_slider_val:.{precision_fallback}f}")
def start_simulation(self):
if self.simulating:
messagebox.showwarning("Advertencia", "La simulación ya está en curso.")
return
try:
adam_address = self.adam_address_var.get()
if len(adam_address) != 2:
messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.")
return
cycle_time = float(self.cycle_time_var.get())
if cycle_time <= 0:
messagebox.showerror("Error", "El tiempo de ciclo debe ser mayor que 0.")
return
samples_per_cycle = int(self.samples_per_cycle_var.get())
if samples_per_cycle <= 0:
messagebox.showerror("Error", "Las muestras por ciclo deben ser mayor que 0.")
return
float(self.shared_config['min_brix_map_var'].get())
float(self.shared_config['max_brix_map_var'].get())
except (ValueError, KeyError, TypeError):
messagebox.showerror("Error", "Valores inválidos en la configuración (ADAM, ciclo, muestras o mapeo Brix).")
return
try:
current_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
conn_type = current_config_values['connection_type']
conn_params = self.shared_config['config_manager'].get_connection_params(current_config_values)
_, listening_details = self.connection_manager.open_connection(conn_type, conn_params)
if conn_type == "TCP-Server":
Utils.log_message(self.event_log_text, f"{listening_details} para simulación.")
elif conn_type != "TCP-Server":
Utils.log_message(self.event_log_text, f"Conexión {conn_type} abierta para simulación.")
except Exception as e:
messagebox.showerror("Error de Conexión", str(e))
return
self.simulating = True
self.simulation_step = 0
self.start_time = time.time()
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self._set_entries_state(tk.DISABLED)
self.update_error_controls_state()
if conn_type == "TCP-Server":
self.shared_config['client_connected_var'].set("Esperando...")
self.simulation_thread = threading.Thread(target=self.run_simulation, daemon=True)
self.simulation_thread.start()
Utils.log_message(self.event_log_text, "Simulación iniciada.")
def stop_simulation(self):
if not self.simulating:
return
self.simulating = False
if self.random_error_timer and self.random_error_timer.is_alive():
self.random_error_timer_stop_event.set()
self.random_error_timer.join(timeout=1.0)
self.random_error_timer = None
self.next_frame_is_error_event.clear()
self.error_details_for_replacement = None
if self.simulation_thread and self.simulation_thread.is_alive():
self.simulation_thread.join(timeout=2.0)
self.connection_manager.close_connection()
Utils.log_message(self.event_log_text, "Conexión cerrada.")
self._set_entries_state(tk.NORMAL)
self.on_function_type_change()
if self.connection_manager.connection_type == "TCP-Server":
self.shared_config['client_connected_var'].set("Ninguno")
Utils.log_message(self.event_log_text, "Simulación detenida.")
self.current_brix_var.set("---")
self.current_ma_var.set("--.-- mA")
self.current_voltage_var.set("-.-- V")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.update_error_controls_state()
def run_simulation(self):
try:
adam_address = self.adam_address_var.get()
min_brix_map = float(self.shared_config['min_brix_map_var'].get())
max_brix_map = float(self.shared_config['max_brix_map_var'].get())
function_type = self.function_type_var.get()
cycle_time = float(self.cycle_time_var.get())
samples_per_cycle = int(self.samples_per_cycle_var.get())
conn_type = self.connection_manager.connection_type
current_config_values = {
'connection_type': self.shared_config['connection_type_var'].get(),
'com_port': self.shared_config['com_port_var'].get(),
'baud_rate': self.shared_config['baud_rate_var'].get(),
'ip_address': self.shared_config['ip_address_var'].get(),
'port': self.shared_config['port_var'].get(),
}
sample_period = cycle_time / samples_per_cycle
while self.simulating:
message_to_send = None
ma_value_for_message_generation = 0.0
target_brix = 0.0
current_manual_input_type = self.manual_input_type_var.get()
if function_type == "Manual":
manual_input_type = self.manual_input_type_var.get()
manual_numeric_value = 0.0
try:
manual_numeric_value = float(self.manual_value_var.get())
except ValueError:
Utils.log_message(self.event_log_text, f"Valor manual inválido: '{self.manual_value_var.get()}'. Usando valor por defecto.")
if manual_input_type == "Brix": manual_numeric_value = min_brix_map
elif manual_input_type == "mA": manual_numeric_value = 4.0
elif manual_input_type == "Voltaje": manual_numeric_value = ProtocolHandler.ma_to_voltage(4.0)
if manual_input_type == "Brix":
target_brix = manual_numeric_value
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
elif manual_input_type == "mA":
ma_value_for_message_generation = manual_numeric_value
target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map)
elif manual_input_type == "Voltaje":
voltage_input = manual_numeric_value
ma_value_for_message_generation = ProtocolHandler.voltage_to_ma(voltage_input)
target_brix = ProtocolHandler.ma_to_brix(ma_value_for_message_generation, min_brix_map, max_brix_map)
elif function_type == "Lineal":
cycle_progress = (self.simulation_step % (2 * samples_per_cycle)) / samples_per_cycle
if cycle_progress > 1.0:
cycle_progress = 2.0 - cycle_progress
target_brix = min_brix_map + (max_brix_map - min_brix_map) * cycle_progress
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
elif function_type == "Sinusoidal":
progress = (self.simulation_step % samples_per_cycle) / samples_per_cycle
phase = progress * 2 * math.pi
sin_val = (math.sin(phase) + 1) / 2
target_brix = min_brix_map + (max_brix_map - min_brix_map) * sin_val
ma_value_for_message_generation = ProtocolHandler.scale_to_ma(target_brix, min_brix_map, max_brix_map)
ma_value_for_ui_display = ma_value_for_message_generation
voltage_value_display = ProtocolHandler.ma_to_voltage(ma_value_for_ui_display)
log_prefix_for_send = "Enviando"
log_suffix_for_send = ""
actual_error_type_sent = "Normal"
if self.next_frame_is_error_event.is_set() and \
self.error_details_for_replacement is not None and \
self.replace_normal_with_error_var.get():
error_msg_bytes, error_log_suffix, error_type_str = self.error_details_for_replacement
message_to_send = error_msg_bytes
log_prefix_for_send = "Error Sim (Reemplazo Programado)"
log_suffix_for_send = error_log_suffix
actual_error_type_sent = error_type_str
self.next_frame_is_error_event.clear()
self.error_details_for_replacement = None
else:
message_to_send, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, ma_value_for_message_generation)
brix_display_text = ""
if ma_value_for_ui_display < 4.0 and function_type == "Manual" and \
(current_manual_input_type == "mA" or current_manual_input_type == "Voltaje"):
brix_display_text = "Error (Sub 4mA)"
else:
brix_display_text = Utils.format_brix_display(target_brix)
self.current_brix_var.set(brix_display_text)
self.current_ma_var.set(Utils.format_ma_display(ma_value_for_ui_display))
self.current_voltage_var.set(ProtocolHandler.format_voltage_display(voltage_value_display))
self.frame.after(0, lambda b=target_brix, m=ma_value_for_ui_display: self.add_data_point(b, m))
if message_to_send:
try:
if conn_type == "TCP-Server":
if not self.connection_manager.is_client_connected():
if not hasattr(self, '_waiting_for_client_logged') or not self._waiting_for_client_logged:
port_to_log = self.shared_config['config_manager'].get_connection_params(current_config_values)['port']
Utils.log_message(self.event_log_text, f"TCP Server: Esperando cliente en puerto {port_to_log}...")
self._waiting_for_client_logged = True
if self.connection_manager.accept_client(timeout=0.05):
Utils.log_message(self.event_log_text, f"TCP Server: Cliente conectado desde {self.connection_manager.client_address}")
client_info = f"{self.connection_manager.client_address[0]}:{self.connection_manager.client_address[1]}"
self.shared_config['client_connected_var'].set(client_info)
self._waiting_for_client_logged = False
elif not self.connection_manager.is_client_connected() and \
self.shared_config['client_connected_var'].get() != "Esperando...":
self.shared_config['client_connected_var'].set("Esperando...")
log_content = ProtocolHandler.format_for_display(message_to_send, hex_non_printable=True)
if actual_error_type_sent != "Normal" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"):
Utils.log_message(self.cyclic_log_text, f"{log_prefix_for_send}: Trama '{actual_error_type_sent}'{log_suffix_for_send} -> {log_content}")
else:
Utils.log_message(self.cyclic_log_text, f"{log_prefix_for_send}: {log_content}")
self.connection_manager.send_data(message_to_send)
if conn_type != "TCP-Server":
response = self.connection_manager.read_response(timeout=0.1)
if response and response.strip():
Utils.log_message(self.cyclic_log_text, f"Respuesta: {ProtocolHandler.format_for_display(response)}")
parsed = ProtocolHandler.parse_adam_message(response)
if parsed:
brix_resp = ProtocolHandler.ma_to_brix(parsed['ma'], min_brix_map, max_brix_map)
Utils.log_message(self.cyclic_log_text,
f" -> Addr: {parsed['address']}, "
f"mA: {parsed['ma']:.3f}, "
f"Brix: {brix_resp:.3f}")
except self.connection_manager.ClientDisconnectedError:
Utils.log_message(self.event_log_text, "TCP Server: Cliente desconectado. Esperando nueva conexión.")
if conn_type == "TCP-Server":
self.shared_config['client_connected_var'].set("Esperando...")
self._waiting_for_client_logged = False
except Exception as e:
Utils.log_message(self.event_log_text, f"Error en comunicación ({conn_type}): {e}")
self.frame.after(0, self.stop_simulation_error)
break
elif actual_error_type_sent == "Trama Faltante (Omitir Envío)" and log_prefix_for_send.startswith("Error Sim (Reemplazo Programado)"):
Utils.log_message(self.cyclic_log_text, f"{log_prefix_for_send}: Simulación de '{actual_error_type_sent}'{log_suffix_for_send}. No se envió trama.")
self.simulation_step += 1
time.sleep(sample_period)
except Exception as e:
Utils.log_message(self.event_log_text, f"Error en simulación: {e}")
if self.simulating:
self.frame.after(0, self.stop_simulation_error)
def stop_simulation_error(self):
if self.simulating:
messagebox.showerror("Error de Simulación", "Error durante la simulación. Simulación detenida.")
self.stop_simulation()
def generate_erroneous_message_logic(self, error_type, adam_address, base_ma_value):
message_bytes = None
log_message_suffix = ""
if error_type == "ID Erróneo":
wrong_adam_address = "99" if adam_address != "99" else "98"
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(wrong_adam_address, base_ma_value)
log_message_suffix = f" (ID cambiado a {wrong_adam_address})"
elif error_type == "Valor Fuera de Escala (mA)":
out_of_scale_ma = 2.500 if random.random() < 0.5 else 22.500
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, out_of_scale_ma)
log_message_suffix = f" (valor mA: {out_of_scale_ma:.3f})"
elif error_type == "Checksum Erróneo":
message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value)
log_message_suffix = " (checksum incorrecto)"
elif error_type == "Longitud Errónea (Aleatoria)":
base_msg_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value)
if len(base_msg_bytes) > 1:
if random.choice([True, False]):
cut_len = random.randint(1, max(1, len(base_msg_bytes) // 2))
message_bytes = base_msg_bytes[:-cut_len]
log_message_suffix = f" (longitud acortada en {cut_len} bytes)"
else:
add_len = random.randint(1, 5)
garbage = bytes([random.randint(32, 126) for _ in range(add_len)])
message_bytes = base_msg_bytes + garbage
log_message_suffix = f" (longitud aumentada en {add_len} bytes)"
else:
message_bytes, _ = ProtocolHandler.create_adam_message_with_bad_checksum(adam_address, base_ma_value)
log_message_suffix = " (longitud errónea -> fallback a checksum incorrecto)"
elif error_type == "Trama Faltante (Omitir Envío)":
log_message_suffix = " (trama omitida)"
return None, log_message_suffix
elif error_type == "Ninguno":
message_bytes, _ = ProtocolHandler.create_adam_message_from_ma(adam_address, base_ma_value)
log_message_suffix = " (trama normal)"
else:
Utils.log_message(self.event_log_text, f"Error Sim: Tipo de error '{error_type}' desconocido.")
return None, f" (tipo de error '{error_type}' desconocido)"
return message_bytes, log_message_suffix
def send_selected_error_manually(self):
if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating):
messagebox.showwarning("No Activo", "La simulación de errores manuales requiere modo TCP-Server y simulación activa.")
return
if not self.connection_manager.is_client_connected():
Utils.log_message(self.event_log_text, "Error Sim: No hay cliente conectado para enviar trama errónea.")
error_type = self.error_type_var.get()
adam_address, base_ma_value = self.get_current_error_sim_parameters()
message_bytes, log_suffix_from_gen = self.generate_erroneous_message_logic(error_type, adam_address, base_ma_value)
if self.replace_normal_with_error_var.get():
self.error_details_for_replacement = (message_bytes, log_suffix_from_gen, error_type)
self.next_frame_is_error_event.set()
if error_type == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.event_log_text, f"Error Sim Manual: Programada OMISIÓN de trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
elif message_bytes:
Utils.log_message(self.event_log_text, f"Error Sim Manual: Programada trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
else:
Utils.log_message(self.event_log_text, f"Error Sim Manual: No se pudo programar trama '{error_type}'{log_suffix_from_gen} para reemplazo.")
else:
if message_bytes:
try:
self.connection_manager.send_data(message_bytes)
Utils.log_message(self.event_log_text, f"Error Sim Manual (Adicional): Trama '{error_type}'{log_suffix_from_gen} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}")
except Exception as e:
Utils.log_message(self.event_log_text, f"Error Sim Manual (Adicional): Fallo al enviar trama: {e}")
elif error_type == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.event_log_text, f"Error Sim Manual (Adicional): Simulación de '{error_type}'{log_suffix_from_gen}. No se envió trama adicional.")
def toggle_random_errors(self):
can_actually_start_random_errors = (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating)
if self.random_error_var.get():
if not can_actually_start_random_errors:
Utils.log_message(self.event_log_text, "Error Sim: Errores aleatorios solo en TCP-Server con simulación activa.")
self.random_error_var.set(False)
else:
try:
interval_val = float(self.random_error_interval_var.get())
if interval_val <= 0:
messagebox.showerror("Error de Intervalo", "El intervalo para errores aleatorios debe ser un número positivo.")
self.random_error_var.set(False)
self.update_error_controls_state()
return
except ValueError:
messagebox.showerror("Error de Intervalo", "Valor inválido para el intervalo de errores aleatorios.")
self.random_error_var.set(False)
self.update_error_controls_state()
return
if self.random_error_timer is None or not self.random_error_timer.is_alive():
self.random_error_timer_stop_event.clear()
self.random_error_timer = threading.Thread(target=self._random_error_loop, args=(interval_val,), daemon=True)
self.random_error_timer.start()
else:
if self.random_error_timer and self.random_error_timer.is_alive():
Utils.log_message(self.event_log_text, "Error Sim: Deteniendo envío de errores aleatorios.")
self.random_error_timer_stop_event.set()
self.update_error_controls_state()
def _random_error_loop(self, initial_interval_s):
possible_error_types = [val for val in self.error_type_combo['values'] if val != "Ninguno"]
if not possible_error_types: return
current_interval = initial_interval_s
Utils.log_message(self.event_log_text, f"Error Sim: Hilo de errores aleatorios iniciado con intervalo {current_interval:.2f}s.")
while not self.random_error_timer_stop_event.is_set():
if not (self.shared_config['connection_type_var'].get() == "TCP-Server" and self.simulating and self.connection_manager.is_client_connected()):
self.random_error_timer_stop_event.wait(1.0)
continue
selected_random_error = random.choice(possible_error_types)
adam_address, base_ma_value = self.get_current_error_sim_parameters()
message_bytes, log_suffix = self.generate_erroneous_message_logic(selected_random_error, adam_address, base_ma_value)
if self.replace_normal_with_error_var.get():
self.error_details_for_replacement = (message_bytes, log_suffix, selected_random_error)
self.next_frame_is_error_event.set()
Utils.log_message(self.cyclic_log_text, f"Error Sim Aleatorio: Programada trama '{selected_random_error}'{log_suffix} para reemplazo.")
else:
if message_bytes:
try:
self.connection_manager.send_data(message_bytes)
Utils.log_message(self.cyclic_log_text, f"Error Sim Aleatorio (Adicional): Trama '{selected_random_error}'{log_suffix} -> {ProtocolHandler.format_for_display(message_bytes, hex_non_printable=True)}")
except Exception as e:
Utils.log_message(self.cyclic_log_text, f"Error Sim Aleatorio (Adicional): Fallo al enviar: {e}")
elif selected_random_error == "Trama Faltante (Omitir Envío)":
Utils.log_message(self.cyclic_log_text, f"Error Sim Aleatorio (Adicional): Simulación de '{selected_random_error}'{log_suffix}. No se envió trama adicional.")
try:
new_interval = float(self.random_error_interval_var.get())
if new_interval > 0 and new_interval != current_interval:
current_interval = new_interval
Utils.log_message(self.event_log_text, f"Error Sim: Intervalo de errores aleatorios actualizado a {current_interval:.2f}s.")
except ValueError:
pass
self.random_error_timer_stop_event.wait(timeout=current_interval)
Utils.log_message(self.event_log_text, "Error Sim: Hilo de errores aleatorios detenido.")
def add_data_point(self, brix_value, ma_value):
current_time = time.time() - self.start_time
self.time_data.append(current_time)
self.brix_data.append(brix_value)
self.ma_data.append(ma_value)
if hasattr(self, 'graph_update_callback'):
self.graph_update_callback()
def clear_graph(self):
Utils.clear_graph_data(self.time_data, self.brix_data, self.ma_data)
self.start_time = time.time()
Utils.log_message(self.event_log_text, "Gráfico del simulador limpiado.")
if hasattr(self, 'graph_update_callback'):
self.graph_update_callback()
def clear_cyclic_log(self):
Utils.clear_log_widget(self.cyclic_log_text)
Utils.log_message(self.event_log_text, "Log cíclico del simulador limpiado.")
def _set_entries_state(self, state):
sim_specific_widgets = [
self.adam_address_entry,
self.function_type_combo,
self.cycle_time_entry,
self.samples_per_cycle_entry
]
Utils.set_widgets_state(sim_specific_widgets, state)
if 'shared_widgets' in self.shared_config:
Utils.set_widgets_state(self.shared_config['shared_widgets'], state)
def get_config(self):
return {
'adam_address': self.adam_address_var.get(),
'function_type': self.function_type_var.get(),
'cycle_time': self.cycle_time_var.get(),
'samples_per_cycle': self.samples_per_cycle_var.get(),
'manual_input_type': self.manual_input_type_var.get(),
'manual_value': self.manual_value_var.get(),
'random_error_interval': self.random_error_interval_var.get()
}
def set_config(self, config):
self.adam_address_var.set(config.get('adam_address', '01'))
self.function_type_var.set(config.get('function_type', 'Lineal'))
self.cycle_time_var.set(config.get('cycle_time', '10.0'))
self.samples_per_cycle_var.set(config.get('samples_per_cycle', '100'))
self.manual_input_type_var.set(config.get('manual_input_type', 'Brix'))
self.manual_value_var.set(config.get('manual_value', '10.0'))
self.random_error_interval_var.set(config.get('random_error_interval', '10.0'))
try:
self.manual_slider_var.set(float(self.manual_value_var.get()))
except ValueError:
pass
self.on_function_type_change()
self.update_error_controls_state()
def on_app_close(self):
if self.simulating:
self.stop_simulation()