Upload level exam without hooking up to firestore and running in thread, will do this when I have the edit view done
This commit is contained in:
5
modules/__init__.py
Normal file
5
modules/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .gpt import GPT
|
||||
|
||||
__all__ = [
|
||||
"GPT"
|
||||
]
|
||||
66
modules/gpt.py
Normal file
66
modules/gpt.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import json
|
||||
from logging import getLogger
|
||||
|
||||
from typing import List, Optional, Callable, TypeVar
|
||||
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
from pydantic import BaseModel
|
||||
|
||||
T = TypeVar('T', bound=BaseModel)
|
||||
|
||||
|
||||
class GPT:
    """Thin wrapper around the OpenAI chat-completions API.

    Requests JSON-mode completions, parses the response, maps it onto a
    caller-supplied model, and retries with a "fix your JSON" prompt when
    the model returns malformed output.
    """

    def __init__(self, openai_client):
        """
        :param openai_client: an initialised OpenAI client instance.
        """
        self._client = openai_client
        self._default_model = "gpt-4o-2024-08-06"
        self._logger = getLogger(__name__)

    def prediction(
            self,
            messages: List[ChatCompletionMessageParam],
            map_to_model: Callable,
            json_scheme: str,
            *,
            model: Optional[str] = None,
            temperature: Optional[float] = None,
            max_retries: int = 3
    ) -> List[T] | T | None:
        """Request a JSON completion and map it onto a model instance.

        :param messages: chat messages to send to the model.
        :param map_to_model: callable that turns the parsed JSON dict into the
            desired model; any exception it raises counts as a malformed
            response and triggers a retry.
        :param json_scheme: textual JSON schema shown to the model on retry.
        :param model: overrides the default model when given.
        :param temperature: sampling temperature; 0.0 is a valid value.
        :param max_retries: number of attempts before giving up.
        :return: whatever ``map_to_model`` returns, or ``None`` when every
            attempt produced a malformed response.
        """
        params = {
            "messages": messages,
            "response_format": {"type": "json_object"},
            "model": model if model else self._default_model
        }

        # BUG FIX: `if temperature:` silently dropped a requested temperature
        # of 0.0 (falsy), so deterministic sampling was impossible. Compare
        # against None explicitly instead.
        if temperature is not None:
            params["temperature"] = temperature

        for _attempt in range(max_retries):
            result = self._client.chat.completions.create(**params)
            result_content = result.choices[0].message.content
            try:
                result_json = json.loads(result_content)
                return map_to_model(result_json)
            except Exception as e:
                # Broad on purpose: both json.JSONDecodeError and any mapping /
                # validation error from map_to_model should trigger a retry.
                self._logger.info(
                    "GPT returned malformed response: %s\n %s", result_content, e
                )
                # Replace the conversation with a repair prompt referencing the
                # broken response and the expected schema.
                params["messages"] = [
                    {
                        "role": "user",
                        "content": (
                            "Your previous response wasn't in the json format I've explicitly told you to output. "
                            "In your next response, you will fix it and return me just the json I've asked."
                        )
                    },
                    {
                        "role": "user",
                        "content": (
                            f"Previous response: {result_content}\n"
                            f"JSON format: {json_scheme}"
                        )
                    }
                ]

        self._logger.error("Max retries exceeded!")
        return None
|
||||
5
modules/helper/__init__.py
Normal file
5
modules/helper/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .logger import LoggerHelper
|
||||
|
||||
__all__ = [
|
||||
"LoggerHelper"
|
||||
]
|
||||
77
modules/helper/file_helper.py
Normal file
77
modules/helper/file_helper.py
Normal file
@@ -0,0 +1,77 @@
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
import pypandoc
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class FileHelper:
    """Static helpers for file conversion, PDF/PNG handling and cleanup."""

    # Supposedly pandoc covers a wide range of file extensions only tested with docx
    @staticmethod
    def convert_file_to_pdf(input_path: str, output_path: str):
        """Convert a document to a small-page PDF via pandoc."""
        pypandoc.convert_file(input_path, 'pdf', outputfile=output_path, extra_args=[
            '-V', 'geometry:paperwidth=5.5in',
            '-V', 'geometry:paperheight=8.5in',
            '-V', 'geometry:margin=0.5in',
            '-V', 'pagestyle=empty'
        ])

    @staticmethod
    def convert_file_to_html(input_path: str, output_path: str):
        """Convert a document to HTML via pandoc."""
        pypandoc.convert_file(input_path, 'html', outputfile=output_path)

    @staticmethod
    def pdf_to_png(path_id: str):
        """Render ./tmp/<path_id>/exercises.pdf into page-N.png files.

        :raises Exception: when pdftoppm exits non-zero.
        """
        # Pass the command as an argument list with shell=False: there is no
        # shell syntax in the command, and avoiding the shell removes any
        # chance of injection if the command is ever parameterised.
        to_png = ["pdftoppm", "-png", "exercises.pdf", "page"]
        result = subprocess.run(to_png, cwd=f'./tmp/{path_id}', capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(
                f"Couldn't convert pdf to png. Failed to run command '{' '.join(to_png)}' -> ```cmd {result.stderr}```")

    @staticmethod
    def is_page_blank(image_bytes: bytes, image_threshold=10) -> bool:
        """Return True when the PNG contains at most image_threshold non-white pixels."""
        with Image.open(io.BytesIO(image_bytes)) as img:
            img_gray = img.convert('L')  # grayscale so "white" is a single value
            img_array = np.array(img_gray)
            non_white_pixels = np.sum(img_array < 255)

        return non_white_pixels <= image_threshold

    @classmethod
    def _encode_image(cls, image_path: str, image_threshold=10) -> Optional[str]:
        """Base64-encode a page image, or return None when the page is blank."""
        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()

        if cls.is_page_blank(image_bytes, image_threshold):
            return None

        return base64.b64encode(image_bytes).decode('utf-8')

    @classmethod
    def b64_pngs(cls, path_id: str, files: list[str]):
        """Build OpenAI image_url message parts for the given page files.

        Blank pages are skipped entirely.
        """
        png_messages = []
        for filename in files:
            b64_string = cls._encode_image(os.path.join(f'./tmp/{path_id}', filename))
            if b64_string:
                png_messages.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{b64_string}"
                    }
                })
        return png_messages

    @staticmethod
    def remove_directory(path):
        """Best-effort recursive removal of a directory; missing paths are a no-op."""
        try:
            # isdir() already implies existence, so the previous nested
            # exists()/isdir() checks collapse into one condition.
            if os.path.isdir(path):
                shutil.rmtree(path)
        except Exception as e:
            # Deliberately best-effort: cleanup failure should not crash callers.
            print(f"An error occurred while trying to remove {path}: {str(e)}")
