feat: Implement SHA256 hash detection for XML changes to enhance file processing accuracy and efficiency. Update relevant scripts to calculate and store file hashes, improving change detection beyond traditional methods.

This commit is contained in:
Miguel 2025-08-24 21:35:21 +02:00
parent c0ef4cb12a
commit 24a0ece0b4
8 changed files with 627 additions and 10 deletions

View File

@ -0,0 +1,135 @@
# Detección de Cambios por Hash SHA256
## ¿Qué es y por qué es importante?
El sistema ahora utiliza **hash SHA256** para detectar cambios en archivos XML, complementando el método tradicional basado en tiempo de modificación y tamaño de archivo.
## Problema resuelto
### Antes (método tradicional)
- **Solo verificaba**: tiempo de modificación + tamaño de archivo
- **Problema**: Si un archivo XML cambia pero mantiene el mismo tamaño y tiempo, el sistema NO regeneraba los archivos JSON/SCL
- **Resultado**: Cambios en el XML no se reflejaban en el código SCL generado
### Ahora (método con hash)
- **Verifica**: hash SHA256 del contenido completo del archivo
- **Ventaja**: Detecta CUALQUIER cambio en el contenido, sin importar tamaño o tiempo
- **Resultado**: Regeneración precisa solo cuando el contenido realmente cambió
## Casos donde el hash es crucial
### Caso 1: Cambios sutiles
```xml
<!-- Antes -->
<Name>MotorControl_01</Name>
<!-- Después -->
<Name>MotorControl_02</Name>
```
- Mismo tamaño de archivo
- Posiblemente mismo tiempo (si se restaura)
- **Método tradicional**: NO detecta cambio ❌
- **Método hash**: SÍ detecta cambio ✅
### Caso 2: Reemplazo de contenido
```xml
<!-- Antes -->
<Comment>Test AAAAAA</Comment>
<!-- Después -->
<Comment>Test BBBBBB</Comment>
```
- Exactamente el mismo tamaño
- **Método tradicional**: NO detecta cambio ❌
- **Método hash**: SÍ detecta cambio ✅
## Implementación
### Archivos modificados
1. **`x0_main.py`**:
- Añadida función `calculate_file_hash()`
- Modificada función `check_skip_status()` para usar hash como prioridad
- Logging mejorado para mostrar qué método de detección se usa
2. **`x1_to_json.py`**:
- Añadida función `calculate_file_hash()`
- Modificada función `convert_xml_to_json()` para calcular y guardar hash
- Nuevo campo `source_xml_hash` en archivos JSON
3. **`x2_process.py`**:
- Modificado para preservar el campo `source_xml_hash` en JSONs procesados
### Nuevo campo en JSON
Los archivos JSON ahora incluyen:
```json
{
"block_name": "FC Pack Motor 71",
"source_xml_mod_time": 1755944794.1680913,
"source_xml_size": 32969,
"source_xml_hash": "a7f5d2e8c4b6a9f3..." // <-- NUEVO
}
```
## Comportamiento del sistema
### Prioridad de verificación
1. **Primera prioridad**: Comparación por hash SHA256
- Si el hash coincide → saltar procesamiento
- Si el hash difiere → regenerar archivos
2. **Fallback**: Método tradicional (tiempo + tamaño)
- Se usa si no hay hash almacenado en el JSON
- Mantiene compatibilidad con archivos JSON existentes
### Logging mejorado
El sistema ahora muestra claramente qué método usa:
```
✓ Hash coincide para FC_Motor_71.xml - Saltando x1/x2
✗ Hash diferente para FC_Motor_72.xml - Regenerando
✓ Tiempo/tamaño coinciden para FC_Motor_73.xml - Saltando x1/x2 (método legacy)
```
## Beneficios
### Para el usuario
- **Precisión**: No se pierden cambios por limitaciones del método tradicional
- **Eficiencia**: No regenera archivos que realmente no cambiaron
- **Confiabilidad**: Detecta modificaciones sutiles pero importantes
### Para el desarrollo
- **Compatibilidad**: Los archivos JSON existentes siguen funcionando
- **Transparencia**: Logs claros sobre qué se procesa y por qué
- **Robustez**: Fallback automático al método tradicional si es necesario
## Casos de uso típicos
### Exportación desde TIA Portal
Cuando exportas un proyecto de TIA Portal varias veces:
1. **Primera exportación**: Se generan todos los JSON/SCL
2. **Segunda exportación** (sin cambios): Se saltan todos los archivos (hash coincide)
3. **Tercera exportación** (con cambios menores): Solo se regeneran los archivos modificados
### Modificaciones manuales
Si modificas manualmente un XML:
- El hash cambia instantáneamente
- El sistema regenera automáticamente los archivos derivados
- No importa si el tamaño o tiempo parecen iguales
## Testing
Se incluyen scripts de prueba que demuestran la funcionalidad:
- `test_hash_detection.py`: Demostración básica
- `test_hash_edge_cases.py`: Casos donde el hash es superior al método tradicional
Para ejecutar:
```bash
python test_hash_detection.py
python test_hash_edge_cases.py
```
## Migración automática
Los archivos JSON existentes seguirán funcionando:
- Sin campo `source_xml_hash`: usa método tradicional
- Con campo `source_xml_hash`: usa método de hash (más preciso)
- La próxima regeneración añadirá automáticamente el hash

