Finished training content backend

This commit is contained in:
Carlos Mesquita
2024-07-31 14:56:33 +01:00
parent adfc027458
commit 8e56a3228b
15 changed files with 486 additions and 0 deletions

21
app.py
View File

@@ -5,6 +5,7 @@ import firebase_admin
from firebase_admin import credentials from firebase_admin import credentials
from flask import Flask, request from flask import Flask, request
from flask_jwt_extended import JWTManager, jwt_required from flask_jwt_extended import JWTManager, jwt_required
from sentence_transformers import SentenceTransformer
from helper.api_messages import * from helper.api_messages import *
from helper.exam_variant import ExamVariant from helper.exam_variant import ExamVariant
@@ -17,6 +18,7 @@ from helper.openai_interface import *
from helper.question_templates import * from helper.question_templates import *
from helper.speech_to_text_helper import * from helper.speech_to_text_helper import *
from heygen.AvatarEnum import AvatarEnum from heygen.AvatarEnum import AvatarEnum
from training_content import TrainingContentService, TrainingContentKnowledgeBase, GPT
load_dotenv() load_dotenv()
@@ -33,6 +35,14 @@ firebase_admin.initialize_app(cred)
gpt_zero = GPTZero(os.getenv('GPT_ZERO_API_KEY')) gpt_zero = GPTZero(os.getenv('GPT_ZERO_API_KEY'))
# Training Content Dependencies
embeddings = SentenceTransformer('all-MiniLM-L6-v2')
kb = TrainingContentKnowledgeBase(embeddings)
kb.load_indices_and_metadata()
open_ai = GPT(OpenAI())
firestore_client = firestore.client()
tc_service = TrainingContentService(kb, open_ai, firestore_client)
thread_event = threading.Event() thread_event = threading.Event()
# Configure logging # Configure logging
@@ -1596,5 +1606,16 @@ def grading_summary():
return str(e) return str(e)
@app.route('/training_content', methods=['POST'])
@jwt_required()
def training_content():
try:
data = request.get_json()
return tc_service.get_tips(data)
except Exception as e:
app.logger.error(str(e))
return str(e)
if __name__ == '__main__': if __name__ == '__main__':
app.run() app.run()

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
faiss/tips_metadata.pkl Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,9 @@
from .kb import TrainingContentKnowledgeBase
from .service import TrainingContentService
from .gpt import GPT
__all__ = [
"TrainingContentService",
"TrainingContentKnowledgeBase",
"GPT"
]

29
training_content/dtos.py Normal file
View File

@@ -0,0 +1,29 @@
from pydantic import BaseModel
from typing import List
class QueryDTO(BaseModel):
category: str
text: str
class DetailsDTO(BaseModel):
exam_id: str
date: int
performance_comment: str
detailed_summary: str
class WeakAreaDTO(BaseModel):
area: str
comment: str
class TrainingContentDTO(BaseModel):
details: List[DetailsDTO]
weak_areas: List[WeakAreaDTO]
queries: List[QueryDTO]
class TipsDTO(BaseModel):
tip_ids: List[str]

64
training_content/gpt.py Normal file
View File

@@ -0,0 +1,64 @@
import json
from logging import getLogger
from typing import List, Optional, Callable
from openai.types.chat import ChatCompletionMessageParam
from pydantic import BaseModel
class GPT:
def __init__(self, openai_client):
self._client = openai_client
self._default_model = "gpt-4o"
self._logger = getLogger()
def prediction(
self,
messages: List[ChatCompletionMessageParam],
map_to_model: Callable,
json_scheme: str,
*,
model: Optional[str] = None,
temperature: Optional[float] = None,
max_retries: int = 3
) -> List[BaseModel] | BaseModel | str | None:
params = {
"messages": messages,
"response_format": {"type": "json_object"},
"model": model if model else self._default_model
}
if temperature:
params["temperature"] = temperature
attempt = 0
while attempt < max_retries:
result = self._client.chat.completions.create(**params)
result_content = result.choices[0].message.content
try:
result_json = json.loads(result_content)
return map_to_model(result_json)
except Exception as e:
attempt += 1
self._logger.info(f"GPT returned malformed response: {result_content}\n {str(e)}")
params["messages"] = [
{
"role": "user",
"content": (
"Your previous response wasn't in the json format I've explicitly told you to output. "
f"In your next response, you will fix it and return me just the json I've asked."
)
},
{
"role": "user",
"content": (
f"Previous response: {result_content}\n"
f"JSON format: {json_scheme}"
)
}
]
if attempt >= max_retries:
self._logger.error(f"Max retries exceeded!")
return None

