1 Commits

Author SHA1 Message Date
Cristiano Ferreira
9df4889517 New custom level tests. 2024-09-02 15:28:41 +01:00
27 changed files with 449 additions and 1128 deletions

View File

@@ -5,4 +5,3 @@ README.md
*.pyd
__pycache__
.pytest_cache
/scripts

6
.env
View File

@@ -4,9 +4,3 @@ JWT_TEST_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0In0.Emrs2D3B
GOOGLE_APPLICATION_CREDENTIALS=firebase-configs/storied-phalanx-349916.json
HEY_GEN_TOKEN=MjY4MDE0MjdjZmNhNDFmYTlhZGRkNmI3MGFlMzYwZDItMTY5NTExNzY3MA==
GPT_ZERO_API_KEY=0195b9bb24c5439899f71230809c74af
FIREBASE_SCRYPT_B64_SIGNER_KEY="vbO3Xii2lajSeSkCstq3s/dCwpXP7J2YN9rP/KRreU2vGOT1fg+wzSuy1kIhBECqJHG82tmwAilSxLFFtNKVMA=="
FIREBASE_SCRYPT_B64_SALT_SEPARATOR="Bw=="
FIREBASE_SCRYPT_ROUNDS=8
FIREBASE_SCRYPT_MEM_COST=14
FIREBASE_PROJECT_ID=storied-phalanx-349916

1
.gitignore vendored
View File

@@ -3,4 +3,3 @@ __pycache__
.env
.DS_Store
/firebase-configs/test_firebase.json
/scripts

3
.idea/ielts-be.iml generated
View File

@@ -7,9 +7,6 @@
<orderEntry type="jdk" jdkName="Python 3.11 (ielts-be)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">
<option name="versionSpecifier" value="Don't specify version" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="GOOGLE" />
<option name="myDocStringFormat" value="Google" />

View File

