diff --git a/app.py b/app.py index a0b26be..555a7b7 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ import firebase_admin from firebase_admin import credentials from flask import Flask, request from flask_jwt_extended import JWTManager, jwt_required +from sentence_transformers import SentenceTransformer from helper.api_messages import * from helper.exam_variant import ExamVariant @@ -17,6 +18,7 @@ from helper.openai_interface import * from helper.question_templates import * from helper.speech_to_text_helper import * from heygen.AvatarEnum import AvatarEnum +from training_content import TrainingContentService, TrainingContentKnowledgeBase, GPT load_dotenv() @@ -33,6 +35,14 @@ firebase_admin.initialize_app(cred) gpt_zero = GPTZero(os.getenv('GPT_ZERO_API_KEY')) +# Training Content Dependencies +embeddings = SentenceTransformer('all-MiniLM-L6-v2') +kb = TrainingContentKnowledgeBase(embeddings) +kb.load_indices_and_metadata() +open_ai = GPT(OpenAI()) +firestore_client = firestore.client() +tc_service = TrainingContentService(kb, open_ai, firestore_client) + thread_event = threading.Event() # Configure logging @@ -1596,5 +1606,16 @@ def grading_summary(): return str(e) +@app.route('/training_content', methods=['POST']) +@jwt_required() +def training_content(): + try: + data = request.get_json() + return tc_service.get_tips(data) + except Exception as e: + app.logger.error(str(e)) + return str(e) + + if __name__ == '__main__': app.run() diff --git a/faiss/ct_focus_tips_index.faiss b/faiss/ct_focus_tips_index.faiss new file mode 100644 index 0000000..909571b Binary files /dev/null and b/faiss/ct_focus_tips_index.faiss differ diff --git a/faiss/language_for_writing_tips_index.faiss b/faiss/language_for_writing_tips_index.faiss new file mode 100644 index 0000000..b9b254c Binary files /dev/null and b/faiss/language_for_writing_tips_index.faiss differ diff --git a/faiss/reading_skill_tips_index.faiss b/faiss/reading_skill_tips_index.faiss new file mode 100644 index 
0000000..7113625 Binary files /dev/null and b/faiss/reading_skill_tips_index.faiss differ diff --git a/faiss/strategy_tips_index.faiss b/faiss/strategy_tips_index.faiss new file mode 100644 index 0000000..8032155 Binary files /dev/null and b/faiss/strategy_tips_index.faiss differ diff --git a/faiss/tips_metadata.pkl b/faiss/tips_metadata.pkl new file mode 100644 index 0000000..ecb3614 Binary files /dev/null and b/faiss/tips_metadata.pkl differ diff --git a/faiss/word_link_tips_index.faiss b/faiss/word_link_tips_index.faiss new file mode 100644 index 0000000..b11fd5e Binary files /dev/null and b/faiss/word_link_tips_index.faiss differ diff --git a/faiss/word_partners_tips_index.faiss b/faiss/word_partners_tips_index.faiss new file mode 100644 index 0000000..2f08b63 Binary files /dev/null and b/faiss/word_partners_tips_index.faiss differ diff --git a/faiss/writing_skill_tips_index.faiss b/faiss/writing_skill_tips_index.faiss new file mode 100644 index 0000000..fcae917 Binary files /dev/null and b/faiss/writing_skill_tips_index.faiss differ diff --git a/requirements.txt b/requirements.txt index 978ac46..9a6e207 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/training_content/__init__.py b/training_content/__init__.py new file mode 100644 index 0000000..f1f8bfb --- /dev/null +++ b/training_content/__init__.py @@ -0,0 +1,9 @@ +from .kb import TrainingContentKnowledgeBase +from .service import TrainingContentService +from .gpt import GPT + +__all__ = [ + "TrainingContentService", + "TrainingContentKnowledgeBase", + "GPT" +] diff --git a/training_content/dtos.py b/training_content/dtos.py new file mode 100644 index 0000000..2133f49 --- /dev/null +++ b/training_content/dtos.py @@ -0,0 +1,29 @@ +from pydantic import BaseModel +from typing import List + + +class QueryDTO(BaseModel): + category: str + text: str + + +class DetailsDTO(BaseModel): + exam_id: str + date: int + performance_comment: str + detailed_summary: str + + +class 
class GPT:
    """Thin wrapper around an OpenAI-style chat client that returns parsed JSON.

    The wrapped client must expose ``chat.completions.create(**params)`` and
    return an object whose ``choices[0].message.content`` holds the reply text.
    """

    def __init__(self, openai_client):
        self._client = openai_client
        self._default_model = "gpt-4o"
        self._logger = getLogger()

    def prediction(
        self,
        messages: "List[ChatCompletionMessageParam]",
        map_to_model: Callable,
        json_scheme: str,
        *,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        max_retries: int = 3
    ) -> "List[BaseModel] | BaseModel | str | None":
        """Request a JSON completion and map it with ``map_to_model``.

        Args:
            messages: Conversation sent on the first attempt. Never mutated.
            map_to_model: Callable applied to the decoded JSON; its return
                value is returned to the caller. Any exception it raises is
                treated as a malformed response and triggers a retry.
            json_scheme: Textual description of the expected JSON, echoed back
                to the model when a retry is needed.
            model: Model name; falls back to the instance default when falsy.
            temperature: Sampling temperature. Only omitted from the request
                when it is ``None`` (``0.0`` is forwarded).
            max_retries: Maximum number of requests sent to the client.

        Returns:
            Whatever ``map_to_model`` returns, or ``None`` when every attempt
            produced a response that could not be decoded or mapped.

        Raises:
            Whatever the underlying client raises while performing a request.
        """
        params = {
            "messages": messages,
            "response_format": {"type": "json_object"},
            "model": model if model else self._default_model
        }

        # `is not None` rather than truthiness: 0.0 is a legitimate temperature
        # and must not be silently replaced by the API default.
        if temperature is not None:
            params["temperature"] = temperature

        attempt = 0
        while attempt < max_retries:
            result = self._client.chat.completions.create(**params)
            result_content = result.choices[0].message.content
            try:
                return map_to_model(json.loads(result_content))
            except Exception as e:
                attempt += 1
                self._logger.info(f"GPT returned malformed response: {result_content}\n {str(e)}")
                # Keep the original conversation in front of the correction
                # request so the model still has the task and the data it needs
                # to produce a complete answer, not only its own broken output.
                params["messages"] = [
                    *messages,
                    {
                        "role": "user",
                        "content": (
                            "Your previous response wasn't in the json format I've explicitly told you to output. "
                            f"In your next response, you will fix it and return me just the json I've asked."
                        )
                    },
                    {
                        "role": "user",
                        "content": (
                            f"Previous response: {result_content}\n"
                            f"JSON format: {json_scheme}"
                        )
                    }
                ]
                if attempt >= max_retries:
                    self._logger.error(f"Max retries exceeded!")
        return None
