You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

359 lines
14 KiB
Python

from __future__ import annotations
import json
import sys
import time
from typing import Optional, TypedDict
import aiohttp
from libs.config import Config
class MediaWikiApiException(Exception):
    """Error reported by the MediaWiki API.

    Attributes:
        info: Human-readable error text; also what ``str(exc)`` returns.
        code: Optional machine-readable error code.
        message: Alias of ``info``.
    """

    def __init__(self, info: str, code: Optional[str] = None) -> None:
        super().__init__(info)
        self.code = code
        # ``message`` mirrors ``info`` so either attribute name can be read.
        self.info = self.message = info

    def __str__(self) -> str:
        return self.info
class MediaWikiPageNotFoundException(Exception):
    """Raised when a requested wiki resource does not exist.

    Within this file it is raised for a missing page and for the
    ``user-not-found`` error code of the AI toolbox user lookup.

    Attributes:
        info: Human-readable error text; also what ``str(exc)`` returns.
        code: Optional machine-readable error code.
        message: Alias of ``info``.
    """

    def __init__(self, info: str, code: Optional[str] = None) -> None:
        super().__init__(info)
        self.info, self.code = info, code
        self.message = info

    def __str__(self) -> str:
        return self.info
class MediaWikiUserNoEnoughPointsException(Exception):
    """Raised when the API answers a usage report with ``noenoughpoints``.

    Attributes:
        info: Human-readable error text; also what ``str(exc)`` returns.
        code: Optional machine-readable error code.
        message: Alias of ``info``.
    """

    def __init__(self, info: str, code: Optional[str] = None) -> None:
        super().__init__(info)
        self.message = info
        self.code = code
        self.info = info

    def __str__(self) -> str:
        return self.info
class GetAllPagesResponse(TypedDict):
    """One batch of results returned by ``MediaWikiApi.get_all_pages``."""

    # Titles taken from the ``allpages`` entries of the API response.
    title_list: list[str]
    # ``apcontinue`` value to pass back in for the next batch; None when the
    # response carried no continuation.
    continue_key: Optional[str]
class ChatCompleteGetPointUsageResponse(TypedDict):
    """Typed payload holding a point-usage figure.

    NOTE(review): not referenced in the visible part of this file; presumably
    the result shape of a point-usage query -- confirm against callers.
    """

    point_usage: int
class ChatCompleteReportUsageResponse(TypedDict):
    """Result of ``MediaWikiApi.ai_toolbox_start_transaction``."""

    # ``transactionid`` value from the ``reportusage`` part of the API response.
    transaction_id: str