|
||||
23
modules/helper/logger.py
Normal file
23
modules/helper/logger.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import logging
|
||||
from functools import wraps
|
||||
|
||||
|
||||
class LoggerHelper:
    """Helpers for temporarily adjusting logging behaviour."""

    @staticmethod
    def suppress_loggers():
        """Decorator factory: raise the root logger to ERROR while the
        decorated callable runs, restoring the previous level afterwards
        (even when the call raises)."""
        def decorator(func):
            @wraps(func)
            def inner(*args, **kwargs):
                root = logging.getLogger()
                previous_level = root.level
                root.setLevel(logging.ERROR)
                try:
                    return func(*args, **kwargs)
                finally:
                    root.setLevel(previous_level)
            return inner
        return decorator
|
||||
7
modules/training_content/__init__.py
Normal file
7
modules/training_content/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from .kb import TrainingContentKnowledgeBase
|
||||
from .service import TrainingContentService
|
||||
|
||||
__all__ = [
|
||||
"TrainingContentService",
|
||||
"TrainingContentKnowledgeBase"
|
||||
]
|
||||
29
modules/training_content/dtos.py
Normal file
29
modules/training_content/dtos.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List
|
||||
|
||||
|
||||
class QueryDTO(BaseModel):
    """A single knowledge-base query proposed by the LLM."""
    category: str  # tip category the query targets (e.g. "strategy", "words")
    text: str  # free text used for the embedding search
|
||||
|
||||
|
||||
class DetailsDTO(BaseModel):
    """Per-exam summary generated by the LLM."""
    exam_id: str  # id of the exam document the summary refers to
    date: int  # exam date as an integer timestamp (unit not enforced here)
    performance_comment: str  # short one-sentence performance description
    detailed_summary: str  # longer free-text performance summary
|
||||
|
||||
|
||||
class WeakAreaDTO(BaseModel):
    """An improvement area identified across all of a student's exams."""
    area: str  # e.g. "Grammar and Syntax"
    comment: str  # short explanation of the weakness
|
||||
|
||||
|
||||
class TrainingContentDTO(BaseModel):
    """Aggregate LLM output: exam summaries, weak areas and KB queries."""
    details: List[DetailsDTO]
    weak_areas: List[WeakAreaDTO]
    queries: List[QueryDTO]
|
||||
|
||||
|
||||
class TipsDTO(BaseModel):
    """Ids of the knowledge-base tips the LLM judged useful for the student."""
    tip_ids: List[str]
|
||||
85
modules/training_content/kb.py
Normal file
85
modules/training_content/kb.py
Normal file
@@ -0,0 +1,85 @@
|
||||
import json
|
||||
import os
|
||||
from logging import getLogger
|
||||
from typing import Dict, List
|
||||
|
||||
import faiss
|
||||
import pickle
|
||||
|
||||
|
||||
class TrainingContentKnowledgeBase:
    """FAISS-backed store of study tips, indexed per category by embedding."""

    def __init__(self, embeddings, path: str = 'pathways_2_rw_with_ids.json'):
        """
        :param embeddings: sentence-embedding model exposing ``encode()``.
        :param path: tips JSON file (loading is currently disabled).
        """
        self._embedding_model = embeddings
        self._tips = None  # self._read_json(path)
        self._category_metadata = None  # {category: [{"id", "text"}, ...]}
        self._indices = None  # {category: faiss index}
        self._logger = getLogger(__name__)

    @staticmethod
    def _read_json(path: str) -> Dict[str, any]:
        """Load and parse the tips JSON file."""
        with open(path, 'r', encoding="utf-8") as json_file:
            return json.loads(json_file.read())

    def print_category_count(self):
        """Print how many tips exist per normalised category.

        BUG FIX: the previous implementation initialised a newly-seen
        category to 0 and only incremented on later occurrences, so every
        category was undercounted by exactly one. Counting is now
        unconditional.
        """
        category_tips = {}
        for unit in self._tips['units']:
            for page in unit['pages']:
                for tip in page['tips']:
                    category = tip['category'].lower().replace(" ", "_")
                    category_tips[category] = category_tips.get(category, 0) + 1
        print(category_tips)

    def create_embeddings_and_save_them(self) -> None:
        """Build one FAISS index per category and persist indices + metadata."""
        category_embeddings = {}
        category_metadata = {}

        for unit in self._tips['units']:
            for page in unit['pages']:
                for tip in page['tips']:
                    category = tip['category'].lower().replace(" ", "_")
                    if category not in category_embeddings:
                        category_embeddings[category] = []
                        category_metadata[category] = []

                    # NOTE(review): tip['embedding'] is fed to encode() below,
                    # which suggests it holds the text to embed rather than a
                    # precomputed vector -- confirm against the JSON schema.
                    category_embeddings[category].append(tip['embedding'])
                    category_metadata[category].append({"id": tip['id'], "text": tip['text']})

        category_indices = {}
        for category, embeddings in category_embeddings.items():
            embeddings_array = self._embedding_model.encode(embeddings)
            index = faiss.IndexFlatL2(embeddings_array.shape[1])
            index.add(embeddings_array)
            category_indices[category] = index

            faiss.write_index(index, f"./faiss/{category}_tips_index.faiss")

        with open("./faiss/tips_metadata.pkl", "wb") as f:
            pickle.dump(category_metadata, f)

    def load_indices_and_metadata(
            self,
            directory: str = './faiss',
            suffix: str = '_tips_index.faiss',
            metadata_path: str = './faiss/tips_metadata.pkl'
    ):
        """Load every per-category FAISS index from disk plus the tip metadata."""
        files = os.listdir(directory)
        self._indices = {}
        for file in files:
            if file.endswith(suffix):
                # The filename minus the suffix is the category name.
                self._indices[file[:-len(suffix)]] = faiss.read_index(f'{directory}/{file}')
                self._logger.info(f'Loaded embeddings for {file[:-len(suffix)]} category.')

        with open(metadata_path, 'rb') as f:
            self._category_metadata = pickle.load(f)
            self._logger.info("Loaded tips metadata")

    def query_knowledge_base(self, query: str, category: str, top_k: int = 5) -> List[Dict[str, str]]:
        """Return the top_k tips of *category* nearest to *query*.

        :raises KeyError: when the category has no loaded index.
        """
        query_embedding = self._embedding_model.encode([query])
        index = self._indices[category]
        D, I = index.search(query_embedding, top_k)
        results = [self._category_metadata[category][i] for i in I[0]]
        return results
