Files
encoach_backend/helper/firebase_helper.py
2024-09-07 19:14:40 +01:00

66 lines
2.2 KiB
Python

import logging
from google.cloud import storage
from pymongo.database import Database
def download_firebase_file(bucket_name, source_blob_name, destination_file_name):
    """Download one blob from a storage bucket into a local file.

    Args:
        bucket_name: Name of the bucket that holds the blob.
        source_blob_name: Name of the blob to fetch from the bucket.
        destination_file_name: Local path the blob is downloaded to.

    Returns:
        ``destination_file_name``, exactly as it was passed in.

    Nothing is caught here: any error raised by the storage client
    propagates to the caller.
    """
    client = storage.Client()
    remote_blob = client.bucket(bucket_name).blob(source_blob_name)
    remote_blob.download_to_filename(destination_file_name)
    logging.info(f"File downloaded to {destination_file_name}")
    return destination_file_name
def upload_file_firebase(bucket_name, destination_blob_name, source_file_name):
    """Upload a local file to a storage bucket and report success as a bool.

    Args:
        bucket_name: Name of the bucket to upload into.
        destination_blob_name: Name the uploaded blob gets in the bucket.
        source_file_name: Local path of the file to upload.

    Returns:
        ``True`` when the upload (and its info log line) completed,
        ``False`` when any exception was raised while uploading.

    The client and bucket handle are created before the ``try``, so errors
    raised at that stage propagate instead of producing ``False``.
    """
    target_bucket = storage.Client().bucket(bucket_name)
    try:
        target_bucket.blob(destination_blob_name).upload_from_filename(source_file_name)
        logging.info(f"File uploaded to {destination_blob_name}")
    except Exception as err:
        # NOTE(review): `app` is imported only on the failure path —
        # presumably to avoid a circular import with the application
        # module; confirm before moving it to the top of the file.
        import app
        app.app.logger.error("Error uploading file to Google Cloud Storage: " + str(err))
        return False
    else:
        return True
def upload_file_firebase_get_url(bucket_name, destination_blob_name, source_file_name):
    """Upload a local file, make the blob public, and return its public URL.

    Args:
        bucket_name: Name of the bucket to upload into.
        destination_blob_name: Name the uploaded blob gets in the bucket.
        source_file_name: Local path of the file to upload.

    Returns:
        The blob's ``public_url`` on success, or ``None`` when any exception
        was raised while uploading or publishing.

    The client and bucket handle are created before the ``try``, so errors
    raised at that stage propagate instead of producing ``None``.
    """
    target_bucket = storage.Client().bucket(bucket_name)
    try:
        uploaded = target_bucket.blob(destination_blob_name)
        uploaded.upload_from_filename(source_file_name)
        logging.info(f"File uploaded to {destination_blob_name}")
        # The blob is made public before its URL is read, so the returned
        # link is meant to be usable without credentials.
        uploaded.make_public()
        return uploaded.public_url
    except Exception as err:
        # NOTE(review): `app` is imported only on the failure path —
        # presumably to avoid a circular import with the application
        # module; confirm before moving it to the top of the file.
        import app
        app.app.logger.error("Error uploading file to Google Cloud Storage: " + str(err))
        return None
def save_to_db_with_id(mongo_db: Database, collection: str, item, id: str):
    """Insert ``item`` into ``collection`` as a new document tagged with ``id``.

    The stored document is a fresh dict built from ``item``'s fields plus an
    ``"id"`` field, so ``item`` itself is not modified.

    Args:
        mongo_db: Database handle, indexed by collection name.
        collection: Name of the target collection.
        item: Mapping of fields to store.
        id: Value stored under the document's ``"id"`` key. It takes
            precedence over any ``"id"`` key already present in ``item``.

    Returns:
        ``(True, inserted_id)``, where ``inserted_id`` is the value reported
        by the result of ``insert_one``.

    Raises:
        Whatever ``insert_one`` raises; insert failures are not caught here.
    """
    # "id" is listed first to keep the original field order, then assigned
    # again so the explicit argument wins even when `item` carries its own
    # "id" (a bare `{"id": id, **item}` lets the item's value overwrite it).
    document = {"id": id, **item}
    document["id"] = id
    result = mongo_db[collection].insert_one(document)
    # insert_one either returns a result object or raises, so there is no
    # falsy "failed" result to branch on.
    logging.info("Document added with ID: %s", result.inserted_id)
    return (True, result.inserted_id)
def get_all(mongo_db: Database, collection: str):
    """Return every document of ``collection`` as a list.

    Args:
        mongo_db: Database handle, indexed by collection name.
        collection: Name of the collection to read.

    Returns:
        A list holding each document yielded by an unfiltered ``find()``.
    """
    source = mongo_db[collection]
    cursor = source.find()
    # Materialise the cursor so callers receive a plain list.
    return list(cursor)