207 lines
8.9 KiB
Python
207 lines
8.9 KiB
Python
import requests
|
|
import os
|
|
import time
|
|
import traceback
|
|
import json # Necesitamos importar json para cargar la cadena
|
|
|
|
# Importación para MoviePy v2.x
|
|
from moviepy import AudioFileClip
|
|
|
|
# --- Configuración ---
|
|
WHISPER_API_URL = "http://192.168.88.26:9005/asr"
|
|
INPUT_MP4_PATH = "test.mp4"
|
|
TEMP_WAV_BASENAME = "temp_audio_direct_conversion"
|
|
API_LANGUAGE = None
|
|
API_TASK = 'transcribe'
|
|
API_OUTPUT_FORMAT = 'json' # Seguimos pidiéndolo, aunque lo ignoren
|
|
API_ENCODE = True
|
|
# --- Fin de la Configuración ---
|
|
|
|
# La función convert_mp4_to_wav se mantiene igual
|
|
def convert_mp4_to_wav(input_path, output_wav_path):
|
|
# (Código omitido por brevedad - idéntico a la respuesta anterior)
|
|
print(f"Intentando convertir '{os.path.basename(input_path)}' a WAV...")
|
|
print(f" Entrada: {input_path}")
|
|
print(f" Salida: {output_wav_path}")
|
|
audio_clip = None
|
|
try:
|
|
audio_clip = AudioFileClip(input_path)
|
|
audio_clip.write_audiofile(output_wav_path, codec='pcm_s16le', logger=None)
|
|
print("Conversión a WAV exitosa.")
|
|
return output_wav_path
|
|
except Exception as e:
|
|
print(f"ERROR durante la conversión a WAV: {type(e).__name__} - {e}")
|
|
return None
|
|
finally:
|
|
if audio_clip:
|
|
try: audio_clip.close()
|
|
except Exception: pass
|
|
|
|
|
|
def transcribe_audio_api(api_url_base, file_path, encode=True, task='transcribe', language=None, output_format='json'):
|
|
"""
|
|
Envía audio, muestra detalles y INTENTA interpretar la respuesta como JSON
|
|
incluso si el Content-Type es incorrecto.
|
|
"""
|
|
print(f"\n--- Enviando Audio a la API Whisper ---")
|
|
print(f"Archivo: '{os.path.basename(file_path)}'")
|
|
url_params = {'encode': str(encode).lower(), 'task': task, 'output': output_format}
|
|
if language: url_params['language'] = language
|
|
print(f"Parámetros para URL: {url_params}")
|
|
|
|
filename = os.path.basename(file_path)
|
|
f = None
|
|
try:
|
|
f = open(file_path, 'rb')
|
|
files_payload = {'audio_file': (filename, f, 'audio/wav')}
|
|
request_headers = {'Accept': 'application/json'}
|
|
print(f"Cabeceras explícitas: {request_headers}")
|
|
print(f"Realizando POST a: {api_url_base}")
|
|
|
|
response = requests.post(
|
|
api_url_base, params=url_params, files=files_payload,
|
|
headers=request_headers, timeout=300
|
|
)
|
|
|
|
print("\n--- Detalles de la Comunicación ---")
|
|
print(f"URL Final Enviada: {response.request.url}")
|
|
print(f"Código de Estado Recibido: {response.status_code} ({response.reason})")
|
|
received_content_type = response.headers.get('Content-Type', 'N/A')
|
|
print(f"Content-Type Recibido: {received_content_type}")
|
|
print("\n--- Cuerpo de la Respuesta (Raw Text) ---")
|
|
response_text = ""
|
|
try:
|
|
response_text = response.content.decode('utf-8', errors='replace')
|
|
# Mostramos solo una parte si es muy largo, para no saturar
|
|
preview_limit = 500
|
|
print(response_text[:preview_limit] + ('...' if len(response_text) > preview_limit else ''))
|
|
except Exception as decode_err:
|
|
print(f"[Error al decodificar: {decode_err}] Bytes: {response.content[:preview_limit]}...")
|
|
print("-----------------------------------")
|
|
|
|
response.raise_for_status() # Verificar errores 4xx/5xx
|
|
|
|
# --- Interpretación Mejorada de la Respuesta ---
|
|
if response.status_code == 200 and response_text.strip():
|
|
# Advertir si el Content-Type no es JSON pero intentaremos igual
|
|
if not received_content_type.startswith('application/json'):
|
|
print("\nADVERTENCIA: Content-Type no es JSON, pero se intentará interpretar el cuerpo.")
|
|
|
|
try:
|
|
# Intentar cargar el texto como JSON
|
|
data = json.loads(response_text)
|
|
print("Interpretación como JSON exitosa.")
|
|
return data # Devolver el diccionario Python
|
|
except json.JSONDecodeError as json_err:
|
|
print(f"ERROR: No se pudo interpretar la respuesta como JSON: {json_err}")
|
|
print("Devolviendo la respuesta como texto plano.")
|
|
return response_text # Devolver texto si falla la interpretación
|
|
else:
|
|
# Respuesta vacía o código de error no capturado por raise_for_status
|
|
print("Respuesta recibida vacía o con estado inesperado (no 200 OK).")
|
|
return response_text # Devolver el texto (probablemente vacío)
|
|
|
|
# ... (resto del manejo de excepciones de requests igual que antes) ...
|
|
except requests.exceptions.Timeout: print("ERROR: Timeout.")
|
|
except requests.exceptions.ConnectionError as e: print(f"ERROR de Conexión: {e}")
|
|
except requests.exceptions.HTTPError as e: print(f"ERROR HTTP {e.response.status_code}.")
|
|
except requests.exceptions.RequestException as e: print(f"ERROR de Request: {e}")
|
|
except Exception as e: print(f"ERROR inesperado en API: {type(e).__name__} - {e}")
|
|
finally:
|
|
if f:
|
|
try: f.close()
|
|
except Exception: pass
|
|
return None
|
|
|
|
# --- Función para Formatear Tiempo ---
|
|
def format_time(seconds):
|
|
"""Convierte segundos a formato HH:MM:SS.mmm"""
|
|
millis = int(seconds * 1000) % 1000
|
|
total_seconds = int(seconds)
|
|
secs = total_seconds % 60
|
|
mins = (total_seconds // 60) % 60
|
|
hours = total_seconds // 3600
|
|
return f"{hours:02}:{mins:02}:{secs:02}.{millis:03}"
|
|
|
|
# --- Ejecución Principal ---
|
|
if __name__ == "__main__":
|
|
print("--- Iniciando Script Mejorado ---")
|
|
overall_start_time = time.perf_counter()
|
|
|
|
temp_wav_file_path = os.path.join(
|
|
os.path.dirname(__file__) or '.',
|
|
f"{TEMP_WAV_BASENAME}_{int(time.time())}.wav"
|
|
)
|
|
transcription_result_data = None
|
|
prepared_audio_path = None
|
|
|
|
# 1. Verificar archivo de entrada
|
|
if not os.path.exists(INPUT_MP4_PATH):
|
|
print(f"Error Crítico: El archivo de entrada '{INPUT_MP4_PATH}' no existe.")
|
|
else:
|
|
# 2. Convertir a WAV
|
|
conv_start_time = time.perf_counter()
|
|
prepared_audio_path = convert_mp4_to_wav(INPUT_MP4_PATH, temp_wav_file_path)
|
|
conv_end_time = time.perf_counter()
|
|
if prepared_audio_path:
|
|
print(f"(Tiempo de conversión: {conv_end_time - conv_start_time:.2f} segundos)")
|
|
|
|
# 3. Transcribir si la conversión fue exitosa
|
|
if os.path.exists(prepared_audio_path):
|
|
api_start_time = time.perf_counter()
|
|
transcription_result_data = transcribe_audio_api(
|
|
api_url_base=WHISPER_API_URL,
|
|
file_path=prepared_audio_path,
|
|
encode=API_ENCODE,
|
|
task=API_TASK,
|
|
language=API_LANGUAGE,
|
|
output_format=API_OUTPUT_FORMAT
|
|
)
|
|
api_end_time = time.perf_counter()
|
|
print(f"(Tiempo de llamada API: {api_end_time - api_start_time:.2f} segundos)")
|
|
else:
|
|
print(f"ERROR: El archivo WAV convertido '{prepared_audio_path}' no se encontró.")
|
|
else:
|
|
print("La conversión a WAV falló. No se puede continuar.")
|
|
|
|
# 4. Limpieza
|
|
if os.path.exists(temp_wav_file_path):
|
|
try:
|
|
print(f"\nLimpiando archivo temporal '{temp_wav_file_path}'...")
|
|
os.remove(temp_wav_file_path)
|
|
print("Limpieza completada.")
|
|
except OSError as e:
|
|
print(f"Error al eliminar archivo temporal: {e}")
|
|
|
|
# 5. Mostrar Resultado Final Interpretado
|
|
print("\n--- Resultado Final Interpretado ---")
|
|
if isinstance(transcription_result_data, dict):
|
|
print("¡Respuesta interpretada como JSON exitosamente!")
|
|
lang = transcription_result_data.get('language', 'N/D')
|
|
full_text = transcription_result_data.get('text', '[Texto no encontrado]')
|
|
print(f"\nIdioma Detectado: {lang}")
|
|
print(f"\nTranscripción Completa:\n{'-'*25}\n{full_text}\n{'-'*25}")
|
|
|
|
# Mostrar segmentos si existen
|
|
segments = transcription_result_data.get('segments')
|
|
if segments and isinstance(segments, list):
|
|
print("\nSegmentos Detallados:")
|
|
for segment in segments:
|
|
start = segment.get('start', 0.0)
|
|
end = segment.get('end', 0.0)
|
|
text = segment.get('text', '')
|
|
print(f" [{format_time(start)} --> {format_time(end)}] {text.strip()}")
|
|
else:
|
|
print("\n(No se encontraron detalles de segmentos en el JSON)")
|
|
|
|
elif isinstance(transcription_result_data, str):
|
|
print("Se recibió una respuesta de texto plano que no pudo ser interpretada como JSON:")
|
|
print(transcription_result_data)
|
|
else:
|
|
print("No se obtuvo resultado de la transcripción o hubo un error previo.")
|
|
print("----------------------------------")
|
|
|
|
overall_end_time = time.perf_counter()
|
|
elapsed_time = overall_end_time - overall_start_time
|
|
print(f"\n--- Tiempo Total Empleado: {elapsed_time:.2f} segundos ({format_time(elapsed_time)}) ---")
|
|
|