from flask import Flask, request from flask_jwt_extended import JWTManager, jwt_required from functools import reduce import firebase_admin from firebase_admin import credentials from helper.api_messages import QuestionType, get_grading_messages, get_question_gen_messages from helper.file_helper import delete_files_older_than_one_day from helper.firebase_helper import download_firebase_file from helper.speech_to_text_helper import speech_to_text from helper.token_counter import count_tokens from helper.openai_interface import make_openai_call import os import uuid from dotenv import load_dotenv load_dotenv() app = Flask(__name__) app.config['JWT_SECRET_KEY'] = os.getenv("JWT_SECRET_KEY") jwt = JWTManager(app) # Initialize Firebase Admin SDK cred = credentials.Certificate(os.getenv("GOOGLE_APPLICATION_CREDENTIALS")) firebase_admin.initialize_app(cred) GRADING_TEMPERATURE = 0.1 GEN_QUESTION_TEMPERATURE = 0.9 GRADING_FIELDS = ['overall', 'comment', 'task_response'] GEN_FIELDS = ['question'] FIREBASE_BUCKET = 'mti-ielts.appspot.com' AUDIO_FILES_PATH = 'download-audio/' @app.route('/writing_task1', methods=['POST']) @jwt_required() def grade_writing_task_1(): try: data = request.get_json() question = data.get('question') context = data.get('context') answer = data.get('answer') messages = get_grading_messages(QuestionType.WRITING_TASK_1, question, answer, context) token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'], map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0) response = make_openai_call(messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE) return response except Exception as e: return str(e) @app.route('/writing_task2', methods=['POST']) @jwt_required() def grade_writing_task_2(): try: data = request.get_json() question = data.get('question') answer = data.get('answer') messages = get_grading_messages(QuestionType.WRITING_TASK_2, question, answer) token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'], map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0) response = make_openai_call(messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE) return response except Exception as e: return str(e) @app.route('/writing_task2', methods=['GET']) @jwt_required() def get_writing_task_2_question(): try: messages = get_question_gen_messages(QuestionType.WRITING_TASK_2) token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'], map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0) response = make_openai_call(messages, token_count, GEN_FIELDS, GEN_QUESTION_TEMPERATURE) return response except Exception as e: return str(e) @app.route('/speaking_task_1', methods=['POST']) @jwt_required() def grade_speaking_task_1(): delete_files_older_than_one_day(AUDIO_FILES_PATH) sound_file_name = AUDIO_FILES_PATH + str(uuid.uuid4()) try: data = request.get_json() question = data.get('question') answer_firebase_path = data.get('answer') download_firebase_file(FIREBASE_BUCKET, answer_firebase_path, sound_file_name) answer = speech_to_text(sound_file_name) messages = get_grading_messages(QuestionType.SPEAKING_1, question, answer) token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'], map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0) response = make_openai_call(messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE) os.remove(sound_file_name) return response except Exception as e: os.remove(sound_file_name) return str(e) @app.route('/speaking_task_1', methods=['GET']) @jwt_required() def get_speaking_task_1_question(): try: messages = get_question_gen_messages(QuestionType.SPEAKING_1) token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'], map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0) response = make_openai_call(messages, token_count, GEN_FIELDS, GEN_QUESTION_TEMPERATURE) return response except Exception as e: return str(e) @app.route('/speaking_task_2', methods=['POST']) @jwt_required() def grade_speaking_task_2(): delete_files_older_than_one_day(AUDIO_FILES_PATH) sound_file_name = AUDIO_FILES_PATH + str(uuid.uuid4()) try: data = request.get_json() question = data.get('question') answer_firebase_path = data.get('answer') download_firebase_file(FIREBASE_BUCKET, answer_firebase_path, sound_file_name) answer = speech_to_text(sound_file_name) messages = get_grading_messages(QuestionType.SPEAKING_2, question, answer) token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'], map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0) response = make_openai_call(messages, token_count, GRADING_FIELDS, GRADING_TEMPERATURE) os.remove(sound_file_name) return response except Exception as e: os.remove(sound_file_name) return str(e) @app.route('/speaking_task_2', methods=['GET']) @jwt_required() def get_speaking_task_2_question(): try: messages = get_question_gen_messages(QuestionType.SPEAKING_2) token_count = reduce(lambda count, item: count + count_tokens(item)['n_tokens'], map(lambda x: x["content"], filter(lambda x: "content" in x, messages)), 0) response = make_openai_call(messages, token_count, GEN_FIELDS, GEN_QUESTION_TEMPERATURE) return response except Exception as e: return str(e) if __name__ == '__main__': app.run()