完成基础算法

main
落雨楓 3 months ago
commit efe72e325b

13
.gitignore vendored

@ -0,0 +1,13 @@
*.csv
*.xls
*.xlsx
*.xml
*.zip
# Ignore Python cache files
__pycache__/
# Ignore environment files
.env
.venv/
venv/

@ -0,0 +1,339 @@
import argparse
from datetime import timedelta
import os
import traceback
from typing import Any, TypedDict
import pandas as pd
from danmaku_reader import DanmakuReader
from structs import Percentage
class BlindboxItemData(TypedDict):
    """Configuration of a single item that a blind box can yield."""
    price: int  # value of the item; compared against the box price to compute profit
    probability: float  # advertised drop chance in percent (the statistics code divides it by 100)
class BlindboxInfo(TypedDict):
    """Configuration of one blind box: its purchase price and the items it can yield."""
    price: int  # price paid for one blind box
    items: dict[str, BlindboxItemData]  # possible drops, keyed by item (gift) name
# Static description of every known blind box, keyed by box name.
# Each item's "probability" is expressed in percent; within every box the
# listed probabilities add up to 100.
# NOTE(review): "蛇形护符" and "绮彩权杖" are listed under both "心动盲盒" and
# "至尊盲盒", so an item name alone does not identify the box it came from.
blindbox_config: dict[str, BlindboxInfo] = {
    "星月盲盒": {
        "price": 50,
        "items": {
            "落樱缤纷": {"price": 600, "probability": 0.75},
            "星河入梦": {"price": 199, "probability": 1},
            "冲鸭": {"price": 99, "probability": 10.5},
            "少女祈祷": {"price": 66, "probability": 20},
            "情书": {"price": 52, "probability": 23.15},
            "星与月": {"price": 25, "probability": 24},
            "小蛋糕": {"price": 15, "probability": 20.6},
        },
    },
    "心动盲盒": {
        "price": 150,  # fill in according to the actual blind box price
        "items": {
            "浪漫城堡": {"price": 22330, "probability": 0.04},
            "蛇形护符": {"price": 2000, "probability": 0.08},
            "时空之站": {"price": 1000, "probability": 0.12},
            "绮彩权杖": {"price": 400, "probability": 3.7},
            "爱心抱枕": {"price": 160, "probability": 45.56},
            "棉花糖": {"price": 90, "probability": 44.5},
            "电影票": {"price": 20, "probability": 6},
        },
    },
    "至尊盲盒": {
        "price": 1000,  # adjust according to the actual blind box price
        "items": {
            "奇幻之城": {"price": 32000, "probability": 0.6},
            "金蛇献福": {"price": 5000, "probability": 0.2},
            "蛇形护符": {"price": 2000, "probability": 1.45},
            "星际启航": {"price": 1010, "probability": 42},
            "许愿精灵": {"price": 888, "probability": 34},
            "绮彩权杖": {"price": 400, "probability": 19},
            "璀璨钻石": {"price": 200, "probability": 2.75},
        },
    },
}
# Reverse lookup: item (gift) name -> name of the blind box that yields it.
# NOTE(review): when the same item name is configured under several boxes,
# the box that comes later in blindbox_config overwrites the earlier one, so
# such items are always attributed to the later box.
gift_to_blindbox: dict[str, str] = {}
for box_name, box_info in blindbox_config.items():
    gift_to_blindbox.update(dict.fromkeys(box_info["items"], box_name))
