|
|
# Copyright (c) Alibaba Cloud.
|
|
|
#
|
|
|
# This source code is licensed under the license found in the
|
|
|
# LICENSE file in the root directory of this source tree.
|
|
|
|
|
|
"""A simple command-line interactive chat demo."""
|
|
|
|
|
|
import argparse
|
|
|
import os
|
|
|
import platform
|
|
|
import shutil
|
|
|
import json
|
|
|
import json5
|
|
|
from copy import deepcopy
|
|
|
|
|
|
import torch
|
|
|
from transformers import AutoModelForCausalLM, AutoTokenizer
|
|
|
from peft import AutoPeftModelForCausalLM
|
|
|
from transformers.generation import GenerationConfig
|
|
|
from transformers.trainer_utils import set_seed
|
|
|
|
|
|
DEFAULT_CKPT_PATH = 'Qwen/Qwen-7B-Chat'
|
|
|
|
|
|
_WELCOME_MSG = '''\
|
|
|
Welcome to use Qwen-Chat model, type text to start chat, type :h to show command help.
|
|
|
(欢迎使用 Qwen-Chat 模型,输入内容即可进行对话,:h 显示命令帮助。)
|
|
|
|
|
|
Note: This demo is governed by the original license of Qwen.
|
|
|
We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content, including hate speech, violence, pornography, deception, etc.
|
|
|
(注:本演示受Qwen的许可协议限制。我们强烈建议,用户不应传播及不应允许他人传播以下内容,包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息。)
|
|
|
'''
|
|
|
_HELP_MSG = '''\
|
|
|
Commands:
|
|
|
:help / :h Show this help message 显示帮助信息
|
|
|
:exit / :quit / :q Exit the demo 退出Demo
|
|
|
:clear / :cl Clear screen 清屏
|
|
|
:clear-his / :clh Clear history 清除对话历史
|
|
|
:history / :his Show history 显示对话历史
|
|
|
:seed Show current random seed 显示当前随机种子
|
|
|
:seed <N> Set random seed to <N> 设置随机种子
|
|
|
:conf Show current generation config 显示生成配置
|
|
|
:conf <key>=<value> Change generation config 修改生成配置
|
|
|
:reset-conf Reset generation config 重置生成配置
|
|
|
'''
|
|
|
|
|
|
TOOL_DESC = """{name_for_model}: 你可以调用该工具与 {name_for_human} API 进行交互。{name_for_human} API 有什么作用?{description_for_model} 参数列表:{parameters}"""
|
|
|
|
|
|
REACT_INSTRUCTION = """请尽可能回答下列问题。您可以访问以下 API:
|
|
|
|
|
|
{tools_text}
|
|
|
|
|
|
使用以下格式回答问题:
|
|
|
|
|
|
Question: 你需要回答的问题
|
|
|
Thought: 你的思考过程
|
|
|
Action: 要使用的操作,必须是 [{tools_name_text}] 其中之一
|
|
|
Action Input: 操作的输入参数
|
|
|
Observation: 操作的结果
|
|
|
... (这些 Thought/Action/Action Input/Observation 可以是零次或重复多次)
|
|
|
Thought: 你的最终思考过程
|
|
|
Final Answer: 你的最终回答"""
|
|
|
|
|
|
|
|
|
def build_react_instruction(functions: list[dict]):
|
|
|
tools_text = []
|
|
|
tools_name_text = []
|
|
|
for func_info in functions:
|
|
|
name = func_info.get("name", "")
|
|
|
name_m = func_info.get("name_for_model", name)
|
|
|
name_h = func_info.get("name_for_human", name)
|
|
|
desc = func_info.get("description", "")
|
|
|
desc_m = func_info.get("description_for_model", desc)
|
|
|
tool = TOOL_DESC.format(
|
|
|
name_for_model=name_m,
|
|
|
name_for_human=name_h,
|
|
|
description_for_model=desc_m,
|
|
|
parameters=json.dumps(func_info["parameters"], ensure_ascii=False),
|
|
|
)
|
|
|
tools_text.append(tool)
|
|
|
tools_name_text.append(name_m)
|
|
|
tools_text = "\n\n".join(tools_text)
|
|
|
tools_name_text = ", ".join(tools_name_text)
|
|
|
instruction = REACT_INSTRUCTION.format(
|
|
|
tools_text=tools_text,
|
|
|
tools_name_text=tools_name_text,
|
|
|
)
|
|
|
return instruction
|
|
|
|
|
|
|
|
|
def _load_model_tokenizer(args):
|
|
|
model_path = args.model_path or args.checkpoint_path
|
|
|
tokenizer = AutoTokenizer.from_pretrained(
|
|
|
args.checkpoint_path, trust_remote_code=True, resume_download=True,
|
|
|
)
|
|
|
|
|
|
if args.cpu_only:
|
|
|
device_map = "cpu"
|
|
|
else:
|
|
|
device_map = "cuda"
|
|
|
|
|
|
model = AutoPeftModelForCausalLM.from_pretrained(
|
|
|
args.checkpoint_path,
|
|
|
device_map=device_map,
|
|
|
trust_remote_code=True,
|
|
|
resume_download=True,
|
|
|
).eval()
|
|
|
|
|
|
config = GenerationConfig.from_pretrained(
|
|
|
model_path, trust_remote_code=True, resume_download=True,
|
|
|
)
|
|
|
|
|
|
return model, tokenizer, config
|
|
|
|
|
|
|
|
|
def _gc():
|
|
|
import gc
|
|
|
gc.collect()
|
|
|
if torch.cuda.is_available():
|
|
|
torch.cuda.empty_cache()
|
|
|
|
|
|
|
|
|
def _clear_screen():
|
|
|
if platform.system() == "Windows":
|
|
|
os.system("cls")
|
|
|
else:
|
|
|
os.system("clear")
|
|
|
|
|
|
|
|
|
def _print_history(history):
|
|
|
terminal_width = shutil.get_terminal_size()[0]
|
|
|
print(f'History ({len(history)})'.center(terminal_width, '='))
|
|
|
for index, (query, response) in enumerate(history):
|
|
|
print(f'User[{index}]: {query}')
|
|
|
print(f'QWen[{index}]: {response}')
|
|
|
print('=' * terminal_width)
|
|
|
|
|
|
|
|
|
def _get_input() -> str:
|
|
|
while True:
|
|
|
try:
|
|
|
message = input('User> ').strip()
|
|
|
except UnicodeDecodeError:
|
|
|
print('[ERROR] Encoding error in input')
|
|
|
continue
|
|
|
except KeyboardInterrupt:
|
|
|
exit(1)
|
|
|
if message:
|
|
|
return message
|
|
|
print('[ERROR] Query is empty')
|
|
|
|
|
|
|
|
|
def main():
|
|
|
parser = argparse.ArgumentParser(
|
|
|
description='QWen-Chat command-line interactive chat demo.')
|
|
|
parser.add_argument("-c", "--checkpoint-path", type=str, default=DEFAULT_CKPT_PATH,
|
|
|
help="Checkpoint name or path, default to %(default)r")
|
|
|
parser.add_argument("-m", "--model-path", type=str, default=None,
|
|
|
help="Model name or path, default to None")
|
|
|
parser.add_argument("-s", "--seed", type=int, default=1234, help="Random seed")
|
|
|
parser.add_argument("-sf", "--system-prompt-file", type=str, default=None,
|
|
|
help="System prompt file, default to None")
|
|
|
parser.add_argument("-fd", "--function-definition", type=str, default=None,
|
|
|
help="Function definition file, should be json or json5, default to None")
|
|
|
parser.add_argument("--cpu-only", action="store_true", help="Run demo with CPU only")
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
history, response = [], ''
|
|
|
|
|
|
model, tokenizer, config = _load_model_tokenizer(args)
|
|
|
orig_gen_config = deepcopy(model.generation_config)
|
|
|
|
|
|
system_prompt = "You are a helpful assistant."
|
|
|
if args.system_prompt_file:
|
|
|
with open(args.system_prompt_file, 'r', encoding="utf-8") as f:
|
|
|
system_prompt = f.read()
|
|
|
|
|
|
function_prompt = None
|
|
|
if args.function_definition:
|
|
|
with open(args.function_definition, 'r', encoding="utf-8") as f:
|
|
|
functions = json5.load(f)
|
|
|
function_prompt = build_react_instruction(functions)
|
|
|
|
|
|
_clear_screen()
|
|
|
print(_WELCOME_MSG)
|
|
|
|
|
|
seed = args.seed
|
|
|
|
|
|
is_first_msg = True
|
|
|
|
|
|
while True:
|
|
|
query = _get_input()
|
|
|
|
|
|
# Process commands.
|
|
|
if query.startswith(':'):
|
|
|
command_words = query[1:].strip().split()
|
|
|
if not command_words:
|
|
|
command = ''
|
|
|
else:
|
|
|
command = command_words[0]
|
|
|
|
|
|
if command in ['exit', 'quit', 'q']:
|
|
|
break
|
|
|
elif command in ['clear', 'cl']:
|
|
|
_clear_screen()
|
|
|
print(_WELCOME_MSG)
|
|
|
_gc()
|
|
|
continue
|
|
|
elif command in ['clear-history', 'clh']:
|
|
|
print(f'[INFO] All {len(history)} history cleared')
|
|
|
history.clear()
|
|
|
_gc()
|
|
|
continue
|
|
|
elif command in ['help', 'h']:
|
|
|
print(_HELP_MSG)
|
|
|
continue
|
|
|
elif command in ['history', 'his']:
|
|
|
_print_history(history)
|
|
|
continue
|
|
|
elif command in ['seed']:
|
|
|
if len(command_words) == 1:
|
|
|
print(f'[INFO] Current random seed: {seed}')
|
|
|
continue
|
|
|
else:
|
|
|
new_seed_s = command_words[1]
|
|
|
try:
|
|
|
new_seed = int(new_seed_s)
|
|
|
except ValueError:
|
|
|
print(f'[WARNING] Fail to change random seed: {new_seed_s!r} is not a valid number')
|
|
|
else:
|
|
|
print(f'[INFO] Random seed changed to {new_seed}')
|
|
|
seed = new_seed
|
|
|
continue
|
|
|
elif command in ['conf']:
|
|
|
if len(command_words) == 1:
|
|
|
print(model.generation_config)
|
|
|
else:
|
|
|
for key_value_pairs_str in command_words[1:]:
|
|
|
eq_idx = key_value_pairs_str.find('=')
|
|
|
if eq_idx == -1:
|
|
|
print('[WARNING] format: <key>=<value>')
|
|
|
continue
|
|
|
conf_key, conf_value_str = key_value_pairs_str[:eq_idx], key_value_pairs_str[eq_idx + 1:]
|
|
|
try:
|
|
|
conf_value = eval(conf_value_str)
|
|
|
except Exception as e:
|
|
|
print(e)
|
|
|
continue
|
|
|
else:
|
|
|
print(f'[INFO] Change config: model.generation_config.{conf_key} = {conf_value}')
|
|
|
setattr(model.generation_config, conf_key, conf_value)
|
|
|
continue
|
|
|
elif command in ['reset-conf']:
|
|
|
print('[INFO] Reset generation config')
|
|
|
model.generation_config = deepcopy(orig_gen_config)
|
|
|
print(model.generation_config)
|
|
|
continue
|
|
|
else:
|
|
|
# As normal query.
|
|
|
pass
|
|
|
|
|
|
# Run chat.
|
|
|
set_seed(seed)
|
|
|
try:
|
|
|
prompt = query
|
|
|
if function_prompt:
|
|
|
prompt = f"{function_prompt}\n\nQuestion: {query}"
|
|
|
|
|
|
for response in model.chat_stream(tokenizer, prompt, history=history, system=system_prompt, generation_config=config):
|
|
|
_clear_screen()
|
|
|
print(f"\nUser: {query}")
|
|
|
print(f"\nQwen-Chat: {response}")
|
|
|
except KeyboardInterrupt:
|
|
|
print('[WARNING] Generation interrupted')
|
|
|
continue
|
|
|
|
|
|
history.append((query, response))
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|