HMI_Translate_Helper_wMaste.../x3_llm_auto_translate.py

377 lines
14 KiB
Python
Raw Normal View History

2024-07-30 09:19:19 -03:00
import pandas as pd
2024-07-30 09:58:19 -03:00
from openai import OpenAI
2024-07-30 09:19:19 -03:00
import os
from openai_api_key import openai_api_key
from google_api_key import google_api_key
2024-07-30 13:03:39 -03:00
import ollama
import json
from google.cloud import translate_v2 as translate
from google.oauth2 import service_account
import html
from tqdm import tqdm
import PyLibrary.funciones_comunes as fc
import time
from translation_config import TranslationConfig
from openpyxl.styles import PatternFill, Alignment
import sys
openai_client = OpenAI(api_key=openai_api_key())
GOOGLE_APPLICATION_CREDENTIALS = "translate-431108-020c17463fbb.json"
batch_size = 20
# Definir el logger a nivel de módulo
logger = None
2024-07-30 09:58:19 -03:00
def init_google_translate_client():
if os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
# Usar credenciales de cuenta de servicio
credentials = service_account.Credentials.from_service_account_file(
GOOGLE_APPLICATION_CREDENTIALS
)
return translate.Client(credentials=credentials)
else:
raise ValueError(
"No se han proporcionado credenciales válidas para Google Translate"
)
google_translate_client = init_google_translate_client()
2024-07-30 09:58:19 -03:00
def google_translate(text, target_language):
result = google_translate_client.translate(text, target_language=target_language)
translated_text = result["translatedText"]
return html.unescape(translated_text)
2024-07-30 12:16:58 -03:00
def read_system_prompt():
try:
with open(".\\data\\system_prompt.txt", "r", encoding="utf-8") as file:
2024-07-30 12:16:58 -03:00
return file.read().strip()
except FileNotFoundError:
logger.warning(
"Archivo system_prompt.txt no encontrado. Usando prompt por defecto."
)
2024-07-30 12:16:58 -03:00
return "You are a translator."
def translate_batch_ollama(texts, source_lang, target_lang):
joined_text = "\n".join(texts)
system_prompt = read_system_prompt()
logger.info(
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{joined_text}"
)
response = ollama.generate(
model="llama3.1",
prompt=f"Translate the following texts from {source_lang} to {target_lang} while preserving special fields like <> and <#>. {system_prompt}: \n\n{joined_text}",
)
translations = response["response"].strip().split("\n")
logger.info(f"Respuestas recibidas:\n{translations}")
return translations
def translate_batch_openai(texts_dict, source_lang, target_lang):
2024-07-30 12:16:58 -03:00
system_prompt = read_system_prompt()
texts_list = list(texts_dict.values())
request_payload = json.dumps(
{"texts": texts_list, "source_lang": source_lang, "target_lang": target_lang}
)
2024-07-30 09:58:19 -03:00
logger.info(
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{request_payload}"
2024-07-30 09:58:19 -03:00
)
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
2024-07-30 09:58:19 -03:00
messages=[
{"role": "system", "content": f"You are a translator.{system_prompt}."},
{"role": "user", "content": request_payload},
2024-07-30 09:58:19 -03:00
],
max_tokens=1500,
temperature=0.3,
2024-07-30 09:19:19 -03:00
)
response_payload = json.loads(response.choices[0].message.content.strip())
translations = response_payload.get("texts", [])
2024-07-30 09:58:19 -03:00
logger.info(f"Respuestas recibidas:\n{translations}")
if len(translations) != len(texts_list):
raise ValueError(
"La cantidad de traducciones recibidas no coincide con la cantidad de textos enviados."
)
return dict(zip(texts_dict.keys(), translations))
2024-07-30 09:19:19 -03:00
2024-07-30 13:03:39 -03:00
def main(config: TranslationConfig):
df = fc.read_dataframe_with_cleanup_retries(config.get_translate_path())
source_col = config.codigo_columna_maestra
source_translated_col = f"{config.codigo_idioma_seleccionado}_Propuesto"
target_col = f"{config.codigo_idioma_seleccionado} Translated"
check_translate_col = f"{config.codigo_idioma_seleccionado} CheckTranslate"
affinity_col = f"{config.codigo_idioma_seleccionado} Affinity"
2024-07-30 09:19:19 -03:00
# Asegurarse de que la columna de destino existe
for col in [target_col, check_translate_col, affinity_col]:
if col not in df.columns:
df[col] = None
2024-07-30 09:19:19 -03:00
texts_to_translate = {}
2024-07-30 09:58:19 -03:00
# Inicializar ProgressBar para la fase de preparación
prep_progress = fc.ProgressBar(
len(df), prefix="Preparando textos:", suffix="Completado"
)
2024-09-20 09:30:33 -03:00
for index, row in df.iterrows():
celda_clave = str(row[source_col])
source_translated_text = (
str(row[source_translated_col])
if source_translated_col in df.columns
else ""
)
celda_clave_compactada = fc.compactar_celda_traducida(
config.codigo_tipo_PLC, celda_clave
)
if config.traducir_todo:
if fc.texto_requiere_traduccion(
config.codigo_tipo_PLC, celda_clave_compactada, logger
):
df.at[index, source_translated_col] = ""
texts_to_translate[celda_clave] = celda_clave_compactada
else:
if (
pd.isna(row[source_translated_col])
or source_translated_text.strip() == ""
):
if fc.texto_requiere_traduccion(
config.codigo_tipo_PLC, celda_clave_compactada, logger
) or fc.texto_con_campos_especiales(
config.codigo_tipo_PLC, celda_clave_compactada
):
texts_to_translate[celda_clave] = celda_clave_compactada
prep_progress.update(index + 1)
2024-07-30 09:19:19 -03:00
prep_progress.finish()
num_texts = len(texts_to_translate)
2024-07-30 09:58:19 -03:00
logger.info(f"Número total de textos a traducir: {num_texts}")
print(f"\nNúmero total de textos a traducir: {num_texts}")
# Inicializar ProgressBar para la fase de traducción
trans_progress = fc.ProgressBar(
num_texts, prefix="Traduciendo:", suffix="Completado"
)
# Traducciones
translations = {}
for start_idx in range(0, num_texts, batch_size):
2024-07-30 09:58:19 -03:00
end_idx = min(start_idx + batch_size, num_texts)
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
logger.info(f"Traduciendo: celdas desde {start_idx} a {end_idx}.")
retries = 4
for attempt in range(retries):
try:
batch_translations = translate_batch_openai(
batch_texts,
fc.idiomas_idiomafromcode(config.codigo_columna_maestra),
fc.idiomas_idiomafromcode(config.codigo_idioma_seleccionado),
)
translations.update(batch_translations)
break
except Exception as e:
if attempt < retries - 1:
logger.warning(
f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
print(
f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
time.sleep(3)
else:
logger.error(
f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
)
print(
f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
)
trans_progress.update(end_idx)
2024-07-30 09:19:19 -03:00
trans_progress.finish()
2024-07-30 09:58:19 -03:00
logger.info(f"Número total de traducciones recibidas: {len(translations)}")
# Inicializar ProgressBar para la fase de actualización del DataFrame
update_progress = fc.ProgressBar(
len(df), prefix="Actualizando DataFrame:", suffix="Completado"
)
# Actualizar el DataFrame con las traducciones y hacemos la Traduccion inversa
for index, row in df.iterrows():
celda_clave = str(row[source_col])
if celda_clave in translations:
df.at[index, target_col] = translations[celda_clave]
try:
google_translation = google_translate(
translations[celda_clave],
fc.idiomas_shortcodefromcode(config.codigo_columna_maestra),
)
df.at[index, check_translate_col] = google_translation
except Exception as e:
logger.error(
f"Error en la traducción de Google para el texto '{celda_clave}': {e}"
)
df.at[index, check_translate_col] = "Error en la traducción"
df.at[index, affinity_col] = 0.0
update_progress.increment()
update_progress.finish()
# Inicializar ProgressBar para la fase de cálculo de afinidad
affinity_progress = fc.ProgressBar(
num_texts, prefix="Calculando afinidad:", suffix="Completado"
)
2024-07-30 09:19:19 -03:00
# Afinidades
affinities = {}
for start_idx in range(0, num_texts, batch_size):
end_idx = min(start_idx + batch_size, num_texts)
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
logger.info(f"Afinidad: celdas desde {start_idx} a {end_idx}.")
retries = 2
for attempt in range(retries):
try:
batch_affinities = fc.affinity_batch_openai(
config.codigo_tipo_PLC, batch_texts, openai_client, logger
)
affinities.update(batch_affinities)
break
except Exception as e:
if attempt < retries - 1:
logger.warning(
f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
print(
f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
time.sleep(3)
else:
logger.error(
f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}"
)
print(
f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}"
)
for key, value in batch_texts.items():
try:
score = fc.calcular_afinidad(
config.codigo_tipo_PLC,
key,
value,
openai_client,
logger,
)
affinities[key] = score
except Exception as ind_e:
affinities[key] = "0"
logger.error(
f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}"
)
print(
f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}"
)
affinity_progress.increment()
affinity_progress.finish()
# Actualizar el DataFrame con las Afinidades
for index, row in df.iterrows():
celda_clave = str(row[source_col])
if celda_clave in affinities:
df.at[index, affinity_col] = affinities[celda_clave]
output_path = config.get_auto_translate_path()
with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
df.to_excel(writer, index=False, sheet_name="Sheet1")
workbook = writer.book
worksheet = writer.sheets["Sheet1"]
# Inmovilizar paneles en A2
worksheet.freeze_panes = "A2"
# Configurar ancho de columnas basado en contenido
from openpyxl.utils import get_column_letter
for col in worksheet.columns:
max_length = 0
column = col[0].column_letter
for cell in col:
try:
if cell.value:
text_length = len(str(cell.value))
# Si el texto es más largo que 50, aplicamos wrap_text
if text_length > 50:
cell.alignment = Alignment(wrap_text=True, vertical="top")
text_length = min(
50, max(len(word) for word in str(cell.value).split())
)
max_length = max(max_length, text_length)
except:
pass
# Ajustar el ancho con un pequeño padding
adjusted_width = min(50, max_length + 2)
worksheet.column_dimensions[column].width = (
adjusted_width if adjusted_width > 8 else 8
)
# Colores para el formato condicional
light_blue = PatternFill(
start_color="ADD8E6", end_color="ADD8E6", fill_type="solid"
)
yellow = PatternFill(
start_color="FFFF00", end_color="FFFF00", fill_type="solid"
)
# Aplicar formatos
for row in worksheet.iter_rows(min_row=2):
translated_cell = row[df.columns.get_loc(target_col)]
if translated_cell.value:
affinity_cell = row[df.columns.get_loc(affinity_col)]
try:
affinity_value = float(
affinity_cell.value if affinity_cell.value else 0
)
if affinity_value == 1:
translated_cell.fill = light_blue
elif affinity_value < 1:
translated_cell.fill = yellow
except (ValueError, TypeError):
pass
2024-07-30 09:58:19 -03:00
logger.info(f"Archivo traducido guardado en: {output_path}")
print(f"\nArchivo traducido guardado en: {output_path}")
2024-07-30 09:19:19 -03:00
def run(config: TranslationConfig):
global logger
logger = fc.configurar_logger(config.work_dir)
script_name = os.path.basename(__file__)
print(f"\rIniciando: {script_name}\r")
main(config)
if __name__ == "__main__":
import menu_pasos_traduccion
menu_pasos_traduccion.main()