def analyze_blindbox_file(filepath: str) -> pd.DataFrame:
    """Extract every blind-box drop recorded in one danmaku XML file.

    Each gift whose name is a known blind-box item becomes one row with the
    columns ``uid``, ``username``, ``blindbox_name``, ``blindbox_price``,
    ``item_name``, ``item_price``, ``item_probability``, ``profit`` and
    ``time`` (formatted as ``%Y-%m-%d %H:%M:%S``).

    Args:
        filepath: Path of the danmaku XML file to read.

    Returns:
        A DataFrame with one row per matching gift; it has the columns above
        and zero rows when the file contains no blind-box items.

    Raises:
        FileNotFoundError: If ``filepath`` is not an existing file.
        ValueError: If the file cannot be parsed, or if a blind-box item is
            found but the recording start time is unavailable.
    """
    if not os.path.isfile(filepath):
        raise FileNotFoundError(f"文件未找到: {filepath}")

    datalist = {
        "uid": [],
        "username": [],
        "blindbox_name": [],
        "blindbox_price": [],
        "item_name": [],
        "item_price": [],
        "item_probability": [],
        "profit": [],
        "time": []
    }

    # The reader opens the file itself; the with-block guarantees the handle
    # is closed once the parsed data has been consumed.
    with DanmakuReader(filepath) as danmaku_data:
        record_info = danmaku_data.record_info
        start_time = record_info.start_time if record_info is not None else None

        for gift_info in danmaku_data.gift_list:
            gift_name = gift_info.giftname
            box_name = gift_to_blindbox.get(gift_name)
            if box_name is None:
                # Not a blind-box item.
                continue
            if start_time is None:
                # Without the recording start time the gift offset cannot be
                # turned into a wall-clock time; fail with a clear message.
                raise ValueError(f"录制信息缺少开始时间: {filepath}")

            box_info = blindbox_config[box_name]
            item_info = box_info["items"][gift_name]
            # The gift timestamp is used as an offset in seconds from the
            # start of the recording.
            item_time = start_time + timedelta(seconds=gift_info.timestamp)

            # NOTE(review): gift_info.giftcount is not used, so every gift
            # record counts as exactly one drop — confirm that records never
            # carry a count greater than 1.
            datalist["uid"].append(gift_info.uid)
            datalist["username"].append(gift_info.user)
            datalist["blindbox_name"].append(box_name)
            datalist["blindbox_price"].append(box_info["price"])
            datalist["item_name"].append(gift_name)
            datalist["item_price"].append(item_info["price"])
            datalist["item_probability"].append(item_info["probability"])
            datalist["profit"].append(item_info["price"] - box_info["price"])
            datalist["time"].append(item_time.strftime("%Y-%m-%d %H:%M:%S"))

    return pd.DataFrame(datalist)
def analysis_file(filepath: str) -> pd.DataFrame:
    """Analyse one file and return its blind-box drops ordered by time.

    Any error raised while analysing the file is printed together with its
    traceback, and an empty DataFrame is returned instead.
    """
    try:
        print(f"正在分析文件: {filepath}")
        records = analyze_blindbox_file(filepath)
        return records.sort_values(by="time").reset_index(drop=True)
    except Exception as e:
        print(f"分析文件时出错: {e}")
        traceback.print_exc()
    return pd.DataFrame()
def analysis_directory(directory: str) -> pd.DataFrame:
    """Analyse every ``.xml`` file directly inside ``directory``.

    Files are processed in sorted name order. A file that fails to analyse is
    reported (message plus traceback) and skipped.

    Args:
        directory: Path of the directory to scan (not recursive).

    Returns:
        All drops of all files, ordered by time with a fresh index. An empty
        DataFrame is returned when no file could be analysed.
    """
    frames: list[pd.DataFrame] = []
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.xml'):
            continue
        filepath = os.path.join(directory, filename)
        try:
            print(f"正在分析文件: {filepath}")
            frames.append(analyze_blindbox_file(filepath))
        except Exception as e:
            print(f"分析文件时出错: {e}")
            traceback.print_exc()

    if not frames:
        # Nothing was analysed: there is no "time" column to sort by.
        return pd.DataFrame()

    non_empty = [frame for frame in frames if not frame.empty]
    # When every file was empty, keep one of the (column-bearing) empty frames.
    df = pd.concat(non_empty, ignore_index=True) if non_empty else frames[0]
    return df.sort_values(by="time").reset_index(drop=True)
