180 lines
6.9 KiB
Python
180 lines
6.9 KiB
Python
import os
|
|
import random
|
|
import time
|
|
from logging import getLogger
|
|
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
|
|
from helper.constants import *
|
|
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
|
|
from heygen.AvatarEnum import AvatarEnum
|
|
|
|
# Run python-dotenv's loader before any environment variable is read below.
load_dotenv()

logger = getLogger(__name__)

# HeyGen API token and Firebase bucket name, both taken from the environment
# (each is None when the variable is not set).
TOKEN = os.environ.get("HEY_GEN_TOKEN")
FIREBASE_BUCKET = os.environ.get('FIREBASE_BUCKET')

# HeyGen endpoints: one to create a video, one to query a video's status.
CREATE_VIDEO_URL = 'https://api.heygen.com/v1/template.generate'
GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'

# Header sets for GET and POST calls. POST carries the same API key and
# additionally declares a JSON request body.
GET_HEADER = {'X-Api-Key': TOKEN}
POST_HEADER = {**GET_HEADER, 'Content-Type': 'application/json'}
|
|
|
|
|
|
def _first_exercise_of_type(exercises, exercise_type):
    """Return the first exercise whose 'type' equals *exercise_type*, or None."""
    return next(
        (exercise for exercise in exercises if exercise.get('type') == exercise_type),
        None,
    )


def _upload_speaking_video(file_name):
    """Upload one downloaded video to Firebase and return ``(firebase_path, url)``."""
    # VIDEO_FILES_PATH / FIREBASE_SPEAKING_VIDEO_FILES_PATH are not defined in
    # this file; presumably they come from `helper.constants` - confirm.
    local_path = VIDEO_FILES_PATH + file_name
    firebase_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + file_name
    url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_path, local_path)
    return firebase_path, url


def _create_prompt_videos(questions, avatar, part):
    """Create and upload one video per question; return the prompt dicts.

    Questions whose video could not be created are logged and skipped, so the
    returned list may be shorter than *questions*.
    """
    prompts = []
    for question in questions:
        file_name = create_video(question, avatar)
        if file_name is None:
            logger.error("Failed to create video for part %s question: %s", part, question)
            continue
        firebase_path, url = _upload_speaking_video(file_name)
        prompts.append({
            "text": question,
            "video_path": firebase_path,
            "video_url": url,
        })
    return prompts


def create_videos_and_save_to_db(exercises, template, id):
    """Generate the speaking videos for an exam and persist the filled template.

    For each speaking part (exercise 'type' 1, 2 and 3) present in
    *exercises*, videos are created with one randomly chosen avatar, uploaded
    to Firebase, and the resulting paths/URLs are written into the matching
    slot of ``template["exercises"]`` (index 0, 1, 2 respectively). Slots
    whose part is absent from *exercises* are removed. The template is then
    saved under the "speaking" collection.

    Args:
        exercises: Iterable of dict-like exercises, each read via
            ``.get('type')``. Part 1 and 3 entries provide "questions";
            part 2 provides "question" and "prompts"; topic keys are read as
            shown below.
        template: Dict with an "exercises" list of at least three entries.
            It is mutated in place.
        id: Identifier passed to ``save_to_db_with_id`` (name kept for
            caller compatibility even though it shadows the builtin).

    Returns:
        None.
    """
    # A single avatar is used for every video of this exam.
    avatar = random.choice(list(AvatarEnum))

    # Speaking 1: one video per question.
    exercise_1 = _first_exercise_of_type(exercises, 1)
    if exercise_1 is not None:
        logger.info('Creating video for speaking part 1')
        sp1_questions = _create_prompt_videos(exercise_1["questions"], avatar, 1)
        part_1 = template["exercises"][0]
        part_1["prompts"] = sp1_questions
        part_1["first_title"] = exercise_1["first_topic"]
        part_1["second_title"] = exercise_1["second_topic"]

    # Speaking 2: a single video for the one question.
    exercise_2 = _first_exercise_of_type(exercises, 2)
    if exercise_2 is not None:
        logger.info('Creating video for speaking part 2')
        sp2_result = create_video(exercise_2["question"], avatar)
        if sp2_result is not None:
            sp2_video_path, sp2_video_url = _upload_speaking_video(sp2_result)
            part_2 = template["exercises"][1]
            part_2["prompts"] = exercise_2["prompts"]
            part_2["text"] = exercise_2["question"]
            part_2["title"] = exercise_2["topic"]
            part_2["video_url"] = sp2_video_url
            part_2["video_path"] = sp2_video_path
        else:
            # The template slot is left untouched when the video failed.
            logger.error(
                "Failed to create video for part 2 question: %s",
                exercise_2["question"],
            )

    # Speaking 3: one video per question.
    exercise_3 = _first_exercise_of_type(exercises, 3)
    if exercise_3 is not None:
        logger.info('Creating videos for speaking part 3')
        sp3_questions = _create_prompt_videos(exercise_3["questions"], avatar, 3)
        part_3 = template["exercises"][2]
        part_3["prompts"] = sp3_questions
        part_3["title"] = exercise_3["topic"]

    # Remove slots of missing parts from the highest index down, so that the
    # indices of the slots still to be removed stay valid.
    if exercise_3 is None:
        template["exercises"].pop(2)
    if exercise_2 is None:
        template["exercises"].pop(1)
    if exercise_1 is None:
        template["exercises"].pop(0)

    save_to_db_with_id("speaking", template, id)
    # %-style arguments keep this log line from raising when id is not a str.
    logger.info('Saved speaking to DB with id %s : %s', id, template)
