"""Batch creation of users: Firebase Auth import followed by Firestore set-up."""
import os
import shutil
import subprocess
import time
import uuid
from datetime import datetime
from logging import getLogger
from typing import Dict

import pandas as pd
import shortuuid
from google.cloud.firestore_v1 import Client
from google.cloud.firestore_v1.base_query import FieldFilter

from modules.batch_users.batch_users import BatchUsersDTO, UserDTO
from modules.helper.file_helper import FileHelper


class BatchUsers:
    """Create a batch of users in Firebase Auth and initialise them in Firestore.

    The flow implemented by :meth:`batch_users` is:

    1. map the raw request onto ``BatchUsersDTO`` / ``UserDTO``;
    2. write a Firebase-CLI compatible CSV with the users' credentials;
    3. run ``firebase auth:import`` on that CSV;
    4. write the ``users``, ``codes`` and ``groups`` documents in Firestore.
    """

    # Stored under ``desiredLevels`` for every newly inserted user.
    _DEFAULT_DESIRED_LEVELS = {
        "reading": 9,
        "listening": 9,
        "writing": 9,
        "speaking": 9,
    }

    # Stored under ``levels`` for every newly inserted user.
    _DEFAULT_LEVELS = {
        "reading": 0,
        "listening": 0,
        "writing": 0,
        "speaking": 0,
    }

    # Directory (relative to the process working directory) that holds the
    # temporary CSV and is used as the working directory of the Firebase CLI.
    _TMP_DIR = './tmp'

    def __init__(self, firestore: Client):
        """Keep a handle on the Firestore client and create the module logger."""
        self._db = firestore
        self._logger = getLogger(__name__)

    def batch_users(self, request_data: Dict):
        """Import the users described by ``request_data`` and initialise them.

        Args:
            request_data: Mapping with a ``"makerID"`` entry and a ``"users"``
                list whose items are keyword arguments for ``UserDTO``.

        Returns:
            ``{"ok": True}`` on success, or the error message (``str``) when
            the Firebase CLI exits with a non-zero status.
        """
        batch_dto = self._map_to_batch(request_data)

        file_name = f'{uuid.uuid4()}.csv'
        path = f'{self._TMP_DIR}/{file_name}'
        # The directory may not exist yet (e.g. on a fresh deployment).
        os.makedirs(self._TMP_DIR, exist_ok=True)
        self._generate_firebase_auth_csv(batch_dto, path)

        # The CSV contains password hashes and salts, so it must be removed
        # whatever happens next: failed import, Firestore error or success.
        try:
            result = self._upload_users(self._TMP_DIR, file_name)
            if result.returncode != 0:
                error_msg = f"Couldn't upload users. Failed to run command firebase auth import -> ```cmd {result.stderr}```"
                self._logger.error(error_msg)
                return error_msg

            self._init_users(batch_dto)
        finally:
            FileHelper.remove_file(path)

        return {"ok": True}

    @staticmethod
    def _map_to_batch(request_data: Dict) -> BatchUsersDTO:
        """Build a ``BatchUsersDTO`` from the raw request mapping.

        Raises:
            KeyError: If ``"users"`` or ``"makerID"`` is missing.
        """
        users: list[UserDTO] = [UserDTO(**user) for user in request_data["users"]]
        return BatchUsersDTO(makerID=request_data["makerID"], users=users)

    @staticmethod
    def _generate_firebase_auth_csv(batch_dto: BatchUsersDTO, path: str):
        """Write the batch to ``path`` as a header-less CSV for ``auth:import``.

        Only UID, e-mail, verification flag, password hash/salt and creation
        time are filled in; every other column is written as an empty string.
        """
        # https://firebase.google.com/docs/cli/auth#file_format
        columns = [
            'UID', 'Email', 'Email Verified', 'Password Hash', 'Password Salt', 'Name',
            'Photo URL', 'Google ID', 'Google Email', 'Google Display Name', 'Google Photo URL',
            'Facebook ID', 'Facebook Email', 'Facebook Display Name', 'Facebook Photo URL',
            'Twitter ID', 'Twitter Email', 'Twitter Display Name', 'Twitter Photo URL',
            'GitHub ID', 'GitHub Email', 'GitHub Display Name', 'GitHub Photo URL',
            'User Creation Time', 'Last Sign-In Time', 'Phone Number'
        ]

        # One timestamp (milliseconds since the epoch) shared by the whole batch.
        current_time = int(time.time() * 1000)

        # Start every row from an all-blank template and override what we know.
        blank_row = dict.fromkeys(columns, '')
        users_data = [
            {
                **blank_row,
                'UID': str(user.id),
                'Email': user.email,
                'Email Verified': False,
                'Password Hash': user.passwordHash,
                'Password Salt': user.passwordSalt,
                'User Creation Time': current_time,
            }
            for user in batch_dto.users
        ]

        df = pd.DataFrame(users_data, columns=columns)
        df.to_csv(path, index=False, header=False)

    @staticmethod
    def _upload_users(directory: str, file_name: str):
        """Run ``firebase auth:import`` on ``file_name`` inside ``directory``.

        The command is executed as an argument vector, without a shell, so the
        secrets taken from the environment are never parsed by a shell.

        Returns:
            A ``subprocess.CompletedProcess`` with text ``stdout``/``stderr``.
            When the ``firebase`` executable cannot be found on ``PATH`` a
            synthetic result with return code 127 is returned instead of
            raising, so the caller's non-zero-exit handling still applies.
        """
        executable = shutil.which('firebase')
        if executable is None:
            return subprocess.CompletedProcess(
                args=['firebase', 'auth:import', file_name],
                returncode=127,
                stdout='',
                stderr='firebase: CLI executable not found on PATH',
            )

        # NOTE(review): an unset variable is rendered as the text "None" (same
        # as before); presumably the CLI then rejects the option - confirm.
        command = [
            executable,
            'auth:import',
            file_name,
            '--hash-algo=SCRYPT',
            f'--hash-key={os.getenv("FIREBASE_SCRYPT_B64_SIGNER_KEY")}',
            f'--salt-separator={os.getenv("FIREBASE_SCRYPT_B64_SALT_SEPARATOR")}',
            f'--rounds={os.getenv("FIREBASE_SCRYPT_ROUNDS")}',
            f'--mem-cost={os.getenv("FIREBASE_SCRYPT_MEM_COST")}',
            f'--project={os.getenv("FIREBASE_PROJECT_ID")}',
            f'--token={os.getenv("FIREBASE_CLI_TOKEN")}',
        ]

        return subprocess.run(command, cwd=directory, capture_output=True, text=True)

    def _init_users(self, batch_users: BatchUsersDTO):
        """Create the Firestore documents for every user of the batch.

        For each user: insert the ``users`` document, create a ``codes``
        document, create the default groups for ``"corporate"`` users, attach
        the user to a corporate account when ``user.corporate`` is set, and
        attach the user to a named group when ``user.groupName`` is not blank.
        """
        maker_id = batch_users.makerID
        for user in batch_users.users:
            self._insert_new_user(user)
            code = self._create_code(user, maker_id)

            if user.type == "corporate":
                self._set_corporate_default_groups(user)

            if user.corporate:
                self._assign_corporate_to_user(user, code)

            if user.groupName and user.groupName.strip():
                self._assign_user_to_group_by_name(user, maker_id)

    def _insert_new_user(self, user: UserDTO):
        """Write the ``users/<id>`` document for ``user`` with default profile data."""
        new_user = {
            # Credentials and batch-only fields are not stored on the profile.
            **user.dict(exclude={
                'id', 'passport_id', 'groupName', 'expiryDate',
                'corporate', 'passwordHash', 'passwordSalt'
            }),
            'bio': "",
            'focus': "academic",
            'status': "active",
            # Copies, so the class-level defaults cannot be altered through
            # the payload.
            'desiredLevels': dict(self._DEFAULT_DESIRED_LEVELS),
            'profilePicture': "/defaultAvatar.png",
            'levels': dict(self._DEFAULT_LEVELS),
            'isFirstLogin': False,
            'isVerified': True,
            'registrationDate': datetime.now(),
            'subscriptionExpirationDate': user.expiryDate
        }
        self._db.collection('users').document(str(user.id)).set(new_user)

    def _create_code(self, user: UserDTO, maker_id: str) -> str:
        """Create a ``codes`` document for ``user`` and return its 6-character id.

        NOTE(review): the random code is used as the document id without
        checking for an existing document - confirm collisions are acceptable.
        """
        code = shortuuid.ShortUUID().random(length=6)
        self._db.collection('codes').document(code).set({
            'code': code,
            'creator': maker_id,
            'expiryDate': user.expiryDate,
            'type': user.type,
            'creationDate': datetime.now(),
            'userId': str(user.id),
            'email': user.email,
            'name': user.name,
            'passport_id': user.passport_id
        })
        return code

    def _set_corporate_default_groups(self, user: UserDTO):
        """Create the empty, non-editable default groups administered by ``user``."""
        user_id = str(user.id)
        for group_name in ("Teachers", "Students", "Corporate"):
            group = {
                'admin': user_id,
                'id': str(uuid.uuid4()),
                'name': group_name,
                'participants': [],
                'disableEditing': True,
            }
            self._db.collection('groups').document(group['id']).set(group)

    def _assign_corporate_to_user(self, user: UserDTO, code: str):
        """Link ``user`` to the corporate account whose e-mail is ``user.corporate``.

        When such an account exists, the ``codes/<code>`` document gets that
        account as ``creator`` and the user is added to the account's
        "Students" group (``user.type == "student"``) or "Teachers" group
        (any other type), if that group exists. Nothing happens otherwise.
        """
        corporate_users = self._db.collection('users').where(
            filter=FieldFilter('email', '==', user.corporate)
        ).limit(1).get()
        if not corporate_users:
            return

        corporate_user = corporate_users[0]
        self._db.collection('codes').document(code).set({'creator': corporate_user.id}, merge=True)

        group_type = "Students" if user.type == "student" else "Teachers"

        groups = self._db.collection('groups').where(
            filter=FieldFilter('admin', '==', corporate_user.id)
        ).where(
            filter=FieldFilter('name', '==', group_type)
        ).limit(1).get()

        if groups:
            self._add_participant(groups[0], str(user.id))

    def _assign_user_to_group_by_name(self, user: UserDTO, maker_id: str):
        """Add ``user`` to the group of ``maker_id`` named ``user.groupName``.

        The (stripped) group name is looked up among the groups administered
        by ``maker_id``; the group is created with the user as its only
        participant when it does not exist yet.
        """
        user_id = str(user.id)
        group_name = user.groupName.strip()

        groups = self._db.collection('groups').where(
            filter=FieldFilter('admin', '==', maker_id)
        ).where(
            filter=FieldFilter('name', '==', group_name)
        ).limit(1).get()

        if not groups:
            new_group = {
                'id': str(uuid.uuid4()),
                'admin': maker_id,
                'name': group_name,
                'participants': [user_id],
                'disableEditing': False,
            }
            self._db.collection('groups').document(new_group['id']).set(new_group)
        else:
            self._add_participant(groups[0], user_id)

    @staticmethod
    def _add_participant(group, user_id: str):
        """Append ``user_id`` to the group's ``participants`` unless already listed.

        ``group`` is a document snapshot returned by a ``groups`` query; the
        document is only written when the list actually changes.
        """
        participants = group.get('participants')
        if user_id not in participants:
            participants.append(user_id)
            group.reference.update({'participants': participants})