def profit_statistic(df: pd.DataFrame) -> dict[str, dict[str, Any]]:
    """Summarise drops per blind box.

    Args:
        df: Drop records with at least the columns ``blindbox_name``,
            ``blindbox_price``, ``item_name``, ``item_price`` and ``profit``.

    Returns:
        A mapping from blind-box name to its statistics: number of drops,
        total spent, total item value, total profit, profit ratio, and a
        per-item breakdown under "物品统计". The breakdown lists the configured
        items first (in configuration order, including items that never
        dropped), followed by any items that occur in the data but are not
        configured for that box. An empty ``df`` yields ``{}``.
    """
    if df.empty:
        return {}

    stats: dict[str, dict[str, Any]] = {}
    for box_name, group in df.groupby("blindbox_name"):
        configured_items = blindbox_config.get(box_name, {}).get("items", {})

        total_count = group["item_name"].count()
        total_investment = group["blindbox_price"].sum()
        total_revenue = group["item_price"].sum()
        total_profit = group["profit"].sum()
        box_stats = {
            "总数": total_count,
            "总投入": total_investment,
            "总收益": total_revenue,
            "总利润": total_profit,
            "盈亏比例": Percentage(total_profit / total_investment if total_investment > 0 else 0)
        }

        item_counts = group["item_name"].value_counts()
        item_stats = {}

        # Configured items first, so the output follows the configuration
        # order and also shows items that never dropped.
        for item_name, item_config in configured_items.items():
            count = item_counts.get(item_name, 0)
            item_stats[item_name] = {
                "出现次数": count,
                "出现概率": Percentage(count / total_count if total_count > 0 else 0),
                # Configured probabilities are percentages; convert to a ratio.
                "预期概率": Percentage(item_config.get("probability", 0) / 100),
            }

        # Items seen in the data but missing from the configuration (e.g. rows
        # imported from an older CSV) are kept instead of being dropped, so
        # the per-item counts still add up to the box total.
        for item_name, count in item_counts.items():
            if item_name in item_stats:
                continue
            item_stats[item_name] = {
                "出现次数": count,
                "出现概率": Percentage(count / total_count if total_count > 0 else 0),
                "预期概率": Percentage(0),
            }

        box_stats["物品统计"] = item_stats
        stats[box_name] = box_stats
    return stats
def run_statistics(df: pd.DataFrame) -> dict[str, dict[str, Any]]:
    """Compute statistics for all drops, for non-Friday drops and for Friday drops.

    Args:
        df: Drop records; the ``time`` column must be parseable by
            ``pandas.to_datetime``.

    Returns:
        A dict with the keys "整体" (everything), "非周五" (every day except
        Friday) and "周五" (Friday only), each holding the result of
        ``profit_statistic`` for that subset. For an empty ``df`` every
        subset is ``{}``.
    """
    if df.empty:
        # An empty frame may have no "time" column at all; there is nothing
        # to split or summarise.
        return {"整体": {}, "非周五": {}, "周五": {}}

    # Parse the timestamps once; weekday 4 is Friday (Monday is 0).
    is_friday = pd.to_datetime(df["time"]).dt.weekday == 4

    return {
        "整体": profit_statistic(df),
        "非周五": profit_statistic(df[~is_friday]),
        "周五": profit_statistic(df[is_friday]),
    }
def print_tree(tree_data: dict[str, Any], indent: int = 0, current_depth: int = 0):
    """Print a nested dict as an indented tree.

    Dict values found at depth 0 are introduced by a banner and their content
    is printed without extra indentation; deeper dict values are introduced
    by ``key:`` and their content is indented one more level. Every other
    value is printed as ``key: value``.
    """
    prefix = " " * indent
    banner = "=" * 40
    for label, node in tree_data.items():
        if not isinstance(node, dict):
            print(f"{prefix}{label}: {node}")
            continue

        if current_depth == 0:
            print(banner)
            print(f" {label}")
            print(banner)
            child_indent = indent
        else:
            print(f"{prefix}{label}:")
            child_indent = indent + 1
        print_tree(node, child_indent, current_depth + 1)
# Command-line entry point: gather drop records from XML files, directories
# and previously exported CSV files, optionally export the merged records,
# then print the statistics.
if __name__ == "__main__":
    import sys  # local import: sys.exit() replaces the site-provided exit() helper

    arg_parser = argparse.ArgumentParser(description="盲盒数据分析工具")
    arg_parser.add_argument('-f', '--file', type=str, help='要分析的盲盒数据文件路径', action='append')
    arg_parser.add_argument('-d', '--directory', type=str, help='要分析的盲盒数据文件夹路径', action='append')
    arg_parser.add_argument('-i', '--inputcsv', type=str, help='从上次导出的分析结果继续分析', action='append', default=[])
    arg_parser.add_argument('-o', '--output', type=str, help='分析结果输出文件路径')
    args = arg_parser.parse_args()

    if not (args.file or args.directory or args.inputcsv):
        arg_parser.print_help()
        sys.exit(1)
    if not args.output and not args.inputcsv:
        # An output path is only optional when re-analysing exported CSV data.
        print("请指定输出文件路径")
        arg_parser.print_help()
        sys.exit(1)

    # Collect one frame per source and merge them once at the end.
    frames = []
    for file_path in args.file or []:
        frames.append(analysis_file(file_path))
    for dir_path in args.directory or []:
        frames.append(analysis_directory(dir_path))
    for csv_path in args.inputcsv or []:
        if not os.path.isfile(csv_path):
            print(f"文件未找到: {csv_path}")
            continue
        try:
            print(f"正在导入文件: {csv_path}")
            frames.append(pd.read_csv(csv_path, encoding='utf-8-sig'))
        except Exception as e:
            print(f"导入文件时出错: {e}")
            traceback.print_exc()

    non_empty = [frame for frame in frames if not frame.empty]
    result_df = pd.concat(non_empty, ignore_index=True) if non_empty else pd.DataFrame()

    if result_df.empty:
        # Nothing to export or summarise.
        print("没有可统计的数据")
    else:
        if args.output:
            # Without -o (allowed together with -i) nothing is written.
            result_df.to_csv(args.output, index=False, encoding='utf-8-sig')
        # Run the statistics
        stats = run_statistics(result_df)
        print("\n统计结果:")
        print_tree(stats)

