import pandas as pd from openai import OpenAI import os import re from openai_api_key import openai_api_key from google_api_key import google_api_key import ollama import json from google.cloud import translate_v2 as translate from google.oauth2 import service_account import html from tqdm import tqdm from ...Library.Python import funciones_comunes as fc import time openai_client = OpenAI(api_key=openai_api_key()) GOOGLE_APPLICATION_CREDENTIALS = "translate-431108-020c17463fbb.json" logger = fc.configurar_logger() def init_google_translate_client(): if os.path.exists(GOOGLE_APPLICATION_CREDENTIALS): # Usar credenciales de cuenta de servicio credentials = service_account.Credentials.from_service_account_file( GOOGLE_APPLICATION_CREDENTIALS ) return translate.Client(credentials=credentials) else: raise ValueError( "No se han proporcionado credenciales válidas para Google Translate" ) google_translate_client = init_google_translate_client() def google_translate(text, target_language): result = google_translate_client.translate(text, target_language=target_language) translated_text = result["translatedText"] return html.unescape(translated_text) def read_system_prompt(): try: with open(".\\data\\system_prompt.txt", "r", encoding="utf-8") as file: return file.read().strip() except FileNotFoundError: logger.warning( "Archivo system_prompt.txt no encontrado. Usando prompt por defecto." ) return "You are a translator." def translate_batch_ollama(texts, source_lang, target_lang): joined_text = "\n".join(texts) system_prompt = read_system_prompt() logger.info( f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{joined_text}" ) response = ollama.generate( model="llama3.1", prompt=f"Translate the following texts from {source_lang} to {target_lang} while preserving special fields like <> and <#>. {system_prompt}: \n\n{joined_text}", ) translations = response["response"].strip().split("\n") logger.info(f"Respuestas recibidas:\n{translations}") return translations def translate_batch_openai(texts_dict, source_lang, target_lang): system_prompt = read_system_prompt() texts_list = list(texts_dict.values()) request_payload = json.dumps( {"texts": texts_list, "source_lang": source_lang, "target_lang": target_lang} ) logger.info( f"Solicitando traducción de {source_lang} a {target_lang} para el lote de textos:\n{request_payload}" ) response = openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": f"You are a translator.{system_prompt}."}, {"role": "user", "content": request_payload}, ], max_tokens=1500, temperature=0.3, ) response_payload = json.loads(response.choices[0].message.content.strip()) translations = response_payload.get("texts", []) logger.info(f"Respuestas recibidas:\n{translations}") if len(translations) != len(texts_list): raise ValueError( "La cantidad de traducciones recibidas no coincide con la cantidad de textos enviados." ) return dict(zip(texts_dict.keys(), translations)) def affinity_batch_openai(tipo_PLC, texts_dict): system_prompt = ( "Evaluate the semantic similarity between the following table of pairs of texts in json format on a scale from 0 to 1. " "Return the similarity scores for every row in JSON format as a list of numbers, without any additional text or formatting." ) original_list = [ fc.compactar_celda_traducida(tipo_PLC, key) for key in texts_dict.keys() ] re_translated_list = list(texts_dict.values()) request_payload = json.dumps( {"original": original_list, "compared": re_translated_list} ) logger.info(f"Solicitando Afinidad para el lote de textos:\n{request_payload}") response = openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": system_prompt, }, {"role": "user", "content": request_payload}, ], max_tokens=1500, temperature=0.3, ) response_content = response.choices[0].message.content # Limpiar y convertir el contenido de la respuesta cleaned_response_content = response_content.strip().strip("'```json").strip("```") # Intentar convertir el contenido a JSON try: response_payload = json.loads(cleaned_response_content) except json.JSONDecodeError: raise ValueError("La respuesta no se pudo decodificar como JSON.") # Manejar diferentes formatos de respuesta if isinstance(response_payload, dict) and "similarity_scores" in response_payload: scores = response_payload["similarity_scores"] elif isinstance(response_payload, list): scores = response_payload else: raise ValueError("Formato de respuesta inesperado.") logger.info(f"Respuestas recibidas:\n{scores}") if len(scores) != len(original_list): raise ValueError( "La cantidad de afinidades recibidas no coincide con la cantidad de textos enviados." ) return dict(zip(texts_dict.keys(), scores)) # Función que calcula la afinidad entre dos textos def calcular_afinidad(tipo_PLC, texto1, texto2): system_prompt = ( "Evaluate the semantic similarity between the following pair of texts on a scale from 0 to 1. " "Return the similarity score as a single number." ) original_text = fc.compactar_celda_traducida(tipo_PLC, texto1) compared_text = texto2 request_payload = json.dumps({"original": original_text, "compared": compared_text}) logger.info(f"Solicitando afinidad para el par de textos:\n{request_payload}") response = openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": system_prompt, }, {"role": "user", "content": request_payload}, ], max_tokens=1500, temperature=0.3, ) response_content = response.choices[0].message.content # Limpiar y convertir el contenido de la respuesta cleaned_response_content = response_content.strip().strip("'```json").strip("```") # Intentar convertir el contenido a JSON try: score = float(cleaned_response_content) except ValueError: raise ValueError(f"La respuesta no se pudo decodificar como un número: {cleaned_response_content}") return score def main(tipo_PLC, codigo_columna_maestra, file_path, target_lang_code, target_lang, traducir_todo, batch_size=10): df = pd.read_excel(file_path) source_col = codigo_columna_maestra source_translated_col = target_lang_code target_col = f"{target_lang_code} Translated" check_translate_col = f"{target_lang_code} CheckTranslate" affinity_col = f"{target_lang_code} Affinity" # Asegurarse de que la columna de destino existe if target_col not in df.columns: df[target_col] = None if check_translate_col not in df.columns: df[check_translate_col] = None if affinity_col not in df.columns: df[affinity_col] = None texts_to_translate = {} for index, row in df.iterrows(): celda_clave = str(row[source_col]) source_translated_text = ( str(row[source_translated_col]) if source_translated_col in df.columns else "" ) celda_clave_compactada = fc.compactar_celda_traducida(tipo_PLC, celda_clave) if traducir_todo: if fc.texto_requiere_traduccion(tipo_PLC, celda_clave_compactada, logger): df.at[index, source_translated_col] = '' # Necesita ser traducida. texts_to_translate[celda_clave] = celda_clave_compactada else: if ( pd.isna(row[source_translated_col]) or source_translated_text.strip() == "" ): if fc.texto_requiere_traduccion(tipo_PLC, celda_clave_compactada, logger) or fc.texto_con_campos_especiales(tipo_PLC, celda_clave_compactada): texts_to_translate[celda_clave] = celda_clave_compactada num_texts = len(texts_to_translate) # num_texts = 40 logger.info(f"Número total de textos a traducir: {num_texts}") print(f"Número total de textos a traducir: {num_texts}") # Traducciones # Hacer las traducciones via LLM en batch translations = {} for start_idx in range(0, num_texts, batch_size): end_idx = min(start_idx + batch_size, num_texts) batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx]) logger.info(f"Traduciendo: celdas desde {start_idx} a {end_idx}.") print(f"Traduciendo : celdas desde: {start_idx} a :{end_idx}.") retries = 4 # Número de intentos totales (1 inicial + 1 reintento) for attempt in range(retries): try: batch_translations = translate_batch_openai( batch_texts, fc.idiomas_idiomafromcode(codigo_columna_maestra) , target_lang ) translations.update(batch_translations) break # Si la traducción es exitosa, salimos del bucle de reintentos except Exception as e: if attempt < retries - 1: # Si no es el último intento logger.warning( f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..." ) print( f"Error en el intento {attempt + 1} de traducción de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..." ) time.sleep(3) else: # Si es el último intento logger.error( f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}" ) print( f"Error en todos los intentos de traducción de celdas desde {start_idx} a {end_idx}: {e}" ) logger.info(f"Número total de traducciones recibidas: {len(translations)}") # Traduccion inversa # Actualizar el DataFrame con las traducciones y hacemos la Traduccion inversa for index, row in tqdm( df.iterrows(), total=df.shape[0], desc="Procesando traducciones" ): celda_clave = str(row[source_col]) if celda_clave in translations: df.at[index, target_col] = translations[celda_clave] # Realizar la traducción de verificación con Google Translate try: google_translation = google_translate(translations[celda_clave], fc.idiomas_shortcodefromcode(codigo_columna_maestra)) df.at[index, check_translate_col] = google_translation except Exception as e: logger.error( f"Error en la traducción de Google para el texto '{celda_clave}': {e}" ) df.at[index, check_translate_col] = "Error en la traducción" df.at[index, affinity_col] = 0.0 # Afinidades # Se calculan las Afinidades affinities = {} for start_idx in range(0, num_texts, batch_size): # num_text end_idx = min(start_idx + batch_size, num_texts) batch_texts = dict(list(texts_to_translate.items())[start_idx:end_idx]) logger.info(f"Afinidad: celdas desde {start_idx} a {end_idx}.") print(f"Afinidad: celdas desde: {start_idx} a :{end_idx}.") retries = 2 # Número de intentos totales (1 inicial + 1 reintento) for attempt in range(retries): try: batch_affinities = affinity_batch_openai(tipo_PLC, batch_texts) affinities.update(batch_affinities) break # Si la llamada es exitosa, salimos del bucle de reintentos except Exception as e: if attempt < retries - 1: # Si no es el último intento logger.warning( f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..." ) print( f"Error en el intento {attempt + 1} de Afinidad de celdas desde {start_idx} a {end_idx}: {e}. Reintentando..." ) time.sleep(3) else: # Si es el último intento logger.error( f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}" ) print( f"Error en todos los intentos de Afinidad de celdas desde {start_idx} a {end_idx}: {e}" ) # Intentar individualmente si falla en batch for key, value in batch_texts.items(): try: score = calcular_afinidad(tipo_PLC, key, value) affinities[key] = score except Exception as ind_e: affinities[key] = "0" logger.error( f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}" ) print( f"Error en el cálculo individual de Afinidad para el texto '{key}': {ind_e}" ) # Actualizar el DataFrame con las Afinidades for index, row in df.iterrows(): celda_clave = str(row[source_col]) if celda_clave in affinities: df.at[index, affinity_col] = affinities[celda_clave] output_path = os.path.join( os.path.dirname(file_path), f"3_master_export2translate_translated_{tipo_PLC}.xlsx" ) fc.save_dataframe_with_retries(df, output_path=output_path) logger.info(f"Archivo traducido guardado en: {output_path}") print(f"Archivo traducido guardado en: {output_path}") def run(tipo_PLC, codigo_columna_maestra, seleccion_idioma, traducir_todo): batch_size = 20 translate_file = f".\\data\\2_master_export2translate_{tipo_PLC}.xlsx" if seleccion_idioma not in fc.IDIOMAS: print("Selección inválida.") else: target_lang, target_lang_code = fc.IDIOMAS[seleccion_idioma] main(tipo_PLC, codigo_columna_maestra, translate_file, target_lang_code, target_lang, traducir_todo, batch_size) if __name__ == "__main__": tipo_PLC = "siemens" codigo_columna_maestra = "it-IT" fc.mostrar_idiomas() seleccion_idioma = int(input("Introduce el número del idioma de destino: ")) traducir_todo = ( input("¿Desea traducir todas las celdas (s/n)? ").strip().lower() == "s" ) tipo_PLC = "siemens" run(tipo_PLC, codigo_columna_maestra, seleccion_idioma, traducir_todo)