""" Tab del Trace - Escucha datos de un dispositivo Maselli real """ import tkinter as tk from tkinter import ttk, scrolledtext, messagebox import threading import time import csv from collections import deque from datetime import datetime import sys # Add sys import import os # Add os import # If this script is run directly, add the parent directory to sys.path # to allow imports of modules like protocol_handler, connection_manager, utils if __name__ == "__main__" and __package__ is None: sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from protocol_handler import ProtocolHandler from connection_manager import ConnectionManager from utils import Utils class TraceTab: def __init__(self, parent_frame, shared_config): self.frame = parent_frame self.shared_config = shared_config # Estado del trace self.tracing = False self.trace_thread = None self.connection_manager = ConnectionManager() # Archivo CSV self.csv_file = None self.csv_writer = None # Datos para el gráfico (ahora con mA también) self.max_points = 100 self.time_data = deque(maxlen=self.max_points) self.brix_data = deque(maxlen=self.max_points) self.ma_data = deque(maxlen=self.max_points) # Nueva línea para mA self.start_time = time.time() self.create_widgets() def create_widgets(self): """Crea los widgets del tab trace""" # Control Frame control_frame = ttk.LabelFrame(self.frame, text="Control Trace") control_frame.grid(row=0, column=0, columnspan=2, padx=10, pady=5, sticky="ew") self.start_button = ttk.Button(control_frame, text="Iniciar Trace", command=self.start_trace) self.start_button.pack(side=tk.LEFT, padx=5) self.stop_button = ttk.Button(control_frame, text="Detener Trace", command=self.stop_trace, state=tk.DISABLED) self.stop_button.pack(side=tk.LEFT, padx=5) self.clear_graph_button = ttk.Button(control_frame, text="Limpiar Gráfico", command=self.clear_graph) self.clear_graph_button.pack(side=tk.LEFT, padx=5) ttk.Label(control_frame, text="Archivo CSV:").pack(side=tk.LEFT, padx=(20, 5)) self.csv_filename_var = tk.StringVar(value="Sin archivo") ttk.Label(control_frame, textvariable=self.csv_filename_var).pack(side=tk.LEFT, padx=5) # Display Frame display_frame = ttk.LabelFrame(self.frame, text="Último Valor Recibido") display_frame.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky="ew") ttk.Label(display_frame, text="Timestamp:").grid(row=0, column=0, padx=5, pady=5, sticky="w") self.timestamp_var = tk.StringVar(value="---") ttk.Label(display_frame, textvariable=self.timestamp_var, font=("Courier", 12)).grid(row=0, column=1, padx=5, pady=5, sticky="w") ttk.Label(display_frame, text="Dirección:").grid(row=0, column=2, padx=5, pady=5, sticky="w") self.address_var = tk.StringVar(value="--") ttk.Label(display_frame, textvariable=self.address_var, font=("Courier", 12)).grid(row=0, column=3, padx=5, pady=5, sticky="w") ttk.Label(display_frame, text="Valor mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w") self.ma_var = tk.StringVar(value="---") ttk.Label(display_frame, textvariable=self.ma_var, font=("Courier", 12, "bold"), foreground="red").grid(row=1, column=1, padx=5, pady=5, sticky="w") ttk.Label(display_frame, text="Valor Brix:").grid(row=1, column=2, padx=5, pady=5, sticky="w") self.brix_var = tk.StringVar(value="---") ttk.Label(display_frame, textvariable=self.brix_var, font=("Courier", 12, "bold"), foreground="blue").grid(row=1, column=3, padx=5, pady=5, sticky="w") # Statistics Frame stats_frame = ttk.LabelFrame(self.frame, text="Estadísticas") stats_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="ew") ttk.Label(stats_frame, text="Mensajes recibidos:").grid(row=0, column=0, padx=5, pady=5, sticky="w") self.msg_count_var = tk.StringVar(value="0") ttk.Label(stats_frame, textvariable=self.msg_count_var).grid(row=0, column=1, padx=5, pady=5, sticky="w") ttk.Label(stats_frame, text="Errores checksum:").grid(row=0, column=2, padx=5, pady=5, sticky="w") self.checksum_errors_var = tk.StringVar(value="0") ttk.Label(stats_frame, textvariable=self.checksum_errors_var).grid(row=0, column=3, padx=5, pady=5, sticky="w") # Log Frame log_frame = ttk.LabelFrame(self.frame, text="Log de Recepción") log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") self.log_text = scrolledtext.ScrolledText(log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED) self.log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True) # Configurar pesos self.frame.columnconfigure(0, weight=1) self.frame.columnconfigure(1, weight=1) self.frame.rowconfigure(3, weight=1) # Contadores self.message_count = 0 self.checksum_error_count = 0 def get_graph_frame(self): """Crea y retorna el frame para el gráfico""" graph_frame = ttk.LabelFrame(self.frame, text="Gráfico Trace (Brix y mA)") graph_frame.grid(row=4, column=0, columnspan=2, padx=10, pady=5, sticky="nsew") self.frame.rowconfigure(4, weight=1) return graph_frame def start_trace(self): """Inicia el modo trace""" if self.tracing: messagebox.showwarning("Advertencia", "El trace ya está en curso.") return # Verificar si el tipo de conexión global es compatible global_conn_type = self.shared_config['connection_type_var'].get() if global_conn_type == "TCP-Server": messagebox.showerror("Modo No Compatible", "El modo Trace no es compatible cuando el tipo de conexión global es TCP-Server.") return # Crear archivo CSV csv_filename = Utils.create_csv_filename("maselli_trace") try: self.csv_file = open(csv_filename, 'w', newline='', encoding='utf-8') self.csv_writer = csv.writer(self.csv_file) self.csv_writer.writerow(['Timestamp', 'Address', 'mA', 'Brix', 'Checksum_Valid', 'Raw_Message']) self.csv_filename_var.set(csv_filename) except Exception as e: messagebox.showerror("Error", f"No se pudo crear archivo CSV: {e}") return # Abrir conexión try: # Construct a dictionary of current config values for get_connection_params current_config_values = { 'connection_type': self.shared_config['connection_type_var'].get(), 'com_port': self.shared_config['com_port_var'].get(), 'baud_rate': self.shared_config['baud_rate_var'].get(), 'ip_address': self.shared_config['ip_address_var'].get(), 'port': self.shared_config['port_var'].get(), } conn_type = current_config_values['connection_type'] conn_params = self.shared_config['config_manager'].get_connection_params(current_config_values) self.connection_manager.open_connection(conn_type, conn_params) Utils.log_message(self.log_text, f"Conexión {conn_type} abierta para trace.") except Exception as e: messagebox.showerror("Error de Conexión", str(e)) if self.csv_file: self.csv_file.close() return # Resetear contadores self.message_count = 0 self.checksum_error_count = 0 self.msg_count_var.set("0") self.checksum_errors_var.set("0") self.tracing = True self.start_time = time.time() self.start_button.config(state=tk.DISABLED) self.stop_button.config(state=tk.NORMAL) self._set_entries_state(tk.DISABLED) # Iniciar thread de recepción self.trace_thread = threading.Thread(target=self.run_trace, daemon=True) self.trace_thread.start() Utils.log_message(self.log_text, "Trace iniciado.") def stop_trace(self): """Detiene el modo trace""" if not self.tracing: return self.tracing = False # Esperar a que termine el thread if self.trace_thread and self.trace_thread.is_alive(): self.trace_thread.join(timeout=2.0) # Cerrar conexión self.connection_manager.close_connection() Utils.log_message(self.log_text, "Conexión cerrada.") # Cerrar archivo CSV if self.csv_file: self.csv_file.close() self.csv_file = None self.csv_writer = None Utils.log_message(self.log_text, f"Archivo CSV guardado: {self.csv_filename_var.get()}") self.start_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.DISABLED) self._set_entries_state(tk.NORMAL) Utils.log_message(self.log_text, "Trace detenido.") Utils.log_message(self.log_text, f"Total mensajes: {self.message_count}, Errores checksum: {self.checksum_error_count}") def run_trace(self): """Thread principal para recepción de datos""" buffer = bytearray() # Cambiar buffer a bytearray while self.tracing: try: # Leer datos disponibles data = self.connection_manager.read_data_non_blocking() if data: buffer.extend(data) # Usar extend para bytearray # Las condiciones de búsqueda ahora deben usar bytes while b'\r' in buffer or b'\n' in buffer or len(buffer) >= 10: # Encontrar el primer terminador end_idx = -1 # Iterar sobre los valores de byte for i, byte_val in enumerate(buffer): if byte_val == ord(b'\r') or byte_val == ord(b'\n'): end_idx = i + 1 break # Si no hay terminador pero el buffer es largo, buscar mensaje completo if end_idx == -1 and len(buffer) >= 10: # Verificar si hay un mensaje ADAM completo # Heurística: si empieza con '#' o parece un valor ADAM # Decodificar solo la parte necesaria para la heurística is_adam_like = False try: temp_str_for_check = buffer[:10].decode('ascii', errors='ignore') if temp_str_for_check.startswith('#') or \ (len(temp_str_for_check) >= 8 and temp_str_for_check[2:8].replace('.', '').isdigit()): is_adam_like = True except: pass if is_adam_like: end_idx = 10 # Longitud de un mensaje ADAM sin terminador explícito if len(buffer) > 10 and (buffer[10] == ord(b'\r') or buffer[10] == ord(b'\n')): end_idx = 11 if end_idx > 0: message_bytes = bytes(buffer[:end_idx]) # Extraer como bytes buffer = buffer[end_idx:] # Procesar mensaje si tiene contenido message_str = message_bytes.decode('ascii', errors='ignore') # Decodificar a string if message_str.strip(): # Procesar si la cadena decodificada tiene contenido self._process_message(message_str) else: break except Exception as e: if self.tracing: Utils.log_message(self.log_text, f"Error en trace: {e}") break # Pequeña pausa para no consumir demasiado CPU if not data: time.sleep(0.01) def _process_message(self, message): """Procesa un mensaje recibido""" # Log del mensaje raw display_msg = ProtocolHandler.format_for_display(message) Utils.log_message(self.log_text, f"Recibido: {display_msg}") # Parsear mensaje parsed = ProtocolHandler.parse_adam_message(message) if parsed: # Obtener valores de mapeo min_brix = float(self.shared_config['min_brix_map_var'].get()) max_brix = float(self.shared_config['max_brix_map_var'].get()) ma_value = parsed['ma'] brix_value = ProtocolHandler.ma_to_brix(ma_value, min_brix, max_brix) timestamp = datetime.now() # Actualizar contadores self.message_count += 1 self.msg_count_var.set(str(self.message_count)) if not parsed.get('checksum_valid', True): self.checksum_error_count += 1 self.checksum_errors_var.set(str(self.checksum_error_count)) # Actualizar display self.timestamp_var.set(timestamp.strftime("%H:%M:%S.%f")[:-3]) self.address_var.set(parsed['address']) self.ma_var.set(Utils.format_ma_display(ma_value)) self.brix_var.set(Utils.format_brix_display(brix_value)) # Log con detalles checksum_status = "OK" if parsed.get('checksum_valid', True) else "ERROR" Utils.log_message(self.log_text, f" -> Addr: {parsed['address']}, " f"mA: {ma_value:.3f}, " f"Brix: {brix_value:.3f}, " f"Checksum: {checksum_status}") # Guardar en CSV if self.csv_writer: self.csv_writer.writerow([ timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], parsed['address'], f"{ma_value:.3f}", f"{brix_value:.3f}", parsed.get('checksum_valid', True), display_msg ]) if self.csv_file: self.csv_file.flush() # Agregar al gráfico current_time = time.time() - self.start_time self.time_data.append(current_time) self.brix_data.append(brix_value) self.ma_data.append(ma_value) # Agregar también mA # Actualizar gráfico if hasattr(self, 'graph_update_callback'): self.frame.after(0, self.graph_update_callback) else: # Mensaje no válido Utils.log_message(self.log_text, f" -> Mensaje no válido ADAM") def clear_graph(self): """Limpia los datos del gráfico""" Utils.clear_graph_data(self.time_data, self.brix_data, self.ma_data) self.start_time = time.time() Utils.log_message(self.log_text, "Gráfico limpiado.") if hasattr(self, 'graph_update_callback'): self.graph_update_callback() def _set_entries_state(self, state): """Habilita/deshabilita los controles durante el trace""" # Deshabilitar controles compartidos if 'shared_widgets' in self.shared_config: Utils.set_widgets_state(self.shared_config['shared_widgets'], state) def get_config(self): """Obtiene la configuración actual (no hay configuración específica para trace)""" return {} def set_config(self, config): """Establece la configuración (no hay configuración específica para trace)""" pass