class MediaWikiApi:
    """Asynchronous client for a MediaWiki ``api.php`` endpoint.

    Every call opens a short-lived ``aiohttp.ClientSession``; all sessions
    share ``self.cookie_jar``, so cookies set by a login are sent on later
    calls.

    Attributes:
        api_url: URL of the ``api.php`` endpoint.
        request_proxy: Proxy read from the ``request.proxy`` config key.
        cookie_jar: Cookie storage shared by all requests of this client.
        login_identity: ``{"username", "password"}`` saved by a successful
            ``robot_login``; None until then.
        login_time: ``time.time()`` of the last confirmed login.
    """

    # Shared client, created lazily by ``create()``.
    instance: Optional[MediaWikiApi] = None

    @staticmethod
    def create():
        """Return the shared client, building it on first use.

        The endpoint comes from the ``mw.api_endpoint`` config key.
        """
        mw_api = Config.get("mw.api_endpoint", "https://www.isekai.cn/api.php")
        if MediaWikiApi.instance is None:
            MediaWikiApi.instance = MediaWikiApi(mw_api)
        return MediaWikiApi.instance

    def __init__(self, api_url: str):
        self.api_url = api_url
        self.request_proxy = Config.get("request.proxy", type=str, empty_is_none=True)
        self.cookie_jar = aiohttp.CookieJar(unsafe=True)
        self.login_identity = None
        self.login_time = 0.0

    async def _request(self, http_method: str, payload: dict):
        """Send one API call and return the decoded JSON body.

        ``http_method`` "POST" sends ``payload`` as form data; anything else
        sends it as a GET query string.
        """
        async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
            if http_method == "POST":
                pending = session.post(self.api_url, data=payload, proxy=self.request_proxy)
            else:
                pending = session.get(self.api_url, params=payload, proxy=self.request_proxy)
            async with pending as resp:
                return await resp.json()

    @staticmethod
    def _raise_for_error(data, by_code: Optional[dict] = None) -> None:
        """Raise if ``data`` is an API error response; otherwise do nothing.

        ``by_code`` maps specific error codes to the exception class to raise
        for them; every other code raises ``MediaWikiApiException``.
        """
        # Some successful responses are lists, so only dicts can carry "error".
        if not isinstance(data, dict) or "error" not in data:
            return
        error = data["error"]
        code = error.get("code")
        info = error.get("info", "Unknown MediaWiki API error")
        exc_class = (by_code or {}).get(code, MediaWikiApiException)
        raise exc_class(info, code)

    async def get_page_info(self, title: str):
        """Return the ``prop=info`` record (including URLs) for ``title``.

        Raises:
            MediaWikiApiException: The API returned an error.
            MediaWikiPageNotFoundException: The page record is flagged
                ``missing``.
        """
        params = {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "prop": "info",
            "titles": title,
            "inprop": "url",
        }
        data = await self._request("GET", params)
        self._raise_for_error(data)
        page = data["query"]["pages"][0]
        if "missing" in page:
            # No "error" object exists on this path (it would have raised
            # above), so the exception is built locally instead of from it.
            raise MediaWikiPageNotFoundException(
                f"The page '{title}' doesn't exist.", "missingtitle"
            )
        return page

    async def parse_page(self, title: str):
        """Return the ``text`` field of ``action=parse`` for ``title``.

        Section-edit links, the table of contents and the limit report are
        disabled in the request.

        Raises:
            MediaWikiApiException: The API returned an error.
        """
        params = {
            "action": "parse",
            "format": "json",
            "formatversion": "2",
            "prop": "text",
            "page": title,
            "disableeditsection": "true",
            "disabletoc": "true",
            "disablelimitreport": "true",
        }
        data = await self._request("GET", params)
        self._raise_for_error(data)
        return data["parse"]["text"]

    async def get_site_meta(self):
        """Return ``{"sitename": ..., "user": ...}`` for the wiki and session.

        Falls back to ``"Unknown"`` / ``"Anonymous"`` for parts the response
        does not contain.

        Raises:
            MediaWikiApiException: The API returned an error.
        """
        params = {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "meta": "siteinfo|userinfo",
            "siprop": "general",
        }
        data = await self._request("GET", params)
        self._raise_for_error(data)

        meta = {
            "sitename": "Unknown",
            "user": "Anonymous",
        }
        query = data.get("query", {})
        if "general" in query:
            meta["sitename"] = query["general"]["sitename"]
        if "userinfo" in query:
            meta["user"] = query["userinfo"]["name"]
        return meta

    async def get_all_pages(self, continue_key: Optional[str] = None,
                            start_title: Optional[str] = None) -> GetAllPagesResponse:
        """Fetch one batch (limit 100) of non-redirect titles in namespace 0.

        Args:
            continue_key: ``apcontinue`` value from a previous batch.
            start_title: Sent as ``apfrom`` when given.

        Returns:
            The batch's titles plus the continuation key, or None for the key
            when the response carries no ``apcontinue``.

        Raises:
            MediaWikiApiException: The API returned an error.
        """
        params = {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "list": "allpages",
            "apnamespace": 0,
            "apfilterredir": "nonredirects",
            "aplimit": 100,
        }
        if continue_key is not None:
            params["apcontinue"] = continue_key
        if start_title is not None:
            params["apfrom"] = start_title

        data = await self._request("GET", params)
        self._raise_for_error(data)

        titles = [page["title"] for page in data["query"]["allpages"]]
        next_key = data.get("continue", {}).get("apcontinue")
        return GetAllPagesResponse(title_list=titles, continue_key=next_key)

    async def is_logged_in(self):
        """Return True when ``userinfo`` reports a non-zero user id.

        Raises:
            MediaWikiApiException: The API returned an error.
        """
        params = {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "meta": "userinfo",
        }
        data = await self._request("GET", params)
        self._raise_for_error(data)
        return data["query"]["userinfo"]["id"] != 0

    async def get_token(self, token_type: str):
        """Fetch a token of ``token_type`` (e.g. ``"login"``) from the API.

        Raises:
            MediaWikiApiException: The API returned an error.
        """
        params = {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "meta": "tokens",
            "type": token_type,
        }
        data = await self._request("GET", params)
        self._raise_for_error(data)
        return data["query"]["tokens"][token_type + "token"]

    async def robot_login(self, username: str, password: str):
        """Log in via ``action=login`` unless the session already is.

        On a fresh login the credentials are kept in ``login_identity`` so
        ``refresh_login`` can repeat it later.

        Returns:
            True on success (including the already-logged-in case).

        Raises:
            MediaWikiApiException: The API returned an error, or the login
                result was anything other than ``"Success"``.
        """
        if await self.is_logged_in():
            self.login_time = time.time()
            return True

        token = await self.get_token("login")
        post_data = {
            "action": "login",
            "format": "json",
            "formatversion": "2",
            "lgname": username,
            "lgpassword": password,
            "lgtoken": token,
        }
        data = await self._request("POST", post_data)
        self._raise_for_error(data)

        # .get() so a response without "result" raises the API exception
        # below rather than a KeyError.
        result = data.get("login", {}).get("result")
        if result != "Success":
            raise MediaWikiApiException("Login failed", result)

        self.login_time = time.time()
        self.login_identity = {
            "username": username,
            "password": password,
        }
        return True

    async def refresh_login(self):
        """Log in again when the last confirmed login is over an hour old.

        Returns:
            False when no credentials have been saved; the result of
            ``robot_login`` when a refresh was attempted; None when the last
            login is recent enough that nothing was done.
        """
        if self.login_identity is None:
            print("刷新MW机器人账号登录状态失败没有保存的用户")
            return False
        if time.time() - self.login_time > 3600:
            print("刷新MW机器人账号登录状态")
            return await self.robot_login(self.login_identity["username"], self.login_identity["password"])

    async def search_title(self, keyword: str) -> list[str]:
        """Return the titles suggested by ``action=opensearch`` for ``keyword``.

        Raises:
            MediaWikiApiException: The API returned an error.
        """
        params = {
            "action": "opensearch",
            "search": keyword,
            "namespace": 0,
            "format": "json",
        }
        data = await self._request("GET", params)
        # An error arrives as a dict; a normal opensearch result is a list
        # whose second element holds the titles.
        self._raise_for_error(data)
        return data[1]

    async def ai_toolbox_get_user_info(self, user_id: int):
        """Return the ``userinfo`` part of the ``aitoolboxbot`` response.

        Raises:
            MediaWikiPageNotFoundException: Error code ``user-not-found``.
            MediaWikiApiException: Any other API error.
        """
        await self.refresh_login()
        params = {
            "action": "aitoolboxbot",
            "method": "userinfo",
            "userid": user_id,
            "format": "json",
            "formatversion": "2",
        }
        data = await self._request("GET", params)
        self._raise_for_error(data, {"user-not-found": MediaWikiPageNotFoundException})
        return data["aitoolboxbot"]["userinfo"]

    async def ai_toolbox_start_transaction(self, user_id: int, user_action: str, bot_id: Optional[str] = None,
                                           tokens: Optional[int] = None,
                                           point_usage: Optional[int] = None) -> ChatCompleteReportUsageResponse:
        """Send the ``start`` step of a ``reportusage`` transaction.

        Fields whose value is None are left out of the request;
        ``pointusage`` is sent as 0 when ``point_usage`` is None.

        Returns:
            The transaction id taken from the response.

        Raises:
            MediaWikiUserNoEnoughPointsException: Error code
                ``noenoughpoints``.
            MediaWikiApiException: Any other API error.
        """
        await self.refresh_login()
        post_data = {
            "action": "aitoolboxbot",
            "method": "reportusage",
            "step": "start",
            "userid": int(user_id) if user_id is not None else None,
            "useraction": user_action,
            "botid": bot_id,
            "tokens": int(tokens) if tokens is not None else None,
            "pointusage": int(point_usage) if point_usage is not None else 0,
            "format": "json",
            "formatversion": "2",
        }
        # Filter out None values
        post_data = {k: v for k, v in post_data.items() if v is not None}

        data = await self._request("POST", post_data)
        self._raise_for_error(data, {"noenoughpoints": MediaWikiUserNoEnoughPointsException})
        return ChatCompleteReportUsageResponse(
            transaction_id=data["aitoolboxbot"]["reportusage"]["transactionid"]
        )

    async def ai_toolbox_end_transaction(self, transaction_id: str, point_usage: Optional[int] = None,
                                         tokens: Optional[int] = None):
        """Send the ``end`` step of a ``reportusage`` transaction.

        Best effort: any failure of the request itself is printed to stderr
        and the method then returns None instead of raising.

        Returns:
            The ``success`` value from the response, or None on failure.
        """
        await self.refresh_login()
        try:
            post_data = {
                "action": "aitoolboxbot",
                "method": "reportusage",
                "step": "end",
                "transactionid": transaction_id,
                "pointusage": int(point_usage) if point_usage is not None else 0,
                # NOTE(review): `tokens` used to be accepted but never sent.
                # It now goes out under the field name the start step uses;
                # confirm the server reads it on step=end.
                "tokens": int(tokens) if tokens is not None else None,
                "format": "json",
                "formatversion": "2",
            }
            # Filter out None values
            post_data = {k: v for k, v in post_data.items() if v is not None}

            data = await self._request("POST", post_data)
            self._raise_for_error(data)
            return data["aitoolboxbot"]["reportusage"]["success"]
        except Exception as e:
            # Deliberate best-effort: report the problem, do not propagate it.
            print(e, file=sys.stderr)

    async def ai_toolbox_cancel_transaction(self, transaction_id: str, error: Optional[str] = None):
        """Send the ``cancel`` step of a ``reportusage`` transaction.

        ``error`` is included only when it is not None. Best effort: any
        failure of the request itself is printed to stderr and the method
        then returns None instead of raising.

        Returns:
            The ``success`` value from the response, or None on failure.
        """
        await self.refresh_login()
        try:
            post_data = {
                "action": "aitoolboxbot",
                "method": "reportusage",
                "step": "cancel",
                "transactionid": transaction_id,
                "error": error,
                "format": "json",
                "formatversion": "2",
            }
            # Filter out None values
            post_data = {k: v for k, v in post_data.items() if v is not None}

            data = await self._request("POST", post_data)
            self._raise_for_error(data)
            return data["aitoolboxbot"]["reportusage"]["success"]
        except Exception as e:
            # Deliberate best-effort: report the problem, do not propagate it.
            print(e, file=sys.stderr)