# MaselliSimulatorApp/MaselliSimulatorApp.py
#
# 1040 lines
# 47 KiB
# Python

import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import serial
import socket
import threading
import time
import math
import json
import os
import csv
from datetime import datetime
from collections import deque
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.animation as animation
class MaselliSimulatorApp:
def __init__(self, root_window):
    """Build the whole UI on ``root_window`` and initialise runtime state.

    Creates the notebook with the "Simulador" and "Trace" tabs plus the
    shared connection frame, loads a saved configuration when the config
    file exists, and starts the two matplotlib animations that refresh
    the graphs.

    Args:
        root_window: Tk window that hosts every widget of the application.
    """
    self.root = root_window
    self.root.title("Simulador/Trace Protocolo Maselli")
    # Open connection object and its type ("Serial", "TCP" or "UDP").
    # Both start_simulation() and start_trace() store their connection in
    # these same two attributes.
    self.connection = None
    self.connection_type = None
    # Simulator state
    self.simulating = False
    self.simulation_thread = None
    self.simulation_step = 0
    # Trace mode state
    self.tracing = False
    self.trace_thread = None
    self.csv_file = None
    self.csv_writer = None
    # Graph buffers: each deque keeps at most max_points samples
    self.max_points = 100
    # Simulator data series
    self.sim_time_data = deque(maxlen=self.max_points)
    self.sim_brix_data = deque(maxlen=self.max_points)
    self.sim_ma_data = deque(maxlen=self.max_points)
    self.sim_start_time = time.time()
    # Trace data series
    self.trace_time_data = deque(maxlen=self.max_points)
    self.trace_brix_data = deque(maxlen=self.max_points)
    self.trace_start_time = time.time()
    # Configuration file (relative path, resolved against the working directory)
    self.config_file = "maselli_simulator_config.json"
    # Notebook that holds the tabs
    self.notebook = ttk.Notebook(self.root)
    self.notebook.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
    # Simulator tab (index 0; _log_message relies on this index)
    self.simulator_tab = ttk.Frame(self.notebook)
    self.notebook.add(self.simulator_tab, text="Simulador")
    # Trace tab
    self.trace_tab = ttk.Frame(self.notebook)
    self.notebook.add(self.trace_tab, text="Trace")
    # --- Shared configuration frame ---
    self.create_shared_config_frame()
    # --- Content of each tab (these also create sim_fig / trace_fig) ---
    self.create_simulator_tab()
    self.create_trace_tab()
    # Let the notebook cell absorb window resizes
    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
    # Load the saved configuration if there is one (no dialogs at startup)
    self.load_config(silent=True)
    # Bring widget visibility/state in line with the current selections
    self.on_connection_type_change()
    self.on_function_type_change()
    # Start the graph animations (callbacks fire every 100 ms); the
    # animation objects are stored on the instance
    self.sim_ani = animation.FuncAnimation(self.sim_fig, self.update_sim_graph, interval=100, blit=False)
    self.trace_ani = animation.FuncAnimation(self.trace_fig, self.update_trace_graph, interval=100, blit=False)
