import os
import random
import time

import requests
from dotenv import load_dotenv

import app
from helper.constants import *
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
from heygen.AvatarEnum import AvatarEnum

load_dotenv()

# Get HeyGen token
TOKEN = os.getenv("HEY_GEN_TOKEN")
FIREBASE_BUCKET = os.getenv('FIREBASE_BUCKET')

# POST TO CREATE VIDEO
CREATE_VIDEO_URL = 'https://api.heygen.com/v1/template.generate'
GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'
POST_HEADER = {
    'X-Api-Key': TOKEN,
    'Content-Type': 'application/json'
}
GET_HEADER = {
    'X-Api-Key': TOKEN
}

# Per-request timeout (seconds) so a stalled connection cannot hang the worker.
_REQUEST_TIMEOUT = 30
# Directory the rendered videos are downloaded into.
_DOWNLOAD_DIRECTORY = 'download-video/'


def _find_exercise(exercises, exercise_type):
    """Return the first exercise whose 'type' equals *exercise_type*, or None."""
    return next(
        (element for element in exercises if element.get('type') == exercise_type),
        None,
    )


def _create_and_upload_video(text, avatar):
    """Render a video for *text*, upload it to Firebase and return its location.

    Returns a ``(firebase_file_path, url)`` tuple, or ``None`` when
    ``create_video`` reports a failure.
    """
    file_name = create_video(text, avatar)
    if file_name is None:
        return None

    # VIDEO_FILES_PATH / FIREBASE_SPEAKING_VIDEO_FILES_PATH are not defined in
    # this file; presumably they come from the helper.constants star import.
    local_file_path = VIDEO_FILES_PATH + file_name
    firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + file_name
    url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, local_file_path)
    return firebase_file_path, url


def _fill_single_question(template_exercise, exercise, part_number, include_prompts=False):
    """Create the video for a one-question part and copy the data into the template."""
    app.app.logger.info(f'Creating video for speaking part {part_number}')
    uploaded = _create_and_upload_video(exercise["question"], random.choice(list(AvatarEnum)))
    if uploaded is None:
        app.app.logger.error(
            f"Failed to create video for part {part_number} question: " + exercise["question"]
        )
        return

    video_path, video_url = uploaded
    if include_prompts:
        template_exercise["prompts"] = exercise["prompts"]
    template_exercise["text"] = exercise["question"]
    template_exercise["title"] = exercise["topic"]
    template_exercise["video_url"] = video_url
    template_exercise["video_path"] = video_path


def create_videos_and_save_to_db(exercises, template, id):
    """Create the speaking videos, fill *template* with them and save it.

    Args:
        exercises: iterable of dicts. The first dict with ``type`` 1, 2 and 3
            respectively is used for speaking part 1, 2 and 3. Parts 1 and 2
            read ``question`` and ``topic`` (part 2 also ``prompts``); part 3
            reads ``questions`` and ``topic``.
        template: dict with an ``"exercises"`` list whose indexes 0, 1 and 2
            hold the entries for parts 1, 2 and 3. It is mutated in place;
            entries for parts that have no matching exercise are removed.
        id: document id passed to ``save_to_db_with_id``.

    A failed video only logs an error: the template entry for that part is
    left as it was and the template is still saved.
    """
    exercise_1 = _find_exercise(exercises, 1)
    exercise_2 = _find_exercise(exercises, 2)
    exercise_3 = _find_exercise(exercises, 3)

    # Speaking 1
    if exercise_1 is not None:
        _fill_single_question(template["exercises"][0], exercise_1, 1)

    # Speaking 2
    if exercise_2 is not None:
        _fill_single_question(template["exercises"][1], exercise_2, 2, include_prompts=True)

    # Speaking 3: one video per question, all with the same avatar.
    if exercise_3 is not None:
        sp3_questions = []
        avatar = random.choice(list(AvatarEnum))
        app.app.logger.info('Creating videos for speaking part 3')
        for question in exercise_3["questions"]:
            uploaded = _create_and_upload_video(question, avatar)
            if uploaded is None:
                app.app.logger.error("Failed to create video for part 3 question: " + question)
                continue
            video_path, video_url = uploaded
            sp3_questions.append({
                "text": question,
                "video_path": video_path,
                "video_url": video_url
            })
        template["exercises"][2]["prompts"] = sp3_questions
        template["exercises"][2]["title"] = exercise_3["topic"]

    # Drop the missing parts from the highest index down so that the
    # remaining indexes are still valid for the next pop.
    if exercise_3 is None:
        template["exercises"].pop(2)
    if exercise_2 is None:
        template["exercises"].pop(1)
    if exercise_1 is None:
        template["exercises"].pop(0)

    save_to_db_with_id("speaking", template, id)
    # f-string instead of '+' so a non-string id cannot raise TypeError here.
    app.app.logger.info(f'Saved speaking to DB with id {id} : {template}')