View File

@ -70,5 +70,17 @@
"short_description": "Sin descripción corta.", "short_description": "Sin descripción corta.",
"long_description": "", "long_description": "",
"hidden": false "hidden": false
},
"test_hash_detection.py": {
"display_name": "test_hash_detection",
"short_description": "Sin descripción corta.",
"long_description": "",
"hidden": false
},
"test_hash_edge_cases.py": {
"display_name": "test_hash_edge_cases",
"short_description": "Sin descripción corta.",
"long_description": "",
"hidden": false
} }
} }

View File

@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""
Test script para verificar la detección de cambios basada en hash.
Este script demuestra cómo el sistema detecta cambios en archivos XML
utilizando hash SHA256 en lugar de solo tiempo/tamaño de archivo.
"""
import os
import json
import hashlib
import tempfile
import shutil
from datetime import datetime
def calculate_file_hash(filepath):
"""Calcula el hash SHA256 de un archivo."""
try:
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
except Exception as e:
print(f"Error calculando hash de {filepath}: {e}")
return None
def create_test_xml(content, filepath):
"""Crea un archivo XML de prueba con el contenido especificado."""
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
def create_test_json(xml_filepath, json_filepath):
"""Crea un archivo JSON de prueba con metadatos del XML."""
xml_hash = calculate_file_hash(xml_filepath)
xml_mtime = os.path.getmtime(xml_filepath)
xml_size = os.path.getsize(xml_filepath)
json_data = {
"block_name": "Test_Block",
"block_type": "FC",
"language": "LAD",
"source_xml_mod_time": xml_mtime,
"source_xml_size": xml_size,
"source_xml_hash": xml_hash,
"interface": {},
"networks": [],
}
with open(json_filepath, "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=4)
return json_data
def test_hash_detection():
"""Prueba la detección de cambios basada en hash."""
print("=== Test de Detección de Cambios por Hash ===\n")
# Crear directorio temporal
with tempfile.TemporaryDirectory() as temp_dir:
xml_file = os.path.join(temp_dir, "test_block.xml")
json_file = os.path.join(temp_dir, "test_block_processed.json")
# Paso 1: Crear XML original
original_content = """<?xml version="1.0" encoding="UTF-8"?>
<Document>
<SW.Blocks.FC ID="A">
<AttributeList>
<Name>Test_Block</Name>
<Number>100</Number>
<ProgrammingLanguage>LAD</ProgrammingLanguage>
</AttributeList>
<ObjectList>
<NetworkSource />
</ObjectList>
</SW.Blocks.FC>
</Document>"""
create_test_xml(original_content, xml_file)
original_hash = calculate_file_hash(xml_file)
print(f"1. XML original creado")
print(f" Hash: {original_hash[:16]}...")
print(f" Tamaño: {os.path.getsize(xml_file)} bytes")
# Paso 2: Crear JSON con metadatos
json_data = create_test_json(xml_file, json_file)
print(f"\n2. JSON con metadatos creado")
print(f" Hash almacenado: {json_data['source_xml_hash'][:16]}...")
# Paso 3: Simular verificación - archivo sin cambios
current_hash = calculate_file_hash(xml_file)
hash_match = json_data["source_xml_hash"] == current_hash
print(f"\n3. Verificación sin cambios:")
print(f" Hash actual: {current_hash[:16]}...")
print(f" ¿Hash coincide?: {hash_match}")
print(f"{'SALTAR procesamiento' if hash_match else 'PROCESAR archivo'}")
# Paso 4: Modificar archivo manteniendo mismo tamaño y tiempo
print(f"\n4. Modificando XML (cambio sutil)...")
modified_content = original_content.replace(
"Test_Block", "Test_Blck"
) # Cambio sutil
# Guardar tiempo original
original_mtime = os.path.getmtime(xml_file)
create_test_xml(modified_content, xml_file)
# Restaurar tiempo original (simular que el timestamp no cambió)
os.utime(xml_file, (original_mtime, original_mtime))
new_size = os.path.getsize(xml_file)
new_mtime = os.path.getmtime(xml_file)
new_hash = calculate_file_hash(xml_file)
print(f" Nuevo hash: {new_hash[:16]}...")
print(f" Nuevo tamaño: {new_size} bytes")
print(
f" Tiempo modificación: {abs(new_mtime - json_data['source_xml_mod_time']) < 0.001}"
)
# Paso 5: Verificar detección por diferentes métodos
print(f"\n5. Comparación de métodos de detección:")
# Método tradicional (tiempo + tamaño)
time_match = abs(new_mtime - json_data["source_xml_mod_time"]) < 0.001
size_match = new_size == json_data["source_xml_size"]
traditional_would_skip = time_match and size_match
# Método por hash
hash_would_skip = new_hash == json_data["source_xml_hash"]
print(f" Método tradicional (tiempo+tamaño):")
print(f" ¿Tiempo coincide?: {time_match}")
print(f" ¿Tamaño coincide?: {size_match}")
print(f"{'SALTAR' if traditional_would_skip else 'PROCESAR'}")
print(f" Método por hash:")
print(f" ¿Hash coincide?: {hash_would_skip}")
print(f"{'SALTAR' if hash_would_skip else 'PROCESAR'}")
print(f"\n6. Resultado:")
if traditional_would_skip and not hash_would_skip:
print(f" ✅ HASH DETECTÓ CAMBIO que el método tradicional PERDIÓ")
print(f" 🔄 El archivo será regenerado correctamente")
elif not traditional_would_skip and not hash_would_skip:
print(f" ✅ Ambos métodos detectaron el cambio")
elif traditional_would_skip and hash_would_skip:
print(f" ✅ Ambos métodos indican que no hay cambios")
else:
print(f" ⚠️ Situación inesperada")
# Paso 6: Mostrar contenidos para verificación
print(f"\n7. Verificación de contenido:")
print(f" Contenido original tenía: 'Test_Block'")
print(f" Contenido nuevo tiene: 'Test_Blck'")
with open(xml_file, "r", encoding="utf-8") as f:
content = f.read()
has_original = "Test_Block" in content
has_modified = "Test_Blck" in content
print(f" ¿Contiene 'Test_Block'?: {has_original}")
print(f" ¿Contiene 'Test_Blck'?: {has_modified}")
if __name__ == "__main__":
test_hash_detection()

View File

@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Test específico para demostrar casos donde el hash detecta cambios
que el método tradicional (tiempo + tamaño) podría perder.
"""
import os
import json
import hashlib
import tempfile
import time
def calculate_file_hash(filepath):
"""Calcula el hash SHA256 de un archivo."""
try:
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
except Exception as e:
print(f"Error calculando hash de {filepath}: {e}")
return None
def test_subtle_changes():
"""Demuestra detección de cambios sutiles que mantienen el mismo tamaño."""
print("=== Test: Cambios Sutiles con Mismo Tamaño ===\n")
with tempfile.TemporaryDirectory() as temp_dir:
xml_file = os.path.join(temp_dir, "test.xml")
# Crear contenido original de exactamente el mismo tamaño
original = """<?xml version="1.0"?>
<Block>
<Name>MotorControl_01</Name>
<Status>Active</Status>
</Block>"""
# Crear contenido modificado del mismo tamaño (cambiar solo algunos caracteres)
modified = """<?xml version="1.0"?>
<Block>
<Name>MotorControl_02</Name>
<Status>Active</Status>
</Block>"""
# Verificar que tienen el mismo tamaño
assert len(original) == len(
modified
), "Los contenidos deben tener el mismo tamaño"
# Paso 1: Crear archivo original
with open(xml_file, "w", encoding="utf-8") as f:
f.write(original)
original_hash = calculate_file_hash(xml_file)
original_size = os.path.getsize(xml_file)
original_mtime = os.path.getmtime(xml_file)
print(f"1. Archivo original:")
print(f" Contenido: MotorControl_01")
print(f" Hash: {original_hash[:16]}...")
print(f" Tamaño: {original_size} bytes")
# Simular JSON con metadatos
json_data = {
"source_xml_hash": original_hash,
"source_xml_size": original_size,
"source_xml_mod_time": original_mtime,
}
# Paso 2: Esperar un poco y modificar archivo manteniendo tamaño
time.sleep(0.1) # Pequeña pausa
with open(xml_file, "w", encoding="utf-8") as f:
f.write(modified)
# Restaurar tiempo de modificación original
os.utime(xml_file, (original_mtime, original_mtime))
new_hash = calculate_file_hash(xml_file)
new_size = os.path.getsize(xml_file)
new_mtime = os.path.getmtime(xml_file)
print(f"\n2. Archivo modificado:")
print(f" Contenido: MotorControl_02")
print(f" Hash: {new_hash[:16]}...")
print(f" Tamaño: {new_size} bytes")
print(
f" Tiempo: {'Igual' if abs(new_mtime - original_mtime) < 0.001 else 'Diferente'}"
)
# Paso 3: Comparar métodos de detección
print(f"\n3. Detección de cambios:")
# Método tradicional
time_match = abs(new_mtime - json_data["source_xml_mod_time"]) < 0.001
size_match = new_size == json_data["source_xml_size"]
traditional_detects_change = not (time_match and size_match)
# Método por hash
hash_detects_change = new_hash != json_data["source_xml_hash"]
print(f" Método tradicional:")
print(f" Tiempo coincide: {time_match}")
print(f" Tamaño coincide: {size_match}")
print(f" Detecta cambio: {traditional_detects_change}")
print(f" Método por hash:")
print(f" Hash coincide: {new_hash == json_data['source_xml_hash']}")
print(f" Detecta cambio: {hash_detects_change}")
# Resultado
print(f"\n4. Resultado:")
if not traditional_detects_change and hash_detects_change:
print(f" 🎯 HASH DETECTÓ CAMBIO que método tradicional PERDIÓ")
print(f" ✅ Sin hash: archivo NO se regeneraría (ERROR)")
print(f" ✅ Con hash: archivo SÍ se regenerará (CORRECTO)")
elif traditional_detects_change and hash_detects_change:
print(f" ✅ Ambos métodos detectaron el cambio correctamente")
else:
print(
f" Resultado: tradicional={traditional_detects_change}, hash={hash_detects_change}"
)
return not traditional_detects_change and hash_detects_change
def test_same_size_different_content():
"""Test con contenidos completamente diferentes pero mismo tamaño."""
print("\n" + "=" * 50)
print("=== Test: Contenido Diferente, Mismo Tamaño ===\n")
with tempfile.TemporaryDirectory() as temp_dir:
xml_file = os.path.join(temp_dir, "test.xml")
# Contenidos de igual longitud pero diferentes
content1 = "<Root><Item>AAAAAAAAAA</Item></Root>" # 33 chars
content2 = "<Root><Item>BBBBBBBBBB</Item></Root>" # 33 chars
assert len(content1) == len(content2), "Deben tener igual tamaño"
# Crear archivo con contenido 1
with open(xml_file, "w", encoding="utf-8") as f:
f.write(content1)
hash1 = calculate_file_hash(xml_file)
size1 = os.path.getsize(xml_file)
mtime1 = os.path.getmtime(xml_file)
# Cambiar a contenido 2 manteniendo tiempo
time.sleep(0.1)
with open(xml_file, "w", encoding="utf-8") as f:
f.write(content2)
os.utime(xml_file, (mtime1, mtime1))
hash2 = calculate_file_hash(xml_file)
size2 = os.path.getsize(xml_file)
mtime2 = os.path.getmtime(xml_file)
print(f"Contenido 1: {content1[:20]}...")
print(f"Contenido 2: {content2[:20]}...")
print(f"Tamaños: {size1} = {size2} ({'' if size1 == size2 else ''})")
print(f"Tiempos: {'iguales' if abs(mtime2 - mtime1) < 0.001 else 'diferentes'}")
print(f"Hashes: {'iguales' if hash1 == hash2 else 'diferentes'}")
would_skip_traditional = (size1 == size2) and (abs(mtime2 - mtime1) < 0.001)
would_skip_hash = hash1 == hash2
print(
f"\nSin hash: {'SALTARÍA procesamiento' if would_skip_traditional else 'procesaría'}"
)
print(
f"Con hash: {'saltaría procesamiento' if would_skip_hash else 'PROCESARÍA'}"
)
if would_skip_traditional and not would_skip_hash:
print("🎯 El hash evita saltar un archivo que SÍ cambió")
return True
return False
if __name__ == "__main__":
print("Ejecutando tests de detección de cambios por hash...\n")
test1_result = test_subtle_changes()
test2_result = test_same_size_different_content()
print("\n" + "=" * 50)
print("=== RESUMEN ===")
print(
f"Test 1 (cambios sutiles): {'HASH ÚTIL' if test1_result else 'ambos métodos iguales'}"
)
print(
f"Test 2 (mismo tamaño): {'HASH ÚTIL' if test2_result else 'ambos métodos iguales'}"
)
if test1_result or test2_result:
print("\n✅ El hash SHA256 mejora la detección de cambios")
print("✅ Evita regeneraciones innecesarias Y perdida de cambios reales")
else:
print("\n✓ Ambos métodos funcionaron igual en estos tests")

