import logging from typing import Optional import aiofiles from httpx import AsyncClient from app.repositories.abc import IFileStorage class FirebaseStorage(IFileStorage): def __init__(self, client: AsyncClient, token: str, bucket: str): self._httpx_client = client self._token = token self._storage_url = f'https://firebasestorage.googleapis.com/v0/b/{bucket}' self._logger = logging.getLogger(__name__) async def download_firebase_file(self, source_blob_name: str, destination_file_name: str) -> Optional[str]: source_blob_name = source_blob_name.replace('/', '%2F') download_url = f"{self._storage_url}/o/{source_blob_name}?alt=media" response = await self._httpx_client.get( download_url, headers={'Authorization': f'Firebase {self._token}'} ) if response.status_code == 200: async with aiofiles.open(destination_file_name, 'wb') as file: await file.write(response.content) self._logger.info(f"File downloaded to {destination_file_name}") return destination_file_name else: self._logger.error(f"Failed to download blob {source_blob_name}. {response.status_code} - {response.content}") return None async def upload_file_firebase_get_url(self, destination_blob_name: str, source_file_name: str) -> Optional[str]: destination_blob_name = destination_blob_name.replace('/', '%2F') upload_url = f"{self._storage_url}/o/{destination_blob_name}" async with aiofiles.open(source_file_name, 'rb') as file: file_bytes = await file.read() response = await self._httpx_client.post( upload_url, headers={ 'Authorization': f'Firebase {self._token}', "X-Goog-Upload-Protocol": "multipart" }, files={ 'metadata': (None, '{"metadata":{"test":"testMetadata"}}', 'application/json'), 'file': file_bytes } ) if response.status_code == 200: self._logger.info(f"File {source_file_name} uploaded to {self._storage_url}/o/{destination_blob_name}.") await self.make_public(destination_blob_name) file_url = f"{self._storage_url}/o/{destination_blob_name}" return file_url else: self._logger.error(f"Failed to upload file {source_file_name}. Error: {response.status_code} - {str(response.content)}") return None async def make_public(self, destination_blob_name: str): acl_url = f"{self._storage_url}/o/{destination_blob_name}/acl" acl = {'entity': 'allUsers', 'role': 'READER'} response = await self._httpx_client.post( acl_url, headers={ 'Authorization': f'Bearer {self._token}', 'Content-Type': 'application/json' }, json=acl ) if response.status_code == 200: self._logger.info(f"Blob {destination_blob_name} is now public.") else: self._logger.error(f"Failed to make blob {destination_blob_name} public. {response.status_code} - {response.content}")