144 lines
4.9 KiB
Python
144 lines
4.9 KiB
Python
import json
|
|
import openai
|
|
import os
|
|
import re
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Read a local .env file into the process environment, then hand the API key
# found there to the openai client.
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

# Token budget: make_openai_call requests MAX_TOKENS minus the caller-supplied
# token_count minus a 300-token margin as the completion's max_tokens.
MAX_TOKENS = 4097
# Sampling parameters forwarded to openai.ChatCompletion.create.
TOP_P = 0.9
FREQUENCY_PENALTY = 0.5

# Upper bound on retries when a response lacks the required fields.
TRY_LIMIT = 1

# Retry counter kept at module scope; 0 means no retry is in progress.
try_count = 0
|
|
|
|
def process_response(input_string, quotation_check_field):
    """Extract and decode the JSON-like object embedded in a model response.

    Everything from the first ``{`` onward is treated as the payload. When the
    payload spells ``quotation_check_field`` in single-quoted (Python-dict
    style) syntax it is normalised with ``parse_string``; otherwise newlines
    and trailing commas are cleaned up and, if the payload contains no square
    brackets, it is additionally passed through ``parse_string_2``.

    Args:
        input_string: Raw text returned by the model.
        quotation_check_field: Key name used to detect single-quoted payloads.
            It is matched literally, so regex metacharacters in it are safe.

    Returns:
        The decoded JSON value on success, ``input_string`` unchanged when it
        contains no ``{``, or ``None`` when the payload cannot be decoded (the
        failure is printed rather than raised).
    """
    if '{' not in input_string:
        return input_string

    try:
        # Keep everything from the first '{' (inclusive) onward.
        payload = input_string[input_string.index('{'):]

        # re.escape stops metacharacters in the field name from breaking or
        # altering the detection patterns.
        quoted_key = "'" + re.escape(quotation_check_field) + "'"
        single_quoted_value = quoted_key + r":\s*'(.*?)'"
        single_quoted_list = quoted_key + r":\s*\[([^\]]+)]"
        flags = re.DOTALL | re.MULTILINE
        if (re.search(single_quoted_value, payload, flags)
                or re.search(single_quoted_list, payload, flags)):
            return json.loads(parse_string(payload))

        cleaned = payload.replace("\n\n", " ").replace("\n", " ")
        # Trailing commas before a closing bracket/brace are not valid JSON.
        cleaned = re.sub(r',\s*]', ']', cleaned)
        cleaned = re.sub(r',\s*}', '}', cleaned)
        if '[' not in cleaned and ']' not in cleaned:
            cleaned = parse_string_2(cleaned)
        return json.loads(cleaned)
    except Exception as e:
        # Best-effort parsing: report the problem and signal failure with None
        # so the caller can decide whether to retry.
        print(f"Invalid JSON string! Exception: {e}")
        print(f"String: {input_string}")
        print(f"Exception: {e}")
        return None
|
|
|
|
def parse_string(to_parse: str):
    """Rewrite a single-quoted, dict-like string into JSON-style quoting.

    Single quotes acting as string delimiters become double quotes, double
    quotes that were already present become single quotes, blank lines
    (``"\\n\\n"``) collapse to one space, and a trailing comma before ``]``
    or ``}`` is dropped.

    Args:
        to_parse: Text to normalise.

    Returns:
        The rewritten text; it is not validated as JSON here.
    """
    # Tag the double quotes already present with a backslash so they can be
    # told apart from the ones introduced by the next step.
    text = to_parse.replace("\"", "\\\"")

    # A single quote lacking a word character on at least one side is treated
    # as a delimiter; an apostrophe inside a word (e.g. "it's") is left alone.
    text = re.sub(r"(?<!\w)'|'(?!\w)", '"', text)

    # The tagged (original) double quotes come back as single quotes.
    text = text.replace("\\\"", "'")
    text = text.replace("\n\n", " ")

    for trailing_comma, closer in ((r',\s*]', ']'), (r',\s*}', '}')):
        text = re.sub(trailing_comma, closer, text)
    return text
|
|
|
|
|
|
def parse_string_2(to_parse: str):
    """Strip stray double quotes from the values of a flat ``{"k": "v"}`` string.

    The text (braces removed) is split at every ``,`` or ``:`` that directly
    follows a double quote; the pieces are read as alternating key, value,
    key, value. Each value piece has all of its double quotes removed, is
    trimmed, and is wrapped in one fresh pair of double quotes. Every
    occurrence of the original piece in ``to_parse`` is then replaced by the
    rebuilt one, and finally each ``:`` is expanded to ``": "``.

    Args:
        to_parse: Object-like text; the caller in this file only passes text
            that contains no square brackets.

    Returns:
        The rewritten text; it is not validated as JSON here.
    """
    inner = to_parse.replace("{", "").replace("}", "")
    pieces = re.split(r'(?<="),|(?<="):', inner)

    # Odd positions hold the value pieces; the key pieces are not needed.
    for raw_value in pieces[1::2]:
        rebuilt = "\"" + raw_value.replace("\"", "").strip() + "\""
        to_parse = to_parse.replace(raw_value, rebuilt)

    return to_parse.replace(":", ": ")
