546 lines
22 KiB
Python
546 lines
22 KiB
Python
from flask import Flask, request
|
|
from flask_jwt_extended import JWTManager, jwt_required
|
|
from functools import reduce
|
|
import firebase_admin
|
|
from firebase_admin import credentials, firestore
|
|
from helper.api_messages import QuestionType, get_grading_messages, get_question_gen_messages, get_question_tips
|
|
from helper.file_helper import delete_files_older_than_one_day
|
|
from helper.firebase_helper import download_firebase_file, upload_file_firebase, upload_file_firebase_get_url, \
|
|
save_to_db
|
|
from helper.heygen_api import create_video
|
|
from helper.speech_to_text_helper import speech_to_text, text_to_speech, has_words
|
|
from helper.token_counter import count_tokens
|
|
from helper.openai_interface import make_openai_call
|
|
import os
|
|
import uuid
|
|
import re
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
from templates.question_templates import getListening1Template, getListening2Template, getSpeaking1Template, \
|
|
getSpeaking2Template, getSpeaking3Template
|
|
|
|
load_dotenv()
|
|
|
|
app = Flask(__name__)
|
|
|
|
app.config['JWT_SECRET_KEY'] = os.getenv("JWT_SECRET_KEY")
|
|
jwt = JWTManager(app)
|
|
|
|
# Initialize Firebase Admin SDK
|
|
cred = credentials.Certificate(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
|
|
firebase_admin.initialize_app(cred)
|
|
|
|
GRADING_TEMPERATURE = 0.1
|
|
TIPS_TEMPERATURE = 0.2
|
|
GEN_QUESTION_TEMPERATURE = 0.9
|
|
GPT_3_5_TURBO = "gpt-3.5-turbo"
|
|
GPT_3_5_TURBO_16K = "gpt-3.5-turbo-16k"
|
|
GRADING_FIELDS = ['comment', 'overall', 'task_response']
|
|
GEN_FIELDS = ['question']
|
|
LISTENING_GEN_FIELDS = ['transcript', 'exercise']
|
|
|
|
FIREBASE_BUCKET = 'mti-ielts.appspot.com'
|
|
AUDIO_FILES_PATH = 'download-audio/'
|
|
FIREBASE_LISTENING_AUDIO_FILES_PATH = 'listening_recordings/'
|
|
|
|
VIDEO_FILES_PATH = 'download-video/'
|
|
FIREBASE_SPEAKING_VIDEO_FILES_PATH = 'speaking_videos/'
|
|
|
|
@app.route('/listening_section_1', methods=['GET'])
|
|
@jwt_required()
|
|
def get_listening_section_1_question():
|
|
try:
|
|
messages = get_question_gen_messages(QuestionType.LISTENING_SECTION_1)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO_16K, messages, token_count, LISTENING_GEN_FIELDS,
|
|
GEN_QUESTION_TEMPERATURE)
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_listening_section_1', methods=['POST'])
|
|
@jwt_required()
|
|
def save_listening_section_1_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
question = getListening1Template()
|
|
file_name = str(uuid.uuid4()) + ".mp3"
|
|
sound_file_path = AUDIO_FILES_PATH + file_name
|
|
firebase_file_path = FIREBASE_LISTENING_AUDIO_FILES_PATH + file_name
|
|
# TODO it's the conversation audio, still work to do on text-to-speech
|
|
text_to_speech(question["audio"]["conversation"], sound_file_path)
|
|
file_url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
|
question["audio"]["source"] = file_url
|
|
if save_to_db("listening", question):
|
|
return question
|
|
else:
|
|
raise Exception("Failed to save question: " + question)
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
|
|
@app.route('/listening_section_2', methods=['GET'])
|
|
@jwt_required()
|
|
def get_listening_section_2_question():
|
|
try:
|
|
delete_files_older_than_one_day(AUDIO_FILES_PATH)
|
|
messages = get_question_gen_messages(QuestionType.LISTENING_SECTION_2)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO_16K, messages, token_count, LISTENING_GEN_FIELDS,
|
|
GEN_QUESTION_TEMPERATURE)
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_listening_section_2', methods=['POST'])
|
|
@jwt_required()
|
|
def save_listening_section_2_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
question = getListening2Template()
|
|
file_name = str(uuid.uuid4()) + ".mp3"
|
|
sound_file_path = AUDIO_FILES_PATH + file_name
|
|
firebase_file_path = FIREBASE_LISTENING_AUDIO_FILES_PATH + file_name
|
|
text_to_speech(question["audio"]["text"], sound_file_path)
|
|
file_url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
|
question["audio"]["source"] = file_url
|
|
if save_to_db("listening", question):
|
|
return question
|
|
else:
|
|
raise Exception("Failed to save question: " + question)
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
|
|
@app.route('/listening_section_3', methods=['GET'])
|
|
@jwt_required()
|
|
def get_listening_section_3_question():
|
|
try:
|
|
delete_files_older_than_one_day(AUDIO_FILES_PATH)
|
|
messages = get_question_gen_messages(QuestionType.LISTENING_SECTION_3)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO_16K, messages, token_count, LISTENING_GEN_FIELDS,
|
|
GEN_QUESTION_TEMPERATURE)
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_listening_section_3', methods=['POST'])
|
|
@jwt_required()
|
|
def save_listening_section_3_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
question = getListening2Template()
|
|
file_name = str(uuid.uuid4()) + ".mp3"
|
|
sound_file_path = AUDIO_FILES_PATH + file_name
|
|
firebase_file_path = FIREBASE_LISTENING_AUDIO_FILES_PATH + file_name
|
|
text_to_speech(question["audio"]["text"], sound_file_path)
|
|
file_url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
|
question["audio"]["source"] = file_url
|
|
if save_to_db("listening", question):
|
|
return question
|
|
else:
|
|
raise Exception("Failed to save question: " + question)
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
|
|
@app.route('/listening_section_4', methods=['GET'])
|
|
@jwt_required()
|
|
def get_listening_section_4_question():
|
|
try:
|
|
delete_files_older_than_one_day(AUDIO_FILES_PATH)
|
|
messages = get_question_gen_messages(QuestionType.LISTENING_SECTION_4)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO_16K, messages, token_count, LISTENING_GEN_FIELDS,
|
|
GEN_QUESTION_TEMPERATURE)
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_listening_section_4', methods=['POST'])
|
|
@jwt_required()
|
|
def save_listening_section_4_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
question = getListening2Template()
|
|
file_name = str(uuid.uuid4()) + ".mp3"
|
|
sound_file_path = AUDIO_FILES_PATH + file_name
|
|
firebase_file_path = FIREBASE_LISTENING_AUDIO_FILES_PATH + file_name
|
|
text_to_speech(question["audio"]["text"], sound_file_path)
|
|
file_url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
|
question["audio"]["source"] = file_url
|
|
if save_to_db("listening", question):
|
|
return question
|
|
else:
|
|
raise Exception("Failed to save question: " + question)
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
|
|
@app.route('/writing_task1', methods=['POST'])
|
|
@jwt_required()
|
|
def grade_writing_task_1():
|
|
try:
|
|
data = request.get_json()
|
|
question = data.get('question')
|
|
context = data.get('context')
|
|
answer = data.get('answer')
|
|
if has_words(answer):
|
|
messages = get_grading_messages(QuestionType.WRITING_TASK_1, question, answer, context)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
|
|
return response
|
|
else:
|
|
return {
|
|
'comment': "The answer does not contain any english words.",
|
|
'overall': 0,
|
|
'task_response': {
|
|
'Coherence and Cohesion': 0,
|
|
'Grammatical Range and Accuracy': 0,
|
|
'Lexical Resource': 0,
|
|
'Task Achievement': 0
|
|
}
|
|
}
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_writing_task_1', methods=['POST'])
|
|
@jwt_required()
|
|
def save_writing_task_1_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
# TODO ADD SAVE IMAGE TO DB
|
|
question = getListening2Template()
|
|
if save_to_db("writing", question):
|
|
return question
|
|
else:
|
|
raise Exception("Failed to save question: " + question)
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
|
|
@app.route('/writing_task2', methods=['POST'])
|
|
@jwt_required()
|
|
def grade_writing_task_2():
|
|
try:
|
|
data = request.get_json()
|
|
question = data.get('question')
|
|
answer = data.get('answer')
|
|
if has_words(answer):
|
|
messages = get_grading_messages(QuestionType.WRITING_TASK_2, question, answer)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
|
|
return response
|
|
else:
|
|
return {
|
|
'comment': "The answer does not contain any english words.",
|
|
'overall': 0,
|
|
'task_response': {
|
|
'Coherence and Cohesion': 0,
|
|
'Grammatical Range and Accuracy': 0,
|
|
'Lexical Resource': 0,
|
|
'Task Achievement': 0
|
|
}
|
|
}
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/writing_task2', methods=['GET'])
|
|
@jwt_required()
|
|
def get_writing_task_2_question():
|
|
try:
|
|
messages = get_question_gen_messages(QuestionType.WRITING_TASK_2)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_writing_task_2', methods=['POST'])
|
|
@jwt_required()
|
|
def save_writing_task_2_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
question = getListening2Template()
|
|
if save_to_db("writing", question):
|
|
return question
|
|
else:
|
|
raise Exception("Failed to save question: " + question)
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
|
|
@app.route('/speaking_task_1', methods=['POST'])
|
|
@jwt_required()
|
|
def grade_speaking_task_1():
|
|
delete_files_older_than_one_day(AUDIO_FILES_PATH)
|
|
sound_file_name = AUDIO_FILES_PATH + str(uuid.uuid4())
|
|
try:
|
|
data = request.get_json()
|
|
question = data.get('question')
|
|
answer_firebase_path = data.get('answer')
|
|
|
|
download_firebase_file(FIREBASE_BUCKET, answer_firebase_path, sound_file_name)
|
|
answer = speech_to_text(sound_file_name)
|
|
if has_words(answer):
|
|
messages = get_grading_messages(QuestionType.SPEAKING_1, question, answer)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
|
|
os.remove(sound_file_name)
|
|
return response
|
|
else:
|
|
return {
|
|
"comment": "The audio recorded does not contain any english words.",
|
|
"overall": 0,
|
|
"task_response": {
|
|
"Fluency and Coherence": 0,
|
|
"Lexical Resource": 0,
|
|
"Grammatical Range and Accuracy": 0,
|
|
"Pronunciation": 0
|
|
}
|
|
}
|
|
except Exception as e:
|
|
os.remove(sound_file_name)
|
|
return str(e), 400
|
|
|
|
|
|
@app.route('/speaking_task_1', methods=['GET'])
|
|
@jwt_required()
|
|
def get_speaking_task_1_question():
|
|
try:
|
|
messages = get_question_gen_messages(QuestionType.SPEAKING_1)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_speaking_task_1', methods=['POST'])
|
|
@jwt_required()
|
|
def save_speaking_task_1_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
questions_json = getSpeaking1Template()
|
|
questions = []
|
|
for question in questions_json["questions"]:
|
|
result = create_video(question)
|
|
if result is not None:
|
|
sound_file_path = VIDEO_FILES_PATH + result
|
|
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
|
|
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
|
video = {
|
|
"text": question,
|
|
"video_path": firebase_file_path,
|
|
"video_url": url
|
|
}
|
|
questions.append(video)
|
|
else:
|
|
print("Failed to create video for question: " + question)
|
|
|
|
if len(questions) == len(questions_json["questions"]):
|
|
speaking_pt1_to_insert = {
|
|
"exercises": [
|
|
{
|
|
"id": str(uuid.uuid4()),
|
|
"prompts": questions,
|
|
"text": "Listen carefully and respond.",
|
|
"title": questions_json["topic"],
|
|
"type": "speakingPart1"
|
|
}
|
|
],
|
|
"isDiagnostic": True,
|
|
"minTimer": 5,
|
|
"module": "speaking"
|
|
}
|
|
if save_to_db("speaking", speaking_pt1_to_insert):
|
|
return speaking_pt1_to_insert
|
|
else:
|
|
raise Exception("Failed to save question: " + speaking_pt1_to_insert)
|
|
else:
|
|
raise Exception("Array sizes do not match. Video uploading failing is probably the cause.")
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/speaking_task_2', methods=['POST'])
|
|
@jwt_required()
|
|
def grade_speaking_task_2():
|
|
delete_files_older_than_one_day(AUDIO_FILES_PATH)
|
|
sound_file_name = AUDIO_FILES_PATH + str(uuid.uuid4())
|
|
try:
|
|
data = request.get_json()
|
|
question = data.get('question')
|
|
answer_firebase_path = data.get('answer')
|
|
|
|
download_firebase_file(FIREBASE_BUCKET, answer_firebase_path, sound_file_name)
|
|
answer = speech_to_text(sound_file_name)
|
|
if has_words(answer):
|
|
messages = get_grading_messages(QuestionType.SPEAKING_2, question, answer)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
|
|
os.remove(sound_file_name)
|
|
return response
|
|
else:
|
|
return {
|
|
"comment": "The audio recorded does not contain any english words.",
|
|
"overall": 0,
|
|
"task_response": {
|
|
"Fluency and Coherence": 0,
|
|
"Lexical Resource": 0,
|
|
"Grammatical Range and Accuracy": 0,
|
|
"Pronunciation": 0
|
|
}
|
|
}
|
|
except Exception as e:
|
|
os.remove(sound_file_name)
|
|
return str(e), 400
|
|
|
|
|
|
@app.route('/speaking_task_2', methods=['GET'])
|
|
@jwt_required()
|
|
def get_speaking_task_2_question():
|
|
try:
|
|
messages = get_question_gen_messages(QuestionType.SPEAKING_2)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_speaking_task_2', methods=['POST'])
|
|
@jwt_required()
|
|
def save_speaking_task_2_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
questions_json = getSpeaking2Template()
|
|
questions = []
|
|
for question in questions_json["questions"]:
|
|
result = create_video(question)
|
|
if result is not None:
|
|
sound_file_path = VIDEO_FILES_PATH + result
|
|
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
|
|
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
|
video = {
|
|
"text": question,
|
|
"video_path": firebase_file_path,
|
|
"video_url": url
|
|
}
|
|
questions.append(video)
|
|
else:
|
|
print("Failed to create video for question: " + question)
|
|
|
|
if len(questions) == len(questions_json["questions"]):
|
|
speaking_pt2_to_insert = {
|
|
"exercises": [
|
|
{
|
|
"id": str(uuid.uuid4()),
|
|
"prompts": questions,
|
|
"text": "Listen carefully and respond.",
|
|
"title": questions_json["topic"],
|
|
"type": "speakingPart2"
|
|
}
|
|
],
|
|
"isDiagnostic": True,
|
|
"minTimer": 5,
|
|
"module": "speaking"
|
|
}
|
|
if save_to_db("speaking", speaking_pt2_to_insert):
|
|
return speaking_pt2_to_insert
|
|
else:
|
|
raise Exception("Failed to save question: " + str(speaking_pt2_to_insert))
|
|
else:
|
|
raise Exception("Array sizes do not match. Video uploading failing is probably the cause.")
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/save_speaking_task_3', methods=['POST'])
|
|
@jwt_required()
|
|
def save_speaking_task_3_question():
|
|
try:
|
|
# data = request.get_json()
|
|
# question = data.get('question')
|
|
questions_json = getSpeaking3Template()
|
|
questions = []
|
|
for question in questions_json["questions"]:
|
|
result = create_video(question)
|
|
if result is not None:
|
|
sound_file_path = VIDEO_FILES_PATH + result
|
|
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
|
|
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
|
video = {
|
|
"text": question,
|
|
"video_path": firebase_file_path,
|
|
"video_url": url
|
|
}
|
|
questions.append(video)
|
|
else:
|
|
print("Failed to create video for question: " + question)
|
|
|
|
if len(questions) == len(questions_json["questions"]):
|
|
speaking_pt3_to_insert = {
|
|
"exercises": [
|
|
{
|
|
"id": str(uuid.uuid4()),
|
|
"prompts": questions,
|
|
"text": "Listen carefully and respond.",
|
|
"title": questions_json["topic"],
|
|
"type": "speakingPart3"
|
|
}
|
|
],
|
|
"isDiagnostic": True,
|
|
"minTimer": 5,
|
|
"module": "speaking"
|
|
}
|
|
if save_to_db("speaking", speaking_pt3_to_insert):
|
|
return speaking_pt3_to_insert
|
|
else:
|
|
raise Exception("Failed to save question: " + str(speaking_pt3_to_insert))
|
|
else:
|
|
raise Exception("Array sizes do not match. Video uploading failing is probably the cause.")
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
@app.route('/fetch_tips', methods=['POST'])
|
|
@jwt_required()
|
|
def fetch_answer_tips():
|
|
try:
|
|
data = request.get_json()
|
|
context = data.get('context')
|
|
question = data.get('question')
|
|
answer = data.get('answer')
|
|
correct_answer = data.get('correct_answer')
|
|
messages = get_question_tips(question, answer, correct_answer, context)
|
|
token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'],
|
|
map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0)
|
|
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, None, TIPS_TEMPERATURE)
|
|
|
|
if isinstance(response, str):
|
|
response = re.sub(r"^[a-zA-Z0-9_]+\:\s*", "", response)
|
|
|
|
return response
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
if __name__ == '__main__':
|
|
app.run()
|