def create_shared_config_frame(self):
    """Create the connection-settings frame shared by both tabs.

    The frame is gridded in row 1 of the root window (the notebook occupies
    row 0). It holds the connection type selector, the Serial and Ethernet
    parameter sub-frames, the Brix mapping limits and the save/load buttons.
    """
    shared_config_frame = ttk.LabelFrame(self.root, text="Configuración de Conexión")
    shared_config_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
    # Connection type
    ttk.Label(shared_config_frame, text="Tipo:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.connection_type_var = tk.StringVar(value="Serial")
    self.connection_type_combo = ttk.Combobox(shared_config_frame, textvariable=self.connection_type_var,
                                              values=["Serial", "TCP", "UDP"], state="readonly", width=10)
    self.connection_type_combo.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    self.connection_type_combo.bind("<<ComboboxSelected>>", self.on_connection_type_change)
    # Serial settings sub-frame
    self.serial_frame = ttk.Frame(shared_config_frame)
    self.serial_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew")
    ttk.Label(self.serial_frame, text="Puerto:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.com_port_var = tk.StringVar(value="COM3")
    self.com_port_entry = ttk.Entry(self.serial_frame, textvariable=self.com_port_var, width=10)
    self.com_port_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    ttk.Label(self.serial_frame, text="Baud:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.baud_rate_var = tk.StringVar(value="115200")
    self.baud_rate_entry = ttk.Entry(self.serial_frame, textvariable=self.baud_rate_var, width=10)
    self.baud_rate_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    # Ethernet settings sub-frame: gridded in the same cell as the serial
    # frame and hidden right away; on_connection_type_change() swaps them
    self.ethernet_frame = ttk.Frame(shared_config_frame)
    self.ethernet_frame.grid(row=0, column=2, columnspan=4, padx=5, pady=5, sticky="ew")
    self.ethernet_frame.grid_remove()
    ttk.Label(self.ethernet_frame, text="IP:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.ip_address_var = tk.StringVar(value="192.168.1.100")
    self.ip_address_entry = ttk.Entry(self.ethernet_frame, textvariable=self.ip_address_var, width=15)
    self.ip_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    ttk.Label(self.ethernet_frame, text="Puerto:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.port_var = tk.StringVar(value="502")
    self.port_entry = ttk.Entry(self.ethernet_frame, textvariable=self.port_var, width=10)
    self.port_entry.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    # Mapping parameters (shared): Brix values that correspond to 4 mA and 20 mA
    ttk.Label(shared_config_frame, text="Min Brix [4mA]:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
    self.min_brix_map_var = tk.StringVar(value="0")
    self.min_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.min_brix_map_var, width=10)
    self.min_brix_map_entry.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
    ttk.Label(shared_config_frame, text="Max Brix [20mA]:").grid(row=1, column=2, padx=5, pady=5, sticky="w")
    self.max_brix_map_var = tk.StringVar(value="80")
    self.max_brix_map_entry = ttk.Entry(shared_config_frame, textvariable=self.max_brix_map_var, width=10)
    self.max_brix_map_entry.grid(row=1, column=3, padx=5, pady=5, sticky="ew")
    # Persistence buttons
    self.save_config_button = ttk.Button(shared_config_frame, text="Guardar", command=self.save_config)
    self.save_config_button.grid(row=1, column=4, padx=5, pady=5, sticky="ew")
    self.load_config_button = ttk.Button(shared_config_frame, text="Cargar", command=self.load_config)
    self.load_config_button.grid(row=1, column=5, padx=5, pady=5, sticky="ew")
def create_simulator_tab(self):
    """Create the widgets of the "Simulador" tab.

    Builds, top to bottom: the simulator settings (ADAM address, function
    type, period and the manual-mode controls), the start/stop controls next
    to the current-value display, the Brix/mA graph and the communication log.
    """
    # Simulator settings frame
    sim_config_frame = ttk.LabelFrame(self.simulator_tab, text="Configuración Simulador")
    sim_config_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew", columnspan=2)
    # ADAM address (validated as exactly 2 characters in _get_common_params)
    ttk.Label(sim_config_frame, text="ADAM Address (2c):").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.adam_address_var = tk.StringVar(value="01")
    self.adam_address_entry = ttk.Entry(sim_config_frame, textvariable=self.adam_address_var, width=5)
    self.adam_address_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    # Function type
    ttk.Label(sim_config_frame, text="Función:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.function_type_var = tk.StringVar(value="Lineal")
    self.function_type_combo = ttk.Combobox(sim_config_frame, textvariable=self.function_type_var,
                                            values=["Lineal", "Sinusoidal", "Manual"], state="readonly", width=10)
    self.function_type_combo.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    self.function_type_combo.bind("<<ComboboxSelected>>", self.on_function_type_change)
    # Period in seconds
    ttk.Label(sim_config_frame, text="Periodo (s):").grid(row=0, column=4, padx=5, pady=5, sticky="w")
    self.period_var = tk.StringVar(value="1.0")
    self.period_entry = ttk.Entry(sim_config_frame, textvariable=self.period_var, width=5)
    self.period_entry.grid(row=0, column=5, padx=5, pady=5, sticky="ew")
    # Manual mode frame; its controls start disabled and are toggled by
    # on_function_type_change()
    manual_frame = ttk.LabelFrame(sim_config_frame, text="Modo Manual")
    manual_frame.grid(row=1, column=0, columnspan=6, padx=5, pady=5, sticky="ew")
    ttk.Label(manual_frame, text="Valor Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.manual_brix_var = tk.StringVar(value="10.0")
    self.manual_brix_entry = ttk.Entry(manual_frame, textvariable=self.manual_brix_var, width=10, state=tk.DISABLED)
    self.manual_brix_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
    # Keep the slider in sync when the entry is confirmed or loses focus
    self.manual_brix_entry.bind('<Return>', lambda e: self.update_slider_from_entry())
    self.manual_brix_entry.bind('<FocusOut>', lambda e: self.update_slider_from_entry())
    # Slider (0-100) mirrored into the entry by on_slider_change()
    self.manual_slider_var = tk.DoubleVar(value=10.0)
    self.manual_slider = ttk.Scale(manual_frame, from_=0, to=100, orient=tk.HORIZONTAL,
                                   variable=self.manual_slider_var, command=self.on_slider_change,
                                   state=tk.DISABLED, length=200)
    self.manual_slider.grid(row=0, column=2, padx=5, pady=5, sticky="ew")
    self.manual_send_button = ttk.Button(manual_frame, text="Enviar Manual", command=self.send_manual_value, state=tk.DISABLED)
    self.manual_send_button.grid(row=0, column=3, padx=5, pady=5, sticky="ew")
    manual_frame.columnconfigure(2, weight=1)
    # Simulation controls frame
    controls_frame = ttk.LabelFrame(self.simulator_tab, text="Control Simulación")
    controls_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
    self.start_button = ttk.Button(controls_frame, text="Iniciar", command=self.start_simulation)
    self.start_button.pack(side=tk.LEFT, padx=5)
    self.stop_button = ttk.Button(controls_frame, text="Detener", command=self.stop_simulation, state=tk.DISABLED)
    self.stop_button.pack(side=tk.LEFT, padx=5)
    self.clear_sim_graph_button = ttk.Button(controls_frame, text="Limpiar Gráfico", command=self.clear_sim_graph)
    self.clear_sim_graph_button.pack(side=tk.LEFT, padx=5)
    # Current values display frame
    display_frame = ttk.LabelFrame(self.simulator_tab, text="Valores Actuales")
    display_frame.grid(row=1, column=1, padx=10, pady=5, sticky="ew")
    ttk.Label(display_frame, text="Brix:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.current_brix_display_var = tk.StringVar(value="---")
    self.current_brix_label = ttk.Label(display_frame, textvariable=self.current_brix_display_var,
                                        font=("Courier", 14, "bold"))
    self.current_brix_label.grid(row=0, column=1, padx=5, pady=5, sticky="w")
    ttk.Label(display_frame, text="mA:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
    self.current_ma_display_var = tk.StringVar(value="--.-- mA")
    self.current_ma_label = ttk.Label(display_frame, textvariable=self.current_ma_display_var,
                                      font=("Courier", 14, "bold"))
    self.current_ma_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")
    # Graph frame
    sim_graph_frame = ttk.LabelFrame(self.simulator_tab, text="Gráfico Simulador")
    sim_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    # Simulator figure: Brix on the left axis, mA on a twin right axis
    self.sim_fig = Figure(figsize=(8, 3.5), dpi=100)
    self.sim_ax1 = self.sim_fig.add_subplot(111)
    self.sim_ax2 = self.sim_ax1.twinx()
    self.sim_ax1.set_xlabel('Tiempo (s)')
    self.sim_ax1.set_ylabel('Brix', color='b')
    self.sim_ax2.set_ylabel('mA', color='r')
    self.sim_ax1.tick_params(axis='y', labelcolor='b')
    self.sim_ax2.tick_params(axis='y', labelcolor='r')
    self.sim_ax1.grid(True, alpha=0.3)
    # Empty line artists; update_sim_graph() fills them from the deques
    self.sim_line_brix, = self.sim_ax1.plot([], [], 'b-', label='Brix', linewidth=2)
    self.sim_line_ma, = self.sim_ax2.plot([], [], 'r-', label='mA', linewidth=2)
    self.sim_canvas = FigureCanvasTkAgg(self.sim_fig, master=sim_graph_frame)
    self.sim_canvas.draw()
    self.sim_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    # Log frame (read-only text; _log_message() enables it while writing)
    sim_log_frame = ttk.LabelFrame(self.simulator_tab, text="Log de Comunicación")
    sim_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    self.sim_log_text = scrolledtext.ScrolledText(sim_log_frame, height=8, width=70, wrap=tk.WORD, state=tk.DISABLED)
    self.sim_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
    # Grid weights: the graph and log rows take the extra space
    self.simulator_tab.columnconfigure(0, weight=1)
    self.simulator_tab.columnconfigure(1, weight=1)
    self.simulator_tab.rowconfigure(2, weight=1)
    self.simulator_tab.rowconfigure(3, weight=1)
def create_trace_tab(self):
    """Create the widgets of the "Trace" tab.

    Builds, top to bottom: the trace controls with the CSV file name label,
    the last-received-value display, the Brix graph and the reception log.
    """
    # Control frame
    trace_control_frame = ttk.LabelFrame(self.trace_tab, text="Control Trace")
    trace_control_frame.grid(row=0, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
    self.start_trace_button = ttk.Button(trace_control_frame, text="Iniciar Trace", command=self.start_trace)
    self.start_trace_button.pack(side=tk.LEFT, padx=5)
    self.stop_trace_button = ttk.Button(trace_control_frame, text="Detener Trace", command=self.stop_trace, state=tk.DISABLED)
    self.stop_trace_button.pack(side=tk.LEFT, padx=5)
    self.clear_trace_graph_button = ttk.Button(trace_control_frame, text="Limpiar Gráfico", command=self.clear_trace_graph)
    self.clear_trace_graph_button.pack(side=tk.LEFT, padx=5)
    # Name of the CSV file of the current trace (set by start_trace)
    ttk.Label(trace_control_frame, text="Archivo CSV:").pack(side=tk.LEFT, padx=(20, 5))
    self.csv_filename_var = tk.StringVar(value="Sin archivo")
    self.csv_filename_label = ttk.Label(trace_control_frame, textvariable=self.csv_filename_var)
    self.csv_filename_label.pack(side=tk.LEFT, padx=5)
    # Display frame: values of the last parsed message
    trace_display_frame = ttk.LabelFrame(self.trace_tab, text="Último Valor Recibido")
    trace_display_frame.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky="ew")
    ttk.Label(trace_display_frame, text="Timestamp:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
    self.trace_timestamp_var = tk.StringVar(value="---")
    ttk.Label(trace_display_frame, textvariable=self.trace_timestamp_var, font=("Courier", 12)).grid(row=0, column=1, padx=5, pady=5, sticky="w")
    ttk.Label(trace_display_frame, text="Valor mA:").grid(row=0, column=2, padx=5, pady=5, sticky="w")
    self.trace_ma_var = tk.StringVar(value="---")
    ttk.Label(trace_display_frame, textvariable=self.trace_ma_var, font=("Courier", 12, "bold")).grid(row=0, column=3, padx=5, pady=5, sticky="w")
    ttk.Label(trace_display_frame, text="Valor Brix:").grid(row=0, column=4, padx=5, pady=5, sticky="w")
    self.trace_brix_var = tk.StringVar(value="---")
    ttk.Label(trace_display_frame, textvariable=self.trace_brix_var, font=("Courier", 12, "bold")).grid(row=0, column=5, padx=5, pady=5, sticky="w")
    # Graph frame
    trace_graph_frame = ttk.LabelFrame(self.trace_tab, text="Gráfico Trace (Brix)")
    trace_graph_frame.grid(row=2, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    # Trace figure: a single Brix-over-time axis
    self.trace_fig = Figure(figsize=(8, 4), dpi=100)
    self.trace_ax = self.trace_fig.add_subplot(111)
    self.trace_ax.set_xlabel('Tiempo (s)')
    self.trace_ax.set_ylabel('Brix', color='b')
    self.trace_ax.tick_params(axis='y', labelcolor='b')
    self.trace_ax.grid(True, alpha=0.3)
    # Empty line artist; update_trace_graph() fills it from the deques
    self.trace_line_brix, = self.trace_ax.plot([], [], 'b-', label='Brix', linewidth=2, marker='o', markersize=4)
    self.trace_canvas = FigureCanvasTkAgg(self.trace_fig, master=trace_graph_frame)
    self.trace_canvas.draw()
    self.trace_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    # Log frame (read-only text; _log_message() enables it while writing)
    trace_log_frame = ttk.LabelFrame(self.trace_tab, text="Log de Recepción")
    trace_log_frame.grid(row=3, column=0, columnspan=2, padx=10, pady=5, sticky="nsew")
    self.trace_log_text = scrolledtext.ScrolledText(trace_log_frame, height=10, width=70, wrap=tk.WORD, state=tk.DISABLED)
    self.trace_log_text.pack(padx=5, pady=5, fill=tk.BOTH, expand=True)
    # Grid weights: the graph and log rows take the extra space
    self.trace_tab.columnconfigure(0, weight=1)
    self.trace_tab.columnconfigure(1, weight=1)
    self.trace_tab.rowconfigure(2, weight=1)
    self.trace_tab.rowconfigure(3, weight=1)
def _log_message(self, message, log_widget=None):
    """Append a time-stamped line to a log widget.

    Args:
        message: Text to append.
        log_widget: Text widget to write to. When omitted, the simulator log
            is used if the first notebook tab is selected, otherwise the
            trace log.
    """
    target = log_widget
    if target is None:
        # No explicit widget: follow the currently selected tab
        active_index = self.notebook.index(self.notebook.select())
        if active_index == 0:
            target = self.sim_log_text
        else:
            target = self.trace_log_text

    stamp = datetime.now().strftime('%H:%M:%S')
    entry = f"[{stamp}] {message}\n"

    # The widget is kept read-only; unlock it only while inserting
    target.configure(state=tk.NORMAL)
    target.insert(tk.END, entry)
    target.see(tk.END)
    target.configure(state=tk.DISABLED)
def parse_adam_message(self, data):
    """Parse one ADAM-protocol frame and return its address and mA value.

    Expected frame: ``#`` + 2-character address + value + 2-character
    checksum + carriage return. The value is everything between the address
    and the checksum (6 characters as produced by ``format_mA_value``, e.g.
    ``12.345``). The checksum covers ``#`` + address + value.

    Args:
        data: Received frame as text, including the leading ``#`` and the
            trailing carriage return.

    Returns:
        ``{'address': str, 'ma': float}`` for a valid frame, ``None`` when
        the frame is malformed, fails the checksum or carries a non-numeric
        value (the last two cases are written to the trace log).
    """
    try:
        if not data.startswith('#') or not data.endswith('\r'):
            return None
        # Strip the leading '#' and the trailing carriage return
        payload = data[1:-1]
        # 2 address + 6 value + 2 checksum characters at the very least
        if len(payload) < 10:
            return None
        address = payload[:2]
        # The checksum is always the last two characters; slicing from the
        # end keeps value and checksum aligned whatever the value width is
        value_str = payload[2:-2]
        checksum = payload[-2:]
        # Verify the checksum (hex digits compared case-insensitively)
        calculated_checksum = self.calculate_checksum(f"#{address}{value_str}")
        if checksum.upper() != calculated_checksum:
            self._log_message(f"Checksum incorrecto: recibido={checksum}, calculado={calculated_checksum}",
                              self.trace_log_text)
            return None
        return {'address': address, 'ma': float(value_str)}
    except (AttributeError, TypeError, ValueError) as e:
        # Non-text input or a value that is not a number
        self._log_message(f"Error parseando mensaje: {e}", self.trace_log_text)
        return None
def ma_to_brix(self, ma_value):
    """Convert a current reading in mA to Brix using the configured mapping.

    4 mA maps to the "Min Brix" setting and 20 mA to the "Max Brix" setting,
    with linear interpolation in between; readings outside 4-20 mA are
    clamped to the corresponding limit.

    Args:
        ma_value: Current reading in mA.

    Returns:
        The Brix value, or ``0.0`` when the mapping limits (or the reading)
        are not valid numbers.
    """
    try:
        min_brix = float(self.min_brix_map_var.get())
        max_brix = float(self.max_brix_map_var.get())
        if ma_value <= 4.0:
            return min_brix
        if ma_value >= 20.0:
            return max_brix
        # Linear interpolation across the 16 mA span
        percentage = (ma_value - 4.0) / 16.0
        return min_brix + percentage * (max_brix - min_brix)
    except (TypeError, ValueError):
        # Only bad numeric input falls back to 0.0; anything else propagates
        return 0.0
def start_trace(self):
    """Start trace mode: create the CSV log, open the connection and launch
    the receiver thread.

    Shows a dialog and returns without starting when a trace is already
    running, the connection parameters are not valid numbers, the CSV file
    cannot be created or the connection cannot be opened. On any of those
    failures no CSV handle is left open and the CSV attributes are reset.
    """
    if self.tracing:
        messagebox.showwarning("Advertencia", "El trace ya está en curso.")
        return

    # Collect the connection parameters
    try:
        conn_type = self.connection_type_var.get()
        if conn_type == "Serial":
            conn_params = {
                'port': self.com_port_var.get(),
                'baud': int(self.baud_rate_var.get())
            }
        else:
            conn_params = {
                'ip': self.ip_address_var.get(),
                'port': int(self.port_var.get())
            }
    except ValueError as e:
        messagebox.showerror("Error", f"Parámetros de conexión inválidos: {e}")
        return

    # Create the CSV file (time-stamped name in the working directory)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_filename = f"maselli_trace_{timestamp}.csv"
    try:
        self.csv_file = open(csv_filename, 'w', newline='', encoding='utf-8')
        self.csv_writer = csv.writer(self.csv_file)
        self.csv_writer.writerow(['Timestamp', 'mA', 'Brix', 'Raw_Message'])
        self.csv_filename_var.set(csv_filename)
    except Exception as e:
        # Do not leave a half-initialised file/writer behind
        if self.csv_file:
            self.csv_file.close()
        self.csv_file = None
        self.csv_writer = None
        messagebox.showerror("Error", f"No se pudo crear archivo CSV: {e}")
        return

    # Open the connection
    try:
        self.connection = self._open_connection(conn_type, conn_params)
        self.connection_type = conn_type
        self._log_message(f"Conexión {conn_type} abierta para trace.", self.trace_log_text)
    except Exception as e:
        # The trace never started: release the CSV file and reset the
        # attributes and label so they do not point at a closed file
        if self.csv_file:
            self.csv_file.close()
        self.csv_file = None
        self.csv_writer = None
        self.csv_filename_var.set("Sin archivo")
        messagebox.showerror("Error de Conexión", str(e))
        return

    self.tracing = True
    self.trace_start_time = time.time()
    self.start_trace_button.config(state=tk.DISABLED)
    self.stop_trace_button.config(state=tk.NORMAL)
    self._set_trace_entries_state(tk.DISABLED)

    # Launch the receiver thread
    self.trace_thread = threading.Thread(target=self.run_trace, daemon=True)
    self.trace_thread.start()
    self._log_message("Trace iniciado.", self.trace_log_text)
def stop_trace(self):
    """Stop trace mode and release its resources.

    Clears the ``tracing`` flag, waits up to two seconds for the receiver
    thread, closes the connection and the CSV file, and restores the
    controls. Does nothing when no trace is running.
    """
    if not self.tracing:
        return

    trace_log = self.trace_log_text
    self.tracing = False

    # Give the receiver thread a chance to leave its loop
    worker = self.trace_thread
    if worker and worker.is_alive():
        worker.join(timeout=2.0)

    # Close the connection
    if self.connection:
        self._close_connection(self.connection, self.connection_type)
        self._log_message("Conexión cerrada.", trace_log)
        self.connection = None

    # Close the CSV file
    if self.csv_file:
        self.csv_file.close()
        self.csv_file = None
        self.csv_writer = None
        saved_name = self.csv_filename_var.get()
        self._log_message(f"Archivo CSV guardado: {saved_name}", trace_log)

    # Restore the controls
    for button, new_state in ((self.start_trace_button, tk.NORMAL),
                              (self.stop_trace_button, tk.DISABLED)):
        button.config(state=new_state)
    self._set_trace_entries_state(tk.NORMAL)
    self._log_message("Trace detenido.", trace_log)
def run_trace(self):
    """Receiver loop of trace mode (runs in the trace thread).

    Reads from the open connection while ``self.tracing`` is set, accumulates
    the text in a buffer and hands every complete message (terminated by a
    carriage return) to ``_process_trace_message``. The loop also ends when
    a TCP peer closes the connection.
    """
    buffer = ""
    while self.tracing:
        try:
            # Read according to the connection type
            data = None
            if self.connection_type == "Serial":
                pending = self.connection.in_waiting
                if pending > 0:
                    data = self.connection.read(pending).decode('ascii', errors='ignore')
                else:
                    # Nothing to read: yield briefly instead of busy-spinning
                    time.sleep(0.01)
                    continue
            elif self.connection_type == "TCP":
                self.connection.settimeout(0.1)
                try:
                    raw = self.connection.recv(1024)
                except socket.timeout:
                    continue
                if not raw:
                    # recv() returning no bytes means the peer closed the
                    # connection; retrying would spin forever
                    self._log_message("Conexión cerrada por el extremo remoto.", self.trace_log_text)
                    break
                data = raw.decode('ascii', errors='ignore')
            elif self.connection_type == "UDP":
                self.connection.settimeout(0.1)
                try:
                    raw, _addr = self.connection.recvfrom(1024)
                except socket.timeout:
                    continue
                data = raw.decode('ascii', errors='ignore')

            if data:
                buffer += data
                # Extract every complete message (terminated by \r)
                while '\r' in buffer:
                    end_idx = buffer.index('\r') + 1
                    message = buffer[:end_idx]
                    buffer = buffer[end_idx:]
                    self._process_trace_message(message)
        except Exception as e:
            self._log_message(f"Error en trace: {e}", self.trace_log_text)
            if not self.tracing:
                break
            # Back off before retrying after an error
            time.sleep(0.1)
def _process_trace_message(self, message):
"""Procesa un mensaje recibido en modo trace"""
# Log del mensaje raw
display_msg = message.replace('\r', '<CR>').replace('\n', '<LF>')
self._log_message(f"Recibido: {display_msg}", self.trace_log_text)
# Parsear mensaje
parsed = self.parse_adam_message(message)
if parsed:
ma_value = parsed['ma']
brix_value = self.ma_to_brix(ma_value)
timestamp = datetime.now()
# Actualizar display
self.trace_timestamp_var.set(timestamp.strftime("%H:%M:%S.%f")[:-3])
self.trace_ma_var.set(f"{ma_value:.3f} mA")
self.trace_brix_var.set(f"{brix_value:.3f} Brix")
# Guardar en CSV
if self.csv_writer:
self.csv_writer.writerow([
timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
ma_value,
brix_value,
display_msg
])
if self.csv_file:
self.csv_file.flush()
# Agregar al gráfico
current_time = time.time() - self.trace_start_time
self.trace_time_data.append(current_time)
self.trace_brix_data.append(brix_value)
# Actualizar gráfico
self.root.after(0, self.trace_canvas.draw_idle)
def _set_trace_entries_state(self, state):
"""Habilita/deshabilita controles durante el trace"""
self.connection_type_combo.config(state=state)
self.com_port_entry.config(state=state)
self.baud_rate_entry.config(state=state)
self.ip_address_entry.config(state=state)
self.port_entry.config(state=state)
self.min_brix_map_entry.config(state=state)
self.max_brix_map_entry.config(state=state)
def update_sim_graph(self, frame):
    """Animation callback: push the buffered simulator samples into the graph.

    Args:
        frame: Frame counter supplied by the animation (unused).

    Returns:
        Tuple with the Brix and mA line artists.
    """
    times = list(self.sim_time_data)
    if times:
        brix = list(self.sim_brix_data)
        current = list(self.sim_ma_data)

        self.sim_line_brix.set_data(times, brix)
        self.sim_line_ma.set_data(times, current)

        # The X range needs at least two samples to be meaningful
        if len(times) > 1:
            self.sim_ax1.set_xlim(min(times), max(times))
        # Pad the Y ranges so the lines do not touch the axes
        if brix:
            self.sim_ax1.set_ylim(min(brix) - 1, max(brix) + 1)
        if current:
            self.sim_ax2.set_ylim(min(current) - 0.5, max(current) + 0.5)

    return self.sim_line_brix, self.sim_line_ma
def update_trace_graph(self, frame):
    """Animation callback: push the buffered trace samples into the graph.

    Args:
        frame: Frame counter supplied by the animation (unused).

    Returns:
        One-element tuple with the Brix line artist.
    """
    times = list(self.trace_time_data)
    if times:
        brix = list(self.trace_brix_data)
        self.trace_line_brix.set_data(times, brix)

        # The X range needs at least two samples to be meaningful
        if len(times) > 1:
            self.trace_ax.set_xlim(min(times), max(times))
        # Pad the Y range so the line does not touch the axes
        if brix:
            self.trace_ax.set_ylim(min(brix) - 1, max(brix) + 1)

    return (self.trace_line_brix,)
def clear_sim_graph(self):
    """Clear the simulator graph and restart its time axis at zero.

    Empties the three sample buffers and also resets both plotted lines.
    The animation callback only touches the lines while there is data, so
    without the explicit reset the old curves would stay on screen until a
    new sample arrives.
    """
    self.sim_time_data.clear()
    self.sim_brix_data.clear()
    self.sim_ma_data.clear()
    self.sim_start_time = time.time()
    # Drop the points still held by the line artists
    self.sim_line_brix.set_data([], [])
    self.sim_line_ma.set_data([], [])
    self.sim_canvas.draw_idle()
    self._log_message("Gráfico del simulador limpiado.", self.sim_log_text)
def clear_trace_graph(self):
    """Clear the trace graph and restart its time axis at zero.

    Empties the sample buffers and also resets the plotted line. The
    animation callback only touches the line while there is data, so without
    the explicit reset the old curve would stay on screen until a new sample
    arrives.
    """
    self.trace_time_data.clear()
    self.trace_brix_data.clear()
    self.trace_start_time = time.time()
    # Drop the points still held by the line artist
    self.trace_line_brix.set_data([], [])
    self.trace_canvas.draw_idle()
    self._log_message("Gráfico del trace limpiado.", self.trace_log_text)
def save_config(self):
    """Write the current settings to the JSON configuration file.

    Reports the outcome in the active log and with a message box; a failure
    to write is reported, not raised.
    """
    # JSON key -> Tk variable holding the value (order defines file order)
    sources = {
        'connection_type': self.connection_type_var,
        'com_port': self.com_port_var,
        'baud_rate': self.baud_rate_var,
        'ip_address': self.ip_address_var,
        'port': self.port_var,
        'adam_address': self.adam_address_var,
        'function_type': self.function_type_var,
        'min_brix_map': self.min_brix_map_var,
        'max_brix_map': self.max_brix_map_var,
        'period': self.period_var,
        'manual_brix': self.manual_brix_var,
    }
    config = {key: variable.get() for key, variable in sources.items()}

    try:
        with open(self.config_file, 'w') as f:
            json.dump(config, f, indent=4)
        self._log_message("Configuración guardada exitosamente.")
        messagebox.showinfo("Éxito", "Configuración guardada correctamente.")
    except Exception as e:
        self._log_message(f"Error al guardar configuración: {e}")
        messagebox.showerror("Error", f"No se pudo guardar la configuración: {e}")
def load_config(self, silent=False):
    """Load the settings from the JSON configuration file.

    Missing keys fall back to the built-in defaults. After loading, the
    widgets that depend on the connection type and on the function type are
    refreshed so the UI matches the loaded values.

    Args:
        silent: When true, show no dialogs and write nothing to the log
            (used at startup); errors are then ignored.
    """
    if not os.path.exists(self.config_file):
        if not silent:
            messagebox.showinfo("Información", "No se encontró archivo de configuración.")
        return
    try:
        with open(self.config_file, 'r') as f:
            config = json.load(f)

        # (Tk variable, JSON key, default when the key is missing)
        settings = (
            (self.connection_type_var, 'connection_type', 'Serial'),
            (self.com_port_var, 'com_port', 'COM3'),
            (self.baud_rate_var, 'baud_rate', '115200'),
            (self.ip_address_var, 'ip_address', '192.168.1.100'),
            (self.port_var, 'port', '502'),
            (self.adam_address_var, 'adam_address', '01'),
            (self.function_type_var, 'function_type', 'Lineal'),
            (self.min_brix_map_var, 'min_brix_map', '0'),
            (self.max_brix_map_var, 'max_brix_map', '80'),
            (self.period_var, 'period', '1.0'),
            (self.manual_brix_var, 'manual_brix', '10.0'),
        )
        for variable, key, default in settings:
            variable.set(config.get(key, default))

        try:
            self.manual_slider_var.set(float(config.get('manual_brix', '10.0')))
        except (TypeError, ValueError):
            # Stored manual value is not numeric: keep the slider where it is
            pass

        # Refresh the widgets that depend on the loaded selections; without
        # the second call the manual controls would keep the state of the
        # previously selected function type
        self.on_connection_type_change()
        self.on_function_type_change()
        if not silent:
            self._log_message("Configuración cargada exitosamente.")
            messagebox.showinfo("Éxito", "Configuración cargada correctamente.")
    except Exception as e:
        if not silent:
            self._log_message(f"Error al cargar configuración: {e}")
            messagebox.showerror("Error", f"No se pudo cargar la configuración: {e}")
def on_connection_type_change(self, event=None):
    """Show the parameter frame that matches the selected connection type.

    The serial frame is shown for "Serial"; the Ethernet frame is shown for
    any other type.

    Args:
        event: Tk event when used as a widget callback (unused).
    """
    use_serial = self.connection_type_var.get() == "Serial"
    if use_serial:
        hidden, shown = self.ethernet_frame, self.serial_frame
    else:
        hidden, shown = self.serial_frame, self.ethernet_frame
    # Hide first, then show, so both frames are never visible together
    hidden.grid_remove()
    shown.grid()
def on_function_type_change(self, event=None):
    """Adjust the simulator controls to the selected function type.

    In "Manual" mode a running simulation is stopped, the manual controls are
    enabled and the period entry and start/stop buttons are disabled. In any
    other mode the manual controls are disabled, the period entry is enabled
    and the start/stop buttons reflect whether a simulation is running.

    Args:
        event: Tk event when used as a widget callback (unused).
    """
    manual_mode = self.function_type_var.get() == "Manual"

    # Continuous simulation and manual sending are mutually exclusive
    if manual_mode and self.simulating:
        self.stop_simulation()

    manual_state = tk.NORMAL if manual_mode else tk.DISABLED
    for widget in (self.manual_brix_entry, self.manual_send_button, self.manual_slider):
        widget.config(state=manual_state)

    self.period_entry.config(state=tk.DISABLED if manual_mode else tk.NORMAL)

    # Start is available only in a continuous mode with nothing running;
    # stop only in a continuous mode while a simulation is running
    if manual_mode or self.simulating:
        self.start_button.config(state=tk.DISABLED)
    else:
        self.start_button.config(state=tk.NORMAL)

    if not manual_mode and self.simulating:
        self.stop_button.config(state=tk.NORMAL)
    else:
        self.stop_button.config(state=tk.DISABLED)
def on_slider_change(self, value):
    """Mirror the slider position into the manual Brix entry.

    Args:
        value: New slider position, as passed by the scale widget.
    """
    one_decimal = format(float(value), ".1f")
    self.manual_brix_var.set(one_decimal)
def update_slider_from_entry(self):
    """Move the slider to the value typed in the manual Brix entry.

    The value is clamped to the 0-100 slider range and the entry text is
    rewritten with one decimal. Non-numeric text is ignored.
    """
    typed = self.manual_brix_var.get()
    try:
        parsed = float(typed)
    except ValueError:
        # Not a number: leave slider and entry untouched
        return
    bounded = max(0, min(100, parsed))
    self.manual_slider_var.set(bounded)
    self.manual_brix_var.set(f"{bounded:.1f}")
def calculate_checksum(self, message_part):
    """Return the checksum of ``message_part`` as two uppercase hex digits.

    The checksum is the sum of the character codes modulo 256.

    Args:
        message_part: Text the checksum is computed over.
    """
    total = sum(map(ord, message_part))
    return format(total & 0xFF, "02X")
def scale_to_mA(self, brix_value, min_brix_map, max_brix_map):
    """Map a Brix value linearly onto the 4-20 mA range.

    Args:
        brix_value: Brix value to convert.
        min_brix_map: Brix value that corresponds to 4 mA.
        max_brix_map: Brix value that corresponds to 20 mA.

    Returns:
        The current in mA, clamped to 4-20; 4.0 when both limits are equal.
    """
    # A zero-width range cannot be interpolated
    if min_brix_map == max_brix_map:
        return 4.0
    fraction = (brix_value - min_brix_map) / (max_brix_map - min_brix_map)
    # Clamp to [0, 1] so the output never leaves 4-20 mA
    fraction = min(1.0, fraction)
    fraction = max(0.0, fraction)
    return 4.0 + fraction * 16.0
def format_mA_value(self, mA_val):
    """Format a mA value as zero-padded text with three decimals.

    The result is at least six characters wide, e.g. ``04.000``.

    Args:
        mA_val: Current in mA.
    """
    return format(mA_val, "06.3f")
def _get_common_params(self):
try:
adam_address = self.adam_address_var.get()
if len(adam_address) != 2:
messagebox.showerror("Error", "La dirección ADAM debe tener 2 caracteres.")
return None
min_brix_map = float(self.min_brix_map_var.get())
max_brix_map = float(self.max_brix_map_var.get())
if min_brix_map > max_brix_map:
messagebox.showerror("Error", "Valor Mínimo (Brix) no puede ser mayor que Valor Máximo (Brix).")
return None
conn_type = self.connection_type_var.get()
if conn_type == "Serial":
com_port = self.com_port_var.get()
baud_rate = int(self.baud_rate_var.get())
return conn_type, {'port': com_port, 'baud': baud_rate}, adam_address, min_brix_map, max_brix_map
else:
ip_address = self.ip_address_var.get()
port = int(self.port_var.get())
return conn_type, {'ip': ip_address, 'port': port}, adam_address, min_brix_map, max_brix_map
except ValueError as e:
messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración: {e}")
return None
def _open_connection(self, conn_type, conn_params):
try:
if conn_type == "Serial":
return serial.Serial(conn_params['port'], conn_params['baud'], timeout=1)
elif conn_type == "TCP":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5.0)
sock.connect((conn_params['ip'], conn_params['port']))
return sock
elif conn_type == "UDP":
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1.0)
sock.dest_address = (conn_params['ip'], conn_params['port'])
return sock
except Exception as e:
raise Exception(f"Error al abrir conexión {conn_type}: {e}")
def _close_connection(self, connection, conn_type):
try:
if conn_type == "Serial":
if connection and connection.is_open:
connection.close()
elif conn_type in ["TCP", "UDP"]:
if connection:
connection.close()
except Exception as e:
self._log_message(f"Error al cerrar conexión: {e}")
def _send_data(self, connection, conn_type, data):
try:
if conn_type == "Serial":
connection.write(data.encode('ascii'))
elif conn_type == "TCP":
connection.send(data.encode('ascii'))
elif conn_type == "UDP":
connection.sendto(data.encode('ascii'), connection.dest_address)
except Exception as e:
raise Exception(f"Error al enviar datos: {e}")
def add_sim_data_point(self, brix_value, ma_value):
    """Append one sample to the simulator graph buffers and request a redraw.

    The sample time is the number of seconds since ``sim_start_time``.

    Args:
        brix_value: Brix value of the sample.
        ma_value: Corresponding current in mA.
    """
    elapsed = time.time() - self.sim_start_time
    samples = (
        (self.sim_time_data, elapsed),
        (self.sim_brix_data, brix_value),
        (self.sim_ma_data, ma_value),
    )
    for series, sample in samples:
        series.append(sample)
    self.sim_canvas.draw_idle()
def send_manual_value(self):
    """Send the manually entered Brix value once.

    Validates the settings, converts the Brix value to mA, updates the
    display and the graph, then opens a temporary connection, sends one
    frame and closes the connection again. Errors are logged and shown in a
    dialog.
    """
    params = self._get_common_params()
    if not params:
        return
    conn_type, conn_params, adam_address, min_brix_map, max_brix_map = params

    try:
        brix = float(self.manual_brix_var.get())
    except ValueError:
        messagebox.showerror("Error de Entrada", "Valor Brix Manual inválido.")
        return

    milliamps = self.scale_to_mA(brix, min_brix_map, max_brix_map)
    milliamps_text = self.format_mA_value(milliamps)

    # Show the value and plot it before sending
    self.current_brix_display_var.set(f"{brix:.3f} Brix")
    self.current_ma_display_var.set(f"{milliamps_text} mA")
    self.add_sim_data_point(brix, milliamps)

    # Frame: '#' + address + value + checksum + carriage return
    frame_body = f"#{adam_address}{milliamps_text}"
    frame = f"{frame_body}{self.calculate_checksum(frame_body)}\r"
    printable_frame = frame.replace('\r', '<CR>')

    link = None
    log = self.sim_log_text
    try:
        link = self._open_connection(conn_type, conn_params)
        self._log_message(f"Conexión {conn_type} abierta temporalmente para envío manual.", log)
        self._log_message(f"Enviando Manual: {printable_frame}", log)
        self._send_data(link, conn_type, frame)
    except Exception as e:
        self._log_message(f"Error al enviar manualmente: {e}", log)
        messagebox.showerror("Error de Conexión", str(e))
    finally:
        # The temporary connection never outlives this call
        if link:
            self._close_connection(link, conn_type)
            self._log_message(f"Conexión {conn_type} cerrada tras envío manual.", log)
def start_simulation(self):
    """Validate the settings, open the connection and start the worker thread.

    A message box is shown and nothing is started when a simulation is
    already running, the period is not a finite positive number, the
    function type is "Manual", or the connection cannot be opened. The
    method also returns early when ``_get_common_params`` yields a falsy
    value.

    On success the connection is stored in ``self.connection``, the step
    counter is reset, the configuration widgets are disabled and
    ``run_simulation`` is launched in a daemon thread.
    """
    if self.simulating:
        messagebox.showwarning("Advertencia", "La simulación ya está en curso.")
        return

    common_params = self._get_common_params()
    if not common_params:
        return
    (self.connection_type, self.conn_params, self.adam_address,
     self.min_brix_map, self.max_brix_map) = common_params

    try:
        self.simulation_period = float(self.period_var.get())
    except ValueError as e:
        messagebox.showerror("Error de Entrada", f"Valor inválido en la configuración de simulación: {e}")
        return

    # float() accepts "nan" and "inf". NaN is not <= 0, so it would slip
    # through a plain sign check, and neither value can be slept on by the
    # worker thread.
    if not math.isfinite(self.simulation_period) or self.simulation_period <= 0:
        messagebox.showerror("Error", "El periodo debe ser un número positivo.")
        return

    self.function_type = self.function_type_var.get()
    if self.function_type == "Manual":
        messagebox.showinfo("Info", "Seleccione modo Lineal o Sinusoidal para simulación continua.")
        return

    try:
        self.connection = self._open_connection(self.connection_type, self.conn_params)
        self._log_message(f"Conexión {self.connection_type} abierta para simulación continua.", self.sim_log_text)
    except Exception as e:
        messagebox.showerror("Error de Conexión", str(e))
        self.connection = None
        return

    self.simulating = True
    self.simulation_step = 0

    # Lock the controls while the worker thread owns the connection.
    self.start_button.config(state=tk.DISABLED)
    self.stop_button.config(state=tk.NORMAL)
    self._set_sim_config_entries_state(tk.DISABLED)

    self.simulation_thread = threading.Thread(target=self.run_simulation, daemon=True)
    self.simulation_thread.start()
    self._log_message("Simulación continua iniciada.", self.sim_log_text)
def _set_sim_config_entries_state(self, state):
    """Apply ``state`` to the configuration widgets of the simulator tab.

    The connection, address, mapping and save/load widgets always receive
    ``state``. The period entry and the manual-send controls depend on the
    selected function type:

    * not "Manual": the period entry receives ``state`` and the manual
      controls are disabled;
    * "Manual": the period entry is disabled and the manual controls are
      disabled when ``state`` is ``tk.DISABLED``, otherwise set to
      ``tk.NORMAL``.

    Args:
        state: Tk widget state to apply, e.g. ``tk.NORMAL`` or ``tk.DISABLED``.
    """
    always_follow_state = (
        self.connection_type_combo,
        self.com_port_entry,
        self.baud_rate_entry,
        self.ip_address_entry,
        self.port_entry,
        self.adam_address_entry,
        self.function_type_combo,
        self.min_brix_map_entry,
        self.max_brix_map_entry,
        self.save_config_button,
        self.load_config_button,
    )
    for widget in always_follow_state:
        widget.config(state=state)

    if self.function_type_var.get() == "Manual":
        period_state = tk.DISABLED
        manual_state = tk.DISABLED if state == tk.DISABLED else tk.NORMAL
    else:
        period_state = state
        manual_state = tk.DISABLED

    self.period_entry.config(state=period_state)
    manual_controls = (
        self.manual_brix_entry,
        self.manual_send_button,
        self.manual_slider,
    )
    for widget in manual_controls:
        widget.config(state=manual_state)
def stop_simulation(self):
    """Stop the continuous simulation and put the GUI back in its idle state.

    When no simulation is running, only the widget states are refreshed
    (plus a log line and the start button when the function type is not
    "Manual"). Otherwise the running flag is cleared, the worker thread is
    joined with a bounded timeout, the connection is closed and the
    displays are reset.
    """
    if not self.simulating:
        if self.function_type_var.get() != "Manual":
            self._log_message("Simulación continua ya estaba detenida.", self.sim_log_text)
            self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self._set_sim_config_entries_state(tk.NORMAL)
        return

    # Clearing the flag asks the worker loop to finish.
    self.simulating = False

    worker = self.simulation_thread
    if worker and worker.is_alive():
        try:
            # Wait about one and a half periods, never less than 0.1 s;
            # fall back to 2 s when no period has been stored yet.
            if hasattr(self, 'simulation_period'):
                wait_limit = self.simulation_period * 1.5
            else:
                wait_limit = 2.0
            worker.join(timeout=max(0.1, wait_limit))
        except Exception as e:
            self._log_message(f"Error al esperar el hilo de simulación: {e}", self.sim_log_text)

    if self.connection:
        self._close_connection(self.connection, self.connection_type)
        self._log_message(f"Conexión {self.connection_type} cerrada (simulación continua).", self.sim_log_text)
        self.connection = None

    # Re-enable the controls and reset the displays.
    self.start_button.config(state=tk.NORMAL)
    self.stop_button.config(state=tk.DISABLED)
    self._set_sim_config_entries_state(tk.NORMAL)
    self.on_function_type_change()
    self._log_message("Simulación continua detenida.", self.sim_log_text)
    self.current_brix_display_var.set("---")
    self.current_ma_display_var.set("--.-- mA")
def run_simulation(self):
    """Worker loop: generate, display, log and send samples until stopped.

    Runs while ``self.simulating`` is true. Each iteration derives a Brix
    value from ``self.simulation_step``:

    * "Lineal": a triangle wave between ``min_brix_map`` and
      ``max_brix_map`` that rises for 100 steps and falls for 100 steps;
    * "Sinusoidal": a sine wave over the same range, one cycle per 100 steps;
    * any other function type: 0.0.

    The value is converted with ``scale_to_mA`` / ``format_mA_value``,
    framed as ``#<address><mA><checksum>\\r`` and sent with ``_send_data``.
    A send error schedules ``stop_simulation_from_thread_error`` on the Tk
    event loop and ends the loop.

    The pause between samples is taken in short slices, so clearing
    ``self.simulating`` ends the loop promptly instead of after a full
    ``simulation_period``.
    """
    steps_for_full_cycle = 100
    # Upper bound, in seconds, on how long a stop request can go unnoticed.
    poll_interval = 0.05

    while self.simulating:
        current_brix_val = 0.0
        if self.function_type == "Lineal":
            # progress runs 0..2 over 200 steps; the second half is mirrored
            # so the value ramps up and then back down.
            progress = (self.simulation_step % (2 * steps_for_full_cycle)) / steps_for_full_cycle
            if progress > 1.0:
                progress = 2.0 - progress
            current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * progress
        elif self.function_type == "Sinusoidal":
            phase = (self.simulation_step / steps_for_full_cycle) * 2 * math.pi
            # Shift sin() from [-1, 1] to [0, 1] before scaling.
            sin_val = (math.sin(phase) + 1) / 2
            current_brix_val = self.min_brix_map + (self.max_brix_map - self.min_brix_map) * sin_val

        mA_val = self.scale_to_mA(current_brix_val, self.min_brix_map, self.max_brix_map)
        mA_str = self.format_mA_value(mA_val)

        # NOTE(review): these Tk variables are set from the worker thread;
        # confirm that is safe for the Tk build in use.
        self.current_brix_display_var.set(f"{current_brix_val:.3f} Brix")
        self.current_ma_display_var.set(f"{mA_str} mA")

        # Bind the current values as defaults so the callback does not see
        # values from a later iteration.
        self.root.after(0, lambda b=current_brix_val, m=mA_val: self.add_sim_data_point(b, m))

        message_part = f"#{self.adam_address}{mA_str}"
        checksum = self.calculate_checksum(message_part)
        full_string_to_send = f"{message_part}{checksum}\r"
        log_display_string = full_string_to_send.replace('\r', '<CR>')
        self._log_message(f"Enviando Sim: {log_display_string}", self.sim_log_text)

        if self.connection:
            try:
                self._send_data(self.connection, self.connection_type, full_string_to_send)
            except Exception as e:
                self._log_message(f"Error al escribir en conexión (sim): {e}", self.sim_log_text)
                self.root.after(0, self.stop_simulation_from_thread_error)
                break

        self.simulation_step += 1

        # Interruptible wait: same total delay as a single sleep, but the
        # running flag is re-checked every poll_interval seconds.
        deadline = time.monotonic() + self.simulation_period
        while self.simulating:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, poll_interval))

    if not self.simulating and self.root.winfo_exists():
        self.root.after(0, self.ensure_gui_stopped_state_sim)
def stop_simulation_from_thread_error(self):
    """Report a connection failure raised by the worker and stop the simulation.

    Does nothing when the simulation is no longer flagged as running.
    """
    if not self.simulating:
        return
    messagebox.showerror("Error de Simulación", "Error de conexión durante la simulación. Simulación detenida.")
    self.stop_simulation()
def ensure_gui_stopped_state_sim(self):
    """Restore the idle GUI state once the simulation loop has ended.

    Skipped entirely while ``self.simulating`` is true. Otherwise the
    controls are re-enabled, any connection still stored on the instance is
    closed, and the end of the worker thread is logged.
    """
    if self.simulating:
        return

    self.start_button.config(state=tk.NORMAL)
    self.stop_button.config(state=tk.DISABLED)
    self._set_sim_config_entries_state(tk.NORMAL)

    # A connection can still be open if the loop ended without stop_simulation
    # having closed it.
    if self.connection:
        self._close_connection(self.connection, self.connection_type)
        self._log_message(f"Conexión {self.connection_type} cerrada (auto, sim_end).", self.sim_log_text)
        self.connection = None

    self._log_message("Simulación continua terminada (hilo finalizado).", self.sim_log_text)
def on_closing(self):
    """Shut down running activity and destroy the root window.

    A running simulation is stopped first. Then a running trace is stopped,
    or, when no trace is active, a connection still stored on the instance
    is closed. The window is destroyed even if one of these cleanup steps
    raises; the exception still propagates afterwards.
    """
    try:
        if self.simulating:
            self.stop_simulation()
        if self.tracing:
            self.stop_trace()
        elif self.connection:
            self._close_connection(self.connection, self.connection_type)
    finally:
        # Without this, a failing cleanup step would leave a window that
        # can no longer be closed.
        self.root.destroy()
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it, then hand control to the Tk event loop until the window closes.
    main_root = tk.Tk()
    app = MaselliSimulatorApp(main_root)
    main_root.mainloop()