# MaselliSimulatorApp/MaselliSimulatorApp.py
#
# 1210 lines
# 55 KiB
# Python

import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import serial
import socket
import threading
import time
import math
import json
import os
import csv
from datetime import datetime
from collections import deque
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.animation as animation
# Simulador y Trace para Protocolo Maselli (ADAM)
# Soporta conexiones Serial, TCP y UDP
#
# Para cambiar el icono, coloca uno de estos archivos en el mismo directorio:
# - icon.png (recomendado)
# - icon.ico (para Windows)
# - icon.gif
#
# Características:
# - Modo Simulador: Genera valores de prueba en protocolo ADAM
# - Modo Trace: Recibe y registra valores del medidor real
# - Conversión automática mA <-> Brix
# - Registro en CSV con timestamp
# - Gráficos en tiempo real
# - Respuestas del dispositivo mostradas en el log
class MaselliSimulatorApp:
def __init__(self, root_window):
    """Build the main window, both tabs and all runtime state.

    Args:
        root_window: Tk root window that hosts the application.
    """
    self.root = root_window
    self.root.title("Simulador/Trace Protocolo Maselli")
    self.root.geometry("900x700")  # initial window size

    # Window icon: use the first candidate file that exists and loads.
    for candidate in ['icon.png', 'icon.ico', 'icon.gif']:
        if not os.path.exists(candidate):
            continue
        try:
            if candidate.endswith('.ico'):
                self.root.iconbitmap(candidate)
            else:
                image = tk.PhotoImage(file=candidate)
                self.root.iconphoto(True, image)
        except Exception as e:
            print(f"No se pudo cargar {candidate}: {e}")
        else:
            break
    else:
        # The loop ended without a successful load.
        print("No se encontró ningún archivo de icono (icon.png, icon.ico, icon.gif)")

    # Connection handle and its type ("Serial", "TCP" or "UDP").
    self.connection = None
    self.connection_type = None

    # Simulator state.
    self.simulating = False
    self.simulation_thread = None
    self.simulation_step = 0

    # Trace state.
    self.tracing = False
    self.trace_thread = None
    self.csv_file = None
    self.csv_writer = None

    # Rolling plot buffers; each keeps at most max_points samples.
    self.max_points = 100
    self.sim_time_data, self.sim_brix_data, self.sim_ma_data = (
        deque(maxlen=self.max_points) for _ in range(3)
    )
    self.sim_start_time = time.time()
    self.trace_time_data, self.trace_brix_data = (
        deque(maxlen=self.max_points) for _ in range(2)
    )
    self.trace_start_time = time.time()

    # Settings file read by load_config and written by save_config.
    self.config_file = "maselli_simulator_config.json"

    # Notebook with one tab per operating mode.
    self.notebook = ttk.Notebook(self.root)
    self.notebook.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
    self.simulator_tab = ttk.Frame(self.notebook)
    self.notebook.add(self.simulator_tab, text="Simulador")
    self.trace_tab = ttk.Frame(self.notebook)
    self.notebook.add(self.trace_tab, text="Trace")

    # Shared connection settings, then the contents of each tab.
    self.create_shared_config_frame()
    self.create_simulator_tab()
    self.create_trace_tab()

    # Let the notebook cell absorb window resizes.
    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

    # Restore saved settings (if any), then sync widget states with them.
    self.load_config(silent=True)
    self.on_connection_type_change()
    self.on_function_type_change()

    # Periodic plot refresh; the references are kept on self so the
    # animations are not garbage-collected.
    refresh_ms = 100
    self.sim_ani = animation.FuncAnimation(
        self.sim_fig, self.update_sim_graph, interval=refresh_ms, blit=False)
    self.trace_ani = animation.FuncAnimation(
        self.trace_fig, self.update_trace_graph, interval=refresh_ms, blit=False)