|
||||
281
modules/training_content/service.py
Normal file
281
modules/training_content/service.py
Normal file
@@ -0,0 +1,281 @@
|
||||
from datetime import datetime
|
||||
from logging import getLogger
|
||||
|
||||
from typing import Dict, List
|
||||
|
||||
from modules.training_content.dtos import TrainingContentDTO, WeakAreaDTO, QueryDTO, DetailsDTO, TipsDTO
|
||||
|
||||
|
||||
class TrainingContentService:
    """Builds personalised training content from a student's exam stats.

    Pipeline: aggregate raw stats per exam, ask the LLM for summaries /
    weak areas / knowledge-base queries, retrieve matching tips, ask the
    LLM which tips are useful, then persist the resulting training
    document to Firestore.
    """

    # Tip categories the LLM is allowed to query for.
    TOOLS = [
        'critical_thinking',
        'language_for_writing',
        'reading_skills',
        'strategy',
        'words',
        'writing_skills'
    ]
    # strategy word_link ct_focus reading_skill word_partners writing_skill language_for_writing

    def __init__(self, kb, openai, firestore):
        """
        :param kb: TrainingContentKnowledgeBase used for tip retrieval.
        :param openai: GPT wrapper used for predictions.
        :param firestore: Firestore client for persisting training docs.
        """
        self._training_content_module = kb
        self._db = firestore
        self._logger = getLogger(__name__)
        self._llm = openai

    def get_tips(self, stats):
        """Run the whole pipeline for *stats* and persist the result.

        :return: dict with the id of the created Firestore document.
        """
        exam_data, exam_map = self._sort_out_solutions(stats)
        training_content = self._get_exam_details_and_tips(exam_data)
        tips = self._query_kb(training_content.queries)
        useful_tips = self._get_useful_tips(exam_data, tips)
        exam_map = self._merge_exam_map_with_details(exam_map, training_content.details)

        weak_areas = {"weak_areas": [area.dict() for area in training_content.weak_areas]}

        training_doc = {
            'created_at': int(datetime.now().timestamp() * 1000),  # epoch millis
            **exam_map,
            **useful_tips.dict(),
            **weak_areas
        }
        # Firestore add() returns (update_time, document_reference).
        doc_ref = self._db.collection('training').add(training_doc)
        return {
            "id": doc_ref[1].id
        }

    @staticmethod
    def _merge_exam_map_with_details(exam_map: Dict[str, any], details: List[DetailsDTO]):
        """Merge LLM-generated per-exam details with the locally built exam map.

        NOTE(review): assumes every detail.exam_id exists in exam_map -- a
        hallucinated id from the LLM would raise KeyError here; confirm
        whether that should be tolerated.
        """
        new_exam_map = {"exams": []}
        for detail in details:
            new_exam_map["exams"].append({
                "id": detail.exam_id,
                "date": detail.date,
                "performance_comment": detail.performance_comment,
                "detailed_summary": detail.detailed_summary,
                **exam_map[detail.exam_id]
            })
        return new_exam_map

    def _query_kb(self, queries: List[QueryDTO]):
        """Run each LLM-proposed query against the matching KB index.

        The "words" category fans out to both word indices; unknown
        categories are logged and skipped.
        """
        # Maps category names offered to the LLM onto real index names.
        map_categories = {
            "critical_thinking": "ct_focus",
            "language_for_writing": "language_for_writing",
            "reading_skills": "reading_skill",
            "strategy": "strategy",
            "writing_skills": "writing_skill"
        }

        tips = {"tips": []}
        for query in queries:
            # Was a leftover debug print(); route through the logger instead.
            self._logger.debug("%s %s", query.category, query.text)
            if query.category == "words":
                tips["tips"].extend(
                    self._training_content_module.query_knowledge_base(query.text, "word_link")
                )
                tips["tips"].extend(
                    self._training_content_module.query_knowledge_base(query.text, "word_partners")
                )
            elif query.category in map_categories:
                tips["tips"].extend(
                    self._training_content_module.query_knowledge_base(query.text, map_categories[query.category])
                )
            else:
                # Typo fix in the log message ("GTP" -> "GPT").
                self._logger.info(f"GPT tried to query knowledge base for {query.category} and it doesn't exist.")
        return tips

    def _get_exam_details_and_tips(self, exam_data: Dict[str, any]) -> TrainingContentDTO:
        """Ask the LLM to summarise the exams and propose KB queries."""
        json_schema = (
            '{ "details": [{"exam_id": "", "date": 0, "performance_comment": "", "detailed_summary": ""}],'
            ' "weak_areas": [{"area": "", "comment": ""}], "queries": [{"text": "", "category": ""}] }'
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"I'm going to provide you with exam data, you will take the exam data and fill this json "
                    f'schema : {json_schema}. "performance_comment" is a short sentence that describes the '
                    'students\'s performance and main mistakes in a single exam, "detailed_summary" is a detailed '
                    'summary of the student\'s performance, "weak_areas" are identified areas'
                    ' across all exams which need to be improved upon, for example, area "Grammar and Syntax" comment "Issues'
                    ' with sentence structure and punctuation.", the "queries" field is where you will write queries '
                    'for tips that will be displayed to the student, the category attribute is a collection of '
                    'embeddings and the text will be the text used to query the knowledge base. The categories are '
                    f'the following [{", ".join(self.TOOLS)}].'
                )
            },
            {
                "role": "user",
                "content": f'Exam Data: {str(exam_data)}'
            }
        ]
        return self._llm.prediction(messages, self._map_gpt_response, json_schema)

    def _get_useful_tips(self, exam_data: Dict[str, any], tips: Dict[str, any]) -> TipsDTO:
        """Ask the LLM which of the retrieved tips help this student."""
        json_schema = (
            '{ "tip_ids": [] }'
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"I'm going to provide you with tips and I want you to return to me the tips that "
                    f"can be usefull for the student that made the exam that I'm going to send you, return "
                    f"me the tip ids in this json format {json_schema}."
                )
            },
            {
                "role": "user",
                "content": f'Exam Data: {str(exam_data)}'
            },
            {
                "role": "user",
                "content": f'Tips: {str(tips)}'
            }
        ]
        return self._llm.prediction(messages, lambda response: TipsDTO(**response), json_schema)

    @staticmethod
    def _map_gpt_response(response: Dict[str, any]) -> TrainingContentDTO:
        """Validate the raw LLM JSON into a TrainingContentDTO."""
        parsed_response = {
            "details": [DetailsDTO(**detail) for detail in response["details"]],
            "weak_areas": [WeakAreaDTO(**area) for area in response["weak_areas"]],
            "queries": [QueryDTO(**query) for query in response["queries"]]
        }
        return TrainingContentDTO(**parsed_response)

    def _sort_out_solutions(self, stats):
        """Group stats by module/exam and join them with the exam documents.

        :return: (exercises, exam_map) where exercises holds the solved
            exercise payloads per module/exam and exam_map holds per-exam
            stat ids, score percentage and module.
        """
        grouped_stats = {}
        for stat in stats:
            exam_id = stat["exam"]
            module = stat["module"]
            if module not in grouped_stats:
                grouped_stats[module] = {}
            if exam_id not in grouped_stats[module]:
                grouped_stats[module][exam_id] = []
            grouped_stats[module][exam_id].append(stat)

        exercises = {}
        exam_map = {}
        for module, exams in grouped_stats.items():
            exercises[module] = {}
            for exam_id, stat_group in exams.items():
                exam = self._get_doc_by_id(module, exam_id)
                exercises[module][exam_id] = {"date": None, "exercises": [], "score": None}
                exam_total_questions = 0
                exam_total_correct = 0
                for stat in stat_group:
                    exam_total_questions += stat["score"]["total"]
                    exam_total_correct += stat["score"]["correct"]
                    exercises[module][exam_id]["date"] = stat["date"]

                    if exam_id not in exam_map:
                        exam_map[exam_id] = {"stat_ids": [], "score": 0}
                    exam_map[exam_id]["stat_ids"].append(stat["id"])

                    if module == "listening":
                        exercises[module][exam_id]["exercises"].extend(self._get_listening_solutions(stat, exam))
                    if module == "reading":
                        exercises[module][exam_id]["exercises"].extend(self._get_reading_solutions(stat, exam))
                    if module == "writing":
                        exercises[module][exam_id]["exercises"].extend(self._get_writing_prompts_and_answers(stat, exam))

                # Guard against ZeroDivisionError when an exam reports no
                # questions (e.g. writing-only stats with total == 0).
                exam_map[exam_id]["score"] = (
                    round((exam_total_correct / exam_total_questions) * 100)
                    if exam_total_questions else 0
                )
                exam_map[exam_id]["module"] = module
        return exercises, exam_map

    def _get_writing_prompts_and_answers(self, stat, exam):
        """Pair each writing answer in *stat* with its prompt from *exam*."""
        result = []
        try:
            exercises = []
            for solution in stat['solutions']:
                answer = solution['solution']
                exercise_id = solution['id']
                exercises.append({
                    "exercise_id": exercise_id,
                    "answer": answer
                })
            for exercise in exercises:
                for exam_exercise in exam["exercises"]:
                    if exam_exercise["id"] == exercise["exercise_id"]:
                        result.append({
                            "exercise": exam_exercise["prompt"],
                            "answer": exercise["answer"]
                        })

        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")

        return result

    def _get_listening_solutions(self, stat, exam):
        """Extract the solved listening exercise matching *stat* from *exam*."""
        result = []
        try:
            for part in exam["parts"]:
                for exercise in part["exercises"]:
                    if exercise["id"] == stat["exercise"]:
                        if stat["type"] == "writeBlanks":
                            result.append({
                                "question": exercise["prompt"],
                                "template": exercise["text"],
                                "solution": exercise["solutions"],
                                "answer": stat["solutions"]
                            })
                        if stat["type"] == "multipleChoice":
                            result.append({
                                "question": exercise["prompt"],
                                "exercise": exercise["questions"],
                                "answer": stat["solutions"]
                            })
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return result

    def _get_reading_solutions(self, stat, exam):
        """Extract the solved reading exercise matching *stat* from *exam*."""
        result = []
        try:
            for part in exam["parts"]:
                text = part["text"]
                for exercise in part["exercises"]:
                    if exercise["id"] == stat["exercise"]:
                        if stat["type"] == "fillBlanks":
                            result.append({
                                "text": text,
                                "question": exercise["prompt"],
                                "template": exercise["text"],
                                "words": exercise["words"],
                                "solutions": exercise["solutions"],
                                "answer": stat["solutions"]
                            })
                        elif stat["type"] == "writeBlanks":
                            result.append({
                                "text": text,
                                "question": exercise["prompt"],
                                "template": exercise["text"],
                                "solutions": exercise["solutions"],
                                "answer": stat["solutions"]
                            })
                        else:
                            # match_sentences
                            result.append({
                                "text": text,
                                "question": exercise["prompt"],
                                "sentences": exercise["sentences"],
                                "options": exercise["options"],
                                "answer": stat["solutions"]
                            })
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return result

    def _get_doc_by_id(self, collection: str, doc_id: str):
        """Fetch a Firestore document as a dict, or None when it is missing."""
        collection_ref = self._db.collection(collection)
        doc_ref = collection_ref.document(doc_id)
        doc = doc_ref.get()

        if doc.exists:
            return doc.to_dict()
        return None
