import sys import time import traceback from aiohttp import web from server.model.toolbox_ui.conversation import ConversationHelper from server.model.toolbox_ui.page_title import PageTitleHelper from service.database import DatabaseService from service.event import EventService from service.mediawiki_api import MediaWikiApi, MediaWikiApiException, MediaWikiPageNotFoundException import utils.web class ToolboxIndex: @staticmethod @utils.web.token_auth async def update_title_info(request: web.Request): params = await utils.web.get_param(request, { "title": { "required": True, } }) title = params.get("title") mwapi = MediaWikiApi.create() db = await DatabaseService.create(request.app) async with PageTitleHelper(db) as page_title_helper: title_info = await page_title_helper.find_by_title(title) if title_info is not None and time.time() - title_info.updated_at < 60: return await utils.web.api_response(1, { "cached": True, "title": title_info.title, "page_id": title_info.page_id, }, request=request) # Load page info from MediaWiki API try: page_info = await mwapi.get_page_info(title) page_id = page_info.get("pageid") real_title = page_info.get("title") if title_info is None: title_info = await page_title_helper.add(page_id, real_title) else: title_info.page_id = page_id title_info.title = real_title await page_title_helper.update(title_info) return await utils.web.api_response(1, { "cached": False, "title": real_title, "page_id": page_id }, request=request) except MediaWikiPageNotFoundException: error_msg = "Page \"%s\" not found." % title return await utils.web.api_response(-2, error={ "code": "page-not-found", "message": error_msg }, request=request, http_status=404) except MediaWikiApiException as e: error_msg = "MediaWiki API error: %s" % e.info print(error_msg, file=sys.stderr) traceback.print_exc() return await utils.web.api_response(-3, error={ "code": "mediawiki-api-error", "message": error_msg }, request=request, http_status=500) except Exception as e: error_msg = str(e) print(error_msg, file=sys.stderr) traceback.print_exc() return await utils.web.api_response(-1, error={ "code": "internal-server-error", "message": error_msg }, request=request, http_status=500) @staticmethod async def search_title(request: web.Request): params = await utils.web.get_param(request, { "kw": { "required": True, } }) keyword = params.get("kw") mwapi = MediaWikiApi.create() search_result = await mwapi.search_title(keyword) return await utils.web.api_response(1, { "titles": search_result }, request=request) @staticmethod @utils.web.token_auth async def get_conversation_list(request: web.Request): params = await utils.web.get_param(request, { "user_id": { "required": False, "type": int }, "title": { "required": False, }, "module": { "required": False } }) if request.get("caller") == "user": user_id = request.get("user") else: user_id = params.get("user_id") page_title = params.get("title") module = params.get("module") db = await DatabaseService.create(request.app) page_id = None if page_title is not None: async with PageTitleHelper(db) as page_title_helper: page_id = await page_title_helper.get_page_id_by_title(page_title) if page_id is None: return await utils.web.api_response(-2, error={ "code": "page-not-found", "message": "Page not found.", }, request=request, http_status=404) async with ConversationHelper(db) as conversation_helper: conversation_list = await conversation_helper.get_conversation_list(user_id, module=module, page_id=page_id) conversation_result = [] for result in conversation_list: conversation_result.append({ "id": result.id, "module": result.module, "page_title": result.page_info.title if result.page_info is not None else None, "title": result.title, "description": result.description, "thumbnail": result.thumbnail, "rev_id": result.rev_id, "updated_at": result.updated_at, "pinned": result.pinned, "extra": result.extra, }) return await utils.web.api_response(1, { "conversations": conversation_result }, request=request) @staticmethod @utils.web.token_auth async def get_conversation_info(request: web.Request): params = await utils.web.get_param(request, { "id": { "required": True, "type": int } }) conversation_id = params.get("id") db = await DatabaseService.create(request.app) async with ConversationHelper(db) as conversation_helper: conversation_info = await conversation_helper.find_by_id(conversation_id) if conversation_info is None: return await utils.web.api_response(-2, error={ "code": "conversation-not-found", "message": "Conversation not found.", }, request=request, http_status=404) if request.get("caller") == "user" and int(request.get("user")) != conversation_info.user_id: return await utils.web.api_response(-3, error={ "code": "permission-denied", "message": "Permission denied." }, request=request, http_status=403) conversation_result = { "id": conversation_info.id, "module": conversation_info.module, "title": conversation_info.title, "description": conversation_info.description, "thumbnail": conversation_info.thumbnail, "rev_id": conversation_info.rev_id, "updated_at": conversation_info.updated_at, "pinned": conversation_info.pinned, "extra": conversation_info.extra, } return await utils.web.api_response(1, conversation_result, request=request) @staticmethod @utils.web.token_auth async def remove_conversation(request: web.Request): params = await utils.web.get_param(request, { "id": { "type": int }, "ids": { "type": str } }) conversation_id = params.get("id") conversation_ids = params.get("ids") if conversation_id is None and conversation_ids is None: return await utils.web.api_response(-2, error={ "code": "invalid-params", "message": "Invalid params." }, request=request, http_status=400) if conversation_id is not None: conversation_ids = [conversation_id] else: conversation_ids = conversation_ids.split(",") conversation_ids = [int(id) for id in conversation_ids] db = await DatabaseService.create(request.app) async with ConversationHelper(db) as conversation_helper: user_id = None if request.get("caller") == "user": user_id = int(request.get("user")) conversation_ids = await conversation_helper.filter_user_owned_ids(conversation_ids, user_id=user_id) if len(conversation_ids) > 0: await conversation_helper.remove_multiple(conversation_ids) # 通知其他模块删除 events = EventService.create() events.emit("conversation/removed", { "ids": conversation_ids, "dbs": db, "app": request.app, }) return await utils.web.api_response(1, data={ "count": len(conversation_ids) }, request=request) @staticmethod @utils.web.token_auth async def set_conversation_pinned(request: web.Request): params = await utils.web.get_param(request, { "id": { "required": True, "type": int }, "pinned": { "required": True, "type": bool } }) conversation_id = params.get("id") pinned = params.get("pinned") db = await DatabaseService.create(request.app) async with ConversationHelper(db) as conversation_helper: conversation_info = await conversation_helper.find_by_id(conversation_id) if conversation_info is None: return await utils.web.api_response(-2, error={ "code": "conversation-not-found", "message": "Conversation not found." }, request=request, http_status=404) if request.get("caller") == "user" and int(request.get("user")) != conversation_info.user_id: return await utils.web.api_response(-3, error={ "code": "permission-denied", "message": "Permission denied." }, request=request, http_status=403) conversation_info.pinned = pinned await conversation_helper.update(conversation_info) return await utils.web.api_response(1, request=request) @staticmethod @utils.web.token_auth async def set_conversation_title(request: web.Request): params = await utils.web.get_param(request, { "id": { "required": True, "type": int }, "new_title": { "required": True, "type": str } }) conversation_id = params.get("id") new_title = params.get("new_title") db = await DatabaseService.create(request.app) async with ConversationHelper(db) as conversation_helper: conversation_info = await conversation_helper.find_by_id(conversation_id) if conversation_info is None: return await utils.web.api_response(-2, error={ "code": "conversation-not-found", "message": "Conversation not found." }, request=request, http_status=404) if request.get("caller") == "user" and int(request.get("user")) != conversation_info.user_id: return await utils.web.api_response(-3, error={ "code": "permission-denied", "message": "Permission denied." }, request=request, http_status=403) conversation_info.title = new_title await conversation_helper.update(conversation_info) return await utils.web.api_response(1, request=request) @staticmethod @utils.web.token_auth async def get_user_info(request: web.Request): params = await utils.web.get_param(request, { "user_id": { "required": False, "type": int } }) if request.get("caller") == "user": user_id = request.get("user") else: user_id = params.get("user_id") mwapi = MediaWikiApi.create() try: user_info = await mwapi.ai_toolbox_get_user_info(user_id) return await utils.web.api_response(1, user_info, request=request) except MediaWikiPageNotFoundException as e: return await utils.web.api_response(-2, error={ "code": "user-not-found", "message": "User not found." }, request=request, http_status=403) except MediaWikiApiException as e: err_str = "MediaWiki API error: %s" % e.info print(err_str, file=sys.stderr) traceback.print_exc() return await utils.web.api_response(-3, error={ "code": "mediawiki-api-error", "info": e.info, "message": err_str }, request=request, http_status=500) except Exception as e: err_str = str(e) print(err_str, file=sys.stderr) traceback.print_exc() return await utils.web.api_response(-1, error={ "code": "internal-server-error", "message": err_str }, request=request, http_status=500)