You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

318 lines
14 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import json
import sys
import time
from typing import Optional, TypedDict
import aiohttp
import config
class MediaWikiApiException(Exception):
    """Error raised when a MediaWiki API response carries an ``error`` member.

    The human-readable text is available as both ``info`` and ``message``
    (they always hold the same value); ``code`` holds the optional
    machine-readable error code.
    """

    def __init__(self, info: str, code: Optional[str] = None) -> None:
        """Store the error text and the optional error code."""
        super().__init__(info)
        self.code = code
        # ``message`` is kept as an alias of ``info``.
        self.info = self.message = info

    def __str__(self) -> str:
        """Return the error text."""
        return self.info
class MediaWikiPageNotFoundException(Exception):
    """Error raised when a requested wiki object (page or user) does not exist.

    The human-readable text is available as both ``info`` and ``message``
    (they always hold the same value); ``code`` holds the optional
    machine-readable error code.
    """

    def __init__(self, info: str, code: Optional[str] = None) -> None:
        """Store the error text and the optional error code."""
        super().__init__(info)
        self.code = code
        # ``message`` is kept as an alias of ``info``.
        self.info = self.message = info

    def __str__(self) -> str:
        """Return the error text."""
        return self.info
class MediaWikiUserNoEnoughPointsException(Exception):
    """Error raised when the API answers with the ``noenoughpoints`` error code.

    The human-readable text is available as both ``info`` and ``message``
    (they always hold the same value); ``code`` holds the optional
    machine-readable error code.
    """

    def __init__(self, info: str, code: Optional[str] = None) -> None:
        """Store the error text and the optional error code."""
        super().__init__(info)
        self.code = code
        # ``message`` is kept as an alias of ``info``.
        self.info = self.message = info

    def __str__(self) -> str:
        """Return the error text."""
        return self.info
class ChatCompleteGetPointUsageResponse(TypedDict):
    """Shape of the value returned by a point-cost check."""

    # Point cost reported by the API, as an integer.
    point_cost: int
class ChatCompleteReportUsageResponse(TypedDict):
    """Shape of the value returned when a usage transaction is started."""

    # Point cost reported by the API, as an integer.
    point_cost: int
    # Identifier of the transaction, as returned by the API.
    transaction_id: str
