HMI_Translate_Helper_wMaste.../x3_llm_auto_translate.py

358 lines
14 KiB
Python

import pandas as pd
from openai import OpenAI
import os
from openai_api_key import openai_api_key
from google_api_key import google_api_key
import ollama
import json
from google.cloud import translate_v2 as translate
from google.oauth2 import service_account
import html
from tqdm import tqdm
import PyLibrary.funciones_comunes as fc
import time
import PyLibrary.funciones_comunes as fc
from translation_config import TranslationConfig
from openai import OpenAI
from tqdm import tqdm
openai_client = OpenAI(api_key=openai_api_key())
GOOGLE_APPLICATION_CREDENTIALS = "translate-431108-020c17463fbb.json"
batch_size = 20
# Definir el logger a nivel de módulo
logger = None
def init_google_translate_client():
if os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
# Usar credenciales de cuenta de servicio
credentials = service_account.Credentials.from_service_account_file(
GOOGLE_APPLICATION_CREDENTIALS
)
return translate.Client(credentials=credentials)
else:
raise ValueError(
"No se han proporcionado credenciales válidas para Google Translate"
)
google_translate_client = init_google_translate_client()
def google_translate(text, target_language):
result = google_translate_client.translate(text, target_language=target_language)
translated_text = result["translatedText"]
return html.unescape(translated_text)
def read_system_prompt():
try:
with open(".\\data\\system_prompt.txt", "r", encoding="utf-8") as file:
return file.read().strip()
except FileNotFoundError:
logger.warning(
"Archivo system_prompt.txt no encontrado. Usando prompt por defecto."
)
return "You are a translator."
def translate_batch_ollama(texts, source_lang, target_lang):
joined_text = "\n".join(texts)
system_prompt = read_system_prompt()
logger.info(
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{joined_text}"
)
response = ollama.generate(
model="llama3.1",
prompt=f"Translate the following texts from {source_lang} to {target_lang} while preserving special fields like <> and <#>. {system_prompt}: \n\n{joined_text}",
)
translations = response["response"].strip().split("\n")
logger.info(f"Respuestas recibidas:\n{translations}")
return translations
def translate_batch_openai(texts_dict, source_lang, target_lang):
system_prompt = read_system_prompt()
texts_list = list(texts_dict.values())
request_payload = json.dumps(
{"texts": texts_list, "source_lang": source_lang, "target_lang": target_lang}
)
logger.info(
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{request_payload}"
)
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": f"You are a translator.{system_prompt}."},
{"role": "user", "content": request_payload},
],
max_tokens=1500,
temperature=0.3,
)
response_payload = json.loads(response.choices[0].message.content.strip())
translations = response_payload.get("texts", [])
logger.info(f"Respuestas recibidas:\n{translations}")
if len(translations) != len(texts_list):
raise ValueError(
"La cantidad de traducciones recibidas no coincide con la cantidad de textos enviados."
)
return dict(zip(texts_dict.keys(), translations))
def affinity_batch_openai(codigo_tipo_PLC, texts_dict):
system_prompt = (
"Evaluate the semantic similarity between the following table of pairs of texts in json format on a scale from 0 to 1. "
"Return the similarity scores for every row in JSON format as a list of numbers, without any additional text or formatting."
)
original_list = [
fc.compactar_celda_traducida(codigo_tipo_PLC, key) for key in texts_dict.keys()
]
re_translated_list = list(texts_dict.values())
request_payload = json.dumps(
{"original": original_list, "compared": re_translated_list}
)
logger.info(f"Solicitando Afinidad para el lote de textos:\n{request_payload}")
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": system_prompt,
},
{"role": "user", "content": request_payload},
],
max_tokens=1500,
temperature=0.3,
)
response_content = response.choices[0].message.content
# Limpiar y convertir el contenido de la respuesta
cleaned_response_content = response_content.strip().strip("'```json").strip("```")
# Intentar convertir el contenido a JSON
try:
response_payload = json.loads(cleaned_response_content)
except json.JSONDecodeError:
raise ValueError("La respuesta no se pudo decodificar como JSON.")
# Manejar diferentes formatos de respuesta
if isinstance(response_payload, dict) and "similarity_scores" in response_payload:
scores = response_payload["similarity_scores"]
elif isinstance(response_payload, list):
scores = response_payload
else:
raise ValueError("Formato de respuesta inesperado.")
logger.info(f"Respuestas recibidas:\n{scores}")
if len(scores) != len(original_list):
raise ValueError(
"La cantidad de afinidades recibidas no coincide con la cantidad de textos enviados."
)
return dict(zip(texts_dict.keys(), scores))
# Función que calcula la afinidad entre dos textos
def calcular_afinidad(tipo_PLC, texto1, texto2):
system_prompt = (
"Evaluate the semantic similarity between the following pair of texts on a scale from 0 to 1. "
"Return the similarity score as a single number."
)
original_text = fc.compactar_celda_traducida(tipo_PLC, texto1)
compared_text = texto2
request_payload = json.dumps({"original": original_text, "compared": compared_text})
logger.info(f"Solicitando afinidad para el par de textos:\n{request_payload}")
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": system_prompt,
},
{"role": "user", "content": request_payload},
],
max_tokens=1500,
temperature=0.3,
)
response_content = response.choices[0].message.content
# Limpiar y convertir el contenido de la respuesta
cleaned_response_content = response_content.strip().strip("'```json").strip("```")
# Intentar convertir el contenido a JSON
try:
score = float(cleaned_response_content)
except ValueError:
raise ValueError(
f"La respuesta no se pudo decodificar como un número: {cleaned_response_content}"
)
return score
def main(config: TranslationConfig):
global logger
df = fc.read_dataframe_with_cleanup_retries(config.get_translate_path())
source_col = config.codigo_columna_maestra
source_translated_col = config.codigo_idioma_seleccionado
target_col = f"{config.codigo_idioma_seleccionado} Translated"
check_translate_col = f"{config.codigo_idioma_seleccionado} CheckTranslate"
affinity_col = f"{config.codigo_idioma_seleccionado} Affinity"
# Asegurarse de que la columna de destino existe
for col in [target_col, check_translate_col, affinity_col]:
if col not in df.columns:
df[col] = None
texts_to_translate = {}
# Inicializar ProgressBar para la fase de preparación
prep_progress = fc.ProgressBar(len(df), prefix='Preparando textos:', suffix='Completado')
for index, row in df.iterrows():
celda_clave = str(row[source_col])
source_translated_text = str(row[source_translated_col]) if source_translated_col in df.columns else ""
celda_clave_compactada = fc.compactar_celda_traducida(config.codigo_tipo_PLC, celda_clave)
if config.traducir_todo:
if fc.texto_requiere_traduccion(config.codigo_tipo_PLC, celda_clave_compactada, logger):
df.at[index, source_translated_col] = ""
texts_to_translate[celda_clave] = celda_clave_compactada
else:
if pd.isna(row[source_translated_col]) or source_translated_text.strip() == "":
if fc.texto_requiere_traduccion(config.codigo_tipo_PLC, celda_clave_compactada, logger) or fc.texto_con_campos_especiales(config.codigo_tipo_PLC, celda_clave_compactada):
texts_to_translate[celda_clave] = celda_clave_compactada
prep_progress.update(index + 1)
prep_progress.finish()
num_texts = len(texts_to_translate)
logger.info(f"Número total de textos a traducir: {num_texts}")
print(f"\nNúmero total de textos a traducir: {num_texts}")
# Inicializar ProgressBar para la fase de traducción
trans_progress = fc.ProgressBar(num_texts, prefix='Traduciendo:', suffix='Completado')
# Traducciones
translations = {}
for start_idx in range(0, num_texts, batch_size):
end_idx = min(start_idx + batch_size, num_texts)
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
logger.info(f"Traduciendo: celdas desde {start_idx} a {end_idx}.")
retries = 4
for attempt in range(retries):
try:
batch_translations = translate_batch_openai(
batch_texts,
fc.idiomas_idiomafromcode(config.codigo_columna_maestra),
fc.idiomas_idiomafromcode(config.codigo_idioma_seleccionado)
)
translations.update(batch_translations)
break
except Exception as e:
if attempt < retries - 1:
logger.warning(f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando...")
print(f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando...")
time.sleep(3)
else:
logger.error(f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}")
print(f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}")
trans_progress.update(end_idx)
trans_progress.finish()
logger.info(f"Número total de traducciones recibidas: {len(translations)}")
# Inicializar ProgressBar para la fase de actualización del DataFrame
update_progress = fc.ProgressBar(len(df), prefix='Actualizando DataFrame:', suffix='Completado')
# Actualizar el DataFrame con las traducciones y hacemos la Traduccion inversa
for index, row in df.iterrows():
celda_clave = str(row[source_col])
if celda_clave in translations:
df.at[index, target_col] = translations[celda_clave]
try:
google_translation = google_translate(
translations[celda_clave],
fc.idiomas_shortcodefromcode(config.codigo_columna_maestra)
)
df.at[index, check_translate_col] = google_translation
except Exception as e:
logger.error(f"Error en la traducción de Google para el texto '{celda_clave}': {e}")
df.at[index, check_translate_col] = "Error en la traducción"
df.at[index, affinity_col] = 0.0
update_progress.increment()
update_progress.finish()
# Inicializar ProgressBar para la fase de cálculo de afinidad
affinity_progress = fc.ProgressBar(num_texts, prefix='Calculando afinidad:', suffix='Completado')
# Afinidades
affinities = {}
for start_idx in range(0, num_texts, batch_size):
end_idx = min(start_idx + batch_size, num_texts)
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
logger.info(f"Afinidad: celdas desde {start_idx} a {end_idx}.")
retries = 2
for attempt in range(retries):
try:
batch_affinities = affinity_batch_openai(config.codigo_tipo_PLC, batch_texts)
affinities.update(batch_affinities)
break
except Exception as e:
if attempt < retries - 1:
logger.warning(f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando...")
print(f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando...")
time.sleep(3)
else:
logger.error(f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}")
print(f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}")
for key, value in batch_texts.items():
try:
score = calcular_afinidad(config.codigo_tipo_PLC, key, value)
affinities[key] = score
except Exception as ind_e:
affinities[key] = "0"
logger.error(f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}")
print(f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}")
affinity_progress.increment()
affinity_progress.finish()
# Actualizar el DataFrame con las Afinidades
for index, row in df.iterrows():
celda_clave = str(row[source_col])
if celda_clave in affinities:
df.at[index, affinity_col] = affinities[celda_clave]
output_path = config.get_auto_translate_path()
fc.save_dataframe_with_retries(df, output_path=output_path)
logger.info(f"Archivo traducido guardado en: {output_path}")
print(f"\nArchivo traducido guardado en: {output_path}")
def run(config: TranslationConfig):
global logger
logger = fc.configurar_logger(config.work_dir)
script_name = os.path.basename(__file__)
print(f"\rIniciando: {script_name}\r")
main(config)
if __name__ == "__main__":
import menu_pasos_traduccion
menu_pasos_traduccion.main()