You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

367 lines
13 KiB
Python

import sys
import time
import traceback
from aiohttp import web
from server.model.toolbox_ui.conversation import ConversationHelper
from server.model.toolbox_ui.page_title import PageTitleHelper
from service.database import DatabaseService
from service.event import EventService
from service.mediawiki_api import MediaWikiApi, MediaWikiApiException, MediaWikiPageNotFoundException
import utils.web
class ToolboxIndex:
    """aiohttp request handlers for the toolbox UI.

    Covers page-title lookup, conversation listing/maintenance and user info.
    Every public handler is a static coroutine that receives the incoming
    ``web.Request`` and returns the result of ``utils.web.api_response``.
    """

    # Seconds during which a stored page-title record is served as-is instead
    # of being refreshed from the MediaWiki API.
    _TITLE_CACHE_TTL = 60

    @staticmethod
    def _log_error(message):
        """Print *message* and the traceback of the exception being handled to stderr.

        Must be called from inside an ``except`` block so that
        ``traceback.print_exc()`` has an active exception to report.
        """
        print(message, file=sys.stderr)
        traceback.print_exc()

    @staticmethod
    async def _find_owned_conversation(request, conversation_helper, conversation_id):
        """Load a conversation and check that the caller may access it.

        Args:
            request: the incoming request; its ``caller`` / ``user`` entries
                are used for the ownership check.
            conversation_helper: an opened ``ConversationHelper``.
            conversation_id: id of the conversation to load.

        Returns:
            ``(conversation_info, None)`` on success, or
            ``(None, error_response)`` where ``error_response`` is the 404
            ``conversation-not-found`` or 403 ``permission-denied`` response
            that the handler should return unchanged.
        """
        conversation_info = await conversation_helper.find_by_id(conversation_id)
        if conversation_info is None:
            return None, await utils.web.api_response(-2, error={
                "code": "conversation-not-found",
                "message": "Conversation not found.",
            }, request=request, http_status=404)

        # Only callers identified as "user" are restricted to their own conversations.
        if request.get("caller") == "user" and int(request.get("user")) != conversation_info.user_id:
            return None, await utils.web.api_response(-3, error={
                "code": "permission-denied",
                "message": "Permission denied.",
            }, request=request, http_status=403)

        return conversation_info, None

    @staticmethod
    @utils.web.token_auth
    async def update_title_info(request: web.Request):
        """Refresh the stored page id and title for a wiki page.

        Request params:
            title (required): page title to look up.

        Responses:
            status 1: ``{"cached", "title", "page_id"}``. ``cached`` is True
                when the stored record was updated less than
                ``_TITLE_CACHE_TTL`` seconds ago, in which case the MediaWiki
                API is not queried.
            status -2 (HTTP 404): ``page-not-found``.
            status -3 (HTTP 500): ``mediawiki-api-error``.
            status -1 (HTTP 500): ``internal-server-error``.
        """
        params = await utils.web.get_param(request, {
            "title": {
                "required": True,
            }
        })
        title = params.get("title")

        mwapi = MediaWikiApi.create()
        db = await DatabaseService.create(request.app)

        async with PageTitleHelper(db) as page_title_helper:
            title_info = await page_title_helper.find_by_title(title)

            is_fresh = (
                title_info is not None
                and time.time() - title_info.updated_at < ToolboxIndex._TITLE_CACHE_TTL
            )
            if is_fresh:
                return await utils.web.api_response(1, {
                    "cached": True,
                    "title": title_info.title,
                    "page_id": title_info.page_id,
                }, request=request)

            # Load page info from MediaWiki API
            try:
                page_info = await mwapi.get_page_info(title)
                page_id = page_info.get("pageid")
                real_title = page_info.get("title")

                if title_info is None:
                    title_info = await page_title_helper.add(page_id, real_title)
                else:
                    title_info.page_id = page_id
                    title_info.title = real_title
                    await page_title_helper.update(title_info)

                return await utils.web.api_response(1, {
                    "cached": False,
                    "title": real_title,
                    "page_id": page_id,
                }, request=request)
            except MediaWikiPageNotFoundException:
                error_msg = f"Page \"{title}\" not found."
                return await utils.web.api_response(-2, error={
                    "code": "page-not-found",
                    "message": error_msg,
                }, request=request, http_status=404)
            except MediaWikiApiException as e:
                error_msg = f"MediaWiki API error: {e.info}"
                ToolboxIndex._log_error(error_msg)
                return await utils.web.api_response(-3, error={
                    "code": "mediawiki-api-error",
                    "message": error_msg,
                }, request=request, http_status=500)
            except Exception as e:
                # Last-resort boundary: report the failure in the API envelope.
                error_msg = str(e)
                ToolboxIndex._log_error(error_msg)
                return await utils.web.api_response(-1, error={
                    "code": "internal-server-error",
                    "message": error_msg,
                }, request=request, http_status=500)

    @staticmethod
    async def search_title(request: web.Request):
        """Search wiki page titles by keyword.

        Request params:
            kw (required): keyword passed to ``MediaWikiApi.search_title``.

        Responses:
            status 1: ``{"titles": <search result>}``.
            status -3 (HTTP 500): ``mediawiki-api-error``.
        """
        # NOTE(review): unlike the other handlers this one has no
        # @utils.web.token_auth decorator - confirm that is intentional.
        params = await utils.web.get_param(request, {
            "kw": {
                "required": True,
            }
        })
        keyword = params.get("kw")

        mwapi = MediaWikiApi.create()
        try:
            search_result = await mwapi.search_title(keyword)
        except MediaWikiApiException as e:
            # Report API failures in the same envelope the sibling handlers use.
            error_msg = f"MediaWiki API error: {e.info}"
            ToolboxIndex._log_error(error_msg)
            return await utils.web.api_response(-3, error={
                "code": "mediawiki-api-error",
                "message": error_msg,
            }, request=request, http_status=500)

        return await utils.web.api_response(1, {
            "titles": search_result,
        }, request=request)

    @staticmethod
    @utils.web.token_auth
    async def get_conversation_list(request: web.Request):
        """List conversations, optionally filtered by page title and module.

        Request params:
            user_id (optional, int): only used when the caller is not a
                "user"; user callers always list their own conversations.
            title (optional): restrict the list to this page.
            module (optional): restrict the list to this module.

        Responses:
            status 1: ``{"conversations": [...]}``.
            status -2 (HTTP 404): ``page-not-found`` when ``title`` is given
                but no page id is stored for it.
        """
        params = await utils.web.get_param(request, {
            "user_id": {
                "required": False,
                "type": int
            },
            "title": {
                "required": False,
            },
            "module": {
                "required": False
            }
        })

        if request.get("caller") == "user":
            user_id = request.get("user")
        else:
            user_id = params.get("user_id")

        page_title = params.get("title")
        module = params.get("module")

        db = await DatabaseService.create(request.app)

        page_id = None
        if page_title is not None:
            async with PageTitleHelper(db) as page_title_helper:
                page_id = await page_title_helper.get_page_id_by_title(page_title)

            if page_id is None:
                return await utils.web.api_response(-2, error={
                    "code": "page-not-found",
                    "message": "Page not found.",
                }, request=request, http_status=404)

        async with ConversationHelper(db) as conversation_helper:
            conversation_list = await conversation_helper.get_conversation_list(
                user_id, module=module, page_id=page_id)

            # Built while the helper is still open, since ``page_info`` is read
            # from each row.
            conversation_result = [
                {
                    "id": item.id,
                    "module": item.module,
                    "page_title": item.page_info.title if item.page_info is not None else None,
                    "title": item.title,
                    "description": item.description,
                    "thumbnail": item.thumbnail,
                    "rev_id": item.rev_id,
                    "updated_at": item.updated_at,
                    "pinned": item.pinned,
                    "extra": item.extra,
                }
                for item in conversation_list
            ]

            return await utils.web.api_response(1, {
                "conversations": conversation_result,
            }, request=request)

    @staticmethod
    @utils.web.token_auth
    async def get_conversation_info(request: web.Request):
        """Return the details of a single conversation.

        Request params:
            id (required, int): conversation id.

        Responses:
            status 1: the conversation fields.
            status -2 (HTTP 404): ``conversation-not-found``.
            status -3 (HTTP 403): ``permission-denied`` when a user caller
                does not own the conversation.
        """
        params = await utils.web.get_param(request, {
            "id": {
                "required": True,
                "type": int
            }
        })
        conversation_id = params.get("id")

        db = await DatabaseService.create(request.app)
        async with ConversationHelper(db) as conversation_helper:
            conversation_info, error_response = await ToolboxIndex._find_owned_conversation(
                request, conversation_helper, conversation_id)
            if error_response is not None:
                return error_response

            conversation_result = {
                "id": conversation_info.id,
                "module": conversation_info.module,
                "title": conversation_info.title,
                "description": conversation_info.description,
                "thumbnail": conversation_info.thumbnail,
                "rev_id": conversation_info.rev_id,
                "updated_at": conversation_info.updated_at,
                "pinned": conversation_info.pinned,
                "extra": conversation_info.extra,
            }

            return await utils.web.api_response(1, conversation_result, request=request)

    @staticmethod
    @utils.web.token_auth
    async def remove_conversation(request: web.Request):
        """Remove one or several conversations.

        Request params:
            id (int): a single conversation id; takes precedence over ``ids``.
            ids (str): comma-separated conversation ids.

        Responses:
            status 1: ``{"count": <number of conversations removed>}``.
            status -2 (HTTP 400): ``invalid-params`` when neither parameter is
                given or ``ids`` contains a non-integer entry.
        """
        params = await utils.web.get_param(request, {
            "id": {
                "type": int
            },
            "ids": {
                "type": str
            }
        })

        single_id = params.get("id")
        id_list_text = params.get("ids")

        conversation_ids = None
        try:
            if single_id is not None:
                conversation_ids = [int(single_id)]
            elif id_list_text is not None:
                conversation_ids = [int(part) for part in id_list_text.split(",")]
        except ValueError:
            # A malformed id list is a client error, not a server failure.
            conversation_ids = None

        if conversation_ids is None:
            return await utils.web.api_response(-2, error={
                "code": "invalid-params",
                "message": "Invalid params."
            }, request=request, http_status=400)

        db = await DatabaseService.create(request.app)
        async with ConversationHelper(db) as conversation_helper:
            # user_id stays None for non-user callers; the helper decides what
            # an unrestricted filter means.
            user_id = None
            if request.get("caller") == "user":
                user_id = int(request.get("user"))

            conversation_ids = await conversation_helper.filter_user_owned_ids(
                conversation_ids, user_id=user_id)

            if conversation_ids:
                await conversation_helper.remove_multiple(conversation_ids)

                # Notify other modules about the removal
                events = EventService.create()
                events.emit("conversation/removed", {
                    "ids": conversation_ids,
                    "dbs": db,
                    "app": request.app,
                })

        return await utils.web.api_response(1, data={
            "count": len(conversation_ids)
        }, request=request)

    @staticmethod
    @utils.web.token_auth
    async def set_conversation_pinned(request: web.Request):
        """Set the pinned flag of a conversation.

        Request params:
            id (required, int): conversation id.
            pinned (required, bool): new pinned state.

        Responses:
            status 1: success.
            status -2 (HTTP 404): ``conversation-not-found``.
            status -3 (HTTP 403): ``permission-denied``.
        """
        params = await utils.web.get_param(request, {
            "id": {
                "required": True,
                "type": int
            },
            "pinned": {
                "required": True,
                "type": bool
            }
        })
        conversation_id = params.get("id")
        pinned = params.get("pinned")

        db = await DatabaseService.create(request.app)
        async with ConversationHelper(db) as conversation_helper:
            conversation_info, error_response = await ToolboxIndex._find_owned_conversation(
                request, conversation_helper, conversation_id)
            if error_response is not None:
                return error_response

            conversation_info.pinned = pinned
            await conversation_helper.update(conversation_info)

            return await utils.web.api_response(1, request=request)

    @staticmethod
    @utils.web.token_auth
    async def set_conversation_title(request: web.Request):
        """Rename a conversation.

        Request params:
            id (required, int): conversation id.
            new_title (required, str): title to store.

        Responses:
            status 1: success.
            status -2 (HTTP 404): ``conversation-not-found``.
            status -3 (HTTP 403): ``permission-denied``.
        """
        params = await utils.web.get_param(request, {
            "id": {
                "required": True,
                "type": int
            },
            "new_title": {
                "required": True,
                "type": str
            }
        })
        conversation_id = params.get("id")
        new_title = params.get("new_title")

        db = await DatabaseService.create(request.app)
        async with ConversationHelper(db) as conversation_helper:
            conversation_info, error_response = await ToolboxIndex._find_owned_conversation(
                request, conversation_helper, conversation_id)
            if error_response is not None:
                return error_response

            conversation_info.title = new_title
            await conversation_helper.update(conversation_info)

            return await utils.web.api_response(1, request=request)

    @staticmethod
    @utils.web.token_auth
    async def get_user_info(request: web.Request):
        """Fetch user information through the MediaWiki API.

        Request params:
            user_id (optional, int): only used when the caller is not a
                "user"; user callers always query their own id.

        Responses:
            status 1: the value returned by
                ``MediaWikiApi.ai_toolbox_get_user_info``.
            status -2 (HTTP 403): ``user-not-found``.
            status -3 (HTTP 500): ``mediawiki-api-error``.
            status -1 (HTTP 500): ``internal-server-error``.
        """
        params = await utils.web.get_param(request, {
            "user_id": {
                "required": False,
                "type": int
            }
        })

        if request.get("caller") == "user":
            user_id = request.get("user")
        else:
            user_id = params.get("user_id")

        mwapi = MediaWikiApi.create()

        try:
            user_info = await mwapi.ai_toolbox_get_user_info(user_id)
            return await utils.web.api_response(1, user_info, request=request)
        except MediaWikiPageNotFoundException:
            # NOTE(review): reported with HTTP 403 although the other not-found
            # errors in this class use 404; status kept because clients may
            # depend on it - confirm before changing.
            return await utils.web.api_response(-2, error={
                "code": "user-not-found",
                "message": "User not found."
            }, request=request, http_status=403)
        except MediaWikiApiException as e:
            err_str = f"MediaWiki API error: {e.info}"
            ToolboxIndex._log_error(err_str)
            return await utils.web.api_response(-3, error={
                "code": "mediawiki-api-error",
                "info": e.info,
                "message": err_str
            }, request=request, http_status=500)
        except Exception as e:
            # Last-resort boundary: report the failure in the API envelope.
            err_str = str(e)
            ToolboxIndex._log_error(err_str)
            return await utils.web.api_response(-1, error={
                "code": "internal-server-error",
                "message": err_str
            }, request=request, http_status=500)