"""Database access service: an asyncpg pool plus a SQLAlchemy async engine."""
from __future__ import annotations

import local
from urllib.parse import quote_plus

from aiohttp import web
import asyncpg
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

import config


def get_dsn():
    """Build the SQLAlchemy URL for the asyncpg driver from ``config.DATABASE``.

    The user, password and database name are percent-encoded with
    ``quote_plus`` so that reserved URL characters in them cannot corrupt the
    URL. The host and port are inserted as-is.

    Returns:
        A ``postgresql+asyncpg://user:password@host:port/database`` string.
    """
    settings = config.DATABASE
    user = quote_plus(settings["user"])
    password = quote_plus(settings["password"])
    host = settings["host"]
    port = settings["port"]
    database = quote_plus(settings["database"])
    return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{database}"


class DatabaseService:
    """Holds the asyncpg pool, the SQLAlchemy async engine and a session factory.

    Obtain instances through :meth:`create`, which also runs :meth:`init`;
    a bare ``DatabaseService()`` has all of its attributes set to ``None``.
    """

    # Process-wide instance used when ``create()`` is called without an app.
    instance: DatabaseService | None = None

    @staticmethod
    async def create(app: web.Application | None = None) -> DatabaseService:
        """Return an initialised service, creating it on first use.

        Args:
            app: Optional aiohttp application. When given, the service is
                stored under ``app["database"]`` and reused from there.
                When omitted, the class-level ``instance`` is used instead.

        Returns:
            The initialised ``DatabaseService``.

        Raises:
            Whatever :meth:`init` raises. In that case nothing is cached, so
            a later call starts again from scratch.
        """
        if app is None:
            if DatabaseService.instance is None:
                # Publish the singleton only after init() has succeeded;
                # otherwise a failed init would leave a permanently broken,
                # uninitialised instance for every later caller.
                service = DatabaseService()
                await service.init()
                DatabaseService.instance = service
            return DatabaseService.instance

        if "database" not in app:
            service = DatabaseService()
            await service.init()
            app["database"] = service
        return app["database"]

    def __init__(self):
        """Create an empty, not yet connected service (see :meth:`init`)."""
        self.pool: asyncpg.pool.Pool | None = None
        self.engine: AsyncEngine | None = None
        self.create_session: async_sessionmaker[AsyncSession] | None = None

    async def init(self):
        """Create the asyncpg pool, the async engine and the session factory.

        The pool is created from ``config.DATABASE`` on ``local.loop``. The
        engine is built from :func:`get_dsn` with ``echo=config.DEBUG``.
        Sessions are produced with ``expire_on_commit=False``.
        """
        loop = local.loop
        self.pool = await asyncpg.create_pool(**config.DATABASE, loop=loop)

        engine = create_async_engine(get_dsn(), echo=config.DEBUG)
        self.engine = engine
        self.create_session = async_sessionmaker(engine, expire_on_commit=False)