class TrainingContentKnowledgeBase:
    """Per-category FAISS indices over training tips, plus their metadata.

    ``embeddings`` must expose ``encode(list_of_texts)``; its result is passed
    straight to FAISS (``add`` when building, ``search`` when querying).
    """

    def __init__(self, embeddings, path: str = 'pathways_2_rw_with_ids.json'):
        self._embedding_model = embeddings
        # The tips JSON is only needed by the offline helpers below, so it is
        # read lazily on first use instead of at construction time.
        self._tips_path = path
        self._tips = None
        self._category_metadata = None
        self._indices = None
        self._logger = getLogger()

    @staticmethod
    def _read_json(path: str) -> Dict[str, object]:
        """Read and decode the UTF-8 JSON file at ``path``."""
        with open(path, 'r', encoding="utf-8") as json_file:
            return json.loads(json_file.read())

    def _load_tips(self):
        """Return the tips document, reading it from disk on first access."""
        if self._tips is None:
            self._tips = self._read_json(self._tips_path)
        return self._tips

    def _iter_tips(self):
        """Yield ``(normalised_category, tip)`` for every tip in the document."""
        for unit in self._load_tips()['units']:
            for page in unit['pages']:
                for tip in page['tips']:
                    yield tip['category'].lower().replace(" ", "_"), tip

    def print_category_count(self):
        """Print a dict mapping each normalised category to its number of tips."""
        category_tips = {}
        for category, _ in self._iter_tips():
            # The first occurrence counts as 1, not 0.
            category_tips[category] = category_tips.get(category, 0) + 1
        print(category_tips)

    def create_embeddings_and_save_them(self) -> None:
        """Build one L2 FAISS index per category and write indices + metadata.

        Writes ``./faiss/<category>_tips_index.faiss`` for every category and
        ``./faiss/tips_metadata.pkl`` holding ``{"id", "text"}`` per tip, in the
        same order as the vectors were added to the category's index.
        """
        category_embeddings = {}
        category_metadata = {}

        for category, tip in self._iter_tips():
            # NOTE(review): tip['embedding'] is fed to encode(), so it is
            # presumably the text to embed rather than a vector - confirm.
            category_embeddings.setdefault(category, []).append(tip['embedding'])
            category_metadata.setdefault(category, []).append({"id": tip['id'], "text": tip['text']})

        for category, embeddings in category_embeddings.items():
            embeddings_array = self._embedding_model.encode(embeddings)
            index = faiss.IndexFlatL2(embeddings_array.shape[1])
            index.add(embeddings_array)
            faiss.write_index(index, f"./faiss/{category}_tips_index.faiss")

        with open("./faiss/tips_metadata.pkl", "wb") as f:
            pickle.dump(category_metadata, f)

    def load_indices_and_metadata(
        self,
        directory: str = './faiss',
        suffix: str = '_tips_index.faiss',
        metadata_path: str = './faiss/tips_metadata.pkl'
    ):
        """Load every ``*<suffix>`` index in ``directory`` and the metadata pickle.

        The category name is the file name with ``suffix`` removed.
        """
        self._indices = {}
        for file in os.listdir(directory):
            if file.endswith(suffix):
                self._indices[file[:-len(suffix)]] = faiss.read_index(os.path.join(directory, file))
                self._logger.info(f'Loaded embeddings for {file[:-len(suffix)]} category.')

        # SECURITY: pickle executes arbitrary code on load; metadata_path must
        # only ever point at a file produced by create_embeddings_and_save_them.
        with open(metadata_path, 'rb') as f:
            self._category_metadata = pickle.load(f)
        self._logger.info("Loaded tips metadata")

    def query_knowledge_base(self, query: str, category: str, top_k: int = 5) -> List[Dict[str, str]]:
        """Return metadata of up to ``top_k`` nearest tips in ``category``.

        Raises:
            KeyError: if ``category`` has no loaded index or metadata.
        """
        query_embedding = self._embedding_model.encode([query])
        _, neighbours = self._indices[category].search(query_embedding, top_k)
        metadata = self._category_metadata[category]
        # FAISS pads the label row with -1 when the index holds fewer than
        # top_k vectors; indexing with -1 would silently return the last tip.
        return [metadata[i] for i in neighbours[0] if i >= 0]
