Brushed up the backend; added Writing Task 1 (Academic) prompt generation and grading (ENCOA-274)

This commit is contained in:
Carlos-Mesquita
2024-12-10 22:24:40 +00:00
parent 68cab80851
commit 6982068864
167 changed files with 1411 additions and 1229 deletions

View File

@@ -0,0 +1,11 @@
from .file import FileHelper
from .text import TextHelper
from .token_counter import count_tokens
from .exercises import ExercisesHelper
__all__ = [
"FileHelper",
"TextHelper",
"count_tokens",
"ExercisesHelper",
]

View File

@@ -0,0 +1,249 @@
import queue
import random
import re
import string
from wonderwords import RandomWord
from .text import TextHelper
class ExercisesHelper:
    """Static helpers for generating, normalising and shuffling exercise
    payloads (fill-the-blanks, multiple-choice, matching, etc.)."""

    @staticmethod
    def divide_number_into_parts(number, parts):
        """Split `number` into `parts` near-equal integer shares.

        Returns a queue.Queue of the shares (the first `number % parts`
        shares are one larger, so shares differ by at most 1), or None
        when `number` is too small to give every part at least 1.
        """
        if number < parts:
            return None
        part_size = number // parts
        remaining = number % parts
        q = queue.Queue()
        for i in range(parts):
            if i < remaining:
                q.put(part_size + 1)
            else:
                q.put(part_size)
        return q

    @staticmethod
    def fix_exercise_ids(exercise, start_id):
        """Renumber exercise["questions"] ids sequentially from `start_id`.

        Mutates `exercise` in place and also returns it; ids are stored
        as strings.
        """
        # Initialize the starting ID for the first exercise
        current_id = start_id
        questions = exercise["questions"]
        # Iterate through questions and update the "id" value
        for question in questions:
            question["id"] = str(current_id)
            current_id += 1
        return exercise

    @staticmethod
    def replace_first_occurrences_with_placeholders(text: str, words_to_replace: list, start_id):
        """Replace the first whole-word occurrence of each word in `text`
        with a numbered '{{n}}' placeholder, numbering from `start_id`.

        Matching is case-insensitive; only one occurrence per word is
        replaced (count=1 in pattern.sub).
        """
        for i, word in enumerate(words_to_replace, start=start_id):
            # Create a case-insensitive regular expression pattern
            pattern = re.compile(r'\b' + re.escape(word) + r'\b', re.IGNORECASE)
            placeholder = '{{' + str(i) + '}}'
            text = pattern.sub(placeholder, text, 1)
        return text

    @staticmethod
    def replace_first_occurrences_with_placeholders_notes(notes: list, words_to_replace: list, start_id):
        """Like replace_first_occurrences_with_placeholders, but pairs
        note i with words_to_replace[i].

        NOTE(review): assumes the two lists are parallel and
        len(words_to_replace) >= len(notes) — IndexError otherwise.
        """
        replaced_notes = []
        for i, note in enumerate(notes, start=0):
            word = words_to_replace[i]
            pattern = re.compile(r'\b' + re.escape(word) + r'\b', re.IGNORECASE)
            placeholder = '{{' + str(start_id + i) + '}}'
            note = pattern.sub(placeholder, note, 1)
            replaced_notes.append(note)
        return replaced_notes

    @staticmethod
    def add_random_words_and_shuffle(word_array, num_random_words):
        """Pad `word_array` with `num_random_words` random distractor words,
        shuffle, and label each entry 'A', 'B', ... by final position.

        NOTE(review): letters run past 'Z' into non-alphabetic characters
        if the combined list exceeds 26 entries — confirm callers stay
        under that bound.
        """
        r = RandomWord()
        random_words_selected = r.random_words(num_random_words)
        combined_array = word_array + random_words_selected
        random.shuffle(combined_array)
        result = []
        for i, word in enumerate(combined_array):
            letter = chr(65 + i)  # chr(65) is 'A'
            result.append({"letter": letter, "word": word})
        return result

    @staticmethod
    def fillblanks_build_solutions_array(words, start_id):
        """Build [{"id": str, "solution": word}, ...] with ids numbered
        from `start_id`, matching the placeholder numbering helpers above."""
        solutions = []
        for i, word in enumerate(words, start=start_id):
            solutions.append(
                {
                    "id": str(i),
                    "solution": word
                }
            )
        return solutions

    @staticmethod
    def remove_excess_questions(questions: list, quantity):
        """Drop the last `quantity` questions whose solution is 'true',
        scanning from the end; all other questions keep their order."""
        count_true = 0
        result = []
        for item in reversed(questions):
            if item.get('solution') == 'true' and count_true < quantity:
                count_true += 1
            else:
                result.append(item)
        result.reverse()
        return result

    @staticmethod
    def build_write_blanks_text(questions: list, start_id):
        """Concatenate each question followed by its '{{n}}' placeholder,
        numbering from `start_id`.

        NOTE(review): "\\n" appends a literal backslash-n (two characters),
        not a newline — presumably intended for a downstream prompt/JSON
        consumer; confirm.
        """
        result = ""
        for i, q in enumerate(questions, start=start_id):
            placeholder = '{{' + str(i) + '}}'
            result = result + q["question"] + placeholder + "\\n"
        return result

    @staticmethod
    def build_write_blanks_text_form(form: list, start_id):
        """For each 'label: value' entry, blank out one random multi-letter
        word of the value with a '{{n}}' placeholder.

        Returns (rebuilt text, list of the words that were replaced).
        NOTE(review): assumes every entry contains ':' with at least one
        word of 2+ letters after it — `match.group` / `random.choice`
        raise otherwise; confirm upstream guarantees this shape.
        """
        result = ""
        replaced_words = []
        for i, entry in enumerate(form, start=start_id):
            placeholder = '{{' + str(i) + '}}'
            # Use regular expression to find the string after ':'
            match = re.search(r'(?<=:)\s*(.*)', entry)
            # Extract the matched string
            original_string = match.group(1)
            # Split the string into words
            words = re.findall(r'\b\w+\b', original_string)
            # Remove words with only one letter
            filtered_words = [word for word in words if len(word) > 1]
            # Choose a random word from the list of words
            selected_word = random.choice(filtered_words)
            pattern = re.compile(r'\b' + re.escape(selected_word) + r'\b', re.IGNORECASE)
            # Replace the chosen word with the placeholder
            replaced_string = pattern.sub(placeholder, original_string, 1)
            # Construct the final replaced string
            replaced_string = entry.replace(original_string, replaced_string)
            # NOTE(review): "\\n" is a literal backslash-n, as in
            # build_write_blanks_text — confirm intended.
            result = result + replaced_string + "\\n"
            # Save the replaced word or use it as needed
            # For example, you can save it to a file or a list
            replaced_words.append(selected_word)
        return result, replaced_words

    @staticmethod
    def build_write_blanks_solutions(questions: list, start_id):
        """Build [{"id": str, "solution": list}, ...] from each question's
        "possible_answers", wrapping a bare string into a one-element list."""
        solutions = []
        for i, q in enumerate(questions, start=start_id):
            solution = [q["possible_answers"]] if isinstance(q["possible_answers"], str) else q["possible_answers"]
            solutions.append(
                {
                    "id": str(i),
                    "solution": solution
                }
            )
        return solutions

    @staticmethod
    def build_write_blanks_solutions_listening(words: list, start_id):
        """Listening-exercise variant of build_write_blanks_solutions:
        each solution is a word (or list of words) rather than a question dict."""
        solutions = []
        for i, word in enumerate(words, start=start_id):
            solution = [word] if isinstance(word, str) else word
            solutions.append(
                {
                    "id": str(i),
                    "solution": solution
                }
            )
        return solutions

    @staticmethod
    def answer_word_limit_ok(question):
        """Return True only if every accepted answer option in every
        solution of `question` is at most three words long."""
        # Check if any option in any solution has more than three words
        return not any(
            len(option.split()) > 3
            for solution in question["solutions"]
            for option in solution["solution"]
        )

    @staticmethod
    def assign_letters_to_paragraphs(paragraphs):
        """Split `paragraphs` on blank lines and label each paragraph that
        contains at least 10 English words (TextHelper.has_x_words) with
        consecutive letters 'A', 'B', ...

        NOTE(review): StopIteration if more than 26 qualifying paragraphs
        — confirm inputs stay under that bound.
        """
        result = []
        letters = iter(string.ascii_uppercase)
        for paragraph in paragraphs.split("\n\n"):
            if TextHelper.has_x_words(paragraph, 10):
                result.append({'paragraph': paragraph.strip(), 'letter': next(letters)})
        return result

    @staticmethod
    def contains_empty_dict(arr):
        """Return True if any element of `arr` equals the empty dict."""
        return any(elem == {} for elem in arr)

    @staticmethod
    def fix_writing_overall(overall: float, task_response: dict):
        """Clamp an implausible overall writing grade.

        If `overall` falls outside the [min, max] range of the per-category
        grades, replace it with the rounded category average; otherwise
        return it unchanged. NOTE: round(x, 0) returns a float and uses
        banker's rounding (0.5 rounds to the even neighbour).
        """
        grades = [category["grade"] for category in task_response.values()]
        if overall > max(grades) or overall < min(grades):
            total_sum = sum(grades)
            average = total_sum / len(grades)
            rounded_average = round(average, 0)
            return rounded_average
        return overall

    @staticmethod
    def build_options(ideas):
        """Build matching-exercise options from each idea's "from" sentence,
        lettered 'A', 'B', ... in input order (StopIteration past 26)."""
        options = []
        letters = iter(string.ascii_uppercase)
        for idea in ideas:
            options.append({
                "id": next(letters),
                "sentence": idea["from"]
            })
        return options

    @staticmethod
    def build_sentences(ideas, start_id):
        """Build shuffled matching-exercise sentences whose "solution"
        letter pairs with build_options (same input order), then assign
        sequential ids from `start_id` after shuffling.

        NOTE(review): ids here are ints, while sibling builders store ids
        as strings — confirm the consumer accepts both.
        """
        sentences = []
        letters = iter(string.ascii_uppercase)
        for idea in ideas:
            sentences.append({
                "solution": next(letters),
                "sentence": idea["idea"]
            })
        random.shuffle(sentences)
        for i, sentence in enumerate(sentences, start=start_id):
            sentence["id"] = i
        return sentences

    @staticmethod
    def randomize_mc_options_order(questions):
        """Shuffle each question's multiple-choice options in place,
        relabel them 'A'-'D' by new position, and repoint "solution" at
        the relabelled id of the original correct text.

        Mutates and returns `questions`. NOTE(review): assumes at most 4
        options per question and unique option texts — confirm.
        """
        option_ids = ['A', 'B', 'C', 'D']
        for question in questions:
            # Store the original solution text
            original_solution_text = next(
                option['text'] for option in question['options'] if option['id'] == question['solution'])
            # Shuffle the options
            random.shuffle(question['options'])
            # Update the option ids and find the new solution id
            for idx, option in enumerate(question['options']):
                option['id'] = option_ids[idx]
                if option['text'] == original_solution_text:
                    question['solution'] = option['id']
        return questions

