42 lines
1.4 KiB
Python
42 lines
1.4 KiB
Python
import logging
|
|
import uuid
|
|
from typing import Optional, List, Dict
|
|
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
|
|
from app.repositories.abc import IDocumentStore
|
|
|
|
|
|
class MongoDB(IDocumentStore):
|
|
|
|
def __init__(self, mongo_db: AsyncIOMotorDatabase):
|
|
self._mongo_db = mongo_db
|
|
self._logger = logging.getLogger(__name__)
|
|
|
|
async def save_to_db(self, collection: str, item, doc_id: Optional[str] = None) -> Optional[str]:
|
|
collection_ref = self._mongo_db[collection]
|
|
|
|
if doc_id is None:
|
|
doc_id = str(uuid.uuid4())
|
|
|
|
item['id'] = doc_id
|
|
|
|
result = await collection_ref.insert_one(item)
|
|
if result.inserted_id:
|
|
# returning id instead of _id
|
|
self._logger.info(f"Document added with ID: {doc_id}")
|
|
return doc_id
|
|
|
|
return None
|
|
|
|
async def find(self, collection: str, query: Optional[Dict] = None) -> List[Dict]:
|
|
query = query if query else {}
|
|
cursor = self._mongo_db[collection].find(query)
|
|
return [document async for document in cursor]
|
|
|
|
async def update(self, collection: str, filter_query: Dict, update: Dict) -> Optional[str]:
|
|
return (await self._mongo_db[collection].update_one(filter_query, update, upsert=True)).upserted_id
|
|
|
|
async def get_doc_by_id(self, collection: str, doc_id: str) -> Optional[Dict]:
|
|
return await self._mongo_db[collection].find_one({"id": doc_id})
|