View File

@ -19,6 +19,7 @@ import json
import datetime # <-- NUEVO: Para timestamps import datetime # <-- NUEVO: Para timestamps
import shutil # <-- ADDED: Import shutil for file copying import shutil # <-- ADDED: Import shutil for file copying
import re # <-- ADDED: Import regex for block header processing import re # <-- ADDED: Import regex for block header processing
import hashlib # <-- NUEVO: Para hash de archivos XML
script_root = os.path.dirname( script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))) os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
@ -92,10 +93,24 @@ def log_message(message, log_file_handle, also_print=True):
# <-- FIN NUEVO --> # <-- FIN NUEVO -->
def calculate_file_hash(filepath):
"""Calcula el hash SHA256 de un archivo."""
try:
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
# Leer el archivo en chunks para manejar archivos grandes
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
except Exception as e:
print(f"Error calculando hash de {filepath}: {e}")
return None
# <-- run_script ya no es necesaria --> # <-- run_script ya no es necesaria -->
# --- Función check_skip_status (sin cambios en su lógica interna) --- # --- Función check_skip_status (MODIFICADO: Añadido control por hash) ---
def check_skip_status( def check_skip_status(
xml_filepath, processed_json_filepath, final_output_dir, log_f xml_filepath, processed_json_filepath, final_output_dir, log_f
): # Añadido log_f ): # Añadido log_f
@ -103,17 +118,21 @@ def check_skip_status(
can_check_x3 = False can_check_x3 = False
if not os.path.exists(processed_json_filepath): if not os.path.exists(processed_json_filepath):
return status return status
stored_mtime = None stored_mtime = None
stored_size = None stored_size = None
stored_hash = None
block_name = None block_name = None
block_type = None block_type = None
processed_json_mtime = None processed_json_mtime = None
try: try:
processed_json_mtime = os.path.getmtime(processed_json_filepath) processed_json_mtime = os.path.getmtime(processed_json_filepath)
with open(processed_json_filepath, "r", encoding="utf-8") as f: with open(processed_json_filepath, "r", encoding="utf-8") as f:
data = json.load(f) data = json.load(f)
stored_mtime = data.get("source_xml_mod_time") stored_mtime = data.get("source_xml_mod_time")
stored_size = data.get("source_xml_size") stored_size = data.get("source_xml_size")
stored_hash = data.get("source_xml_hash") # <-- NUEVO: Obtener hash almacenado
block_name = data.get("block_name") block_name = data.get("block_name")
block_type = data.get("block_type") block_type = data.get("block_type")
except Exception as e: except Exception as e:
@ -124,17 +143,51 @@ def check_skip_status(
) )
return status return status
if stored_mtime is None or stored_size is None: # Calcular hash actual del XML
current_xml_hash = calculate_file_hash(xml_filepath)
# Priorizar comparación por hash si está disponible
if stored_hash is not None and current_xml_hash is not None:
if stored_hash == current_xml_hash:
log_message(
f" ✓ Hash coincide para {os.path.basename(xml_filepath)} - Saltando x1/x2",
log_f,
also_print=False,
)
status["skip_x1_x2"] = True
can_check_x3 = True
else:
log_message(
f" ✗ Hash diferente para {os.path.basename(xml_filepath)} - Regenerando",
log_f,
also_print=False,
)
# Hash diferente, no saltar x1/x2
can_check_x3 = block_name is not None and block_type is not None
elif stored_mtime is None or stored_size is None:
# Fallback: sin metadatos de tiempo/tamaño, solo verificar si se puede x3
can_check_x3 = block_name is not None and block_type is not None can_check_x3 = block_name is not None and block_type is not None
else: else:
# Fallback: usar método original por tiempo y tamaño
try: try:
current_xml_mtime = os.path.getmtime(xml_filepath) current_xml_mtime = os.path.getmtime(xml_filepath)
current_xml_size = os.path.getsize(xml_filepath) current_xml_size = os.path.getsize(xml_filepath)
time_match = abs(stored_mtime - current_xml_mtime) < 0.001 time_match = abs(stored_mtime - current_xml_mtime) < 0.001
size_match = stored_size == current_xml_size size_match = stored_size == current_xml_size
if time_match and size_match: if time_match and size_match:
log_message(
f" ✓ Tiempo/tamaño coinciden para {os.path.basename(xml_filepath)} - Saltando x1/x2 (método legacy)",
log_f,
also_print=False,
)
status["skip_x1_x2"] = True status["skip_x1_x2"] = True
can_check_x3 = True can_check_x3 = True
else:
log_message(
f" ✗ Tiempo/tamaño diferentes para {os.path.basename(xml_filepath)} - Regenerando (método legacy)",
log_f,
also_print=False,
)
except OSError as e: except OSError as e:
log_message( log_message(
f"Advertencia: Error obteniendo metadatos XML para {xml_filepath}: {e}. No se saltará x1/x2.", f"Advertencia: Error obteniendo metadatos XML para {xml_filepath}: {e}. No se saltará x1/x2.",

View File

@ -13,6 +13,7 @@ import os
import sys import sys
import traceback import traceback
import importlib import importlib
import hashlib # <-- NUEVO: Para calcular hash del XML
from lxml import etree from lxml import etree
from lxml.etree import ( from lxml.etree import (
XMLSyntaxError as etree_XMLSyntaxError, XMLSyntaxError as etree_XMLSyntaxError,
@ -45,6 +46,20 @@ except ImportError as e:
sys.exit(1) sys.exit(1)
def calculate_file_hash(filepath):
"""Calcula el hash SHA256 de un archivo."""
try:
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
# Leer el archivo en chunks para manejar archivos grandes
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
except Exception as e:
print(f"Error calculando hash de {filepath}: {e}")
return None
# --- NUEVAS FUNCIONES DE PARSEO para UDT y Tag Table (Sin cambios) --- # --- NUEVAS FUNCIONES DE PARSEO para UDT y Tag Table (Sin cambios) ---
def parse_udt(udt_element): def parse_udt(udt_element):
"""Parsea un elemento <SW.Types.PlcStruct> (UDT).""" """Parsea un elemento <SW.Types.PlcStruct> (UDT)."""
@ -249,10 +264,14 @@ def convert_xml_to_json(xml_filepath, json_filepath):
# Obtener metadatos del XML # Obtener metadatos del XML
xml_mod_time = None xml_mod_time = None
xml_size = None xml_size = None
xml_hash = None # <-- NUEVO: Hash del archivo XML
try: try:
xml_mod_time = os.path.getmtime(xml_filepath) xml_mod_time = os.path.getmtime(xml_filepath)
xml_size = os.path.getsize(xml_filepath) xml_size = os.path.getsize(xml_filepath)
print(f"Metadatos XML: ModTime={xml_mod_time}, Size={xml_size}") xml_hash = calculate_file_hash(xml_filepath) # <-- NUEVO: Calcular hash
print(
f"Metadatos XML: ModTime={xml_mod_time}, Size={xml_size}, Hash={xml_hash[:16] if xml_hash else None}..."
)
except OSError as e: except OSError as e:
print(f"Advertencia: No se pudieron obtener metadatos de '{xml_filepath}': {e}") print(f"Advertencia: No se pudieron obtener metadatos de '{xml_filepath}': {e}")
@ -602,6 +621,8 @@ def convert_xml_to_json(xml_filepath, json_filepath):
result["source_xml_mod_time"] = xml_mod_time result["source_xml_mod_time"] = xml_mod_time
if xml_size is not None: if xml_size is not None:
result["source_xml_size"] = xml_size result["source_xml_size"] = xml_size
if xml_hash is not None: # <-- NUEVO: Añadir hash al resultado
result["source_xml_hash"] = xml_hash
print("Paso 6: Escribiendo el resultado en el archivo JSON...") print("Paso 6: Escribiendo el resultado en el archivo JSON...")
# Advertencias finales si faltan partes clave # Advertencias finales si faltan partes clave

View File

@ -5,6 +5,7 @@ Este script convierte un archivo JSON simplificado (resultado de un análisis de
JSON enriquecido con lógica SCL. Se enfoca en la lógica de programación y la agrupación de instrucciones IF. JSON enriquecido con lógica SCL. Se enfoca en la lógica de programación y la agrupación de instrucciones IF.
""" """
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json import json
import argparse import argparse
@ -15,6 +16,7 @@ import re
import importlib import importlib
import sys import sys
import sympy import sympy
script_root = os.path.dirname( script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))) os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
) )
@ -270,12 +272,18 @@ def process_json_to_scl(json_filepath, output_json_filepath):
# <-- NUEVO: Extraer metadatos del JSON de entrada (si existen) --> # <-- NUEVO: Extraer metadatos del JSON de entrada (si existen) -->
source_xml_mod_time = data.get("source_xml_mod_time") source_xml_mod_time = data.get("source_xml_mod_time")
source_xml_size = data.get("source_xml_size") source_xml_size = data.get("source_xml_size")
source_xml_hash = data.get("source_xml_hash") # <-- NUEVO: Extraer hash
# <-- FIN NUEVO --> # <-- FIN NUEVO -->
block_type = data.get("block_type", "Unknown") block_type = data.get("block_type", "Unknown")
print(f"Procesando bloque tipo: {block_type}") print(f"Procesando bloque tipo: {block_type}")
if block_type in ["GlobalDB", "PlcUDT", "PlcTagTable", "InstanceDB"]: # <-- MODIFIED: Add InstanceDB if block_type in [
"GlobalDB",
"PlcUDT",
"PlcTagTable",
"InstanceDB",
]: # <-- MODIFIED: Add InstanceDB
print(f"INFO: El bloque es {block_type}. Saltando procesamiento lógico de x2.") print(f"INFO: El bloque es {block_type}. Saltando procesamiento lógico de x2.")
print( print(
f"Guardando JSON de {block_type} (con metadatos) en: {output_json_filepath}" f"Guardando JSON de {block_type} (con metadatos) en: {output_json_filepath}"
@ -287,6 +295,8 @@ def process_json_to_scl(json_filepath, output_json_filepath):
data_to_save["source_xml_mod_time"] = source_xml_mod_time data_to_save["source_xml_mod_time"] = source_xml_mod_time
if source_xml_size is not None: if source_xml_size is not None:
data_to_save["source_xml_size"] = source_xml_size data_to_save["source_xml_size"] = source_xml_size
if source_xml_hash is not None: # <-- NUEVO: Preservar hash
data_to_save["source_xml_hash"] = source_xml_hash
# <-- FIN MODIFICADO --> # <-- FIN MODIFICADO -->
with open(output_json_filepath, "w", encoding="utf-8") as f_out: with open(output_json_filepath, "w", encoding="utf-8") as f_out:
json.dump(data_to_save, f_out, indent=4, ensure_ascii=False) json.dump(data_to_save, f_out, indent=4, ensure_ascii=False)
@ -514,6 +524,8 @@ def process_json_to_scl(json_filepath, output_json_filepath):
data["source_xml_mod_time"] = source_xml_mod_time data["source_xml_mod_time"] = source_xml_mod_time
if source_xml_size is not None: if source_xml_size is not None:
data["source_xml_size"] = source_xml_size data["source_xml_size"] = source_xml_size
if source_xml_hash is not None: # <-- NUEVO: Preservar hash
data["source_xml_hash"] = source_xml_hash
# <-- FIN MODIFICADO --> # <-- FIN MODIFICADO -->
print(f"\nGuardando JSON procesado ({block_type}) en: {output_json_filepath}") print(f"\nGuardando JSON procesado ({block_type}) en: {output_json_filepath}")
@ -537,7 +549,10 @@ if __name__ == "__main__":
import tkinter as tk import tkinter as tk
from tkinter import filedialog from tkinter import filedialog
except ImportError: except ImportError:
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr) print(
"Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.",
file=sys.stderr,
)
tk = None tk = None
input_json_file = "" input_json_file = ""
@ -547,7 +562,7 @@ if __name__ == "__main__":
print("Por favor, selecciona el archivo JSON de entrada (generado por x1)...") print("Por favor, selecciona el archivo JSON de entrada (generado por x1)...")
input_json_file = filedialog.askopenfilename( input_json_file = filedialog.askopenfilename(
title="Selecciona el archivo JSON de entrada (.json)", title="Selecciona el archivo JSON de entrada (.json)",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")] filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
) )
root.destroy() root.destroy()
@ -560,7 +575,9 @@ if __name__ == "__main__":
json_filename_base = os.path.splitext(os.path.basename(input_json_file))[0] json_filename_base = os.path.splitext(os.path.basename(input_json_file))[0]
# Asumimos que el _processed.json va al mismo directorio 'parsing' # Asumimos que el _processed.json va al mismo directorio 'parsing'
parsing_dir = os.path.dirname(input_json_file) parsing_dir = os.path.dirname(input_json_file)
output_json_file = os.path.join(parsing_dir, f"{json_filename_base}_processed.json") output_json_file = os.path.join(
parsing_dir, f"{json_filename_base}_processed.json"
)
# Asegurarse de que el directorio de salida exista (aunque debería si el input existe) # Asegurarse de que el directorio de salida exista (aunque debería si el input existe)
os.makedirs(parsing_dir, exist_ok=True) os.makedirs(parsing_dir, exist_ok=True)
@ -574,7 +591,10 @@ if __name__ == "__main__":
if success: if success:
print("\nProcesamiento completado exitosamente.") print("\nProcesamiento completado exitosamente.")
else: else:
print(f"\nError durante el procesamiento de '{os.path.relpath(input_json_file)}'.", file=sys.stderr) print(
f"\nError durante el procesamiento de '{os.path.relpath(input_json_file)}'.",
file=sys.stderr,
)
# sys.exit(1) # No usar sys.exit # sys.exit(1) # No usar sys.exit
except Exception as e: except Exception as e:
print( print(

View File

@ -129,8 +129,9 @@ class ScriptExecutor:
start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..." start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..."
broadcast_fn(start_msg) broadcast_fn(start_msg)
# Usar CREATE_NEW_CONSOLE para permitir foco en diálogos
creation_flags = ( creation_flags = (
subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 subprocess.CREATE_NEW_CONSOLE if sys.platform == "win32" else 0
) )
process = subprocess.Popen( process = subprocess.Popen(