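"""Automatic translation step.

Translates the master-language column of the project spreadsheet in batches with
the OpenAI API, back-translates each result with Google Cloud Translate as a
cross-check, scores translation affinity with an LLM and saves a colour-coded
Excel file.
"""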
import pandas as pd
from openai import OpenAI
import os
from openai_api_key import openai_api_key
from google_api_key import google_api_key
import ollama
import json
from google.cloud import translate_v2 as translate
from google.oauth2 import service_account
import html
from tqdm import tqdm
import PyLibrary.funciones_comunes as fc
import time
from translation_config import TranslationConfig
from openpyxl.styles import PatternFill, Alignment
import sys

GOOGLE_APPLICATION_CREDENTIALS = "translate-431108-020c17463fbb.json"
batch_size = 20

# Module-level logger; configured in run()
logger = None

# OpenAI client
openai_client = OpenAI(api_key=openai_api_key())


def init_google_translate_client():
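    """Build a Google Cloud Translate client from the service-account credentials file."""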
    if os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
        # Use the service-account credentials file
        credentials = service_account.Credentials.from_service_account_file(
            GOOGLE_APPLICATION_CREDENTIALS
        )
        return translate.Client(credentials=credentials)
    else:
        raise ValueError(
            "No se han proporcionado credenciales válidas para Google Translate"
        )


google_translate_client = init_google_translate_client()


def google_translate(text, target_language):
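    """Translate *text* to *target_language* with Google Translate and unescape HTML entities."""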
    result = google_translate_client.translate(text, target_language=target_language)
    translated_text = result["translatedText"]
    return html.unescape(translated_text)


def read_system_prompt():
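    """Return the system prompt from data/system_prompt.txt, falling back to a default if the file is missing."""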
    try:
        with open(".\\data\\system_prompt.txt", "r", encoding="utf-8") as file:
            return file.read().strip()
    except FileNotFoundError:
        logger.warning(
            "Archivo system_prompt.txt no encontrado. Usando prompt por defecto."
        )
        return "You are a translator."


def translate_batch_ollama(texts, source_lang, target_lang):
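    """Translate a batch of texts with a local llama3.1 model via Ollama (newline-delimited in and out)."""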
    joined_text = "\n".join(texts)
    system_prompt = read_system_prompt()
    logger.info(
        f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{joined_text}"
    )
    response = ollama.generate(
        model="llama3.1",
        prompt=f"Translate the following texts from {source_lang} to {target_lang} while preserving special fields like <> and <#>. {system_prompt}: \n\n{joined_text}",
    )

    translations = response["response"].strip().split("\n")
    logger.info(f"Respuestas recibidas:\n{translations}")
    return translations


def translate_batch_openai(texts_dict, source_lang, target_lang):
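    """Translate a dict of texts with the OpenAI chat API and return {key: translated_text}."""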
    system_prompt = read_system_prompt()
    texts_list = list(texts_dict.values())

    request_payload = json.dumps(
        {"texts": texts_list, "source_lang": source_lang, "target_lang": target_lang}
    )
    logger.info(
        f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{request_payload}"
    )

    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"You are a translator. {system_prompt}"},
            {"role": "user", "content": request_payload},
        ],
        max_tokens=1500,
        temperature=0.3,
    )
    # The model is expected to reply with a JSON object containing a "texts" list
    # in the same order as the request.
    response_payload = json.loads(response.choices[0].message.content.strip())
    translations = response_payload.get("texts", [])
    logger.info(f"Respuestas recibidas:\n{translations}")

    if len(translations) != len(texts_list):
        raise ValueError(
            "La cantidad de traducciones recibidas no coincide con la cantidad de textos enviados."
        )

    return dict(zip(texts_dict.keys(), translations))


def main(config: TranslationConfig):
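    """Translate the pending texts of the work spreadsheet, back-translate and score them, and write a formatted Excel file."""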
    df = fc.read_dataframe_with_cleanup_retries(config.get_translate_path())

    source_col = config.codigo_columna_maestra
    source_translated_col = f"{config.codigo_idioma_seleccionado}_Propuesto"
    target_col = f"{config.codigo_idioma_seleccionado} Translated"
    check_translate_col = f"{config.codigo_idioma_seleccionado} CheckTranslate"
    affinity_col = f"{config.codigo_idioma_seleccionado} Affinity"

    # Make sure the output columns exist
    for col in [target_col, check_translate_col, affinity_col]:
        if col not in df.columns:
            df[col] = None

    texts_to_translate = {}

    # Progress bar for the preparation phase
    prep_progress = fc.ProgressBar(
        len(df), prefix="Preparando textos:", suffix="Completado"
    )

    for index, row in df.iterrows():
        celda_clave = str(row[source_col])
        source_translated_text = (
            str(row[source_translated_col])
            if source_translated_col in df.columns
            else ""
        )
        celda_clave_compactada = fc.compactar_celda_traducida(
            config.codigo_tipo_PLC, celda_clave
        )

        if config.traducir_todo:
            # Translate-everything mode: clear the previous proposal and queue the text
            if fc.texto_requiere_traduccion(
                config.codigo_tipo_PLC, celda_clave_compactada, logger
            ):
                df.at[index, source_translated_col] = ""
                texts_to_translate[celda_clave] = celda_clave_compactada
        else:
            # Otherwise only queue rows whose proposed translation is still empty
            if (
                pd.isna(row[source_translated_col])
                or source_translated_text.strip() == ""
            ):
                if fc.texto_requiere_traduccion(
                    config.codigo_tipo_PLC, celda_clave_compactada, logger
                ) or fc.texto_con_campos_especiales(
                    config.codigo_tipo_PLC, celda_clave_compactada
                ):
                    texts_to_translate[celda_clave] = celda_clave_compactada

        prep_progress.update(index + 1)

    prep_progress.finish()

    num_texts = len(texts_to_translate)
    logger.info(f"Número total de textos a traducir: {num_texts}")
    print(f"\nNúmero total de textos a traducir: {num_texts}")

    # Progress bar for the translation phase
    trans_progress = fc.ProgressBar(
        num_texts, prefix="Traduciendo:", suffix="Completado"
    )

    # Translate in batches, retrying each batch a few times on failure
    translations = {}
    for start_idx in range(0, num_texts, batch_size):
        end_idx = min(start_idx + batch_size, num_texts)
        batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx])
        logger.info(f"Traduciendo: celdas desde {start_idx} a {end_idx}.")

        retries = 4
        for attempt in range(retries):
            try:
                batch_translations = translate_batch_openai(
                    batch_texts,
                    fc.idiomas_idiomafromcode(config.codigo_columna_maestra),
                    fc.idiomas_idiomafromcode(config.codigo_idioma_seleccionado),
                )
                translations.update(batch_translations)
                break
            except Exception as e:
                if attempt < retries - 1:
                    logger.warning(
                        f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
                    )
                    print(
                        f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..."
                    )
                    time.sleep(3)
                else:
                    logger.error(
                        f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
                    )
                    print(
                        f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}"
                    )

        trans_progress.update(end_idx)

    trans_progress.finish()
logger.info(f"Número total de traducciones recibidas: {len(translations)}")
|
|
|
|
# Inicializar ProgressBar para la fase de actualización del DataFrame
|
|
update_progress = fc.ProgressBar(
|
|
len(df), prefix="Actualizando DataFrame:", suffix="Completado"
|
|
)
|
|
|
|
# Actualizar el DataFrame con las traducciones y hacemos la Traduccion inversa
|
|
    for index, row in df.iterrows():
        celda_clave = str(row[source_col])
        if celda_clave in translations:
            df.at[index, target_col] = translations[celda_clave]
            try:
                google_translation = google_translate(
                    translations[celda_clave],
                    fc.idiomas_shortcodefromcode(config.codigo_columna_maestra),
                )
                df.at[index, check_translate_col] = google_translation
            except Exception as e:
                logger.error(
                    f"Error en la traducción de Google para el texto '{celda_clave}': {e}"
                )
                df.at[index, check_translate_col] = "Error en la traducción"
                df.at[index, affinity_col] = 0.0
        update_progress.increment()

    update_progress.finish()

    # Select the LLM to use for the affinity check
    modelo_llm = fc.LLM_MODELS["OpenAI"]  # or whichever model is preferred
    api_key = openai_api_key()  # only needed for OpenAI and Grok

    # Affinities: the texts to check come straight from the translation step
    texts_to_check = {}
    for key, translated_text in translations.items():
        if pd.notna(translated_text) and str(translated_text).strip() != "":
            texts_to_check[key] = translated_text

    # Score translation affinity with the LLM
    affinities_dict = fc.calcular_afinidad_batch(
        texts_to_check, config.codigo_tipo_PLC, modelo_llm, logger, api_key
    )

    # Write the affinity scores back into the DataFrame
    for index, row in df.iterrows():
        key = str(row[source_col])
        if key in affinities_dict:
            df.at[index, affinity_col] = affinities_dict[key]

    output_path = config.get_auto_translate_path()

    with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="Sheet1")

        workbook = writer.book
        worksheet = writer.sheets["Sheet1"]
        # Freeze panes at A2 so the header row stays visible
        worksheet.freeze_panes = "A2"

        # Adjust column widths based on content
        for col in worksheet.columns:
            max_length = 0
            column = col[0].column_letter

            for cell in col:
                try:
                    if cell.value:
                        text_length = len(str(cell.value))
                        # Wrap texts longer than 50 characters
                        if text_length > 50:
                            cell.alignment = Alignment(wrap_text=True, vertical="top")
                            text_length = min(
                                50, max(len(word) for word in str(cell.value).split())
                            )
                        max_length = max(max_length, text_length)
                except Exception:
                    pass

            # Add a small padding and clamp the width between 8 and 50
            adjusted_width = min(50, max_length + 2)
            worksheet.column_dimensions[column].width = (
                adjusted_width if adjusted_width > 8 else 8
            )

        # Fill colors for the conditional formatting
        light_blue = PatternFill(
            start_color="ADD8E6", end_color="ADD8E6", fill_type="solid"
        )
        yellow = PatternFill(
            start_color="FFFF00", end_color="FFFF00", fill_type="solid"
        )

        # Highlight translated cells: blue when affinity is 1, yellow when it is lower
        for row in worksheet.iter_rows(min_row=2):
            translated_cell = row[df.columns.get_loc(target_col)]
            if translated_cell.value:
                affinity_cell = row[df.columns.get_loc(affinity_col)]
                try:
                    affinity_value = float(
                        affinity_cell.value if affinity_cell.value else 0
                    )
                    if affinity_value == 1:
                        translated_cell.fill = light_blue
                    elif affinity_value < 1:
                        translated_cell.fill = yellow
                except (ValueError, TypeError):
                    pass

    logger.info(f"Archivo traducido guardado en: {output_path}")
    print(f"\nArchivo traducido guardado en: {output_path}")


def run(config: TranslationConfig):
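    """Configure the module-level logger and run the translation step for *config*."""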
    global logger
    logger = fc.configurar_logger(config.work_dir)
    script_name = os.path.basename(__file__)
    print(f"\rIniciando: {script_name}\r")
    main(config)


if __name__ == "__main__":
    import menu_pasos_traduccion

    menu_pasos_traduccion.main()
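
# Hypothetical programmatic usage (sketch; assumes a TranslationConfig built elsewhere):
#
#     from translation_config import TranslationConfig
#     config = TranslationConfig(...)  # work_dir, language codes, PLC type, etc.
#     run(config)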