You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

386 lines
13 KiB
Python

import argparse
from datetime import timedelta
import os
import traceback
from typing import Any, TypedDict
import pandas as pd
from danmaku_reader import DanmakuReader
from structs import Percentage
class BlindboxItemData(TypedDict):
    """Configuration entry for one gift that a blind box can yield."""

    # Value of the gift; compared against the box price to compute profit.
    price: int
    # Advertised drop chance in percent (the statistics code divides it by 100).
    probability: float
class BlindboxInfo(TypedDict):
    """Configuration of one blind box: its cost and the gifts it can yield."""

    # Cost of opening the box once.
    price: int
    # Gifts the box can yield, keyed by gift name.
    items: dict[str, BlindboxItemData]
# Static table of the known blind boxes, keyed by box name.
# For every box: "price" is the cost of one opening and "items" maps each
# possible gift name to its value ("price") and its advertised drop chance
# ("probability", in percent; the values of each box add up to 100).
# NOTE(review): "蛇形护符" and "绮彩权杖" are listed under both "心动盲盒" and
# "至尊盲盒", so a lookup keyed by gift name alone cannot tell which box
# produced them.
blindbox_config: dict[str, BlindboxInfo] = {
    "星月盲盒": {
        "price": 50,
        "items": {
            "落樱缤纷": {
                "price": 600,
                "probability": 0.75
            },
            "星河入梦": {
                "price": 199,
                "probability": 1
            },
            "冲鸭": {
                "price": 99,
                "probability": 10.5
            },
            "少女祈祷": {
                "price": 66,
                "probability": 20
            },
            "情书": {
                "price": 52,
                "probability": 23.15
            },
            "星与月": {
                "price": 25,
                "probability": 24
            },
            "小蛋糕": {
                "price": 15,
                "probability": 20.6
            }
        }
    },
    "幸运盲盒": {
        "price": 50,
        "items": {
            "幸运泡泡": {
                "price": 15,
                "probability": 49.6
            },
            "星光铃铛": {
                "price": 52,
                "probability": 42.2
            },
            "梦雾纸签": {
                "price": 100,
                "probability": 5
            },
            "福灵小兽": {
                "price": 200,
                "probability": 2.4
            },
            "星愿花园": {
                "price": 600,
                "probability": 0.8
            }
        }
    },
    "心动盲盒": {
        "price": 150,  # fill in according to the actual blind-box price
        "items": {
            "浪漫城堡": {
                "price": 22330,
                "probability": 0.04
            },
            # Also listed under "至尊盲盒" with a different probability.
            "蛇形护符": {
                "price": 2000,
                "probability": 0.08
            },
            "时空之站": {
                "price": 1000,
                "probability": 0.12
            },
            # Also listed under "至尊盲盒" with a different probability.
            "绮彩权杖": {
                "price": 400,
                "probability": 3.7
            },
            "爱心抱枕": {
                "price": 160,
                "probability": 45.56
            },
            "棉花糖": {
                "price": 90,
                "probability": 44.5
            },
            "电影票": {
                "price": 20,
                "probability": 6
            }
        }
    },
    "至尊盲盒": {
        "price": 1000,  # adjust according to the actual blind-box price
        "items": {
            "奇幻之城": {
                "price": 32000,
                "probability": 0.6
            },
            "金蛇献福": {
                "price": 5000,
                "probability": 0.2
            },
            # Also listed under "心动盲盒" with a different probability.
            "蛇形护符": {
                "price": 2000,
                "probability": 1.45
            },
            "星际启航": {
                "price": 1010,
                "probability": 42
            },
            "许愿精灵": {
                "price": 888,
                "probability": 34
            },
            # Also listed under "心动盲盒" with a different probability.
            "绮彩权杖": {
                "price": 400,
                "probability": 19
            },
            "璀璨钻石": {
                "price": 200,
                "probability": 2.75
            }
        }
    }
}
# Reverse lookup: gift name -> name of the blind box that yields it.
# Built with a comprehension so no loop variables are left behind in the
# module namespace.
# NOTE(review): when a gift name is listed under several boxes (currently
# "蛇形护符" and "绮彩权杖"), the box that comes last in blindbox_config wins,
# so such gifts are always attributed to that box. Telling them apart would
# need extra information from the gift record itself.
gift_to_blindbox: dict[str, str] = {
    item_name: box_name
    for box_name, box_info in blindbox_config.items()
    for item_name in box_info["items"]
}
def analyze_blindbox_file(filepath: str) -> pd.DataFrame:
    """Extract every blind-box gift recorded in one danmaku file.

    Args:
        filepath: Path of the recording file handed to ``DanmakuReader``.

    Returns:
        One row per gift whose name is a known blind-box item, with the
        columns ``uid``, ``username``, ``blindbox_name``, ``blindbox_price``,
        ``item_name``, ``item_price``, ``item_probability``, ``profit`` and
        ``time``. The columns are present even when no gift matched.

    Raises:
        FileNotFoundError: If ``filepath`` is not an existing file.
    """
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"文件未找到: {filepath}")

    reader = DanmakuReader(filepath)
    column_names = (
        "uid",
        "username",
        "blindbox_name",
        "blindbox_price",
        "item_name",
        "item_price",
        "item_probability",
        "profit",
        "time",
    )
    datalist = {column: [] for column in column_names}

    for gift in reader.gift_list:
        name = gift.giftname
        if name not in gift_to_blindbox:
            # Not a blind-box item; ignore ordinary gifts.
            continue

        owner_box = gift_to_blindbox[name]
        box = blindbox_config[owner_box]
        item = box["items"][name]
        # The gift timestamp is an offset in seconds from the recording start.
        opened_at = reader.record_info.start_time + timedelta(seconds=gift.timestamp)

        row = {
            "uid": gift.uid,
            "username": gift.user,
            "blindbox_name": owner_box,
            "blindbox_price": box["price"],
            "item_name": name,
            "item_price": item["price"],
            "item_probability": item["probability"],
            "profit": item["price"] - box["price"],
            "time": opened_at.strftime("%Y-%m-%d %H:%M:%S"),
        }
        for column, value in row.items():
            datalist[column].append(value)

    return pd.DataFrame(datalist)
