HMI_Translate_Helper_wMaste.../x3_llm_auto_translate.py

374 lines
15 KiB
Python
Raw Normal View History

2024-07-30 09:19:19 -03:00
import pandas as pd
2024-07-30 09:58:19 -03:00
from openai import OpenAI
2024-07-30 09:19:19 -03:00
import os
import re
from openai_api_key import openai_api_key
from google_api_key import google_api_key
2024-07-30 13:03:39 -03:00
import ollama
import json
from google.cloud import translate_v2 as translate
from google.oauth2 import service_account
import html
from tqdm import tqdm
import PyLibrary.funciones_comunes as fc
import time
2024-07-30 09:19:19 -03:00
openai_client = OpenAI(api_key=openai_api_key())
GOOGLE_APPLICATION_CREDENTIALS = "translate-431108-020c17463fbb.json"
logger = fc.configurar_logger()
2024-07-30 09:58:19 -03:00
def init_google_translate_client():
if os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
# Usar credenciales de cuenta de servicio
credentials = service_account.Credentials.from_service_account_file(
GOOGLE_APPLICATION_CREDENTIALS
)
return translate.Client(credentials=credentials)
else:
raise ValueError(
"No se han proporcionado credenciales válidas para Google Translate"
)
google_translate_client = init_google_translate_client()
2024-07-30 09:58:19 -03:00
def google_translate(text, target_language):
result = google_translate_client.translate(text, target_language=target_language)
translated_text = result["translatedText"]
return html.unescape(translated_text)
2024-07-30 12:16:58 -03:00
def read_system_prompt():
try:
with open(".\\data\\system_prompt.txt", "r", encoding="utf-8") as file:
2024-07-30 12:16:58 -03:00
return file.read().strip()
except FileNotFoundError:
logger.warning(
"Archivo system_prompt.txt no encontrado. Usando prompt por defecto."
)
2024-07-30 12:16:58 -03:00
return "You are a translator."
def translate_batch_ollama(texts, source_lang, target_lang):
joined_text = "\n".join(texts)
system_prompt = read_system_prompt()
logger.info(
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{joined_text}"
)
response = ollama.generate(
model="llama3.1",
prompt=f"Translate the following texts from {source_lang} to {target_lang} while preserving special fields like <> and <#>. {system_prompt}: \n\n{joined_text}",
)
translations = response["response"].strip().split("\n")
logger.info(f"Respuestas recibidas:\n{translations}")
return translations
def translate_batch_openai(texts_dict, source_lang, target_lang):
2024-07-30 12:16:58 -03:00
system_prompt = read_system_prompt()
texts_list = list(texts_dict.values())
request_payload = json.dumps(
{"texts": texts_list, "source_lang": source_lang, "target_lang": target_lang}
)
2024-07-30 09:58:19 -03:00
logger.info(
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{request_payload}"
2024-07-30 09:58:19 -03:00
)
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
2024-07-30 09:58:19 -03:00
messages=[
{"role": "system", "content": f"You are a translator.{system_prompt}."},
{"role": "user", "content": request_payload},
2024-07-30 09:58:19 -03:00
],
max_tokens=1500,
temperature=0.3,
2024-07-30 09:19:19 -03:00
)
response_payload = json.loads(response.choices[0].message.content.strip())
translations = response_payload.get("texts", [])
2024-07-30 09:58:19 -03:00
logger.info(f"Respuestas recibidas:\n{translations}")
if len(translations) != len(texts_list):
raise ValueError(
"La cantidad de traducciones recibidas no coincide con la cantidad de textos enviados."
)
return dict(zip(texts_dict.keys(), translations))
2024-07-30 09:19:19 -03:00
2024-07-30 13:03:39 -03:00
def affinity_batch_openai(tipo_PLC, texts_dict):
system_prompt = (
"Evaluate the semantic similarity between the following table of pairs of texts in json format on a scale from 0 to 1. "
"Return the similarity scores for every row in JSON format as a list of numbers, without any additional text or formatting."
)
original_list = [
fc.compactar_celda_traducida(tipo_PLC, key) for key in texts_dict.keys()
]
re_translated_list = list(texts_dict.values())
2024-07-30 09:58:19 -03:00
request_payload = json.dumps(
{"original": original_list, "compared": re_translated_list}
2024-07-30 09:58:19 -03:00
)
logger.info(f"Solicitando Afinidad para el lote de textos:\n{request_payload}")
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": system_prompt,
},
{"role": "user", "content": request_payload},
],
max_tokens=1500,
temperature=0.3,
2024-07-30 09:58:19 -03:00
)
response_content = response.choices[0].message.content
# Limpiar y convertir el contenido de la respuesta
cleaned_response_content = response_content.strip().strip("'```json").strip("```")
# Intentar convertir el contenido a JSON
try:
response_payload = json.loads(cleaned_response_content)
except json.JSONDecodeError:
raise ValueError("La respuesta no se pudo decodificar como JSON.")
# Manejar diferentes formatos de respuesta
if isinstance(response_payload, dict) and "similarity_scores" in response_payload:
scores = response_payload["similarity_scores"]
elif isinstance(response_payload, list):
scores = response_payload
else:
raise ValueError("Formato de respuesta inesperado.")
logger.info(f"Respuestas recibidas:\n{scores}")
if len(scores) != len(original_list):
raise ValueError(
"La cantidad de afinidades recibidas no coincide con la cantidad de textos enviados."
)
return dict(zip(texts_dict.keys(), scores))
2024-07-30 09:58:19 -03:00
# Función que calcula la afinidad entre dos textos
def calcular_afinidad(tipo_PLC, texto1, texto2):
system_prompt = (
"Evaluate the semantic similarity between the following pair of texts on a scale from 0 to 1. "
"Return the similarity score as a single number."
)
original_text = fc.compactar_celda_traducida(tipo_PLC, texto1)
compared_text = texto2
request_payload = json.dumps({"original": original_text, "compared": compared_text})
logger.info(f"Solicitando afinidad para el par de textos:\n{request_payload}")
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": system_prompt,
},
{"role": "user", "content": request_payload},
],
max_tokens=1500,
temperature=0.3,
)
response_content = response.choices[0].message.content
# Limpiar y convertir el contenido de la respuesta
cleaned_response_content = response_content.strip().strip("'```json").strip("```")
# Intentar convertir el contenido a JSON
try:
score = float(cleaned_response_content)
except ValueError:
raise ValueError(f"La respuesta no se pudo decodificar como un número: {cleaned_response_content}")
return score
2024-09-19 04:05:19 -03:00
def main(tipo_PLC, codigo_columna_maestra, file_path, target_lang_code, target_lang, traducir_todo, batch_size=10):
2024-07-30 09:19:19 -03:00
df = pd.read_excel(file_path)
2024-09-19 04:05:19 -03:00
source_col = codigo_columna_maestra
source_translated_col = target_lang_code
2024-07-30 09:19:19 -03:00
target_col = f"{target_lang_code} Translated"
check_translate_col = f"{target_lang_code} CheckTranslate"
affinity_col = f"{target_lang_code} Affinity"
2024-07-30 09:19:19 -03:00
# Asegurarse de que la columna de destino existe
if target_col not in df.columns:
2024-07-30 09:19:19 -03:00
df[target_col] = None
if check_translate_col not in df.columns:
df[check_translate_col] = None
if affinity_col not in df.columns:
df[affinity_col] = None
2024-07-30 09:19:19 -03:00
texts_to_translate = {}
2024-07-30 09:58:19 -03:00
2024-09-20 09:30:33 -03:00
for index, row in df.iterrows():
celda_clave = str(row[source_col])
source_translated_text = (
str(row[source_translated_col])
if source_translated_col in df.columns
else ""
)
2024-09-19 15:55:58 -03:00
celda_clave_compactada = fc.compactar_celda_traducida(tipo_PLC, celda_clave)
if traducir_todo:
2024-09-20 09:30:33 -03:00
if fc.texto_requiere_traduccion(tipo_PLC, celda_clave_compactada, logger):
df.at[index, source_translated_col] = '' # Necesita ser traducida.
texts_to_translate[celda_clave] = celda_clave_compactada
else:
if (
pd.isna(row[source_translated_col])
or source_translated_text.strip() == ""
):
2024-09-20 09:30:33 -03:00
if fc.texto_requiere_traduccion(tipo_PLC, celda_clave_compactada, logger) or fc.texto_con_campos_especiales(tipo_PLC, celda_clave_compactada):
texts_to_translate[celda_clave] = celda_clave_compactada
2024-07-30 09:19:19 -03:00
2024-07-30 09:58:19 -03:00
num_texts = len(texts_to_translate)
# num_texts = 40
2024-07-30 09:58:19 -03:00
logger.info(f"Número total de textos a traducir: {num_texts}")
2024-07-30 12:16:58 -03:00
print(f"Número total de textos a traducir: {num_texts}")
# Traducciones
# Hacer las traducciones via LLM en batch
translations = {}
for start_idx in range(0, num_texts, batch_size):
2024-07-30 09:58:19 -03:00
end_idx = min(start_idx + batch_size, num_texts)
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
logger.info(f"Traduciendo: celdas desde {start_idx} a {end_idx}.")
2024-07-30 12:16:58 -03:00
print(f"Traduciendo : celdas desde: {start_idx} a :{end_idx}.")
retries = 4 # Número de intentos totales (1 inicial + 1 reintento)
for attempt in range(retries):
try:
batch_translations = translate_batch_openai(
2024-09-19 15:55:58 -03:00
batch_texts, fc.idiomas_idiomafromcode(codigo_columna_maestra) , target_lang
)
translations.update(batch_translations)
break # Si la traducción es exitosa, salimos del bucle de reintentos
except Exception as e:
if attempt < retries - 1: # Si no es el último intento
logger.warning(
f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
print(
f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
time.sleep(3)
else: # Si es el último intento
logger.error(
f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
)
print(
f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
)
2024-07-30 09:19:19 -03:00
2024-07-30 09:58:19 -03:00
logger.info(f"Número total de traducciones recibidas: {len(translations)}")
# Traduccion inversa
# Actualizar el DataFrame con las traducciones y hacemos la Traduccion inversa
for index, row in tqdm(
df.iterrows(), total=df.shape[0], desc="Procesando traducciones"
):
celda_clave = str(row[source_col])
if celda_clave in translations:
df.at[index, target_col] = translations[celda_clave]
# Realizar la traducción de verificación con Google Translate
try:
google_translation = google_translate(translations[celda_clave], fc.idiomas_shortcodefromcode(codigo_columna_maestra))
df.at[index, check_translate_col] = google_translation
except Exception as e:
logger.error(
f"Error en la traducción de Google para el texto '{celda_clave}': {e}"
)
df.at[index, check_translate_col] = "Error en la traducción"
df.at[index, affinity_col] = 0.0
2024-07-30 09:19:19 -03:00
# Afinidades
# Se calculan las Afinidades
affinities = {}
for start_idx in range(0, num_texts, batch_size): # num_text
end_idx = min(start_idx + batch_size, num_texts)
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
logger.info(f"Afinidad: celdas desde {start_idx} a {end_idx}.")
print(f"Afinidad: celdas desde: {start_idx} a :{end_idx}.")
retries = 2 # Número de intentos totales (1 inicial + 1 reintento)
for attempt in range(retries):
try:
batch_affinities = affinity_batch_openai(tipo_PLC, batch_texts)
affinities.update(batch_affinities)
break # Si la llamada es exitosa, salimos del bucle de reintentos
except Exception as e:
if attempt < retries - 1: # Si no es el último intento
logger.warning(
f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
print(
f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
)
time.sleep(3)
else: # Si es el último intento
logger.error(
f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}"
)
print(
f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}"
)
# Intentar individualmente si falla en batch
for key, value in batch_texts.items():
try:
score = calcular_afinidad(tipo_PLC, key, value)
affinities[key] = score
except Exception as ind_e:
affinities[key] = "0"
logger.error(
f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}"
)
print(
f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}"
)
# Actualizar el DataFrame con las Afinidades
for index, row in df.iterrows():
celda_clave = str(row[source_col])
if celda_clave in affinities:
df.at[index, affinity_col] = affinities[celda_clave]
output_path = os.path.join(
2024-09-19 04:05:19 -03:00
os.path.dirname(file_path), f"3_master_export2translate_translated_{tipo_PLC}.xlsx"
)
fc.save_dataframe_with_retries(df, output_path=output_path)
2024-07-30 09:58:19 -03:00
logger.info(f"Archivo traducido guardado en: {output_path}")
2024-07-30 09:19:19 -03:00
print(f"Archivo traducido guardado en: {output_path}")
2024-09-19 04:05:19 -03:00
def run(tipo_PLC, codigo_columna_maestra, seleccion_idioma, traducir_todo):
batch_size = 20
2024-09-19 04:05:19 -03:00
translate_file = f".\\data\\2_master_export2translate_{tipo_PLC}.xlsx"
2024-07-30 09:19:19 -03:00
if seleccion_idioma not in fc.IDIOMAS:
2024-07-30 09:19:19 -03:00
print("Selección inválida.")
else:
target_lang, target_lang_code = fc.IDIOMAS[seleccion_idioma]
2024-09-19 04:05:19 -03:00
main(tipo_PLC, codigo_columna_maestra, translate_file, target_lang_code, target_lang, traducir_todo, batch_size)
if __name__ == "__main__":
2024-09-19 04:05:19 -03:00
tipo_PLC = "siemens"
codigo_columna_maestra = "it-IT"
fc.mostrar_idiomas()
seleccion_idioma = int(input("Introduce el número del idioma de destino: "))
traducir_todo = (
input("¿Desea traducir todas las celdas (s/n)? ").strip().lower() == "s"
)
2024-09-19 04:05:19 -03:00
tipo_PLC = "siemens"
run(tipo_PLC, codigo_columna_maestra, seleccion_idioma, traducir_todo)