Used main as base branch in the last time
This commit is contained in:
49
helper/gpt_zero.py
Normal file
49
helper/gpt_zero.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from logging import getLogger
|
||||
from typing import Dict, Optional
|
||||
import requests
|
||||
|
||||
|
||||
class GPTZero:
|
||||
_GPT_ZERO_ENDPOINT = 'https://api.gptzero.me/v2/predict/text'
|
||||
|
||||
def __init__(self, gpt_zero_key: str):
|
||||
self._logger = getLogger(__name__)
|
||||
if gpt_zero_key is None:
|
||||
self._logger.warning('GPT Zero key was not included! Skipping ai detection when grading.')
|
||||
self._gpt_zero_key = gpt_zero_key
|
||||
self._header = {
|
||||
'x-api-key': gpt_zero_key
|
||||
}
|
||||
|
||||
def run_detection(self, text: str):
|
||||
if self._gpt_zero_key is None:
|
||||
return None
|
||||
data = {
|
||||
'document': text,
|
||||
'version': '',
|
||||
'multilingual': False
|
||||
}
|
||||
response = requests.post(self._GPT_ZERO_ENDPOINT, headers=self._header, json=data)
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
return self._parse_detection(response.json())
|
||||
|
||||
def _parse_detection(self, response: Dict) -> Optional[Dict]:
|
||||
try:
|
||||
text_scan = response["documents"][0]
|
||||
filtered_sentences = [
|
||||
{
|
||||
"sentence": item["sentence"],
|
||||
"highlight_sentence_for_ai": item["highlight_sentence_for_ai"]
|
||||
}
|
||||
for item in text_scan["sentences"]
|
||||
]
|
||||
return {
|
||||
"class_probabilities": text_scan["class_probabilities"],
|
||||
"confidence_category": text_scan["confidence_category"],
|
||||
"predicted_class": text_scan["predicted_class"],
|
||||
"sentences": filtered_sentences
|
||||
}
|
||||
except Exception as e:
|
||||
self._logger.error(f'Failed to parse GPT\'s Zero response: {str(e)}')
|
||||
return None
|
||||
@@ -1,17 +1,19 @@
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from logging import getLogger
|
||||
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
|
||||
import app
|
||||
from helper.constants import *
|
||||
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
|
||||
from heygen.AvatarEnum import AvatarEnum
|
||||
|
||||
load_dotenv()
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
# Get HeyGen token
|
||||
TOKEN = os.getenv("HEY_GEN_TOKEN")
|
||||
FIREBASE_BUCKET = os.getenv('FIREBASE_BUCKET')
|
||||
@@ -37,7 +39,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
if found_exercises_1:
|
||||
exercise_1 = found_exercises_1[0]
|
||||
sp1_questions = []
|
||||
app.app.logger.info('Creating video for speaking part 1')
|
||||
logger.info('Creating video for speaking part 1')
|
||||
for question in exercise_1["questions"]:
|
||||
sp1_result = create_video(question, avatar)
|
||||
if sp1_result is not None:
|
||||
@@ -51,7 +53,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
}
|
||||
sp1_questions.append(video)
|
||||
else:
|
||||
app.app.logger.error("Failed to create video for part 1 question: " + exercise_1["question"])
|
||||
logger.error("Failed to create video for part 1 question: " + exercise_1["question"])
|
||||
template["exercises"][0]["prompts"] = sp1_questions
|
||||
template["exercises"][0]["first_title"] = exercise_1["first_topic"]
|
||||
template["exercises"][0]["second_title"] = exercise_1["second_topic"]
|
||||
@@ -62,7 +64,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
# Check if any elements were found
|
||||
if found_exercises_2:
|
||||
exercise_2 = found_exercises_2[0]
|
||||
app.app.logger.info('Creating video for speaking part 2')
|
||||
logger.info('Creating video for speaking part 2')
|
||||
sp2_result = create_video(exercise_2["question"], avatar)
|
||||
if sp2_result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + sp2_result
|
||||
@@ -76,7 +78,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
template["exercises"][1]["video_url"] = sp2_video_url
|
||||
template["exercises"][1]["video_path"] = sp2_video_path
|
||||
else:
|
||||
app.app.logger.error("Failed to create video for part 2 question: " + exercise_2["question"])
|
||||
logger.error("Failed to create video for part 2 question: " + exercise_2["question"])
|
||||
|
||||
# Speaking 3
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
@@ -85,7 +87,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
if found_exercises_3:
|
||||
exercise_3 = found_exercises_3[0]
|
||||
sp3_questions = []
|
||||
app.app.logger.info('Creating videos for speaking part 3')
|
||||
logger.info('Creating videos for speaking part 3')
|
||||
for question in exercise_3["questions"]:
|
||||
result = create_video(question, avatar)
|
||||
if result is not None:
|
||||
@@ -99,7 +101,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
}
|
||||
sp3_questions.append(video)
|
||||
else:
|
||||
app.app.logger.error("Failed to create video for part 3 question: " + question)
|
||||
logger.error("Failed to create video for part 3 question: " + question)
|
||||
template["exercises"][2]["prompts"] = sp3_questions
|
||||
template["exercises"][2]["title"] = exercise_3["topic"]
|
||||
|
||||
@@ -111,7 +113,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
template["exercises"].pop(0)
|
||||
|
||||
save_to_db_with_id("speaking", template, id)
|
||||
app.app.logger.info('Saved speaking to DB with id ' + id + " : " + str(template))
|
||||
logger.info('Saved speaking to DB with id ' + id + " : " + str(template))
|
||||
|
||||
|
||||
def create_video(text, avatar):
|
||||
@@ -132,8 +134,8 @@ def create_video(text, avatar):
|
||||
}
|
||||
}
|
||||
response = requests.post(create_video_url, headers=POST_HEADER, json=data)
|
||||
app.app.logger.info(response.status_code)
|
||||
app.app.logger.info(response.json())
|
||||
logger.info(response.status_code)
|
||||
logger.info(response.json())
|
||||
|
||||
# GET TO CHECK STATUS AND GET VIDEO WHEN READY
|
||||
video_id = response.json()["data"]["video_id"]
|
||||
@@ -152,11 +154,11 @@ def create_video(text, avatar):
|
||||
error = response_data["data"]["error"]
|
||||
|
||||
if status != "completed" and error is None:
|
||||
app.app.logger.info(f"Status: {status}")
|
||||
logger.info(f"Status: {status}")
|
||||
time.sleep(10) # Wait for 10 second before the next request
|
||||
|
||||
app.app.logger.info(response.status_code)
|
||||
app.app.logger.info(response.json())
|
||||
logger.info(response.status_code)
|
||||
logger.info(response.json())
|
||||
|
||||
# DOWNLOAD VIDEO
|
||||
download_url = response.json()['data']['video_url']
|
||||
@@ -170,8 +172,8 @@ def create_video(text, avatar):
|
||||
output_path = os.path.join(output_directory, output_filename)
|
||||
with open(output_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
app.app.logger.info(f"File '{output_filename}' downloaded successfully.")
|
||||
logger.info(f"File '{output_filename}' downloaded successfully.")
|
||||
return output_filename
|
||||
else:
|
||||
app.app.logger.error(f"Failed to download file. Status code: {response.status_code}")
|
||||
logger.error(f"Failed to download file. Status code: {response.status_code}")
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user