def analysis_file(filepath: str) -> pd.DataFrame:
    """Analyze one recording file and return its rows ordered by time.

    Any error raised while reading or analyzing the file is reported on the
    console (message plus traceback) instead of being propagated.

    Args:
        filepath: Path of the recording file to analyze.

    Returns:
        The blind-box rows sorted by ``time`` with a fresh 0..n-1 index, or an
        empty DataFrame when the analysis failed.
    """
    try:
        print(f"正在分析文件: {filepath}")
        rows = analyze_blindbox_file(filepath)
        return rows.sort_values(by="time").reset_index(drop=True)
    except Exception as err:
        print(f"分析文件时出错: {err}")
        traceback.print_exc()
    return pd.DataFrame()
def analysis_directory(directory: str) -> pd.DataFrame:
    """Analyze every ``.xml`` recording directly inside ``directory``.

    Files that fail to analyze are reported on the console (message plus
    traceback) and skipped, so one broken recording does not abort the run.

    Args:
        directory: Folder whose ``.xml`` files are analyzed (not recursive).

    Returns:
        All blind-box rows of the folder sorted by ``time`` with a fresh
        0..n-1 index. An empty DataFrame is returned when the folder holds no
        usable data; previously this case raised ``KeyError`` because the
        empty frame has no ``time`` column to sort by.
    """
    frames = []
    for filename in os.listdir(directory):
        if not filename.endswith('.xml'):
            continue
        filepath = os.path.join(directory, filename)
        try:
            print(f"正在分析文件: {filepath}")
            sub_df = analyze_blindbox_file(filepath)
        except Exception as e:
            print(f"分析文件时出错: {e}")
            traceback.print_exc()
            continue
        if not sub_df.empty:
            frames.append(sub_df)

    if not frames:
        # Nothing to sort: a column-less frame has no "time" column.
        return pd.DataFrame()

    # Concatenate once instead of growing the frame inside the loop.
    df = pd.concat(frames, ignore_index=True)
    df.sort_values(by="time", inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df
def profit_statistic(df: pd.DataFrame) -> dict[str, dict[str, Any]]:
    """Summarize cost, revenue and item frequencies per blind box.

    Args:
        df: Blind-box rows with at least the columns ``blindbox_name``,
            ``blindbox_price``, ``item_name``, ``item_price`` and ``profit``.

    Returns:
        A mapping from box name to its statistics: "总数" (openings),
        "总投入" (total cost), "总收益" (total item value), "总利润" (total
        profit), "盈亏比例" (profit divided by cost) and "物品统计", which maps
        each item name to "出现次数" (count), "出现概率" (observed share) and
        "预期概率" (configured probability). Configured items come first, in
        configuration order, including those never observed; items observed
        but missing from the configuration follow with an expected
        probability of 0, so the per-item counts always add up to "总数".
        An empty ``df`` yields ``{}``.
    """
    if df.empty:
        return {}

    stats = {}
    for box_name, group in df.groupby("blindbox_name"):
        # Boxes unknown to the configuration (e.g. rows imported from an older
        # CSV export) simply have no configured items.
        configured_items = blindbox_config.get(box_name, {}).get("items", {})

        total_count = group["item_name"].count()
        total_investment = group["blindbox_price"].sum()
        total_revenue = group["item_price"].sum()
        total_profit = group["profit"].sum()
        box_stats = {
            "总数": total_count,
            "总投入": total_investment,
            "总收益": total_revenue,
            "总利润": total_profit,
            "盈亏比例": Percentage(total_profit / total_investment if total_investment > 0 else 0)
        }

        observed_counts = dict(group["item_name"].value_counts().items())
        item_stats = {}
        # Configured items first so every box prints in a stable order and
        # items that never dropped are still listed with a count of 0.
        for item_name, item_config in configured_items.items():
            count = observed_counts.get(item_name, 0)
            item_stats[item_name] = {
                "出现次数": count,
                "出现概率": Percentage(count / total_count if count else 0),
                "预期概率": Percentage(item_config.get("probability", 0) / 100),
            }
        # Items seen in the data but absent from the configuration used to be
        # dropped from the breakdown; keep them after the configured ones.
        for item_name, count in observed_counts.items():
            if item_name not in configured_items:
                item_stats[item_name] = {
                    "出现次数": count,
                    "出现概率": Percentage(count / total_count),
                    "预期概率": Percentage(0),
                }

        box_stats["物品统计"] = item_stats
        stats[box_name] = box_stats
    return stats
def run_statistics(df: pd.DataFrame) -> dict[str, dict[str, Any]]:
    """Compute the blind-box statistics for several subsets of the data.

    The input frame is not modified; all derived columns live on a copy.

    Args:
        df: Blind-box rows. Besides the columns needed by
            ``profit_statistic`` it should provide ``time`` (anything
            ``pd.to_datetime`` can parse) and ``username``.

    Returns:
        A mapping with four entries, each holding the result of
        ``profit_statistic`` for one subset:
        "整体" (all rows), "去除保底" (all rows except the chronologically
        first one of every date / username / blind-box combination),
        "非周五" (rows not on a Friday) and "周五" (rows on a Friday).
        When ``df`` has no ``time`` column the three time-based entries are
        empty dicts.
    """
    stats = {"整体": profit_statistic(df)}

    if "time" not in df.columns:
        # Without timestamps none of the time-based subsets can be built.
        for label in ("去除保底", "非周五", "周五"):
            stats[label] = {}
        return stats

    # assign() returns a new frame, so the caller's DataFrame stays untouched.
    work = df.assign(time=pd.to_datetime(df["time"]))
    # Rows merged from several files/CSVs are not necessarily chronological;
    # a stable sort makes "first record of the day" mean the earliest one.
    work = work.sort_values("time", kind="stable")
    work = work.assign(date=work["time"].dt.date)

    # Position of each row inside its (date, user, box) group; 0 marks the
    # first opening of the day. A cumcount mask keeps every column, unlike
    # groupby.apply whose handling of grouping columns is deprecated in
    # recent pandas. dropna=False keeps rows with a missing key as a group.
    opening_rank = work.groupby(
        ["date", "username", "blindbox_name"], dropna=False
    ).cumcount()
    stats["去除保底"] = profit_statistic(work[opening_rank > 0].reset_index(drop=True))

    # weekday() == 4 is Friday.
    is_friday = work["time"].dt.weekday == 4
    stats["非周五"] = profit_statistic(work[~is_friday])
    stats["周五"] = profit_statistic(work[is_friday])
    return stats
def print_tree(tree_data: dict[str, Any], indent: int = 0, current_depth: int = 0):
    """Print a nested dict as an indented tree.

    Top-level dict entries (``current_depth == 0``) are printed as a banner
    framed by two rules of 40 ``=`` characters and their children keep the
    current indentation. Deeper dict entries are printed as ``key:`` and
    their children are indented one step further. Every non-dict value is
    printed as ``key: value``.

    Args:
        tree_data: The (possibly nested) mapping to print.
        indent: Number of indentation units placed before each line.
        current_depth: Nesting level of ``tree_data``; 0 for the root.
    """
    pad = " " * indent
    rule = "=" * 40
    for label, node in tree_data.items():
        if not isinstance(node, dict):
            print(f"{pad}{label}: {node}")
            continue

        if current_depth == 0:
            print(rule)
            print(f" {label}")
            print(rule)
            child_indent = indent
        else:
            print(f"{pad}{label}:")
            child_indent = indent + 1
        print_tree(node, child_indent, current_depth + 1)
def main(argv=None) -> int:
    """Run the command-line tool.

    Collects blind-box rows from recording files (``-f``), folders (``-d``)
    and previously exported CSV files (``-i``), optionally writes the merged
    rows to ``-o`` and prints the statistics tree.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The process exit status: 1 when the arguments are insufficient,
        0 otherwise (including the case where no data was found).
    """
    arg_parser = argparse.ArgumentParser(description="盲盒数据分析工具")
    arg_parser.add_argument('-f', '--file', type=str, help='要分析的盲盒数据文件路径', action='append')
    arg_parser.add_argument('-d', '--directory', type=str, help='要分析的盲盒数据文件夹路径', action='append')
    arg_parser.add_argument('-i', '--inputcsv', type=str, help='从上次导出的分析结果继续分析', action='append', default=[])
    arg_parser.add_argument('-o', '--output', type=str, help='分析结果输出文件路径')
    args = arg_parser.parse_args(argv)

    if not (args.file or args.directory or args.inputcsv):
        arg_parser.print_help()
        return 1
    # An output path is mandatory unless the run continues from a CSV export.
    if not args.output and not args.inputcsv:
        print("请指定输出文件路径")
        arg_parser.print_help()
        return 1

    frames = []
    for file_path in args.file or []:
        frames.append(analysis_file(file_path))
    for dir_path in args.directory or []:
        frames.append(analysis_directory(dir_path))
    for csv_path in args.inputcsv:
        if not os.path.isfile(csv_path):
            print(f"文件未找到: {csv_path}")
            continue
        try:
            print(f"正在导入文件: {csv_path}")
            frames.append(pd.read_csv(csv_path, encoding='utf-8-sig'))
        except Exception as e:
            print(f"导入文件时出错: {e}")
            traceback.print_exc()

    # Merge everything in one step; empty frames carry no rows to keep.
    frames = [frame for frame in frames if not frame.empty]
    if not frames:
        print("未找到任何盲盒数据")
        return 0
    result_df = pd.concat(frames, ignore_index=True)

    # to_csv(None) would only build and discard the CSV text, so skip the
    # export entirely when no output path was given.
    if args.output:
        result_df.to_csv(args.output, index=False, encoding='utf-8-sig')

    stats = run_statistics(result_df)
    print("\n统计结果:")
    print_tree(stats)
    return 0


if __name__ == "__main__":
    # SystemExit works even when the site-provided exit() helper is absent.
    raise SystemExit(main())