|
||||
5
modules/upload_level/__init__.py
Normal file
5
modules/upload_level/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .service import UploadLevelService
|
||||
|
||||
__all__ = [
|
||||
"UploadLevelService"
|
||||
]
|
||||
57
modules/upload_level/exam_dtos.py
Normal file
57
modules/upload_level/exam_dtos.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import List, Dict, Union, Optional, Any
|
||||
from uuid import uuid4, UUID
|
||||
|
||||
|
||||
class Option(BaseModel):
    """A single multiple-choice option."""
    id: str  # option identifier, e.g. "A"
    text: str  # the option's display text
|
||||
|
||||
|
||||
class MultipleChoiceQuestion(BaseModel):
    """One question inside a multiple-choice exercise."""
    id: str  # question identifier from the source sheet
    prompt: str  # the question text
    variant: str = "text"  # presentation variant; only "text" seen here
    solution: str  # id of the correct Option
    options: List[Option]
|
||||
|
||||
|
||||
class MultipleChoiceExercise(BaseModel):
    """A block of multiple-choice questions within an exam part."""
    id: UUID = Field(default_factory=uuid4)  # fresh id per exercise instance
    type: str = "multipleChoice"  # discriminator used by ExamMapper
    prompt: str = "Select the appropriate option."
    questions: List[MultipleChoiceQuestion]
    userSolutions: List = Field(default_factory=list)  # filled in later by the UI
