656 lines
26 KiB
Python
656 lines
26 KiB
Python
import random
|
|
|
|
from flask import Flask, request
|
|
from flask_jwt_extended import JWTManager, jwt_required
|
|
from functools import reduce
|
|
from helper.api_messages import *
|
|
from helper.constants import *
|
|
from helper.exercises import *
|
|
from helper.file_helper import delete_files_older_than_one_day
|
|
from helper.firebase_helper import *
|
|
from helper.heygen_api import create_video
|
|
from helper.speech_to_text_helper import *
|
|
from helper.token_counter import count_tokens
|
|
from helper.openai_interface import make_openai_call, make_openai_instruct_call
|
|
import os
|
|
import re
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
from heygen.AvatarEnum import AvatarEnum
|
|
from templates.question_templates import *
|
|
|
|
# Read settings from a .env file (if any) before the environment is queried.
load_dotenv()

app = Flask(__name__)
app.config.update(JWT_SECRET_KEY=os.getenv("JWT_SECRET_KEY"))
jwt = JWTManager(app)

# Initialize Firebase Admin SDK
cred = credentials.Certificate(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
firebase_admin.initialize_app(cred)
|
|
|
|
|
|
@app.route("/healthcheck", methods=["GET"])
def healthcheck():
    """Report that the service is up; always answers ``{"healthy": True}``."""
    status = {"healthy": True}
    return status
|
|
|
|
|
|
@app.route("/listening_section_1", methods=["GET"])
@jwt_required()
def get_listening_section_1_question():
    """Generate a listening section 1 conversation together with its exercises.

    Query parameters:
        topic: conversation topic; a random entry of ``two_people_scenarios``
            is used when absent.
        exercises: repeatable exercise type; one random entry of
            ``LISTENING_EXERCISE_TYPES`` is used when none are given.

    Returns:
        A dict holding the generated ``exercises`` and the processed
        conversation under ``text``. If anything raises, the exception
        message is returned as the response body instead.
    """
    try:
        delete_files_older_than_one_day(AUDIO_FILES_PATH)

        # The fallback topic is drawn up front, even when the caller supplies one.
        fallback_topic = random.choice(two_people_scenarios)
        topic = request.args.get("topic", default=fallback_topic)
        exercise_types = request.args.getlist("exercises") or random.sample(LISTENING_EXERCISE_TYPES, 1)

        # Split the section's exercise total across the requested types.
        per_type_counts = divide_number_into_parts(TOTAL_LISTENING_SECTION_1_EXERCISES, len(exercise_types))

        raw_conversation, conversation = generate_listening_1_conversation(topic)
        print(f"Generated conversation: {conversation!s}")

        # Exercise ids for this section start at 1.
        generated = generate_listening_conversation_exercises(
            raw_conversation, exercise_types, per_type_counts, 1
        )
        return {"exercises": generated, "text": conversation}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/listening_section_2", methods=["GET"])
@jwt_required()
def get_listening_section_2_question():
    """Generate a listening section 2 monologue together with its exercises.

    Query parameters:
        topic: monologue topic; a random entry of
            ``social_monologue_contexts`` is used when absent.
        exercises: repeatable exercise type; two random entries of
            ``LISTENING_EXERCISE_TYPES`` are used when none are given.

    Returns:
        A dict holding the generated ``exercises`` and the monologue under
        ``text``. If anything raises, the exception message is returned as
        the response body instead.
    """
    try:
        delete_files_older_than_one_day(AUDIO_FILES_PATH)

        # The fallback topic is drawn up front, even when the caller supplies one.
        fallback_topic = random.choice(social_monologue_contexts)
        topic = request.args.get("topic", default=fallback_topic)
        exercise_types = request.args.getlist("exercises") or random.sample(LISTENING_EXERCISE_TYPES, 2)

        # Split the section's exercise total across the requested types.
        per_type_counts = divide_number_into_parts(TOTAL_LISTENING_SECTION_2_EXERCISES, len(exercise_types))

        monologue = generate_listening_2_monologue(topic)
        print(f"Generated monologue: {monologue!s}")

        # Exercise ids for this section start at 11.
        generated = generate_listening_monologue_exercises(monologue, exercise_types, per_type_counts, 11)
        return {"exercises": generated, "text": monologue}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/listening_section_3", methods=["GET"])
@jwt_required()
def get_listening_section_3_question():
    """Generate a listening section 3 conversation together with its exercises.

    Query parameters:
        topic: conversation topic; a random entry of ``four_people_scenarios``
            is used when absent.
        exercises: repeatable exercise type; one random entry of
            ``LISTENING_EXERCISE_TYPES`` is used when none are given.

    Returns:
        A dict holding the generated ``exercises`` and the processed
        conversation under ``text``. If anything raises, the exception
        message is returned as the response body instead.
    """
    try:
        delete_files_older_than_one_day(AUDIO_FILES_PATH)

        # The fallback topic is drawn up front, even when the caller supplies one.
        fallback_topic = random.choice(four_people_scenarios)
        topic = request.args.get("topic", default=fallback_topic)
        exercise_types = request.args.getlist("exercises") or random.sample(LISTENING_EXERCISE_TYPES, 1)

        # Split the section's exercise total across the requested types.
        per_type_counts = divide_number_into_parts(TOTAL_LISTENING_SECTION_3_EXERCISES, len(exercise_types))

        raw_conversation, conversation = generate_listening_3_conversation(topic)
        print(f"Generated conversation: {conversation!s}")

        # Exercise ids for this section start at 21.
        generated = generate_listening_conversation_exercises(
            raw_conversation, exercise_types, per_type_counts, 21
        )
        return {"exercises": generated, "text": conversation}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/listening_section_4", methods=["GET"])
@jwt_required()
def get_listening_section_4_question():
    """Generate a listening section 4 monologue together with its exercises.

    Query parameters:
        topic: monologue topic; a random entry of ``academic_subjects`` is
            used when absent.
        exercises: repeatable exercise type; two random entries of
            ``LISTENING_EXERCISE_TYPES`` are used when none are given.

    Returns:
        A dict holding the generated ``exercises`` and the monologue under
        ``text``. If anything raises, the exception message is returned as
        the response body instead.
    """
    try:
        delete_files_older_than_one_day(AUDIO_FILES_PATH)

        # The fallback topic is drawn up front, even when the caller supplies one.
        fallback_topic = random.choice(academic_subjects)
        topic = request.args.get("topic", default=fallback_topic)
        exercise_types = request.args.getlist("exercises") or random.sample(LISTENING_EXERCISE_TYPES, 2)

        # Split the section's exercise total across the requested types.
        per_type_counts = divide_number_into_parts(TOTAL_LISTENING_SECTION_4_EXERCISES, len(exercise_types))

        monologue = generate_listening_4_monologue(topic)
        print(f"Generated monologue: {monologue!s}")

        # Exercise ids for this section start at 31.
        generated = generate_listening_monologue_exercises(monologue, exercise_types, per_type_counts, 31)
        return {"exercises": generated, "text": monologue}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route('/listening', methods=['POST'])
@jwt_required()
def save_listening():
    """Synthesize audio for each listening part and persist the whole exam.

    Expects a JSON body with a ``parts`` list; every part carries ``text``
    and ``exercises``. For each part an mp3 is produced, uploaded to
    Firebase, and its URL plus the exercises are written into the matching
    slot of the listening template.

    Returns:
        The filled template when ``save_to_db`` succeeds. If saving fails or
        anything raises, the exception message is returned as the response
        body instead.
    """
    try:
        data = request.get_json()
        parts = data.get('parts')
        template = getListeningTemplate()
        for i, part in enumerate(parts):
            file_name = str(uuid.uuid4()) + ".mp3"
            sound_file_path = AUDIO_FILES_PATH + file_name
            firebase_file_path = FIREBASE_LISTENING_AUDIO_FILES_PATH + file_name

            text = part["text"]
            # Only a mapping can hold a "conversation" entry. Without the type
            # check, a plain-string text that merely contains the word
            # "conversation" would pass the `in` test and then fail on indexing.
            if isinstance(text, dict) and "conversation" in text:
                conversation_text_to_speech(text["conversation"], sound_file_path)
            else:
                text_to_speech(text, sound_file_path)

            file_url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
            template["parts"][i]["audio"]["source"] = file_url
            template["parts"][i]["exercises"] = part["exercises"]

        if save_to_db("listening", template):
            return template
        # `parts` is not a string, so it has to be converted before concatenation.
        raise Exception("Failed to save question: " + str(parts))
    except Exception as e:
        return str(e)
|
|
|
|
|
|
@app.route("/writing_task1", methods=["POST"])
@jwt_required()
def grade_writing_task_1():
    """Grade a Writing Task 1 answer through ``make_openai_call``.

    Expects a JSON body with ``question``, ``context`` and ``answer``. When
    ``has_words`` rejects the answer, an all-zero grading payload is returned
    without calling the model.

    Returns:
        The model's grading response, or the zero payload. If anything
        raises, the exception message is returned as the response body.
    """
    try:
        payload = request.get_json()
        question = payload.get("question")
        context = payload.get("context")
        answer = payload.get("answer")

        if not has_words(answer):
            return {
                "comment": "The answer does not contain any english words.",
                "overall": 0,
                "task_response": {
                    "Coherence and Cohesion": 0,
                    "Grammatical Range and Accuracy": 0,
                    "Lexical Resource": 0,
                    "Task Achievement": 0,
                },
            }

        messages = get_grading_messages(QuestionType.WRITING_TASK_1, question, answer, context)
        # Total tokens over every message that actually has a "content" field.
        token_count = sum(
            count_tokens(message["content"])["n_tokens"]
            for message in messages
            if "content" in message
        )
        return make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/writing_task1_general", methods=["GET"])
@jwt_required()
def get_writing_task_1_general_question():
    """Ask the instruct model for a Writing Task 1 General Training prompt.

    Returns:
        ``{"question": <stripped model output>}``. If anything raises, the
        exception message is returned as the response body instead.
    """
    try:
        prompt = (
            "Craft a prompt for an IELTS Writing Task 1 General Training exercise that instructs the "
            "student to compose a letter. The prompt should present a specific scenario or situation, "
            "requiring the student to provide information, advice, or instructions within the letter."
        )
        prompt_tokens = count_tokens(prompt)["n_tokens"]
        generated = make_openai_instruct_call(
            GPT_3_5_TURBO_INSTRUCT, prompt, prompt_tokens, None, GEN_QUESTION_TEMPERATURE
        )
        return {"question": generated.strip()}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/writing_task2", methods=["POST"])
@jwt_required()
def grade_writing_task_2():
    """Grade a Writing Task 2 answer through ``make_openai_call``.

    Expects a JSON body with ``question`` and ``answer``. When ``has_words``
    rejects the answer, an all-zero grading payload is returned without
    calling the model.

    Returns:
        The model's grading response, or the zero payload. If anything
        raises, the exception message is returned as the response body.
    """
    try:
        payload = request.get_json()
        question = payload.get("question")
        answer = payload.get("answer")

        if not has_words(answer):
            return {
                "comment": "The answer does not contain any english words.",
                "overall": 0,
                "task_response": {
                    "Coherence and Cohesion": 0,
                    "Grammatical Range and Accuracy": 0,
                    "Lexical Resource": 0,
                    "Task Achievement": 0,
                },
            }

        messages = get_grading_messages(QuestionType.WRITING_TASK_2, question, answer)
        # Total tokens over every message that actually has a "content" field.
        token_count = sum(
            count_tokens(message["content"])["n_tokens"]
            for message in messages
            if "content" in message
        )
        return make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/writing_task2_general", methods=["GET"])
@jwt_required()
def get_writing_task_2_general_question():
    """Ask the instruct model for a Writing Task 2 General Training question.

    Returns:
        ``{"question": <stripped model output>}``. If anything raises, the
        exception message is returned as the response body instead.
    """
    try:
        prompt = (
            "Craft a comprehensive question for IELTS Writing Task 2 General Training that directs the candidate "
            "to delve into an in-depth analysis of contrasting perspectives on a specific topic. The candidate "
            "should be asked to discuss the strengths and weaknesses of both viewpoints, provide evidence or "
            "examples, and present a well-rounded argument before concluding with their personal opinion on the "
            "subject."
        )
        prompt_tokens = count_tokens(prompt)["n_tokens"]
        generated = make_openai_instruct_call(
            GPT_3_5_TURBO_INSTRUCT, prompt, prompt_tokens, None, GEN_QUESTION_TEMPERATURE
        )
        return {"question": generated.strip()}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route('/writing', methods=['POST'])
@jwt_required()
def save_writing_task():
    """Store the submitted writing prompts in a writing template and persist it.

    Expects a JSON body with an ``exercises`` list; the i-th entry becomes
    the ``prompt`` of the i-th exercise in the writing template.

    Returns:
        The filled template when ``save_to_db`` succeeds. If saving fails or
        anything raises, the exception message is returned as the response
        body instead.
    """
    try:
        data = request.get_json()
        exercises = data.get('exercises')
        template = getWritingTemplate()
        for i, exercise in enumerate(exercises):
            template["exercises"][i]["prompt"] = exercise

        if save_to_db("writing", template):
            return template
        # The template is not a string, so it has to be converted before
        # concatenation; otherwise a TypeError hides the real failure message.
        raise Exception("Failed to save writing: " + str(template))
    except Exception as e:
        return str(e)
|
|
|
|
|
|
@app.route('/speaking_task_1', methods=['POST'])
@jwt_required()
def grade_speaking_task_1():
    """Transcribe a recorded Speaking Part 1 answer and grade it.

    Expects a JSON body with ``question`` and ``answer``, where ``answer``
    is the Firebase path of the recording. The recording is downloaded to a
    temporary local file, transcribed, and graded through
    ``make_openai_call``. When ``has_10_words`` rejects the transcript, an
    all-zero grading payload is returned without calling the model.

    Returns:
        The grading response or the zero payload; on any exception a
        ``(message, 400)`` tuple.
    """
    delete_files_older_than_one_day(AUDIO_FILES_PATH)
    sound_file_name = AUDIO_FILES_PATH + str(uuid.uuid4())
    try:
        data = request.get_json()
        question = data.get('question')
        answer_firebase_path = data.get('answer')

        download_firebase_file(FIREBASE_BUCKET, answer_firebase_path, sound_file_name)
        answer = speech_to_text(sound_file_name)
        if not has_10_words(answer):
            return {
                "comment": "The audio recorded does not contain enough english words to be graded.",
                "overall": 0,
                "task_response": {
                    "Fluency and Coherence": 0,
                    "Lexical Resource": 0,
                    "Grammatical Range and Accuracy": 0,
                    "Pronunciation": 0
                }
            }

        messages = get_grading_messages(QuestionType.SPEAKING_1, question, answer)
        # Total tokens over every message that actually has a "content" field.
        token_count = sum(
            count_tokens(message["content"])["n_tokens"]
            for message in messages
            if "content" in message
        )
        return make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
    except Exception as e:
        return str(e), 400
    finally:
        # Clean up on every exit path. The existence check matters because the
        # download may fail before the file is ever created.
        if os.path.exists(sound_file_name):
            os.remove(sound_file_name)
|
|
|
|
|
|
@app.route('/speaking_task_1', methods=['GET'])
@jwt_required()
def get_speaking_task_1_question():
    """Ask the instruct model for a Speaking Part 1 topic and question.

    Returns:
        Whatever ``make_openai_instruct_call`` produces for the prompt
        (requested as JSON with ``topic`` and ``question``). If anything
        raises, the exception message is returned as the response body.
    """
    try:
        # Every fragment ends with a space so adjacent sentences do not run
        # together once the literals are concatenated.
        gen_sp1_question = (
            "Craft a thought-provoking question for IELTS Speaking Part 1 that encourages candidates to delve deeply "
            "into personal experiences, preferences, or insights on diverse topics. Instruct the candidate to offer "
            "not only detailed descriptions but also provide nuanced explanations, examples, or anecdotes to enrich "
            "their response. "
            "Provide your response in this json format: {'topic': 'topic','question': 'question'}"
        )
        token_count = count_tokens(gen_sp1_question)["n_tokens"]
        response = make_openai_instruct_call(GPT_3_5_TURBO_INSTRUCT, gen_sp1_question, token_count, GEN_FIELDS,
                                             GEN_QUESTION_TEMPERATURE)
        return response
    except Exception as e:
        return str(e)
|
|
|
|
|
|
@app.route('/speaking_task_2', methods=['POST'])
@jwt_required()
def grade_speaking_task_2():
    """Transcribe a recorded Speaking Part 2 answer and grade it.

    Expects a JSON body with ``question`` and ``answer``, where ``answer``
    is the Firebase path of the recording. The recording is downloaded to a
    temporary local file, transcribed, and graded through
    ``make_openai_call``. When ``has_10_words`` rejects the transcript, an
    all-zero grading payload is returned without calling the model.

    Returns:
        The grading response or the zero payload; on any exception a
        ``(message, 400)`` tuple.
    """
    delete_files_older_than_one_day(AUDIO_FILES_PATH)
    sound_file_name = AUDIO_FILES_PATH + str(uuid.uuid4())
    try:
        data = request.get_json()
        question = data.get('question')
        answer_firebase_path = data.get('answer')

        download_firebase_file(FIREBASE_BUCKET, answer_firebase_path, sound_file_name)
        answer = speech_to_text(sound_file_name)
        if not has_10_words(answer):
            return {
                "comment": "The audio recorded does not contain enough english words to be graded.",
                "overall": 0,
                "task_response": {
                    "Fluency and Coherence": 0,
                    "Lexical Resource": 0,
                    "Grammatical Range and Accuracy": 0,
                    "Pronunciation": 0
                }
            }

        messages = get_grading_messages(QuestionType.SPEAKING_2, question, answer)
        # Total tokens over every message that actually has a "content" field.
        token_count = sum(
            count_tokens(message["content"])["n_tokens"]
            for message in messages
            if "content" in message
        )
        return make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
    except Exception as e:
        return str(e), 400
    finally:
        # Clean up on every exit path. The existence check matters because the
        # download may fail before the file is ever created.
        if os.path.exists(sound_file_name):
            os.remove(sound_file_name)
|
|
|
|
|
|
@app.route('/speaking_task_2', methods=['GET'])
@jwt_required()
def get_speaking_task_2_question():
    """Ask the instruct model for a Speaking Part 2 topic, question and prompts.

    Returns:
        Whatever ``make_openai_instruct_call`` produces for the prompt
        (requested as JSON with ``topic``, ``question`` and three
        ``prompts``). If anything raises, the exception message is returned
        as the response body.
    """
    try:
        # Every fragment ends with a space so adjacent sentences do not run
        # together once the literals are concatenated.
        gen_sp2_question = (
            "Create a question for IELTS Speaking Part 2 that encourages candidates to narrate a personal experience "
            "or story related to a randomly selected topic. Include 3 prompts that guide the candidate to describe "
            "specific aspects of the experience, such as details about the situation, their actions, and the "
            "reasons it left a lasting impression. "
            "Provide your response in this json format: {'topic': 'topic','question': 'question', "
            "'prompts': ['prompt_1', 'prompt_2', 'prompt_3']}"
        )
        token_count = count_tokens(gen_sp2_question)["n_tokens"]
        response = make_openai_instruct_call(GPT_3_5_TURBO_INSTRUCT, gen_sp2_question, token_count, GEN_FIELDS,
                                             GEN_QUESTION_TEMPERATURE)
        return response
    except Exception as e:
        return str(e)
|
|
|
|
|
|
@app.route('/speaking_task_3', methods=['GET'])
@jwt_required()
def get_speaking_task_3_question():
    """Ask the instruct model for a Speaking Part 3 topic and three questions.

    A leading ``"<digits>."`` enumeration marker is stripped from every
    returned question.

    Returns:
        The model response with cleaned ``questions``. If anything raises,
        the exception message is returned as the response body.
    """
    try:
        # Every fragment ends with a space so adjacent sentences do not run
        # together once the literals are concatenated.
        gen_sp3_question = (
            "Formulate a set of 3 questions for IELTS Speaking Part 3 that encourage candidates to engage in a "
            "meaningful discussion on a particular topic. Provide inquiries, ensuring "
            "they explore various aspects, perspectives, and implications related to the topic. "
            "Provide your response in this json format: {'topic': 'topic','questions': ['question', "
            "'question', 'question']}"
        )
        token_count = count_tokens(gen_sp3_question)["n_tokens"]
        response = make_openai_instruct_call(GPT_3_5_TURBO_INSTRUCT, gen_sp3_question, token_count, GEN_FIELDS,
                                             GEN_QUESTION_TEMPERATURE)
        # The pattern is anchored at the start, so questions without a leading
        # number pass through re.sub unchanged; no separate match test is needed.
        response["questions"] = [re.sub(r"^\d+\.\s*", "", question) for question in response["questions"]]
        return response
    except Exception as e:
        return str(e)
|
|
|
|
|
|
@app.route('/speaking_task_3', methods=['POST'])
@jwt_required()
def grade_speaking_task_3():
    """Transcribe every recorded Speaking Part 3 answer and grade them together.

    Expects a JSON body with an ``answers`` list; each item carries the
    Firebase path of a recording under ``answer``. Each recording is
    downloaded to a temporary local file and transcribed, and the transcript
    is stored on the item as ``answer_text``. As soon as one transcript is
    rejected by ``has_10_words``, an all-zero grading payload is returned.

    Returns:
        The grading response or the zero payload; on any exception a
        ``(message, 400)`` tuple.
    """
    delete_files_older_than_one_day(AUDIO_FILES_PATH)
    try:
        data = request.get_json()
        answers = data.get('answers')

        for item in answers:
            sound_file_name = AUDIO_FILES_PATH + str(uuid.uuid4())
            try:
                download_firebase_file(FIREBASE_BUCKET, item["answer"], sound_file_name)
                answer_text = speech_to_text(sound_file_name)
            finally:
                # Remove the temp file even when the download or transcription
                # fails; it may not exist if the download never got that far.
                if os.path.exists(sound_file_name):
                    os.remove(sound_file_name)

            item["answer_text"] = answer_text
            if not has_10_words(answer_text):
                return {
                    "comment": "The audio recorded does not contain enough english words to be graded.",
                    "overall": 0,
                    "task_response": {
                        "Fluency and Coherence": 0,
                        "Lexical Resource": 0,
                        "Grammatical Range and Accuracy": 0,
                        "Pronunciation": 0
                    }
                }

        messages = get_speaking_grading_messages(answers)
        # Total tokens over every message that actually has a "content" field.
        token_count = sum(
            count_tokens(message["content"])["n_tokens"]
            for message in messages
            if "content" in message
        )
        return make_openai_call(GPT_3_5_TURBO, messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE)
    except Exception as e:
        return str(e), 400
|
|
|
|
|
|
def _upload_speaking_video(video_file_name):
    """Upload a file named by ``create_video`` and return ``(firebase_path, url)``."""
    local_path = VIDEO_FILES_PATH + video_file_name
    firebase_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + video_file_name
    url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_path, local_path)
    return firebase_path, url