125
ielts_be/helpers/file.py Normal file
View File

@@ -0,0 +1,125 @@
import base64
import io
import os
import shutil
import subprocess
import uuid
import datetime
from pathlib import Path
from typing import Optional, Tuple
import aiofiles
import numpy as np
import pypandoc
from PIL import Image
from fastapi import UploadFile
class FileHelper:
    """File-system utilities: temp-file housekeeping, pandoc conversion,
    pdf→png rendering, image blank-detection, and async upload/encode helpers.

    Working files live under ./tmp/<path_id>/.
    """

    @staticmethod
    def delete_files_older_than_one_day(directory: str):
        """Delete regular files in `directory` whose mtime is more than one
        day old, skipping any file whose name contains "placeholder".

        BUG FIX: the previous check used `time_difference.days > 1`, which
        truncates to whole days and therefore only deleted files at least
        two days old (a 36-hour-old file has .days == 1). Comparing the
        full timedelta honours the function's name.
        """
        current_time = datetime.datetime.now()
        for entry in os.scandir(directory):
            if entry.is_file():
                file_path = Path(entry)
                file_name = file_path.name
                file_modified_time = datetime.datetime.fromtimestamp(file_path.stat().st_mtime)
                time_difference = current_time - file_modified_time
                if time_difference > datetime.timedelta(days=1) and "placeholder" not in file_name:
                    file_path.unlink()
                    print(f"Deleted file: {file_path}")

    # pandoc supports a wide range of input formats, but only docx has been
    # tested here.
    @staticmethod
    def convert_file_to_pdf(input_path: str, output_path: str):
        """Convert `input_path` to a small-page PDF (5.5x8.5in, 0.5in
        margins, no page numbers) at `output_path` via pandoc."""
        pypandoc.convert_file(input_path, 'pdf', outputfile=output_path, extra_args=[
            '-V', 'geometry:paperwidth=5.5in',
            '-V', 'geometry:paperheight=8.5in',
            '-V', 'geometry:margin=0.5in',
            '-V', 'pagestyle=empty'
        ])

    @staticmethod
    def convert_file_to_html(input_path: str, output_path: str):
        """Convert `input_path` to HTML at `output_path` via pandoc."""
        pypandoc.convert_file(input_path, 'html', outputfile=output_path)

    @staticmethod
    def pdf_to_png(path_id: str):
        """Render ./tmp/<path_id>/exercises.pdf to page-*.png files using
        the pdftoppm CLI.

        Raises Exception with the captured stderr when pdftoppm fails.
        """
        # Fixed command string (no interpolation needed); runs inside the
        # job's tmp directory so relative paths resolve.
        to_png = "pdftoppm -png exercises.pdf page"
        result = subprocess.run(to_png, shell=True, cwd=f'./tmp/{path_id}', capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(
                f"Couldn't convert pdf to png. Failed to run command '{to_png}' -> ```cmd {result.stderr}```")

    @staticmethod
    def is_page_blank(image_bytes: bytes, image_threshold=10) -> bool:
        """Return True when the image has at most `image_threshold`
        non-pure-white pixels (after greyscale conversion)."""
        with Image.open(io.BytesIO(image_bytes)) as img:
            img_gray = img.convert('L')
            img_array = np.array(img_gray)
            non_white_pixels = np.sum(img_array < 255)
            return non_white_pixels <= image_threshold

    @classmethod
    async def _encode_image(cls, image_path: str, image_threshold=10) -> Optional[str]:
        """Read `image_path` and return its base64 string, or None when the
        page is (near-)blank per is_page_blank."""
        async with aiofiles.open(image_path, "rb") as image_file:
            image_bytes = await image_file.read()
        if cls.is_page_blank(image_bytes, image_threshold):
            return None
        return base64.b64encode(image_bytes).decode('utf-8')

    @classmethod
    async def b64_pngs(cls, path_id: str, files: list[str]):
        """Build OpenAI-style image_url message parts (base64 data URLs)
        for each non-blank png under ./tmp/<path_id>/."""
        png_messages = []
        for filename in files:
            b64_string = await cls._encode_image(os.path.join(f'./tmp/{path_id}', filename))
            if b64_string:
                png_messages.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{b64_string}"
                    }
                })
        return png_messages

    @staticmethod
    def remove_directory(path):
        """Best-effort recursive removal of a directory; errors are printed,
        never raised."""
        try:
            if os.path.exists(path):
                if os.path.isdir(path):
                    shutil.rmtree(path)
        except Exception as e:
            print(f"An error occurred while trying to remove {path}: {str(e)}")

    @staticmethod
    def remove_file(file_path):
        """Best-effort removal of a single file; errors are printed,
        never raised."""
        try:
            if os.path.exists(file_path):
                if os.path.isfile(file_path):
                    os.remove(file_path)
        except Exception as e:
            print(f"An error occurred while trying to remove the file {file_path}: {str(e)}")

    @staticmethod
    async def save_upload(file: UploadFile, name: str = "upload", path_id: str = None) -> Tuple[str, str]:
        """Persist an uploaded file as ./tmp/<path_id>/<name>.<ext>.

        A fresh uuid4 path_id is generated when none is supplied.
        Returns (extension, path_id). The output handle is no longer named
        `file`, which previously shadowed the UploadFile parameter.
        """
        ext = file.filename.split('.')[-1]
        path_id = str(uuid.uuid4()) if path_id is None else path_id
        os.makedirs(f'./tmp/{path_id}', exist_ok=True)
        tmp_filename = f'./tmp/{path_id}/{name}.{ext}'
        file_bytes: bytes = await file.read()
        async with aiofiles.open(tmp_filename, 'wb') as out_file:
            await out_file.write(file_bytes)
        return ext, path_id

    @staticmethod
    async def encode_image(image_path: str) -> str:
        """Read `image_path` and return its contents base64-encoded."""
        async with aiofiles.open(image_path, "rb") as image_file:
            img = await image_file.read()
        return base64.b64encode(img).decode('utf-8')