|
|
|
|
def remove_special_chars_and_escapes(input_string):
    """Reduce text to ASCII letters, digits and whitespace.

    A backslash-escaped double quote is first turned into a single quote and
    blank lines (``"\\n\\n"``) into one space. After that, literal two-character
    escapes (backslash followed by ``n``, ``r`` or ``t``) and every character
    that is not an ASCII letter, a digit or whitespace are deleted.

    Args:
        input_string: Text to clean.

    Returns:
        The cleaned text.
    """
    text = input_string.replace("\\\"", "'").replace("\n\n", " ")
    return re.sub(r'(\\[nrt])|[^a-zA-Z0-9\s]', '', text)
|
|
|
|
|
|
def check_fields(obj, fields):
    """Return True when every name in ``fields`` is present in ``obj``.

    ``obj`` is normally what ``process_response`` returned, which may also be
    ``None`` (the payload could not be decoded) or the raw response string (no
    ``{`` was found). Neither carries fields, so both yield ``False`` instead
    of raising ``TypeError`` or degrading into a substring match.

    Args:
        obj: Parsed response, typically a dict.
        fields: Iterable of required key names.

    Returns:
        ``True`` if all fields are present (vacuously true for no fields),
        ``False`` otherwise.
    """
    if obj is None or isinstance(obj, str):
        return False
    return all(field in obj for field in fields)
|
|
|
|
|
|
def make_openai_call(model, messages, token_count, fields_to_check, temperature):
    """Send a chat completion request and optionally validate the reply.

    The request is repeated up to ``TRY_LIMIT`` extra times while the reply
    cannot be parsed into an object containing every required field.

    Args:
        model: Model name passed to ``openai.ChatCompletion.create``.
        messages: Chat messages, passed through unchanged.
        token_count: Amount subtracted (together with a 300-token margin) from
            ``MAX_TOKENS`` to size ``max_tokens``; presumably the prompt's
            token count.
        fields_to_check: Keys the parsed reply must contain. ``None`` or an
            empty sequence skips parsing. The first entry is also handed to
            ``process_response`` as its quotation-check field.
        temperature: Sampling temperature; converted with ``float``.

    Returns:
        The raw reply text when no fields were requested or when the required
        fields are still missing after the last attempt; otherwise the parsed
        object.
    """
    # A local attempt counter (instead of the module-level one) cannot be left
    # stale when the API call raises half-way through a retry.
    for _attempt in range(max(TRY_LIMIT, 0) + 1):
        result = openai.ChatCompletion.create(
            model=model,
            max_tokens=int(MAX_TOKENS - token_count - 300),
            temperature=float(temperature),
            top_p=float(TOP_P),
            frequency_penalty=float(FREQUENCY_PENALTY),
            messages=messages,
        )
        content = result["choices"][0]["message"]["content"]

        if not fields_to_check:
            return content

        processed_response = process_response(content, fields_to_check[0])
        # process_response yields None (undecodable) or the raw string (no
        # JSON found) on failure; only a decoded object can hold the fields.
        if isinstance(processed_response, dict) and check_fields(processed_response, fields_to_check):
            # A reply that validates is returned parsed, including when it
            # only validated on a retry.
            return processed_response

    # Every attempt failed validation: fall back to the last raw reply.
    return content
|
|
|
|
def make_openai_instruct_call(model, message: str, token_count, fields_to_check, temperature):
    """Send a text completion request and optionally validate the reply.

    The request is repeated up to ``TRY_LIMIT`` extra times while the reply
    cannot be parsed into an object containing every required field.

    Args:
        model: Model name passed to ``openai.Completion.create``.
        message: Prompt text.
        token_count: Amount subtracted (together with a 300-token margin) from
            ``MAX_TOKENS`` to size ``max_tokens``; presumably the prompt's
            token count.
        fields_to_check: Keys the parsed reply must contain. ``None`` or an
            empty sequence skips parsing. The first entry is also handed to
            ``process_response`` as its quotation-check field.
        temperature: Sampling temperature; converted with ``float``. ``None``
            selects 0.7, the value this function previously always used.

    Returns:
        When no fields were requested, the reply text with blank lines
        collapsed to spaces and surrounding whitespace stripped. Otherwise
        whatever ``process_response`` produced for the last attempt, which may
        be ``None`` or the raw text if the reply never parsed.
    """
    sampling_temperature = 0.7 if temperature is None else float(temperature)
    processed_response = None

    # A local attempt counter (instead of the module-level one) cannot be left
    # stale when the API call raises half-way through a retry.
    for _attempt in range(max(TRY_LIMIT, 0) + 1):
        response = openai.Completion.create(
            model=model,
            prompt=message,
            max_tokens=int(MAX_TOKENS - token_count - 300),
            temperature=sampling_temperature,
        )["choices"][0]["text"]

        if not fields_to_check:
            return response.replace("\n\n", " ").strip()

        processed_response = process_response(response, fields_to_check[0])
        # process_response yields None (undecodable) or the raw string (no
        # JSON found) on failure; only a decoded object can hold the fields.
        if isinstance(processed_response, dict) and check_fields(processed_response, fields_to_check):
            break

    return processed_response
|