@app.route('/speaking', methods=['POST'])
@jwt_required()
def save_speaking():
    """Create question videos for the three speaking parts and persist the exam.

    Expects a JSON body with an ``exercises`` list of three entries: parts 1
    and 2 carry ``question`` and ``topic`` (part 2 also ``prompts``), part 3
    carries ``questions`` and ``topic``. A video is requested for every
    question via ``create_video``; each one that succeeds is uploaded to
    Firebase and referenced from the speaking template. A failed video is
    only logged, and the corresponding template fields are left as they were.

    Returns:
        The filled template when ``save_to_db`` succeeds. If saving fails or
        anything raises, the exception message is returned as the response
        body instead.
    """
    try:
        data = request.get_json()
        exercises = data.get('exercises')
        template = getSpeakingTemplate()

        # Speaking 1
        sp1_result = create_video(exercises[0]["question"], random.choice(list(AvatarEnum)))
        if sp1_result is not None:
            sp1_video_path, sp1_video_url = _upload_speaking_video(sp1_result)
            template["exercises"][0]["text"] = exercises[0]["question"]
            template["exercises"][0]["title"] = exercises[0]["topic"]
            template["exercises"][0]["video_url"] = sp1_video_url
            template["exercises"][0]["video_path"] = sp1_video_path
        else:
            print("Failed to create video for part 1 question: " + exercises[0]["question"])

        # Speaking 2
        sp2_result = create_video(exercises[1]["question"], random.choice(list(AvatarEnum)))
        if sp2_result is not None:
            sp2_video_path, sp2_video_url = _upload_speaking_video(sp2_result)
            template["exercises"][1]["prompts"] = exercises[1]["prompts"]
            template["exercises"][1]["text"] = exercises[1]["question"]
            template["exercises"][1]["title"] = exercises[1]["topic"]
            template["exercises"][1]["video_url"] = sp2_video_url
            template["exercises"][1]["video_path"] = sp2_video_path
        else:
            print("Failed to create video for part 2 question: " + exercises[1]["question"])

        # Speaking 3: one avatar is shared by all questions of this part.
        sp3_questions = []
        avatar = random.choice(list(AvatarEnum))
        for question in exercises[2]["questions"]:
            result = create_video(question, avatar)
            if result is not None:
                video_path, video_url = _upload_speaking_video(result)
                sp3_questions.append({
                    "text": question,
                    "video_path": video_path,
                    "video_url": video_url
                })
            else:
                print("Failed to create video for part 3 question: " + question)
        template["exercises"][2]["prompts"] = sp3_questions
        template["exercises"][2]["title"] = exercises[2]["topic"]

        if save_to_db("speaking", template):
            return template
        # The template is not a string, so it has to be converted before
        # concatenation; otherwise a TypeError hides the real failure message.
        raise Exception("Failed to save speaking: " + str(template))
    except Exception as e:
        return str(e)