28
ielts_be/helpers/text.py Normal file
View File

@@ -0,0 +1,28 @@
from nltk.corpus import words
class TextHelper:
    """Heuristics for deciding whether a text contains (enough) English words.

    Both public checks first require the text to pass a cheap common-word
    gate (at least 10 occurrences of very frequent English words) before
    consulting the full nltk vocabulary.
    """

    # Lazily-built cache of the nltk English vocabulary. PERF FIX: the
    # previous code rebuilt set(words.words()) (~236k entries) on every
    # call; build it once per process instead.
    _english_vocab = None

    @classmethod
    def _vocab(cls):
        """Return the cached nltk English word set, building it on first use."""
        if cls._english_vocab is None:
            cls._english_vocab = set(words.words())
        return cls._english_vocab

    @classmethod
    def has_words(cls, text: str):
        """True if `text` passes the common-word gate and contains at least
        one dictionary English word (case-insensitive)."""
        if not cls._has_common_words(text):
            return False
        vocab = cls._vocab()
        return any(word.lower() in vocab for word in text.split())

    @classmethod
    def has_x_words(cls, text: str, quantity):
        """True if `text` passes the common-word gate and contains at least
        `quantity` dictionary English words (case-insensitive)."""
        if not cls._has_common_words(text):
            return False
        vocab = cls._vocab()
        english_word_count = sum(1 for word in text.split() if word.lower() in vocab)
        return english_word_count >= quantity

    @staticmethod
    def _has_common_words(text: str):
        """Cheap pre-filter: require at least 10 occurrences of very common
        English function words before doing the expensive vocabulary scan."""
        english_words = {"the", "be", "to", "of", "and", "a", "in", "that", "have", "i"}
        words_in_input = text.split()
        english_word_count = sum(1 for word in words_in_input if word.lower() in english_words)
        return english_word_count >= 10