@ -0,0 +1,302 @@
"""
mikufans录播姬弹幕XML文件读取器
支持解析mikufans录播姬生成的弹幕XML文件包括
- 弹幕数据
- 礼物数据
- SuperChat数据
- 舰长购买数据
- 录制信息
使用示例:
# 从文件读取
with DanmakuReader('danmaku.xml') as reader:
danmaku_list = reader.get_danmaku()
gift_list = reader.get_gifts()
# 从字符串读取
reader = DanmakuReader(xml_content.encode('utf-8'))
print(f"弹幕数量: {reader.get_danmaku_count()}")
"""
import io
import xml.etree.ElementTree as ET
from typing import TextIO, List, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Danmaku:
    """A single danmaku (chat comment) record."""
    time: float  # time at which the danmaku appears (seconds)
    type: int  # danmaku type (1 - scrolling, 5 - top, 4 - bottom)
    fontsize: int  # font size
    color: int  # color
    timestamp: int  # timestamp
    pool: int  # danmaku pool
    user_id: int  # user ID
    user_hash: int  # user hash
    content: str  # danmaku text
    user: str  # user name
@dataclass
class Gift:
    """A single gift record."""
    timestamp: float  # timestamp
    user: str  # user name
    uid: int  # user ID
    giftname: str  # gift name
    giftcount: int  # number of gifts
@dataclass
class SuperChat:
    """A single SuperChat record."""
    user: str  # user name
    uid: int  # user ID
    content: str  # SuperChat text
    time: int  # display duration
    price: float  # price
    timestamp: float  # timestamp
@dataclass
class Guard:
    """A single guard (captain) purchase record."""
    user: str  # user name
    uid: int  # user ID
    level: int  # guard level
    count: int  # number purchased
    timestamp: float  # timestamp
@dataclass
class RecordInfo:
    """Metadata describing one recording."""
    roomid: int  # room number
    shortid: int  # short room number
    name: str  # streamer name
    title: str  # stream title
    areanameparent: str  # parent area (category)
    areanamechild: str  # child area (sub-category)
    # Start time of the recording. The reader stores None here when the
    # start_time attribute is missing or cannot be parsed, so the field is
    # optional.
    start_time: Optional[datetime]