|
|
|
|
|
|
@app.route("/reading_passage_1", methods=["GET"])
@jwt_required()
def get_reading_passage_1_question():
    """Generate reading passage 1 together with its exercises.

    Query parameters:
        topic: passage topic; a random entry of ``topics`` is used when
            absent.
        exercises: repeatable exercise type; two random entries of
            ``READING_EXERCISE_TYPES`` are used when none are given.

    Returns:
        A dict holding the generated ``exercises`` and the passage under
        ``text`` (``content`` and ``title``). If anything raises, the
        exception message is returned as the response body instead.
    """
    try:
        # The fallback topic is drawn up front, even when the caller supplies one.
        fallback_topic = random.choice(topics)
        topic = request.args.get("topic", default=fallback_topic)
        exercise_types = request.args.getlist("exercises") or random.sample(READING_EXERCISE_TYPES, 2)

        # Split the passage's exercise total across the requested types.
        per_type_counts = divide_number_into_parts(TOTAL_READING_PASSAGE_1_EXERCISES, len(exercise_types))

        passage = generate_reading_passage(QuestionType.READING_PASSAGE_1, topic)
        print(f"Generated passage: {passage!s}")

        # Exercise ids for this passage start at 1.
        generated = generate_reading_exercises(passage["text"], exercise_types, per_type_counts, 1)
        passage_payload = {"content": passage["text"], "title": passage["title"]}
        return {"exercises": generated, "text": passage_payload}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/reading_passage_2", methods=["GET"])