|
|
|
|
|
|
# Seconds to wait for HeyGen / the download host before a request gives up.
_HEYGEN_REQUEST_TIMEOUT = 60


def _parse_heygen_response(response):
    """Return ``(payload, data)`` decoded from a HeyGen HTTP response.

    ``payload`` is the decoded JSON body, or None when the body is not JSON.
    ``data`` is ``payload["data"]`` when that is a dict, otherwise an empty
    dict, so callers can use ``.get`` without further checks.
    """
    try:
        payload = response.json()
    except ValueError:
        return None, {}
    data = payload.get("data") if isinstance(payload, dict) else None
    return payload, data if isinstance(data, dict) else {}


def create_video(text, avatar, poll_interval=10, max_wait=None):
    """Generate a HeyGen template video that speaks *text* and download it.

    The function posts a generation request, polls the status endpoint until
    the video is completed, then downloads the file into ``download-video/``
    as ``<video_id>.mp4``.

    Args:
        text: Script inserted into the template's ``script_here`` variable.
        avatar: Template identifier used in the generation URL. Either a
            string or an enum member whose ``.value`` is the identifier.
        poll_interval: Seconds to sleep between status checks.
        max_wait: Optional upper bound, in seconds, on the total polling
            time. ``None`` (the default) polls until HeyGen reports a
            terminal state.

    Returns:
        The downloaded file's name (not its full path), or None when the
        request was rejected, generation failed, polling timed out, or the
        download did not return HTTP 200.

    Raises:
        requests.RequestException: Network-level failures, including request
            timeouts, are not caught here and propagate to the caller.
    """
    # The caller picks a member of AvatarEnum; use its value when there is
    # one so plain (non-str) enums work as well as raw strings.
    template_id = getattr(avatar, "value", avatar)

    # POST TO CREATE VIDEO
    create_video_url = f'https://api.heygen.com/v2/template/{template_id}/generate'
    data = {
        "test": False,
        "caption": False,
        "title": "video_title",
        "variables": {
            "script_here": {
                "name": "script_here",
                "type": "text",
                "properties": {
                    "content": text
                }
            }
        }
    }
    response = requests.post(
        create_video_url,
        headers=POST_HEADER,
        json=data,
        timeout=_HEYGEN_REQUEST_TIMEOUT,
    )
    payload, create_data = _parse_heygen_response(response)
    logger.info(response.status_code)
    logger.info(payload)

    video_id = create_data.get("video_id")
    if not video_id:
        # Rejected request (bad token, unknown template, quota, ...): report
        # failure through the None return value instead of raising KeyError.
        logger.error(f"HeyGen did not return a video id. Status code: {response.status_code}")
        return None

    # GET TO CHECK STATUS AND GET VIDEO WHEN READY
    params = {
        'video_id': video_id
    }
    deadline = None if max_wait is None else time.monotonic() + max_wait

    while True:
        response = requests.get(
            GET_VIDEO_URL,
            headers=GET_HEADER,
            params=params,
            timeout=_HEYGEN_REQUEST_TIMEOUT,
        )
        payload, status_data = _parse_heygen_response(response)
        status = status_data.get("status")
        error = status_data.get("error")

        if status == "completed":
            break
        # NOTE(review): "failed" is assumed to be HeyGen's terminal failure
        # status - confirm against the video status API reference.
        if error is not None or status == "failed" or not status_data:
            logger.error(f"Video generation failed for video '{video_id}': {payload}")
            return None
        if deadline is not None and time.monotonic() >= deadline:
            logger.error(f"Timed out waiting for video '{video_id}' (last status: {status})")
            return None

        logger.info(f"Status: {status}")
        time.sleep(poll_interval)  # Wait before the next status request

    logger.info(response.status_code)
    logger.info(payload)

    # DOWNLOAD VIDEO
    download_url = status_data.get('video_url')
    if not download_url:
        logger.error(f"Video '{video_id}' completed without a download URL.")
        return None

    output_directory = 'download-video/'
    output_filename = video_id + '.mp4'

    response = requests.get(download_url, timeout=_HEYGEN_REQUEST_TIMEOUT)

    if response.status_code == 200:
        os.makedirs(output_directory, exist_ok=True)  # Create the directory if it doesn't exist
        output_path = os.path.join(output_directory, output_filename)
        with open(output_path, 'wb') as f:
            f.write(response.content)
        logger.info(f"File '{output_filename}' downloaded successfully.")
        return output_filename

    logger.error(f"Failed to download file. Status code: {response.status_code}")
    return None
|