|
||||
|
||||
|
||||
class FillBlanksWord(BaseModel):
    """Word choices for one blank of a fill-the-blanks exercise."""
    id: str  # blank/question identifier referenced from the passage text
    options: Dict[str, str]  # option id ("A".."D") -> option text
|
||||
|
||||
|
||||
class FillBlanksSolution(BaseModel):
    """Correct answer for one blank of a fill-the-blanks exercise."""
    id: str  # blank/question identifier
    solution: str  # id of the correct option
|
||||
|
||||
|
||||
class FillBlanksExercise(BaseModel):
    """A reading passage with blanks and per-blank multiple-choice words."""
    id: UUID = Field(default_factory=uuid4)  # fresh id per exercise instance
    type: str = "fillBlanks"  # discriminator used by ExamMapper
    variant: str = "mc"  # multiple-choice variant of fill-blanks
    prompt: str = "Click a blank to select the appropriate word for it."
    text: str  # passage with {{question id}} placeholders for blanks
    solutions: List[FillBlanksSolution]
    words: List[FillBlanksWord]
    userSolutions: List = Field(default_factory=list)  # filled in later by the UI
|
||||
|
||||
|
||||
# Union of all supported exercise payloads; ExamMapper picks the concrete
# class by the "type" discriminator before validation.
Exercise = Union[MultipleChoiceExercise, FillBlanksExercise]
|
||||
|
||||
|
||||
class Part(BaseModel):
    """One part of an exam: its exercises plus an optional reading context."""
    exercises: List[Exercise]
    context: Optional[str] = Field(default=None)  # reading passage, when required
|
||||
|
||||
|
||||
class Exam(BaseModel):
    """Top-level exam document: an ordered list of parts."""
    parts: List[Part]
|
||||
66
modules/upload_level/mapper.py
Normal file
66
modules/upload_level/mapper.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from typing import Dict, Any
|
||||
|
||||
from pydantic import ValidationError
|
||||
|
||||
from modules.upload_level.exam_dtos import (
|
||||
MultipleChoiceExercise,
|
||||
FillBlanksExercise,
|
||||
Part, Exam
|
||||
)
|
||||
from modules.upload_level.sheet_dtos import Sheet, Option, MultipleChoiceQuestion, FillBlanksWord
|
||||
|
||||
|
||||
class ExamMapper:
    """Maps raw LLM JSON responses onto the typed exam / sheet models."""

    @staticmethod
    def map_to_exam_model(response: Dict[str, Any]) -> Exam:
        """Build an Exam from the LLM's {"parts": [...]} response.

        :raises ValueError: when an exercise carries an unknown "type".
        """
        parts = []
        for part in response['parts']:
            part_exercises = part['exercises']
            context = part.get('context', None)

            exercises = []
            for exercise in part_exercises:
                exercise_type = exercise['type']
                if exercise_type == 'multipleChoice':
                    exercise_model = MultipleChoiceExercise(**exercise)
                elif exercise_type == 'fillBlanks':
                    exercise_model = FillBlanksExercise(**exercise)
                else:
                    # BUG FIX: pydantic's ValidationError cannot be constructed
                    # from a bare message (its __init__ requires error details),
                    # so the old `raise ValidationError(f"...")` crashed with a
                    # TypeError instead of reporting the real problem. Raise a
                    # plain ValueError with the same message instead.
                    raise ValueError(f"Unknown exercise type: {exercise_type}")

                exercises.append(exercise_model)

            # Only pass context when present so Part keeps its default (None)
            # and serialisation can exclude it.
            part_kwargs = {"exercises": exercises}
            if context is not None:
                part_kwargs["context"] = context

            part_model = Part(**part_kwargs)
            parts.append(part_model)

        return Exam(parts=parts)

    @staticmethod
    def map_to_sheet(response: Dict[str, Any]) -> Sheet:
        """Build a Sheet from the vision model's {"components": [...]} response.

        Known component types are validated into models; anything else is
        passed through untouched.
        """
        components = []

        for item in response["components"]:
            component_type = item["type"]

            if component_type == "multipleChoice":
                options = [Option(id=opt["id"], text=opt["text"]) for opt in item["options"]]
                components.append(MultipleChoiceQuestion(
                    id=item["id"],
                    prompt=item["prompt"],
                    variant=item.get("variant", "text"),
                    options=options
                ))
            elif component_type == "fillBlanks":
                components.append(FillBlanksWord(
                    id=item["id"],
                    options=item["options"]
                ))
            else:
                # e.g. "part", "passage", "blanksPassage" -- kept as raw dicts.
                components.append(item)

        return Sheet(components=components)