@@ -6,30 +6,12 @@ FROM python:3.11-slim
# Allow statements and log messages to immediately appear in the logs
ENV PYTHONUNBUFFERED True
ENV GOOGLE_APPLICATION_CREDENTIALS=/app/firebase-configs/storied-phalanx-349916.json
# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./
RUN apt update && apt install -y \
ffmpeg \
poppler-utils \
texlive-latex-base \
texlive-fonts-recommended \
texlive-latex-extra \
texlive-xetex \
pandoc \
librsvg2-bin \
curl \
&& rm -rf /var/lib/apt/lists/*
RUN curl -sL https://deb.nodesource.com/setup_20.x | bash - \
&& apt-get install -y nodejs
RUN npm install -g firebase-tools
RUN apt update && apt install -y ffmpeg
# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

131
app.py
View File

@@ -18,11 +18,7 @@ from helper.openai_interface import *
from helper.question_templates import *
from helper.speech_to_text_helper import *
from heygen.AvatarEnum import AvatarEnum
from modules import GPT
from modules.training_content import TrainingContentService, TrainingContentKnowledgeBase
from modules.upload_level import UploadLevelService
from modules.batch_users import BatchUsers
from training_content import TrainingContentService, TrainingContentKnowledgeBase, GPT
load_dotenv()
@@ -47,10 +43,6 @@ open_ai = GPT(OpenAI())
firestore_client = firestore.client()
tc_service = TrainingContentService(kb, open_ai, firestore_client)
upload_level_service = UploadLevelService(open_ai)
batch_users_service = BatchUsers(firestore_client)
thread_event = threading.Event()
# Configure logging
@@ -1256,17 +1248,17 @@ def get_level_utas():
all_mc_questions = []
# PART 1
mc_exercises1 = gen_multiple_choice_blank_space_utas(15, 1, all_mc_questions)
mc_exercises1 = gen_multiple_choice_blank_space_utas(15, 1, None, all_mc_questions)
print(json.dumps(mc_exercises1, indent=4))
all_mc_questions.append(mc_exercises1)
# PART 2
mc_exercises2 = gen_multiple_choice_blank_space_utas(15, 16, all_mc_questions)
mc_exercises2 = gen_multiple_choice_blank_space_utas(15, 16, None, all_mc_questions)
print(json.dumps(mc_exercises2, indent=4))
all_mc_questions.append(mc_exercises2)
# PART 3
mc_exercises3 = gen_multiple_choice_blank_space_utas(15, 31, all_mc_questions)
mc_exercises3 = gen_multiple_choice_blank_space_utas(15, 31, None, all_mc_questions)
print(json.dumps(mc_exercises3, indent=4))
all_mc_questions.append(mc_exercises3)
@@ -1331,8 +1323,15 @@ class CustomLevelExerciseTypes(Enum):
LISTENING_2 = "listening_2"
LISTENING_3 = "listening_3"
LISTENING_4 = "listening_4"
TRANSFORMATION = "transformation"
GAP_FILLING = "gap_filling"
MATCHING = "matching"
CLOZE = "cloze"
TRUE_FALSE = "true_false"
ERROR_CORRECTION = "error_correction"
# https://www.teachingenglish.org.uk/professional-development/teachers/assessing-learning/articles/test-question-types
@app.route('/custom_level', methods=['GET'])
@jwt_required()
def get_custom_level():
@@ -1348,8 +1347,8 @@ def get_custom_level():
exercise_difficulty = request.args.get('exercise_' + str(i) + '_difficulty',
random.choice(['easy', 'medium', 'hard']))
exercise_qty = int(request.args.get('exercise_' + str(i) + '_qty', -1))
exercise_topic = request.args.get('exercise_' + str(i) + '_topic', random.choice(topics))
exercise_topic_2 = request.args.get('exercise_' + str(i) + '_topic_2', random.choice(topics))
exercise_topic = request.args.get('exercise_' + str(i) + '_topic', None)
exercise_topic_2 = request.args.get('exercise_' + str(i) + '_topic_2', None)
exercise_text_size = int(request.args.get('exercise_' + str(i) + '_text_size', 700))
exercise_sa_qty = int(request.args.get('exercise_' + str(i) + '_sa_qty', -1))
exercise_mc_qty = int(request.args.get('exercise_' + str(i) + '_mc_qty', -1))
@@ -1391,7 +1390,7 @@ def get_custom_level():
qty = exercise_qty
response["exercises"]["exercise_" + str(i)]["questions"].extend(
gen_multiple_choice_blank_space_utas(qty, exercise_id,
gen_multiple_choice_blank_space_utas(qty, exercise_id, exercise_topic,
response["exercises"]["exercise_" + str(i)]["questions"])[
"questions"])
exercise_id = exercise_id + qty
@@ -1420,28 +1419,42 @@ def get_custom_level():
response["exercises"]["exercise_" + str(i)]["type"] = "blankSpaceText"
exercise_id = exercise_id + exercise_qty
elif exercise_type == CustomLevelExerciseTypes.READING_PASSAGE_UTAS.value:
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_reading_passage_utas(exercise_id, exercise_sa_qty,
exercise_mc_qty, exercise_topic)
response["exercises"]["exercise_" + str(i)]["type"] = "readingExercises"
exercise_id = exercise_id + exercise_qty
elif exercise_type == CustomLevelExerciseTypes.WRITING_LETTER.value:
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_writing_task_1(exercise_topic, exercise_difficulty)
response["exercises"]["exercise_" + str(i)]["type"] = "writing"
exercise_id = exercise_id + 1
elif exercise_type == CustomLevelExerciseTypes.WRITING_2.value:
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_writing_task_2(exercise_topic, exercise_difficulty)
response["exercises"]["exercise_" + str(i)]["type"] = "writing"
exercise_id = exercise_id + 1
elif exercise_type == CustomLevelExerciseTypes.SPEAKING_1.value:
if exercise_topic is None:
exercise_topic = random.choice(topics)
if exercise_topic_2 is None:
exercise_topic_2 = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = (
gen_speaking_part_1(exercise_topic, exercise_topic_2, exercise_difficulty))
response["exercises"]["exercise_" + str(i)]["type"] = "interactiveSpeaking"
exercise_id = exercise_id + 1
elif exercise_type == CustomLevelExerciseTypes.SPEAKING_2.value:
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_speaking_part_2(exercise_topic, exercise_difficulty)
response["exercises"]["exercise_" + str(i)]["type"] = "speaking"
exercise_id = exercise_id + 1
elif exercise_type == CustomLevelExerciseTypes.SPEAKING_3.value:
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_speaking_part_3(exercise_topic, exercise_difficulty)
response["exercises"]["exercise_" + str(i)]["type"] = "interactiveSpeaking"
exercise_id = exercise_id + 1
@@ -1466,6 +1479,9 @@ def get_custom_level():
exercise_qty_q.put(exercise_paragraphmatch_qty)
total_qty = total_qty + exercise_paragraphmatch_qty
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_reading_passage_1(exercise_topic, exercise_difficulty,
exercises, exercise_qty_q, exercise_id)
response["exercises"]["exercise_" + str(i)]["type"] = "reading"
@@ -1492,6 +1508,9 @@ def get_custom_level():
exercise_qty_q.put(exercise_paragraphmatch_qty)
total_qty = total_qty + exercise_paragraphmatch_qty
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_reading_passage_2(exercise_topic, exercise_difficulty,
exercises, exercise_qty_q, exercise_id)
response["exercises"]["exercise_" + str(i)]["type"] = "reading"
@@ -1522,6 +1541,9 @@ def get_custom_level():
exercise_qty_q.put(exercise_ideamatch_qty)
total_qty = total_qty + exercise_ideamatch_qty
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_reading_passage_3(exercise_topic, exercise_difficulty,
exercises, exercise_qty_q, exercise_id)
response["exercises"]["exercise_" + str(i)]["type"] = "reading"
@@ -1548,6 +1570,9 @@ def get_custom_level():
exercise_qty_q.put(exercise_writeblanksform_qty)
total_qty = total_qty + exercise_writeblanksform_qty
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_listening_section_1(exercise_topic, exercise_difficulty,
exercises, exercise_qty_q,
exercise_id)
@@ -1567,6 +1592,9 @@ def get_custom_level():
exercise_qty_q.put(exercise_writeblanksquestions_qty)
total_qty = total_qty + exercise_writeblanksquestions_qty
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_listening_section_2(exercise_topic, exercise_difficulty,
exercises, exercise_qty_q,
exercise_id)
@@ -1586,6 +1614,9 @@ def get_custom_level():
exercise_qty_q.put(exercise_writeblanksquestions_qty)
total_qty = total_qty + exercise_writeblanksquestions_qty
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_listening_section_3(exercise_topic, exercise_difficulty,
exercises, exercise_qty_q,
exercise_id)
@@ -1613,12 +1644,57 @@ def get_custom_level():
exercise_qty_q.put(exercise_writeblanksform_qty)
total_qty = total_qty + exercise_writeblanksform_qty
if exercise_topic is None:
exercise_topic = random.choice(topics)
response["exercises"]["exercise_" + str(i)] = gen_listening_section_4(exercise_topic, exercise_difficulty,
exercises, exercise_qty_q,
exercise_id)
response["exercises"]["exercise_" + str(i)]["type"] = "listening"
exercise_id = exercise_id + total_qty
elif exercise_type == CustomLevelExerciseTypes.TRANSFORMATION.value:
response["exercises"]["exercise_" + str(i)] = gen_transformation_exercise(exercise_qty,
exercise_id,
exercise_difficulty,
exercise_topic)
response["exercises"]["exercise_" + str(i)]["type"] = "transformation"
exercise_id = exercise_id + exercise_qty
elif exercise_type == CustomLevelExerciseTypes.GAP_FILLING.value:
response["exercises"]["exercise_" + str(i)] = gen_gap_filling_exercise(exercise_qty,
exercise_id,
exercise_difficulty,
exercise_topic)
response["exercises"]["exercise_" + str(i)]["type"] = "gapFilling"
exercise_id = exercise_id + exercise_qty
elif exercise_type == CustomLevelExerciseTypes.MATCHING.value:
response["exercises"]["exercise_" + str(i)] = gen_grammar_matching_exercise(exercise_qty,
exercise_id,
exercise_difficulty,
exercise_topic)
response["exercises"]["exercise_" + str(i)]["type"] = "matchSentences"
exercise_id = exercise_id + exercise_qty
elif exercise_type == CustomLevelExerciseTypes.CLOZE.value:
response["exercises"]["exercise_" + str(i)] = gen_cloze_exercise(exercise_qty,
exercise_id,
exercise_difficulty,
exercise_topic)
response["exercises"]["exercise_" + str(i)]["type"] = "writeBlanks"
exercise_id = exercise_id + exercise_qty
elif exercise_type == CustomLevelExerciseTypes.TRUE_FALSE.value:
response["exercises"]["exercise_" + str(i)] = gen_true_false_exercise(exercise_qty,
exercise_id,
exercise_difficulty,
exercise_topic)
response["exercises"]["exercise_" + str(i)]["type"] = "trueFalse"
exercise_id = exercise_id + exercise_qty
elif exercise_type == CustomLevelExerciseTypes.ERROR_CORRECTION.value:
response["exercises"]["exercise_" + str(i)] = gen_error_correction_exercise(exercise_qty,
exercise_id,
exercise_difficulty,
exercise_topic)
response["exercises"]["exercise_" + str(i)]["type"] = "questionAnswer"
exercise_id = exercise_id + exercise_qty
return response
@@ -1698,29 +1774,8 @@ def grading_summary():
@jwt_required()
def training_content():
try:
return tc_service.get_tips(request.get_json())
except Exception as e:
app.logger.error(str(e))
return str(e)
# TODO: create a doc in firestore with a status and get its id, run this in a thread and modify the doc in firestore,
# return the id right away, in generation view poll for the id
@app.route('/upload_level', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return 'File wasn\'t uploaded', 400
file = request.files['file']
if file.filename == '':
return 'No selected file', 400
if file:
return upload_level_service.generate_level_from_file(file), 200
@app.route('/batch_users', methods=['POST'])
def create_users_batch():
try:
return batch_users_service.batch_users(request.get_json())
data = request.get_json()
return tc_service.get_tips(data)
except Exception as e:
app.logger.error(str(e))
return str(e)

View File

@@ -659,3 +659,19 @@ academic_subjects = [
"Ecology",
"International Business"
]
# Grammar subject areas available for exercise generation — presumably passed
# as the `topic` of the gen_* grammar exercise builders; confirm against callers.
grammar_types = [
    "parts of speech",
    "parts of speech - Nouns",
    "parts of speech - Pronouns",
    "parts of speech - Verbs",
    "parts of speech - Adverbs",
    "parts of speech - Adjectives",
    "parts of speech - Conjunctions",
    "parts of speech - Prepositions",
    "parts of speech - Interjections",
    "sentence structure",
    "types of sentences",
    "tenses",
    "active voice and passive voice"
]

View File

@@ -1443,18 +1443,29 @@ def parse_conversation(conversation_data):
return "\n".join(readable_text)
def gen_multiple_choice_blank_space_utas(quantity: int, start_id: int, all_exams=None):
def gen_multiple_choice_blank_space_utas(quantity: int, start_id: int, topic=None, all_exams=None):
gen_multiple_choice_for_text = "Generate " + str(
quantity) + " multiple choice blank space questions of 4 options for an english level exam, some easy questions, some intermediate " \
"questions and some advanced questions. Ensure that the questions cover a range of topics such as " \
"verb tense, subject-verb agreement, pronoun usage, sentence structure, and punctuation. Make sure " \
"every question only has 1 correct answer."
quantity) + (" multiple choice blank space questions of 4 options for an english level exam, some easy "
"questions, some intermediate questions and some advanced questions. Make sure every question "
"only has 1 correct answer.")
if topic is None:
gen_multiple_choice_for_text = gen_multiple_choice_for_text + ("Ensure that the questions cover a range of "
"topics such as verb tense, subject-verb "
"agreement, pronoun usage, sentence structure, "
"and punctuation.")
else:
gen_multiple_choice_for_text = gen_multiple_choice_for_text + ("Ensure that the questions are fill the blanks "
"and cover the grammar "
"topic of '" + topic + "' and the prompts "
"are varied.")
messages = [
{
"role": "system",
"content": (
'You are a helpful assistant designed to output JSON on this format: {"questions": [{"id": "9", "options": '
'You are a helpful assistant designed to output JSON on this format: '
'{"questions": [{"id": "9", "options": '
'[{"id": "A", "text": '
'"And"}, {"id": "B", "text": "Cat"}, {"id": "C", "text": '
'"Happy"}, {"id": "D", "text": "Jump"}], '
@@ -1473,7 +1484,7 @@ def gen_multiple_choice_blank_space_utas(quantity: int, start_id: int, all_exams
GEN_QUESTION_TEMPERATURE)
if len(question["questions"]) != quantity:
return gen_multiple_choice_blank_space_utas(quantity, start_id)
return gen_multiple_choice_blank_space_utas(quantity, start_id, topic, all_exams)
else:
if all_exams is not None:
seen_keys = set()
@@ -2089,3 +2100,309 @@ def gen_listening_section_4(topic, difficulty, req_exercises, number_of_exercise
"text": monologue,
"difficulty": difficulty
}
def gen_transformation_exercise(quantity, start_id, difficulty, topic=None):
    """Generate sentence-transformation exercises via the OpenAI chat API.

    The student must fill two one-word blanks in a second sentence so that it
    keeps the meaning of the first.

    Args:
        quantity: how many exercises to request.
        start_id: id assigned to the first exercise.
        difficulty: difficulty label forwarded to the model and echoed back.
        topic: optional grammar subject to focus the exercises on.

    Returns:
        The model's JSON (an "exercises" list) augmented with the on-screen
        prompt, difficulty and topic.
    """
    example_format = {
        "exercises": [
            {
                "id": 1,
                "first": "first sentence",
                "second": "second sentence",
                "solutions": ["first_missing_word", "second_missing_word"]
            }
        ]
    }
    task = (f"Create {quantity} transformation exercises of {difficulty} where the "
            "student has to complete the second sentences' 2 blank spaces so that "
            "it has the same meaning as the first. Each blank space must "
            "correspond to a single word.")
    chat = [
        {
            "role": "system",
            "content": f"You are a helpful assistant designed to output JSON on this format: {example_format}"
        },
        {"role": "user", "content": task},
        {"role": "user", "content": f"The id starts at {start_id}."},
    ]
    if topic is not None:
        chat.append({
            "role": "user",
            "content": f"Focus the exercises on the grammar subject of {topic}."
        })
    reply = make_openai_call(GPT_4_O, chat, count_total_tokens(chat), GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
    reply["prompt"] = "Complete the second sentence so that it has the same meaning as the first."
    reply["difficulty"] = difficulty
    reply["topic"] = topic
    return reply
def gen_gap_filling_exercise(quantity, start_id, difficulty, topic=None):
    """Generate single-word gap-filling exercises via the OpenAI chat API.

    Each generated sentence contains one blank marked as {{id}} that the
    student completes with exactly one word.

    Args:
        quantity: how many exercises to request.
        start_id: id assigned to the first exercise.
        difficulty: difficulty label forwarded to the model and echoed back.
        topic: optional grammar subject to focus the exercises on.

    Returns:
        The model's JSON (an "exercises" list) augmented with the on-screen
        prompt, difficulty and topic.
    """
    example_format = {
        "exercises": [
            {
                "id": 1,
                "question": "sentence with a blank space to fill",
                "solutions": ["option 1", "option 2"]
            }
        ]
    }
    # The "{{id}}" marker is sent literally (plain string segment, not an
    # f-string) — it is the placeholder syntax the front end expects.
    task = (f"Create {quantity} gap filling exercises of {difficulty} where the student "
            "has to complete the sentence's blank space (signaled as {{id}}) so that it "
            "makes sense. The blank space must correspond to a single word.")
    chat = [
        {
            "role": "system",
            "content": f"You are a helpful assistant designed to output JSON on this format: {example_format}"
        },
        {"role": "user", "content": task},
        {"role": "user", "content": f"The id starts at {start_id}."},
    ]
    if topic is not None:
        chat.append({
            "role": "user",
            "content": f"Focus the exercises on the grammar subject of {topic}."
        })
    reply = make_openai_call(GPT_4_O, chat, count_total_tokens(chat), GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
    reply["prompt"] = "Complete the sentence."
    reply["difficulty"] = difficulty
    reply["topic"] = topic
    return reply
def gen_grammar_matching_exercise(quantity, start_id, difficulty, topic=None):
    """Generate a grammar matching exercise via the OpenAI chat API.

    Asks the model for left/right pairs, then converts them into the
    front-end shape: lettered options on the left and shuffled, id-numbered
    sentences on the right (see the build_*_grammar_matching helpers).

    Args:
        quantity: how many matching pairs to request.
        start_id: first id assigned to the right-hand sentences.
        difficulty: difficulty label forwarded to the model and echoed back.
        topic: optional grammar subject to focus the exercises on.
    """
    example_format = {
        "matching_pairs": [
            {
                "left": "word/sentence on left",
                "right": "word/sentence on right",
            }
        ]
    }
    task = (f"Create {quantity} grammar related matching exercises of {difficulty} "
            "where the student has to match the words/sentences on the left with "
            "words/sentences on the right.")
    chat = [
        {
            "role": "system",
            "content": f"You are a helpful assistant designed to output JSON on this format: {example_format}"
        },
        {"role": "user", "content": task},
    ]
    if topic is not None:
        chat.append({
            "role": "user",
            "content": f"Focus the exercises on the grammar subject of {topic}."
        })
    reply = make_openai_call(GPT_4_O, chat, count_total_tokens(chat), GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
    pairs = reply["matching_pairs"]
    return {
        "allowRepetition": False,
        "options": build_options_grammar_matching(pairs),
        "prompt": "Match the words/sentences on the left with the ones on the right.",
        "sentences": build_sentences_grammar_matching(pairs, start_id),
        "type": "matchSentences",
        "difficulty": difficulty,
        "topic": topic
    }
def gen_cloze_exercise(quantity, start_id, difficulty, topic=None):
    """Generate a cloze (gapped-text) exercise via the OpenAI chat API.

    Args:
        quantity: number of blank spaces the generated text must contain.
        start_id: id of the first blank; blanks are numbered from it.
        difficulty: difficulty label forwarded to the model and echoed back.
        topic: optional grammar subject to focus the text on.

    Returns:
        The model's JSON ("text" plus "solutions") augmented with the
        on-screen prompt, difficulty, topic and a maxWords of 1.

    Fix: the JSON example previously used the key "word" for the second
    solution but "solution" for the first; the inconsistent example could
    steer the model into emitting a mixed schema. Both entries now use
    "solution".
    """
    json_format = {
        "text": "the text {{1}} blank spaces {{2}} it",
        "solutions": [
            {
                "id": 1,
                "solution": [
                    "with"
                ]
            },
            {
                "id": 2,
                "solution": [
                    "on"
                ]
            }
        ]
    }
    messages = [
        {
            "role": "system",
            "content": 'You are a helpful assistant designed to output JSON on this format: ' + str(json_format)
        },
        {
            "role": "user",
            # "{{id}}" is sent literally — it is the gap-marker syntax.
            "content": ("Generate a text for a cloze exercise with " + str(quantity) + " blank spaces to fill of "
                        + difficulty + " where the student has to complete the blank spaces (signaled as {{id}}) "
                        "on the text so that it makes sense. Each blank space must correspond to a single word.")
        },
        {
            "role": "user",
            "content": 'The id starts at ' + str(start_id) + '.'
        }
    ]
    if topic is not None:
        messages.append({
            "role": "user",
            "content": 'Focus the exercises on the grammar subject of ' + topic + '.'
        })
    token_count = count_total_tokens(messages)
    response = make_openai_call(GPT_4_O, messages, token_count, GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
    response["prompt"] = "Complete the text by adding a word to each gap."
    response["difficulty"] = difficulty
    response["topic"] = topic
    # Front-end constraint: each gap takes exactly one word.
    response["maxWords"] = 1
    return response
def gen_true_false_exercise(quantity: int, start_id, difficulty, topic=None):
    """Generate grammar-related true/false statements via the OpenAI chat API.

    Args:
        quantity: how many statements to request.
        start_id: id assigned to the first statement.
        difficulty: difficulty label forwarded to the model and echoed back.
        topic: optional grammar subject to focus the statements on.

    Returns:
        The model's JSON (a "questions" list) augmented with the on-screen
        prompt, difficulty and topic.
    """
    example_format = {
        "questions": [
            {
                "id": 1,
                "prompt": "statement_1",
                "solution": "true/false"
            },
            {
                "id": 2,
                "prompt": "statement_2",
                "solution": "true/false"
            }
        ]
    }
    chat = [
        {
            "role": "system",
            "content": f"You are a helpful assistant designed to output JSON on this format: {example_format}"
        },
        {
            "role": "user",
            "content": f"Generate {quantity} {difficulty} difficulty grammar related statements for a true or false exercise."
        },
        {"role": "user", "content": f"The id starts at {start_id}."},
    ]
    if topic is not None:
        chat.append({
            "role": "user",
            "content": f"Focus the exercises on the grammar subject of {topic}."
        })
    reply = make_openai_call(GPT_4_O, chat, count_total_tokens(chat), GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
    reply["prompt"] = "Decide if the statements are true or false."
    reply["difficulty"] = difficulty
    reply["topic"] = topic
    return reply
def gen_error_correction_exercise(quantity: int, start_id, difficulty, topic=None):
    """Generate error-correction sentences via the OpenAI chat API.

    The model produces grammatically incorrect sentences; the student has to
    rewrite each one correctly.

    Args:
        quantity: how many sentences to request.
        start_id: id assigned to the first sentence.
        difficulty: difficulty label forwarded to the model and echoed back.
        topic: optional grammar subject to focus the sentences on.

    Returns:
        The model's JSON (a "questions" list) augmented with the on-screen
        prompt, difficulty and topic.
    """
    example_format = {
        "questions": [
            {
                "id": 1,
                "prompt": "sentence with errors",
                "solution": "corrected sentence"
            },
            {
                "id": 2,
                "prompt": "sentence with errors",
                "solution": "corrected sentence"
            }
        ]
    }
    chat = [
        {
            "role": "system",
            "content": f"You are a helpful assistant designed to output JSON on this format: {example_format}"
        },
        {
            "role": "user",
            "content": (f"Generate {quantity} {difficulty} difficulty grammatically incorrect "
                        "sentences for an exercise where the user has to fix the sentence.")
        },
        {"role": "user", "content": f"The id starts at {start_id}."},
    ]
    if topic is not None:
        chat.append({
            "role": "user",
            "content": f"Focus the exercises on the grammar subject of {topic}."
        })
    reply = make_openai_call(GPT_4_O, chat, count_total_tokens(chat), GEN_FIELDS, GEN_QUESTION_TEMPERATURE)
    reply["prompt"] = "Find the mistakes in the sentence and correct them."
    reply["difficulty"] = difficulty
    reply["topic"] = topic
    return reply
def build_options_grammar_matching(pairs):
    """Build the lettered option list (left-hand column) of a matching exercise.

    Each pair's "left" text becomes {"id": <label>, "sentence": <text>}.

    Fix: the previous version consumed an iterator over
    string.ascii_uppercase, so a response with more than 26 pairs raised
    StopIteration. Labels now continue Excel-style: A..Z, AA, AB, ...
    """
    return [
        {"id": _matching_label(index), "sentence": pair["left"]}
        for index, pair in enumerate(pairs)
    ]


def _matching_label(index):
    """Excel-style label for a 0-based index: 0 -> 'A', 25 -> 'Z', 26 -> 'AA'."""
    label = ""
    remaining = index + 1
    while remaining > 0:
        remaining, rem = divmod(remaining - 1, 26)
        label = string.ascii_uppercase[rem] + label
    return label
def build_sentences_grammar_matching(pairs, start_id):
    """Build the shuffled right-hand sentence list of a matching exercise.

    Each entry carries in "solution" the label of its matching left-hand
    option (same labelling scheme as build_options_grammar_matching); entries
    are shuffled so the columns don't line up, then numbered sequentially
    from ``start_id``.

    Fix: the previous version consumed an iterator over
    string.ascii_uppercase, so more than 26 pairs raised StopIteration.
    Labels now continue Excel-style: A..Z, AA, AB, ...
    """
    sentences = [
        {"solution": _matching_label(index), "sentence": pair["right"]}
        for index, pair in enumerate(pairs)
    ]
    # Randomise presentation order before assigning display ids.
    random.shuffle(sentences)
    for i, sentence in enumerate(sentences, start=start_id):
        sentence["id"] = i
    return sentences


def _matching_label(index):
    """Excel-style label for a 0-based index: 0 -> 'A', 25 -> 'Z', 26 -> 'AA'."""
    label = ""
    remaining = index + 1
    while remaining > 0:
        remaining, rem = divmod(remaining - 1, 26)
        label = string.ascii_uppercase[rem] + label
    return label

View File

@@ -1,5 +0,0 @@
from .gpt import GPT
__all__ = [
"GPT"
]

View File

@@ -1,5 +0,0 @@
from .service import BatchUsers
__all__ = [
"BatchUsers"
]

View File

@@ -1,31 +0,0 @@
import uuid
from typing import Optional
from pydantic import BaseModel, Field
from datetime import datetime
class DemographicInfo(BaseModel):
    """Optional demographic details supplied with a batch-created user."""
    phone: str
    # Presumably a national-ID/passport number — TODO confirm with callers.
    passport_id: Optional[str] = None
    country: Optional[str] = None
class UserDTO(BaseModel):
    """One user row of a batch user-creation request."""
    # Generated client-side; reused as both the Firebase Auth UID and the
    # Firestore 'users' document id.
    id: uuid.UUID = Field(default_factory=uuid.uuid4)
    email: str
    name: str
    # Account type, e.g. "student" / "corporate" — drives group assignment.
    type: str
    passport_id: str
    # Pre-hashed SCRYPT credential material, imported verbatim into Firebase Auth.
    passwordHash: str
    passwordSalt: str
    groupName: Optional[str] = None
    # Email of the owning corporate account, when any.
    corporate: Optional[str] = None
    studentID: Optional[str] = None
    expiryDate: Optional[str] = None
    demographicInformation: Optional[DemographicInfo] = None
class BatchUsersDTO(BaseModel):
    """Validated payload for a batch user-creation request."""
    # Id of the account performing the creation; becomes the default creator
    # of the generated codes and the admin of any groups created by name.
    makerID: str
    users: list[UserDTO]

View File

@@ -1,261 +0,0 @@
import os
import subprocess
import time
import uuid
from datetime import datetime
from logging import getLogger
import pandas as pd
from typing import Dict
import shortuuid
from google.cloud.firestore_v1 import Client
from google.cloud.firestore_v1.base_query import FieldFilter
from modules.batch_users.batch_users import BatchUsersDTO, UserDTO
from modules.helper.file_helper import FileHelper
class BatchUsers:
    """Bulk user-creation service.

    Imports pre-hashed credentials into Firebase Auth through the firebase
    CLI, then seeds the matching Firestore documents: a 'users' profile and a
    'codes' entry per user, plus group membership where applicable.
    """

    # Desired-level targets stamped on every new user document.
    # 9 looks like the IELTS band ceiling — TODO confirm.
    _DEFAULT_DESIRED_LEVELS = {
        "reading": 9,
        "listening": 9,
        "writing": 9,
        "speaking": 9,
    }
    # Starting levels for a brand-new user.
    _DEFAULT_LEVELS = {
        "reading": 0,
        "listening": 0,
        "writing": 0,
        "speaking": 0,
    }

    def __init__(self, firestore: Client):
        """Keep a handle to the Firestore client and a module-scoped logger."""
        self._db = firestore
        self._logger = getLogger(__name__)

    def batch_users(self, request_data: Dict):
        """Create every user described by ``request_data``.

        Steps: validate the payload into DTOs, write a temporary
        Firebase-Auth import CSV, shell out to ``firebase auth:import``,
        then initialise the Firestore documents for each user.

        Returns ``{"ok": True}`` on success; on CLI failure returns the
        error message as a plain string (NOTE(review): callers must
        distinguish dict vs str results).
        """
        batch_dto = self._map_to_batch(request_data)
        file_name = f'{uuid.uuid4()}.csv'
        path = f'./tmp/{file_name}'
        self._generate_firebase_auth_csv(batch_dto, path)
        result = self._upload_users('./tmp', file_name)
        if result.returncode != 0:
            # NOTE(review): the temp CSV is not removed on this early return.
            error_msg = f"Couldn't upload users. Failed to run command firebase auth import -> ```cmd {result.stderr}```"
            self._logger.error(error_msg)
            return error_msg
        self._init_users(batch_dto)
        # Best-effort cleanup (FileHelper.remove_file swallows errors).
        FileHelper.remove_file(path)
        return {"ok": True}

    @staticmethod
    def _map_to_batch(request_data: Dict) -> BatchUsersDTO:
        """Convert the raw request payload into a validated BatchUsersDTO."""
        users: list[UserDTO] = [UserDTO(**user) for user in request_data["users"]]
        return BatchUsersDTO(makerID=request_data["makerID"], users=users)

    @staticmethod
    def _generate_firebase_auth_csv(batch_dto: BatchUsersDTO, path: str):
        """Write the Firebase-Auth import CSV for the batch to ``path``.

        Only UID/email/password columns are populated; all social-provider
        columns stay empty. Written without a header row, per the CLI's
        file format (see link below).
        """
        # https://firebase.google.com/docs/cli/auth#file_format
        columns = [
            'UID', 'Email', 'Email Verified', 'Password Hash', 'Password Salt', 'Name',
            'Photo URL', 'Google ID', 'Google Email', 'Google Display Name', 'Google Photo URL',
            'Facebook ID', 'Facebook Email', 'Facebook Display Name', 'Facebook Photo URL',
            'Twitter ID', 'Twitter Email', 'Twitter Display Name', 'Twitter Photo URL',
            'GitHub ID', 'GitHub Email', 'GitHub Display Name', 'GitHub Photo URL',
            'User Creation Time', 'Last Sign-In Time', 'Phone Number'
        ]
        users_data = []
        # Milliseconds since the epoch, used as the account creation time.
        current_time = int(time.time() * 1000)
        for user in batch_dto.users:
            user_data = {
                'UID': str(user.id),
                'Email': user.email,
                'Email Verified': False,
                # Pre-hashed SCRYPT material imported verbatim (see _upload_users flags).
                'Password Hash': user.passwordHash,
                'Password Salt': user.passwordSalt,
                'Name': '',
                'Photo URL': '',
                'Google ID': '',
                'Google Email': '',
                'Google Display Name': '',
                'Google Photo URL': '',
                'Facebook ID': '',
                'Facebook Email': '',
                'Facebook Display Name': '',
                'Facebook Photo URL': '',
                'Twitter ID': '',
                'Twitter Email': '',
                'Twitter Display Name': '',
                'Twitter Photo URL': '',
                'GitHub ID': '',
                'GitHub Email': '',
                'GitHub Display Name': '',
                'GitHub Photo URL': '',
                'User Creation Time': current_time,
                'Last Sign-In Time': '',
                'Phone Number': ''
            }
            users_data.append(user_data)
        df = pd.DataFrame(users_data, columns=columns)
        df.to_csv(path, index=False, header=False)

    @staticmethod
    def _upload_users(directory: str, file_name: str):
        """Run ``firebase auth:import`` on the generated CSV and return the
        CompletedProcess.

        SCRYPT hash parameters come from the environment so the pre-hashed
        passwords in the CSV can be imported as-is.
        NOTE(review): the command is built by f-string interpolation and run
        with shell=True — env values are expanded by the shell; prefer
        subprocess.run([...], shell=False) with an argument list.
        """
        command = (
            f'firebase auth:import {file_name} '
            f'--hash-algo=SCRYPT '
            f'--hash-key={os.getenv("FIREBASE_SCRYPT_B64_SIGNER_KEY")} '
            f'--salt-separator={os.getenv("FIREBASE_SCRYPT_B64_SALT_SEPARATOR")} '
            f'--rounds={os.getenv("FIREBASE_SCRYPT_ROUNDS")} '
            f'--mem-cost={os.getenv("FIREBASE_SCRYPT_MEM_COST")} '
            f'--project={os.getenv("FIREBASE_PROJECT_ID")} '
        )
        result = subprocess.run(command, shell=True, cwd=directory, capture_output=True, text=True)
        return result

    def _init_users(self, batch_users: BatchUsersDTO):
        """Create the Firestore documents for every user in the batch."""
        maker_id = batch_users.makerID
        for user in batch_users.users:
            self._insert_new_user(user)
            code = self._create_code(user, maker_id)
            if user.type == "corporate":
                # New corporate accounts get their own default group set.
                self._set_corporate_default_groups(user)
            if user.corporate:
                # User belongs to an existing corporate account: reassign the
                # code's creator and add them to the corporate's group.
                self._assign_corporate_to_user(user, code)
            if user.groupName and len(user.groupName.strip()) > 0:
                self._assign_user_to_group_by_name(user, maker_id)

    def _insert_new_user(self, user: UserDTO):
        """Create the Firestore 'users' profile document for ``user``.

        Credential and grouping fields are excluded from the profile; the
        document id is the same UUID used as the Firebase Auth UID.
        """
        new_user = {
            **user.dict(exclude={
                'id', 'passport_id', 'groupName', 'expiryDate',
                'corporate', 'passwordHash', 'passwordSalt'
            }),
            'bio': "",
            'focus': "academic",
            'status': "active",
            'desiredLevels': self._DEFAULT_DESIRED_LEVELS,
            'profilePicture': "/defaultAvatar.png",
            'levels': self._DEFAULT_LEVELS,
            'isFirstLogin': False,
            'isVerified': True,
            # NOTE(review): naive local time — consider a timezone-aware UTC stamp.
            'registrationDate': datetime.now(),
            'subscriptionExpirationDate': user.expiryDate
        }
        self._db.collection('users').document(str(user.id)).set(new_user)

    def _create_code(self, user: UserDTO, maker_id: str) -> str:
        """Create a 6-character 'codes' document for the user and return the code.

        Presumably an activation/invite code redeemed later — TODO confirm.
        """
        code = shortuuid.ShortUUID().random(length=6)
        self._db.collection('codes').document(code).set({
            'code': code,
            'creator': maker_id,
            'expiryDate': user.expiryDate,
            'type': user.type,
            'creationDate': datetime.now(),
            'userId': str(user.id),
            'email': user.email,
            'name': user.name,
            'passport_id': user.passport_id
        })
        return code

    def _set_corporate_default_groups(self, user: UserDTO):
        """Create the three locked default groups (Teachers, Students,
        Corporate) administered by a new corporate account."""
        user_id = str(user.id)
        default_groups = [
            {
                'admin': user_id,
                'id': str(uuid.uuid4()),
                'name': "Teachers",
                'participants': [],
                'disableEditing': True,
            },
            {
                'admin': user_id,
                'id': str(uuid.uuid4()),
                'name': "Students",
                'participants': [],
                'disableEditing': True,
            },
            {
                'admin': user_id,
                'id': str(uuid.uuid4()),
                'name': "Corporate",
                'participants': [],
                'disableEditing': True,
            }
        ]
        for group in default_groups:
            self._db.collection('groups').document(group['id']).set(group)

    def _assign_corporate_to_user(self, user: UserDTO, code: str):
        """Attach ``user`` to their corporate account's group.

        Looks up the corporate account by email (``user.corporate``),
        reassigns the code's creator to it, then adds the user to that
        corporate's "Students"/"Teachers" group — creating the group if it
        does not exist. Silently does nothing when no corporate account
        matches (NOTE(review): consider logging that case).
        """
        user_id = str(user.id)
        corporate_users = self._db.collection('users').where(
            filter=FieldFilter('email', '==', user.corporate)
        ).limit(1).get()
        if len(corporate_users) > 0:
            corporate_user = corporate_users[0]
            self._db.collection('codes').document(code).set({'creator': corporate_user.id}, merge=True)
            # Any non-student type is grouped with Teachers.
            group_type = "Students" if user.type == "student" else "Teachers"
            groups = self._db.collection('groups').where(
                filter=FieldFilter('admin', '==', corporate_user.id)
            ).where(
                filter=FieldFilter('name', '==', group_type)
            ).limit(1).get()
            if len(groups) > 0:
                group = groups[0]
                participants = group.get('participants')
                if user_id not in participants:
                    participants.append(user_id)
                    group.reference.update({'participants': participants})
            else:
                group = {
                    'admin': corporate_user.id,
                    'id': str(uuid.uuid4()),
                    'name': group_type,
                    'participants': [user_id],
                    'disableEditing': True,
                }
                self._db.collection('groups').document(group['id']).set(group)

    def _assign_user_to_group_by_name(self, user: UserDTO, maker_id: str):
        """Add the user to the maker's group named ``user.groupName``,
        creating an editable group when none exists yet."""
        user_id = str(user.id)
        groups = self._db.collection('groups').where(
            filter=FieldFilter('admin', '==', maker_id)
        ).where(
            filter=FieldFilter('name', '==', user.groupName.strip())
        ).limit(1).get()
        if len(groups) == 0:
            new_group = {
                'id': str(uuid.uuid4()),
                'admin': maker_id,
                'name': user.groupName.strip(),
                'participants': [user_id],
                'disableEditing': False,
            }
            self._db.collection('groups').document(new_group['id']).set(new_group)
        else:
            group = groups[0]
            participants = group.get('participants')
            if user_id not in participants:
                participants.append(user_id)
                group.reference.update({'participants': participants})

View File

@@ -1,5 +0,0 @@
from .logger import LoggerHelper
__all__ = [
"LoggerHelper"
]

View File

@@ -1,97 +0,0 @@
import base64
import io
import os
import shutil
import subprocess
import uuid
from typing import Optional, Tuple
import numpy as np
import pypandoc
from PIL import Image
class FileHelper:
# NOTE: pandoc supposedly covers a wide range of input formats, but only
# docx has been tested so far.
@staticmethod
def convert_file_to_pdf(input_path: str, output_path: str):
    """Convert ``input_path`` to a PDF at ``output_path`` via pandoc (pypandoc).

    Page geometry is pinned to 5.5x8.5in with 0.5in margins, and
    'pagestyle=empty' suppresses headers/footers/page numbers.
    """
    pypandoc.convert_file(input_path, 'pdf', outputfile=output_path, extra_args=[
        '-V', 'geometry:paperwidth=5.5in',
        '-V', 'geometry:paperheight=8.5in',
        '-V', 'geometry:margin=0.5in',
        '-V', 'pagestyle=empty'
    ])
@staticmethod
def convert_file_to_html(input_path: str, output_path: str):
    """Convert ``input_path`` to HTML at ``output_path`` via pandoc (pypandoc)."""
    pypandoc.convert_file(input_path, 'html', outputfile=output_path)
@staticmethod
def pdf_to_png(path_id: str):
to_png = f"pdftoppm -png exercises.pdf page"
result = subprocess.run(to_png, shell=True, cwd=f'./tmp/{path_id}', capture_output=True, text=True)
if result.returncode != 0:
raise Exception(
f"Couldn't convert pdf to png. Failed to run command '{to_png}' -> ```cmd {result.stderr}```")
@staticmethod
def is_page_blank(image_bytes: bytes, image_threshold=10) -> bool:
with Image.open(io.BytesIO(image_bytes)) as img:
img_gray = img.convert('L')
img_array = np.array(img_gray)
non_white_pixels = np.sum(img_array < 255)
return non_white_pixels <= image_threshold
@classmethod
def _encode_image(cls, image_path: str, image_threshold=10) -> Optional[str]:
with open(image_path, "rb") as image_file:
image_bytes = image_file.read()
if cls.is_page_blank(image_bytes, image_threshold):
return None
return base64.b64encode(image_bytes).decode('utf-8')
@classmethod
def b64_pngs(cls, path_id: str, files: list[str]):
png_messages = []
for filename in files:
b64_string = cls._encode_image(os.path.join(f'./tmp/{path_id}', filename))
if b64_string:
png_messages.append({
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{b64_string}"
}
})
return png_messages
@staticmethod
def remove_directory(path):
try:
if os.path.exists(path):
if os.path.isdir(path):
shutil.rmtree(path)
except Exception as e:
print(f"An error occurred while trying to remove {path}: {str(e)}")
@staticmethod
def remove_file(file_path):
try:
if os.path.exists(file_path):
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(f"An error occurred while trying to remove the file {file_path}: {str(e)}")
@staticmethod
def save_upload(file) -> Tuple[str, str]:
ext = file.filename.split('.')[-1]
path_id = str(uuid.uuid4())
os.makedirs(f'./tmp/{path_id}', exist_ok=True)
tmp_filename = f'./tmp/{path_id}/uploaded.{ext}'
file.save(tmp_filename)
return ext, path_id

View File

@@ -1,23 +0,0 @@
import logging
from functools import wraps
class LoggerHelper:
    """Logging utilities shared across modules."""

    @staticmethod
    def suppress_loggers():
        """Decorator factory: raise the root logger to ERROR for the call.

        The previous level is restored even if the wrapped function raises,
        so noisy third-party DEBUG/INFO output is silenced only for the
        duration of the decorated call.
        """
        def decorator(func):
            @wraps(func)
            def silenced(*args, **kwargs):
                root = logging.getLogger()
                saved_level = root.level
                root.setLevel(logging.ERROR)
                try:
                    return func(*args, **kwargs)
                finally:
                    # Always restore, success or failure.
                    root.setLevel(saved_level)
            return silenced
        return decorator

View File

@@ -1,5 +0,0 @@
from .service import UploadLevelService
__all__ = [
"UploadLevelService"
]

View File

@@ -1,57 +0,0 @@
from pydantic import BaseModel, Field
from typing import List, Dict, Union, Optional, Any
from uuid import uuid4, UUID
class Option(BaseModel):
    """A selectable answer choice."""
    id: str  # short option id, e.g. "A".."D"
    text: str


class MultipleChoiceQuestion(BaseModel):
    """One question inside a multiple-choice exercise."""
    id: str
    prompt: str
    variant: str = "text"  # UI rendering hint
    solution: str  # Option.id of the correct answer
    options: List[Option]


class MultipleChoiceExercise(BaseModel):
    """A block of multiple-choice questions."""
    id: UUID = Field(default_factory=uuid4)
    type: str = "multipleChoice"  # discriminator used by the mapper
    prompt: str = "Select the appropriate option."
    questions: List[MultipleChoiceQuestion]
    userSolutions: List = Field(default_factory=list)  # filled in client-side


class FillBlanksWord(BaseModel):
    """Options for a single blank, keyed by option id."""
    id: str  # matches a {{question id}} placeholder in the passage text
    options: Dict[str, str]


class FillBlanksSolution(BaseModel):
    """Correct option id for one blank."""
    id: str
    solution: str


class FillBlanksExercise(BaseModel):
    """A fill-in-the-blanks passage exercise answered via options."""
    id: UUID = Field(default_factory=uuid4)
    type: str = "fillBlanks"
    variant: str = "mc"  # blanks answered by multiple-choice options
    prompt: str = "Click a blank to select the appropriate word for it."
    text: str  # passage with {{<question id>}} placeholders for the blanks
    solutions: List[FillBlanksSolution]
    words: List[FillBlanksWord]
    userSolutions: List = Field(default_factory=list)


# Union of the exercise shapes supported in an exam part.
Exercise = Union[MultipleChoiceExercise, FillBlanksExercise]


class Part(BaseModel):
    """One part of the exam: its exercises plus an optional reading context."""
    exercises: List[Exercise]
    context: Optional[str] = Field(default=None)


class Exam(BaseModel):
    """Top-level exam structure returned by the upload-level pipeline."""
    parts: List[Part]

View File

@@ -1,66 +0,0 @@
from typing import Dict, Any
from pydantic import ValidationError
from modules.upload_level.exam_dtos import (
MultipleChoiceExercise,
FillBlanksExercise,
Part, Exam
)
from modules.upload_level.sheet_dtos import Sheet, Option, MultipleChoiceQuestion, FillBlanksWord
class ExamMapper:
    """Maps raw LLM JSON responses onto the pydantic exam / sheet models."""

    @staticmethod
    def map_to_exam_model(response: Dict[str, Any]) -> Exam:
        """Build an :class:`Exam` from the LLM's ``{"parts": [...]}`` response.

        Raises:
            ValueError: if an exercise carries an unknown ``type``.
            pydantic.ValidationError: if an exercise fails model validation.
        """
        parts = []
        for part in response['parts']:
            part_exercises = part['exercises']
            context = part.get('context', None)
            exercises = []
            for exercise in part_exercises:
                exercise_type = exercise['type']
                if exercise_type == 'multipleChoice':
                    exercise_model = MultipleChoiceExercise(**exercise)
                elif exercise_type == 'fillBlanks':
                    exercise_model = FillBlanksExercise(**exercise)
                else:
                    # BUG FIX: pydantic's ValidationError cannot be constructed
                    # from a plain message (it requires error details and a
                    # model), so the original raise itself crashed with a
                    # TypeError. ValueError is correct here and remains
                    # catch-compatible, since ValidationError subclasses it.
                    raise ValueError(f"Unknown exercise type: {exercise_type}")
                exercises.append(exercise_model)
            part_kwargs = {"exercises": exercises}
            # Only set context when present so Part's default (None) applies.
            if context is not None:
                part_kwargs["context"] = context
            parts.append(Part(**part_kwargs))
        return Exam(parts=parts)

    @staticmethod
    def map_to_sheet(response: Dict[str, Any]) -> Sheet:
        """Build a :class:`Sheet` from the OCR pass response.

        Components with an unrecognized ``type`` (e.g. "part", "passage",
        "blanksPassage") are passed through as plain dicts.
        """
        components = []
        for item in response["components"]:
            component_type = item["type"]
            if component_type == "multipleChoice":
                options = [Option(id=opt["id"], text=opt["text"]) for opt in item["options"]]
                components.append(MultipleChoiceQuestion(
                    id=item["id"],
                    prompt=item["prompt"],
                    variant=item.get("variant", "text"),
                    options=options
                ))
            elif component_type == "fillBlanks":
                components.append(FillBlanksWord(
                    id=item["id"],
                    options=item["options"]
                ))
            else:
                components.append(item)
        return Sheet(components=components)

View File

@@ -1,385 +0,0 @@
import json
import os
import uuid
from logging import getLogger
from typing import Dict, Any, Tuple, Callable
import pdfplumber
from modules import GPT
from modules.helper.file_helper import FileHelper
from modules.helper import LoggerHelper
from modules.upload_level.exam_dtos import Exam
from modules.upload_level.mapper import ExamMapper
from modules.upload_level.sheet_dtos import Sheet
class UploadLevelService:
    """Turns an uploaded exercise sheet into a structured :class:`Exam` dict.

    Pipeline: the upload is converted to PDF; documents without images go
    through an HTML extraction pass, documents with images go through a
    page-by-page PNG OCR pass; an LLM then emits the normalized exam JSON.
    All intermediate artifacts live under ``./tmp/<path_id>/``.
    """

    def __init__(self, openai: GPT):
        self._logger = getLogger(__name__)
        # LLM client used for every completion in this service.
        self._llm = openai

    def generate_level_from_file(self, file) -> Dict[str, Any] | None:
        """Convert an uploaded file into an exam dict, or None when the LLM fails.

        The upload's tmp directory is removed after the completion finishes.
        """
        ext, path_id = FileHelper.save_upload(file)
        FileHelper.convert_file_to_pdf(
            f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.pdf'
        )
        file_has_images = self._check_pdf_for_images(f'./tmp/{path_id}/exercises.pdf')
        if not file_has_images:
            # Text-only documents take the HTML route instead of OCR.
            FileHelper.convert_file_to_html(f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.html')
        completion: Callable[[str], Exam] = self._png_completion if file_has_images else self._html_completion
        response = completion(path_id)
        FileHelper.remove_directory(f'./tmp/{path_id}')
        if response:
            # Re-number question ids sequentially across the whole exam.
            return self.fix_ids(response.dict(exclude_none=True))
        return None

    @staticmethod
    @LoggerHelper.suppress_loggers()
    def _check_pdf_for_images(pdf_path: str) -> bool:
        """Return True when any PDF page contains an embedded image."""
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                if page.images:
                    return True
            return False

    def _level_json_schema(self):
        """Example JSON shape the LLM must produce for a whole level."""
        return {
            "parts": [
                {
                    "context": "<this attribute is optional you may exclude it if not required>",
                    "exercises": [
                        self._multiple_choice_html(),
                        self._passage_blank_space_html()
                    ]
                }
            ]
        }

    def _html_completion(self, path_id: str) -> Exam:
        """Run the HTML-scraping completion for a text-only document."""
        with open(f'./tmp/{path_id}/exercises.html', 'r', encoding='utf-8') as f:
            html = f.read()
        return self._llm.prediction(
            [self._gpt_instructions_html(),
             {
                 "role": "user",
                 "content": html
             }
             ],
            ExamMapper.map_to_exam_model,
            str(self._level_json_schema())
        )

    def _gpt_instructions_html(self):
        """System prompt for scraping HTML question sheets into exam JSON."""
        return {
            "role": "system",
            "content": (
                'You are GPT Scraper and your job is to clean dirty html into clean usable JSON formatted data.'
                'Your current task is to scrape html english questions sheets.\n\n'
                'In the question sheet you will only see 4 types of question:\n'
                '- blank space multiple choice\n'
                '- underline multiple choice\n'
                '- reading passage blank space multiple choice\n'
                '- reading passage multiple choice\n\n'
                'For the first two types of questions the template is the same but the question prompts differ, '
                'whilst in the blank space multiple choice you must include in the prompt the blank spaces with '
                'multiple "_", in the underline you must include in the prompt the <u></u> to '
                'indicate the underline and the options a, b, c, d must be the ordered underlines in the prompt.\n\n'
                'For the reading passage exercise you must handle the formatting of the passages. If it is a '
                'reading passage with blank spaces you will see blanks represented with (question id) followed by a '
                'line and your job is to replace the brackets with the question id and line with "{{question id}}" '
                'with 2 newlines between paragraphs. For the reading passages without blanks you must remove '
                'any numbers that may be there to specify paragraph numbers or line numbers, and place 2 newlines '
                'between paragraphs.\n\n'
                'IMPORTANT: Note that for the reading passages, the html might not reflect the actual paragraph '
                'structure, don\'t format the reading passages paragraphs only by the <p></p> tags, try to figure '
                'out the best paragraph separation possible.'
                'You will place all the information in a single JSON: {"parts": [{"exercises": [{...}], "context": ""}]}\n '
                'Where {...} are the exercises templates for each part of a question sheet and the optional field '
                'context.'
                'IMPORTANT: The question sheet may be divided by sections but you need to only consider the parts, '
                'so that you can group the exercises by the parts that are in the html, this is crucial since only '
                'reading passage multiple choice require context and if the context is included in parts where it '
                'is not required the UI will be messed up. Some make sure to correctly group the exercises by parts.\n'
                'The templates for the exercises are the following:\n'
                '- blank space multiple choice, underline multiple choice and reading passage multiple choice: '
                f'{self._multiple_choice_html()}\n'
                f'- reading passage blank space multiple choice: {self._passage_blank_space_html()}\n'
                'IMPORTANT: For the reading passage multiple choice the context field must be set with the reading '
                'passages without paragraphs or line numbers, with 2 newlines between paragraphs, for the other '
                'exercises exclude the context field.'
            )
        }

    @staticmethod
    def _multiple_choice_html():
        """Template for a multiple-choice exercise, including the solution field."""
        return {
            "type": "multipleChoice",
            "prompt": "Select the appropriate option.",
            "questions": [
                {
                    "id": "<the question id>",
                    "prompt": "<the question>",
                    "solution": "<the option id solution>",
                    "options": [
                        {
                            "id": "A",
                            "text": "<the a option>"
                        },
                        {
                            "id": "B",
                            "text": "<the b option>"
                        },
                        {
                            "id": "C",
                            "text": "<the c option>"
                        },
                        {
                            "id": "D",
                            "text": "<the d option>"
                        }
                    ]
                }
            ]
        }

    @staticmethod
    def _passage_blank_space_html():
        """Template for a fill-blanks passage exercise, including solutions."""
        return {
            "type": "fillBlanks",
            "variant": "mc",
            "prompt": "Click a blank to select the appropriate word for it.",
            "text": (
                "<The whole text for the exercise with replacements for blank spaces and their "
                "ids with {{<question id>}} with 2 newlines between paragraphs>"
            ),
            "solutions": [
                {
                    "id": "<question id>",
                    "solution": "<the option that holds the solution>"
                }
            ],
            "words": [
                {
                    "id": "<question id>",
                    "options": {
                        "A": "<a option>",
                        "B": "<b option>",
                        "C": "<c option>",
                        "D": "<d option>"
                    }
                }
            ]
        }

    def _png_completion(self, path_id: str) -> Exam:
        """OCR route: scan page PNGs in overlapping two-page batches, then merge."""
        FileHelper.pdf_to_png(path_id)
        tmp_files = os.listdir(f'./tmp/{path_id}')
        pages = [f for f in tmp_files if f.startswith('page-') and f.endswith('.png')]
        # Numeric sort on the page index in "page-<n>.png".
        pages.sort(key=lambda f: int(f.split('-')[1].split('.')[0]))
        json_schema = {
            "components": [
                {"type": "part", "part": "<name or number of the part>"},
                self._multiple_choice_png(),
                {"type": "blanksPassage", "text": (
                    "<The whole text for the exercise with replacements for blank spaces and their "
                    "ids with {{<question id>}} with 2 newlines between paragraphs>"
                )},
                {"type": "passage", "context": (
                    "<reading passages without paragraphs or line numbers, with 2 newlines between paragraphs>"
                )},
                self._passage_blank_space_png()
            ]
        }
        components = []
        # Each batch is the current page plus the next one (when it exists), so
        # exercises cut across a page boundary appear whole in some batch.
        for i in range(len(pages)):
            current_page = pages[i]
            next_page = pages[i + 1] if i + 1 < len(pages) else None
            batch = [current_page, next_page] if next_page else [current_page]
            sheet = self._png_batch(path_id, batch, json_schema)
            sheet.batch = i + 1
            components.append(sheet.dict())
        batches = {"batches": components}
        # NOTE(review): writes a debug dump to the process working directory on
        # every OCR run — looks like a leftover development artifact; confirm
        # whether it should be removed or routed through the logger.
        with open('output.json', 'w') as json_file:
            json.dump(batches, json_file, indent=4)
        return self._batches_to_exam_completion(batches)

    def _png_batch(self, path_id: str, files: list[str], json_schema) -> Sheet:
        """Run the OCR completion for one batch of page images."""
        return self._llm.prediction(
            [self._gpt_instructions_png(),
             {
                 "role": "user",
                 "content": [
                     *FileHelper.b64_pngs(path_id, files)
                 ]
             }
             ],
            ExamMapper.map_to_sheet,
            str(json_schema)
        )

    def _gpt_instructions_png(self):
        """System prompt for OCR-scanning sheet page images into components."""
        return {
            "role": "system",
            "content": (
                'You are GPT OCR and your job is to scan image text data and format it to JSON format.'
                'Your current task is to scan english questions sheets.\n\n'
                'You will place all the information in a single JSON: {"components": [{...}]} where {...} is a set of '
                'sheet components you will retrieve from the images, the components and their corresponding JSON '
                'templates are as follows:\n'
                '- Part, a standalone part or part of a section of the question sheet: '
                '{"type": "part", "part": "<name or number of the part>"}\n'
                '- Multiple Choice Question, there are three types of multiple choice questions that differ on '
                'the prompt field of the template: blanks, underlines and normal. '
                'In the blanks prompt you must leave 5 underscores to represent the blank space. '
                'In the underlines questions the objective is to pick the words that are incorrect in the given '
                'sentence, for these questions you must wrap the answer to the question with the html tag <u></u>, '
                'choose 3 other words to wrap in <u></u>, place them in the prompt field and use the underlined words '
                'in the order they appear in the question for the options A to D, disreguard options that might be '
                'included underneath the underlines question and use the ones you wrapped in <u></u>.'
                'In normal you just leave the question as is. '
                f'The template for multiple choice questions is the following: {self._multiple_choice_png()}.\n'
                '- Reading Passages, there are two types of reading passages. Reading passages where you will see '
                'blanks represented by a (question id) followed by a line, you must format these types of reading '
                'passages to be only the text with the brackets that have the question id and line replaced with '
                '"{{question id}}", also place 2 newlines between paragraphs. For the reading passages without blanks '
                'you must remove any numbers that may be there to specify paragraph numbers or line numbers, '
                'and place 2 newlines between paragraphs. '
                'For the reading passages with blanks the template is: {"type": "blanksPassage", '
                '"text": "<The whole text for the exercise with replacements for blank spaces and their '
                'ids that are enclosed in brackets with {{<question id>}} also place 2 newlines between paragraphs>"}. '
                'For the reading passage without blanks is: {"type": "passage", "context": "<reading passages without '
                'paragraphs or line numbers, with 2 newlines between paragraphs>"}\n'
                '- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
                'options with the question id and the options from a to d. The template is: '
                f'{self._passage_blank_space_png()}\n'
                'IMPORTANT: You must place the components in the order that they were given to you. If an exercise or '
                'reading passages are cut off don\'t include them in the JSON.'
            )
        }

    def _multiple_choice_png(self):
        """OCR-pass variant of the MC template: tagged with a type, no solution yet."""
        multiple_choice = self._multiple_choice_html()["questions"][0]
        multiple_choice["type"] = "multipleChoice"
        multiple_choice.pop("solution")
        return multiple_choice

    def _passage_blank_space_png(self):
        """OCR-pass variant of the blanks template: just the per-blank options."""
        passage_blank_space = self._passage_blank_space_html()["words"][0]
        passage_blank_space["type"] = "fillBlanks"
        return passage_blank_space

    def _batches_to_exam_completion(self, batches: Dict[str, Any]) -> Exam:
        """Merge the per-batch OCR components into one solved exam.

        NOTE(review): this sends the *html* scraping instructions, while the
        dedicated ``_gpt_instructions_batches`` prompt below is never called —
        confirm which prompt was actually intended here.
        """
        return self._llm.prediction(
            [self._gpt_instructions_html(),
             {
                 "role": "user",
                 "content": str(batches)
             }
             ],
            ExamMapper.map_to_exam_model,
            str(self._level_json_schema())
        )

    def _gpt_instructions_batches(self):
        """System prompt for merging/solving batched components (currently unused)."""
        return {
            "role": "system",
            "content": (
                'You are helpfull assistant. Your task is to merge multiple batches of english question sheet '
                'components and solve the questions. Each batch may contain overlapping content with the previous '
                'batch, or close enough content which needs to be excluded. The components are as follows:'
                '- Part, a standalone part or part of a section of the question sheet: '
                '{"type": "part", "part": "<name or number of the part>"}\n'
                '- Multiple Choice Question, there are three types of multiple choice questions that differ on '
                'the prompt field of the template: blanks, underlines and normal. '
                'In a blanks question, the prompt has underscores to represent the blank space, you must select the '
                'appropriate option to solve it.'
                'In a underlines question, the prompt has 4 underlines represented by the html tags <u></u>, you must '
                'select the option that makes the prompt incorrect to solve it. If the options order doesn\'t reflect '
                'the order in which the underlines appear in the prompt you will need to fix it.'
                'In a normal question there isn\'t either blanks or underlines in the prompt, you should just '
                'select the appropriate solution.'
                f'The template for these questions is the same: {self._multiple_choice_png()}\n'
                '- Reading Passages, there are two types of reading passages with different templates. The one with '
                'type "blanksPassage" where the text field holds the passage and a blank is represented by '
                '{{<some number>}} and the other one with type "passage" that has the context field with just '
                'reading passages. For both of these components you will have to remove any additional data that might '
                'be related to a question description and also remove some "(<question id>)" and "_" from blanksPassage'
                ' if there are any. These components are used in conjunction with other ones.'
                '- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
                'options with the question id and the options from a to d. The template is: '
                f'{self._passage_blank_space_png()}\n\n'
                'Now that you know the possible components here\'s what I want you to do:\n'
                '1. Remove duplicates. A batch will have duplicates of other batches and the components of '
                'the next batch should always take precedence over the previous one batch, what I mean by this is that '
                'if batch 1 has, for example, multiple choice question with id 10 and the next one also has id 10, '
                'you pick the next one.\n'
                '2. Solve the exercises. There are 4 types of exercises, the 3 multipleChoice variants + a fill blanks '
                'exercise. For the multiple choice question follow the previous instruction to solve them and place '
                f'them in this format: {self._multiple_choice_html()}. For the fill blanks exercises you need to match '
                'the correct blanksPassage to the correct fillBlanks options and then pick the correct option. Here is '
                f'the template for this exercise: {self._passage_blank_space_html()}.\n'
                f'3. Restructure the JSON to match this template: {self._level_json_schema()}. You must group the exercises by '
                'the parts in the order they appear in the batches components. The context field of a part is the '
                'context of a passage component that has text relevant to normal multiple choice questions.\n'
                'Do your utmost to fullfill the requisites, make sure you include all non-duplicate questions'
                'in your response and correctly structure the JSON.'
            )
        }

    @staticmethod
    def fix_ids(response):
        """Renumber question/blank ids sequentially (1, 2, ...) across all parts."""
        counter = 1
        for part in response["parts"]:
            for exercise in part["exercises"]:
                if exercise["type"] == "multipleChoice":
                    for question in exercise["questions"]:
                        question["id"] = counter
                        counter += 1
                if exercise["type"] == "fillBlanks":
                    # words[i] and solutions[i] describe the same blank, so they
                    # must share the same renumbered id.
                    for i in range(len(exercise["words"])):
                        exercise["words"][i]["id"] = counter
                        exercise["solutions"][i]["id"] = counter
                        counter += 1
        return response

View File

@@ -1,29 +0,0 @@
from pydantic import BaseModel
from typing import List, Dict, Union, Any, Optional
class Option(BaseModel):
    """A selectable answer choice."""
    id: str  # short option id, e.g. "A".."D"
    text: str


class MultipleChoiceQuestion(BaseModel):
    """An unsolved multiple-choice question scanned from a sheet page."""
    type: str = "multipleChoice"  # discriminator
    id: str
    prompt: str
    variant: str = "text"  # UI rendering hint
    options: List[Option]


class FillBlanksWord(BaseModel):
    """Options for a single blank, keyed by option id."""
    type: str = "fillBlanks"  # discriminator
    id: str
    options: Dict[str, str]


# Components may also be free-form dicts (e.g. "part"/"passage"/"blanksPassage"
# markers that are passed through by the mapper unmodified).
Component = Union[MultipleChoiceQuestion, FillBlanksWord, Dict[str, Any]]


class Sheet(BaseModel):
    """The scanned components of one page batch."""
    batch: Optional[int] = None  # 1-based batch index, set by the caller
    components: List[Component]

Binary file not shown.

View File

@@ -1 +0,0 @@
THIS FILE ONLY EXISTS TO KEEP THIS FOLDER IN THE REPO

View File

@@ -1,7 +1,9 @@
from .kb import TrainingContentKnowledgeBase
from .service import TrainingContentService
from .gpt import GPT
__all__ = [
"TrainingContentService",
"TrainingContentKnowledgeBase"
"TrainingContentKnowledgeBase",
"GPT"
]

View File

@@ -1,19 +1,17 @@
import json
from logging import getLogger
from typing import List, Optional, Callable, TypeVar
from typing import List, Optional, Callable
from openai.types.chat import ChatCompletionMessageParam
from pydantic import BaseModel
T = TypeVar('T', bound=BaseModel)
class GPT:
def __init__(self, openai_client):
self._client = openai_client
self._default_model = "gpt-4o-2024-08-06"
self._default_model = "gpt-4o"
self._logger = getLogger(__name__)
def prediction(
@@ -25,7 +23,7 @@ class GPT:
model: Optional[str] = None,
temperature: Optional[float] = None,
max_retries: int = 3
) -> List[T] | T | None:
) -> List[BaseModel] | BaseModel | str | None:
params = {
"messages": messages,
"response_format": {"type": "json_object"},

View File

@@ -4,7 +4,7 @@ from logging import getLogger
from typing import Dict, List
from modules.training_content.dtos import TrainingContentDTO, WeakAreaDTO, QueryDTO, DetailsDTO, TipsDTO
from training_content.dtos import TrainingContentDTO, WeakAreaDTO, QueryDTO, DetailsDTO, TipsDTO
class TrainingContentService:
@@ -25,8 +25,7 @@ class TrainingContentService:
self._logger = getLogger(__name__)
self._llm = openai
def get_tips(self, training_content):
user, stats = training_content["userID"], training_content["stats"]
def get_tips(self, stats):
exam_data, exam_map = self._sort_out_solutions(stats)
training_content = self._get_exam_details_and_tips(exam_data)
tips = self._query_kb(training_content.queries)
@@ -41,10 +40,10 @@ class TrainingContentService:
'created_at': int(datetime.now().timestamp() * 1000),
**exam_map,
**usefull_tips.dict(),
**weak_areas,
"user": user
**weak_areas
}
doc_ref = self._db.collection('training').add(training_doc)
return {
"id": doc_ref[1].id
}
@@ -107,15 +106,7 @@ class TrainingContentService:
'for tips that will be displayed to the student, the category attribute is a collection of '
'embeddings and the text will be the text used to query the knowledge base. The categories are '
f'the following [{", ".join(self.TOOLS)}]. The exam data will be a json where the key of the field '
'"exams" is the exam id, an exam can be composed of multiple modules or single modules. The student'
' will see your response so refrain from using phrasing like "The student" did x, y and z. If the '
'field "answer" in a question is an empty array "[]", then the student didn\'t answer any question '
'and you must address that in your response. Also questions aren\'t modules, the only modules are: '
'level, speaking, writing, reading and listening. The details array needs to be tailored to the '
'exam attempt, even if you receive the same exam you must treat as different exams by their id.'
'Don\'t make references to an exam by it\'s id, the GUI will handle that so the student knows '
'which is the exam your comments and summary are referencing too. Even if the student hasn\'t '
'submitted no answers for an exam, you must still fill the details structure addressing that fact.'
'"exams" is the exam id, an exam can be composed of multiple modules or single modules.'
)
},
{
@@ -212,15 +203,13 @@ class TrainingContentService:
exercises[session_key][module][exam_id]["exercises"].extend(
self._get_speaking_solutions(stat, exam)
)
elif module == "level":
elif module == "level": # same structure as listening
exercises[session_key][module][exam_id]["exercises"].extend(
self._get_level_solutions(stat, exam)
self._get_listening_solutions(stat, exam)
)
exam_map[session_key]["score"] = round((exam_total_correct / exam_total_questions) * 100)
exam_map[session_key]["module"] = module
with open('exam_result.json', 'w') as file:
json.dump({"exams": exercises}, file, indent=4)
return {"exams": exercises}, exam_map
@@ -248,54 +237,6 @@ class TrainingContentService:
return result
@staticmethod
def _get_mc_question(exercise, stat):
    """Shape a multiple-choice stat into ``{question, exercise, answer}``.

    When the client shuffled the options, each submitted option is translated
    back through that question's shuffle map so the answer refers to the
    original option order.
    """
    shuffle_maps = stat.get("shuffleMaps", [])
    # BUG FIX: the original built the de-shuffled `answer` list but then
    # returned the raw stat["solutions"], so the shuffle maps were computed
    # and silently discarded.
    answer = stat["solutions"] if len(shuffle_maps) == 0 else []
    if len(shuffle_maps) != 0:
        for solution in stat["solutions"]:
            shuffle_map = [
                item["map"] for item in shuffle_maps
                if item["questionID"] == solution["question"]
            ]
            # NOTE(review): the original indexed the *list of matched maps*
            # (shuffle_map[solution["option"]]); indexing into the matched
            # map itself is presumably the intent — confirm against the
            # client's shuffleMaps payload shape.
            answer.append({
                "question": solution["question"],
                "option": shuffle_map[0][solution["option"]]
            })
    return {
        "question": exercise["prompt"],
        "exercise": exercise["questions"],
        "answer": answer
    }
@staticmethod
def _swap_key_name(d, original_key, new_key):
    """Rename *original_key* to *new_key* in ``d`` (in place) and return it."""
    value = d.pop(original_key)
    d[new_key] = value
    return d
def _get_level_solutions(self, stat, exam):
    """Collect a student's answers for one "level" exercise.

    Walks the exam parts to find the exercise referenced by
    ``stat["exercise"]`` and normalizes the answer payload per type.
    """
    result = []
    try:
        for part in exam["parts"]:
            for exercise in part["exercises"]:
                if exercise["id"] == stat["exercise"]:
                    if stat["type"] == "fillBlanks":
                        result.append({
                            "prompt": exercise["prompt"],
                            "template": exercise["text"],
                            "words": exercise["words"],
                            "solutions": exercise["solutions"],
                            # Rename each answer's 'solution' key to 'option'
                            # so it matches the multiple-choice answer shape.
                            "answer": [
                                self._swap_key_name(item, 'solution', 'option')
                                for item in stat["solutions"]
                            ]
                        })
                    elif stat["type"] == "multipleChoice":
                        result.append(self._get_mc_question(exercise, stat))
    except KeyError as e:
        # Client-provided stats can be partial; log and return what we have.
        self._logger.warning(f"Malformed stat object: {str(e)}")
    return result
def _get_listening_solutions(self, stat, exam):
result = []
try:
@@ -309,25 +250,16 @@ class TrainingContentService:
"solution": exercise["solutions"],
"answer": stat["solutions"]
})
elif stat["type"] == "fillBlanks":
elif stat["type"] == "multipleChoice":
result.append({
"question": exercise["prompt"],
"template": exercise["text"],
"words": exercise["words"],
"solutions": exercise["solutions"],
"exercise": exercise["questions"],
"answer": stat["solutions"]
})
elif stat["type"] == "multipleChoice":
result.append(self._get_mc_question(exercise, stat))
except KeyError as e:
self._logger.warning(f"Malformed stat object: {str(e)}")
return result
@staticmethod
def _find_shuffle_map(shuffle_maps, question_id):
    """Return the shuffle map for *question_id*, or None when absent."""
    for item in shuffle_maps:
        if item["questionID"] == question_id:
            return item["map"]
    return None
def _get_speaking_solutions(self, stat, exam):
result = {}
try: