Files
encoach_backend/app/services/impl/user.py
2024-11-06 00:50:56 +00:00

204 lines
7.1 KiB
Python

import os
import subprocess
import time
import uuid
import pandas as pd
import shortuuid
from datetime import datetime
from logging import getLogger
import pandas as pd
from typing import Dict
import shortuuid
from pymongo.database import Database
from app.dtos.user_batch import BatchUsersDTO, UserDTO
from app.helpers import FileHelper
from app.services.abc import IUserService
class UserService(IUserService):
    """Bulk-provision users in Firebase Auth and bootstrap their MongoDB records.

    For a batch of users the service writes a Firebase-auth-import CSV, runs
    the ``firebase auth:import`` CLI on it and, when that succeeds, inserts a
    ``users`` document and a ``codes`` document per user and optionally adds
    the user to a group owned by the batch maker.
    """

    # Level values assigned to every freshly created user document.
    _DEFAULT_DESIRED_LEVELS = {
        "reading": 9,
        "listening": 9,
        "writing": 9,
        "speaking": 9,
    }

    _DEFAULT_LEVELS = {
        "reading": 0,
        "listening": 0,
        "writing": 0,
        "speaking": 0,
    }

    def __init__(self, mongo: Database):
        """Store the Mongo database handle and create a module-scoped logger."""
        self._db: Database = mongo
        self._logger = getLogger(__name__)

    def fetch_tips(self, batch: BatchUsersDTO):
        """Import ``batch`` into Firebase Auth and initialise the users in Mongo.

        Args:
            batch: Users to create together with the id of their maker.

        Returns:
            ``{"ok": True}`` on success, or an error message string when the
            ``firebase auth:import`` command exits with a non-zero status.
        """
        file_name = f'{uuid.uuid4()}.csv'
        path = f'./tmp/{file_name}'
        self._generate_firebase_auth_csv(batch, path)

        try:
            result = self._upload_users('./tmp', file_name)
            if result.returncode != 0:
                error_msg = f"Couldn't upload users. Failed to run command firebase auth import -> ```cmd {result.stdout}```"
                self._logger.error(error_msg)
                if result.stderr:
                    # stderr is captured too; log it so failures that print
                    # nothing on stdout are still diagnosable.
                    self._logger.error("firebase auth:import stderr: %s", result.stderr)
                return error_msg

            self._init_users(batch)
        finally:
            # The CSV holds password hashes and salts, so it must not be left
            # behind when the import or the database initialisation fails.
            FileHelper.remove_file(path)

        return {"ok": True}

    @staticmethod
    def _map_to_batch(request_data: Dict) -> BatchUsersDTO:
        """Build a ``BatchUsersDTO`` from a raw request payload.

        Each entry of ``request_data["users"]`` is shallow-copied and its
        ``studentID`` is coerced to ``str`` before the ``UserDTO`` is built,
        so the caller's payload is not mutated.
        """
        users_list = [{**user} for user in request_data["users"]]
        for user in users_list:
            user["studentID"] = str(user["studentID"])

        users: list[UserDTO] = [UserDTO(**user) for user in users_list]
        return BatchUsersDTO(makerID=request_data["makerID"], users=users)

    @staticmethod
    def _generate_firebase_auth_csv(batch_dto: BatchUsersDTO, path: str):
        """Write the batch as a header-less CSV for ``firebase auth:import``.

        Only UID, email, verification flag, password hash/salt and creation
        time are populated; every other column is written empty.

        Args:
            batch_dto: Batch whose ``users`` are exported, one row per user.
            path: Destination file; its parent directory is created if needed.
        """
        # https://firebase.google.com/docs/cli/auth#file_format
        columns = [
            'UID', 'Email', 'Email Verified', 'Password Hash', 'Password Salt', 'Name',
            'Photo URL', 'Google ID', 'Google Email', 'Google Display Name', 'Google Photo URL',
            'Facebook ID', 'Facebook Email', 'Facebook Display Name', 'Facebook Photo URL',
            'Twitter ID', 'Twitter Email', 'Twitter Display Name', 'Twitter Photo URL',
            'GitHub ID', 'GitHub Email', 'GitHub Display Name', 'GitHub Photo URL',
            'User Creation Time', 'Last Sign-In Time', 'Phone Number'
        ]

        # One timestamp (milliseconds since the epoch) shared by the whole batch.
        current_time = int(time.time() * 1000)

        users_data = []
        for user in batch_dto.users:
            # Start from an all-empty row and fill in only the known fields.
            row = dict.fromkeys(columns, '')
            row.update({
                'UID': str(user.id),
                'Email': user.email,
                'Email Verified': False,
                'Password Hash': user.passwordHash,
                'Password Salt': user.passwordSalt,
                'User Creation Time': current_time,
            })
            users_data.append(row)

        # Make sure the destination directory exists before writing.
        target_dir = os.path.dirname(path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        df = pd.DataFrame(users_data, columns=columns)
        df.to_csv(path, index=False, header=False)

    @staticmethod
    def _upload_users(directory: str, file_name: str):
        """Run ``firebase auth:import`` on ``file_name`` inside ``directory``.

        The SCRYPT hashing parameters and the project id are read from the
        ``FIREBASE_*`` environment variables.

        Returns:
            The ``subprocess.CompletedProcess`` with stdout/stderr captured as
            text; the caller is responsible for checking ``returncode``.
        """
        # NOTE(review): the command is a shell string (shell=True). Its parts
        # come from the environment and a generated file name, not from request
        # data; an unset variable is interpolated as the literal "None".
        command = (
            f'firebase auth:import {file_name} '
            f'--hash-algo=SCRYPT '
            f'--hash-key={os.getenv("FIREBASE_SCRYPT_B64_SIGNER_KEY")} '
            f'--salt-separator={os.getenv("FIREBASE_SCRYPT_B64_SALT_SEPARATOR")} '
            f'--rounds={os.getenv("FIREBASE_SCRYPT_ROUNDS")} '
            f'--mem-cost={os.getenv("FIREBASE_SCRYPT_MEM_COST")} '
            f'--project={os.getenv("FIREBASE_PROJECT_ID")} '
        )

        result = subprocess.run(command, shell=True, cwd=directory, capture_output=True, text=True)
        return result

    def _init_users(self, batch_users: BatchUsersDTO):
        """Create the database records for every user of the batch.

        Per user: insert the user document, create a code, and, when a
        non-blank ``groupName`` is given, add the user to that group.
        """
        maker_id = batch_users.makerID
        for user in batch_users.users:
            self._insert_new_user(user)
            self._create_code(user, maker_id)

            if user.groupName and user.groupName.strip():
                self._assign_user_to_group_by_name(user, maker_id)

    def _insert_new_user(self, user: UserDTO):
        """Insert the ``users`` document for ``user`` with default profile values."""
        new_user = {
            **user.dict(exclude={
                'passport_id', 'groupName', 'expiryDate',
                'corporate', 'passwordHash', 'passwordSalt'
            }),
            'id': str(user.id),
            'bio': "",
            'focus': "academic",
            'status': "active",
            # Copy the class-level defaults so documents never share one dict.
            'desiredLevels': dict(self._DEFAULT_DESIRED_LEVELS),
            'profilePicture': "/defaultAvatar.png",
            'levels': dict(self._DEFAULT_LEVELS),
            'isFirstLogin': False,
            'isVerified': True,
            'registrationDate': datetime.now(),
            'subscriptionExpirationDate': user.expiryDate,
            'entities': user.entities
        }
        self._db.users.insert_one(new_user)

    def _create_code(self, user: UserDTO, maker_id: str) -> str:
        """Insert a ``codes`` document for ``user`` and return the generated code.

        The code is a random 6-character short UUID and is stored as both the
        document ``id`` and its ``code`` field.
        """
        code = shortuuid.ShortUUID().random(length=6)
        self._db.codes.insert_one({
            'id': code,
            'code': code,
            'creator': maker_id,
            'expiryDate': user.expiryDate,
            'type': user.type,
            'creationDate': datetime.now(),
            'userId': str(user.id),
            'email': user.email,
            'name': user.name,
            'passport_id': user.passport_id
        })
        return code

    def _assign_user_to_group_by_name(self, user: UserDTO, maker_id: str):
        """Add ``user`` to the maker's group named ``user.groupName``.

        The group is looked up by ``admin == maker_id`` and the stripped group
        name. It is created with the user as sole participant when missing;
        otherwise the user id is added to ``participants`` if not yet present.
        """
        user_id = str(user.id)
        group_name = user.groupName.strip()

        group = self._db.groups.find_one({"admin": maker_id, "name": group_name})
        if group is None:
            self._db.groups.insert_one({
                'id': str(uuid.uuid4()),
                'admin': maker_id,
                'name': group_name,
                'participants': [user_id],
                'disableEditing': False,
            })
            return

        # $addToSet appends only when absent and is applied atomically on the
        # server, so concurrent additions to the same group are not overwritten
        # by a stale copy of the participants list.
        self._db.groups.update_one(
            {"id": group["id"]},
            {"$addToSet": {"participants": user_id}}
        )