85
training_content/kb.py Normal file
View File

@@ -0,0 +1,85 @@
import json
import os
from logging import getLogger
from typing import Dict, List
import faiss
import pickle
class TrainingContentKnowledgeBase:
def __init__(self, embeddings, path: str = 'pathways_2_rw_with_ids.json'):
self._embedding_model = embeddings
self._tips = None # self._read_json(path)
self._category_metadata = None
self._indices = None
self._logger = getLogger()
@staticmethod
def _read_json(path: str) -> Dict[str, any]:
with open(path, 'r', encoding="utf-8") as json_file:
return json.loads(json_file.read())
def print_category_count(self):
category_tips = {}
for unit in self._tips['units']:
for page in unit['pages']:
for tip in page['tips']:
category = tip['category'].lower().replace(" ", "_")
if category not in category_tips:
category_tips[category] = 0
else:
category_tips[category] = category_tips[category] + 1
print(category_tips)
def create_embeddings_and_save_them(self) -> None:
category_embeddings = {}
category_metadata = {}
for unit in self._tips['units']:
for page in unit['pages']:
for tip in page['tips']:
category = tip['category'].lower().replace(" ", "_")
if category not in category_embeddings:
category_embeddings[category] = []
category_metadata[category] = []
category_embeddings[category].append(tip['embedding'])
category_metadata[category].append({"id": tip['id'], "text": tip['text']})
category_indices = {}
for category, embeddings in category_embeddings.items():
embeddings_array = self._embedding_model.encode(embeddings)
index = faiss.IndexFlatL2(embeddings_array.shape[1])
index.add(embeddings_array)
category_indices[category] = index
faiss.write_index(index, f"./faiss/{category}_tips_index.faiss")
with open("./faiss/tips_metadata.pkl", "wb") as f:
pickle.dump(category_metadata, f)
def load_indices_and_metadata(
self,
directory: str = './faiss',
suffix: str = '_tips_index.faiss',
metadata_path: str = './faiss/tips_metadata.pkl'
):
files = os.listdir(directory)
self._indices = {}
for file in files:
if file.endswith(suffix):
self._indices[file[:-len(suffix)]] = faiss.read_index(f'{directory}/{file}')
self._logger.info(f'Loaded embeddings for {file[:-len(suffix)]} category.')
with open(metadata_path, 'rb') as f:
self._category_metadata = pickle.load(f)
self._logger.info("Loaded tips metadata")
def query_knowledge_base(self, query: str, category: str, top_k: int = 5) -> List[Dict[str, str]]:
query_embedding = self._embedding_model.encode([query])
index = self._indices[category]
D, I = index.search(query_embedding, top_k)
results = [self._category_metadata[category][i] for i in I[0]]
return results

278
training_content/service.py Normal file
View File