View File

@@ -0,0 +1,89 @@
# This is a work in progress. There are still bugs. Once it is production-ready this will become a full repo.
import tiktoken
import nltk
def count_tokens(text, model_name="gpt-3.5-turbo", debug=False):
    """Estimate the number of tokens in `text` without calling the OpenAI API.

    Strategies are attempted in decreasing order of fidelity, falling
    through on any failure:

    1. tiktoken -- the same tokenization the OpenAI API uses.
    2. nltk     -- ``nltk.word_tokenize`` word tokenization.
    3. split    -- plain whitespace split; always succeeds.

    Parameters
    ----------
    text : str
        The text whose tokens should be counted.
    model_name : str, optional
        OpenAI model whose encoding tiktoken should use
        (default: "gpt-3.5-turbo").
    debug : bool, optional
        When True, print the error that made a strategy fall through
        (default: False).

    Returns
    -------
    dict
        ``{"n_tokens": <int>, "method": "tiktoken" | "nltk" | "split"}``.
    """
    # Preferred: the real per-model encoding.
    try:
        encoding = tiktoken.encoding_for_model(model_name)
        return {"n_tokens": len(encoding.encode(text)), "method": "tiktoken"}
    except Exception as e:
        if debug:
            print(f"Error using tiktoken: {e}")

    # Second choice: nltk word tokenization.
    # Passed nltk.download("punkt") to server.py's @asynccontextmanager
    try:
        return {"n_tokens": len(nltk.word_tokenize(text)), "method": "nltk"}
    except Exception as e:
        if debug:
            print(f"Error using nltk: {e}")

    # Last resort: naive whitespace split.
    return {"n_tokens": len(text.split()), "method": "split"}