def create_shared_config_frame(self):
    """Create the connection-settings frame shared by both tabs.

    The frame is gridded at row 1 of the root window; the notebook is
    gridded at row 0 in ``__init__``. It holds the connection type selector,
    the Serial and Ethernet parameter sub-frames (both placed in the same
    grid cell, with the Ethernet one hidden initially), the Brix values
    mapped to 4 mA and 20 mA, and the save/load buttons.
    """
    shared_config_frame = ttk.LabelFrame(self.root, text="Configuración de Conexión")
    shared_config_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
    # Connection type
    ttk.Label(shared_config_frame, text="Tipo:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.connection_type_var = tk.StringVar(value="Serial")
    self.connection_type_combo = ttk.Combobox(shared_config_frame, textvariable=self.connection_type_var,
                                              values=["Serial", "TCP", "UDP"], state="readonly", width=10)
    self.connection_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    self.connection_type_combo.bind("<<ComboboxSelected>>", self.on_connection_type_change)
    # Frame with the Serial settings
    self.serial_frame = ttk.Frame(shared_config_frame)
    self.serial_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew")
    ttk.Label(self.serial_frame, text="Puerto:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.com_port_var = tk.StringVar(value="COM3")
    self.com_port_entry = ttk.Entry(self.serial_frame, textvariable=self.com_port_var, width=10)
    self.com_port_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    ttk.Label(self.serial_frame, text="Baud:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.baud_rate_var = tk.StringVar(value="115200")
    self.baud_rate_entry = ttk.Entry(self.serial_frame, textvariable=self.baud_rate_var, width=10)
    self.baud_rate_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    # Frame with the Ethernet settings; it shares the Serial frame's grid
    # cell and starts hidden (on_connection_type_change swaps the two).
    self.ethernet_frame = ttk.Frame(shared_config_frame)
    self.ethernet_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew")
    self.ethernet_frame.grid_remove()
    ttk.Label(self.ethernet_frame, text="IP:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.ip_address_var = tk.StringVar(value="192.168.1.100")
    self.ip_address_entry = ttk.Entry(self.ethernet_frame, textvariable=self.ip_address_var, width=15)
    self.ip_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    ttk.Label(self.ethernet_frame, text="Puerto:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.port_var = tk.StringVar(value="502")
    self.port_entry = ttk.Entry(self.ethernet_frame, textvariable=self.port_var, width=10)
    self.port_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    # Mapping parameters (shared): Brix values corresponding to 4 mA and 20 mA
    ttk.Label(shared_config_frame, text="Min Brix [4mA]:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
    self.min_brix_map_var = tk.StringVar(value="0")
    self.min_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.min_brix_map_var, width=10)
    self.min_brix_map_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
    ttk.Label(shared_config_frame, text="Max Brix [20mA]:").grid(row=1, column=2, padx=5, pady=5, sticky="w")
    self.max_brix_map_var = tk.StringVar(value="80")
    self.max_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.max_brix_map_var, width=10)
    self.max_brix_map_entry.grid(row=1, column=3, padx=5, pady=5, sticky="ew")
    # Persistence buttons
    self.save_config_button = ttk.Button(shared_config_frame, text="Guardar", command=self.save_config)
    self.save_config_button.grid(row=1, column=4, padx=5, pady=5, sticky="ew")
    self.load_config_button = ttk.Button(shared_config_frame, text="Cargar", command=self.load_config)
    self.load_config_button.grid(row=1, column=5, padx=5, pady=5, sticky="ew")
def create_simulator_tab(self):
    """Build the contents of the Simulador tab.

    Creates the simulator settings (ADAM address, function type, period and
    the manual-mode entry/slider/button, which start disabled), the
    start/stop/clear controls, the current Brix and mA read-outs, the
    dual-axis Brix/mA plot and the communication log.
    """
    # Simulator settings frame
    sim_config_frame = ttk.LabelFrame(self.simulator_tab, text="Configuración Simulador")
    sim_config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2)
    # ADAM address
    ttk.Label(sim_config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.adam_address_var = tk.StringVar(value="01")
    self.adam_address_entry = ttk.Entry(sim_config_frame, textvariable=self.adam_address_var, width=5)
    self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    # Function type
    ttk.Label(sim_config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.function_type_var = tk.StringVar(value="Lineal")
    self.function_type_combo = ttk.Combobox(sim_config_frame, textvariable=self.function_type_var,
                                            values=["Lineal", "Sinusoidal", "Manual"], state="readonly", width=10)
    self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    self.function_type_combo.bind("<<ComboboxSelected>>", self.on_function_type_change)
    # Period
    ttk.Label(sim_config_frame, text="Periodo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w")
    self.period_var = tk.StringVar(value="1.0")
    self.period_entry = ttk.Entry(sim_config_frame, textvariable=self.period_var, width=5)
    self.period_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew")
    # Manual-mode frame
    manual_frame = ttk.LabelFrame(sim_config_frame, text="Modo Manual")
    manual_frame.grid(row=1, column=0, columnspan=6, padx=5, pady=5, sticky="ew")
    ttk.Label(manual_frame, text="Valor Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.manual_brix_var = tk.StringVar(value="10.0")
    self.manual_brix_entry = ttk.Entry(manual_frame, textvariable=self.manual_brix_var, width=10, state=tk.DISABLED)
    self.manual_brix_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    # Pressing Return or leaving the entry pushes the typed value to the slider.
    self.manual_brix_entry.bind('<Return>', lambda e: self.update_slider_from_entry())
    self.manual_brix_entry.bind('<FocusOut>', lambda e: self.update_slider_from_entry())
    # Slider
    self.manual_slider_var = tk.DoubleVar(value=10.0)
    self.manual_slider = ttk.Scale(manual_frame, from_=0, to=100, orient=tk.HORIZONTAL,
                                   variable=self.manual_slider_var, command=self.on_slider_change,
                                   state=tk.DISABLED, length=200)
    self.manual_slider.grid(row=0, column=2, padx=5, pady=5, sticky="ew")
    self.manual_send_button = ttk.Button(manual_frame, text="Enviar Manual", command=self.send_manual_value, state=tk.DISABLED)
    self.manual_send_button.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    manual_frame.columnconfigure(2, weight=1)
    # Controls frame
    controls_frame = ttk.LabelFrame(self.simulator_tab, text="Control Simulación")
    controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
    self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation)
    self.start_button.pack(side=tk.LEFT, padx=5)
    self.stop_button = ttk.Button(controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED)
    self.stop_button.pack(side=tk.LEFT, padx=5)
    self.clear_sim_graph_button = ttk.Button(controls_frame, text="Limpiar Gráfico", command=self.clear_sim_graph)
    self.clear_sim_graph_button.pack(side=tk.LEFT, padx=5)
    # Display frame with the current values
    display_frame = ttk.LabelFrame(self.simulator_tab, text="Valores Actuales")
    display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="ew")
    ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.current_brix_display_var = tk.StringVar(value="---")
    self.current_brix_label = ttk.Label(display_frame, textvariable=self.current_brix_display_var,
                                        font=("Courier", 14, "bold"))
    self.current_brix_label.grid(row=0, column=1, padx=5, pady=5, sticky="w")
    ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
    self.current_ma_display_var = tk.StringVar(value="--.-- mA")
    self.current_ma_label = ttk.Label(display_frame, textvariable=self.current_ma_display_var,
                                      font=("Courier", 14, "bold"))
    self.current_ma_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")
    # Graph frame
    sim_graph_frame = ttk.LabelFrame(self.simulator_tab, text="Gráfico Simulador")
    sim_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    # Simulator figure: Brix on the left axis, mA on a twin right axis
    self.sim_fig = Figure(figsize=(8, 3.5), dpi=100)
    self.sim_ax1 = self.sim_fig.add_subplot(111)
    self.sim_ax2 = self.sim_ax1.twinx()
    self.sim_ax1.set_xlabel('Tiempo (s)')
    self.sim_ax1.set_ylabel('Brix', color='b')
    self.sim_ax2.set_ylabel('mA', color='r')
    self.sim_ax1.tick_params(axis='y', labelcolor='b')
    self.sim_ax2.tick_params(axis='y', labelcolor='r')
    self.sim_ax1.grid(True, alpha=0.3)
    self.sim_line_brix, = self.sim_ax1.plot([], [], 'b-', label='Brix', linewidth=2)
    self.sim_line_ma, = self.sim_ax2.plot([], [], 'r-', label='mA', linewidth=2)
    self.sim_canvas = FigureCanvasTkAgg(self.sim_fig, master=sim_graph_frame)
    self.sim_canvas.draw()
    self.sim_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    # Log frame
    sim_log_frame = ttk.LabelFrame(self.simulator_tab, text="Log de Comunicación")
    sim_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    self.sim_log_text = scrolledtext.ScrolledText(sim_log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED)
    self.sim_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
    # Grid weights: both columns stretch; the graph and log rows share extra height
    self.simulator_tab.columnconfigure(0, weight=1)
    self.simulator_tab.columnconfigure(1, weight=1)
    self.simulator_tab.rowconfigure(2, weight=1)
    self.simulator_tab.rowconfigure(3, weight=1)
def create_trace_tab(self):
    """Build the contents of the Trace tab.

    Creates the start/stop/clear controls with the CSV file name label, the
    "last value received" read-outs (timestamp, mA, Brix), the Brix plot and
    the reception log.
    """
    # Control frame
    trace_control_frame = ttk.LabelFrame(self.trace_tab, text="Control Trace")
    trace_control_frame.grid(row=0, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
    self.start_trace_button = ttk.Button(trace_control_frame, text="Iniciar Trace", command=self.start_trace)
    self.start_trace_button.pack(side=tk.LEFT, padx=5)
    self.stop_trace_button = ttk.Button(trace_control_frame, text="Detener Trace", command=self.stop_trace, state=tk.DISABLED)
    self.stop_trace_button.pack(side=tk.LEFT, padx=5)
    self.clear_trace_graph_button = ttk.Button(trace_control_frame, text="Limpiar Gráfico", command=self.clear_trace_graph)
    self.clear_trace_graph_button.pack(side=tk.LEFT, padx=5)
    ttk.Label(trace_control_frame, text="Archivo CSV:").pack(side=tk.LEFT, padx=(20, 5))
    self.csv_filename_var = tk.StringVar(value="Sin archivo")
    self.csv_filename_label = ttk.Label(trace_control_frame, textvariable=self.csv_filename_var)
    self.csv_filename_label.pack(side=tk.LEFT, padx=5)
    # Display frame with the last received value
    trace_display_frame = ttk.LabelFrame(self.trace_tab, text="Último Valor Recibido")
    trace_display_frame.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
    ttk.Label(trace_display_frame, text="Timestamp:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.trace_timestamp_var = tk.StringVar(value="---")
    ttk.Label(trace_display_frame, textvariable=self.trace_timestamp_var, font=("Courier", 12)).grid(row=0, column=1, padx=5, pady=5, sticky="w")
    ttk.Label(trace_display_frame, text="Valor mA:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.trace_ma_var = tk.StringVar(value="---")
    ttk.Label(trace_display_frame, textvariable=self.trace_ma_var, font=("Courier", 12, "bold")).grid(row=0, column=3, padx=5, pady=5, sticky="w")
    ttk.Label(trace_display_frame, text="Valor Brix:").grid(row=0, column=4, padx=5, pady=5, sticky="w")
    self.trace_brix_var = tk.StringVar(value="---")
    ttk.Label(trace_display_frame, textvariable=self.trace_brix_var, font=("Courier", 12, "bold")).grid(row=0, column=5, padx=5, pady=5, sticky="w")
    # Graph frame
    trace_graph_frame = ttk.LabelFrame(self.trace_tab, text="Gráfico Trace (Brix)")
    trace_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    # Trace figure: a single Brix-versus-time axis
    self.trace_fig = Figure(figsize=(8, 4), dpi=100)
    self.trace_ax = self.trace_fig.add_subplot(111)
    self.trace_ax.set_xlabel('Tiempo (s)')
    self.trace_ax.set_ylabel('Brix', color='b')
    self.trace_ax.tick_params(axis='y', labelcolor='b')
    self.trace_ax.grid(True, alpha=0.3)
    self.trace_line_brix, = self.trace_ax.plot([], [], 'b-', label='Brix', linewidth=2, marker='o', markersize=4)
    self.trace_canvas = FigureCanvasTkAgg(self.trace_fig, master=trace_graph_frame)
    self.trace_canvas.draw()
    self.trace_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    # Log frame
    trace_log_frame = ttk.LabelFrame(self.trace_tab, text="Log de Recepción")
    trace_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    self.trace_log_text = scrolledtext.ScrolledText(trace_log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED)
    self.trace_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
    # Grid weights: both columns stretch; the graph and log rows share extra height
    self.trace_tab.columnconfigure(0, weight=1)
    self.trace_tab.columnconfigure(1, weight=1)
    self.trace_tab.rowconfigure(2, weight=1)
    self.trace_tab.rowconfigure(3, weight=1)
def _log_message(self, message, log_widget=None):
    """Append a ``[HH:MM:SS]``-stamped line to a log widget.

    Args:
        message: Text to append.
        log_widget: Target text widget. When omitted, the log of the
            currently selected tab is used (tab index 0 -> simulator log,
            any other index -> trace log).
    """
    target = log_widget
    if target is None:
        # Pick the log that belongs to the tab the user is looking at.
        active_index = self.notebook.index(self.notebook.select())
        target = self.trace_log_text if active_index != 0 else self.sim_log_text

    # The widget is kept read-only; unlock it only while inserting.
    target.configure(state=tk.NORMAL)
    stamp = datetime.now().strftime('%H:%M:%S')
    target.insert(tk.END, f"[{stamp}] {message}\n")
    target.see(tk.END)
    target.configure(state=tk.DISABLED)
def parse_adam_message(self, data, log_widget=None):
    r"""Parse one ADAM-protocol frame and extract its mA reading.

    Expected layout: ``#AA<value><checksum>\r`` where

    - ``#``        start character (optional in some responses)
    - ``AA``       device address, 2 characters
    - ``<value>``  reading in mA, 6 characters (``XX.XXX``)
    - ``<checksum>`` 2 hex characters (optional)
    - ``\r``       terminator (optional)

    Args:
        data: Received text for a single frame.
        log_widget: Log widget for diagnostics; when falsy nothing is logged.

    Returns:
        ``{'address': str, 'ma': float}``, or ``None`` when the frame is too
        short or its value field is not a finite number. A checksum mismatch
        is only logged; the value is still returned.
    """
    try:
        # strip() also removes any trailing CR/LF terminator.
        frame = data.strip()
        if frame.startswith('#'):
            frame = frame[1:]

        # Need at least 2 address characters + 6 value characters.
        if len(frame) < 8:
            return None
        address = frame[:2]
        value_str = frame[2:8]

        if len(frame) >= 10:
            checksum = frame[8:10]
            calculated_checksum = self.calculate_checksum(f"#{address}{value_str}")
            # calculate_checksum yields upper-case hex; accept lower-case
            # hex from the device as well.
            if checksum.upper() != calculated_checksum and log_widget:
                self._log_message(f"Checksum incorrecto: recibido={checksum}, calculado={calculated_checksum}",
                                  log_widget)
            # Carry on regardless: the value may still be usable.

        try:
            ma_value = float(value_str)
        except ValueError:
            return None
        # float() also accepts "nan"/"inf", which are not valid readings.
        if not math.isfinite(ma_value):
            return None
        return {'address': address, 'ma': ma_value}
    except Exception as e:
        if log_widget:
            self._log_message(f"Error parseando mensaje: {e}", log_widget)
        return None
def ma_to_brix(self, ma_value):
    """Convert a current reading in mA to Brix using the configured mapping.

    4 mA maps to the "Min Brix" setting and 20 mA to the "Max Brix" setting,
    with linear interpolation in between; readings outside 4-20 mA are
    clamped to the nearest end.

    Args:
        ma_value: Reading in mA.

    Returns:
        The Brix value, or ``0.0`` when the mapping settings (or the
        reading) cannot be interpreted as numbers.
    """
    try:
        min_brix = float(self.min_brix_map_var.get())
        max_brix = float(self.max_brix_map_var.get())
        if ma_value <= 4.0:
            return min_brix
        if ma_value >= 20.0:
            return max_brix
        # Linear interpolation over the 16 mA span.
        fraction = (ma_value - 4.0) / 16.0
        return min_brix + fraction * (max_brix - min_brix)
    except Exception:
        # Best-effort fallback kept from the original, but no longer a bare
        # "except:" so KeyboardInterrupt/SystemExit are not swallowed.
        return 0.0
def start_trace(self):
    """Start trace mode: open the connection, create the CSV and spawn the reader.

    The connection is opened before the CSV file is created, so a failed
    connection attempt no longer leaves a header-only CSV on disk or stale
    ``csv_file``/``csv_writer`` references behind. Instance state is only
    updated once both resources are ready.

    Errors (invalid parameters, connection failure, CSV creation failure)
    are reported through message boxes and leave the application idle.
    """
    if self.tracing:
        messagebox.showwarning("Advertencia", "El trace ya está en curso.")
        return
    # NOTE(review): self.connection is also reset by stop_trace; if the
    # simulator uses the same attribute while running, starting a trace would
    # replace it - confirm against start_simulation.

    # Collect the connection parameters.
    try:
        conn_type = self.connection_type_var.get()
        if conn_type == "Serial":
            conn_params = {
                'port': self.com_port_var.get(),
                'baud': int(self.baud_rate_var.get())
            }
        else:
            conn_params = {
                'ip': self.ip_address_var.get(),
                'port': int(self.port_var.get())
            }
    except ValueError as e:
        messagebox.showerror("Error", f"Parámetros de conexión inválidos: {e}")
        return

    # Open the connection first: it is the step most likely to fail.
    try:
        connection = self._open_connection(conn_type, conn_params)
    except Exception as e:
        messagebox.showerror("Error de Conexión", str(e))
        return

    # Create the CSV file for this session.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_filename = f"maselli_trace_{timestamp}.csv"
    csv_file = None
    try:
        csv_file = open(csv_filename, 'w', newline='')
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(['Timestamp', 'Address', 'mA', 'Brix', 'Raw_Message'])
    except Exception as e:
        # Release everything acquired so far before reporting.
        if csv_file is not None:
            csv_file.close()
        self._close_connection(connection, conn_type)
        messagebox.showerror("Error", f"No se pudo crear archivo CSV: {e}")
        return

    # Both resources are ready: publish them on the instance.
    self.connection = connection
    self.connection_type = conn_type
    self.csv_file = csv_file
    self.csv_writer = csv_writer
    self.csv_filename_var.set(csv_filename)
    self._log_message(f"Conexión {conn_type} abierta para trace.", self.trace_log_text)

    self.tracing = True
    self.trace_start_time = time.time()
    self.start_trace_button.config(state=tk.DISABLED)
    self.stop_trace_button.config(state=tk.NORMAL)
    self._set_trace_entries_state(tk.DISABLED)

    # Start the receiver thread.
    self.trace_thread = threading.Thread(target=self.run_trace, daemon=True)
    self.trace_thread.start()
    self._log_message("Trace iniciado.", self.trace_log_text)
def stop_trace(self):
    """Stop trace mode and release the connection and the CSV file.

    Does nothing when no trace is running. Otherwise it signals the reader
    thread to stop, waits up to 2 seconds for it, closes the connection and
    the CSV file, and restores the controls to their idle state.
    """
    if not self.tracing:
        return

    # run_trace polls this flag in its loop condition.
    self.tracing = False
    worker = self.trace_thread
    if worker and worker.is_alive():
        worker.join(timeout=2.0)

    # Close the connection.
    if self.connection:
        self._close_connection(self.connection, self.connection_type)
        self._log_message("Conexión cerrada.", self.trace_log_text)
        self.connection = None

    # Close the CSV file.
    if self.csv_file:
        self.csv_file.close()
        self.csv_file = None
        self.csv_writer = None
        saved_name = self.csv_filename_var.get()
        self._log_message(f"Archivo CSV guardado: {saved_name}", self.trace_log_text)

    # Back to the idle UI state.
    self.start_trace_button.config(state=tk.NORMAL)
    self.stop_trace_button.config(state=tk.DISABLED)
    self._set_trace_entries_state(tk.NORMAL)
    self._log_message("Trace detenido.", self.trace_log_text)
def run_trace(self):
    """Receiver loop for trace mode (target of the trace thread).

    Polls the open connection while ``self.tracing`` is true, accumulates
    the received text and hands every complete frame to
    ``_process_trace_message``.

    Framing rules:
      * A frame ends at the first CR or LF; empty frames (for example the LF
        of a CR-LF pair) are skipped.
      * Text without a terminator is processed only when it looks like a
        whole ADAM frame (at least 10 characters and either starting with
        ``#`` or exactly 10 characters long) AND no further data has arrived
        for ``idle_flush_s`` seconds. Waiting for the idle gap keeps a frame
        that arrives split across two reads from being recorded as two bogus
        messages.
      * For UDP every datagram is treated as a frame boundary, so an
        unterminated datagram is processed immediately.

    The loop ends when ``self.tracing`` becomes false, when the TCP peer
    closes the connection, or on any receive/processing error (which is
    logged while the trace is still active).
    """
    idle_flush_s = 0.05  # silence that marks the end of an unterminated frame
    buffer = ""
    last_rx = time.monotonic()

    def flush_unterminated():
        """Process the buffer if it looks like a whole frame lacking a terminator."""
        nonlocal buffer
        if len(buffer) >= 10 and (buffer.startswith('#') or len(buffer) == 10):
            self._process_trace_message(buffer)
            buffer = ""

    while self.tracing:
        data = None
        try:
            # Read whatever is available for the active connection type.
            if self.connection_type == "Serial":
                pending = self.connection.in_waiting
                if pending > 0:
                    data = self.connection.read(pending).decode('ascii', errors='ignore')
            elif self.connection_type == "TCP":
                self.connection.settimeout(0.1)
                try:
                    data = self.connection.recv(1024).decode('ascii', errors='ignore')
                except socket.timeout:
                    data = None
                else:
                    if not data:  # an empty read means the peer closed the connection
                        self._log_message("Conexión TCP cerrada por el servidor.", self.trace_log_text)
                        break
            elif self.connection_type == "UDP":
                self.connection.settimeout(0.1)
                try:
                    payload, _addr = self.connection.recvfrom(1024)
                except socket.timeout:
                    data = None
                else:
                    data = payload.decode('ascii', errors='ignore')

            if data:
                buffer += data
                last_rx = time.monotonic()
                # Split off every terminated frame.
                while True:
                    cuts = [pos for pos in (buffer.find('\r'), buffer.find('\n')) if pos != -1]
                    if not cuts:
                        break
                    end_idx = min(cuts) + 1
                    message, buffer = buffer[:end_idx], buffer[end_idx:]
                    if message.strip():
                        self._process_trace_message(message)
                if self.connection_type == "UDP":
                    flush_unterminated()
            elif buffer and time.monotonic() - last_rx >= idle_flush_s:
                flush_unterminated()
        except Exception as e:
            if self.tracing:  # only log while the trace is still active
                self._log_message(f"Error en trace: {e}", self.trace_log_text)
            break

        # Short pause so an idle connection does not spin the CPU.
        if not data:
            time.sleep(0.01)

    # The loop ended on its own (peer closed / error) rather than by a user
    # stop: still record a final unterminated frame instead of dropping it.
    if self.tracing and buffer:
        try:
            flush_unterminated()
        except Exception as e:
            self._log_message(f"Error en trace: {e}", self.trace_log_text)
def _process_trace_message(self, message):
    """Log, parse and record one frame received in trace mode.

    The raw frame is always written to the trace log. When it parses as an
    ADAM frame, the read-outs are refreshed, a row is appended to the CSV
    (and flushed), and the sample is added to the plot buffers. Otherwise
    the frame is logged as a non-ADAM message.

    Args:
        message: Raw frame text, possibly including its CR/LF terminator.
    """
    # NOTE(review): run_trace calls this from the trace thread, and it
    # touches Tk variables and widgets directly - confirm that is intended.
    printable = message.replace('\r', '<CR>').replace('\n', '<LF>')
    self._log_message(f"Recibido: {printable}", self.trace_log_text)

    parsed = self.parse_adam_message(message, self.trace_log_text)
    if not parsed:
        # Not a valid ADAM frame; it may be some other kind of response.
        self._log_message(f"Mensaje no ADAM: {printable}", self.trace_log_text)
        return

    address = parsed['address']
    ma_value = parsed['ma']
    brix_value = self.ma_to_brix(ma_value)
    now = datetime.now()
    ma_text = f"{ma_value:.3f}"
    brix_text = f"{brix_value:.3f}"

    # Refresh the "last value" read-outs ([:-3] trims microseconds to ms).
    self.trace_timestamp_var.set(now.strftime("%H:%M:%S.%f")[:-3])
    self.trace_ma_var.set(f"{ma_text} mA")
    self.trace_brix_var.set(f"{brix_text} Brix")

    # Log the parsed details.
    self._log_message(f" -> Addr: {address}, mA: {ma_text}, Brix: {brix_text}",
                      self.trace_log_text)

    # Append to the CSV and flush so the row survives an abrupt exit.
    if self.csv_writer:
        row = [now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], address, ma_text, brix_text, printable]
        self.csv_writer.writerow(row)
        if self.csv_file:
            self.csv_file.flush()

    # Feed the plot buffers and request a redraw.
    elapsed = time.time() - self.trace_start_time
    self.trace_time_data.append(elapsed)
    self.trace_brix_data.append(brix_value)
    self.root.after(0, self.trace_canvas.draw_idle)
def _set_trace_entries_state(self, state):
    """Enable or disable the shared connection controls around a trace.

    Args:
        state: ``tk.DISABLED`` while a trace runs, ``tk.NORMAL`` afterwards.

    The connection-type combobox is created as ``"readonly"``; re-enabling
    it with ``tk.NORMAL`` would make it a free-text field, so it is restored
    to ``"readonly"`` instead.
    """
    combo_state = "readonly" if str(state) == tk.NORMAL else state
    self.connection_type_combo.config(state=combo_state)
    for entry in (
        self.com_port_entry,
        self.baud_rate_entry,
        self.ip_address_entry,
        self.port_entry,
        self.min_brix_map_entry,
        self.max_brix_map_entry,
    ):
        entry.config(state=state)
def update_sim_graph(self, frame):
    """Animation callback: redraw the simulator plot from the rolling buffers.

    Args:
        frame: Frame counter supplied by ``FuncAnimation`` (unused).

    Returns:
        The Brix and mA line artists.

    Each series is copied once and the copies are trimmed to a common
    length. ``add_sim_data_point`` appends to the three buffers one after
    another, so a refresh landing between those appends could otherwise hand
    x and y data of different lengths to matplotlib.
    """
    times = list(self.sim_time_data)
    brix = list(self.sim_brix_data)
    ma = list(self.sim_ma_data)
    count = min(len(times), len(brix), len(ma))
    if count > 0:
        times, brix, ma = times[:count], brix[:count], ma[:count]
        self.sim_line_brix.set_data(times, brix)
        self.sim_line_ma.set_data(times, ma)
        # A single sample has no x range to show yet.
        if count > 1:
            self.sim_ax1.set_xlim(min(times), max(times))
        # Keep a small margin around the data on both y axes.
        self.sim_ax1.set_ylim(min(brix) - 1, max(brix) + 1)
        self.sim_ax2.set_ylim(min(ma) - 0.5, max(ma) + 0.5)
    return self.sim_line_brix, self.sim_line_ma
def update_trace_graph(self, frame):
    """Animation callback: redraw the trace plot from the rolling buffers.

    Args:
        frame: Frame counter supplied by ``FuncAnimation`` (unused).

    Returns:
        A 1-tuple with the Brix line artist.

    The buffers are filled by ``_process_trace_message`` on the trace thread
    (time first, then Brix), so each one is copied once and the copies are
    trimmed to a common length before being handed to matplotlib.
    """
    times = list(self.trace_time_data)
    brix = list(self.trace_brix_data)
    count = min(len(times), len(brix))
    if count > 0:
        times, brix = times[:count], brix[:count]
        self.trace_line_brix.set_data(times, brix)
        # A single sample has no x range to show yet.
        if count > 1:
            self.trace_ax.set_xlim(min(times), max(times))
        # Keep a 1-Brix margin around the data.
        self.trace_ax.set_ylim(min(brix) - 1, max(brix) + 1)
    return self.trace_line_brix,
def clear_sim_graph(self):
    """Clear the simulator plot and restart its time axis at zero.

    ``update_sim_graph`` only touches the line artists while the buffers
    hold data, so emptying the buffers alone would leave the old curves on
    screen; the artists are therefore blanked here as well.
    """
    self.sim_time_data.clear()
    self.sim_brix_data.clear()
    self.sim_ma_data.clear()
    self.sim_start_time = time.time()
    self.sim_line_brix.set_data([], [])
    self.sim_line_ma.set_data([], [])
    self.sim_canvas.draw_idle()
    self._log_message("Gráfico del simulador limpiado.", self.sim_log_text)
def clear_trace_graph(self):
    """Clear the trace plot and restart its time axis at zero.

    ``update_trace_graph`` only touches the line artist while the buffers
    hold data, so emptying the buffers alone would leave the old curve on
    screen; the artist is therefore blanked here as well.
    """
    self.trace_time_data.clear()
    self.trace_brix_data.clear()
    self.trace_start_time = time.time()
    self.trace_line_brix.set_data([], [])
    self.trace_canvas.draw_idle()
    self._log_message("Gráfico del trace limpiado.", self.trace_log_text)
def save_config(self):
    """Write the current settings to ``self.config_file`` as JSON.

    The outcome is reported in the active log and through a message box;
    any failure while writing is caught and shown as an error.
    """
    # (JSON key, Tk variable) pairs, in the order they appear in the file.
    sources = (
        ('connection_type', self.connection_type_var),
        ('com_port', self.com_port_var),
        ('baud_rate', self.baud_rate_var),
        ('ip_address', self.ip_address_var),
        ('port', self.port_var),
        ('adam_address', self.adam_address_var),
        ('function_type', self.function_type_var),
        ('min_brix_map', self.min_brix_map_var),
        ('max_brix_map', self.max_brix_map_var),
        ('period', self.period_var),
        ('manual_brix', self.manual_brix_var),
    )
    settings = {key: variable.get() for key, variable in sources}
    try:
        with open(self.config_file, 'w') as handle:
            json.dump(settings, handle, indent=4)
        self._log_message("Configuración guardada exitosamente.")
        messagebox.showinfo("Éxito", "Configuración guardada correctamente.")
    except Exception as e:
        self._log_message(f"Error al guardar configuración: {e}")
        messagebox.showerror("Error", f"No se pudo guardar la configuración: {e}")
def load_config(self, silent=False):
    """Load settings from ``self.config_file`` into the Tk variables.

    Args:
        silent: When true, no message boxes or log lines are produced
            (used for the automatic load at start-up).

    Missing keys fall back to the built-in defaults. After the values are
    applied, both the connection-type and the function-type handlers run so
    the widgets match the loaded settings. Previously only the former ran,
    so loading a "Manual" configuration through the "Cargar" button left the
    manual controls disabled.
    """
    if not os.path.exists(self.config_file):
        if not silent:
            messagebox.showinfo("Información", "No se encontró archivo de configuración.")
        return
    try:
        with open(self.config_file, 'r') as f:
            config = json.load(f)
        self.connection_type_var.set(config.get('connection_type', 'Serial'))
        self.com_port_var.set(config.get('com_port', 'COM3'))
        self.baud_rate_var.set(config.get('baud_rate', '115200'))
        self.ip_address_var.set(config.get('ip_address', '192.168.1.100'))
        self.port_var.set(config.get('port', '502'))
        self.adam_address_var.set(config.get('adam_address', '01'))
        self.function_type_var.set(config.get('function_type', 'Lineal'))
        self.min_brix_map_var.set(config.get('min_brix_map', '0'))
        self.max_brix_map_var.set(config.get('max_brix_map', '80'))
        self.period_var.set(config.get('period', '1.0'))
        self.manual_brix_var.set(config.get('manual_brix', '10.0'))
        try:
            self.manual_slider_var.set(float(config.get('manual_brix', '10.0')))
        except (TypeError, ValueError):
            # A non-numeric manual value leaves the slider where it was.
            pass
        # Sync widget visibility and enabled states with the loaded values.
        self.on_connection_type_change()
        self.on_function_type_change()
        if not silent:
            self._log_message("Configuración cargada exitosamente.")
            messagebox.showinfo("Éxito", "Configuración cargada correctamente.")
    except Exception as e:
        if not silent:
            self._log_message(f"Error al cargar configuración: {e}")
            messagebox.showerror("Error", f"No se pudo cargar la configuración: {e}")
def on_connection_type_change(self, event=None):
    """Show the parameter frame that matches the selected connection type.

    "Serial" shows the serial frame; any other type shows the Ethernet one.

    Args:
        event: Tk event when used as a ``<<ComboboxSelected>>`` handler.
    """
    if self.connection_type_var.get() == "Serial":
        hidden, shown = self.ethernet_frame, self.serial_frame
    else:
        hidden, shown = self.serial_frame, self.ethernet_frame
    # Both frames share one grid cell: hide one, then restore the other.
    hidden.grid_remove()
    shown.grid()
def on_function_type_change(self, event=None):
    """Enable or disable simulator controls for the selected function type.

    In "Manual" mode a running simulation is stopped, the manual
    entry/slider/send button are enabled, and the period entry and the
    start/stop buttons are disabled. In any other mode the manual controls
    are disabled, the period entry is enabled, and the start/stop buttons
    reflect whether a simulation is running.

    Args:
        event: Tk event when used as a ``<<ComboboxSelected>>`` handler.
    """
    manual = self.function_type_var.get() == "Manual"
    if manual and self.simulating:
        self.stop_simulation()

    manual_state = tk.NORMAL if manual else tk.DISABLED
    self.manual_brix_entry.config(state=manual_state)
    self.manual_send_button.config(state=manual_state)
    self.manual_slider.config(state=manual_state)
    self.period_entry.config(state=tk.DISABLED if manual else tk.NORMAL)

    if manual:
        start_state, stop_state = tk.DISABLED, tk.DISABLED
    elif self.simulating:
        start_state, stop_state = tk.DISABLED, tk.NORMAL
    else:
        start_state, stop_state = tk.NORMAL, tk.DISABLED
    self.start_button.config(state=start_state)
    self.stop_button.config(state=stop_state)
def on_slider_change(self, value):
    """Mirror the slider position into the manual Brix entry (one decimal).

    Args:
        value: New slider value; it is converted with ``float``.
    """
    rounded_text = format(float(value), ".1f")
    self.manual_brix_var.set(rounded_text)
def update_slider_from_entry(self):
    """Push the typed manual Brix value to the slider.

    The value is clamped to 0-100 and the entry is rewritten with one
    decimal. Text that is not a number is ignored.
    """
    try:
        typed = float(self.manual_brix_var.get())
    except ValueError:
        return
    clamped = max(0, min(100, typed))
    self.manual_slider_var.set(clamped)
    self.manual_brix_var.set(f"{clamped:.1f}")
def calculate_checksum(self, message_part):
    """Return the checksum of ``message_part`` as two upper-case hex digits.

    The checksum is the sum of the characters' code points modulo 256.

    Args:
        message_part: Text the checksum is computed over.
    """
    total = 0
    for char in message_part:
        total += ord(char)
    return format(total & 0xFF, "02X")
def scale_to_mA(self, brix_value, min_brix_map, max_brix_map):
    """Map a Brix value onto the 4-20 mA range.

    Args:
        brix_value: Value to convert.
        min_brix_map: Brix value that corresponds to 4 mA.
        max_brix_map: Brix value that corresponds to 20 mA.

    Returns:
        The current in mA, clamped to 4-20. An empty mapping range
        (``min_brix_map == max_brix_map``) yields 4.0.
    """
    if max_brix_map == min_brix_map:
        return 4.0
    span = max_brix_map - min_brix_map
    fraction = (brix_value - min_brix_map) / span
    clamped = max(0.0, min(1.0, fraction))
    return 4.0 + clamped * 16.0
def format_mA_value(self, mA_val):
    """Format a mA value as zero-padded ``XX.XXX`` text.

    Uses three decimals and a minimum width of 6 characters including the
    decimal point.

    Args:
        mA_val: Value in mA.
    """
    return format(mA_val, "06.3f")
def _get_common_params(self):
    """Read and validate the ADAM address, Brix mapping and connection settings.

    Returns:
        ``(conn_type, conn_params, adam_address, min_brix_map, max_brix_map)``
        where ``conn_params`` is ``{'port', 'baud'}`` for Serial and
        ``{'ip', 'port'}`` otherwise, or ``None`` after showing an error
        message box when a value is invalid.
    """
    try:
        adam_address = self.adam_address_var.get()
        if len(adam_address) != 2:
            messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.")
            return None

        min_brix_map = float(self.min_brix_map_var.get())
        max_brix_map = float(self.max_brix_map_var.get())
        if min_brix_map > max_brix_map:
            messagebox.showerror("Error", "Valor Mínimo (Brix) no puede ser mayor que Valor Máximo (Brix).")
            return None

        conn_type = self.connection_type_var.get()
        if conn_type == "Serial":
            conn_params = {'port': self.com_port_var.get(), 'baud': int(self.baud_rate_var.get())}
        else:
            conn_params = {'ip': self.ip_address_var.get(), 'port': int(self.port_var.get())}
        return conn_type, conn_params, adam_address, min_brix_map, max_brix_map
    except ValueError as e:
        messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración: {e}")
        return None
def _open_connection(self, conn_type, conn_params):
    """Open a Serial, TCP or UDP connection.

    Args:
        conn_type: ``"Serial"``, ``"TCP"`` or ``"UDP"``.
        conn_params: ``{'port', 'baud'}`` for Serial; ``{'ip', 'port'}`` for
            TCP and UDP.

    Returns:
        A ``serial.Serial`` (1 s timeout), a connected TCP socket (5 s
        timeout), or a UDP socket (1 s timeout) whose ``dest_address``
        attribute holds the ``(ip, port)`` destination read by ``_send_data``.

    Raises:
        Exception: On any failure, including an unsupported ``conn_type``;
            the message names the connection type and the original error is
            chained as the cause.
    """
    sock = None
    try:
        if conn_type == "Serial":
            return serial.Serial(conn_params['port'], conn_params['baud'], timeout=1)
        if conn_type == "TCP":
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5.0)
            sock.connect((conn_params['ip'], conn_params['port']))
            return sock
        if conn_type == "UDP":
            # socket.socket declares __slots__, so a plain instance rejects
            # new attributes; a subclass gets a __dict__ and can carry the
            # destination address.
            class _UdpSocket(socket.socket):
                """UDP socket that remembers its default destination."""
                dest_address = None

            sock = _UdpSocket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(1.0)
            sock.dest_address = (conn_params['ip'], conn_params['port'])
            return sock
        raise ValueError(f"tipo de conexión no soportado: {conn_type!r}")
    except Exception as e:
        # Do not leak a socket that was created but could not be set up.
        if sock is not None:
            sock.close()
        raise Exception(f"Error al abrir conexión {conn_type}: {e}") from e
def _close_connection(self, connection, conn_type):
    """Close ``connection`` when there is something to close.

    A Serial connection is closed only if it reports ``is_open``. A TCP
    or UDP connection is closed whenever it is truthy. Any other type is
    ignored. Errors raised while closing are logged, never propagated.
    """
    try:
        if conn_type == "Serial":
            if not connection or not connection.is_open:
                return
        elif conn_type not in ("TCP", "UDP") or not connection:
            return
        connection.close()
    except Exception as e:
        self._log_message(f"Error al cerrar conexión: {e}")
def _send_data(self, connection, conn_type, data):
    """Send ``data`` as ASCII bytes over an open connection.

    Args:
        connection: object returned by ``_open_connection``.
        conn_type: "Serial", "TCP" or "UDP"; any other value sends nothing.
        data: text to transmit; it must be ASCII-encodable.

    Raises:
        Exception: wrapping any encoding or transport error, with the
            original error chained as the cause.
    """
    try:
        payload = data.encode('ascii')
        if conn_type == "Serial":
            connection.write(payload)
        elif conn_type == "TCP":
            # send() may transmit only part of the buffer; sendall() keeps
            # writing until every byte has been handed to the socket.
            connection.sendall(payload)
        elif conn_type == "UDP":
            connection.sendto(payload, connection.dest_address)
    except Exception as e:
        raise Exception(f"Error al enviar datos: {e}") from e
def add_sim_data_point(self, brix_value, ma_value):
    """Append one simulator sample to the plot series and request a redraw.

    The sample is stamped with the seconds elapsed since
    ``self.sim_start_time``.
    """
    elapsed = time.time() - self.sim_start_time
    samples = (
        ("sim_time_data", elapsed),
        ("sim_brix_data", brix_value),
        ("sim_ma_data", ma_value),
    )
    for series_name, sample in samples:
        getattr(self, series_name).append(sample)
    self.sim_canvas.draw_idle()
def _read_response(self, connection, conn_type, timeout=0.5):
    """Try to read one response from the device.

    Args:
        connection: open connection object for ``conn_type``.
        conn_type: "Serial", "TCP" or "UDP"; any other value reads nothing.
        timeout: maximum time in seconds to wait for data.

    Returns:
        The received bytes decoded as ASCII (undecodable bytes dropped),
        or ``None`` when nothing arrived in time or an error occurred.
        Errors are logged to ``self.sim_log_text`` rather than raised.

    The connection's own timeout is restored before returning, so a short
    read timeout does not leak into later operations on the same
    connection.
    """
    try:
        response = None
        if conn_type == "Serial":
            original_timeout = connection.timeout
            connection.timeout = timeout
            try:
                # Give the device a moment to start answering.
                time.sleep(0.05)
                response_bytes = b""
                # A monotonic clock keeps the deadline immune to wall-clock jumps.
                deadline = time.monotonic() + timeout
                while time.monotonic() < deadline:
                    if connection.in_waiting > 0:
                        response_bytes += connection.read(connection.in_waiting)
                        # Stop as soon as a line terminator has arrived.
                        if b'\r' in response_bytes or b'\n' in response_bytes:
                            break
                    else:
                        time.sleep(0.01)
                if response_bytes:
                    response = response_bytes.decode('ascii', errors='ignore')
            finally:
                connection.timeout = original_timeout
        elif conn_type in ("TCP", "UDP"):
            original_timeout = connection.gettimeout()
            connection.settimeout(timeout)
            try:
                if conn_type == "TCP":
                    raw = connection.recv(1024)
                else:
                    raw, _sender = connection.recvfrom(1024)
                response = raw.decode('ascii', errors='ignore')
            except socket.timeout:
                # No answer within the timeout is not an error.
                pass
            finally:
                connection.settimeout(original_timeout)
        return response
    except Exception as e:
        self._log_message(f"Error al leer respuesta: {e}", self.sim_log_text)
        return None
def send_manual_value(self):
    """Send the manually entered Brix value once over a temporary connection.

    The value is converted to mA, shown in the display fields, added to
    the simulator plot and sent as ``#<address><mA><checksum><CR>``. Any
    non-blank response is logged and, when it parses as an ADAM message,
    reported with its Brix equivalent. The connection is always closed
    afterwards. Invalid input or a connection failure is reported in an
    error dialog.
    """
    settings = self._get_common_params()
    if not settings:
        return
    conn_type, conn_params, adam_address, min_brix_map, max_brix_map = settings

    try:
        brix = float(self.manual_brix_var.get())
    except ValueError:
        messagebox.showerror("Error de Entrada", "Valor Brix Manual inválido.")
        return

    current_mA = self.scale_to_mA(brix, min_brix_map, max_brix_map)
    current_text = self.format_mA_value(current_mA)

    # Update the display and the plot before attempting the transmission.
    self.current_brix_display_var.set(f"{brix:.3f} Brix")
    self.current_ma_display_var.set(f"{current_text} mA")
    self.add_sim_data_point(brix, current_mA)

    frame_body = f"#{adam_address}{current_text}"
    frame = f"{frame_body}{self.calculate_checksum(frame_body)}\r"
    printable_frame = frame.replace('\r', '<CR>')

    link = None
    try:
        link = self._open_connection(conn_type, conn_params)
        self._log_message(f"Conexión {conn_type} abierta temporalmente para envío manual.", self.sim_log_text)
        self._log_message(f"Enviando Manual: {printable_frame}", self.sim_log_text)
        self._send_data(link, conn_type, frame)

        reply = self._read_response(link, conn_type)
        if reply and reply.strip():  # skip empty or whitespace-only replies
            printable_reply = reply.replace('\r', '<CR>').replace('\n', '<LF>')
            self._log_message(f"Respuesta: {printable_reply}", self.sim_log_text)
            decoded = self.parse_adam_message(reply, self.sim_log_text)
            if decoded:
                # Show the reported current together with its Brix equivalent.
                brix_value = self.ma_to_brix(decoded['ma'])
                self._log_message(f" -> Dirección: {decoded['address']}, Valor: {decoded['ma']:.3f} mA ({brix_value:.3f} Brix)", self.sim_log_text)
    except Exception as e:
        self._log_message(f"Error al enviar manualmente: {e}", self.sim_log_text)
        messagebox.showerror("Error de Conexión", str(e))
    finally:
        if link:
            self._close_connection(link, conn_type)
            self._log_message(f"Conexión {conn_type} cerrada tras envío manual.", self.sim_log_text)
def start_simulation(self):
    """Validate the settings, open the connection and start the worker thread.

    Nothing is started when a simulation is already running, when the
    common parameters are invalid, when the period is not a positive
    number, when the "Manual" function type is selected, or when the
    connection cannot be opened; each case is reported in a dialog.
    """
    if self.simulating:
        messagebox.showwarning("Advertencia", "La simulación ya está en curso.")
        return

    settings = self._get_common_params()
    if not settings:
        return
    conn_type, conn_params, adam_address, brix_low, brix_high = settings
    self.connection_type = conn_type
    self.conn_params = conn_params
    self.adam_address = adam_address
    self.min_brix_map = brix_low
    self.max_brix_map = brix_high

    try:
        self.simulation_period = float(self.period_var.get())
        if self.simulation_period <= 0:
            messagebox.showerror("Error", "El periodo debe ser un número positivo.")
            return
        self.function_type = self.function_type_var.get()
        if self.function_type == "Manual":
            # Manual values are sent one at a time, not by the worker loop.
            messagebox.showinfo("Info", "Seleccione modo Lineal o Sinusoidal para simulación continua.")
            return
    except ValueError as e:
        messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración de simulación: {e}")
        return

    try:
        self.connection = self._open_connection(self.connection_type, self.conn_params)
        self._log_message(f"Conexión {self.connection_type} abierta para simulación continua.", self.sim_log_text)
    except Exception as e:
        messagebox.showerror("Error de Conexión", str(e))
        self.connection = None
        return

    self.simulating = True
    self.simulation_step = 0

    # Lock the form while the worker is running.
    self.start_button.config(state=tk.DISABLED)
    self.stop_button.config(state=tk.NORMAL)
    self._set_sim_config_entries_state(tk.DISABLED)

    worker = threading.Thread(target=self.run_simulation, daemon=True)
    self.simulation_thread = worker
    worker.start()
    self._log_message("Simulación continua iniciada.", self.sim_log_text)
def _set_sim_config_entries_state(self, state):
    """Apply ``state`` to the simulator configuration widgets.

    The connection, address, function-type, Brix-range and save/load
    widgets always receive ``state``. The rest depend on the selected
    function type:

    * "Manual": the period entry is disabled, and the manual controls are
      disabled only when ``state`` is ``tk.DISABLED`` (otherwise normal).
    * anything else: the period entry receives ``state`` and the manual
      controls are disabled.
    """
    always_follow = (
        "connection_type_combo",
        "com_port_entry",
        "baud_rate_entry",
        "ip_address_entry",
        "port_entry",
        "adam_address_entry",
        "function_type_combo",
        "min_brix_map_entry",
        "max_brix_map_entry",
        "save_config_button",
        "load_config_button",
    )
    for widget_name in always_follow:
        getattr(self, widget_name).config(state=state)

    if self.function_type_var.get() == "Manual":
        period_state = tk.DISABLED
        manual_state = tk.DISABLED if state == tk.DISABLED else tk.NORMAL
    else:
        period_state = state
        manual_state = tk.DISABLED

    self.period_entry.config(state=period_state)
    for widget_name in ("manual_brix_entry", "manual_send_button", "manual_slider"):
        getattr(self, widget_name).config(state=manual_state)
def stop_simulation(self):
    """Stop the continuous simulation and return the form to its idle state.

    When no simulation is running, only the widgets are reset (and, unless
    the function type is "Manual", a note is logged and the start/stop
    buttons are reset). Otherwise the worker is signalled to stop and
    waited for with a bounded timeout, the connection is closed, the
    widgets are re-enabled and the value displays are cleared.
    """
    if not self.simulating:
        if self.function_type_var.get() != "Manual":
            self._log_message("Simulación continua ya estaba detenida.", self.sim_log_text)
            self.start_button.config(state=tk.NORMAL)
            self.stop_button.config(state=tk.DISABLED)
        self._set_sim_config_entries_state(tk.NORMAL)
        return

    # Clearing the flag is what asks the worker loop to finish.
    self.simulating = False

    worker = self.simulation_thread
    if worker and worker.is_alive():
        try:
            if hasattr(self, 'simulation_period'):
                wait_for = self.simulation_period * 1.5
            else:
                wait_for = 2.0
            # Never wait less than 0.1 s, even for very short periods.
            worker.join(timeout=max(0.1, wait_for))
        except Exception as e:
            self._log_message(f"Error al esperar el hilo de simulación: {e}", self.sim_log_text)

    if self.connection:
        self._close_connection(self.connection, self.connection_type)
        self._log_message(f"Conexión {self.connection_type} cerrada (simulación continua).", self.sim_log_text)
        self.connection = None

    self.start_button.config(state=tk.NORMAL)
    self.stop_button.config(state=tk.DISABLED)
    self._set_sim_config_entries_state(tk.NORMAL)
    self.on_function_type_change()
    self._log_message("Simulación continua detenida.", self.sim_log_text)

    self.current_brix_display_var.set("---")
    self.current_ma_display_var.set("--.-- mA")
def run_simulation(self):
    """Worker loop of the continuous simulation.

    While ``self.simulating`` is true, each iteration computes the next
    Brix value ("Lineal": triangle ramp between the mapped minimum and
    maximum over 100 steps each way; "Sinusoidal": one sine period per 100
    steps), converts it to mA, updates the display variables, schedules a
    plot update, and sends ``#<address><mA><checksum><CR>``. Any non-blank
    response is logged and, when it parses as an ADAM message, reported
    with its Brix equivalent.

    A send/read error schedules ``stop_simulation_from_thread_error`` and
    ends the loop. When the loop ends because the flag was cleared,
    ``ensure_gui_stopped_state_sim`` is scheduled.

    The wait between iterations runs in short slices, so clearing
    ``self.simulating`` ends it promptly instead of after a whole period.
    """
    steps_for_full_cycle = 100
    while self.simulating:
        current_brix_val = 0.0
        if self.function_type == "Lineal":
            # Ramp 0 -> 1 over the first 100 steps, then back 1 -> 0.
            progress = (self.simulation_step % (2 * steps_for_full_cycle)) / steps_for_full_cycle
            if progress > 1.0:
                progress = 2.0 - progress
            current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * progress
        elif self.function_type == "Sinusoidal":
            phase = (self.simulation_step / steps_for_full_cycle) * 2 * math.pi
            # Shift the sine from [-1, 1] into [0, 1].
            sin_val = (math.sin(phase) + 1) / 2
            current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * sin_val

        mA_val = self.scale_to_mA(current_brix_val, self.min_brix_map, self.max_brix_map)
        mA_str = self.format_mA_value(mA_val)
        self.current_brix_display_var.set(f"{current_brix_val:.3f} Brix")
        self.current_ma_display_var.set(f"{mA_str} mA")
        # Default arguments freeze this iteration's values for the callback.
        self.root.after(0, lambda b=current_brix_val, m=mA_val: self.add_sim_data_point(b, m))

        message_part = f"#{self.adam_address}{mA_str}"
        checksum = self.calculate_checksum(message_part)
        full_string_to_send = f"{message_part}{checksum}\r"
        log_display_string = full_string_to_send.replace('\r', '<CR>')
        self._log_message(f"Enviando Sim: {log_display_string}", self.sim_log_text)

        if self.connection:
            try:
                self._send_data(self.connection, self.connection_type, full_string_to_send)
                # Short read timeout so the simulation is not slowed down.
                response = self._read_response(self.connection, self.connection_type, timeout=0.1)
                if response and response.strip():  # skip blank replies
                    display_resp = response.replace('\r', '<CR>').replace('\n', '<LF>')
                    self._log_message(f"Respuesta: {display_resp}", self.sim_log_text)
                    parsed = self.parse_adam_message(response, self.sim_log_text)
                    if parsed:
                        # Show the reported current with its Brix equivalent.
                        brix_value = self.ma_to_brix(parsed['ma'])
                        self._log_message(f" -> Dirección: {parsed['address']}, Valor: {parsed['ma']:.3f} mA ({brix_value:.3f} Brix)", self.sim_log_text)
            except Exception as e:
                self._log_message(f"Error al escribir en conexión (sim): {e}", self.sim_log_text)
                self.root.after(0, self.stop_simulation_from_thread_error)
                break

        self.simulation_step += 1

        # Interruptible wait: re-check the stop flag at least every 50 ms
        # so a stop request does not have to sit out the whole period.
        wake_at = time.monotonic() + self.simulation_period
        while self.simulating:
            remaining = wake_at - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, 0.05))

    if not self.simulating and self.root.winfo_exists():
        self.root.after(0, self.ensure_gui_stopped_state_sim)
def stop_simulation_from_thread_error(self):
    """Report a connection error raised by the worker and stop the simulation.

    Does nothing when the simulation is no longer marked as running.
    """
    if not self.simulating:
        return
    messagebox.showerror("Error de Simulación", "Error de conexión durante la simulación. Simulación detenida.")
    self.stop_simulation()
def ensure_gui_stopped_state_sim(self):
    """Put the GUI in its stopped state once the worker loop has ended.

    Does nothing while the simulation is still marked as running.
    Otherwise the buttons and configuration widgets are re-enabled, a
    connection that is still open is closed, and the end of the worker is
    logged.
    """
    if self.simulating:
        return

    self.start_button.config(state=tk.NORMAL)
    self.stop_button.config(state=tk.DISABLED)
    self._set_sim_config_entries_state(tk.NORMAL)

    open_connection = self.connection
    if open_connection:
        self._close_connection(open_connection, self.connection_type)
        self._log_message(f"Conexión {self.connection_type} cerrada (auto, sim_end).", self.sim_log_text)
        self.connection = None

    self._log_message("Simulación continua terminada (hilo finalizado).", self.sim_log_text)
def on_closing(self):
    """Shut the application down.

    Stops the simulation and the trace if either is active, closes any
    connection that is still open, and destroys the root window.
    """
    shutdown_steps = (
        ("simulating", "stop_simulation"),
        ("tracing", "stop_trace"),
    )
    for active_flag, stopper_name in shutdown_steps:
        if getattr(self, active_flag):
            getattr(self, stopper_name)()

    # Close whatever connection is still open after stopping.
    leftover = self.connection
    if leftover:
        self._close_connection(leftover, self.connection_type)

    self.root.destroy()
# Script entry point: runs only when this file is executed directly.
if __name__ == "__main__":
# Create the Tk root window that hosts the whole application.
main_root = tk.Tk()
# Build the application on top of the root window.
app = MaselliSimulatorApp(main_root)
# Hand control over to the Tk event loop.
main_root.mainloop()