386 lines
15 KiB
Python
386 lines
15 KiB
Python
import pandas as pd
|
|
from openai import OpenAI
|
|
import os
|
|
import re
|
|
from openai_api_key import openai_api_key
|
|
from google_api_key import google_api_key
|
|
import ollama
|
|
import json
|
|
from google.cloud import translate_v2 as translate
|
|
from google.oauth2 import service_account
|
|
import html
|
|
from tqdm import tqdm
|
|
import funciones_comunes as fc
|
|
import time
|
|
|
|
|
|
openai_client = OpenAI(api_key=openai_api_key())
|
|
GOOGLE_APPLICATION_CREDENTIALS = "translate-431108-020c17463fbb.json"
|
|
logger = fc.configurar_logger()
|
|
|
|
|
|
def init_google_translate_client():
|
|
if os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
|
|
# Usar credenciales de cuenta de servicio
|
|
credentials = service_account.Credentials.from_service_account_file(
|
|
GOOGLE_APPLICATION_CREDENTIALS
|
|
)
|
|
return translate.Client(credentials=credentials)
|
|
else:
|
|
raise ValueError(
|
|
"No se han proporcionado credenciales válidas para Google Translate"
|
|
)
|
|
|
|
google_translate_client = init_google_translate_client()
|
|
|
|
def google_translate(text, target_language):
|
|
result = google_translate_client.translate(text, target_language=target_language)
|
|
translated_text = result["translatedText"]
|
|
return html.unescape(translated_text)
|
|
|
|
|
|
def read_system_prompt():
|
|
try:
|
|
with open(".\\data\\system_prompt.txt", "r", encoding="utf-8") as file:
|
|
return file.read().strip()
|
|
except FileNotFoundError:
|
|
logger.warning(
|
|
"Archivo system_prompt.txt no encontrado. Usando prompt por defecto."
|
|
)
|
|
return "You are a translator."
|
|
|
|
|
|
def translate_batch_ollama(texts, source_lang, target_lang):
|
|
joined_text = "\n".join(texts)
|
|
system_prompt = read_system_prompt()
|
|
logger.info(
|
|
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{joined_text}"
|
|
)
|
|
response = ollama.generate(
|
|
model="llama3.1",
|
|
prompt=f"Translate the following texts from {source_lang} to {target_lang} while preserving special fields like <> and <#>. {system_prompt}: \n\n{joined_text}",
|
|
)
|
|
|
|
translations = response["response"].strip().split("\n")
|
|
logger.info(f"Respuestas recibidas:\n{translations}")
|
|
return translations
|
|
|
|
|
|
def texto_requiere_traduccion(texto):
|
|
palabras = re.findall(r"\b\w{4,}\b", texto)
|
|
campos_especiales = re.findall(r"<.*?>", texto)
|
|
requiere_traduccion = len(palabras) > 0 or len(campos_especiales) != len(
|
|
re.findall(r"<#>", texto)
|
|
)
|
|
logger.debug(
|
|
f"Decisión de traducción para texto '{texto}': {'Sí' if requiere_traduccion else 'No'} (palabras > 3 letras: {len(palabras) > 0}, solo campos especiales: {len(campos_especiales) == len(re.findall(r'<#>', texto))})"
|
|
)
|
|
return requiere_traduccion
|
|
|
|
|
|
def translate_batch_openai(texts_dict, source_lang, target_lang):
|
|
system_prompt = read_system_prompt()
|
|
texts_list = list(texts_dict.values())
|
|
|
|
request_payload = json.dumps(
|
|
{"texts": texts_list, "source_lang": source_lang, "target_lang": target_lang}
|
|
)
|
|
logger.info(
|
|
f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{request_payload}"
|
|
)
|
|
|
|
response = openai_client.chat.completions.create(
|
|
model="gpt-4o-mini",
|
|
messages=[
|
|
{"role": "system", "content": f"You are a translator.{system_prompt}."},
|
|
{"role": "user", "content": request_payload},
|
|
],
|
|
max_tokens=1500,
|
|
temperature=0.3,
|
|
)
|
|
response_payload = json.loads(response.choices[0].message.content.strip())
|
|
translations = response_payload.get("texts", [])
|
|
logger.info(f"Respuestas recibidas:\n{translations}")
|
|
|
|
if len(translations) != len(texts_list):
|
|
raise ValueError(
|
|
"La cantidad de traducciones recibidas no coincide con la cantidad de textos enviados."
|
|
)
|
|
|
|
return dict(zip(texts_dict.keys(), translations))
|
|
|
|
|
|
def affinity_batch_openai(texts_dict):
|
|
system_prompt = (
|
|
"Evaluate the semantic similarity between the following table of pairs of texts in json format on a scale from 0 to 1. "
|
|
"Return the similarity scores for every row in JSON format as a list of numbers, without any additional text or formatting."
|
|
)
|
|
original_list = [
|
|
fc.compactar_celda_traducida(key) for key in texts_dict.keys()
|
|
]
|
|
re_translated_list = list(texts_dict.values())
|
|
|
|
request_payload = json.dumps(
|
|
{"original": original_list, "compared": re_translated_list}
|
|
)
|
|
logger.info(f"Solicitando Afinidad para el lote de textos:\n{request_payload}")
|
|
|
|
response = openai_client.chat.completions.create(
|
|
model="gpt-4o-mini",
|
|
messages=[
|
|
{
|
|
"role": "system",
|
|
"content": system_prompt,
|
|
},
|
|
{"role": "user", "content": request_payload},
|
|
],
|
|
max_tokens=1500,
|
|
temperature=0.3,
|
|
)
|
|
response_content = response.choices[0].message.content
|
|
|
|
# Limpiar y convertir el contenido de la respuesta
|
|
cleaned_response_content = response_content.strip().strip("'```json").strip("```")
|
|
|
|
# Intentar convertir el contenido a JSON
|
|
try:
|
|
response_payload = json.loads(cleaned_response_content)
|
|
except json.JSONDecodeError:
|
|
raise ValueError("La respuesta no se pudo decodificar como JSON.")
|
|
|
|
# Manejar diferentes formatos de respuesta
|
|
if isinstance(response_payload, dict) and "similarity_scores" in response_payload:
|
|
scores = response_payload["similarity_scores"]
|
|
elif isinstance(response_payload, list):
|
|
scores = response_payload
|
|
else:
|
|
raise ValueError("Formato de respuesta inesperado.")
|
|
|
|
logger.info(f"Respuestas recibidas:\n{scores}")
|
|
|
|
if len(scores) != len(original_list):
|
|
raise ValueError(
|
|
"La cantidad de afinidades recibidas no coincide con la cantidad de textos enviados."
|
|
)
|
|
|
|
return dict(zip(texts_dict.keys(), scores))
|
|
|
|
|
|
# Función que calcula la afinidad entre dos textos
|
|
def calcular_afinidad(texto1, texto2):
|
|
system_prompt = (
|
|
"Evaluate the semantic similarity between the following pair of texts on a scale from 0 to 1. "
|
|
"Return the similarity score as a single number."
|
|
)
|
|
|
|
original_text = fc.compactar_celda_traducida(texto1)
|
|
compared_text = texto2
|
|
|
|
request_payload = json.dumps({"original": original_text, "compared": compared_text})
|
|
logger.info(f"Solicitando afinidad para el par de textos:\n{request_payload}")
|
|
|
|
response = openai_client.chat.completions.create(
|
|
model="gpt-4o-mini",
|
|
messages=[
|
|
{
|
|
"role": "system",
|
|
"content": system_prompt,
|
|
},
|
|
{"role": "user", "content": request_payload},
|
|
],
|
|
max_tokens=1500,
|
|
temperature=0.3,
|
|
)
|
|
response_content = response.choices[0].message.content
|
|
|
|
# Limpiar y convertir el contenido de la respuesta
|
|
cleaned_response_content = response_content.strip().strip("'```json").strip("```")
|
|
|
|
# Intentar convertir el contenido a JSON
|
|
try:
|
|
score = float(cleaned_response_content)
|
|
except ValueError:
|
|
raise ValueError(f"La respuesta no se pudo decodificar como un número: {cleaned_response_content}")
|
|
|
|
return score
|
|
|
|
def main(tipo_PLC, codigo_columna_maestra, file_path, target_lang_code, target_lang, traducir_todo, batch_size=10):
|
|
df = pd.read_excel(file_path)
|
|
source_col = codigo_columna_maestra
|
|
source_translated_col = target_lang_code
|
|
target_col = f"{target_lang_code} Translated"
|
|
check_translate_col = f"{target_lang_code} CheckTranslate"
|
|
affinity_col = f"{target_lang_code} Affinity"
|
|
|
|
# Asegurarse de que la columna de destino existe
|
|
if target_col not in df.columns:
|
|
df[target_col] = None
|
|
if check_translate_col not in df.columns:
|
|
df[check_translate_col] = None
|
|
if affinity_col not in df.columns:
|
|
df[affinity_col] = None
|
|
|
|
texts_to_translate = {}
|
|
|
|
for _, row in df.iterrows():
|
|
celda_clave = str(row[source_col])
|
|
source_translated_text = (
|
|
str(row[source_translated_col])
|
|
if source_translated_col in df.columns
|
|
else ""
|
|
)
|
|
celda_clave_compactada = fc.compactar_celda_traducida(celda_clave)
|
|
|
|
if traducir_todo:
|
|
if texto_requiere_traduccion(celda_clave_compactada):
|
|
df[row,source_translated_col] = '' # Necesita ser traducida. En esta iteracion o en la siguiente.
|
|
texts_to_translate[celda_clave] = celda_clave_compactada
|
|
else:
|
|
if (
|
|
pd.isna(row[source_translated_col])
|
|
or source_translated_text.strip() == ""
|
|
):
|
|
if texto_requiere_traduccion(celda_clave_compactada):
|
|
texts_to_translate[celda_clave] = celda_clave_compactada
|
|
|
|
num_texts = len(texts_to_translate)
|
|
# num_texts = 40
|
|
|
|
logger.info(f"Número total de textos a traducir: {num_texts}")
|
|
print(f"Número total de textos a traducir: {num_texts}")
|
|
|
|
# Traducciones
|
|
# Hacer las traducciones via LLM en batch
|
|
translations = {}
|
|
for start_idx in range(0, num_texts, batch_size):
|
|
end_idx = min(start_idx + batch_size, num_texts)
|
|
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
|
|
logger.info(f"Traduciendo: celdas desde {start_idx} a {end_idx}.")
|
|
print(f"Traduciendo : celdas desde: {start_idx} a :{end_idx}.")
|
|
|
|
retries = 4 # Número de intentos totales (1 inicial + 1 reintento)
|
|
for attempt in range(retries):
|
|
try:
|
|
batch_translations = translate_batch_openai(
|
|
batch_texts, "Italian", target_lang
|
|
)
|
|
translations.update(batch_translations)
|
|
break # Si la traducción es exitosa, salimos del bucle de reintentos
|
|
except Exception as e:
|
|
if attempt < retries - 1: # Si no es el último intento
|
|
logger.warning(
|
|
f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
|
|
)
|
|
print(
|
|
f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
|
|
)
|
|
time.sleep(3)
|
|
else: # Si es el último intento
|
|
logger.error(
|
|
f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
|
|
)
|
|
print(
|
|
f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
|
|
)
|
|
|
|
logger.info(f"Número total de traducciones recibidas: {len(translations)}")
|
|
|
|
# Traduccion inversa
|
|
# Actualizar el DataFrame con las traducciones y hacemos la Traduccion inversa
|
|
for index, row in tqdm(
|
|
df.iterrows(), total=df.shape[0], desc="Procesando traducciones"
|
|
):
|
|
celda_clave = str(row[source_col])
|
|
if celda_clave in translations:
|
|
df.at[index, target_col] = translations[celda_clave]
|
|
# Realizar la traducción de verificación con Google Translate
|
|
try:
|
|
google_translation = google_translate(translations[celda_clave], "it")
|
|
df.at[index, check_translate_col] = google_translation
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error en la traducción de Google para el texto '{celda_clave}': {e}"
|
|
)
|
|
df.at[index, check_translate_col] = "Error en la traducción"
|
|
df.at[index, affinity_col] = 0.0
|
|
|
|
# Afinidades
|
|
# Se calculan las Afinidades
|
|
affinities = {}
|
|
for start_idx in range(0, num_texts, batch_size): # num_text
|
|
end_idx = min(start_idx + batch_size, num_texts)
|
|
batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
|
|
logger.info(f"Afinidad: celdas desde {start_idx} a {end_idx}.")
|
|
print(f"Afinidad: celdas desde: {start_idx} a :{end_idx}.")
|
|
|
|
retries = 2 # Número de intentos totales (1 inicial + 1 reintento)
|
|
for attempt in range(retries):
|
|
try:
|
|
batch_affinities = affinity_batch_openai(batch_texts)
|
|
affinities.update(batch_affinities)
|
|
break # Si la llamada es exitosa, salimos del bucle de reintentos
|
|
except Exception as e:
|
|
if attempt < retries - 1: # Si no es el último intento
|
|
logger.warning(
|
|
f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
|
|
)
|
|
print(
|
|
f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
|
|
)
|
|
time.sleep(3)
|
|
else: # Si es el último intento
|
|
logger.error(
|
|
f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}"
|
|
)
|
|
print(
|
|
f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}"
|
|
)
|
|
# Intentar individualmente si falla en batch
|
|
for key, value in batch_texts.items():
|
|
try:
|
|
score = calcular_afinidad(key, value)
|
|
affinities[key] = score
|
|
except Exception as ind_e:
|
|
affinities[key] = "0"
|
|
logger.error(
|
|
f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}"
|
|
)
|
|
print(
|
|
f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}"
|
|
)
|
|
|
|
|
|
# Actualizar el DataFrame con las Afinidades
|
|
for index, row in df.iterrows():
|
|
celda_clave = str(row[source_col])
|
|
if celda_clave in affinities:
|
|
df.at[index, affinity_col] = affinities[celda_clave]
|
|
|
|
output_path = os.path.join(
|
|
os.path.dirname(file_path), f"3_master_export2translate_translated_{tipo_PLC}.xlsx"
|
|
)
|
|
fc.save_dataframe_with_retries(df, output_path=output_path)
|
|
logger.info(f"Archivo traducido guardado en: {output_path}")
|
|
print(f"Archivo traducido guardado en: {output_path}")
|
|
|
|
def run(tipo_PLC, codigo_columna_maestra, seleccion_idioma, traducir_todo):
|
|
batch_size = 20
|
|
translate_file = f".\\data\\2_master_export2translate_{tipo_PLC}.xlsx"
|
|
|
|
if seleccion_idioma not in fc.IDIOMAS:
|
|
print("Selección inválida.")
|
|
else:
|
|
target_lang, target_lang_code = fc.IDIOMAS[seleccion_idioma]
|
|
main(tipo_PLC, codigo_columna_maestra, translate_file, target_lang_code, target_lang, traducir_todo, batch_size)
|
|
|
|
if __name__ == "__main__":
|
|
tipo_PLC = "siemens"
|
|
codigo_columna_maestra = "it-IT"
|
|
fc.mostrar_idiomas()
|
|
seleccion_idioma = int(input("Introduce el número del idioma de destino: "))
|
|
traducir_todo = (
|
|
input("¿Desea traducir todas las celdas (s/n)? ").strip().lower() == "s"
|
|
)
|
|
tipo_PLC = "siemens"
|
|
run(tipo_PLC, codigo_columna_maestra, seleccion_idioma, traducir_todo)
|