from __future__ import annotations import json import sys import time from typing import Optional, TypedDict import aiohttp from libs.config import Config class MediaWikiApiException(Exception): def __init__(self, info: str, code: Optional[str] = None) -> None: super().__init__(info) self.info = info self.code = code self.message = self.info def __str__(self) -> str: return self.info class MediaWikiPageNotFoundException(Exception): def __init__(self, info: str, code: Optional[str] = None) -> None: super().__init__(info) self.info = info self.code = code self.message = self.info def __str__(self) -> str: return self.info class MediaWikiUserNoEnoughPointsException(Exception): def __init__(self, info: str, code: Optional[str] = None) -> None: super().__init__(info) self.info = info self.code = code self.message = self.info def __str__(self) -> str: return self.info class GetAllPagesResponse(TypedDict): title_list: list[str] continue_key: Optional[str] class ChatCompleteGetPointUsageResponse(TypedDict): point_usage: int class ChatCompleteReportUsageResponse(TypedDict): transaction_id: str class MediaWikiApi: instance: MediaWikiApi = None @staticmethod def create(): mw_api = Config.get("mw.api_endpoint", "https://www.isekai.cn/api.php") if MediaWikiApi.instance is None: MediaWikiApi.instance = MediaWikiApi(mw_api) return MediaWikiApi.instance def __init__(self, api_url: str): self.api_url = api_url self.request_proxy = Config.get("request.proxy", type=str, empty_is_none=True) self.cookie_jar = aiohttp.CookieJar(unsafe=True) self.login_identity = None self.login_time = 0.0 async def get_page_info(self, title: str): async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "query", "format": "json", "formatversion": "2", "prop": "info", "titles": title, "inprop": "url" } async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) if "missing" in data["query"]["pages"][0]: raise MediaWikiPageNotFoundException(data["error"]["info"], data["error"]["code"]) return data["query"]["pages"][0] async def parse_page(self, title: str): async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "parse", "format": "json", "formatversion": "2", "prop": "text", "page": title, "disableeditsection": "true", "disabletoc": "true", "disablelimitreport": "true", } async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) return data["parse"]["text"] async def get_site_meta(self): async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "query", "format": "json", "formatversion": "2", "meta": "siteinfo|userinfo", "siprop": "general" } async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) ret = { "sitename": "Unknown", "user": "Anonymous", } if "query" in data: if "general" in data["query"]: ret["sitename"] = data["query"]["general"]["sitename"] if "userinfo" in data["query"]: ret["user"] = data["query"]["userinfo"]["name"] return ret async def get_all_pages(self, continue_key: Optional[str] = None, start_title: Optional[str] = None) -> GetAllPagesResponse: async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "query", "format": "json", "formatversion": "2", "list": "allpages", "apnamespace": 0, "apfilterredir": "nonredirects", "aplimit": 100, } if continue_key is not None: params["apcontinue"] = continue_key if start_title is not None: params["apfrom"] = start_title async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) title_list = [] for page in data["query"]["allpages"]: title_list.append(page["title"]) ret = GetAllPagesResponse(title_list=title_list, continue_key=None) if "continue" in data and "apcontinue" in data["continue"]: ret["continue_key"] = data["continue"]["apcontinue"] return ret async def is_logged_in(self): async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "query", "format": "json", "formatversion": "2", "meta": "userinfo" } async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) return data["query"]["userinfo"]["id"] != 0 async def get_token(self, token_type: str): async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "query", "format": "json", "formatversion": "2", "meta": "tokens", "type": token_type } async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) return data["query"]["tokens"][token_type + "token"] async def robot_login(self, username: str, password: str): if await self.is_logged_in(): self.login_time = time.time() return True token = await self.get_token("login") async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: post_data = { "action": "login", "format": "json", "formatversion": "2", "lgname": username, "lgpassword": password, "lgtoken": token, } async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) if "result" not in data["login"] or data["login"]["result"] != "Success": raise MediaWikiApiException("Login failed", data["login"]["result"]) self.login_time = time.time() self.login_identity = { "username": username, "password": password, } return True async def refresh_login(self): if self.login_identity is None: print("刷新MW机器人账号登录状态失败:没有保存的用户") return False if time.time() - self.login_time > 3600: print("刷新MW机器人账号登录状态") return await self.robot_login(self.login_identity["username"], self.login_identity["password"]) async def search_title(self, keyword: str) -> list[str]: async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "opensearch", "search": keyword, "namespace": 0, "format": "json", } async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() return data[1] async def ai_toolbox_get_user_info(self, user_id: int): await self.refresh_login() async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: params = { "action": "aitoolboxbot", "method": "userinfo", "userid": user_id, "format": "json", "formatversion": "2", } async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: if data["error"]["code"] == "user-not-found": raise MediaWikiPageNotFoundException(data["error"]["info"], data["error"]["code"]) else: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) return data["aitoolboxbot"]["userinfo"] async def ai_toolbox_start_transaction(self, user_id: int, user_action: str, bot_id: Optional[str] = None, tokens: Optional[int] = None, point_usage: Optional[int] = None) -> ChatCompleteReportUsageResponse: await self.refresh_login() async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: post_data = { "action": "aitoolboxbot", "method": "reportusage", "step": "start", "userid": int(user_id) if user_id is not None else None, "useraction": user_action, "botid": bot_id, "tokens": int(tokens) if tokens is not None else None, "pointusage": int(point_usage) if point_usage is not None else 0, "format": "json", "formatversion": "2", } # Filter out None values post_data = {k: v for k, v in post_data.items() if v is not None} async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: if data["error"]["code"] == "noenoughpoints": raise MediaWikiUserNoEnoughPointsException(data["error"]["info"], data["error"]["info"]) else: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) return ChatCompleteReportUsageResponse(transaction_id=data["aitoolboxbot"]["reportusage"]["transactionid"]) async def ai_toolbox_end_transaction(self, transaction_id: str, point_usage: Optional[int] = None, tokens: Optional[int] = None): await self.refresh_login() try: async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: post_data = { "action": "aitoolboxbot", "method": "reportusage", "step": "end", "transactionid": transaction_id, "pointusage": int(point_usage) if point_usage is not None else 0, "format": "json", "formatversion": "2", } # Filter out None values post_data = {k: v for k, v in post_data.items() if v is not None} async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) return data["aitoolboxbot"]["reportusage"]["success"] except Exception as e: print(e, file=sys.stderr) async def ai_toolbox_cancel_transaction(self, transaction_id: str, error: Optional[str] = None): await self.refresh_login() try: async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session: post_data = { "action": "aitoolboxbot", "method": "reportusage", "step": "cancel", "transactionid": transaction_id, "error": error, "format": "json", "formatversion": "2", } # Filter out None values post_data = {k: v for k, v in post_data.items() if v is not None} async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp: data = await resp.json() if "error" in data: raise MediaWikiApiException(data["error"]["info"], data["error"]["code"]) return data["aitoolboxbot"]["reportusage"]["success"] except Exception as e: print(e, file=sys.stderr)