Update video generation to use elai.
This commit is contained in:
263
helper/elai_api.py
Normal file
263
helper/elai_api.py
Normal file
@@ -0,0 +1,263 @@
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from logging import getLogger
|
||||
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from helper.constants import *
|
||||
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
|
||||
from elai.AvatarEnum import AvatarEnum
|
||||
|
||||
load_dotenv()
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
# Get ELAI token
|
||||
TOKEN = os.getenv("ELAI_TOKEN")
|
||||
FIREBASE_BUCKET = os.getenv('FIREBASE_BUCKET')
|
||||
|
||||
# POST TO CREATE VIDEO
|
||||
POST_HEADER = {
|
||||
"accept": "application/json",
|
||||
"content-type": "application/json",
|
||||
"Authorization": f"Bearer {TOKEN}"
|
||||
}
|
||||
GET_HEADER = {
|
||||
"accept": "application/json",
|
||||
"Authorization": f"Bearer {TOKEN}"
|
||||
}
|
||||
|
||||
|
||||
def create_videos_and_save_to_db(exercises, template, id):
|
||||
avatar = random.choice(list(AvatarEnum))
|
||||
# Speaking 1
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
found_exercises_1 = [element for element in exercises if element.get('type') == 1]
|
||||
# Check if any elements were found
|
||||
if found_exercises_1:
|
||||
exercise_1 = found_exercises_1[0]
|
||||
sp1_questions = []
|
||||
logger.info('Creating video for speaking part 1')
|
||||
for question in exercise_1["questions"]:
|
||||
sp1_result = create_video(question, avatar)
|
||||
if sp1_result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + sp1_result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp1_result
|
||||
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
||||
video = {
|
||||
"text": question,
|
||||
"video_path": firebase_file_path,
|
||||
"video_url": url
|
||||
}
|
||||
sp1_questions.append(video)
|
||||
else:
|
||||
logger.error("Failed to create video for part 1 question: " + exercise_1["question"])
|
||||
template["exercises"][0]["prompts"] = sp1_questions
|
||||
template["exercises"][0]["first_title"] = exercise_1["first_topic"]
|
||||
template["exercises"][0]["second_title"] = exercise_1["second_topic"]
|
||||
|
||||
# Speaking 2
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
found_exercises_2 = [element for element in exercises if element.get('type') == 2]
|
||||
# Check if any elements were found
|
||||
if found_exercises_2:
|
||||
exercise_2 = found_exercises_2[0]
|
||||
logger.info('Creating video for speaking part 2')
|
||||
sp2_result = create_video(exercise_2["question"], avatar)
|
||||
if sp2_result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + sp2_result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp2_result
|
||||
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
||||
sp2_video_path = firebase_file_path
|
||||
sp2_video_url = url
|
||||
template["exercises"][1]["prompts"] = exercise_2["prompts"]
|
||||
template["exercises"][1]["text"] = exercise_2["question"]
|
||||
template["exercises"][1]["title"] = exercise_2["topic"]
|
||||
template["exercises"][1]["video_url"] = sp2_video_url
|
||||
template["exercises"][1]["video_path"] = sp2_video_path
|
||||
else:
|
||||
logger.error("Failed to create video for part 2 question: " + exercise_2["question"])
|
||||
|
||||
# Speaking 3
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
found_exercises_3 = [element for element in exercises if element.get('type') == 3]
|
||||
# Check if any elements were found
|
||||
if found_exercises_3:
|
||||
exercise_3 = found_exercises_3[0]
|
||||
sp3_questions = []
|
||||
logger.info('Creating videos for speaking part 3')
|
||||
for question in exercise_3["questions"]:
|
||||
result = create_video(question, avatar)
|
||||
if result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
|
||||
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
||||
video = {
|
||||
"text": question,
|
||||
"video_path": firebase_file_path,
|
||||
"video_url": url
|
||||
}
|
||||
sp3_questions.append(video)
|
||||
else:
|
||||
logger.error("Failed to create video for part 3 question: " + question)
|
||||
template["exercises"][2]["prompts"] = sp3_questions
|
||||
template["exercises"][2]["title"] = exercise_3["topic"]
|
||||
|
||||
if not found_exercises_3:
|
||||
template["exercises"].pop(2)
|
||||
if not found_exercises_2:
|
||||
template["exercises"].pop(1)
|
||||
if not found_exercises_1:
|
||||
template["exercises"].pop(0)
|
||||
|
||||
save_to_db_with_id("speaking", template, id)
|
||||
logger.info('Saved speaking to DB with id ' + id + " : " + str(template))
|
||||
|
||||
|
||||
def create_video(text, avatar):
|
||||
# POST TO CREATE VIDEO
|
||||
create_video_url = "https://apis.elai.io/api/v1/videos"
|
||||
|
||||
avatar_url = AvatarEnum[avatar].value.get("avatar_url")
|
||||
avatar_code = AvatarEnum[avatar].value.get("avatar_code")
|
||||
avatar_gender = AvatarEnum[avatar].value.get("avatar_gender")
|
||||
avatar_canvas = AvatarEnum[avatar].value.get("avatar_canvas")
|
||||
voice_id = AvatarEnum[avatar].value.get("voice_id")
|
||||
voice_provider = AvatarEnum[avatar].value.get("voice_provider")
|
||||
|
||||
data = {
|
||||
"name": "API test",
|
||||
"slides": [
|
||||
{
|
||||
"id": 1,
|
||||
"canvas": {
|
||||
"objects": [
|
||||
{
|
||||
"type": "avatar",
|
||||
"left": 151.5,
|
||||
"top": 36,
|
||||
"fill": "#4868FF",
|
||||
"scaleX": 0.3,
|
||||
"scaleY": 0.3,
|
||||
"width": 1080,
|
||||
"height": 1080,
|
||||
"src": avatar_url,
|
||||
"avatarType": "transparent",
|
||||
"animation": {
|
||||
"type": None,
|
||||
"exitType": None
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "image",
|
||||
"version": "5.3.0",
|
||||
"originX": "left",
|
||||
"originY": "top",
|
||||
"left": 30,
|
||||
"top": 30,
|
||||
"width": 800,
|
||||
"height": 600,
|
||||
"fill": "rgb(0,0,0)",
|
||||
"stroke": None,
|
||||
"strokeWidth": 0,
|
||||
"strokeDashArray": None,
|
||||
"strokeLineCap": "butt",
|
||||
"strokeDashOffset": 0,
|
||||
"strokeLineJoin": "miter",
|
||||
"strokeUniform": False,
|
||||
"strokeMiterLimit": 4,
|
||||
"scaleX": 0.18821429,
|
||||
"scaleY": 0.18821429,
|
||||
"angle": 0,
|
||||
"flipX": False,
|
||||
"flipY": False,
|
||||
"opacity": 1,
|
||||
"shadow": None,
|
||||
"visible": True,
|
||||
"backgroundColor": "",
|
||||
"fillRule": "nonzero",
|
||||
"paintFirst": "fill",
|
||||
"globalCompositeOperation": "source-over",
|
||||
"skewX": 0,
|
||||
"skewY": 0,
|
||||
"cropX": 0,
|
||||
"cropY": 0,
|
||||
"id": 676845479989,
|
||||
"src": "https://d3u63mhbhkevz8.cloudfront.net/production/uploads/66f5190349f943682dd776ff/"
|
||||
"en-coach-main-logo-800x600_sm1ype.jpg?Expires=1727654400&Policy=eyJTdGF0ZW1lbnQiOlt"
|
||||
"7IlJlc291cmNlIjoiaHR0cHM6Ly9kM3U2M21oYmhrZXZ6OC5jbG91ZGZyb250Lm5ldC9wcm9kdWN0aW9uL3"
|
||||
"VwbG9hZHMvNjZmNTE5MDM0OWY5NDM2ODJkZDc3NmZmL2VuLWNvYWNoLW1haW4tbG9nby04MDB4NjAwX3NtM"
|
||||
"XlwZS5qcGciLCJDb25kaXRpb24iOnsiRGF0ZUxlc3NUaGFuIjp7IkFXUzpFcG9jaFRpbWUiOjE3Mjc2NTQ0"
|
||||
"MDB9fX1dfQ__&Signature=kTVzlDeS7cua2HiAE5G%7E-yFqbhu0bHraFH5SauUln7yuNXoX7vtiKIBYiL"
|
||||
"%7Eps3LCLEZS77arSZ7H%7EG8CKzabHDjAR-Y6Uc%7ELD5KQaMmk0jbAxbC3Wdoq6cfd0qIwEuodQYlC0It"
|
||||
"2WBidP8KsgOy3uUQ%7EvcBoqlb255yMFw4pHuptOBB1kPs%7EFyzDV0fnRNsKaYRcy0Fn2EFUp13axm0CZQ"
|
||||
"clazuLFM622AyCydKMy0vfxV%7Etny3sskwPaUe2OANGMFg07Q1pRuy6fUON0DsbhAh1tA2H6-nnem5KbFw"
|
||||
"iZK3IIwwYGBx3H41ovzC6Ejt80Fd0%7EPSHw7GzVBnUmtP-IA__&Key-Pair-Id=K1Y7U91AR6T7E5",
|
||||
"crossOrigin": "anonymous",
|
||||
"filters": [],
|
||||
"_exists": True
|
||||
}
|
||||
],
|
||||
"background": "#ffffff",
|
||||
"version": "4.4.0"
|
||||
},
|
||||
"avatar": {
|
||||
"code": avatar_code,
|
||||
"gender": avatar_gender,
|
||||
"canvas": avatar_canvas
|
||||
},
|
||||
"animation": "fade_in",
|
||||
"language": "English",
|
||||
"speech": text,
|
||||
"voice": voice_id,
|
||||
"voiceType": "text",
|
||||
"voiceProvider": voice_provider
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
response = requests.post(create_video_url, headers=POST_HEADER, json=data)
|
||||
logger.info(response.status_code)
|
||||
logger.info(response.json())
|
||||
|
||||
video_id = response.json()["_id"]
|
||||
|
||||
if video_id:
|
||||
# Render Video
|
||||
render_url = f"https://apis.elai.io/api/v1/videos/render/{video_id}"
|
||||
|
||||
requests.post(render_url, headers=GET_HEADER)
|
||||
|
||||
status_url = f"https://apis.elai.io/api/v1/videos/{video_id}"
|
||||
|
||||
while True:
|
||||
response = requests.get(status_url, headers=GET_HEADER)
|
||||
response_data = response.json()
|
||||
|
||||
if response_data['status'] == 'ready':
|
||||
logger.info(response_data)
|
||||
# DOWNLOAD VIDEO
|
||||
download_url = response_data.get('url')
|
||||
output_directory = 'download-video/'
|
||||
output_filename = video_id + '.mp4'
|
||||
|
||||
response = requests.get(download_url)
|
||||
|
||||
if response.status_code == 200:
|
||||
os.makedirs(output_directory, exist_ok=True) # Create the directory if it doesn't exist
|
||||
output_path = os.path.join(output_directory, output_filename)
|
||||
with open(output_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
logger.info(f"File '{output_filename}' downloaded successfully.")
|
||||
return output_filename
|
||||
else:
|
||||
logger.error(f"Failed to download file. Status code: {response.status_code}")
|
||||
return None
|
||||
elif response_data['status'] == 'failed':
|
||||
print('Video creation failed.')
|
||||
break
|
||||
else:
|
||||
print('Video is still processing. Checking again in 10 seconds...')
|
||||
time.sleep(10)
|
||||
@@ -1,179 +0,0 @@
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from logging import getLogger
|
||||
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from helper.constants import *
|
||||
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
|
||||
from heygen.AvatarEnum import AvatarEnum
|
||||
|
||||
load_dotenv()
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
# Get HeyGen token
|
||||
TOKEN = os.getenv("HEY_GEN_TOKEN")
|
||||
FIREBASE_BUCKET = os.getenv('FIREBASE_BUCKET')
|
||||
|
||||
# POST TO CREATE VIDEO
|
||||
CREATE_VIDEO_URL = 'https://api.heygen.com/v1/template.generate'
|
||||
GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'
|
||||
POST_HEADER = {
|
||||
'X-Api-Key': TOKEN,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
GET_HEADER = {
|
||||
'X-Api-Key': TOKEN
|
||||
}
|
||||
|
||||
|
||||
def create_videos_and_save_to_db(exercises, template, id):
|
||||
avatar = random.choice(list(AvatarEnum))
|
||||
# Speaking 1
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
found_exercises_1 = [element for element in exercises if element.get('type') == 1]
|
||||
# Check if any elements were found
|
||||
if found_exercises_1:
|
||||
exercise_1 = found_exercises_1[0]
|
||||
sp1_questions = []
|
||||
logger.info('Creating video for speaking part 1')
|
||||
for question in exercise_1["questions"]:
|
||||
sp1_result = create_video(question, avatar)
|
||||
if sp1_result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + sp1_result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp1_result
|
||||
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
||||
video = {
|
||||
"text": question,
|
||||
"video_path": firebase_file_path,
|
||||
"video_url": url
|
||||
}
|
||||
sp1_questions.append(video)
|
||||
else:
|
||||
logger.error("Failed to create video for part 1 question: " + exercise_1["question"])
|
||||
template["exercises"][0]["prompts"] = sp1_questions
|
||||
template["exercises"][0]["first_title"] = exercise_1["first_topic"]
|
||||
template["exercises"][0]["second_title"] = exercise_1["second_topic"]
|
||||
|
||||
# Speaking 2
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
found_exercises_2 = [element for element in exercises if element.get('type') == 2]
|
||||
# Check if any elements were found
|
||||
if found_exercises_2:
|
||||
exercise_2 = found_exercises_2[0]
|
||||
logger.info('Creating video for speaking part 2')
|
||||
sp2_result = create_video(exercise_2["question"], avatar)
|
||||
if sp2_result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + sp2_result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp2_result
|
||||
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
||||
sp2_video_path = firebase_file_path
|
||||
sp2_video_url = url
|
||||
template["exercises"][1]["prompts"] = exercise_2["prompts"]
|
||||
template["exercises"][1]["text"] = exercise_2["question"]
|
||||
template["exercises"][1]["title"] = exercise_2["topic"]
|
||||
template["exercises"][1]["video_url"] = sp2_video_url
|
||||
template["exercises"][1]["video_path"] = sp2_video_path
|
||||
else:
|
||||
logger.error("Failed to create video for part 2 question: " + exercise_2["question"])
|
||||
|
||||
# Speaking 3
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
found_exercises_3 = [element for element in exercises if element.get('type') == 3]
|
||||
# Check if any elements were found
|
||||
if found_exercises_3:
|
||||
exercise_3 = found_exercises_3[0]
|
||||
sp3_questions = []
|
||||
logger.info('Creating videos for speaking part 3')
|
||||
for question in exercise_3["questions"]:
|
||||
result = create_video(question, avatar)
|
||||
if result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
|
||||
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
||||
video = {
|
||||
"text": question,
|
||||
"video_path": firebase_file_path,
|
||||
"video_url": url
|
||||
}
|
||||
sp3_questions.append(video)
|
||||
else:
|
||||
logger.error("Failed to create video for part 3 question: " + question)
|
||||
template["exercises"][2]["prompts"] = sp3_questions
|
||||
template["exercises"][2]["title"] = exercise_3["topic"]
|
||||
|
||||
if not found_exercises_3:
|
||||
template["exercises"].pop(2)
|
||||
if not found_exercises_2:
|
||||
template["exercises"].pop(1)
|
||||
if not found_exercises_1:
|
||||
template["exercises"].pop(0)
|
||||
|
||||
save_to_db_with_id("speaking", template, id)
|
||||
logger.info('Saved speaking to DB with id ' + id + " : " + str(template))
|
||||
|
||||
|
||||
def create_video(text, avatar):
|
||||
# POST TO CREATE VIDEO
|
||||
create_video_url = 'https://api.heygen.com/v2/template/' + avatar + '/generate'
|
||||
data = {
|
||||
"test": False,
|
||||
"caption": False,
|
||||
"title": "video_title",
|
||||
"variables": {
|
||||
"script_here": {
|
||||
"name": "script_here",
|
||||
"type": "text",
|
||||
"properties": {
|
||||
"content": text
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
response = requests.post(create_video_url, headers=POST_HEADER, json=data)
|
||||
logger.info(response.status_code)
|
||||
logger.info(response.json())
|
||||
|
||||
# GET TO CHECK STATUS AND GET VIDEO WHEN READY
|
||||
video_id = response.json()["data"]["video_id"]
|
||||
params = {
|
||||
'video_id': response.json()["data"]["video_id"]
|
||||
}
|
||||
response = {}
|
||||
status = "processing"
|
||||
error = None
|
||||
|
||||
while status != "completed" and error is None:
|
||||
response = requests.get(GET_VIDEO_URL, headers=GET_HEADER, params=params)
|
||||
response_data = response.json()
|
||||
|
||||
status = response_data["data"]["status"]
|
||||
error = response_data["data"]["error"]
|
||||
|
||||
if status != "completed" and error is None:
|
||||
logger.info(f"Status: {status}")
|
||||
time.sleep(10) # Wait for 10 second before the next request
|
||||
|
||||
logger.info(response.status_code)
|
||||
logger.info(response.json())
|
||||
|
||||
# DOWNLOAD VIDEO
|
||||
download_url = response.json()['data']['video_url']
|
||||
output_directory = 'download-video/'
|
||||
output_filename = video_id + '.mp4'
|
||||
|
||||
response = requests.get(download_url)
|
||||
|
||||
if response.status_code == 200:
|
||||
os.makedirs(output_directory, exist_ok=True) # Create the directory if it doesn't exist
|
||||
output_path = os.path.join(output_directory, output_filename)
|
||||
with open(output_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
logger.info(f"File '{output_filename}' downloaded successfully.")
|
||||
return output_filename
|
||||
else:
|
||||
logger.error(f"Failed to download file. Status code: {response.status_code}")
|
||||
return None
|
||||
Reference in New Issue
Block a user