import os
import shutil
import subprocess
import time
import uuid
from datetime import datetime
from logging import getLogger
from typing import Dict

import pandas as pd
import shortuuid
from google.cloud.firestore_v1 import Client
from google.cloud.firestore_v1.base_query import FieldFilter

from modules.batch_users.batch_users import BatchUsersDTO, UserDTO
from modules.helper.file_helper import FileHelper


class BatchUsers:
    """Bulk-create users: import them into Firebase Auth, then seed Firestore.

    The flow implemented by :meth:`batch_users` is:

    1. map the raw request payload to DTOs,
    2. write a Firebase-Auth import CSV into a temporary directory,
    3. run ``firebase auth:import`` on that CSV,
    4. create the ``users`` / ``codes`` / ``groups`` Firestore documents.
    """

    _DEFAULT_DESIRED_LEVELS = {
        "reading": 9,
        "listening": 9,
        "writing": 9,
        "speaking": 9,
    }

    _DEFAULT_LEVELS = {
        "reading": 0,
        "listening": 0,
        "writing": 0,
        "speaking": 0,
    }

    # Directory (relative to the process working directory) holding the
    # short-lived CSV handed to the Firebase CLI.
    _TMP_DIR = './tmp'

    def __init__(self, firestore: Client):
        """Store the Firestore client and create a module-scoped logger."""
        self._db = firestore
        self._logger = getLogger(__name__)

    def batch_users(self, request_data: Dict):
        """Import the users described by ``request_data`` and initialise them.

        Args:
            request_data: Dict with a ``"makerID"`` entry and a ``"users"``
                list; each user entry is expanded into ``UserDTO(**user)``.

        Returns:
            ``{"ok": True}`` on success, or an error message string when the
            ``firebase auth:import`` command exits with a non-zero code.
        """
        batch_dto = self._map_to_batch(request_data)

        file_name = f'{uuid.uuid4()}.csv'
        path = f'{self._TMP_DIR}/{file_name}'
        # The CSV cannot be written if the temp directory is missing.
        os.makedirs(self._TMP_DIR, exist_ok=True)

        try:
            self._generate_firebase_auth_csv(batch_dto, path)

            result = self._upload_users(self._TMP_DIR, file_name)
            if result.returncode != 0:
                error_msg = f"Couldn't upload users. Failed to run command firebase auth import -> ```cmd {result.stderr}```"
                self._logger.error(error_msg)
                return error_msg

            self._init_users(batch_dto)
        finally:
            # The CSV contains password hashes and salts: never leave it
            # behind, whether the import succeeded, failed, or raised.
            if os.path.exists(path):
                FileHelper.remove_file(path)

        return {"ok": True}

    @staticmethod
    def _map_to_batch(request_data: Dict) -> BatchUsersDTO:
        """Build a ``BatchUsersDTO`` from the raw request dictionary."""
        users: list[UserDTO] = [UserDTO(**user) for user in request_data["users"]]
        return BatchUsersDTO(makerID=request_data["makerID"], users=users)

    @staticmethod
    def _generate_firebase_auth_csv(batch_dto: BatchUsersDTO, path: str):
        """Write the header-less CSV consumed by ``firebase auth:import``.

        Only UID, email, password hash/salt, the (always false) verified flag
        and the creation time are populated; every other column is left empty.
        """
        # https://firebase.google.com/docs/cli/auth#file_format
        columns = [
            'UID', 'Email', 'Email Verified', 'Password Hash', 'Password Salt', 'Name',
            'Photo URL', 'Google ID', 'Google Email', 'Google Display Name', 'Google Photo URL',
            'Facebook ID', 'Facebook Email', 'Facebook Display Name', 'Facebook Photo URL',
            'Twitter ID', 'Twitter Email', 'Twitter Display Name', 'Twitter Photo URL',
            'GitHub ID', 'GitHub Email', 'GitHub Display Name', 'GitHub Photo URL',
            'User Creation Time', 'Last Sign-In Time', 'Phone Number'
        ]

        # A single timestamp (milliseconds since the epoch) is shared by the
        # whole batch.
        current_time = int(time.time() * 1000)

        users_data = []
        for user in batch_dto.users:
            # Start from an all-empty row, then fill in the known columns.
            row = dict.fromkeys(columns, '')
            row.update({
                'UID': str(user.id),
                'Email': user.email,
                'Email Verified': False,
                'Password Hash': user.passwordHash,
                'Password Salt': user.passwordSalt,
                'User Creation Time': current_time,
            })
            users_data.append(row)

        df = pd.DataFrame(users_data, columns=columns)
        df.to_csv(path, index=False, header=False)

    @staticmethod
    def _upload_users(directory: str, file_name: str):
        """Run ``firebase auth:import`` on ``file_name`` inside ``directory``.

        The SCRYPT parameters and the project id are read from the
        ``FIREBASE_*`` environment variables.

        Returns:
            A ``subprocess.CompletedProcess`` with captured text output. If
            the CLI cannot be launched at all, a synthetic result with return
            code 127 and the OS error text in ``stderr`` is returned, so
            callers only ever need to inspect ``returncode``.
        """
        # Resolve the executable explicitly (this also finds ``firebase.cmd``
        # on Windows) so the command can run without a shell.
        executable = shutil.which('firebase') or 'firebase'
        command = [
            executable,
            'auth:import',
            file_name,
            '--hash-algo=SCRYPT',
            f'--hash-key={os.getenv("FIREBASE_SCRYPT_B64_SIGNER_KEY")}',
            f'--salt-separator={os.getenv("FIREBASE_SCRYPT_B64_SALT_SEPARATOR")}',
            f'--rounds={os.getenv("FIREBASE_SCRYPT_ROUNDS")}',
            f'--mem-cost={os.getenv("FIREBASE_SCRYPT_MEM_COST")}',
            f'--project={os.getenv("FIREBASE_PROJECT_ID")}',
        ]

        try:
            # An argument list (no shell) passes every value verbatim, so the
            # environment-provided values are never interpreted by a shell.
            return subprocess.run(command, cwd=directory, capture_output=True, text=True)
        except OSError as exc:
            # E.g. the CLI is not installed: report it as a failed run rather
            # than letting the exception escape.
            return subprocess.CompletedProcess(command, 127, stdout='', stderr=str(exc))

    def _init_users(self, batch_users: BatchUsersDTO):
        """Create the Firestore documents for every user of the batch."""
        maker_id = batch_users.makerID
        for user in batch_users.users:
            self._insert_new_user(user)
            code = self._create_code(user, maker_id)

            if user.type == "corporate":
                self._set_corporate_default_groups(user)

            if user.corporate:
                self._assign_corporate_to_user(user, code)

            # Ignore missing or whitespace-only group names.
            if user.groupName and user.groupName.strip():
                self._assign_user_to_group_by_name(user, maker_id)

    def _insert_new_user(self, user: UserDTO):
        """Write the ``users/<id>`` document, filling in the default profile."""
        new_user = {
            # Fields that are import-only or stored under a different key are
            # stripped from the DTO dump.
            **user.dict(exclude={
                'id', 'passport_id', 'groupName', 'expiryDate',
                'corporate', 'passwordHash', 'passwordSalt'
            }),
            'bio': "",
            'focus': "academic",
            'status': "active",
            'desiredLevels': self._DEFAULT_DESIRED_LEVELS,
            'profilePicture': "/defaultAvatar.png",
            'levels': self._DEFAULT_LEVELS,
            'isFirstLogin': False,
            'isVerified': True,
            'registrationDate': datetime.now(),
            'subscriptionExpirationDate': user.expiryDate
        }
        self._db.collection('users').document(str(user.id)).set(new_user)

    def _create_code(self, user: UserDTO, maker_id: str) -> str:
        """Create a ``codes/<code>`` document for ``user`` and return the code.

        The code is a random 6-character short UUID.
        NOTE(review): uniqueness is not checked; a collision would overwrite
        the existing document.
        """
        code = shortuuid.ShortUUID().random(length=6)
        self._db.collection('codes').document(code).set({
            'code': code,
            'creator': maker_id,
            'expiryDate': user.expiryDate,
            'type': user.type,
            'creationDate': datetime.now(),
            'userId': str(user.id),
            'email': user.email,
            'name': user.name,
            'passport_id': user.passport_id
        })
        return code

    def _set_corporate_default_groups(self, user: UserDTO):
        """Create the three default, non-editable groups administered by ``user``."""
        user_id = str(user.id)
        for group_name in ("Teachers", "Students", "Corporate"):
            group = {
                'admin': user_id,
                'id': str(uuid.uuid4()),
                'name': group_name,
                'participants': [],
                'disableEditing': True,
            }
            self._db.collection('groups').document(group['id']).set(group)

    @staticmethod
    def _add_participant(group_snapshot, user_id: str):
        """Append ``user_id`` to the group's ``participants`` if not yet present."""
        participants = group_snapshot.get('participants')
        if user_id not in participants:
            participants.append(user_id)
            group_snapshot.reference.update({'participants': participants})

    def _assign_corporate_to_user(self, user: UserDTO, code: str):
        """Link ``user`` to the corporate account whose email is ``user.corporate``.

        When such an account exists, it becomes the creator of ``code`` and
        the user is added to its "Students" group (for ``type == "student"``)
        or its "Teachers" group (any other type); the group is created when
        missing. Nothing happens when no account matches the email.
        """
        user_id = str(user.id)
        corporate_users = self._db.collection('users').where(
            filter=FieldFilter('email', '==', user.corporate)
        ).limit(1).get()
        if not corporate_users:
            return

        corporate_user = corporate_users[0]
        self._db.collection('codes').document(code).set({'creator': corporate_user.id}, merge=True)

        group_type = "Students" if user.type == "student" else "Teachers"
        groups = self._db.collection('groups').where(
            filter=FieldFilter('admin', '==', corporate_user.id)
        ).where(
            filter=FieldFilter('name', '==', group_type)
        ).limit(1).get()

        if groups:
            self._add_participant(groups[0], user_id)
        else:
            group = {
                'admin': corporate_user.id,
                'id': str(uuid.uuid4()),
                'name': group_type,
                'participants': [user_id],
                'disableEditing': True,
            }
            self._db.collection('groups').document(group['id']).set(group)

    def _assign_user_to_group_by_name(self, user: UserDTO, maker_id: str):
        """Add ``user`` to the maker's group named ``user.groupName``.

        The group is looked up by admin (``maker_id``) and stripped name; it
        is created as an editable group when it does not exist yet.
        """
        user_id = str(user.id)
        group_name = user.groupName.strip()

        groups = self._db.collection('groups').where(
            filter=FieldFilter('admin', '==', maker_id)
        ).where(
            filter=FieldFilter('name', '==', group_name)
        ).limit(1).get()

        if groups:
            self._add_participant(groups[0], user_id)
        else:
            new_group = {
                'id': str(uuid.uuid4()),
                'admin': maker_id,
                'name': group_name,
                'participants': [user_id],
                'disableEditing': False,
            }
            self._db.collection('groups').document(new_group['id']).set(new_group)