@jwt_required()
def get_reading_passage_2_question():
    """Generate reading passage 2 together with its exercises.

    Query parameters:
        topic: passage topic; a random entry of ``topics`` is used when
            absent.
        exercises: repeatable exercise type; two random entries of
            ``READING_EXERCISE_TYPES`` are used when none are given.

    Returns:
        A dict holding the generated ``exercises`` and the passage under
        ``text`` (``content`` and ``title``). If anything raises, the
        exception message is returned as the response body instead.
    """
    try:
        # The fallback topic is drawn up front, even when the caller supplies one.
        fallback_topic = random.choice(topics)
        topic = request.args.get("topic", default=fallback_topic)
        exercise_types = request.args.getlist("exercises") or random.sample(READING_EXERCISE_TYPES, 2)

        # Split the passage's exercise total across the requested types.
        per_type_counts = divide_number_into_parts(TOTAL_READING_PASSAGE_2_EXERCISES, len(exercise_types))

        passage = generate_reading_passage(QuestionType.READING_PASSAGE_2, topic)
        print(f"Generated passage: {passage!s}")

        # Exercise ids for this passage start at 14.
        generated = generate_reading_exercises(passage["text"], exercise_types, per_type_counts, 14)
        passage_payload = {"content": passage["text"], "title": passage["title"]}
        return {"exercises": generated, "text": passage_payload}
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/reading_passage_3", methods=["GET"])
@jwt_required()
def get_reading_passage_3_question():
    """Generate reading passage 3 together with its exercises.

    Query parameters:
        topic: passage topic; a random entry of ``topics`` is used when
            absent.
        exercises: repeatable exercise type; two random entries of
            ``READING_EXERCISE_TYPES`` are used when none are given.

    Returns:
        A dict holding the generated ``exercises`` and the passage under
        ``text`` (``content`` and ``title``). If anything raises, the
        exception message is returned as the response body instead.
    """
    try:
        # The fallback topic is drawn up front, even when the caller supplies one.
        fallback_topic = random.choice(topics)
        topic = request.args.get("topic", default=fallback_topic)
        exercise_types = request.args.getlist("exercises") or random.sample(READING_EXERCISE_TYPES, 2)

        # Split the passage's exercise total across the requested types.
        per_type_counts = divide_number_into_parts(TOTAL_READING_PASSAGE_3_EXERCISES, len(exercise_types))

        passage = generate_reading_passage(QuestionType.READING_PASSAGE_3, topic)
        print(f"Generated passage: {passage!s}")

        # Exercise ids for this passage start at 27.
        generated = generate_reading_exercises(passage["text"], exercise_types, per_type_counts, 27)
        passage_payload = {"content": passage["text"], "title": passage["title"]}
        return {"exercises": generated, "text": passage_payload}
    except Exception as err:
        return str(err)
