"""Create speaking-exam videos through the HeyGen API and publish them.

Each question is rendered to a video with ``create_video``, the resulting file
is uploaded with ``upload_file_firebase_get_url``, and the filled-in template
is persisted with ``save_to_db_with_id``.
"""
import logging
import os
import random
import time

import requests
from dotenv import load_dotenv

import app
from helper.constants import *
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
from heygen.AvatarEnum import AvatarEnum

load_dotenv()

# Get HeyGen token
TOKEN = os.getenv("HEY_GEN_TOKEN")

# POST TO CREATE VIDEO
CREATE_VIDEO_URL = 'https://api.heygen.com/v1/template.generate'
GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'
POST_HEADER = {
    'X-Api-Key': TOKEN,
    'Content-Type': 'application/json'
}
GET_HEADER = {
    'X-Api-Key': TOKEN
}

# NOTE(review): these identifiers are not referenced anywhere in this module;
# presumably they mirror the template ids held by AvatarEnum -- confirm before
# removing or relying on them.
MATTHEW_NOAH = "11b234e504e44bfda9bc6b7aac3c8f81"
VERA_CERISE = "9bf2f27009cd403ab4ba4e22629b27bb"
EDWARD_TONY = "d3333e37952946059b45efc8482a2b6c"
TANYA_MOLLY = "07c75076b3f94df4ac658c6de72be83a"
KAYLA_ABBI = "d688099f8db9472cb4890b0561e81793"
JEROME_RYAN = "ad41feb2a5c4483085525e3d8907f512"
TYLER_CHRISTOPHER = "03c796f8ed274bb38f19e893bcbc6121"

# Per-request timeout (seconds) so a stalled connection cannot hang the caller.
REQUEST_TIMEOUT_SECONDS = 60


def _create_and_upload_video(text, avatar):
    """Render ``text`` as a video and upload the downloaded file.

    Returns a ``(firebase_path, url)`` tuple, or ``None`` when ``create_video``
    could not produce a file.
    """
    filename = create_video(text, avatar)
    if filename is None:
        return None

    # NOTE(review): create_video writes into 'download-video/'; this assumes
    # VIDEO_FILES_PATH (from helper.constants) points at that same directory.
    local_path = VIDEO_FILES_PATH + filename
    firebase_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + filename
    url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_path, local_path)
    return firebase_path, url


def _response_data(body):
    """Return the ``data`` mapping of a decoded API body, or ``{}`` if absent."""
    if not isinstance(body, dict):
        return {}
    data = body.get("data")
    return data if isinstance(data, dict) else {}


def create_videos_and_save_to_db(exercises, template, id):
    """Create the videos for the three speaking parts and save the template.

    Args:
        exercises: Sequence of three exercise dicts. Items 0 and 1 are read
            for ``"question"`` and ``"topic"`` (item 1 also for ``"prompts"``);
            item 2 is read for ``"questions"`` and ``"topic"``.
        template: Dict whose ``template["exercises"][0..2]`` entries are
            updated in place with text, title and video location.
        id: Identifier passed through to ``save_to_db_with_id``.

    A part whose video cannot be created is logged and left untouched in the
    template (parts 1 and 2) or omitted from the prompt list (part 3); the
    template is saved regardless.
    """
    # Speaking 1
    logging.info('Creating video for speaking part 1')
    part1 = exercises[0]
    uploaded = _create_and_upload_video(part1["question"], random.choice(list(AvatarEnum)))
    if uploaded is not None:
        video_path, video_url = uploaded
        target = template["exercises"][0]
        target["text"] = part1["question"]
        target["title"] = part1["topic"]
        target["video_url"] = video_url
        target["video_path"] = video_path
    else:
        logging.error("Failed to create video for part 1 question: %s", part1["question"])

    # Speaking 2
    logging.info('Creating video for speaking part 2')
    part2 = exercises[1]
    uploaded = _create_and_upload_video(part2["question"], random.choice(list(AvatarEnum)))
    if uploaded is not None:
        video_path, video_url = uploaded
        target = template["exercises"][1]
        target["prompts"] = part2["prompts"]
        target["text"] = part2["question"]
        target["title"] = part2["topic"]
        target["video_url"] = video_url
        target["video_path"] = video_path
    else:
        logging.error("Failed to create video for part 2 question: %s", part2["question"])

    # Speaking 3: every question of the part uses the same avatar.
    part3 = exercises[2]
    sp3_questions = []
    avatar = random.choice(list(AvatarEnum))
    logging.info('Creating videos for speaking part 3')
    for question in part3["questions"]:
        uploaded = _create_and_upload_video(question, avatar)
        if uploaded is None:
            logging.error("Failed to create video for part 3 question: %s", question)
            continue
        video_path, video_url = uploaded
        sp3_questions.append({
            "text": question,
            "video_path": video_path,
            "video_url": video_url
        })

    template["exercises"][2]["prompts"] = sp3_questions
    template["exercises"][2]["title"] = part3["topic"]

    save_to_db_with_id("speaking", template, id)


