Whisper_using_docker/x1.py

207 lines
8.9 KiB
Python

import requests
import os
import time
import traceback
import json # Necesitamos importar json para cargar la cadena
# Importación para MoviePy v2.x
from moviepy import AudioFileClip
# --- Configuración ---
WHISPER_API_URL = "http://192.168.88.26:9005/asr"
INPUT_MP4_PATH = "test.mp4"
TEMP_WAV_BASENAME = "temp_audio_direct_conversion"
API_LANGUAGE = None
API_TASK = 'transcribe'
API_OUTPUT_FORMAT = 'json' # Seguimos pidiéndolo, aunque lo ignoren
API_ENCODE = True
# --- Fin de la Configuración ---
# La función convert_mp4_to_wav se mantiene igual
def convert_mp4_to_wav(input_path, output_wav_path):
# (Código omitido por brevedad - idéntico a la respuesta anterior)
print(f"Intentando convertir '{os.path.basename(input_path)}' a WAV...")
print(f" Entrada: {input_path}")
print(f" Salida: {output_wav_path}")
audio_clip = None
try:
audio_clip = AudioFileClip(input_path)
audio_clip.write_audiofile(output_wav_path, codec='pcm_s16le', logger=None)
print("Conversión a WAV exitosa.")
return output_wav_path
except Exception as e:
print(f"ERROR durante la conversión a WAV: {type(e).__name__} - {e}")
return None
finally:
if audio_clip:
try: audio_clip.close()
except Exception: pass
def transcribe_audio_api(api_url_base, file_path, encode=True, task='transcribe', language=None, output_format='json'):
"""
Envía audio, muestra detalles y INTENTA interpretar la respuesta como JSON
incluso si el Content-Type es incorrecto.
"""
print(f"\n--- Enviando Audio a la API Whisper ---")
print(f"Archivo: '{os.path.basename(file_path)}'")
url_params = {'encode': str(encode).lower(), 'task': task, 'output': output_format}
if language: url_params['language'] = language
print(f"Parámetros para URL: {url_params}")
filename = os.path.basename(file_path)
f = None
try:
f = open(file_path, 'rb')
files_payload = {'audio_file': (filename, f, 'audio/wav')}
request_headers = {'Accept': 'application/json'}
print(f"Cabeceras explícitas: {request_headers}")
print(f"Realizando POST a: {api_url_base}")
response = requests.post(
api_url_base, params=url_params, files=files_payload,
headers=request_headers, timeout=300
)
print("\n--- Detalles de la Comunicación ---")
print(f"URL Final Enviada: {response.request.url}")
print(f"Código de Estado Recibido: {response.status_code} ({response.reason})")
received_content_type = response.headers.get('Content-Type', 'N/A')
print(f"Content-Type Recibido: {received_content_type}")
print("\n--- Cuerpo de la Respuesta (Raw Text) ---")
response_text = ""
try:
response_text = response.content.decode('utf-8', errors='replace')
# Mostramos solo una parte si es muy largo, para no saturar
preview_limit = 500
print(response_text[:preview_limit] + ('...' if len(response_text) > preview_limit else ''))
except Exception as decode_err:
print(f"[Error al decodificar: {decode_err}] Bytes: {response.content[:preview_limit]}...")
print("-----------------------------------")
response.raise_for_status() # Verificar errores 4xx/5xx
# --- Interpretación Mejorada de la Respuesta ---
if response.status_code == 200 and response_text.strip():
# Advertir si el Content-Type no es JSON pero intentaremos igual
if not received_content_type.startswith('application/json'):
print("\nADVERTENCIA: Content-Type no es JSON, pero se intentará interpretar el cuerpo.")
try:
# Intentar cargar el texto como JSON
data = json.loads(response_text)
print("Interpretación como JSON exitosa.")
return data # Devolver el diccionario Python
except json.JSONDecodeError as json_err:
print(f"ERROR: No se pudo interpretar la respuesta como JSON: {json_err}")
print("Devolviendo la respuesta como texto plano.")
return response_text # Devolver texto si falla la interpretación
else:
# Respuesta vacía o código de error no capturado por raise_for_status
print("Respuesta recibida vacía o con estado inesperado (no 200 OK).")
return response_text # Devolver el texto (probablemente vacío)
# ... (resto del manejo de excepciones de requests igual que antes) ...
except requests.exceptions.Timeout: print("ERROR: Timeout.")
except requests.exceptions.ConnectionError as e: print(f"ERROR de Conexión: {e}")
except requests.exceptions.HTTPError as e: print(f"ERROR HTTP {e.response.status_code}.")
except requests.exceptions.RequestException as e: print(f"ERROR de Request: {e}")
except Exception as e: print(f"ERROR inesperado en API: {type(e).__name__} - {e}")
finally:
if f:
try: f.close()
except Exception: pass
return None
# --- Función para Formatear Tiempo ---
def format_time(seconds):
"""Convierte segundos a formato HH:MM:SS.mmm"""
millis = int(seconds * 1000) % 1000
total_seconds = int(seconds)
secs = total_seconds % 60
mins = (total_seconds // 60) % 60
hours = total_seconds // 3600
return f"{hours:02}:{mins:02}:{secs:02}.{millis:03}"
# --- Ejecución Principal ---
if __name__ == "__main__":
print("--- Iniciando Script Mejorado ---")
overall_start_time = time.perf_counter()
temp_wav_file_path = os.path.join(
os.path.dirname(__file__) or '.',
f"{TEMP_WAV_BASENAME}_{int(time.time())}.wav"
)
transcription_result_data = None
prepared_audio_path = None
# 1. Verificar archivo de entrada
if not os.path.exists(INPUT_MP4_PATH):
print(f"Error Crítico: El archivo de entrada '{INPUT_MP4_PATH}' no existe.")
else:
# 2. Convertir a WAV
conv_start_time = time.perf_counter()
prepared_audio_path = convert_mp4_to_wav(INPUT_MP4_PATH, temp_wav_file_path)
conv_end_time = time.perf_counter()
if prepared_audio_path:
print(f"(Tiempo de conversión: {conv_end_time - conv_start_time:.2f} segundos)")
# 3. Transcribir si la conversión fue exitosa
if os.path.exists(prepared_audio_path):
api_start_time = time.perf_counter()
transcription_result_data = transcribe_audio_api(
api_url_base=WHISPER_API_URL,
file_path=prepared_audio_path,
encode=API_ENCODE,
task=API_TASK,
language=API_LANGUAGE,
output_format=API_OUTPUT_FORMAT
)
api_end_time = time.perf_counter()
print(f"(Tiempo de llamada API: {api_end_time - api_start_time:.2f} segundos)")
else:
print(f"ERROR: El archivo WAV convertido '{prepared_audio_path}' no se encontró.")
else:
print("La conversión a WAV falló. No se puede continuar.")
# 4. Limpieza
if os.path.exists(temp_wav_file_path):
try:
print(f"\nLimpiando archivo temporal '{temp_wav_file_path}'...")
os.remove(temp_wav_file_path)
print("Limpieza completada.")
except OSError as e:
print(f"Error al eliminar archivo temporal: {e}")
# 5. Mostrar Resultado Final Interpretado
print("\n--- Resultado Final Interpretado ---")
if isinstance(transcription_result_data, dict):
print("¡Respuesta interpretada como JSON exitosamente!")
lang = transcription_result_data.get('language', 'N/D')
full_text = transcription_result_data.get('text', '[Texto no encontrado]')
print(f"\nIdioma Detectado: {lang}")
print(f"\nTranscripción Completa:\n{'-'*25}\n{full_text}\n{'-'*25}")
# Mostrar segmentos si existen
segments = transcription_result_data.get('segments')
if segments and isinstance(segments, list):
print("\nSegmentos Detallados:")
for segment in segments:
start = segment.get('start', 0.0)
end = segment.get('end', 0.0)
text = segment.get('text', '')
print(f" [{format_time(start)} --> {format_time(end)}] {text.strip()}")
else:
print("\n(No se encontraron detalles de segmentos en el JSON)")
elif isinstance(transcription_result_data, str):
print("Se recibió una respuesta de texto plano que no pudo ser interpretada como JSON:")
print(transcription_result_data)
else:
print("No se obtuvo resultado de la transcripción o hubo un error previo.")
print("----------------------------------")
overall_end_time = time.perf_counter()
elapsed_time = overall_end_time - overall_start_time
print(f"\n--- Tiempo Total Empleado: {elapsed_time:.2f} segundos ({format_time(elapsed_time)}) ---")