class MediaWikiApi:
    """Asynchronous client for a MediaWiki API endpoint.

    Every request opens a short-lived ``aiohttp.ClientSession`` bound to
    ``self.cookie_jar``, so cookies obtained by :meth:`robot_login` are sent
    with later requests.  All requests go through ``config.REQUEST_PROXY``.
    """

    # Shared instance handed out by create(); None until create() is first called.
    instance: Optional[MediaWikiApi] = None

    @staticmethod
    def create() -> MediaWikiApi:
        """Return the shared client for ``config.MW_API``, building it on first use."""
        if MediaWikiApi.instance is None:
            MediaWikiApi.instance = MediaWikiApi(config.MW_API)
        return MediaWikiApi.instance

    def __init__(self, api_url: str):
        """Create a client that talks to the API endpoint at ``api_url``."""
        self.api_url = api_url
        # unsafe=True makes the jar accept cookies from IP-address hosts too.
        self.cookie_jar = aiohttp.CookieJar(unsafe=True)
        # Credentials of the last successful robot_login(), kept for refresh_login().
        self.login_identity = None
        # time.time() of the last successful robot_login(); 0.0 means "never".
        self.login_time = 0.0

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    async def _request(self, verb: str, payload: dict) -> dict:
        """Send ``payload`` to the API and return the decoded JSON body.

        ``verb`` is ``"GET"`` (payload sent as query parameters) or anything
        else for POST (payload sent as form data).  The response is returned
        as-is; error inspection is left to the caller.
        """
        async with aiohttp.ClientSession(cookie_jar=self.cookie_jar) as session:
            if verb == "GET":
                pending = session.get(self.api_url, params=payload, proxy=config.REQUEST_PROXY)
            else:
                pending = session.post(self.api_url, data=payload, proxy=config.REQUEST_PROXY)
            async with pending as resp:
                return await resp.json()

    @staticmethod
    def _raise_for_error(data: dict) -> None:
        """Raise MediaWikiApiException if ``data`` carries an ``"error"`` member."""
        if "error" in data:
            error = data["error"]
            raise MediaWikiApiException(error["info"], error["code"])

    @staticmethod
    def _without_none(payload: dict) -> dict:
        """Return a copy of ``payload`` without the entries whose value is None."""
        return {key: value for key, value in payload.items() if value is not None}

    @staticmethod
    def _usage_payload(step: str, user_id: int, user_action: str,
                       tokens: Optional[int], extractlines: Optional[int]) -> dict:
        """Build the form data shared by the "check" and "start" reportusage steps."""
        return MediaWikiApi._without_none({
            "action": "chatcompletebot",
            "method": "reportusage",
            "step": step,
            "userid": int(user_id) if user_id is not None else None,
            "useraction": user_action,
            "tokens": int(tokens) if tokens is not None else None,
            "extractlines": int(extractlines) if extractlines is not None else None,
            "format": "json",
            "formatversion": "2",
        })

    # ------------------------------------------------------------------
    # Core MediaWiki queries
    # ------------------------------------------------------------------

    async def get_page_info(self, title: str):
        """Return the ``prop=info`` record (including URLs) of the page ``title``.

        Raises:
            MediaWikiApiException: the API answered with an error.
            MediaWikiPageNotFoundException: the page is reported as missing,
                or the response contains no page record at all.
        """
        data = await self._request("GET", {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "prop": "info",
            "titles": title,
            "inprop": "url",
        })
        self._raise_for_error(data)

        pages = data.get("query", {}).get("pages") or []
        if not pages or "missing" in pages[0]:
            # A "missing" page comes without an "error" member, so the
            # exception text is supplied here.  The code and wording follow
            # MediaWiki's own "missingtitle" error.
            raise MediaWikiPageNotFoundException(
                "The page you specified doesn't exist.", "missingtitle")
        return pages[0]

    async def parse_page(self, title: str):
        """Return the rendered text of the page ``title`` via ``action=parse``.

        Section-edit links, the table of contents and the limit report are
        disabled in the request.

        Raises:
            MediaWikiApiException: the API answered with an error.
        """
        data = await self._request("GET", {
            "action": "parse",
            "format": "json",
            "formatversion": "2",
            "prop": "text",
            "page": title,
            "disableeditsection": "true",
            "disabletoc": "true",
            "disablelimitreport": "true",
        })
        self._raise_for_error(data)
        return data["parse"]["text"]

    async def get_site_meta(self):
        """Return ``{"sitename": ..., "user": ...}`` for the wiki and current session.

        Falls back to ``"Unknown"`` / ``"Anonymous"`` for whichever part the
        response does not contain.

        Raises:
            MediaWikiApiException: the API answered with an error.
        """
        data = await self._request("GET", {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "meta": "siteinfo|userinfo",
            "siprop": "general",
        })
        self._raise_for_error(data)

        meta = {
            "sitename": "Unknown",
            "user": "Anonymous",
        }
        query = data.get("query", {})
        if "general" in query:
            meta["sitename"] = query["general"]["sitename"]
        if "userinfo" in query:
            meta["user"] = query["userinfo"]["name"]
        return meta

    async def is_logged_in(self):
        """Return True when the session's user id reported by the API is not 0.

        Raises:
            MediaWikiApiException: the API answered with an error.
        """
        data = await self._request("GET", {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "meta": "userinfo",
        })
        self._raise_for_error(data)
        return data["query"]["userinfo"]["id"] != 0

    async def get_token(self, token_type: str):
        """Fetch a token of kind ``token_type`` (e.g. ``"login"``) from the API.

        Raises:
            MediaWikiApiException: the API answered with an error.
        """
        data = await self._request("GET", {
            "action": "query",
            "format": "json",
            "formatversion": "2",
            "meta": "tokens",
            "type": token_type,
        })
        self._raise_for_error(data)
        # The response names the token "<type>token".
        return data["query"]["tokens"][token_type + "token"]

    # ------------------------------------------------------------------
    # Login handling
    # ------------------------------------------------------------------

    async def robot_login(self, username: str, password: str):
        """Log in through ``action=login`` and remember the credentials.

        On success the login time and the credentials are stored so that
        :meth:`refresh_login` can log in again later.

        Returns:
            True on success.

        Raises:
            MediaWikiApiException: the API answered with an error, or the
                login result is anything other than ``"Success"``.
        """
        token = await self.get_token("login")
        data = await self._request("POST", {
            "action": "login",
            "format": "json",
            "formatversion": "2",
            "lgname": username,
            "lgpassword": password,
            "lgtoken": token,
        })
        self._raise_for_error(data)

        # A response without a "login" member is treated as a failed login too.
        if data.get("login", {}).get("result") != "Success":
            raise MediaWikiApiException("Login failed")

        self.login_time = time.time()
        self.login_identity = {
            "username": username,
            "password": password,
        }
        return True

    async def refresh_login(self):
        """Log in again when the last login is more than an hour old.

        Returns:
            False when no credentials have been stored yet, otherwise True
            (either the login is still fresh or the re-login succeeded).

        Raises:
            MediaWikiApiException: a required re-login failed.
        """
        if self.login_identity is None:
            print("刷新MW机器人账号登录状态失败没有保存的用户")
            return False

        if time.time() - self.login_time > 3600:
            print("刷新MW机器人账号登录状态")
            return await self.robot_login(
                self.login_identity["username"], self.login_identity["password"])

        # Login is still fresh; report success explicitly rather than None.
        return True

    # ------------------------------------------------------------------
    # chatcompletebot API module
    # ------------------------------------------------------------------

    async def chat_complete_user_info(self, user_id: int):
        """Return the ``userinfo`` record of ``user_id`` from ``chatcompletebot``.

        Raises:
            MediaWikiPageNotFoundException: the API reported ``user-not-found``.
            MediaWikiApiException: the API answered with any other error.
        """
        await self.refresh_login()
        data = await self._request("GET", {
            "action": "chatcompletebot",
            "method": "userinfo",
            "userid": user_id,
            "format": "json",
            "formatversion": "2",
        })
        if "error" in data:
            error = data["error"]
            if error["code"] == "user-not-found":
                raise MediaWikiPageNotFoundException(error["info"], error["code"])
            raise MediaWikiApiException(error["info"], error["code"])
        return data["chatcompletebot"]["userinfo"]

    async def chat_complete_get_point_cost(self, user_id: int, user_action: str, tokens: Optional[int] = None, extractlines: Optional[int] = None) -> ChatCompleteGetPointUsageResponse:
        """Ask the API for the point cost of an action (``reportusage``, step ``check``).

        ``tokens`` and ``extractlines`` are sent only when they are not None.

        Raises:
            MediaWikiApiException: the API answered with an error.
        """
        await self.refresh_login()
        data = await self._request(
            "POST", self._usage_payload("check", user_id, user_action, tokens, extractlines))
        if "error" in data:
            # Dump the whole error payload for diagnosis, on stderr like the
            # other error output of this class.
            print(data, file=sys.stderr)
            self._raise_for_error(data)

        point_cost = int(data["chatcompletebot"]["reportusage"]["pointcost"] or 0)
        return ChatCompleteGetPointUsageResponse(point_cost=point_cost)

    async def chat_complete_start_transaction(self, user_id: int, user_action: str, tokens: Optional[int] = None, extractlines: Optional[int] = None) -> ChatCompleteReportUsageResponse:
        """Open a usage transaction (``reportusage``, step ``start``).

        ``tokens`` and ``extractlines`` are sent only when they are not None.

        Returns:
            The point cost and the transaction id reported by the API.

        Raises:
            MediaWikiUserNoEnoughPointsException: the API reported
                ``noenoughpoints``.
            MediaWikiApiException: the API answered with any other error.
        """
        await self.refresh_login()
        data = await self._request(
            "POST", self._usage_payload("start", user_id, user_action, tokens, extractlines))
        if "error" in data:
            error = data["error"]
            if error["code"] == "noenoughpoints":
                # Pass the real error code (not the info text) as ``code``.
                raise MediaWikiUserNoEnoughPointsException(error["info"], error["code"])
            raise MediaWikiApiException(error["info"], error["code"])

        usage = data["chatcompletebot"]["reportusage"]
        return ChatCompleteReportUsageResponse(
            point_cost=int(usage["pointcost"] or 0),
            transaction_id=usage["transactionid"])

    async def chat_complete_end_transaction(self, transaction_id: str, tokens: Optional[int] = None):
        """Close a usage transaction (``reportusage``, step ``end``).

        Returns:
            The ``success`` value from the API response, or None when the
            request fails; the failure is printed to stderr and not raised.

        Note:
            Exceptions raised by the preceding :meth:`refresh_login` call are
            not swallowed.
        """
        await self.refresh_login()
        try:
            data = await self._request("POST", self._without_none({
                "action": "chatcompletebot",
                "method": "reportusage",
                "step": "end",
                "transactionid": transaction_id,
                "tokens": tokens,
                "format": "json",
                "formatversion": "2",
            }))
            self._raise_for_error(data)
            return data["chatcompletebot"]["reportusage"]["success"]
        except Exception as e:
            # Deliberately broad: a failure here is reported, not propagated.
            print(e, file=sys.stderr)
            return None

    async def chat_complete_cancel_transaction(self, transaction_id: str, error: Optional[str] = None):
        """Cancel a usage transaction (``reportusage``, step ``cancel``).

        ``error`` is sent along only when it is not None.

        Returns:
            The ``success`` value from the API response, or None when the
            request fails; the failure is printed to stderr and not raised.

        Note:
            Exceptions raised by the preceding :meth:`refresh_login` call are
            not swallowed.
        """
        await self.refresh_login()
        try:
            data = await self._request("POST", self._without_none({
                "action": "chatcompletebot",
                "method": "reportusage",
                "step": "cancel",
                "transactionid": transaction_id,
                "error": error,
                "format": "json",
                "formatversion": "2",
            }))
            self._raise_for_error(data)
            return data["chatcompletebot"]["reportusage"]["success"]
        except Exception as e:
            # Deliberately broad: a failure here is reported, not propagated.
            print(e, file=sys.stderr)
            return None