|
||||
380
modules/upload_level/service.py
Normal file
380
modules/upload_level/service.py
Normal file
@@ -0,0 +1,380 @@
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
from logging import getLogger
|
||||
|
||||
from typing import Dict, Any, Tuple, Callable
|
||||
|
||||
import pdfplumber
|
||||
|
||||
from modules import GPT
|
||||
from modules.helper.file_helper import FileHelper
|
||||
from modules.helper import LoggerHelper
|
||||
from modules.upload_level.exam_dtos import Exam
|
||||
from modules.upload_level.mapper import ExamMapper
|
||||
from modules.upload_level.sheet_dtos import Sheet
|
||||
|
||||
|
||||
class UploadLevelService:
|
||||
def __init__(self, openai: GPT):
|
||||
self._logger = getLogger(__name__)
|
||||
self._llm = openai
|
||||
|
||||
def generate_level_from_file(self, file) -> Dict[str, Any] | None:
|
||||
ext, path_id = self._save_upload(file)
|
||||
FileHelper.convert_file_to_pdf(
|
||||
f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.pdf'
|
||||
)
|
||||
file_has_images = self._check_pdf_for_images(f'./tmp/{path_id}/exercises.pdf')
|
||||
|
||||
if not file_has_images:
|
||||
FileHelper.convert_file_to_html(f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.html')
|
||||
|
||||
completion: Callable[[str], Exam] = self._png_completion if file_has_images else self._html_completion
|
||||
response = completion(path_id)
|
||||
|
||||
FileHelper.remove_directory(f'./tmp/{path_id}')
|
||||
|
||||
if response:
|
||||
return response.dict(exclude_none=True)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
@LoggerHelper.suppress_loggers()
|
||||
def _check_pdf_for_images(pdf_path: str) -> bool:
|
||||
with pdfplumber.open(pdf_path) as pdf:
|
||||
for page in pdf.pages:
|
||||
if page.images:
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _save_upload(file) -> Tuple[str, str]:
|
||||
ext = file.filename.split('.')[-1]
|
||||
path_id = str(uuid.uuid4())
|
||||
os.makedirs(f'./tmp/{path_id}', exist_ok=True)
|
||||
|
||||
tmp_filename = f'./tmp/{path_id}/uploaded.{ext}'
|
||||
file.save(tmp_filename)
|
||||
return ext, path_id
|
||||
|
||||
    def _level_json_schema(self):
        """Return the top-level JSON schema (as a dict) shown to the LLM,
        embedding the per-exercise templates."""
        return {
            "parts": [
                {
                    "context": "<this attribute is optional you may exclude it if not required>",
                    "exercises": [
                        self._multiple_choice_html(),
                        self._passage_blank_space_html()
                    ]
                }
            ]
        }
|
||||
|
||||
    def _html_completion(self, path_id: str) -> Exam:
        """Scrape the pandoc-generated HTML sheet into an Exam via the LLM.

        Reads ./tmp/<path_id>/exercises.html and sends it together with the
        scraping instructions; the response is mapped by ExamMapper.
        """
        with open(f'./tmp/{path_id}/exercises.html', 'r', encoding='utf-8') as f:
            html = f.read()

        return self._llm.prediction(
            [self._gpt_instructions_html(),
             {
                 "role": "user",
                 "content": html
             }
             ],
            ExamMapper.map_to_exam_model,
            str(self._level_json_schema())
        )
|
||||
|
||||
    def _gpt_instructions_html(self):
        """Build the system message instructing the LLM how to scrape the
        HTML question sheet into the level JSON schema.

        The prompt text is behaviour: do not edit it casually.
        """
        return {
            "role": "system",
            "content": (
                'You are GPT Scraper and your job is to clean dirty html into clean usable JSON formatted data.'
                'Your current task is to scrape html english questions sheets.\n\n'

                'In the question sheet you will only see 4 types of question:\n'
                '- blank space multiple choice\n'
                '- underline multiple choice\n'
                '- reading passage blank space multiple choice\n'
                '- reading passage multiple choice\n\n'

                'For the first two types of questions the template is the same but the question prompts differ, '
                'whilst in the blank space multiple choice you must include in the prompt the blank spaces with '
                'multiple "_", in the underline you must include in the prompt the <u></u> to '
                'indicate the underline and the options a, b, c, d must be the ordered underlines in the prompt.\n\n'

                'For the reading passage exercise you must handle the formatting of the passages. If it is a '
                'reading passage with blank spaces you will see blanks represented with (question id) followed by a '
                'line and your job is to replace the brackets with the question id and line with "{{question id}}" '
                'with 2 newlines between paragraphs. For the reading passages without blanks you must remove '
                'any numbers that may be there to specify paragraph numbers or line numbers, and place 2 newlines '
                'between paragraphs.\n\n'

                'IMPORTANT: Note that for the reading passages, the html might not reflect the actual paragraph '
                'structure, don\'t format the reading passages paragraphs only by the <p></p> tags, try to figure '
                'out the best paragraph separation possible.'

                'You will place all the information in a single JSON: {"parts": [{"exercises": [{...}], "context": ""}]}\n '
                'Where {...} are the exercises templates for each part of a question sheet and the optional field '
                'context.'

                'IMPORTANT: The question sheet may be divided by sections but you need to only consider the parts, '
                'so that you can group the exercises by the parts that are in the html, this is crucial since only '
                'reading passage multiple choice require context and if the context is included in parts where it '
                'is not required the UI will be messed up. Some make sure to correctly group the exercises by parts.\n'

                'The templates for the exercises are the following:\n'
                '- blank space multiple choice, underline multiple choice and reading passage multiple choice: '
                f'{self._multiple_choice_html()}\n'
                f'- reading passage blank space multiple choice: {self._passage_blank_space_html()}\n'

                'IMPORTANT: For the reading passage multiple choice the context field must be set with the reading '
                'passages without paragraphs or line numbers, with 2 newlines between paragraphs, for the other '
                'exercises exclude the context field.'
            )
        }
|
||||
|
||||
    @staticmethod
    def _multiple_choice_html():
        """Template dict for a multiple-choice exercise, embedded verbatim in
        the LLM prompt and schema (placeholders in angle brackets)."""
        return {
            "type": "multipleChoice",
            "prompt": "Select the appropriate option.",
            "questions": [
                {
                    "id": "<the question id>",
                    "prompt": "<the question>",
                    "solution": "<the option id solution>",
                    "options": [
                        {
                            "id": "A",
                            "text": "<the a option>"
                        },
                        {
                            "id": "B",
                            "text": "<the b option>"
                        },
                        {
                            "id": "C",
                            "text": "<the c option>"
                        },
                        {
                            "id": "D",
                            "text": "<the d option>"
                        }
                    ]
                }
            ]
        }
|
||||
|
||||
    @staticmethod
    def _passage_blank_space_html():
        """Template dict for a fill-the-blanks passage exercise, embedded
        verbatim in the LLM prompt and schema (placeholders in angle
        brackets)."""
        return {
            "type": "fillBlanks",
            "variant": "mc",
            "prompt": "Click a blank to select the appropriate word for it.",
            "text": (
                "<The whole text for the exercise with replacements for blank spaces and their "
                "ids with {{<question id>}} with 2 newlines between paragraphs>"
            ),
            "solutions": [
                {
                    "id": "<question id>",
                    "solution": "<the option that holds the solution>"
                }
            ],
            "words": [
                {
                    "id": "<question id>",
                    "options": {
                        "A": "<a option>",
                        "B": "<b option>",
                        "C": "<c option>",
                        "D": "<d option>"
                    }
                }
            ]
        }
|
||||
|
||||
def _png_completion(self, path_id: str) -> Exam:
    """Convert an uploaded PDF into an Exam by OCR-ing its pages with the LLM.

    The PDF identified by *path_id* is rendered to page-<n>.png files under
    ./tmp/<path_id>. Pages are sent to the LLM in overlapping two-page
    batches (page i together with page i+1) so exercises that span a page
    break are still captured, then all batches are merged into one Exam.

    :param path_id: identifier of the uploaded file / tmp directory.
    :return: the merged, solved exam.
    """
    FileHelper.pdf_to_png(path_id)

    tmp_files = os.listdir(f'./tmp/{path_id}')
    pages = [f for f in tmp_files if f.startswith('page-') and f.endswith('.png')]
    # Sort numerically by the page index embedded in "page-<n>.png".
    pages.sort(key=lambda f: int(f.split('-')[1].split('.')[0]))

    # Schema describing every component the OCR pass may emit; it is passed
    # to the LLM verbatim (via str()), so its shape must stay stable.
    json_schema = {
        "components": [
            {"type": "part", "part": "<name or number of the part>"},
            self._multiple_choice_png(),
            {"type": "blanksPassage", "text": (
                "<The whole text for the exercise with replacements for blank spaces and their "
                "ids with {{<question id>}} with 2 newlines between paragraphs>"
            )},
            {"type": "passage", "context": (
                "<reading passages without paragraphs or line numbers, with 2 newlines between paragraphs>"
            )},
            self._passage_blank_space_png()
        ]
    }

    components = []

    # Overlapping batches: [p1, p2], [p2, p3], ..., [pN].
    # (enumerate instead of range(len(...)) — same order, clearer intent)
    for i, current_page in enumerate(pages):
        next_page = pages[i + 1] if i + 1 < len(pages) else None
        batch = [current_page, next_page] if next_page else [current_page]

        sheet = self._png_batch(path_id, batch, json_schema)
        sheet.batch = i + 1  # 1-based batch number, used by the merge step
        components.append(sheet.dict())

    batches = {"batches": components}
    # Temporary debug artifact until results are persisted to Firestore.
    with open('output.json', 'w', encoding='utf-8') as json_file:
        json.dump(batches, json_file, indent=4)

    return self._batches_to_exam_completion(batches)
|
||||
|
||||
def _png_batch(self, path_id: str, files: list[str], json_schema) -> Sheet:
    """Run one OCR prediction over a batch of page images.

    :param path_id: tmp-directory identifier holding the page PNGs.
    :param files: the PNG file names making up this batch.
    :param json_schema: component schema forwarded to the LLM (stringified).
    :return: the sheet components the model extracted from the images.
    """
    image_message = {
        "role": "user",
        "content": [*FileHelper.b64_pngs(path_id, files)],
    }
    messages = [self._gpt_instructions_png(), image_message]
    return self._llm.prediction(messages, ExamMapper.map_to_sheet, str(json_schema))
|
||||
|
||||
def _gpt_instructions_png(self):
    """Build the system message for the OCR pass over sheet page images.

    Describes the four component kinds (part, multiple choice, reading
    passages, blanks options) and their JSON templates.
    """
    # NOTE: fixed prompt-text defects present in the original concatenation:
    # missing spaces ("format.Your", "<u></u>.In normal") and the
    # "disreguard" typo.
    return {
        "role": "system",
        "content": (
            'You are GPT OCR and your job is to scan image text data and format it to JSON format. '
            'Your current task is to scan english questions sheets.\n\n'

            'You will place all the information in a single JSON: {"components": [{...}]} where {...} is a set of '
            'sheet components you will retrieve from the images, the components and their corresponding JSON '
            'templates are as follows:\n'

            '- Part, a standalone part or part of a section of the question sheet: '
            '{"type": "part", "part": "<name or number of the part>"}\n'

            '- Multiple Choice Question, there are three types of multiple choice questions that differ on '
            'the prompt field of the template: blanks, underlines and normal. '

            'In the blanks prompt you must leave 5 underscores to represent the blank space. '
            'In the underlines questions the objective is to pick the words that are incorrect in the given '
            'sentence, for these questions you must wrap the answer to the question with the html tag <u></u>, '
            'choose 3 other words to wrap in <u></u>, place them in the prompt field and use the underlined words '
            'in the order they appear in the question for the options A to D, disregard options that might be '
            'included underneath the underlines question and use the ones you wrapped in <u></u>. '
            'In normal you just leave the question as is. '

            f'The template for multiple choice questions is the following: {self._multiple_choice_png()}.\n'

            '- Reading Passages, there are two types of reading passages. Reading passages where you will see '
            'blanks represented by a (question id) followed by a line, you must format these types of reading '
            'passages to be only the text with the brackets that have the question id and line replaced with '
            '"{{question id}}", also place 2 newlines between paragraphs. For the reading passages without blanks '
            'you must remove any numbers that may be there to specify paragraph numbers or line numbers, '
            'and place 2 newlines between paragraphs. '

            'For the reading passages with blanks the template is: {"type": "blanksPassage", '
            '"text": "<The whole text for the exercise with replacements for blank spaces and their '
            'ids that are enclosed in brackets with {{<question id>}} also place 2 newlines between paragraphs>"}. '

            'For the reading passage without blanks is: {"type": "passage", "context": "<reading passages without '
            'paragraphs or line numbers, with 2 newlines between paragraphs>"}\n'

            '- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
            'options with the question id and the options from a to d. The template is: '
            f'{self._passage_blank_space_png()}\n'

            'IMPORTANT: You must place the components in the order that they were given to you. If an exercise or '
            'reading passages are cut off don\'t include them in the JSON.'
        )
    }
|
||||
|
||||
def _multiple_choice_png(self):
    """Question template for the OCR pass: one question, tagged with its
    component type and without the solution field (OCR does not solve)."""
    # Key order matters: these dicts are str()'d into LLM prompts.
    # Removing "solution" first and appending "type" last yields the same
    # order as the original (id, prompt, options, type).
    question = self._multiple_choice_html()["questions"][0]
    del question["solution"]
    question["type"] = "multipleChoice"
    return question
|
||||
|
||||
def _passage_blank_space_png(self):
    """Blanks-options template for the OCR pass: a single `words` entry
    tagged with its component type."""
    # Key order matters (dict is str()'d into prompts): id, options, type —
    # identical to mutating the original dict in place.
    word_group = self._passage_blank_space_html()["words"][0]
    return {**word_group, "type": "fillBlanks"}
|
||||
|
||||
def _batches_to_exam_completion(self, batches: Dict[str, Any]) -> Exam:
    """Merge the OCR page batches into a single solved Exam via the LLM.

    :param batches: {"batches": [...]} — per-batch sheet components from
        the OCR pass (overlapping batches may contain duplicates).
    :return: the merged exam matching the level JSON schema.
    """
    # NOTE(review): this previously sent _gpt_instructions_html(), leaving
    # _gpt_instructions_batches() — whose wording describes exactly this
    # merge-and-solve step — unused. Switched to the batch instructions.
    return self._llm.prediction(
        [
            self._gpt_instructions_batches(),
            {
                "role": "user",
                "content": str(batches)
            }
        ],
        ExamMapper.map_to_exam_model,
        str(self._level_json_schema())
    )
|
||||
|
||||
def _gpt_instructions_batches(self):
    """Build the system message that merges OCR batches and solves the exam.

    Instructs the model to de-duplicate overlapping batches, solve the four
    exercise kinds and restructure the result to the level JSON schema.
    """
    # NOTE: fixed prompt-text defects present in the original concatenation:
    # typos ("helpfull", "fullfill", "In a underlines"), missing sentence
    # separators ("as follows:- Part", "solve it.In", "fix it.In",
    # "solution.The template", "other ones.- Blanks") and the missing space
    # that produced "questionsin your response".
    return {
        "role": "system",
        "content": (
            'You are a helpful assistant. Your task is to merge multiple batches of english question sheet '
            'components and solve the questions. Each batch may contain overlapping content with the previous '
            'batch, or close enough content which needs to be excluded. The components are as follows:\n'

            '- Part, a standalone part or part of a section of the question sheet: '
            '{"type": "part", "part": "<name or number of the part>"}\n'

            '- Multiple Choice Question, there are three types of multiple choice questions that differ on '
            'the prompt field of the template: blanks, underlines and normal. '

            'In a blanks question, the prompt has underscores to represent the blank space, you must select the '
            'appropriate option to solve it. '

            'In an underlines question, the prompt has 4 underlines represented by the html tags <u></u>, you must '
            'select the option that makes the prompt incorrect to solve it. If the options order doesn\'t reflect '
            'the order in which the underlines appear in the prompt you will need to fix it. '

            'In a normal question there isn\'t either blanks or underlines in the prompt, you should just '
            'select the appropriate solution. '

            f'The template for these questions is the same: {self._multiple_choice_png()}\n'

            '- Reading Passages, there are two types of reading passages with different templates. The one with '
            'type "blanksPassage" where the text field holds the passage and a blank is represented by '
            '{{<some number>}} and the other one with type "passage" that has the context field with just '
            'reading passages. For both of these components you will have to remove any additional data that might '
            'be related to a question description and also remove some "(<question id>)" and "_" from blanksPassage'
            ' if there are any. These components are used in conjunction with other ones.\n'

            '- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
            'options with the question id and the options from a to d. The template is: '
            f'{self._passage_blank_space_png()}\n\n'

            'Now that you know the possible components here\'s what I want you to do:\n'
            '1. Remove duplicates. A batch will have duplicates of other batches and the components of '
            'the next batch should always take precedence over the previous one batch, what I mean by this is that '
            'if batch 1 has, for example, multiple choice question with id 10 and the next one also has id 10, '
            'you pick the next one.\n'
            '2. Solve the exercises. There are 4 types of exercises, the 3 multipleChoice variants + a fill blanks '
            'exercise. For the multiple choice question follow the previous instruction to solve them and place '
            f'them in this format: {self._multiple_choice_html()}. For the fill blanks exercises you need to match '
            'the correct blanksPassage to the correct fillBlanks options and then pick the correct option. Here is '
            f'the template for this exercise: {self._passage_blank_space_html()}.\n'
            f'3. Restructure the JSON to match this template: {self._level_json_schema()}. You must group the exercises by '
            'the parts in the order they appear in the batches components. The context field of a part is the '
            'context of a passage component that has text relevant to normal multiple choice questions.\n'

            'Do your utmost to fulfill the requisites, make sure you include all non-duplicate questions '
            'in your response and correctly structure the JSON.'
        )
    }
|
||||
|
||||
29
modules/upload_level/sheet_dtos.py
Normal file
29
modules/upload_level/sheet_dtos.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Dict, Union, Any, Optional
|
||||
|
||||
|
||||
class Option(BaseModel):
    """A single answer choice of a multiple-choice question."""

    id: str  # option identifier, e.g. "A".."D"
    text: str  # the option's display text
|
||||
|
||||
|
||||
class MultipleChoiceQuestion(BaseModel):
    """A multiple-choice question component extracted from a scanned sheet."""

    type: str = "multipleChoice"  # component discriminator
    id: str  # question id as it appears on the sheet
    prompt: str  # question text (may contain blank underscores or <u></u> tags)
    variant: str = "text"  # presumably distinguishes prompt variants — TODO confirm
    options: List[Option]  # the answer choices, in sheet order
|
||||
|
||||
|
||||
class FillBlanksWord(BaseModel):
    """Option group for one blank of a fill-in-the-blanks passage."""

    type: str = "fillBlanks"  # component discriminator
    id: str  # id of the blank this option group belongs to
    options: Dict[str, str]  # option id (e.g. "A".."D") -> option text
|
||||
|
||||
|
||||
# A sheet component: a parsed question or blanks word group, or a free-form
# dict for the looser component shapes the LLM emits (e.g. {"type": "part", ...},
# passage / blanksPassage components).
Component = Union[MultipleChoiceQuestion, FillBlanksWord, Dict[str, Any]]
|
||||
|
||||
|
||||
class Sheet(BaseModel):
    """The components extracted from one OCR batch of sheet pages."""

    batch: Optional[int] = None  # 1-based batch number, assigned after prediction
    components: List[Component]  # components in sheet order
|
||||
Reference in New Issue
Block a user