import asyncio import os import logging import aiofiles from httpx import AsyncClient from app.services.abc import IVideoGeneratorService class Heygen(IVideoGeneratorService): # TODO: Not used, remove if not necessary # CREATE_VIDEO_URL = 'https://api.heygen.com/v1/template.generate' _GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get' def __init__(self, client: AsyncClient, heygen_token: str): self._get_header = { 'X-Api-Key': heygen_token } self._post_header = { 'X-Api-Key': heygen_token, 'Content-Type': 'application/json' } self._http_client = client self._logger = logging.getLogger(__name__) async def create_video(self, text: str, avatar: str): # POST TO CREATE VIDEO create_video_url = 'https://api.heygen.com/v2/template/' + avatar + '/generate' data = { "test": False, "caption": False, "title": "video_title", "variables": { "script_here": { "name": "script_here", "type": "text", "properties": { "content": text } } } } response = await self._http_client.post(create_video_url, headers=self._post_header, json=data) self._logger.info(response.status_code) self._logger.info(response.json()) # GET TO CHECK STATUS AND GET VIDEO WHEN READY video_id = response.json()["data"]["video_id"] params = { 'video_id': response.json()["data"]["video_id"] } response = {} status = "processing" error = None while status != "completed" and error is None: response = await self._http_client.get(self._GET_VIDEO_URL, headers=self._get_header, params=params) response_data = response.json() status = response_data["data"]["status"] error = response_data["data"]["error"] if status != "completed" and error is None: self._logger.info(f"Status: {status}") await asyncio.sleep(10) # Wait for 10 second before the next request self._logger.info(response.status_code) self._logger.info(response.json()) # DOWNLOAD VIDEO download_url = response.json()['data']['video_url'] output_directory = 'download-video/' output_filename = video_id + '.mp4' response = await self._http_client.get(download_url) if response.status_code == 200: os.makedirs(output_directory, exist_ok=True) # Create the directory if it doesn't exist output_path = os.path.join(output_directory, output_filename) async with aiofiles.open(output_path, 'wb') as f: await f.write(response.content) self._logger.info(f"File '{output_filename}' downloaded successfully.") return output_filename else: self._logger.error(f"Failed to download file. Status code: {response.status_code}") return None