|
|
|
|
@app.route('/reading', methods=['POST'])
@jwt_required()
def save_reading_passage():
    """Store the submitted reading parts in a reading template and persist it.

    Expects a JSON body with ``parts``, which replaces the template's
    ``parts`` entry wholesale.

    Returns:
        The filled template when ``save_to_db`` succeeds. If saving fails or
        anything raises, the exception message is returned as the response
        body instead.
    """
    try:
        data = request.get_json()
        parts = data.get('parts')
        template = getReadingTemplate()
        template["parts"] = parts

        if save_to_db("reading", template):
            return template
        # The template is not a string, so it has to be converted before
        # concatenation; otherwise a TypeError hides the real failure message.
        raise Exception("Failed to save reading: " + str(template))
    except Exception as e:
        return str(e)
|
|
|
|
|
|
@app.route("/level", methods=["GET"])
@jwt_required()
def get_level_exam():
    """Generate a level exam of 25 exercises via ``gen_multiple_choice_level``.

    Returns:
        A dict with the generated exercises wrapped in a one-element list
        plus fixed exam metadata. If anything raises, the exception message
        is returned as the response body instead.
    """
    try:
        generated = gen_multiple_choice_level(25)
        exam = {
            "exercises": [generated],
            "isDiagnostic": False,
            "minTimer": 25,
            "module": "level",
        }
        return exam
    except Exception as err:
        return str(err)
|
|
|
|
|
|
@app.route("/fetch_tips", methods=["POST"])
@jwt_required()
def fetch_answer_tips():
    """Ask the chat model for tips about an answered question.

    Expects a JSON body with ``context``, ``question``, ``answer`` and
    ``correct_answer``.

    Returns:
        The model response. When it is a string, a leading
        ``"<word>:"`` label is stripped first. If anything raises, the
        exception message is returned as the response body instead.
    """
    try:
        payload = request.get_json()
        context = payload.get("context")
        question = payload.get("question")
        answer = payload.get("answer")
        correct_answer = payload.get("correct_answer")

        messages = get_question_tips(question, answer, correct_answer, context)
        # Total tokens over every message that actually has a "content" field.
        token_count = sum(
            count_tokens(message["content"])["n_tokens"]
            for message in messages
            if "content" in message
        )
        tips = make_openai_call(GPT_3_5_TURBO, messages, token_count, None, TIPS_TEMPERATURE)

        if not isinstance(tips, str):
            return tips
        return re.sub(r"^[a-zA-Z0-9_]+\:\s*", "", tips)
    except Exception as err:
        return str(err)
|
|
|
|
|
|
# Start the Flask server only when this module is executed as a script.
if __name__ == "__main__":
    app.run()
|