""" Manejador del protocolo ADAM/Maselli Contiene las funciones para formatear mensajes, calcular checksums y parsear respuestas """ class ProtocolHandler: @staticmethod def calculate_checksum(message_part): """Calcula el checksum de un mensaje ADAM""" s = sum(ord(c) for c in message_part) checksum_byte = s % 256 return f"{checksum_byte:02X}" @staticmethod def format_ma_value(ma_val): """Formatea un valor mA al formato ADAM: XX.XXX (6 caracteres)""" return f"{ma_val:06.3f}" @staticmethod def scale_to_ma(brix_value, min_brix_map, max_brix_map): """Convierte valor Brix a mA usando el mapeo configurado""" if max_brix_map == min_brix_map: return 4.0 percentage = (brix_value - min_brix_map) / (max_brix_map - min_brix_map) percentage = max(0.0, min(1.0, percentage)) ma_value = 4.0 + percentage * 16.0 return ma_value @staticmethod def ma_to_brix(ma_value, min_brix_map, max_brix_map): """Convierte valor mA a Brix usando el mapeo configurado""" try: if ma_value <= 4.0: return min_brix_map elif ma_value >= 20.0: return max_brix_map else: # Interpolación lineal percentage = (ma_value - 4.0) / 16.0 return min_brix_map + percentage * (max_brix_map - min_brix_map) except: return 0.0 @staticmethod def create_adam_message(adam_address, brix_value, min_brix_map, max_brix_map): """Crea un mensaje completo ADAM (como bytes) a partir de un valor Brix""" ma_val = ProtocolHandler.scale_to_ma(brix_value, min_brix_map, max_brix_map) ma_str = ProtocolHandler.format_ma_value(ma_val) message_part = f"#{adam_address}{ma_str}" checksum = ProtocolHandler.calculate_checksum(message_part) full_message_str = f"{message_part}{checksum}\r" return full_message_str.encode('ascii'), ma_val @staticmethod def create_adam_message_from_ma(adam_address, ma_value): """ Crea un mensaje completo ADAM (como bytes) directamente desde un valor mA. Permite enviar valores mA fuera del rango 4-20mA si es necesario. """ ma_str = ProtocolHandler.format_ma_value(ma_value) # Formato XX.XXX message_part = f"#{adam_address}{ma_str}" checksum = ProtocolHandler.calculate_checksum(message_part) full_message_str = f"{message_part}{checksum}\r" return full_message_str.encode('ascii'), ma_value @staticmethod def create_adam_message_with_bad_checksum(adam_address, ma_value): """ Crea un mensaje completo ADAM (como bytes) directamente desde un valor mA, pero con un checksum deliberadamente incorrecto. """ ma_str = ProtocolHandler.format_ma_value(ma_value) # Formato XX.XXX message_part = f"#{adam_address}{ma_str}" correct_checksum = ProtocolHandler.calculate_checksum(message_part) # Generar un checksum incorrecto. bad_checksum_str = "XX" # Valor por defecto si algo falla try: correct_sum_val = int(correct_checksum, 16) # Sumar 1 (o cualquier otro valor) y tomar módulo 256 para que siga siendo un byte. # Asegurarse de que sea diferente al original. bad_sum_val = (correct_sum_val + 1) % 256 if f"{bad_sum_val:02X}" == correct_checksum: # En caso de que correct_checksum fuera FF bad_sum_val = (correct_sum_val + 2) % 256 bad_checksum_str = f"{bad_sum_val:02X}" except ValueError: # Si correct_checksum no es un hexadecimal válido (no debería pasar) pass # bad_checksum_str se queda como "XX" full_message_str = f"{message_part}{bad_checksum_str}\r" return full_message_str.encode('ascii'), ma_value @staticmethod def ma_to_voltage(ma_value): """Convierte valor mA a Voltaje (0-10V). 0mA -> 0V, 20mA -> 10V.""" # Escala lineal: Voltage = (ma_value / 20mA) * 10V voltage = (ma_value / 20.0) * 10.0 return max(0.0, min(10.0, voltage)) # Asegurar que esté en el rango 0-10V @staticmethod def voltage_to_ma(voltage_value): """Convierte valor Voltaje (0-10V) a mA. 0V -> 0mA, 10V -> 20mA.""" # Escala lineal: mA = (voltage_value / 10V) * 20mA ma = (voltage_value / 10.0) * 20.0 return max(0.0, min(20.0, ma)) # Asegurar que esté en el rango 0-20mA @staticmethod def format_voltage_display(voltage_value): """Formatea un valor de Voltaje para mostrar.""" return f"{voltage_value:.2f} V" @staticmethod def parse_adam_message(data): """ Parsea un mensaje del protocolo ADAM y retorna el valor en mA Formato esperado: #AA[valor_mA][checksum]\r Donde: - # : Carácter inicial (opcional en algunas respuestas) - AA : Dirección del dispositivo (2 caracteres) - valor_mA : Valor en mA (6 caracteres, formato XX.XXX) - checksum : Suma de verificación (2 caracteres hex) - \r : Carácter de fin (opcional) Retorna: dict con 'address' y 'ma', o None si no es válido """ try: # Formato esperado: #AA[valor_mA][checksum]\r # Pero también manejar respuestas sin # inicial o sin \r final data = data.strip() # Si empieza con #, es un mensaje estándar if data.startswith('#'): data = data[1:] # Remover # # Si termina con \r, removerlo if data.endswith('\r'): data = data[:-1] # Verificar longitud mínima if len(data) < 8: # 2 addr + 6 valor mínimo return None address = data[:2] value_str = data[2:8] # 6 caracteres para el valor (XX.XXX) # Verificar si hay checksum checksum_valid = True if len(data) >= 10: checksum = data[8:10] # 2 caracteres para checksum # Verificar checksum message_part = f"#{address}{value_str}" calculated_checksum = ProtocolHandler.calculate_checksum(message_part) if checksum != calculated_checksum: checksum_valid = False # Convertir valor a float try: ma_value = float(value_str) return { 'address': address, 'ma': ma_value, 'checksum_valid': checksum_valid } except ValueError: return None except Exception: return None @staticmethod def format_for_display(message, hex_non_printable=False): """Formatea un mensaje (bytes o str) para mostrar en el log""" if isinstance(message, bytes): if hex_non_printable: parts = [] for byte_val in message: # Caracteres ASCII imprimibles (32 a 126) se dejan como están. # Otros se convierten a \xHH. if 32 <= byte_val <= 126: parts.append(chr(byte_val)) else: parts.append(f'\\x{byte_val:02x}') message_str = "".join(parts) else: message_str = message.decode('ascii', errors='replace') # 'replace' muestra para no decodificables else: # Asumir que es str message_str = str(message) # Si no es formato hexadecimal, reemplazar caracteres de control comunes por representaciones legibles. if not hex_non_printable: message_str = message_str.replace('\r', '').replace('\n', '').replace('\t', '') return message_str