def create_video(text, avatar: AvatarEnum, poll_interval=5, max_wait_seconds=1800):
    """Generate a video for ``text`` and download it locally.

    Posts a generation request using ``avatar.value`` as the template id,
    polls the status endpoint until the video is ``"completed"``, then
    downloads the result to ``download-video/<video_id>.mp4``.

    Args:
        text: Text placed in the template's ``avatar_0`` variable.
        avatar: Enum member whose ``value`` is sent as ``template_id``.
        poll_interval: Seconds to sleep between status checks.
        max_wait_seconds: Upper bound on the total polling time; when it is
            exceeded the function gives up instead of polling forever.

    Returns:
        The downloaded file name (``"<video_id>.mp4"``), or ``None`` if the
        request, the rendering, or the download failed.
    """
    # POST TO CREATE VIDEO
    payload = {
        "template_id": avatar.value,
        "title": "video_title",
        "test": False,
        "variables": [
            {
                "properties": {
                    "text": text
                },
                "name": "avatar_0"
            }
        ]
    }
    try:
        create_response = requests.post(
            CREATE_VIDEO_URL,
            headers=POST_HEADER,
            json=payload,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        logging.info(create_response.status_code)
        create_body = create_response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on older versions of requests.
        logging.error(f"Failed to request video creation: {exc}")
        return None
    logging.info(create_body)

    video_id = _response_data(create_body).get("video_id")
    if not video_id:
        logging.error("Video creation response did not contain a video_id.")
        return None

    # GET TO CHECK STATUS AND GET VIDEO WHEN READY
    params = {
        'video_id': video_id
    }
    deadline = time.monotonic() + max_wait_seconds
    while True:
        try:
            status_response = requests.get(
                GET_VIDEO_URL,
                headers=GET_HEADER,
                params=params,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            status_body = status_response.json()
        except (requests.RequestException, ValueError) as exc:
            logging.error(f"Failed to fetch video status: {exc}")
            return None

        status_data = _response_data(status_body)
        if not status_data:
            logging.error(f"Unexpected video status response: {status_body}")
            return None

        status = status_data.get("status")
        error = status_data.get("error")
        if status == "completed":
            break
        if error is not None:
            # Previously this fell through and tried to download a missing URL.
            logging.error(f"Video generation failed: {error}")
            return None
        if time.monotonic() >= deadline:
            logging.error(
                f"Timed out after {max_wait_seconds} seconds waiting for video {video_id}."
            )
            return None

        logging.info(f"Status: {status}")
        time.sleep(poll_interval)  # Wait before the next status request

    logging.info(status_response.status_code)
    logging.info(status_body)

    # DOWNLOAD VIDEO
    download_url = status_data.get('video_url')
    if not download_url:
        logging.error("Completed video status did not contain a video_url.")
        return None

    output_directory = 'download-video/'
    output_filename = video_id + '.mp4'

    try:
        video_response = requests.get(download_url, timeout=REQUEST_TIMEOUT_SECONDS)
    except requests.RequestException as exc:
        logging.error(f"Failed to download file: {exc}")
        return None

    if video_response.status_code != 200:
        logging.error(f"Failed to download file. Status code: {video_response.status_code}")
        return None

    os.makedirs(output_directory, exist_ok=True)  # Create the directory if it doesn't exist
    output_path = os.path.join(output_directory, output_filename)
    with open(output_path, 'wb') as f:
        f.write(video_response.content)
    logging.info(f"File '{output_filename}' downloaded successfully.")
    return output_filename