class TokenBuffer:
    """A FIFO text buffer capped at roughly `max_tokens` tokens.

    Each update() appends one chunk of text; when the running token total
    exceeds the cap, the oldest chunks are evicted whole until it fits.

    BUG FIX: the previous eviction did
    ``self.buffer.split(" ", removed_tokens)[-1]``, which removes
    `removed_tokens` space-separated *words* from the front — but tokens
    are not words and chunks need not be space-delimited, so the buffer
    text drifted out of sync with `token_count`. Chunk texts are now kept
    so eviction removes exactly the text that was counted.
    """

    def __init__(self, max_tokens=2048):
        self.max_tokens = max_tokens  # eviction threshold in tokens
        self.buffer = ""              # concatenation of the retained chunks
        self.token_lengths = []       # token count of each retained chunk (FIFO)
        self.token_count = 0          # running total over retained chunks
        self._chunks = []             # retained chunk texts, parallel to token_lengths

    def update(self, text, model_name="gpt-3.5-turbo", debug=False):
        """Append `text`, then evict oldest chunks while over the cap.

        Token counts come from count_tokens() with the given model/debug
        settings.
        """
        new_tokens = count_tokens(text, model_name=model_name, debug=debug)["n_tokens"]
        self._chunks.append(text)
        self.token_lengths.append(new_tokens)
        self.token_count += new_tokens
        # Evict whole chunks from the front until the total fits (the
        # guard on _chunks prevents looping forever if a single chunk is
        # larger than max_tokens — it is evicted and the buffer empties).
        while self.token_count > self.max_tokens and self._chunks:
            removed_tokens = self.token_lengths.pop(0)
            self._chunks.pop(0)
            self.token_count -= removed_tokens
        self.buffer = "".join(self._chunks)

    def get_buffer(self):
        """Return the text currently retained in the buffer."""
        return self.buffer