from __future__ import annotations import utils.local as local from urllib.parse import quote_plus from aiohttp import web import asyncpg from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine from libs.config import Config def get_dsn(): db_conf = Config.get("database") return "postgresql+asyncpg://%s:%s@%s:%s/%s" % ( quote_plus(db_conf["user"]), quote_plus(db_conf["password"]), db_conf["host"], db_conf["port"], quote_plus(db_conf["database"])) class DatabaseService: instance = None @staticmethod async def create(app: web.Application = None) -> DatabaseService: if app is None: if DatabaseService.instance is None: DatabaseService.instance = DatabaseService() await DatabaseService.instance.init() return DatabaseService.instance else: if "database" not in app: instance = DatabaseService() await instance.init() app["database"] = instance return app["database"] def __init__(self): self.pool: asyncpg.pool.Pool = None self.engine: AsyncEngine = None self.create_session: async_sessionmaker[AsyncSession] = None async def init(self): db_conf = Config.get("database") loop = local.loop self.pool = await asyncpg.create_pool(**db_conf, loop=loop) engine = create_async_engine(get_dsn(), echo=local.debug) self.engine = engine self.create_session = async_sessionmaker(engine, expire_on_commit=False) async def close(self): await self.engine.dispose() await self.pool.close()