Files
encoach_backend/helper/heygen_api.py
2023-12-12 23:02:00 +00:00

155 lines
5.7 KiB
Python

import logging
import os
import random
import requests
import time
from dotenv import load_dotenv
import app
from helper.constants import *
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
from heygen.AvatarEnum import AvatarEnum
load_dotenv()
# Get HeyGen token
TOKEN = os.getenv("HEY_GEN_TOKEN")
# POST TO CREATE VIDEO
CREATE_VIDEO_URL = 'https://api.heygen.com/v1/template.generate'
GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'
POST_HEADER = {
'X-Api-Key': TOKEN,
'Content-Type': 'application/json'
}
GET_HEADER = {
'X-Api-Key': TOKEN
}
MATTHEW_NOAH = "11b234e504e44bfda9bc6b7aac3c8f81"
VERA_CERISE = "9bf2f27009cd403ab4ba4e22629b27bb"
EDWARD_TONY = "d3333e37952946059b45efc8482a2b6c"
TANYA_MOLLY = "07c75076b3f94df4ac658c6de72be83a"
KAYLA_ABBI = "d688099f8db9472cb4890b0561e81793"
JEROME_RYAN = "ad41feb2a5c4483085525e3d8907f512"
TYLER_CHRISTOPHER = "03c796f8ed274bb38f19e893bcbc6121"
def create_videos_and_save_to_db(exercises, template, id):
# Speaking 1
app.app.logger.info('Creating video for speaking part 1')
sp1_result = create_video(exercises[0]["question"], random.choice(list(AvatarEnum)))
if sp1_result is not None:
sound_file_path = VIDEO_FILES_PATH + sp1_result
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp1_result
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
sp1_video_path = firebase_file_path
sp1_video_url = url
template["exercises"][0]["text"] = exercises[0]["question"]
template["exercises"][0]["title"] = exercises[0]["topic"]
template["exercises"][0]["video_url"] = sp1_video_url
template["exercises"][0]["video_path"] = sp1_video_path
else:
app.app.logger.error("Failed to create video for part 1 question: " + exercises[0]["question"])
# Speaking 2
app.app.logger.info('Creating video for speaking part 2')
sp2_result = create_video(exercises[1]["question"], random.choice(list(AvatarEnum)))
if sp2_result is not None:
sound_file_path = VIDEO_FILES_PATH + sp2_result
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp2_result
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
sp2_video_path = firebase_file_path
sp2_video_url = url
template["exercises"][1]["prompts"] = exercises[1]["prompts"]
template["exercises"][1]["text"] = exercises[1]["question"]
template["exercises"][1]["title"] = exercises[1]["topic"]
template["exercises"][1]["video_url"] = sp2_video_url
template["exercises"][1]["video_path"] = sp2_video_path
else:
app.app.logger.error("Failed to create video for part 2 question: " + exercises[1]["question"])
# Speaking 3
sp3_questions = []
avatar = random.choice(list(AvatarEnum))
app.app.logger.info('Creating videos for speaking part 3')
for question in exercises[2]["questions"]:
result = create_video(question, avatar)
if result is not None:
sound_file_path = VIDEO_FILES_PATH + result
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
video = {
"text": question,
"video_path": firebase_file_path,
"video_url": url
}
sp3_questions.append(video)
else:
app.app.logger.error("Failed to create video for part 3 question: " + question)
template["exercises"][2]["prompts"] = sp3_questions
template["exercises"][2]["title"] = exercises[2]["topic"]
save_to_db_with_id("speaking", template, id)
def create_video(text, avatar: AvatarEnum):
# POST TO CREATE VIDEO
data = {
"template_id": avatar.value,
"title": "video_title",
"test": False,
"variables": [
{
"properties": {
"text": text
},
"name": "avatar_0"
}
]
}
response = requests.post(CREATE_VIDEO_URL, headers=POST_HEADER, json=data)
app.app.logger.info(response.status_code)
app.app.logger.info(response.json())
# GET TO CHECK STATUS AND GET VIDEO WHEN READY
video_id = response.json()["data"]["video_id"]
params = {
'video_id': response.json()["data"]["video_id"]
}
response = {}
status = "processing"
error = None
while status != "completed" and error is None:
response = requests.get(GET_VIDEO_URL, headers=GET_HEADER, params=params)
response_data = response.json()
status = response_data["data"]["status"]
error = response_data["data"]["error"]
if status != "completed" and error is None:
app.app.logger.info(f"Status: {status}")
time.sleep(5) # Wait for 5 second before the next request
app.app.logger.info(response.status_code)
app.app.logger.info(response.json())
# DOWNLOAD VIDEO
download_url = response.json()['data']['video_url']
output_directory = 'download-video/'
output_filename = video_id + '.mp4'
response = requests.get(download_url)
if response.status_code == 200:
os.makedirs(output_directory, exist_ok=True) # Create the directory if it doesn't exist
output_path = os.path.join(output_directory, output_filename)
with open(output_path, 'wb') as f:
f.write(response.content)
app.app.logger.info(f"File '{output_filename}' downloaded successfully.")
return output_filename
else:
app.app.logger.error(f"Failed to download file. Status code: {response.status_code}")
return None