Video pooling and downloading will now be handled by frontend

This commit is contained in:
Carlos-Mesquita
2024-11-09 06:51:07 +00:00
parent 8d95aa6c21
commit 09998478d1
5 changed files with 0 additions and 93 deletions

View File

@@ -1,93 +0,0 @@
import asyncio
import os
import logging
import aiofiles
from httpx import AsyncClient
from app.services.abc import IVideoGeneratorService
class Heygen(IVideoGeneratorService):
_GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'
def __init__(self, client: AsyncClient, token: str):
pass
"""
self._get_header = {
'X-Api-Key': heygen_token
}
self._post_header = {
'X-Api-Key': heygen_token,
'Content-Type': 'application/json'
}
self._http_client = client
self._logger = logging.getLogger(__name__)
"""
async def create_video(self, text: str, avatar: str):
pass
# POST TO CREATE VIDEO
"""
create_video_url = 'https://api.heygen.com/v2/template/' + avatar + '/generate'
data = {
"test": False,
"caption": False,
"title": "video_title",
"variables": {
"script_here": {
"name": "script_here",
"type": "text",
"properties": {
"content": text
}
}
}
}
response = await self._http_client.post(create_video_url, headers=self._post_header, json=data)
self._logger.info(response.status_code)
self._logger.info(response.json())
# GET TO CHECK STATUS AND GET VIDEO WHEN READY
video_id = response.json()["data"]["video_id"]
params = {
'video_id': response.json()["data"]["video_id"]
}
response = {}
status = "processing"
error = None
while status != "completed" and error is None:
response = await self._http_client.get(self._GET_VIDEO_URL, headers=self._get_header, params=params)
response_data = response.json()
status = response_data["data"]["status"]
error = response_data["data"]["error"]
if status != "completed" and error is None:
self._logger.info(f"Status: {status}")
await asyncio.sleep(10) # Wait for 10 second before the next request
self._logger.info(response.status_code)
self._logger.info(response.json())
# DOWNLOAD VIDEO
download_url = response.json()['data']['video_url']
output_directory = 'download-video/'
output_filename = video_id + '.mp4'
response = await self._http_client.get(download_url)
if response.status_code == 200:
os.makedirs(output_directory, exist_ok=True) # Create the directory if it doesn't exist
output_path = os.path.join(output_directory, output_filename)
async with aiofiles.open(output_path, 'wb') as f:
await f.write(response.content)
self._logger.info(f"File '{output_filename}' downloaded successfully.")
return output_filename
else:
self._logger.error(f"Failed to download file. Status code: {response.status_code}")
return None
"""