|
|
from __future__ import annotations
|
|
|
import json
|
|
|
import sys
|
|
|
import time
|
|
|
from typing import Optional, TypedDict
|
|
|
import aiohttp
|
|
|
from libs.config import Config
|
|
|
|
|
|
class MediaWikiApiException(Exception):
|
|
|
def __init__(self, info: str, code: Optional[str] = None) -> None:
|
|
|
super().__init__(info)
|
|
|
self.info = info
|
|
|
self.code = code
|
|
|
self.message = self.info
|
|
|
|
|
|
def __str__(self) -> str:
|
|
|
return self.info
|
|
|
|
|
|
class MediaWikiPageNotFoundException(Exception):
|
|
|
def __init__(self, info: str, code: Optional[str] = None) -> None:
|
|
|
super().__init__(info)
|
|
|
self.info = info
|
|
|
self.code = code
|
|
|
self.message = self.info
|
|
|
|
|
|
def __str__(self) -> str:
|
|
|
return self.info
|
|
|
|
|
|
class MediaWikiUserNoEnoughPointsException(Exception):
|
|
|
def __init__(self, info: str, code: Optional[str] = None) -> None:
|
|
|
super().__init__(info)
|
|
|
self.info = info
|
|
|
self.code = code
|
|
|
self.message = self.info
|
|
|
|
|
|
def __str__(self) -> str:
|
|
|
return self.info
|
|
|
|
|
|
class GetAllPagesResponse(TypedDict):
|
|
|
title_list: list[str]
|
|
|
continue_key: Optional[str]
|
|
|
|
|
|
class ChatCompleteGetPointUsageResponse(TypedDict):
|
|
|
point_usage: int
|
|
|
|
|
|
class ChatCompleteReportUsageResponse(TypedDict):
|
|
|
transaction_id: str
|
|
|
|
|
|
class MediaWikiApi:
|
|
|
instance: MediaWikiApi = None
|
|
|
|
|
|
@staticmethod
|
|
|
def create():
|
|
|
mw_api = Config.get("mw.api_endpoint", "https://www.isekai.cn/api.php")
|
|
|
if MediaWikiApi.instance is None:
|
|
|
MediaWikiApi.instance = MediaWikiApi(mw_api)
|
|
|
|
|
|
return MediaWikiApi.instance
|
|
|
|
|
|
|
|
|
def __init__(self, api_url: str):
|
|
|
self.api_url = api_url
|
|
|
self.request_proxy = Config.get("request.proxy", type=str, empty_is_none=True)
|
|
|
|
|
|
self.cookie_jar = aiohttp.CookieJar(unsafe=True)
|
|
|
self.login_identity = None
|
|
|
self.login_time = 0.0
|
|
|
|
|
|
|
|
|
async def get_page_info(self, title: str):
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "query",
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
"prop": "info",
|
|
|
"titles": title,
|
|
|
"inprop": "url"
|
|
|
}
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
if "missing" in data["query"]["pages"][0]:
|
|
|
raise MediaWikiPageNotFoundException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return data["query"]["pages"][0]
|
|
|
|
|
|
|
|
|
async def parse_page(self, title: str):
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "parse",
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
"prop": "text",
|
|
|
"page": title,
|
|
|
"disableeditsection": "true",
|
|
|
"disabletoc": "true",
|
|
|
"disablelimitreport": "true",
|
|
|
}
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return data["parse"]["text"]
|
|
|
|
|
|
|
|
|
async def get_site_meta(self):
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "query",
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
"meta": "siteinfo|userinfo",
|
|
|
"siprop": "general"
|
|
|
}
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
ret = {
|
|
|
"sitename": "Unknown",
|
|
|
"user": "Anonymous",
|
|
|
}
|
|
|
if "query" in data:
|
|
|
if "general" in data["query"]:
|
|
|
ret["sitename"] = data["query"]["general"]["sitename"]
|
|
|
if "userinfo" in data["query"]:
|
|
|
ret["user"] = data["query"]["userinfo"]["name"]
|
|
|
|
|
|
return ret
|
|
|
|
|
|
|
|
|
async def get_all_pages(self, continue_key: Optional[str] = None, start_title: Optional[str] = None) -> GetAllPagesResponse:
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "query",
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
"list": "allpages",
|
|
|
"apnamespace": 0,
|
|
|
"apfilterredir": "nonredirects",
|
|
|
"aplimit": 100,
|
|
|
}
|
|
|
if continue_key is not None:
|
|
|
params["apcontinue"] = continue_key
|
|
|
if start_title is not None:
|
|
|
params["apfrom"] = start_title
|
|
|
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
title_list = []
|
|
|
for page in data["query"]["allpages"]:
|
|
|
title_list.append(page["title"])
|
|
|
|
|
|
ret = GetAllPagesResponse(title_list=title_list, continue_key=None)
|
|
|
|
|
|
if "continue" in data and "apcontinue" in data["continue"]:
|
|
|
ret["continue_key"] = data["continue"]["apcontinue"]
|
|
|
|
|
|
return ret
|
|
|
|
|
|
|
|
|
async def is_logged_in(self):
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "query",
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
"meta": "userinfo"
|
|
|
}
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return data["query"]["userinfo"]["id"] != 0
|
|
|
|
|
|
|
|
|
async def get_token(self, token_type: str):
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "query",
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
"meta": "tokens",
|
|
|
"type": token_type
|
|
|
}
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return data["query"]["tokens"][token_type + "token"]
|
|
|
|
|
|
|
|
|
async def robot_login(self, username: str, password: str):
|
|
|
if await self.is_logged_in():
|
|
|
self.login_time = time.time()
|
|
|
return True
|
|
|
|
|
|
token = await self.get_token("login")
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
post_data = {
|
|
|
"action": "login",
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
"lgname": username,
|
|
|
"lgpassword": password,
|
|
|
"lgtoken": token,
|
|
|
}
|
|
|
async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
if "result" not in data["login"] or data["login"]["result"] != "Success":
|
|
|
raise MediaWikiApiException("Login failed", data["login"]["result"])
|
|
|
|
|
|
self.login_time = time.time()
|
|
|
self.login_identity = {
|
|
|
"username": username,
|
|
|
"password": password,
|
|
|
}
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
async def refresh_login(self):
|
|
|
if self.login_identity is None:
|
|
|
print("刷新MW机器人账号登录状态失败:没有保存的用户")
|
|
|
return False
|
|
|
if time.time() - self.login_time > 3600:
|
|
|
print("刷新MW机器人账号登录状态")
|
|
|
return await self.robot_login(self.login_identity["username"], self.login_identity["password"])
|
|
|
|
|
|
|
|
|
async def search_title(self, keyword: str) -> list[str]:
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "opensearch",
|
|
|
"search": keyword,
|
|
|
"namespace": 0,
|
|
|
"format": "json",
|
|
|
}
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
return data[1]
|
|
|
|
|
|
|
|
|
async def ai_toolbox_get_user_info(self, user_id: int):
|
|
|
await self.refresh_login()
|
|
|
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
params = {
|
|
|
"action": "aitoolboxbot",
|
|
|
"method": "userinfo",
|
|
|
"userid": user_id,
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
}
|
|
|
async with session.get(self.api_url, params=params, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
if data["error"]["code"] == "user-not-found":
|
|
|
raise MediaWikiPageNotFoundException(data["error"]["info"], data["error"]["code"])
|
|
|
else:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return data["aitoolboxbot"]["userinfo"]
|
|
|
|
|
|
|
|
|
async def ai_toolbox_start_transaction(self, user_id: int, user_action: str, bot_id: Optional[str] = None,
|
|
|
tokens: Optional[int] = None, point_usage: Optional[int] = None) -> ChatCompleteReportUsageResponse:
|
|
|
await self.refresh_login()
|
|
|
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
post_data = {
|
|
|
"action": "aitoolboxbot",
|
|
|
"method": "reportusage",
|
|
|
"step": "start",
|
|
|
"userid": int(user_id) if user_id is not None else None,
|
|
|
"useraction": user_action,
|
|
|
"botid": bot_id,
|
|
|
"tokens": int(tokens) if tokens is not None else None,
|
|
|
"pointusage": int(point_usage) if point_usage is not None else 0,
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
}
|
|
|
# Filter out None values
|
|
|
post_data = {k: v for k, v in post_data.items() if v is not None}
|
|
|
async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
if data["error"]["code"] == "noenoughpoints":
|
|
|
raise MediaWikiUserNoEnoughPointsException(data["error"]["info"], data["error"]["info"])
|
|
|
else:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return ChatCompleteReportUsageResponse(transaction_id=data["aitoolboxbot"]["reportusage"]["transactionid"])
|
|
|
|
|
|
|
|
|
async def ai_toolbox_end_transaction(self, transaction_id: str, point_usage: Optional[int] = None, tokens: Optional[int] = None):
|
|
|
await self.refresh_login()
|
|
|
|
|
|
try:
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
post_data = {
|
|
|
"action": "aitoolboxbot",
|
|
|
"method": "reportusage",
|
|
|
"step": "end",
|
|
|
"transactionid": transaction_id,
|
|
|
"pointusage": int(point_usage) if point_usage is not None else 0,
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
}
|
|
|
# Filter out None values
|
|
|
post_data = {k: v for k, v in post_data.items() if v is not None}
|
|
|
async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return data["aitoolboxbot"]["reportusage"]["success"]
|
|
|
except Exception as e:
|
|
|
print(e, file=sys.stderr)
|
|
|
|
|
|
|
|
|
async def ai_toolbox_cancel_transaction(self, transaction_id: str, error: Optional[str] = None):
|
|
|
await self.refresh_login()
|
|
|
|
|
|
try:
|
|
|
async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
|
|
|
post_data = {
|
|
|
"action": "aitoolboxbot",
|
|
|
"method": "reportusage",
|
|
|
"step": "cancel",
|
|
|
"transactionid": transaction_id,
|
|
|
"error": error,
|
|
|
"format": "json",
|
|
|
"formatversion": "2",
|
|
|
}
|
|
|
# Filter out None values
|
|
|
post_data = {k: v for k, v in post_data.items() if v is not None}
|
|
|
async with session.post(self.api_url, data=post_data, proxy=self.request_proxy) as resp:
|
|
|
data = await resp.json()
|
|
|
if "error" in data:
|
|
|
raise MediaWikiApiException(data["error"]["info"], data["error"]["code"])
|
|
|
|
|
|
return data["aitoolboxbot"]["reportusage"]["success"]
|
|
|
except Exception as e:
|
|
|
print(e, file=sys.stderr) |