import requests import os import time import traceback import json # Necesitamos importar json para cargar la cadena # Importación para MoviePy v2.x from moviepy import AudioFileClip # --- Configuración --- WHISPER_API_URL = "http://192.168.88.26:9005/asr" INPUT_MP4_PATH = "test.mp4" TEMP_WAV_BASENAME = "temp_audio_direct_conversion" API_LANGUAGE = None API_TASK = 'transcribe' API_OUTPUT_FORMAT = 'json' # Seguimos pidiéndolo, aunque lo ignoren API_ENCODE = True # --- Fin de la Configuración --- # La función convert_mp4_to_wav se mantiene igual def convert_mp4_to_wav(input_path, output_wav_path): # (Código omitido por brevedad - idéntico a la respuesta anterior) print(f"Intentando convertir '{os.path.basename(input_path)}' a WAV...") print(f" Entrada: {input_path}") print(f" Salida: {output_wav_path}") audio_clip = None try: audio_clip = AudioFileClip(input_path) audio_clip.write_audiofile(output_wav_path, codec='pcm_s16le', logger=None) print("Conversión a WAV exitosa.") return output_wav_path except Exception as e: print(f"ERROR durante la conversión a WAV: {type(e).__name__} - {e}") return None finally: if audio_clip: try: audio_clip.close() except Exception: pass def transcribe_audio_api(api_url_base, file_path, encode=True, task='transcribe', language=None, output_format='json'): """ Envía audio, muestra detalles y INTENTA interpretar la respuesta como JSON incluso si el Content-Type es incorrecto. """ print(f"\n--- Enviando Audio a la API Whisper ---") print(f"Archivo: '{os.path.basename(file_path)}'") url_params = {'encode': str(encode).lower(), 'task': task, 'output': output_format} if language: url_params['language'] = language print(f"Parámetros para URL: {url_params}") filename = os.path.basename(file_path) f = None try: f = open(file_path, 'rb') files_payload = {'audio_file': (filename, f, 'audio/wav')} request_headers = {'Accept': 'application/json'} print(f"Cabeceras explícitas: {request_headers}") print(f"Realizando POST a: {api_url_base}") response = requests.post( api_url_base, params=url_params, files=files_payload, headers=request_headers, timeout=300 ) print("\n--- Detalles de la Comunicación ---") print(f"URL Final Enviada: {response.request.url}") print(f"Código de Estado Recibido: {response.status_code} ({response.reason})") received_content_type = response.headers.get('Content-Type', 'N/A') print(f"Content-Type Recibido: {received_content_type}") print("\n--- Cuerpo de la Respuesta (Raw Text) ---") response_text = "" try: response_text = response.content.decode('utf-8', errors='replace') # Mostramos solo una parte si es muy largo, para no saturar preview_limit = 500 print(response_text[:preview_limit] + ('...' if len(response_text) > preview_limit else '')) except Exception as decode_err: print(f"[Error al decodificar: {decode_err}] Bytes: {response.content[:preview_limit]}...") print("-----------------------------------") response.raise_for_status() # Verificar errores 4xx/5xx # --- Interpretación Mejorada de la Respuesta --- if response.status_code == 200 and response_text.strip(): # Advertir si el Content-Type no es JSON pero intentaremos igual if not received_content_type.startswith('application/json'): print("\nADVERTENCIA: Content-Type no es JSON, pero se intentará interpretar el cuerpo.") try: # Intentar cargar el texto como JSON data = json.loads(response_text) print("Interpretación como JSON exitosa.") return data # Devolver el diccionario Python except json.JSONDecodeError as json_err: print(f"ERROR: No se pudo interpretar la respuesta como JSON: {json_err}") print("Devolviendo la respuesta como texto plano.") return response_text # Devolver texto si falla la interpretación else: # Respuesta vacía o código de error no capturado por raise_for_status print("Respuesta recibida vacía o con estado inesperado (no 200 OK).") return response_text # Devolver el texto (probablemente vacío) # ... (resto del manejo de excepciones de requests igual que antes) ... except requests.exceptions.Timeout: print("ERROR: Timeout.") except requests.exceptions.ConnectionError as e: print(f"ERROR de Conexión: {e}") except requests.exceptions.HTTPError as e: print(f"ERROR HTTP {e.response.status_code}.") except requests.exceptions.RequestException as e: print(f"ERROR de Request: {e}") except Exception as e: print(f"ERROR inesperado en API: {type(e).__name__} - {e}") finally: if f: try: f.close() except Exception: pass return None # --- Función para Formatear Tiempo --- def format_time(seconds): """Convierte segundos a formato HH:MM:SS.mmm""" millis = int(seconds * 1000) % 1000 total_seconds = int(seconds) secs = total_seconds % 60 mins = (total_seconds // 60) % 60 hours = total_seconds // 3600 return f"{hours:02}:{mins:02}:{secs:02}.{millis:03}" # --- Ejecución Principal --- if __name__ == "__main__": print("--- Iniciando Script Mejorado ---") overall_start_time = time.perf_counter() temp_wav_file_path = os.path.join( os.path.dirname(__file__) or '.', f"{TEMP_WAV_BASENAME}_{int(time.time())}.wav" ) transcription_result_data = None prepared_audio_path = None # 1. Verificar archivo de entrada if not os.path.exists(INPUT_MP4_PATH): print(f"Error Crítico: El archivo de entrada '{INPUT_MP4_PATH}' no existe.") else: # 2. Convertir a WAV conv_start_time = time.perf_counter() prepared_audio_path = convert_mp4_to_wav(INPUT_MP4_PATH, temp_wav_file_path) conv_end_time = time.perf_counter() if prepared_audio_path: print(f"(Tiempo de conversión: {conv_end_time - conv_start_time:.2f} segundos)") # 3. Transcribir si la conversión fue exitosa if os.path.exists(prepared_audio_path): api_start_time = time.perf_counter() transcription_result_data = transcribe_audio_api( api_url_base=WHISPER_API_URL, file_path=prepared_audio_path, encode=API_ENCODE, task=API_TASK, language=API_LANGUAGE, output_format=API_OUTPUT_FORMAT ) api_end_time = time.perf_counter() print(f"(Tiempo de llamada API: {api_end_time - api_start_time:.2f} segundos)") else: print(f"ERROR: El archivo WAV convertido '{prepared_audio_path}' no se encontró.") else: print("La conversión a WAV falló. No se puede continuar.") # 4. Limpieza if os.path.exists(temp_wav_file_path): try: print(f"\nLimpiando archivo temporal '{temp_wav_file_path}'...") os.remove(temp_wav_file_path) print("Limpieza completada.") except OSError as e: print(f"Error al eliminar archivo temporal: {e}") # 5. Mostrar Resultado Final Interpretado print("\n--- Resultado Final Interpretado ---") if isinstance(transcription_result_data, dict): print("¡Respuesta interpretada como JSON exitosamente!") lang = transcription_result_data.get('language', 'N/D') full_text = transcription_result_data.get('text', '[Texto no encontrado]') print(f"\nIdioma Detectado: {lang}") print(f"\nTranscripción Completa:\n{'-'*25}\n{full_text}\n{'-'*25}") # Mostrar segmentos si existen segments = transcription_result_data.get('segments') if segments and isinstance(segments, list): print("\nSegmentos Detallados:") for segment in segments: start = segment.get('start', 0.0) end = segment.get('end', 0.0) text = segment.get('text', '') print(f" [{format_time(start)} --> {format_time(end)}] {text.strip()}") else: print("\n(No se encontraron detalles de segmentos en el JSON)") elif isinstance(transcription_result_data, str): print("Se recibió una respuesta de texto plano que no pudo ser interpretada como JSON:") print(transcription_result_data) else: print("No se obtuvo resultado de la transcripción o hubo un error previo.") print("----------------------------------") overall_end_time = time.perf_counter() elapsed_time = overall_end_time - overall_start_time print(f"\n--- Tiempo Total Empleado: {elapsed_time:.2f} segundos ({format_time(elapsed_time)}) ---")