@@ -0,0 +1,278 @@
from logging import getLogger
from typing import Dict, List
from training_content.dtos import TrainingContentDTO, WeakAreaDTO, QueryDTO, DetailsDTO, TipsDTO
class TrainingContentService:
TOOLS = [
'critical_thinking',
'language_for_writing',
'reading_skills',
'strategy',
'words',
'writing_skills'
]
# strategy word_link ct_focus reading_skill word_partners writing_skill language_for_writing
def __init__(self, kb, openai, firestore):
self._training_content_module = kb
self._db = firestore
self._logger = getLogger()
self._llm = openai
def get_tips(self, stats):
exam_data, exam_map = self._sort_out_solutions(stats)
training_content = self._get_exam_details_and_tips(exam_data)
tips = self._query_kb(training_content.queries)
usefull_tips = self._get_usefull_tips(exam_data, tips)
exam_map = self._merge_exam_map_with_details(exam_map, training_content.details)
weak_areas = {"weak_areas": []}
for area in training_content.weak_areas:
weak_areas["weak_areas"].append(area.dict())
training_doc = {
**exam_map,
**usefull_tips.dict(),
**weak_areas
}
doc_ref = self._db.collection('training').add(training_doc)
return {
"id": doc_ref[1].id
}
@staticmethod
def _merge_exam_map_with_details(exam_map: Dict[str, any], details: List[DetailsDTO]):
new_exam_map = {"exams": []}
for detail in details:
new_exam_map["exams"].append({
"id": detail.exam_id,
"date": detail.date,
"performance_comment": detail.performance_comment,
"detailed_summary": detail.detailed_summary,
**exam_map[detail.exam_id]
})
return new_exam_map
def _query_kb(self, queries: List[QueryDTO]):
map_categories = {
"critical_thinking": "ct_focus",
"language_for_writing": "language_for_writing",
"reading_skills": "reading_skill",
"strategy": "strategy",
"writing_skills": "writing_skill"
}
tips = {"tips": []}
for query in queries:
print(f"{query.category} {query.text}")
if query.category == "words":
tips["tips"].extend(
self._training_content_module.query_knowledge_base(query.text, "word_link")
)
tips["tips"].extend(
self._training_content_module.query_knowledge_base(query.text, "word_partners")
)
else:
if query.category in map_categories:
tips["tips"].extend(
self._training_content_module.query_knowledge_base(query.text, map_categories[query.category])
)
else:
self._logger.info(f"GTP tried to query knowledge base for {query.category} and it doesn't exist.")
return tips
def _get_exam_details_and_tips(self, exam_data: Dict[str, any]) -> TrainingContentDTO:
json_schema = (
'{ "details": [{"exam_id": "", "date": 0, "performance_comment": "", "detailed_summary": ""}],'
' "weak_areas": [{"area": "", "comment": ""}], "queries": [{"text": "", "category": ""}] }'
)
messages = [
{
"role": "user",
"content": (
f"I'm going to provide you with exam data, you will take the exam data and fill this json "
f'schema : {json_schema}. "performance_comment" is a short sentence that describes the '
'students\'s performance and main mistakes in a single exam, "detailed_summary" is a detailed '
'summary of the student\'s performance, "weak_areas" are identified areas'
' across all exams which need to be improved upon, for example, area "Grammar and Syntax" comment "Issues'
' with sentence structure and punctuation.", the "queries" field is where you will write queries '
'for tips that will be displayed to the student, the category attribute is a collection of '
'embeddings and the text will be the text used to query the knowledge base. The categories are '
f'the following [{", ".join(self.TOOLS)}].'
)
},
{
"role": "user",
"content": f'Exam Data: {str(exam_data)}'
}
]
return self._llm.prediction(messages, self._map_gpt_response, json_schema)
def _get_usefull_tips(self, exam_data: Dict[str, any], tips: Dict[str, any]) -> TipsDTO:
json_schema = (
'{ "tip_ids": [] }'
)
messages = [
{
"role": "user",
"content": (
f"I'm going to provide you with tips and I want you to return to me the tips that "
f"can be usefull for the student that made the exam that I'm going to send you, return "
f"me the tip ids in this json format {json_schema}."
)
},
{
"role": "user",
"content": f'Exam Data: {str(exam_data)}'
},
{
"role": "user",
"content": f'Tips: {str(tips)}'
}
]
return self._llm.prediction(messages, lambda response: TipsDTO(**response), json_schema)
@staticmethod
def _map_gpt_response(response: Dict[str, any]) -> TrainingContentDTO:
parsed_response = {
"details": [DetailsDTO(**detail) for detail in response["details"]],
"weak_areas": [WeakAreaDTO(**area) for area in response["weak_areas"]],
"queries": [QueryDTO(**query) for query in response["queries"]]
}
return TrainingContentDTO(**parsed_response)
def _sort_out_solutions(self, stats):
grouped_stats = {}
for stat in stats:
exam_id = stat["exam"]
module = stat["module"]
if module not in grouped_stats:
grouped_stats[module] = {}
if exam_id not in grouped_stats[module]:
grouped_stats[module][exam_id] = []
grouped_stats[module][exam_id].append(stat)
exercises = {}
exam_map = {}
for module, exams in grouped_stats.items():
exercises[module] = {}
for exam_id, stat_group in exams.items():
exam = self._get_doc_by_id(module, exam_id)
exercises[module][exam_id] = {"date": None, "exercises": [], "score": None}
exam_total_questions = 0
exam_total_correct = 0
for stat in stat_group:
exam_total_questions += stat["score"]["total"]
exam_total_correct += stat["score"]["correct"]
exercises[module][exam_id]["date"] = stat["date"]
if exam_id not in exam_map:
exam_map[exam_id] = {"stat_ids": [], "score": 0}
exam_map[exam_id]["stat_ids"].append(stat["id"])
if module == "listening":
exercises[module][exam_id]["exercises"].extend(self._get_listening_solutions(stat, exam))
if module == "reading":
exercises[module][exam_id]["exercises"].extend(self._get_reading_solutions(stat, exam))
if module == "writing":
exercises[module][exam_id]["exercises"].extend(self._get_writing_prompts_and_answers(stat, exam))
exam_map[exam_id]["score"] = round((exam_total_correct / exam_total_questions) * 100)
return exercises, exam_map
def _get_writing_prompts_and_answers(self, stat, exam):
result = []
try:
exercises = []
for solution in stat['solutions']:
answer = solution['solution']
exercise_id = solution['id']
exercises.append({
"exercise_id": exercise_id,
"answer": answer
})
for exercise in exercises:
for exam_exercise in exam["exercises"]:
if exam_exercise["id"] == exercise["exercise_id"]:
result.append({
"exercise": exam_exercise["prompt"],
"answer": exercise["answer"]
})
except KeyError as e:
self._logger.warning(f"Malformed stat object: {str(e)}")
return result
def _get_listening_solutions(self, stat, exam):
result = []
try:
for part in exam["parts"]:
for exercise in part["exercises"]:
if exercise["id"] == stat["exercise"]:
if stat["type"] == "writeBlanks":
result.append({
"question": exercise["prompt"],
"template": exercise["text"],
"solution": exercise["solutions"],
"answer": stat["solutions"]
})
if stat["type"] == "multipleChoice":
result.append({
"question": exercise["prompt"],
"exercise": exercise["questions"],
"answer": stat["solutions"]
})
except KeyError as e:
self._logger.warning(f"Malformed stat object: {str(e)}")
return result
def _get_reading_solutions(self, stat, exam):
result = []
try:
for part in exam["parts"]:
text = part["text"]
for exercise in part["exercises"]:
if exercise["id"] == stat["exercise"]:
if stat["type"] == "fillBlanks":
result.append({
"text": text,
"question": exercise["prompt"],
"template": exercise["text"],
"words": exercise["words"],
"solutions": exercise["solutions"],
"answer": stat["solutions"]
})
elif stat["type"] == "writeBlanks":
result.append({
"text": text,
"question": exercise["prompt"],
"template": exercise["text"],
"solutions": exercise["solutions"],
"answer": stat["solutions"]
})
else:
# match_sentences
result.append({
"text": text,
"question": exercise["prompt"],
"sentences": exercise["sentences"],
"options": exercise["options"],
"answer": stat["solutions"]
})
except KeyError as e:
self._logger.warning(f"Malformed stat object: {str(e)}")
return result
def _get_doc_by_id(self, collection: str, doc_id: str):
collection_ref = self._db.collection(collection)
doc_ref = collection_ref.document(doc_id)
doc = doc_ref.get()
if doc.exists:
return doc.to_dict()
return None