Files
encoach_backend/modules/batch_users/service.py
2024-09-08 00:38:35 +01:00

271 lines
9.0 KiB
Python

import os
import subprocess
import time
import uuid
from datetime import datetime
from logging import getLogger
import pandas as pd
from typing import Dict
import shortuuid
from pymongo.database import Database
from modules.batch_users.batch_users import BatchUsersDTO, UserDTO
from modules.helper.file_helper import FileHelper
class BatchUsers:
    """Create users in bulk: import credentials into Firebase Auth, then seed MongoDB.

    The flow implemented by :meth:`batch_users` is:

    1. map the raw request payload to a ``BatchUsersDTO``;
    2. write a CSV in the Firebase ``auth:import`` column layout;
    3. run ``firebase auth:import`` on that CSV;
    4. insert the user, its code and its group memberships into MongoDB.
    """

    # Value stored under ``desiredLevels`` on every newly inserted user.
    _DEFAULT_DESIRED_LEVELS = {
        "reading": 9,
        "listening": 9,
        "writing": 9,
        "speaking": 9,
    }

    # Value stored under ``levels`` on every newly inserted user.
    _DEFAULT_LEVELS = {
        "reading": 0,
        "listening": 0,
        "writing": 0,
        "speaking": 0,
    }

    # Names of the groups created for every user whose type is "corporate".
    _CORPORATE_DEFAULT_GROUP_NAMES = ("Teachers", "Students", "Corporate")

    def __init__(self, mongo: "Database"):
        """Keep a handle on the Mongo database and set up a module logger."""
        self._db: Database = mongo
        self._logger = getLogger(__name__)

    def batch_users(self, request_data: Dict):
        """Import a batch of users into Firebase Auth and initialise them in MongoDB.

        Args:
            request_data: dict with a ``"makerID"`` entry and a ``"users"`` list
                of dicts accepted by ``UserDTO``.

        Returns:
            ``{"ok": True}`` on success, or an error message string when the
            ``firebase auth:import`` command exits with a non-zero status.
        """
        batch_dto = self._map_to_batch(request_data)

        tmp_dir = './tmp'
        file_name = f'{uuid.uuid4()}.csv'
        path = f'{tmp_dir}/{file_name}'

        # The CSV cannot be written if the scratch directory is missing.
        os.makedirs(tmp_dir, exist_ok=True)
        try:
            self._generate_firebase_auth_csv(batch_dto, path)
            result = self._upload_users(tmp_dir, file_name)
        finally:
            # The CSV holds password hashes and salts: remove it whether the
            # upload succeeded, failed, or raised.
            if os.path.exists(path):
                FileHelper.remove_file(path)

        if result.returncode != 0:
            error_msg = f"Couldn't upload users. Failed to run command firebase auth import -> ```cmd {result.stderr}```"
            self._logger.error(error_msg)
            return error_msg

        self._init_users(batch_dto)
        return {"ok": True}

    @staticmethod
    def _map_to_batch(request_data: Dict) -> "BatchUsersDTO":
        """Build a ``BatchUsersDTO`` from the raw request payload."""
        users: list[UserDTO] = [UserDTO(**user) for user in request_data["users"]]
        return BatchUsersDTO(makerID=request_data["makerID"], users=users)

    @staticmethod
    def _generate_firebase_auth_csv(batch_dto: "BatchUsersDTO", path: str):
        """Write one header-less CSV row per user in the Firebase import layout.

        Only UID, e-mail, "Email Verified" (always ``False``), password hash,
        password salt and creation time are filled in; every other column is
        written as an empty string.
        """
        # https://firebase.google.com/docs/cli/auth#file_format
        columns = [
            'UID', 'Email', 'Email Verified', 'Password Hash', 'Password Salt', 'Name',
            'Photo URL', 'Google ID', 'Google Email', 'Google Display Name', 'Google Photo URL',
            'Facebook ID', 'Facebook Email', 'Facebook Display Name', 'Facebook Photo URL',
            'Twitter ID', 'Twitter Email', 'Twitter Display Name', 'Twitter Photo URL',
            'GitHub ID', 'GitHub Email', 'GitHub Display Name', 'GitHub Photo URL',
            'User Creation Time', 'Last Sign-In Time', 'Phone Number'
        ]

        # Single timestamp (epoch milliseconds) shared by the whole batch.
        current_time = int(time.time() * 1000)

        users_data = []
        for user in batch_dto.users:
            # Start from an all-empty row, then fill the columns we know.
            user_data = dict.fromkeys(columns, '')
            user_data.update({
                'UID': str(user.id),
                'Email': user.email,
                'Email Verified': False,
                'Password Hash': user.passwordHash,
                'Password Salt': user.passwordSalt,
                'User Creation Time': current_time,
            })
            users_data.append(user_data)

        df = pd.DataFrame(users_data, columns=columns)
        df.to_csv(path, index=False, header=False)

    @staticmethod
    def _upload_users(directory: str, file_name: str):
        """Run ``firebase auth:import`` on ``file_name`` from within ``directory``.

        The scrypt parameters and the project id are read from environment
        variables. Returns the ``subprocess.CompletedProcess`` with captured
        text output; callers inspect ``returncode`` and ``stderr``.

        NOTE(review): an unset environment variable is interpolated as the
        literal string ``None``; the command is then left to fail on its own.
        """
        command = (
            f'firebase auth:import {file_name} '
            f'--hash-algo=SCRYPT '
            f'--hash-key={os.getenv("FIREBASE_SCRYPT_B64_SIGNER_KEY")} '
            f'--salt-separator={os.getenv("FIREBASE_SCRYPT_B64_SALT_SEPARATOR")} '
            f'--rounds={os.getenv("FIREBASE_SCRYPT_ROUNDS")} '
            f'--mem-cost={os.getenv("FIREBASE_SCRYPT_MEM_COST")} '
            f'--project={os.getenv("FIREBASE_PROJECT_ID")} '
        )
        result = subprocess.run(command, shell=True, cwd=directory, capture_output=True, text=True)
        return result

    def _init_users(self, batch_users: "BatchUsersDTO"):
        """Create the Mongo documents (user, code, groups) for every user in the batch."""
        maker_id = batch_users.makerID
        for user in batch_users.users:
            self._insert_new_user(user)
            code = self._create_code(user, maker_id)

            if user.type == "corporate":
                self._set_corporate_default_groups(user)

            if user.corporate:
                self._assign_corporate_to_user(user, code)

            # Ignore missing or whitespace-only group names.
            if user.groupName and user.groupName.strip():
                self._assign_user_to_group_by_name(user, maker_id)

    def _insert_new_user(self, user: "UserDTO"):
        """Insert the user document, filling in the default profile fields."""
        new_user = {
            **user.dict(exclude={
                'passport_id', 'groupName', 'expiryDate',
                'corporate', 'passwordHash', 'passwordSalt'
            }),
            'id': str(user.id),
            'bio': "",
            'focus': "academic",
            'status': "active",
            # Copies, so a stored document never aliases the class-level defaults.
            'desiredLevels': dict(self._DEFAULT_DESIRED_LEVELS),
            'profilePicture': "/defaultAvatar.png",
            'levels': dict(self._DEFAULT_LEVELS),
            'isFirstLogin': False,
            'isVerified': True,
            'registrationDate': datetime.now(),
            'subscriptionExpirationDate': user.expiryDate
        }
        self._db.users.insert_one(new_user)

    def _create_code(self, user: "UserDTO", maker_id: str) -> str:
        """Insert a 6-character code document for ``user`` and return the code."""
        code = shortuuid.ShortUUID().random(length=6)
        self._db.codes.insert_one({
            'id': code,
            'code': code,
            'creator': maker_id,
            'expiryDate': user.expiryDate,
            'type': user.type,
            'creationDate': datetime.now(),
            'userId': str(user.id),
            'email': user.email,
            'name': user.name,
            'passport_id': user.passport_id
        })
        return code

    def _set_corporate_default_groups(self, user: "UserDTO"):
        """Create the empty, non-editable default groups administered by ``user``."""
        user_id = str(user.id)
        for name in self._CORPORATE_DEFAULT_GROUP_NAMES:
            self._db.groups.insert_one({
                'admin': user_id,
                'id': str(uuid.uuid4()),
                'name': name,
                'participants': [],
                'disableEditing': True,
            })

    def _assign_corporate_to_user(self, user: "UserDTO", code: str):
        """Attach ``user`` to the corporate account whose e-mail is ``user.corporate``.

        When such an account exists, the user's code is re-assigned to it and
        the user is added to that account's "Students" group (if the user type
        is "student") or "Teachers" group (any other type); the group is
        created when missing. Nothing happens when no account matches.
        """
        corporate_user = self._db.users.find_one({"email": user.corporate})
        if not corporate_user:
            return

        # find_one returns a plain dict, so fields are read by key.
        corporate_id = corporate_user['id']
        user_id = str(user.id)

        self._db.codes.update_one(
            {"id": code},
            {"$set": {"creator": corporate_id}},
            upsert=True
        )

        group_type = "Students" if user.type == "student" else "Teachers"
        group = self._db.groups.find_one({"admin": corporate_id, "name": group_type})

        if group:
            # $addToSet appends only when absent, without a read-modify-write.
            self._db.groups.update_one(
                {"id": group['id']},
                {"$addToSet": {"participants": user_id}}
            )
        else:
            self._db.groups.insert_one({
                'admin': corporate_id,
                'id': str(uuid.uuid4()),
                'name': group_type,
                'participants': [user_id],
                'disableEditing': True,
            })

    def _assign_user_to_group_by_name(self, user: "UserDTO", maker_id: str):
        """Add ``user`` to the maker's group named ``user.groupName``, creating it if needed."""
        user_id = str(user.id)
        group_name = user.groupName.strip()

        group = self._db.groups.find_one({"admin": maker_id, "name": group_name})

        if group is None:
            # No group with that name for this maker yet: create it.
            self._db.groups.insert_one({
                'id': str(uuid.uuid4()),
                'admin': maker_id,
                'name': group_name,
                'participants': [user_id],
                'disableEditing': False,
            })
        else:
            # $addToSet appends only when absent, without a read-modify-write.
            self._db.groups.update_one(
                {"id": group['id']},
                {"$addToSet": {"participants": user_id}}
            )