class DanmakuReader:
    """Reader for recorder-generated danmaku XML documents.

    The whole document is parsed eagerly by the constructor; afterwards the
    parsed records are available through the ``*_list`` attributes,
    ``record_info`` and the accessor methods. The instance can be used as a
    context manager, which closes the underlying file object on exit.
    """

    def __init__(self, file: str | TextIO | bytes):
        """Open and parse a danmaku XML document.

        Args:
            file: A path to the XML file (opened as UTF-8 text), the raw
                document as UTF-8 encoded bytes, or an already opened
                file-like object whose ``read()`` returns the document.

        Raises:
            ValueError: If the document cannot be read or parsed.
            OSError: If ``file`` is a path that cannot be opened.
        """
        owns_file = isinstance(file, str)
        if owns_file:
            self.file = open(file, 'r', encoding='utf-8')
        elif isinstance(file, bytes):
            self.file = io.StringIO(file.decode('utf-8'))
        else:
            self.file = file

        # Parsed data
        self.danmaku_list: List[Danmaku] = []
        self.gift_list: List[Gift] = []
        self.superchat_list: List[SuperChat] = []
        self.guard_list: List[Guard] = []
        self.record_info: Optional[RecordInfo] = None

        # Parse the XML right away
        try:
            self._parse_xml()
        except Exception:
            # The constructor is failing, so the caller never receives an
            # object it could close; release the handle opened above.
            if owns_file:
                self.file.close()
            raise

    def _parse_xml(self):
        """Read the whole document and populate all record lists.

        Raises:
            ValueError: If the XML is malformed, or if any other error occurs
                while reading or interpreting the document. The original
                exception is attached as ``__cause__``.
        """
        try:
            content = self.file.read()

            # Rewind so the stream can be read again, but only when the
            # stream actually supports seeking.
            seekable = getattr(self.file, 'seekable', None)
            if hasattr(self.file, 'seek') and (seekable is None or seekable()):
                self.file.seek(0)

            # Parse the XML
            root = ET.fromstring(content)

            # Recording metadata
            self._parse_record_info(root)

            # Individual record types
            self._parse_danmaku(root)
            self._parse_gifts(root)
            self._parse_superchat(root)
            self._parse_guard(root)
        except ET.ParseError as e:
            raise ValueError(f"XML解析错误: {e}") from e
        except Exception as e:
            raise ValueError(f"文件读取错误: {e}") from e

    def _parse_iso_time(self, time_str: str) -> datetime:
        """Parse an ISO 8601 timestamp with any number of fractional digits.

        ``datetime.fromisoformat`` accepts at most microsecond precision, so
        the fractional part is truncated (or zero-padded) to exactly six
        digits before parsing. A trailing ``Z`` is treated as ``+00:00``.

        Args:
            time_str: Timestamp such as ``2024-01-05T20:00:00.1234567+08:00``.

        Returns:
            The parsed datetime (timezone-aware when an offset is present).

        Raises:
            ValueError: If the string is not a valid ISO 8601 timestamp.
        """
        date_part, dot, rest = time_str.partition('.')
        if dot:
            # Split "<fraction digits><timezone suffix>" at the first
            # non-digit instead of assuming a fixed number of digits.
            digit_count = len(rest) - len(rest.lstrip('0123456789'))
            microsecond = rest[:digit_count][:6].ljust(6, '0')
            tz = rest[digit_count:]
            s_fixed = f"{date_part}.{microsecond}{tz}"
        else:
            s_fixed = time_str

        if s_fixed[-1:] in ('Z', 'z'):
            # Older Python versions do not accept the "Z" designator.
            s_fixed = s_fixed[:-1] + '+00:00'
        return datetime.fromisoformat(s_fixed)

    def _parse_record_info(self, root: ET.Element):
        """Read the ``BililiveRecorderRecordInfo`` element into ``record_info``.

        ``record_info`` stays ``None`` when the element is absent. Its
        ``start_time`` is ``None`` when the attribute is missing or cannot be
        parsed.
        """
        record_element = root.find('BililiveRecorderRecordInfo')
        if record_element is not None:
            start_time_str = record_element.get('start_time', '')
            start_time = None
            if start_time_str:
                try:
                    # Try to parse as a standard time format
                    start_time = self._parse_iso_time(start_time_str)
                except ValueError:
                    # Best effort: keep the record info without a start time.
                    pass

            self.record_info = RecordInfo(
                roomid=int(record_element.get('roomid', 0)),
                shortid=int(record_element.get('shortid', 0)),
                name=record_element.get('name', ''),
                title=record_element.get('title', ''),
                areanameparent=record_element.get('areanameparent', ''),
                areanamechild=record_element.get('areanamechild', ''),
                start_time=start_time
            )

    def _parse_danmaku(self, root: ET.Element):
        """Read every ``d`` child element into ``danmaku_list``.

        Elements without a ``p`` attribute, with fewer than eight fields, or
        with non-numeric fields are skipped.
        """
        for d_element in root.findall('d'):
            try:
                # The p attribute holds, comma separated: time, type,
                # font size, color, timestamp, pool, user ID, user hash
                p_attr = d_element.get('p', '')
                if not p_attr:
                    continue

                p_parts = p_attr.split(',')
                if len(p_parts) < 8:
                    continue

                danmaku = Danmaku(
                    time=float(p_parts[0]),
                    type=int(p_parts[1]),
                    fontsize=int(p_parts[2]),
                    color=int(p_parts[3]),
                    timestamp=int(p_parts[4]),
                    pool=int(p_parts[5]),
                    user_id=int(p_parts[6]),
                    user_hash=int(p_parts[7]),
                    content=d_element.text or '',
                    user=d_element.get('user', '')
                )
                self.danmaku_list.append(danmaku)
            except (ValueError, IndexError):
                # Skip danmaku that fail to parse
                continue

    def _parse_gifts(self, root: ET.Element):
        """Read every ``gift`` child element into ``gift_list``.

        Elements with non-numeric ``ts``, ``uid`` or ``giftcount`` values are
        skipped.
        """
        for gift_element in root.findall('gift'):
            try:
                gift = Gift(
                    timestamp=float(gift_element.get('ts', None) or 0),
                    user=gift_element.get('user', ''),
                    uid=int(gift_element.get('uid', 0)),
                    giftname=gift_element.get('giftname', ''),
                    giftcount=int(gift_element.get('giftcount', 0))
                )
                self.gift_list.append(gift)
            except (ValueError, TypeError):
                # Skip gifts that fail to parse
                continue

    def _parse_superchat(self, root: ET.Element):
        """Read every ``sc`` child element into ``superchat_list``.

        Elements with non-numeric ``uid``, ``time``, ``price`` or ``ts``
        values are skipped.
        """
        for sc_element in root.findall('sc'):
            try:
                superchat = SuperChat(
                    user=sc_element.get('user', ''),
                    uid=int(sc_element.get('uid', 0)),
                    content=sc_element.text or '',
                    time=int(sc_element.get('time', 0)),
                    price=float(sc_element.get('price', 0.0)),
                    timestamp=float(sc_element.get('ts', None) or 0)
                )
                self.superchat_list.append(superchat)
            except (ValueError, TypeError):
                # Skip SuperChats that fail to parse
                continue

    def _parse_guard(self, root: ET.Element):
        """Read every ``guard`` child element into ``guard_list``.

        Elements with non-numeric ``uid``, ``level``, ``count`` or ``ts``
        values are skipped.
        """
        for guard_element in root.findall('guard'):
            try:
                guard = Guard(
                    user=guard_element.get('user', ''),
                    uid=int(guard_element.get('uid', 0)),
                    level=int(guard_element.get('level', 0)),
                    count=int(guard_element.get('count', 0)),
                    timestamp=float(guard_element.get('ts', None) or 0)
                )
                self.guard_list.append(guard)
            except (ValueError, TypeError):
                # Skip guard records that fail to parse
                continue

    # Data access methods

    def get_danmaku(self) -> List[Danmaku]:
        """Return a copy of the list of all danmaku."""
        return self.danmaku_list.copy()

    def get_gifts(self) -> List[Gift]:
        """Return a copy of the list of all gifts."""
        return self.gift_list.copy()

    def get_superchat(self) -> List[SuperChat]:
        """Return a copy of the list of all SuperChats."""
        return self.superchat_list.copy()

    def get_guard(self) -> List[Guard]:
        """Return a copy of the list of all guard records."""
        return self.guard_list.copy()

    def get_record_info(self) -> Optional[RecordInfo]:
        """Return the recording metadata, or None when the document has none."""
        return self.record_info

    def get_danmaku_count(self) -> int:
        """Return the number of danmaku."""
        return len(self.danmaku_list)

    def get_gift_count(self) -> int:
        """Return the number of gifts."""
        return len(self.gift_list)

    def get_superchat_count(self) -> int:
        """Return the number of SuperChats."""
        return len(self.superchat_list)

    def get_guard_count(self) -> int:
        """Return the number of guard records."""
        return len(self.guard_list)

    def get_danmaku_by_time_range(self, start_time: float, end_time: float) -> List[Danmaku]:
        """Return the danmaku whose ``time`` lies within ``[start_time, end_time]``."""
        return [d for d in self.danmaku_list if start_time <= d.time <= end_time]

    def get_danmaku_by_user(self, user_name: str) -> List[Danmaku]:
        """Return the danmaku sent by the user named ``user_name``."""
        return [d for d in self.danmaku_list if d.user == user_name]

    def get_gifts_by_user(self, user_name: str) -> List[Gift]:
        """Return the gifts sent by the user named ``user_name``."""
        return [g for g in self.gift_list if g.user == user_name]

    def __enter__(self):
        """Support the with statement; returns the reader itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying file object, if it can be closed."""
        if hasattr(self.file, 'close'):
            self.file.close()

@ -0,0 +1,5 @@
class Percentage(float):
    """A ratio stored as a float that renders as a percentage.

    Converting to a string multiplies the value by 100, keeps two decimals
    and appends a percent sign.
    """

    def __str__(self):
        scaled = float(self) * 100
        return format(scaled, ".2f") + "%"
Loading…
Cancel
Save