class TrainingContentService:
    """Builds a personalised training document from a student's exam stats.

    Pipeline (see ``get_tips``): group the stats per module/exam, ask the LLM
    for per-exam details, weak areas and knowledge-base queries, run those
    queries, let the LLM pick the useful tips, then store the result in the
    ``training`` collection.
    """

    TOOLS = [
        'critical_thinking',
        'language_for_writing',
        'reading_skills',
        'strategy',
        'words',
        'writing_skills'
    ]
    # Knowledge-base index names:
    # strategy word_link ct_focus reading_skill word_partners writing_skill language_for_writing

    # LLM-facing category (see TOOLS) -> knowledge-base index/indices serving it.
    _CATEGORY_INDICES = {
        "critical_thinking": ("ct_focus",),
        "language_for_writing": ("language_for_writing",),
        "reading_skills": ("reading_skill",),
        "strategy": ("strategy",),
        "words": ("word_link", "word_partners"),
        "writing_skills": ("writing_skill",),
    }

    def __init__(self, kb, openai, firestore):
        self._training_content_module = kb
        self._db = firestore
        self._logger = getLogger()
        self._llm = openai

    def get_tips(self, stats):
        """Create and persist a training document for ``stats``.

        Returns:
            ``{"id": <id of the document added to the 'training' collection>}``.

        Raises:
            RuntimeError: if the LLM produced no usable response for either the
                exam analysis or the tip selection.
        """
        exam_data, exam_map = self._sort_out_solutions(stats)

        # The LLM wrapper returns None once its retries are exhausted; fail with
        # a clear message instead of an AttributeError on None further down.
        training_content = self._get_exam_details_and_tips(exam_data)
        if training_content is None:
            raise RuntimeError("Could not generate training content: no usable LLM response.")

        tips = self._query_kb(training_content.queries)
        usefull_tips = self._get_usefull_tips(exam_data, tips)
        if usefull_tips is None:
            raise RuntimeError("Could not select useful tips: no usable LLM response.")

        exam_map = self._merge_exam_map_with_details(exam_map, training_content.details)

        training_doc = {
            **exam_map,
            **usefull_tips.dict(),
            "weak_areas": [area.dict() for area in training_content.weak_areas]
        }
        # The second element of add()'s result is the created document reference.
        doc_ref = self._db.collection('training').add(training_doc)
        return {
            "id": doc_ref[1].id
        }

    @staticmethod
    def _merge_exam_map_with_details(exam_map: Dict[str, dict], details: "List[DetailsDTO]"):
        """Combine LLM-written exam details with the computed per-exam data.

        Details whose ``exam_id`` is not a key of ``exam_map`` are skipped: the
        ids come from the LLM and cannot be trusted to exist.
        """
        new_exam_map = {"exams": []}
        for detail in details:
            computed = exam_map.get(detail.exam_id)
            if computed is None:
                getLogger().warning(f"Ignoring details for unknown exam id: {detail.exam_id}")
                continue
            new_exam_map["exams"].append({
                "id": detail.exam_id,
                "date": detail.date,
                "performance_comment": detail.performance_comment,
                "detailed_summary": detail.detailed_summary,
                **computed
            })
        return new_exam_map

    def _query_kb(self, queries: "List[QueryDTO]"):
        """Run every query against the index/indices of its category.

        Returns ``{"tips": [...]}`` with the knowledge-base results concatenated
        in query order. Queries with an unknown category are logged and skipped.
        """
        tips = {"tips": []}
        for query in queries:
            self._logger.debug(f"{query.category} {query.text}")
            index_names = self._CATEGORY_INDICES.get(query.category)
            if index_names is None:
                self._logger.info(f"GTP tried to query knowledge base for {query.category} and it doesn't exist.")
                continue
            for index_name in index_names:
                tips["tips"].extend(
                    self._training_content_module.query_knowledge_base(query.text, index_name)
                )
        return tips

    def _get_exam_details_and_tips(self, exam_data: Dict[str, dict]) -> "TrainingContentDTO":
        """Ask the LLM for exam details, weak areas and knowledge-base queries.

        Returns whatever the LLM wrapper returns for ``_map_gpt_response``.
        """
        json_schema = (
            '{ "details": [{"exam_id": "", "date": 0, "performance_comment": "", "detailed_summary": ""}],'
            ' "weak_areas": [{"area": "", "comment": ""}], "queries": [{"text": "", "category": ""}] }'
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"I'm going to provide you with exam data, you will take the exam data and fill this json "
                    f'schema : {json_schema}. "performance_comment" is a short sentence that describes the '
                    'students\'s performance and main mistakes in a single exam, "detailed_summary" is a detailed '
                    'summary of the student\'s performance, "weak_areas" are identified areas'
                    ' across all exams which need to be improved upon, for example, area "Grammar and Syntax" comment "Issues'
                    ' with sentence structure and punctuation.", the "queries" field is where you will write queries '
                    'for tips that will be displayed to the student, the category attribute is a collection of '
                    'embeddings and the text will be the text used to query the knowledge base. The categories are '
                    f'the following [{", ".join(self.TOOLS)}].'
                )
            },
            {
                "role": "user",
                "content": f'Exam Data: {str(exam_data)}'
            }
        ]
        return self._llm.prediction(messages, self._map_gpt_response, json_schema)

    def _get_usefull_tips(self, exam_data: Dict[str, dict], tips: Dict[str, list]) -> "TipsDTO":
        """Ask the LLM which of ``tips`` are useful for the given exam data."""
        json_schema = (
            '{ "tip_ids": [] }'
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"I'm going to provide you with tips and I want you to return to me the tips that "
                    f"can be usefull for the student that made the exam that I'm going to send you, return "
                    f"me the tip ids in this json format {json_schema}."
                )
            },
            {
                "role": "user",
                "content": f'Exam Data: {str(exam_data)}'
            },
            {
                "role": "user",
                "content": f'Tips: {str(tips)}'
            }
        ]
        return self._llm.prediction(messages, lambda response: TipsDTO(**response), json_schema)

    @staticmethod
    def _map_gpt_response(response: Dict[str, list]) -> "TrainingContentDTO":
        """Convert the decoded LLM JSON into a ``TrainingContentDTO``.

        Raises ``KeyError`` when a top-level field is missing.
        """
        parsed_response = {
            "details": [DetailsDTO(**detail) for detail in response["details"]],
            "weak_areas": [WeakAreaDTO(**area) for area in response["weak_areas"]],
            "queries": [QueryDTO(**query) for query in response["queries"]]
        }
        return TrainingContentDTO(**parsed_response)

    def _sort_out_solutions(self, stats):
        """Group stats by module and exam and attach the matching exam content.

        Returns:
            ``(exercises, exam_map)`` where ``exercises[module][exam_id]`` holds
            ``date``, ``exercises`` and ``score`` (left as ``None``), and
            ``exam_map[exam_id]`` holds ``stat_ids`` and the percentage ``score``.
        """
        grouped_stats = {}
        for stat in stats:
            grouped_stats.setdefault(stat["module"], {}).setdefault(stat["exam"], []).append(stat)

        extractors = {
            "listening": self._get_listening_solutions,
            "reading": self._get_reading_solutions,
            "writing": self._get_writing_prompts_and_answers,
        }

        exercises = {}
        exam_map = {}
        for module, exams in grouped_stats.items():
            exercises[module] = {}
            extract = extractors.get(module)
            for exam_id, stat_group in exams.items():
                exam = self._get_doc_by_id(module, exam_id)
                if exam is None:
                    # Without the exam document there is nothing to match the
                    # stats against; keep the score but extract no exercises.
                    self._logger.warning(f"Exam {exam_id} not found in collection {module}")
                entry = {"date": None, "exercises": [], "score": None}
                exercises[module][exam_id] = entry

                exam_total_questions = 0
                exam_total_correct = 0
                for stat in stat_group:
                    exam_total_questions += stat["score"]["total"]
                    exam_total_correct += stat["score"]["correct"]
                    entry["date"] = stat["date"]

                    exam_map.setdefault(exam_id, {"stat_ids": [], "score": 0})["stat_ids"].append(stat["id"])

                    if extract is not None and exam is not None:
                        entry["exercises"].extend(extract(stat, exam))

                # An exam with no scored questions gets 0 instead of dividing by zero.
                exam_map[exam_id]["score"] = (
                    round((exam_total_correct / exam_total_questions) * 100)
                    if exam_total_questions else 0
                )
        return exercises, exam_map

    def _get_writing_prompts_and_answers(self, stat, exam):
        """Pair each answer in ``stat['solutions']`` with its exam prompt.

        Returns an empty or partial list (and logs a warning) on a missing key.
        """
        result = []
        try:
            answers = [(solution['id'], solution['solution']) for solution in stat['solutions']]
            for exercise_id, answer in answers:
                for exam_exercise in exam["exercises"]:
                    if exam_exercise["id"] == exercise_id:
                        result.append({
                            "exercise": exam_exercise["prompt"],
                            "answer": answer
                        })
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")

        return result

    def _get_listening_solutions(self, stat, exam):
        """Collect the listening exercise(s) whose id equals ``stat['exercise']``.

        Only ``writeBlanks`` and ``multipleChoice`` stats produce an entry.
        """
        result = []
        try:
            for part in exam["parts"]:
                for exercise in part["exercises"]:
                    if exercise["id"] != stat["exercise"]:
                        continue
                    if stat["type"] == "writeBlanks":
                        result.append({
                            "question": exercise["prompt"],
                            "template": exercise["text"],
                            "solution": exercise["solutions"],
                            "answer": stat["solutions"]
                        })
                    elif stat["type"] == "multipleChoice":
                        result.append({
                            "question": exercise["prompt"],
                            "exercise": exercise["questions"],
                            "answer": stat["solutions"]
                        })
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return result

    def _get_reading_solutions(self, stat, exam):
        """Collect the reading exercise(s) whose id equals ``stat['exercise']``.

        Any type other than ``fillBlanks``/``writeBlanks`` is read with the
        sentence-matching layout (``sentences`` and ``options``).
        """
        result = []
        try:
            for part in exam["parts"]:
                text = part["text"]
                for exercise in part["exercises"]:
                    if exercise["id"] != stat["exercise"]:
                        continue
                    if stat["type"] == "fillBlanks":
                        result.append({
                            "text": text,
                            "question": exercise["prompt"],
                            "template": exercise["text"],
                            "words": exercise["words"],
                            "solutions": exercise["solutions"],
                            "answer": stat["solutions"]
                        })
                    elif stat["type"] == "writeBlanks":
                        result.append({
                            "text": text,
                            "question": exercise["prompt"],
                            "template": exercise["text"],
                            "solutions": exercise["solutions"],
                            "answer": stat["solutions"]
                        })
                    else:
                        # match_sentences
                        result.append({
                            "text": text,
                            "question": exercise["prompt"],
                            "sentences": exercise["sentences"],
                            "options": exercise["options"],
                            "answer": stat["solutions"]
                        })
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return result

    def _get_doc_by_id(self, collection: str, doc_id: str):
        """Return the document ``doc_id`` of ``collection`` as a dict, or ``None``."""
        doc = self._db.collection(collection).document(doc_id).get()
        if doc.exists:
            return doc.to_dict()
        return None