def _fetch_json(method, url, **kwargs):
    """Send a request and return ``(response, parsed_json)``, or ``None`` on failure.

    Network errors, timeouts and non-JSON bodies are logged and reported as
    ``None`` instead of being raised.
    """
    try:
        response = requests.request(method, url, timeout=_REQUEST_TIMEOUT, **kwargs)
        return response, response.json()
    except (requests.RequestException, ValueError) as exc:
        app.app.logger.error(f"Request to {url} failed: {exc}")
        return None


def _data_section(payload):
    """Return the ``"data"`` dict of an API payload, or ``{}`` if absent/invalid."""
    if not isinstance(payload, dict):
        return {}
    data = payload.get("data")
    return data if isinstance(data, dict) else {}


def create_video(text, avatar, *, poll_interval=10, max_wait=None):
    """Generate a HeyGen template video for *text* and download it.

    Args:
        text: script inserted into the template's ``script_here`` variable.
        avatar: HeyGen template id, either a string or an enum member whose
            ``value`` is the id (callers in this file pass ``AvatarEnum``
            members; assumed to carry the template id - confirm against
            heygen.AvatarEnum).
        poll_interval: seconds to sleep between status checks.
        max_wait: optional upper bound in seconds for the polling phase;
            ``None`` (default) polls until the video completes or fails.

    Returns:
        The file name (``<video_id>.mp4``) of the video saved under
        ``download-video/``, or ``None`` if any step fails.
    """
    # Works for plain strings and for both plain and str-mixin enum members.
    template_id = getattr(avatar, "value", avatar)

    # POST TO CREATE VIDEO
    create_video_url = f'https://api.heygen.com/v2/template/{template_id}/generate'
    data = {
        "test": False,
        "caption": False,
        "title": "video_title",
        "variables": {
            "script_here": {
                "name": "script_here",
                "type": "text",
                "properties": {
                    "content": text
                }
            }
        }
    }
    created = _fetch_json("POST", create_video_url, headers=POST_HEADER, json=data)
    if created is None:
        return None
    response, payload = created
    app.app.logger.info(response.status_code)
    app.app.logger.info(payload)

    video_id = _data_section(payload).get("video_id")
    if not video_id:
        # Error responses carry no video id; fail cleanly instead of raising.
        app.app.logger.error(f"Video creation was rejected. Status code: {response.status_code}")
        return None

    # GET TO CHECK STATUS AND GET VIDEO WHEN READY
    params = {'video_id': video_id}
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while True:
        polled = _fetch_json("GET", GET_VIDEO_URL, headers=GET_HEADER, params=params)
        if polled is None:
            return None
        response, payload = polled
        status_data = _data_section(payload)
        if not status_data:
            app.app.logger.error(f"Unexpected status response for video {video_id}: {payload}")
            return None

        status = status_data.get("status")
        error = status_data.get("error")
        if status in ("completed", "failed") or error is not None:
            break
        if deadline is not None and time.monotonic() >= deadline:
            app.app.logger.error(f"Timed out waiting for video {video_id}")
            return None

        app.app.logger.info(f"Status: {status}")
        time.sleep(poll_interval)  # Wait before the next status request

    app.app.logger.info(response.status_code)
    app.app.logger.info(payload)

    if status != "completed":
        app.app.logger.error(f"Video {video_id} was not generated. Status: {status}, error: {error}")
        return None

    # DOWNLOAD VIDEO
    download_url = status_data.get('video_url')
    if not download_url:
        app.app.logger.error(f"No download URL returned for video {video_id}")
        return None

    output_filename = video_id + '.mp4'
    try:
        response = requests.get(download_url, timeout=_REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        app.app.logger.error(f"Failed to download file: {exc}")
        return None

    if response.status_code != 200:
        app.app.logger.error(f"Failed to download file. Status code: {response.status_code}")
        return None

    os.makedirs(_DOWNLOAD_DIRECTORY, exist_ok=True)  # Create the directory if it doesn't exist
    output_path = os.path.join(_DOWNLOAD_DIRECTORY, output_filename)
    with open(output_path, 'wb') as f:
        f.write(response.content)
    app.app.logger.info(f"File '{output_filename}' downloaded successfully.")
    return output_filename